【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接

简介: 【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接

一、引言

ece78dc62964431b906a5e6330c87422.png

二、多线程开发消费者

1. 消费者设计原理

Kafka consumer 是一个单线程的设计方案,从 Kafka Consumer 的入口类 KafkaConsumerKafkaConsumer 是一个双线程的操作,既用户线程和心跳线程

  • 用户线程:启动 Consumer 应用程序 main 方法的线程
  • 心跳线程:负责定期给对应的 Broker 发送心跳请求,以标识消费者的存活性。

2. 多线程设计方案

我们要明确的是:KafkaConsumer 类不是线程安全的,所有的 I/O 操作都是发生在用户主线程中。多个线程不能共享一个 KafkaConsumer

我们制定两套设计方案:

  • 消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整消息获取,处理消息逻辑。

867e2147d478782e6ef0df2852174e30.png

  • 消息者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。

这里我们解释一下这两个方案分别代表什么意思:

  • 第一种:每一个线程创建一个 kafkaConsumer,来执行消息获取和消息处理逻辑
  • 第二种:多个线程拉取消息,交于线程池进行消息的处理

两个方案的优缺点:

方案一:

优点:

  • 实现简单,符合我们当前使用 consumer 的习惯。
  • 多个线程间没有交互,省去很多线程安全保障的开销
  • Kafka 每个线程使用专属的 KafkaConsumer 实例来执行消息获取和消息处理逻辑,因此 Kafka 主题中的每个分区都能保证只被一个线程处理。容易实现分区内的消息消费顺序。

缺点:


每个线程维护自己的 KafkaConsumer 实例,占用更多的系统资源,如:内存、TCP连接等

方案的线程数受限于订阅主题的分区数。

  • 一个消费组中,一个分区只能由一个消费者消费。
  • 每个线程完整的执行消息获取和消息处理逻辑。一旦消息处理逻辑很重,消息处理过慢,容易出现重平衡

方案二:

优点:

  • 将任务切分为消息获取和消息处理两个部分,拥有极高的可伸缩性

缺点:

  • 实现难度过大,两组线程管理
  • 无法保证分区消息消费的顺序性
  • 多线程提交位移,可能导致正确提交位移较困难,可能会出现重复性消费。

3. 实战演示

方案一:

public class FirstMultiConsumerThreadDemo {
  // 集群的地址
    public static final String brokerList = "cluster1:9092";
    // 订阅的主题
    public static final String topic = "test.topic";
    // 消费组的ID
    public static final String groupId = "group.demo";
    // 配置文件
    public static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer",StringDeserializer.class.getName());
        return properties;
    }
    public static void main(String[] args) {
        Properties props = initConfig();
        int consumerThreadNum = 4;
        // 启动 4 个 KafkaConsumer 实例
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(props,topic).start();
        }
    }
    public static class KafkaConsumerThread extends Thread{
        private KafkaConsumer<String,String> kafkaConsumer;
        public KafkaConsumerThread(Properties props, String topic) {
            this.kafkaConsumer = new KafkaConsumer<>(props);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
        }
        @Override
        public void run() {
            try {
                while (true){
                    // 获取消息
                    ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String,String> record : records){
                        //实现处理逻辑
                        System.out.println(record.value());
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                kafkaConsumer.close();
            }
        }
    }
}

方案二:

private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...
private int workerNum = ...;
// 创建一个线程陈
executors = new ThreadPoolExecutor(
  workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
  new ArrayBlockingQueue<>(1000), 
  new ThreadPoolExecutor.CallerRunsPolicy());
...
while (true)  {
  ConsumerRecords<String, String> records = 
    consumer.poll(Duration.ofSeconds(1));
    // 获取消息后,使用线程池来处理消息
  for (final ConsumerRecord record : records) {
    executors.submit(new Worker(record));
  }
}
..

三、消费者TCP连接

我们之前讲过,生产者是如何管理TCP连接的,同样,我们的消费者也可以管理TCP连接。

1. 何时创建TCP连接

和我们的生产者类型,消费者也需要构建一个 KafkaConsumer 实例,但是消费者在构建KafkaConsumer 实例的时候,是不会创建TCP连接的。

原因在于:生产者在进行创建的时候,会启动一个 Sender 线程,这个线程负责 Socket 连接的创建。

// 会 new 一个 Sender 的类,这个类继承 Runnable 接口
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
// KafkaThread 继承 Thread,启动 Sender 线程
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

我们的 KafkaConsumer TCP 连接是在调用 KafkaConsumer.poll() 方法是被创建的

从细粒度来说,在 poll 方法的内部有三个时机可以创建 TCP 连接。

  1. 1.发起 FindCoordinator 请求时
  2. 消费者端有个组件叫做协调者(Coordinator),在我们的 Broker 里面,负责消费组的成员管理和各个消费者位移提交管理。

消费者首次调用 poll 方法时,向 Kafka 集群发送一个名为 FindCoordinator 的请求,希望 Kafka 集群告诉消费者哪一个

  • Borker 是他的协调者。请求会发给当前负载量最小的那台 Broker
  • 这个时候会创建一个 TCP 连接
  1. 2.连接协调者时

当我们的 Broker 处理完 FindCoordinator 请求时,会给我们的消费者返回一个响应结果,告诉我们消费者哪一台 Broker 是真正的协调者。

  • 当消费者知道真正的协调者时
  • ,创建与该 Broker 的 TCP 连接。这个时候,我们的协调者可以做一些组协调操作,比如: 加入组、等待组分配、心跳请求处理、位移获取、位移提交等
  1. 3.消费数据时
  • 消费者会为每个要消费的分区创建与该分区领导者的 TCP 连接。如果当前消费者需要消费 5 个分区的数据,这 5个分区的领导者副本分布在 4 个 Broker 上,那么我们需要创建与这 4 台 Broker 的连接

2. 创建多少个 TCP 连接

  • 第一类连接:消费者向 Kafka 集群发送元数据请求以获取整个集群的信息。然后发送 FindCoordinator 请求获取协调者所在的 Broker。
  • 第二类连接:连接协调者所在的 Broker,消费者进程开启消费者组的各种功能以及后续的消息消费。
  • 第三类连接:消费者去向所有分区的领导者副本所在的 Broker 获取数据,创建每个的 TCP 连接。
  • 这里有一个需要注意的点:当我们第三类TCP连接创建成功后,消费者程序将会抛弃第一类TCP连接。之后定期请求元数据,也是使用的第三类。
  • 第一类 TCP 连接仅仅是为了首次获取元数据而创建的,后面就会被废弃掉。最根本的原因是,消费者在启动时还不知道 Kafka 集群的信息,只能使用一个“假”的 ID 去注册,即使消费者获取了真实的 Broker ID,它依旧无法区分这个“假”ID 对应的是哪台 Broker,因此也就无法重用这个 Socket 连接,只能再重新创建一个新的连接

3. 何时关闭 TCP 连接

消费者关闭 TCP 连接分为主动关闭和自动关闭

主动关闭:

  • 手动调用 KafkaConsumer.close() 方法,或者执行 kill 命令

自动关闭:

  • 消费者端参数 connection.max.idle.ms ,如果在规定的时间该 TCP 连接没有请求的话,则自动关闭。


相关文章
|
10天前
|
存储 安全 Java
Qt线程池+生产者消费者模型
Qt线程池+生产者消费者模型
35 5
|
1天前
|
消息中间件 Java Kafka
springboot整合kafka消费者最佳实践
springboot整合kafka消费者最佳实践
6 1
|
8天前
|
设计模式 安全 Java
多线程(代码案例: 单例模式, 阻塞队列, 生产者消费者模型,定时器)
多线程(代码案例: 单例模式, 阻塞队列, 生产者消费者模型,定时器)
20 2
|
9天前
|
消息中间件 Java Kafka
关于kafka消费者超时配置
关于kafka消费者超时配置
|
13天前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错之连接外部kafka本地执行测试代码报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
13天前
|
设计模式 安全 Java
【Linux 系统】多线程(生产者消费者模型、线程池、STL+智能指针与线程安全、读者写者问题)-- 详解
【Linux 系统】多线程(生产者消费者模型、线程池、STL+智能指针与线程安全、读者写者问题)-- 详解
|
15天前
|
消息中间件 存储 Kafka
Kafka【基础入门】
Kafka【基础入门】
35 1
|
21天前
|
消息中间件 存储 网络协议
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
Apache Kafka的单分区写入性能在某些严格保序场景中至关重要,但其现有线程模型限制了性能发挥。本文分析了Kafka的串行处理模型,包括SocketServer、KafkaChannel、RequestChannel等组件,指出其通过KafkaChannel状态机确保请求顺序处理,导致处理效率低下。AutoMQ提出流水线处理模型,简化KafkaChannel状态机,实现网络解析、校验定序和持久化的阶段间并行化,提高处理效率。测试结果显示,AutoMQ的极限吞吐是Kafka的2倍,P99延迟降低至11ms。
29 3
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
|
21天前
|
算法 安全 调度
【C++入门到精通】 线程库 | thread类 C++11 [ C++入门 ]
【C++入门到精通】 线程库 | thread类 C++11 [ C++入门 ]
22 1
|
21天前
|
设计模式 算法 安全
Java多线程编程实战:从入门到精通
【4月更文挑战第30天】本文介绍了Java多线程编程的基础,包括线程概念、创建线程(继承`Thread`或实现`Runnable`)、线程生命周期。还讨论了线程同步与锁(同步代码块、`ReentrantLock`)、线程间通信(等待/通知、并发集合)以及实战技巧,如使用线程池、线程安全设计模式和避免死锁。性能优化方面,建议减少锁粒度和使用非阻塞算法。理解这些概念和技术对于编写高效、可靠的多线程程序至关重要。