Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(一)

简介: Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(一)

1、消费者

1.1 Kafka消费方式

1、pull(拉)模式:consumer采用从broker中主动拉取数据。

2、push(推)模式:Kafka没有采用这种方式。因为broker决定消息发生速率,很难适应所有消费者的消费速率。例如推送的速度是50M/s,Consumer1、Consumer2就来不及处理消息。

pull模式不足之处是如果Kafka没有数据,消费者可能会陷入循环中,一直返回空数据。

1.2 Kafka消费者工作流程

1.2.1 消费者总体工作流程

768dab87ab11461b97bcc297a12b65df.png

1.2.2 消费者组原理

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。


63f28cf17f0948169c57708b76ae27dc.png

f69fa68cdfe54453a5351d1f242029a6.png

1、消费者组初始化流程

(1)coordinator:辅助实现消费者组的初始化和分区的分配

coordinator节点选择=groupid的hashcode值%50(_consumer_offsets的分区数量)

例如:groupid的hashcode值=1,1%50=1,那么_consumer_offsets主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。

6b85ca4f8a9d404b9dd7830b7aca2cff.png


2、消费者组详细消费流程


41f1423b397742a89442e9ee3ab8f017.png

1.2.3 消费者重要参数

333333333333.jpg

1.3 消费者API

1.3.1 独立消费者案例(订阅主题)

1、需求

创建一个独立的消费者,消费first主题中的数据


aea2f117c5d04d40ad61791b1f1b3fe0.png

注意:在消费者API代码中必须配置消费者组id。命令行启动消费者不填写消费者组id会被自动填写随机的消费者组id。

2、实现步骤

(1)创建包名:com.zhm.consumer

(2)编写代码

package org.zhm.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
/**
 * @ClassName CustomConsumer
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/14 9:08
 * @Version 1.0
 */
public class CustomConsumer {
    public static void main(String[] args) {
        //1、创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //配置反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组内任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2");
//        //修改分区分配策略
//        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
        //测试:修改分区分配策略
        ArrayList<String> startegys=new ArrayList<>();
        startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,startegys);
        //3、创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题(可以消费多个主题)
        ArrayList<String> topics=new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);
        //拉取数据打印
        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

3、测试

(1)在IDEA中执行消费者程序


ea4fa9d93f9a49d78434aff09cc2f6b7.png

(2)在 IDEA 中执行生产者程序 CustomProducerCallback()在控制台观察生成的数据。

844fa366a1bb444e88068c82e6b742b5.png

(3)在IDEA控制台观察收到的数据

be1c289dcdb945cbb0163abbc57d27e2.png

相关文章
|
2天前
|
消息中间件 监控 大数据
优化Apache Kafka性能:最佳实践与调优策略
【10月更文挑战第24天】作为一名已经对Apache Kafka有所了解并有实际使用经验的开发者,我深知在大数据处理和实时数据流传输中,Kafka的重要性不言而喻。然而,在面对日益增长的数据量和业务需求时,如何保证系统的高性能和稳定性成为了摆在我们面前的一个挑战。本文将从我的个人视角出发,分享一些关于如何通过合理的配置和调优来提高Kafka性能的经验和建议。
20 4
|
22天前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
31 0
|
22天前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
63 0
|
2天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
17 2
|
11天前
|
消息中间件 监控 负载均衡
在Kafka中,如何进行主题的分区和复制?
在Kafka中,如何进行主题的分区和复制?
|
22天前
|
消息中间件 JSON 大数据
大数据-65 Kafka 高级特性 分区 Broker自动再平衡 ISR 副本 宕机恢复再重平衡 实测
大数据-65 Kafka 高级特性 分区 Broker自动再平衡 ISR 副本 宕机恢复再重平衡 实测
53 4
|
22天前
|
消息中间件 分布式计算 算法
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
40 3
|
22天前
|
消息中间件 大数据 Kafka
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
23 2
|
22天前
|
消息中间件 JSON 大数据
大数据-66 Kafka 高级特性 分区Partition 副本因子Replication Factor replicas动态修改 线上动态修改副本数
大数据-66 Kafka 高级特性 分区Partition 副本因子Replication Factor replicas动态修改 线上动态修改副本数
27 1
|
22天前
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
29 1