大数据实时流处理零数据丢失

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 大数据实时流处理零数据丢失 1.整体流程: a)kafka:作为流处理程序的生产者 b)sparkStreaming:作为消费者,设置合理batch c)DB:输出到redis/ES 2.存在问题: 雪崩效应: kill 出现,导致的数据丢失 sparkStreaming程序挂掉了,到知道的数据丢失 解决: 1.使用checkpoint。

大数据实时流处理零数据丢失

1.整体流程:

    a)kafka:作为流处理程序的生产者
    b)sparkStreaming:作为消费者,设置合理batch
    c)DB:输出到redis/ES

2.存在问题:

雪崩效应: kill 出现,导致的数据丢失
sparkStreaming程序挂掉了,到知道的数据丢失

解决:
    1.使用checkpoint。维护太麻烦,流程序修改后需要删除checkpoint下的数据才可以,但是这回导致数据丢失或者重复。
    2.官方建议:同时启动新旧两个流程序,然后认为关掉旧的,并记录偏移量,然后新的流程序从指定的偏移量消费。

    3.自己保存偏移量到外部设备,每次启动流程序先读取外设,然后判断,offset的读取方式是earliest 还是指定偏移量

    4.配置kafak限速参数,以及流程序的序列化方式。

    5.关于偏移量的存储,可以存mysql,redis,HBASE,kafka,zk。。。


    6.关于数据重复,为了达到exactly once, 需要跟实际的业务结合。例如最后的结果如果是写入到HBase,可以将偏移量作为HBase对应的业务表中的一个列,实现事务性,幂等。

代码:

1.pom文件:

     <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.3.1</spark.version>
        <kafka.version>0.10.0.0</kafka.version>
    </properties>

    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>

        <repository>
            <id>scalikeJDBC</id>
            <name>scalikeJDBC</name>
            <url>https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc</url>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>

    <dependencies>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!--sparksql -kafka-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <!--json解析-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.36</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.40</version>
        </dependency>

        <!--redis-->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

        <!--config-->
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.1</version>
        </dependency>

        <!--RDBMS访问-->
        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc_2.11</artifactId>
            <version>2.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc-core_2.11</artifactId>
            <version>2.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc-config_2.11</artifactId>
            <version>2.5.0</version>
        </dependency>


        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

2.kafka topic的创建:

    a) bin/kafka-topics.sh --create --zookeeper pj-01:2181 --replication-factor 1 --partitions 1 --topic saprkProcess
    b) 修改kafka的分区数目:提高spark的并行度,提升消息的处理吞吐量。
    bin/kafka-topics.sh --alter --zookeeper pj-01:2181 --topic saprkProcess --partitions 3

    c) note:在0.8版本,修改完kafka 对应的topic的partition的后,流程序是侦测不到,导致数据丢失。其实,在kafka的logs下面数据已经有了,所以需要在代码中判断是否有新的分区,并获取到新的分区,然后将其偏移量初始化为0.
        但是在0.10版本后,就不存在这个问题了。

3.模拟kafka生产者:

    package Producer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;
    import java.util.Random;
    import java.util.UUID;


    public class KafkaProducerApp {
        public static void main(String[] args) throws InterruptedException {
            final Properties props = new Properties();
            //Assign topicName to string variable
            String topics = "saprkProcess";
            // create instance for properties to access producer configs
            //Assign localhost id
            props.put("bootstrap.servers", "pj-01:9092");
            //Set acknowledgements for producer requests.
            props.put("acks", "all");
            //If the request fails, the producer can automatically retry,
            props.put("retries", 0);
            //Specify buffer size in config
            props.put("batch.size", 16384);
            //Reduce the no of requests less than 0
            props.put("linger.ms", 1);
            //The buffer.memory controls the total amount of memory available to the producer for buffering.
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            String value[] = {"alibaba","baidu","tencent","facebook","amazon","apple","google","linkedin","twitter","ant","ICBC","Lakers","cleveland"};
            int i = value.length;
            final Random random = new Random();
            int ret = 100;
            while (ret>=0) {
                Thread.sleep(10);
                final ProducerRecord<String, String> msg = new ProducerRecord<String, String>(
                        topics
    //                    , (new Random()).nextInt(3)
                        ,0
                        ,System.currentTimeMillis()
                        , UUID.randomUUID().toString().substring(6, 14).replace("-", "")
                        , value[random.nextInt(i)]
                );
                producer.send(msg);
                System.out.println("msg = " + "alibaba -A: "+msg);
                ret --;
            }
        }
    }

4.创建:mysql 偏移量表:

  a)这里MySQL相关操作使用的scalikeJDBC,这个是针对scala的一款db库,操作api简单明了,默认数据库配置通过读取resources下的(applicaiton.conf/application.json,applicaiton.properties)文件。

  b)更新偏移量的api使用replace into:
    note:如果没有设置主键,默认replace操作就是新增,不会与表中已存在的数据进行逻辑比对。
        01:所以下面建表语句中:使用topic,groupid,partition作为联合主键,来确定一条记录。
        02:因为我们的流程序一经启动:一般就是7*24,所以表名暂定为streaming_offset_24。这个自定义程度很高。
    建表语句:

  create table streaming_offset_24(topic varchar(50),groupid varchar(20), partitions int, offset bigint, primary  key(topic,groupid,partitions));

5.流程序: spark streaming 处理demo:

    package pjnet
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scalikejdbc.config.DBs
    import scalikejdbc.{DB, _}


    object StreamingProApp {


      Logger.getLogger("org.apache").setLevel(Level.WARN)  //设置日志显示

      def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("StreamingProApp")
          //指定每秒钟每个分区kafka拉取消息的速率
          .set("spark.streaming.kafka.maxRatePerPartition", "100")
          // 修改序列化为Kyro,减少shuffle量
          .set("spark.serilizer", "org.apache.spark.serializer.KryoSerializer")
          // 开启rdd的压缩
          .set("spark.rdd.compress", "true")
    //    设置批次的处理时间
        val ssc = new StreamingContext(conf, Seconds(10))

        //一参数设置
        val groupId = "1" //也可以通过读取配置文件,使用typesafe的 ConfigFactory读取

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "pj-01:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> groupId,
          "auto.offset.reset" -> "earliest",
          "enable.auto.commit" -> (false: java.lang.Boolean) //自己维护偏移量。连接kafka的集群。
        )
        val topics = Array("saprkProcess")

        //二参数设置
        DBs.setup()
        val fromdbOffset: Map[TopicPartition, Long] =
          DB.readOnly { implicit session =>
            SQL(s"select * from `streaming_offset_24` where groupId = '${groupId}'")
              .map(rs => (new TopicPartition(rs.string("topic"), rs.int("partitions")), rs.long("offset")))
              .list().apply()
          }.toMap

        //程序启动,拉取kafka的消息。
        val stream = if (fromdbOffset.size == 0) {
          KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
          )
        } else {
          KafkaUtils.createDirectStream(
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Assign[String, String](fromdbOffset.keys, kafkaParams, fromdbOffset)
          )
        }


        stream.foreachRDD({
          rdd =>
            val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            //数据处理
            val resout: RDD[(String, Int)] = rdd.flatMap(_.value().split(" ")).map((_, 1)).reduceByKey(_ + _)

            //可以将结果保存至redis,hbase等db
            resout.foreach(println)
    //        resout.foreachPartition({
    ////          it =>
    ////            val jedis = RedisUtils.getJedis
    ////            it.foreach({
    ////              va =>
    ////                jedis.hincrBy(field,val1,val2)
    ////            })
    ////            jedis.close()
    //
    //        })

            //偏移量存入mysql,使用scalikejdbc框架,下面两种方式都可以。
//note: localTx is transactional, if metric update or offset update fails, neither will be committed DB.localTx { implicit session => for (or <- offsetRanges) { SQL("replace into `streaming_offset_24`(topic,groupId,partitions,offset) values(?,?,?,?)") .bind(or.topic,groupId, or.partition, or.untilOffset).update().apply() } } // offsetRanges.foreach(osr => { // DB.autoCommit{ implicit session => // sql"REPLACE INTO streaming_offset_24(topic, groupid, partitions, offset) VALUES(?,?,?,?)" // .bind(osr.topic, groupId, osr.partition, osr.untilOffset).update().apply() // } // }) }) stream.count().print() ssc.start() ssc.awaitTermination() } }
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
5月前
|
监控 大数据 Java
使用Apache Flink进行大数据实时流处理
Apache Flink是开源流处理框架,擅长低延迟、高吞吐量实时数据流处理。本文深入解析Flink的核心概念、架构(包括客户端、作业管理器、任务管理器和数据源/接收器)和事件时间、窗口、状态管理等特性。通过实战代码展示Flink在词频统计中的应用,讨论其实战挑战与优化。Flink作为大数据处理的关键组件,将持续影响实时处理领域。
881 5
|
3月前
|
消息中间件 数据挖掘 Kafka
揭秘大数据时代的极速王者!Flink:颠覆性流处理引擎,让实时数据分析燃爆你的想象力!
【8月更文挑战第29天】Apache Flink 是一个高性能的分布式流处理框架,适用于高吞吐量和低延迟的实时数据处理。它采用统一执行引擎处理有界和无界数据流,具备精确状态管理和灵活窗口操作等特性。Flink 支持毫秒级处理和广泛生态集成,但学习曲线较陡峭,社区相对较小。通过实时日志分析示例,我们展示了如何利用 Flink 从 Kafka 中读取数据并进行词频统计,体现了其强大功能和灵活性。
75 0
|
3月前
|
消息中间件 大数据 Kafka
Apache Flink 大揭秘:征服大数据实时流处理的神奇魔法,等你来解锁!
【8月更文挑战第5天】Apache Flink 是一款强大的开源大数据处理框架,专长于实时流处理。本教程通过两个示例引导你入门:一是计算数据流中元素的平均值;二是从 Kafka 中读取数据并实时处理。首先确保已安装配置好 Flink 和 Kafka 环境。第一个 Java 示例展示了如何创建流执行环境,生成数据流,利用 `flatMap` 转换数据,并使用 `keyBy` 和 `sum` 计算平均值。第二个示例则演示了如何设置 Kafka 消费者属性,并从 Kafka 主题读取数据。这两个示例为你提供了使用 Flink 进行实时流处理的基础。随着进一步学习,你将能应对更复杂的实时数据挑战。
77 0
|
6月前
|
消息中间件 存储 监控
[AIGC 大数据基础] 大数据流处理 Kafka
[AIGC 大数据基础] 大数据流处理 Kafka
|
6月前
|
SQL 大数据 数据处理
[AIGC大数据基础] Flink: 大数据流处理的未来
[AIGC大数据基础] Flink: 大数据流处理的未来
|
6月前
|
大数据
大数据重点技术----数据丢失还原
大数据重点技术----数据丢失还原
67 0
|
存储 分布式计算 大数据
大数据处理平台的架构演进:从批处理到实时流处理
大数据处理平台的架构演进:从批处理到实时流处理
336 0
|
存储 大数据 API
大数据Flink流处理相关概念
大数据Flink流处理相关概念
90 0
|
消息中间件 存储 SQL
大数据流处理平台的技术选型参考
大数据流处理平台的技术选型参考
大数据流处理平台的技术选型参考
下一篇
无影云桌面