Kafka SparkStreaming 保证数据不丢失问题 >0.10版本

简介:

sparkstreaming 处理kafka数据,几种数据丢失的情况,
1、雪崩效应导致的异常 kill掉进程 ,导致数据丢失
2、程序bug 导致进程挂了,导致数据丢失

以上是使用自动提交offset会存在的问题,若要保证数据0丢失,需要使用offset commit api
手动提交offset,自己保存offset,自己提交处理完的offset。
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

官方提供几种保存offset的方式

  1. checkpoint的方式

问题:数据和offset并不同步无法保证事物的概念,生成小文件太多,存在hdfs,会造成namenode和datanode的压力

  1. your own data store :zk、 hbase、。。。

缺点就是需要维护业务,比较麻烦
官网代码
// begin from the the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String]
(
streamingContext,
PreferConsistent,
Assign[String, String]
(
fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

val results = yourCalculation(rdd)

// begin your transaction

// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly

// end your transaction
}
3.Kafka itself kafka本身提供的api自我维护
设置enable.auto.commit to false
//坑,foreachRDD 之前不能使用map orderby等生成新的rdd,这样offset信息会丢失
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

// 业务处理,异步提交
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
这里的问题就是如果在业务处理完还没异步提交offset,其实再次启动消费会重复处理没提交offset的数据。
如何在保证数据不丢失的同时,对重复数据做处理呢?

                                                                                                                           ----若泽数据
相关文章
|
9月前
|
消息中间件 Java Kafka
Springboot集成高低版本kafka
Springboot集成高低版本kafka
299 0
|
9月前
|
消息中间件 分布式计算 Kafka
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
126 5
|
消息中间件 存储 分布式计算
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(二)
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(二)
|
消息中间件 分布式计算 Kafka
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
|
消息中间件 SQL 分布式计算
Spark分布式计算框架之SparkStreaming+kafka
Spark分布式计算框架之SparkStreaming+kafka
147 0
|
消息中间件 分布式计算 Kafka
SparkStreaming 整合 Kafka
SparkStreaming 整合 Kafka
109 0
|
消息中间件 Java Kafka
springboot 连接 kafka集群(kafka版本 2.13-3.4.0)
发布者我们使用 KafkaTemplate 来进行消息发布,所以需要先对其进行一些必要的配置
321 1
|
消息中间件 存储 算法
Kafka 如何保证数据不丢失
Kafka 如何保证数据不丢失
2402 0
|
消息中间件 数据采集 分布式计算
flume kafka和sparkstreaming整合
flume kafka和sparkstreaming整合
112 0
|
SQL 消息中间件 分布式计算
如何查看spark与hadoop、kafka、Scala、flume、hive等兼容版本【适用于任何版本】
如何查看spark与hadoop、kafka、Scala、flume、hive等兼容版本【适用于任何版本】
1085 0
如何查看spark与hadoop、kafka、Scala、flume、hive等兼容版本【适用于任何版本】