kafka从指定位置消费

简介: kafka从指定位置消费

消费者消费方式

1、KafkaConsumer.subscribe():为consumer自动分配partition,有内部算法保证topic-partition以最优的方式均匀分配给同group下的不同consumer。

2、KafkaConsumer.assign():为consumer手动、显示的指定需要消费的topic-partitions,不受group.id限制,相当与指定的group无效(this method does not use the consumer’s group management)。

注意:consumer.assign()是不会被消费者的组管理功能管理的,他相对于是一个临时的,不会改变当前group.id的offset,比如:在使用consumer.subscribe(Arrays.asList(topicName));时offset为20,如果再通过assign方式已经获取了消息后,在下次通过consumer.subscribe(Arrays.asList(topicName));来获取消息时offset还是20,还是会获取20以后的消息。

3、我们还可以配置如下属性auto.offset.reset来,设置消费者从分区的开头或者末尾进行消费数据。当然这也是有条件的。

//一般配置earliest 或者latest 值

props.put("auto.offset.reset", "latest");

我把上述三种情况的消费者不同使用方式下,消费者提交offset的情况进行了归总和说明:

注意:

只要不更改group.id,每次重新消费kafka,都是从上次消费结束的地方继续开始,不论"auto.offset.reset”属性设置的是什么

场景一:Kafka上在实时被灌入数据,但kafka上已经积累了两天的数据,如何从最新的offset开始消费?

(最新指相对于当前系统时间最新)

1.将group.id换成新的名字(相当于加入新的消费组)

2.网上文章写还要设置 properties.setProperty("auto.offset.reset", "latest”)

实验发现即使不设置这个,只要group.id是全新的,就会从最新的的offset开始消费

场景二:kafka在实时在灌入数据,kafka上已经积累了两天的数据,如何从两天前最开始的位置消费?

1.将group.id换成新的名字

2.properties.setProperty("auto.offset.reset", "earliest”)

场景三:不更改group.id,只是添加了properties.setProperty("auto.offset.reset", "earliest”),consumer会从两天前最开始的位置消费吗?

不会,只要不更改消费组,只会从上次消费结束的地方继续消费

场景四:不更改group.id,只是添加了properties.setProperty("auto.offset.reset", "latest”),consumer会从距离现在最近的位置消费吗?

不会,只要不更改消费组,只会从上次消费结束的地方继续消费

应用:

正式打包上线前应该使用新的group.id,以便于从kafka最新的位置开始消费

只要将group.id换成全新的,不论"auto.offset.reset”是否设置,设置成什么,都会从最新的位置开始消费


相关文章
|
移动开发 Java HTML5
Springboot web静态资源配置
Springboot web静态资源配置
1091 0
|
消息中间件 分布式计算 Kafka
Kafka(四)【Kafka 消费者】(4)
Kafka(四)【Kafka 消费者】
|
消息中间件 监控 Kafka
查询Kafka集群中消费组(group)信息和对应topic的消费情况
查询Kafka集群中消费组(group)信息和对应topic的消费情况
6922 0
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
2335 2
|
消息中间件 Kafka 流计算
FlinkKafkaConsumer相同group.id多个任务消费kafka问题
当使用FlinkKafkaConsumer消费Kafka时,即使设置了相同的group.id,由于Flink内部管理partition的消费offset,两个程序仍能同时消费所有数据。这与KafkaConsumer不同,后者严格遵循消费组隔离原则,避免重复消费同一分区的数据。Flink为实现exactly-once语义,需要独立管理offset,这导致了上述现象。
|
消息中间件 Kafka 测试技术
Kafka常用命令大全及kafka-console-consumer.sh及参数说明
该文章汇总了Kafka常用命令,包括集群管理、Topic操作、生产者与消费者的命令行工具使用方法等,适用于Kafka的日常运维和开发需求。
4393 3
|
存储 监控 Java
OpenFeign请求拦截器组件RequestInterceptor原理与使用场景
该文章讲述了OpenFeign中的请求拦截器组件RequestInterceptor的原理及其常见使用场景。
OpenFeign请求拦截器组件RequestInterceptor原理与使用场景
|
SQL 存储 JSON
Python写入MySQL数据库to_sql()一文详解+代码展示
Python写入MySQL数据库to_sql()一文详解+代码展示
5079 0
Python写入MySQL数据库to_sql()一文详解+代码展示
|
消息中间件 Java Kafka
【消息中心】kafka消费失败重试10次的问题
【消息中心】kafka消费失败重试10次的问题
2156 0
|
NoSQL Oracle 关系型数据库
cassandra使用场景判断:何时使用及何时不用
介绍 我有一个具有以下功能的数据库服务器: 高可用设计。 可以全球分布。 允许应用程序随时随地写入任何节点。 只需向群集添加更多节点即可进行线性扩展。 自动负载及数据均衡。 一种看起来很像SQL的查询语言。
9554 1