kafka Consumer分区数与多线程消费topic

简介:

单线程消费数据适合在本地跑。

参考文档: http://kafka.apache.org/documentation.html

wKiom1lLetjiYUkrAAFXPwoSGH8093.png-wh_50

对于一个topic,可以发送给若干个partitions. partition在创建topic的时候就指定分区的数目。

分区、Offset、消费线程、group.id的关系

1)一组(类)消息通常由某个topic来归类,我们可以把这组消息“分发”给若干个分区(partition),每个分区的消息各不相同;

2)每个分区都维护着他自己的偏移量(Offset),记录着该分区的消息此时被消费的位置,由consumer自己上报到zk;

3)一个消费线程可以对应若干个分区,但一个分区只能被具体某一个消费线程消费;

4)group.id用于标记某一个消费组,每一个消费组都会被记录他在某一个分区的Offset,即不同consumer group针对同一个分区,都有“各自”的偏移量。

必须要注意的一点是,必须确认kafka的server.properties里面的一个属性num.partitions必须被设置成大于1的值,否则消费端再怎么折腾,也用不了多线程哦。测试环境设置为5个partition. 因为只有一个分区是不存在多线程同时消费的情况。

看一下消费者的代码便于理解

 //通过实现runnable接口实现多线程

 public class KafkaConsumer implements Runnable {
    String ;
    KafkaStream<[], []> ;

    KafkaConsumer(String title, KafkaStream<[], []> stream) {
        .= title;
        .= stream;
    }

    run() {
        System..println(+ );
        ConsumerIterator<[], []> it = .iterator();
(it.hasNext()) {
            MessageAndMetadata<[], []> data = it.next();
            String topic = data.topic(); partition = data.partition(); offset = data.offset(); String msg = String(data.message()); System..println(String.(
                    ,
                    , topic, partition, offset, msg));

  }
        System..println(String.(, ));
    }

    main(String[] args) {

        Properties props = Properties();
        props.put(, );
        props.put(, );
        props.put(, );props.put(, );

        ConsumerConfig config = ConsumerConfig(props);
        String topic1 = ;
      Map<String, Integer> topicCountMap = HashMap<String, Integer>();
        topicCountMap.put(topic1, );
        ConsumerConnector consumerConn = Consumer.(config);


        Map<String, List<KafkaStream<[], []>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);

        List<KafkaStream<[], []>> streams = topicStreamsMap.get(topic1);
        ExecutorService executor = Executors.();
        (i = ; i < streams.size(); i++)
            executor.execute(KafkaConsumer(+ (i + ), streams.get(i)));
      }

}

// producer 端的代码
public class KafkaConsumer implements Runnable {

    public String title;
    public KafkaStream<byte[], byte[]> stream;

    public KafkaConsumer(String title, KafkaStream<byte[], byte[]> stream) {
        this.title = title;
        this.stream = stream;
    }

    @Override
    public void run() {
        System.out.println("开始运行 " + title);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
/**
 * 不停地从stream读取新到来的消息,在等待新的消息时,hasNext()会阻塞
 * 如果调用 `ConsumerConnector#shutdown`,那么`hasNext`会返回false
 * */
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> data = it.next();
            String topic = data.topic(); //主题
            int partition = data.partition(); //分区
            long offset = data.offset(); //偏移量
            String msg = new String(data.message()); //数据

//            System.out.println("Consumer:["+title+"], Topic: ["+);

            System.out.println(String.format(
            "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s]",
                    title, topic, partition, offset, msg));

            //String keyWord=null;
//            try {
//                keyWord = URLDecoder.decode(msg, "UTF-8");
//                System.out.println("keyWord:"+keyWord);
//            } catch (UnsupportedEncodingException e) {
//                e.printStackTrace();
//            }
//            System.out.println(String.format(
//                    "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], keyWord: [%s]",
//                    title, topic, partition, offset, keyWord));
        }
        System.out.println(String.format("Consumer: [%s] exiting ...", title));
    }

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("group.id", "test");
        props.put("zookeeper.connect", "xxx:2181,xxx:2181");
        props.put("auto.offset.reset", "smallest");//从最后开始消费数据
        props.put("auto.commit.interval.ms", "1000");

        //创建一个消费者配置文件
        ConsumerConfig config = new ConsumerConfig(props);
        String topic1 = "test_kafka";
       //String topic2 = "newTopic01";
        //定义一个map集合,里面封装了需要消费的主题的分区数
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic1, 4);
        // topicCountMap.put(topic2, 5);


        //只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出
        ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);


        //Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是对应的流
        Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);

        //取出 `test_kafka` 对应的 streams
        List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic1);
        //创建一个容量为4的线程池
        ExecutorService executor = Executors.newFixedThreadPool(4);
        //创建20个consumer threads

        for (int i = 0; i < streams.size(); i++)


            executor.execute(new KafkaConsumer("消费者" + (i + 1), streams.get(i)));       }

//运行结果: 证明了一个分区只能被一个线程消费,但一个消费线程可以消费多个分区的数据!虽然我指定了线程池的线程数为5,但并不是所有的线程都去消费了,这当然跟线程池的调度有关系了。并不是一个消费线程对应地去消费一个分区的数据。线程由zookeeper来声明它拥有1个或多个分区;真正有数据存在的分区是由生产发送端来决定,即使你的kafka设置了10个分区,消费端在消费的时候,消费线程虽然会根据zookeeper的某种机制来声明它所消费的分区,但实际消费过程中,还是会消费真正存在数据的分区;

建议设置:实际发送分区数(一般就等于设置的分区数)= topicCountMap的value = 线程池大小  否则极易出现reblance的异常;

本文转自   ChinaUnicom110   51CTO博客,原文链接:http://blog.51cto.com/xingyue2011/1941009



相关文章
|
2天前
|
消息中间件 存储 负载均衡
【Kafka】Kafka 分区
【4月更文挑战第5天】【Kafka】Kafka 分区
|
1天前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用合集之支持sink到多分区的kafka ,还能保持有序吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
16 0
|
2天前
|
消息中间件 存储 网络协议
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
Apache Kafka的单分区写入性能在某些严格保序场景中至关重要,但其现有线程模型限制了性能发挥。本文分析了Kafka的串行处理模型,包括SocketServer、KafkaChannel、RequestChannel等组件,指出其通过KafkaChannel状态机确保请求顺序处理,导致处理效率低下。AutoMQ提出流水线处理模型,简化KafkaChannel状态机,实现网络解析、校验定序和持久化的阶段间并行化,提高处理效率。测试结果显示,AutoMQ的极限吞吐是Kafka的2倍,P99延迟降低至11ms。
20 3
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
|
2天前
|
消息中间件 Cloud Native Kafka
一文搞懂 Kafka consumer 与 broker 交互机制与原理
AutoMQ致力于打造下一代云原生Kafka系统,解决Kafka痛点。本文深入解析Kafka Consumer与Broker的交互机制,涉及消费者角色、核心组件及常用接口。消费者以group形式工作,包括leader和follower。交互流程涵盖FindCoordinator、JoinGroup、SyncGroup、拉取消息和退出过程。文章还探讨了broker的consumer group状态管理和rebalance原理。AutoMQ团队分享Kafka技术,感兴趣的话可以关注他们。
35 2
一文搞懂 Kafka consumer 与 broker 交互机制与原理
|
2天前
|
消息中间件 负载均衡 监控
【Kafka】Kafka 创建Topic后如何将分区放置到不同的 Broker 中?
【4月更文挑战第13天】【Kafka】Kafka 创建Topic后如何将分区放置到不同的 Broker 中?
|
2天前
|
消息中间件 监控 Kafka
【Kafka】分区副本中的 Leader 如果宕机但 ISR 却为空该如何处理
【4月更文挑战第12天】【Kafka】分区副本中的 Leader 如果宕机但 ISR 却为空该如何处理
|
2天前
|
消息中间件 运维 监控
【Kafka】分区副本什么情况下会从 ISR 中剔出
【4月更文挑战第12天】【Kafka】分区副本什么情况下会从 ISR 中剔出
|
2天前
|
消息中间件 存储 负载均衡
深度解析Kafka分区策略的精妙之处
深度解析Kafka分区策略的精妙之处
32 1
|
2天前
|
消息中间件 存储 负载均衡
【Kafka】Kafka 的分区分配策略分析
【4月更文挑战第7天】【Kafka】Kafka 的分区分配策略分析
|
2天前
|
消息中间件 监控 Kafka
【Kafka】Kafka 分区Leader选举策略
【4月更文挑战第7天】【Kafka】Kafka 分区Leader选举策略