Spark Streaming kafka实现数据零丢失的几种方式

简介:

 在使用Spark streaming消费kafka数据时,程序异常中断的情况下发现会有数据丢失的风险,本文简单介绍如何解决这些问题。

在问题开始之前先解释下流处理中的几种可靠性语义:

1、At most once - 每条数据最多被处理一次(0次或1次),这种语义下会出现数据丢失的问题;
2、At least once - 每条数据最少被处理一次 (1次或更多),这个不会出现数据丢失,但是会出现数据重复;

  3、Exactly once - 每条数据只会被处理一次,没有数据会丢失,并且没有数据会被多次处理,这种语义是大家最想要的,但是也是最难实现的。


Kafka高级API

  如果不做容错,将会带来数据丢失,因为Receiver一直在接收数据,在其没有处理的时候(已通知zk数据接收到),Executor突然挂掉(或是driver挂掉通知executor关闭),缓存在内存中的数据就会丢失。因为这个问题,Spark1.2开始加入了WAL(Write ahead log)开启 WAL,将receiver获取数据的存储级别修改为StorageLevel.MEMORY_AND_DISK_SER,使用代码片段如下:

1
2
3
4
5
6
val conf =  new  SparkConf()  
conf.set( "spark.streaming.receiver.writeAheadLog.enable" , "true" )  
val sc=  new  SparkContext(conf)  
val ssc =  new  StreamingContext(sc,Seconds( 5 ))  
ssc.checkpoint( "checkpoint" )  
val lines = <span  class = "wp_keywordlink_affiliate" ><a data-original-title= "View all posts in Kafka"  href= "https://www.iteblog.com/archives/tag/kafka"  title= ""  target= "_blank" >Kafka</a></span>Utils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)


但是开启WAL后,依旧存在数据丢失问题,即使按官方说的设置了WAL,依旧会有数据丢失,这是为什么?因为在任务中断时receiver也被强行终止了,将会造成数据丢失,提示如下:



 

1
2
3
ERROR ReceiverTracker: Deregistered receiver  for  stream  0 : Stopped by driver  
WARN BlockGenerator: Cannot stop BlockGenerator as its not in the Active state [state = StoppedAll]  
WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer queue interrupted.


在Streaming程序的最后添加代码,只有在确认所有receiver都关闭的情况下才终止程序。我们可以调用StreamingContext的stop方法,其原型如下:


  1. 1
    def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit



可以如下使用:

  1. 1
    2
    3
    sys.addShutdownHook({  
       ssc.stop( true , true )  
    )})


WAL带来的问题
WAL实现的是At-least-once语义。如果在写入到外部存储的数据还没有将offset更新到zookeeper就挂掉,这些数据将会被反复消费。同时,因为需要把数据写入到可靠的外部系统,这会牺牲系统的整个吞吐量。


Kafka Direct API

  Kafka direct API 的运行方式,将不再使用receiver来读取数据,也不用使用WAL机制。同时保证了exactly-once语义,不会在WAL中消费重复数据。不过需要自己完成将offset写入zk的过程。调用方式可以参见下面:


  1. 1
    2
    3
    4
    5
    6
    7
    messages.foreachRDD(rdd=>{  
        val message = rdd.map(_._2)    
        //对数据进行一些操作  
        message.map(method)  
        //更新zk上的offset (自己实现)  
        updateZKOffsets(rdd)  
    })










本文转自里冲51CTO博客,原文链接:http://blog.51cto.com/coollast/1887077  ,如需转载请自行联系原作者


相关文章
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
144 3
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
236 0
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
276 0
|
10月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
468 1
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
184 0
|
消息中间件 分布式计算 Kafka
195 Spark Streaming整合Kafka完成网站点击流实时统计
195 Spark Streaming整合Kafka完成网站点击流实时统计
181 0
|
消息中间件 分布式计算 Kafka
大数据Spark Structured Streaming集成 Kafka
大数据Spark Structured Streaming集成 Kafka
356 0
|
消息中间件 分布式计算 Kafka
大数据Spark Streaming集成Kafka
大数据Spark Streaming集成Kafka
340 0
|
消息中间件 分布式计算 Java
【Spark Streaming】(五)Spark Streaming 与 Kafka 集成实战!
【Spark Streaming】(五)Spark Streaming 与 Kafka 集成实战!
1030 0
【Spark Streaming】(五)Spark Streaming 与 Kafka 集成实战!