【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接

简介: 【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接

一、引言

ece78dc62964431b906a5e6330c87422.png

二、多线程开发消费者

1. 消费者设计原理

Kafka consumer 是一个单线程的设计方案,从 Kafka Consumer 的入口类 KafkaConsumerKafkaConsumer 是一个双线程的操作,既用户线程和心跳线程

  • 用户线程:启动 Consumer 应用程序 main 方法的线程
  • 心跳线程:负责定期给对应的 Broker 发送心跳请求,以标识消费者的存活性。

2. 多线程设计方案

我们要明确的是:KafkaConsumer 类不是线程安全的,所有的 I/O 操作都是发生在用户主线程中。多个线程不能共享一个 KafkaConsumer

我们制定两套设计方案:

  • 消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整消息获取,处理消息逻辑。

867e2147d478782e6ef0df2852174e30.png

  • 消息者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。

这里我们解释一下这两个方案分别代表什么意思:

  • 第一种:每一个线程创建一个 kafkaConsumer,来执行消息获取和消息处理逻辑
  • 第二种:多个线程拉取消息,交于线程池进行消息的处理

两个方案的优缺点:

方案一:

优点:

  • 实现简单,符合我们当前使用 consumer 的习惯。
  • 多个线程间没有交互,省去很多线程安全保障的开销
  • Kafka 每个线程使用专属的 KafkaConsumer 实例来执行消息获取和消息处理逻辑,因此 Kafka 主题中的每个分区都能保证只被一个线程处理。容易实现分区内的消息消费顺序。

缺点:


每个线程维护自己的 KafkaConsumer 实例,占用更多的系统资源,如:内存、TCP连接等

方案的线程数受限于订阅主题的分区数。

  • 一个消费组中,一个分区只能由一个消费者消费。
  • 每个线程完整的执行消息获取和消息处理逻辑。一旦消息处理逻辑很重,消息处理过慢,容易出现重平衡

方案二:

优点:

  • 将任务切分为消息获取和消息处理两个部分,拥有极高的可伸缩性

缺点:

  • 实现难度过大,两组线程管理
  • 无法保证分区消息消费的顺序性
  • 多线程提交位移,可能导致正确提交位移较困难,可能会出现重复性消费。

3. 实战演示

方案一:

public class FirstMultiConsumerThreadDemo {
  // 集群的地址
    public static final String brokerList = "cluster1:9092";
    // 订阅的主题
    public static final String topic = "test.topic";
    // 消费组的ID
    public static final String groupId = "group.demo";
    // 配置文件
    public static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer",StringDeserializer.class.getName());
        return properties;
    }
    public static void main(String[] args) {
        Properties props = initConfig();
        int consumerThreadNum = 4;
        // 启动 4 个 KafkaConsumer 实例
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(props,topic).start();
        }
    }
    public static class KafkaConsumerThread extends Thread{
        private KafkaConsumer<String,String> kafkaConsumer;
        public KafkaConsumerThread(Properties props, String topic) {
            this.kafkaConsumer = new KafkaConsumer<>(props);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
        }
        @Override
        public void run() {
            try {
                while (true){
                    // 获取消息
                    ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String,String> record : records){
                        //实现处理逻辑
                        System.out.println(record.value());
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                kafkaConsumer.close();
            }
        }
    }
}

方案二:

private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...
private int workerNum = ...;
// 创建一个线程陈
executors = new ThreadPoolExecutor(
  workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
  new ArrayBlockingQueue<>(1000), 
  new ThreadPoolExecutor.CallerRunsPolicy());
...
while (true)  {
  ConsumerRecords<String, String> records = 
    consumer.poll(Duration.ofSeconds(1));
    // 获取消息后,使用线程池来处理消息
  for (final ConsumerRecord record : records) {
    executors.submit(new Worker(record));
  }
}
..

三、消费者TCP连接

我们之前讲过,生产者是如何管理TCP连接的,同样,我们的消费者也可以管理TCP连接。

1. 何时创建TCP连接

和我们的生产者类型,消费者也需要构建一个 KafkaConsumer 实例,但是消费者在构建KafkaConsumer 实例的时候,是不会创建TCP连接的。

原因在于:生产者在进行创建的时候,会启动一个 Sender 线程,这个线程负责 Socket 连接的创建。

// 会 new 一个 Sender 的类,这个类继承 Runnable 接口
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
// KafkaThread 继承 Thread,启动 Sender 线程
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

我们的 KafkaConsumer TCP 连接是在调用 KafkaConsumer.poll() 方法是被创建的

从细粒度来说,在 poll 方法的内部有三个时机可以创建 TCP 连接。

  1. 1.发起 FindCoordinator 请求时
  2. 消费者端有个组件叫做协调者(Coordinator),在我们的 Broker 里面,负责消费组的成员管理和各个消费者位移提交管理。

消费者首次调用 poll 方法时,向 Kafka 集群发送一个名为 FindCoordinator 的请求,希望 Kafka 集群告诉消费者哪一个

  • Borker 是他的协调者。请求会发给当前负载量最小的那台 Broker
  • 这个时候会创建一个 TCP 连接
  1. 2.连接协调者时

当我们的 Broker 处理完 FindCoordinator 请求时,会给我们的消费者返回一个响应结果,告诉我们消费者哪一台 Broker 是真正的协调者。

  • 当消费者知道真正的协调者时
  • ,创建与该 Broker 的 TCP 连接。这个时候,我们的协调者可以做一些组协调操作,比如: 加入组、等待组分配、心跳请求处理、位移获取、位移提交等
  1. 3.消费数据时
  • 消费者会为每个要消费的分区创建与该分区领导者的 TCP 连接。如果当前消费者需要消费 5 个分区的数据,这 5个分区的领导者副本分布在 4 个 Broker 上,那么我们需要创建与这 4 台 Broker 的连接

2. 创建多少个 TCP 连接

  • 第一类连接:消费者向 Kafka 集群发送元数据请求以获取整个集群的信息。然后发送 FindCoordinator 请求获取协调者所在的 Broker。
  • 第二类连接:连接协调者所在的 Broker,消费者进程开启消费者组的各种功能以及后续的消息消费。
  • 第三类连接:消费者去向所有分区的领导者副本所在的 Broker 获取数据,创建每个的 TCP 连接。
  • 这里有一个需要注意的点:当我们第三类TCP连接创建成功后,消费者程序将会抛弃第一类TCP连接。之后定期请求元数据,也是使用的第三类。
  • 第一类 TCP 连接仅仅是为了首次获取元数据而创建的,后面就会被废弃掉。最根本的原因是,消费者在启动时还不知道 Kafka 集群的信息,只能使用一个“假”的 ID 去注册,即使消费者获取了真实的 Broker ID,它依旧无法区分这个“假”ID 对应的是哪台 Broker,因此也就无法重用这个 Socket 连接,只能再重新创建一个新的连接

3. 何时关闭 TCP 连接

消费者关闭 TCP 连接分为主动关闭和自动关闭

主动关闭:

  • 手动调用 KafkaConsumer.close() 方法,或者执行 kill 命令

自动关闭:

  • 消费者端参数 connection.max.idle.ms ,如果在规定的时间该 TCP 连接没有请求的话,则自动关闭。


相关文章
|
11月前
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
553 60
【Java并发】【线程池】带你从0-1入门线程池
|
8月前
|
消息中间件 Linux Kafka
linux命令使用消费kafka的生产者、消费者
linux命令使用消费kafka的生产者、消费者
379 16
|
安全 数据处理 开发者
Python中的多线程编程:从入门到精通
本文将深入探讨Python中的多线程编程,包括其基本原理、应用场景、实现方法以及常见问题和解决方案。通过本文的学习,读者将对Python多线程编程有一个全面的认识,能够在实际项目中灵活运用。
|
10月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
549 10
|
11月前
|
消息中间件 Kafka
【赵渝强老师】Kafka的消费者与消费者组
Kafka消费者是从Kafka集群中消费数据的客户端。单消费者模型在数据生产速度超过消费速度时会导致数据堆积。为解决此问题,Kafka引入了消费者组的概念,允许多个消费者共同消费同一主题的消息。消费者组由一个或多个消费者组成,它们动态分配和重新分配主题分区,确保消息处理的高效性和可靠性。视频讲解及示意图详细展示了这一机制。
275 1
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
285 1
Java—多线程实现生产消费者
|
算法 NoSQL Java
Springboot3新特性:GraalVM Native Image Support和虚拟线程(从入门到精通)
这篇文章介绍了Spring Boot 3中GraalVM Native Image Support的新特性,提供了将Spring Boot Web项目转换为可执行文件的步骤,并探讨了虚拟线程在Spring Boot中的使用,包括如何配置和启动虚拟线程支持。
1163 9
Springboot3新特性:GraalVM Native Image Support和虚拟线程(从入门到精通)
|
Java 关系型数据库 MySQL
如何用java的虚拟线程连接数据库
本文介绍了如何使用Java虚拟线程连接数据库,包括设置JDK版本、创建虚拟线程的方法和使用虚拟线程连接MySQL数据库的示例代码。
347 6
如何用java的虚拟线程连接数据库
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
安全 Java 调度
Java中的多线程编程入门
【10月更文挑战第29天】在Java的世界中,多线程就像是一场精心编排的交响乐。每个线程都是乐团中的一个乐手,他们各自演奏着自己的部分,却又和谐地共同完成整场演出。本文将带你走进Java多线程的世界,让你从零基础到能够编写基本的多线程程序。
157 1