Kafka SparkStreaming 保证数据不丢失问题 >0.10版本-阿里云开发者社区

开发者社区> 数据库> 正文

Kafka SparkStreaming 保证数据不丢失问题 >0.10版本

简介:

sparkstreaming 处理kafka数据,几种数据丢失的情况,
1、雪崩效应导致的异常 kill掉进程 ,导致数据丢失
2、程序bug 导致进程挂了,导致数据丢失

以上是使用自动提交offset会存在的问题,若要保证数据0丢失,需要使用offset commit api
手动提交offset,自己保存offset,自己提交处理完的offset。
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

官方提供几种保存offset的方式

  1. checkpoint的方式
    问题:数据和offset并不同步无法保证事物的概念,生成小文件太多,存在hdfs,会造成namenode和datanode的压力
  2. your own data store :zk、 hbase、。。。
    缺点就是需要维护业务,比较麻烦

官网代码
// begin from the the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
AssignString, String
)

stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

val results = yourCalculation(rdd)

// begin your transaction

// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly

// end your transaction
}
3.Kafka itself kafka本身提供的api自我维护
设置enable.auto.commit to false
//坑,foreachRDD 之前不能使用map orderby等生成新的rdd,这样offset信息会丢失
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

// 业务处理,异步提交
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
这里的问题就是如果在业务处理完还没异步提交offset,其实再次启动消费会重复处理没提交offset的数据。
如何在保证数据不丢失的同时,对重复数据做处理呢?

                                                                                                                           ----若泽数据

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
数据库
使用钉钉扫一扫加入圈子
+ 订阅

分享数据库前沿,解构实战干货,推动数据库技术变革

其他文章