【spring-kafka】属性concurrency的作用及如何配置(RoundRobinAssignor 、RangeAssignor)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 【spring-kafka】属性concurrency的作用及如何配置(RoundRobinAssignor 、RangeAssignor)

目录

concurrency属性作用

什么情况下设置concurrency,以及设置多少

RoundRobinAssignor 和 RangeAssignor 作用

不同配置的实验分析

分区数3|concurrency = 1|启动一个客户端(单机)

分区数3|concurrency = 1|启动2个客户端(分布式模式)

分区数3|concurrency = 3|启动一个客户端

分区数3|concurrency = 3|启动2个客户端(分布式模式)

批量消费

concurrency属性作用

concurrency默认是1;


container.setConcurrency(3)表示创建三个KafkaMessageListenerContainer实例。

一个KafkaMessageListenerContainer实例分配一个分区进行消费;

如果设置为1的情况下, 这一个实例消费Topic的所有分区;

如果设置多个,那么会平均分配所有分区;

如果实例>分区数; 那么空出来的实例会浪费掉;

如果实例<=分区数 那么会有一部分实例消费多个实例,但也是均衡分配的


如果在分布式情况下, 那么总的KafkaMessageListenerContainer实例数= 服务器机器数量*concurrency ;


什么情况下设置concurrency,以及设置多少

这个得看我们给Topic设置的分区数量; 总的来说就是 机器数量*concurrency <= 分区数


例如分区=3; 而且同时有3台机器 ,那么concurrency=1就行了; 设置多了就会浪费资源;、


例如分区=9; 只有3台机器;那么可以concurrency=3 ; 每台机器3个消费者连接3个分区; 那么你可能会问我们concurrency=1不也可以吗; 反正都是一台机器消费3个分区;

话是没有错; 但是他们的差别在 一个线程消费3个分区和 3个线程消费3个分区 , 单线程和多线程你选哪个


RoundRobinAssignor 和 RangeAssignor 作用

默认情况下 spring.kafka.consumer.properties.partition.assignment.strategy=\ org.apache.kafka.clients.consumer.RangeAssignor


假如如下情况,同时监听了2个Topic; 并且每个topic的分区都是3; concurrency设置为6;

    @KafkaListener(id = "consumer-id6", topics = {"SHI_TOPIC3","SHI_TOPIC4"}, containerFactory = "concurrencyFactory"
            , clientIdPrefix = "myClientId6")
    public void consumer6(List<?> list) {
        StringBuffer sb = new StringBuffer();
        list.forEach((l)->{
            sb.append("|msg:").append(l);
        });
        log.info("线程:{} consumer-id6 消费->{}",Thread.currentThread(),sb);
    }

那么你期望的是不是 2*3=6 刚好6个线程;一个线程分配一个分区; 那么我们运行看看结果

image.png

看上图中,我们发现并没有按照我们的预期去做; 有三个消费者其实是闲置状态的; 只有另外的3个消费者负责了2个Topic的总共6个分区; 因为默认的分配策略是 spring.kafka.consumer.properties.partition.assignment.strategy=\ org.apache.kafka.clients.consumer.RangeAssignor ;


如果想达到我们的预期;那你可以修改策略; spring.kafka.consumer.properties.partition.assignment.strategy=\ org.apache.kafka.clients.consumer.RoundRobinAssignor


修改之后

image.png

每个线程分配一个分区

不同配置的实验分析

分区数3|concurrency = 1|启动一个客户端(单机)

创建了名为 SHI_TOPIC3并且分区数为3的Topic

image.png

代码启动,设置concurrency = 1, 只启动一个客户端;

启动日志

2020-11-18 17:14:42 o.a.k.c.c.i.ConsumerCoordinator 611 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] 
Finished assignment for group at generation 6: {myClientId5-0-a273480d-2370-49e5-9187-ed10fe6dcf51=
Assignment(partitions=[SHI_TOPIC3-0, SHI_TOPIC3-1, SHI_TOPIC3-2])}
 2020-11-18 17:14:42 o.s.k.l.KafkaMessageListenerContainer 292 [INFO] consumer-id5: 
 partitions assigned: [SHI_TOPIC3-2, SHI_TOPIC3-1, SHI_TOPIC3-0]

可以看到这个客户端myClientId5-0-a273480d-2370-49e5-9187-ed10fe6dcf51 被分配了3个分区SHI_TOPIC3-0, SHI_TOPIC3-1, SHI_TOPIC3-2;

消费日志

2020-11-18 17:14:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 0, CreateTime = 1605690882681, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605690882615, value = 我是data0),value:我是data0,partition:2,offset:0
 2020-11-18 17:14:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 1, CreateTime = 1605690882705, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605690882705, value = 我是data4),value:我是data4,partition:2,offset:1
 2020-11-18 17:14:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 2, CreateTime = 1605690882705, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605690882705, value = 我是data5),value:我是data5,partition:2,offset:2
 2020-11-18 17:14:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 3, CreateTime = 1605690882706, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605690882705, value = 我是data6),value:我是data6,partition:2,offset:3
 2020-11-18 17:14:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 4, CreateTime = 1605690882706, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605690882706, value = 我是data7),value:我是data7,partition:2,offset:4
.....

可以看到线程都是同一个 Thread[consumer-id5-0-C-1,5,main] ; 说明的问题就是 在消费的时候是单线程消费的,并且还是一个线程去消费 3个分区的数据; 又涉及到切换消费分区的问题;

查询这个消费组的消费情况;

image.png

也证实只有一个消费者myClientId5-0-a273480d-2370-49e5-9187-ed10fe6dcf51在消费3个分区的数据;


分区数3|concurrency = 1|启动2个客户端(分布式模式)

第一个客户端不动,继续运行, 然后启动第二个客户端

第一个客户端发生的变化

 2020-11-18 17:34:24 o.a.k.c.c.i.ConsumerCoordinator 611 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] Finished assignment for group at generation 9: {myClientId5-0-66a81e88-d924-4890-8b8e-2c6960ed0704=Assignment(partitions=[SHI_TOPIC3-2]), myClientId5-0-31c9a99f-5735-4a1d-b537-95bc5ab4533f=Assignment(partitions=[SHI_TOPIC3-0, SHI_TOPIC3-1])}

第一个客户端进行了 再平衡 ; 因为多了第二个可以分担压力进行消费; 可以看到把SHI_TOPIC3-2平衡出去了

第二个客户端的日志

 2020-11-18 17:34:24 o.a.k.c.Metadata 277 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] Cluster ID: O304VSOeSEyporzbs5AITA
 2020-11-18 17:34:24 o.a.k.c.c.i.AbstractCoordinator 797 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] Discovered group coordinator xxxxxx:9092 (id: 2147483645 rack: null)
 2020-11-18 17:34:24 o.a.k.c.c.i.AbstractCoordinator 552 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] (Re-)joining group
 2020-11-18 17:34:25 o.s.k.l.KafkaMessageListenerContainer 292 [INFO] consumer-id5: partitions assigned: [SHI_TOPIC3-2]

查询客户端消费情况

image.png

可以看到第二个客户端分配到了SHI_TOPIC3--2的分区进行消费; 并且是单线程消费;

分区数3|concurrency = 3|启动一个客户端

客户端日志

2020-11-18 17:50:42 o.a.k.c.c.i.ConsumerCoordinator 273 [INFO] [Consumer clientId=myClientId5-1, groupId=consumer-id5] Adding newly assigned partitions: SHI_TOPIC3-1
 2020-11-18 17:50:42 o.a.k.c.c.i.ConsumerCoordinator 273 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] Adding newly assigned partitions: SHI_TOPIC3-0
 2020-11-18 17:50:42 o.a.k.c.c.i.ConsumerCoordinator 273 [INFO] [Consumer clientId=myClientId5-2, groupId=consumer-id5] Adding newly assigned partitions: SHI_TOPIC3-2
 2020-11-18 17:50:42 o.s.k.l.KafkaMessageListenerContainer 292 [INFO] consumer-id5: partitions assigned: [SHI_TOPIC3-2]
 2020-11-18 17:50:42 o.s.k.l.KafkaMessageListenerContainer 292 [INFO] consumer-id5: partitions assigned: [SHI_TOPIC3-0]
 2020-11-18 17:50:42 o.s.k.l.KafkaMessageListenerContainer 292 [INFO] consumer-id5: partitions assigned: [SHI_TOPIC3-1]

上面日志显示 创建了3个消费者,他们都属于同一个消费组groupId=consumer-id5,3个分区刚好3个消费者一人一个分区平均分配;

客户端日志

 2020-11-18 17:50:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 0, leaderEpoch = 0, offset = 11, CreateTime = 1605693042720, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605693042432, value = 我是data0),value:我是data0,partition:0,offset:11
 2020-11-18 17:50:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-2-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 12, CreateTime = 1605693042751, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605693042750, value = 我是data1),value:我是data1,partition:2,offset:12
 2020-11-18 17:50:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-1-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 1, leaderEpoch = 0, offset = 17, CreateTime = 1605693042757, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605693042757, value = 我是data7),value:我是data7,partition:1,offset:17

image.png

每个消费者都是单线程,一个线程消费一个分区

分区数3|concurrency = 3|启动2个客户端(分布式模式)

启动第一个客户端

image.png

启动第二个客户端

image.png

启动第二个客户端之后就发生了 再分配rebalance; 可以看到,总共就有6个消费者, 但是其中的3个都是处于空闲状态;
因为一个分区最多只能有一个分区来进行消费;

批量消费

  /**
     * 监听器工厂 批量消费
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.setConcurrency(1);
        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }

配置文件设置 批量的最大条数

kafka.consumer.max-poll-records = 20

消费

    @KafkaListener(id = "consumer-id6", topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory"
            , clientIdPrefix = "myClientId6")
    public void consumer6(List<?> list) {
        StringBuffer sb = new StringBuffer();
        list.forEach((l)->{
            sb.append("|msg:").append(l);
        });
        log.info("线程:{} consumer-id6 消费->{}",Thread.currentThread(),sb);
    }



相关文章
|
10天前
|
Java Spring
【Spring】方法注解@Bean,配置类扫描路径
@Bean方法注解,如何在同一个类下面定义多个Bean对象,配置扫描路径
136 73
|
2月前
|
Java 开发者 微服务
手写模拟Spring Boot自动配置功能
【11月更文挑战第19天】随着微服务架构的兴起,Spring Boot作为一种快速开发框架,因其简化了Spring应用的初始搭建和开发过程,受到了广大开发者的青睐。自动配置作为Spring Boot的核心特性之一,大大减少了手动配置的工作量,提高了开发效率。
69 0
|
3月前
|
Java API 数据库
构建RESTful API已经成为现代Web开发的标准做法之一。Spring Boot框架因其简洁的配置、快速的启动特性及丰富的功能集而备受开发者青睐。
【10月更文挑战第11天】本文介绍如何使用Spring Boot构建在线图书管理系统的RESTful API。通过创建Spring Boot项目,定义`Book`实体类、`BookRepository`接口和`BookService`服务类,最后实现`BookController`控制器来处理HTTP请求,展示了从基础环境搭建到API测试的完整过程。
63 4
|
3月前
|
Java API 数据库
Spring Boot框架因其简洁的配置、快速的启动特性及丰富的功能集而备受开发者青睐
本文通过在线图书管理系统案例,详细介绍如何使用Spring Boot构建RESTful API。从项目基础环境搭建、实体类与数据访问层定义,到业务逻辑实现和控制器编写,逐步展示了Spring Boot的简洁配置和强大功能。最后,通过Postman测试API,并介绍了如何添加安全性和异常处理,确保API的稳定性和安全性。
61 0
|
10天前
|
Java Spring
【Spring配置相关】启动类为Current File,如何更改
问题场景:当我们切换类的界面的时候,重新启动的按钮是灰色的,不能使用,并且只有一个Current File 项目,下面介绍两种方法来解决这个问题。
|
10天前
|
Java Spring
【Spring配置】idea编码格式导致注解汉字无法保存
问题一:对于同一个项目,我们在使用idea的过程中,使用汉字注解完后,再打开该项目,汉字变成乱码问题二:本来a项目中,汉字注解调试好了,没有乱码了,但是创建出来的新的项目,写的注解又成乱码了。
|
10天前
|
Java Spring
【Spring配置】创建yml文件和properties或yml文件没有绿叶
本文主要针对,一个项目中怎么创建yml和properties两种不同文件,进行配置,和启动类没有绿叶标识进行解决。
|
19天前
|
NoSQL Java Redis
Spring Boot 自动配置机制:从原理到自定义
Spring Boot 的自动配置机制通过 `spring.factories` 文件和 `@EnableAutoConfiguration` 注解,根据类路径中的依赖和条件注解自动配置所需的 Bean,大大简化了开发过程。本文深入探讨了自动配置的原理、条件化配置、自定义自动配置以及实际应用案例,帮助开发者更好地理解和利用这一强大特性。
68 14
|
16天前
|
XML Java 数据格式
Spring容器Bean之XML配置方式
通过对以上内容的掌握,开发人员可以灵活地使用Spring的XML配置方式来管理应用程序的Bean,提高代码的模块化和可维护性。
53 6
|
3月前
|
Java API Spring
在 Spring 配置文件中配置 Filter 的步骤
【10月更文挑战第21天】在 Spring 配置文件中配置 Filter 是实现请求过滤的重要手段。通过合理的配置,可以灵活地对请求进行处理,满足各种应用需求。还可以根据具体的项目要求和实际情况,进一步深入研究和优化 Filter 的配置,以提高应用的性能和安全性。