Kafka 多线程消费问题

简介: 消息队列Kafka版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。消息队列Kafka版广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。为了提升消费端的消费速率,可以通过多线程的方式来加快消费进度,下面介绍一种多线程的消费方法。

Step By Step

Kafka实例创建,这里使用阿里云Kafka消息队列。为了方便本地测试,创建公网 + VPC实例,参考链接
公网接入

消费端程序

1、参数配置

参考:Kafka消息发送的三种模式 连接参数配置部分。

2、消费端:Code Sample

import com.base.JavaKafkaConfigurer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerDemo {

    //加载kafka.properties
    public static Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

    public static Properties initConfig()
    {
        Properties props = new Properties();
        //消息的反序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //设置接入点,即控制台的实例详情页显示的“默认接入点”
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        //两次poll之间的最大允许间隔
        //请不要改得太大,服务器会掐掉空闲连接,不要超过30000
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 25000);

        //每次poll的最大数量
        //注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);

        //当前消费实例所属的消费组,请在控制台申请之后填写
        //属于同一个组的消费实例,会负载消费消息
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));

        return props;
    }


    public static void main(String[] args) {

        Properties properties = initConfig();
        int consumerThreadNum = 6; // 可以设置和Topic的分区数据一致,这样一个分区就可以分配一个线程来消费消息。

        String topic  = kafkaProperties.getProperty("topic");
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(properties, topic).start();
        }
    }

    public static class KafkaConsumerThread extends Thread{
        private KafkaConsumer<String, String> kafkaConsumer;

        public KafkaConsumerThread(Properties props, String topic)
        {
            this.kafkaConsumer = new KafkaConsumer<String, String>(props);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
        }

        public void run()
        {
            try {
                while (true)
                {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record: records) {
                        System.out.println("Thread num: " + this.getName());
                        System.out.println(String.format("Consume partition:%d offset:%d the message body:%s", record.partition(), record.offset(),record.value()));
                    }
                }
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }finally {
                kafkaConsumer.close();
            }
        }
    }
}

3、消费端情况
图片.png

4、服务端控制台查看
图片.png

参考链接

Kafka消息发送的三种模式
订阅者最佳实践

相关文章
|
3月前
|
消息中间件 安全 大数据
Kafka多线程Consumer是实现高并发数据处理的有效手段之一
【9月更文挑战第2天】Kafka多线程Consumer是实现高并发数据处理的有效手段之一
346 4
|
4月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
110 7
|
4月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
199 4
|
7月前
|
消息中间件 存储 网络协议
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
Apache Kafka的单分区写入性能在某些严格保序场景中至关重要,但其现有线程模型限制了性能发挥。本文分析了Kafka的串行处理模型,包括SocketServer、KafkaChannel、RequestChannel等组件,指出其通过KafkaChannel状态机确保请求顺序处理,导致处理效率低下。AutoMQ提出流水线处理模型,简化KafkaChannel状态机,实现网络解析、校验定序和持久化的阶段间并行化,提高处理效率。测试结果显示,AutoMQ的极限吞吐是Kafka的2倍,P99延迟降低至11ms。
143 3
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
|
消息中间件 网络协议 安全
【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接
【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接
【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接
|
消息中间件 存储 缓存
十分钟,了解Kafka的Sender线程
十分钟,了解Kafka的Sender线程
125 0
|
消息中间件 JSON NoSQL
记一次Flink 消费Kafka数据积压排查解决
记一次Flink 消费Kafka数据积压排查解决
记一次Flink 消费Kafka数据积压排查解决
|
消息中间件 NoSQL Kafka
【Flink-FlinkUtils】高级自定义封装工具类实现消费kafka数据保存数据到Redis
【Flink-FlinkUtils】高级自定义封装工具类实现消费kafka数据保存数据到Redis
363 0
【Flink-FlinkUtils】高级自定义封装工具类实现消费kafka数据保存数据到Redis
|
消息中间件 弹性计算 Java
【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境
【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境
300 0
【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境
|
消息中间件 存储 运维
Kafka消费组/者协调器的介绍
什么是协调器 协调器是用于协调多个消费者之间能够正确的工作的一个角色, 比如计算消费的分区分配策略,又或者消费者的加入组与离开组的处理逻辑, 有一点类似Kafka种的控制器的角色。
Kafka消费组/者协调器的介绍

热门文章

最新文章