目录
concurrency属性作用
什么情况下设置concurrency,以及设置多少
RoundRobinAssignor 和 RangeAssignor 作用
不同配置的实验分析
分区数3|concurrency = 1|启动一个客户端(单机)
分区数3|concurrency = 1|启动2个客户端(分布式模式)
分区数3|concurrency = 3|启动一个客户端
分区数3|concurrency = 3|启动2个客户端(分布式模式)
批量消费
concurrency属性作用
concurrency默认是1;
container.setConcurrency(3)表示创建三个KafkaMessageListenerContainer实例。
一个KafkaMessageListenerContainer实例分配一个分区进行消费;
如果设置为1的情况下, 这一个实例消费Topic的所有分区;
如果设置多个,那么会平均分配所有分区;
如果实例>分区数; 那么空出来的实例会浪费掉;
如果实例<=分区数 那么会有一部分实例消费多个实例,但也是均衡分配的
如果在分布式情况下, 那么总的KafkaMessageListenerContainer实例数= 服务器机器数量*concurrency ;
什么情况下设置concurrency,以及设置多少
这个得看我们给Topic设置的分区数量; 总的来说就是 机器数量*concurrency <= 分区数
例如分区=3; 而且同时有3台机器 ,那么concurrency=1就行了; 设置多了就会浪费资源;、
例如分区=9; 只有3台机器;那么可以concurrency=3 ; 每台机器3个消费者连接3个分区; 那么你可能会问我们concurrency=1不也可以吗; 反正都是一台机器消费3个分区;
话是没有错; 但是他们的差别在 一个线程消费3个分区和 3个线程消费3个分区 , 单线程和多线程你选哪个
RoundRobinAssignor 和 RangeAssignor 作用
默认情况下 spring.kafka.consumer.properties.partition.assignment.strategy=\ org.apache.kafka.clients.consumer.RangeAssignor
假如如下情况,同时监听了2个Topic; 并且每个topic的分区都是3; concurrency设置为6;
@KafkaListener(id = "consumer-id6", topics = {"SHI_TOPIC3","SHI_TOPIC4"}, containerFactory = "concurrencyFactory" , clientIdPrefix = "myClientId6") public void consumer6(List<?> list) { StringBuffer sb = new StringBuffer(); list.forEach((l)->{ sb.append("|msg:").append(l); }); log.info("线程:{} consumer-id6 消费->{}",Thread.currentThread(),sb); }
那么你期望的是不是 2*3=6 刚好6个线程;一个线程分配一个分区; 那么我们运行看看结果
看上图中,我们发现并没有按照我们的预期去做; 有三个消费者其实是闲置状态的; 只有另外的3个消费者负责了2个Topic的总共6个分区; 因为默认的分配策略是 spring.kafka.consumer.properties.partition.assignment.strategy=\ org.apache.kafka.clients.consumer.RangeAssignor ;
如果想达到我们的预期;那你可以修改策略; spring.kafka.consumer.properties.partition.assignment.strategy=\ org.apache.kafka.clients.consumer.RoundRobinAssignor
修改之后
每个线程分配一个分区
不同配置的实验分析
分区数3|concurrency = 1|启动一个客户端(单机)
创建了名为 SHI_TOPIC3
并且分区数为3的Topic
代码启动,设置concurrency = 1
, 只启动一个客户端;
启动日志
2020-11-18 17:14:42 o.a.k.c.c.i.ConsumerCoordinator 611 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] Finished assignment for group at generation 6: {myClientId5-0-a273480d-2370-49e5-9187-ed10fe6dcf51= Assignment(partitions=[SHI_TOPIC3-0, SHI_TOPIC3-1, SHI_TOPIC3-2])} 2020-11-18 17:14:42 o.s.k.l.KafkaMessageListenerContainer 292 [INFO] consumer-id5: partitions assigned: [SHI_TOPIC3-2, SHI_TOPIC3-1, SHI_TOPIC3-0]
可以看到这个客户端myClientId5-0-a273480d-2370-49e5-9187-ed10fe6dcf51
被分配了3个分区SHI_TOPIC3-0, SHI_TOPIC3-1, SHI_TOPIC3-2
;
消费日志
2020-11-18 17:14:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 0, CreateTime = 1605690882681, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605690882615, value = 我是data0),value:我是data0,partition:2,offset:0 2020-11-18 17:14:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 1, CreateTime = 1605690882705, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605690882705, value = 我是data4),value:我是data4,partition:2,offset:1 2020-11-18 17:14:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 2, CreateTime = 1605690882705, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605690882705, value = 我是data5),value:我是data5,partition:2,offset:2 2020-11-18 17:14:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 3, CreateTime = 1605690882706, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605690882705, value = 我是data6),value:我是data6,partition:2,offset:3 2020-11-18 17:14:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 4, CreateTime = 1605690882706, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605690882706, value = 我是data7),value:我是data7,partition:2,offset:4 .....
可以看到线程都是同一个 Thread[consumer-id5-0-C-1,5,main]
; 说明的问题就是 在消费的时候是单线程消费的,并且还是一个线程去消费 3个分区的数据; 又涉及到切换消费分区的问题;
查询这个消费组的消费情况;
也证实只有一个消费者myClientId5-0-a273480d-2370-49e5-9187-ed10fe6dcf51在消费3个分区的数据;
分区数3|concurrency = 1|启动2个客户端(分布式模式)
第一个客户端不动,继续运行, 然后启动第二个客户端
第一个客户端发生的变化
2020-11-18 17:34:24 o.a.k.c.c.i.ConsumerCoordinator 611 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] Finished assignment for group at generation 9: {myClientId5-0-66a81e88-d924-4890-8b8e-2c6960ed0704=Assignment(partitions=[SHI_TOPIC3-2]), myClientId5-0-31c9a99f-5735-4a1d-b537-95bc5ab4533f=Assignment(partitions=[SHI_TOPIC3-0, SHI_TOPIC3-1])}
第一个客户端进行了 再平衡 ; 因为多了第二个可以分担压力进行消费; 可以看到把SHI_TOPIC3-2
平衡出去了
第二个客户端的日志
2020-11-18 17:34:24 o.a.k.c.Metadata 277 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] Cluster ID: O304VSOeSEyporzbs5AITA 2020-11-18 17:34:24 o.a.k.c.c.i.AbstractCoordinator 797 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] Discovered group coordinator xxxxxx:9092 (id: 2147483645 rack: null) 2020-11-18 17:34:24 o.a.k.c.c.i.AbstractCoordinator 552 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] (Re-)joining group 2020-11-18 17:34:25 o.s.k.l.KafkaMessageListenerContainer 292 [INFO] consumer-id5: partitions assigned: [SHI_TOPIC3-2]
查询客户端消费情况
可以看到第二个客户端分配到了SHI_TOPIC3--2
的分区进行消费; 并且是单线程消费;
分区数3|concurrency = 3|启动一个客户端
客户端日志
2020-11-18 17:50:42 o.a.k.c.c.i.ConsumerCoordinator 273 [INFO] [Consumer clientId=myClientId5-1, groupId=consumer-id5] Adding newly assigned partitions: SHI_TOPIC3-1 2020-11-18 17:50:42 o.a.k.c.c.i.ConsumerCoordinator 273 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] Adding newly assigned partitions: SHI_TOPIC3-0 2020-11-18 17:50:42 o.a.k.c.c.i.ConsumerCoordinator 273 [INFO] [Consumer clientId=myClientId5-2, groupId=consumer-id5] Adding newly assigned partitions: SHI_TOPIC3-2 2020-11-18 17:50:42 o.s.k.l.KafkaMessageListenerContainer 292 [INFO] consumer-id5: partitions assigned: [SHI_TOPIC3-2] 2020-11-18 17:50:42 o.s.k.l.KafkaMessageListenerContainer 292 [INFO] consumer-id5: partitions assigned: [SHI_TOPIC3-0] 2020-11-18 17:50:42 o.s.k.l.KafkaMessageListenerContainer 292 [INFO] consumer-id5: partitions assigned: [SHI_TOPIC3-1]
上面日志显示 创建了3个消费者,他们都属于同一个消费组groupId=consumer-id5
,3个分区刚好3个消费者一人一个分区平均分配;
客户端日志
2020-11-18 17:50:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 0, leaderEpoch = 0, offset = 11, CreateTime = 1605693042720, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605693042432, value = 我是data0),value:我是data0,partition:0,offset:11 2020-11-18 17:50:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-2-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 12, CreateTime = 1605693042751, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605693042750, value = 我是data1),value:我是data1,partition:2,offset:12 2020-11-18 17:50:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-1-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 1, leaderEpoch = 0, offset = 17, CreateTime = 1605693042757, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605693042757, value = 我是data7),value:我是data7,partition:1,offset:17
每个消费者都是单线程,一个线程消费一个分区
分区数3|concurrency = 3|启动2个客户端(分布式模式)
启动第一个客户端
启动第二个客户端
启动第二个客户端之后就发生了 再分配rebalance
; 可以看到,总共就有6个消费者, 但是其中的3个都是处于空闲状态;
因为一个分区最多只能有一个分区来进行消费;
批量消费
/** * 监听器工厂 批量消费 * @return */ @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(kafkaConsumerFactory()); factory.setConcurrency(1); //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG factory.setBatchListener(true); return factory; }
配置文件设置 批量的最大条数
kafka.consumer.max-poll-records = 20
消费
@KafkaListener(id = "consumer-id6", topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory" , clientIdPrefix = "myClientId6") public void consumer6(List<?> list) { StringBuffer sb = new StringBuffer(); list.forEach((l)->{ sb.append("|msg:").append(l); }); log.info("线程:{} consumer-id6 消费->{}",Thread.currentThread(),sb); }