Kafka 多线程消费问题-阿里云开发者社区

开发者社区> 云服务技术课堂> 正文

Kafka 多线程消费问题

简介: 消息队列Kafka版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。消息队列Kafka版广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。为了提升消费端的消费速率,可以通过多线程的方式来加快消费进度,下面介绍一种多线程的消费方法。

Step By Step

Kafka实例创建,这里使用阿里云Kafka消息队列。为了方便本地测试,创建公网 + VPC实例,参考链接
公网接入

消费端程序

1、参数配置

参考:Kafka消息发送的三种模式 连接参数配置部分。

2、消费端:Code Sample

import com.base.JavaKafkaConfigurer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerDemo {

    //加载kafka.properties
    public static Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

    public static Properties initConfig()
    {
        Properties props = new Properties();
        //消息的反序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //设置接入点,即控制台的实例详情页显示的“默认接入点”
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        //两次poll之间的最大允许间隔
        //请不要改得太大,服务器会掐掉空闲连接,不要超过30000
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 25000);

        //每次poll的最大数量
        //注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);

        //当前消费实例所属的消费组,请在控制台申请之后填写
        //属于同一个组的消费实例,会负载消费消息
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));

        return props;
    }


    public static void main(String[] args) {

        Properties properties = initConfig();
        int consumerThreadNum = 6; // 可以设置和Topic的分区数据一致,这样一个分区就可以分配一个线程来消费消息。

        String topic  = kafkaProperties.getProperty("topic");
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(properties, topic).start();
        }
    }

    public static class KafkaConsumerThread extends Thread{
        private KafkaConsumer<String, String> kafkaConsumer;

        public KafkaConsumerThread(Properties props, String topic)
        {
            this.kafkaConsumer = new KafkaConsumer<String, String>(props);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
        }

        public void run()
        {
            try {
                while (true)
                {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record: records) {
                        System.out.println("Thread num: " + this.getName());
                        System.out.println(String.format("Consume partition:%d offset:%d the message body:%s", record.partition(), record.offset(),record.value()));
                    }
                }
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }finally {
                kafkaConsumer.close();
            }
        }
    }
}

3、消费端情况
图片.png

4、服务端控制台查看
图片.png

参考链接

Kafka消息发送的三种模式
订阅者最佳实践

版权声明:本文中所有内容均属于阿里云开发者社区所有,任何媒体、网站或个人未经阿里云开发者社区协议授权不得转载、链接、转贴或以其他方式复制发布/发表。申请授权请邮件developerteam@list.alibaba-inc.com,已获得阿里云开发者社区协议授权的媒体、网站,在转载使用时必须注明"稿件来源:阿里云开发者社区,原文作者姓名",违者本社区将依法追究责任。 如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:developer2020@service.aliyun.com 进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:

云服务技术课堂,各类技术课程、最佳实践输出,来好好听课吧!

官方博客