Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(一)

简介: Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(一)

1、消费者

1.1 Kafka消费方式

1、pull(拉)模式:consumer采用从broker中主动拉取数据。

2、push(推)模式:Kafka没有采用这种方式。因为broker决定消息发生速率,很难适应所有消费者的消费速率。例如推送的速度是50M/s,Consumer1、Consumer2就来不及处理消息。

pull模式不足之处是如果Kafka没有数据,消费者可能会陷入循环中,一直返回空数据。

1.2 Kafka消费者工作流程

1.2.1 消费者总体工作流程

768dab87ab11461b97bcc297a12b65df.png

1.2.2 消费者组原理

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。


63f28cf17f0948169c57708b76ae27dc.png

f69fa68cdfe54453a5351d1f242029a6.png

1、消费者组初始化流程

(1)coordinator:辅助实现消费者组的初始化和分区的分配

coordinator节点选择=groupid的hashcode值%50(_consumer_offsets的分区数量)

例如:groupid的hashcode值=1,1%50=1,那么_consumer_offsets主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。

6b85ca4f8a9d404b9dd7830b7aca2cff.png


2、消费者组详细消费流程


41f1423b397742a89442e9ee3ab8f017.png

1.2.3 消费者重要参数

333333333333.jpg

1.3 消费者API

1.3.1 独立消费者案例(订阅主题)

1、需求

创建一个独立的消费者,消费first主题中的数据


aea2f117c5d04d40ad61791b1f1b3fe0.png

注意:在消费者API代码中必须配置消费者组id。命令行启动消费者不填写消费者组id会被自动填写随机的消费者组id。

2、实现步骤

(1)创建包名:com.zhm.consumer

(2)编写代码

package org.zhm.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
/**
 * @ClassName CustomConsumer
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/14 9:08
 * @Version 1.0
 */
public class CustomConsumer {
    public static void main(String[] args) {
        //1、创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //配置反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组内任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2");
//        //修改分区分配策略
//        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
        //测试:修改分区分配策略
        ArrayList<String> startegys=new ArrayList<>();
        startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,startegys);
        //3、创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题(可以消费多个主题)
        ArrayList<String> topics=new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);
        //拉取数据打印
        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

3、测试

(1)在IDEA中执行消费者程序


ea4fa9d93f9a49d78434aff09cc2f6b7.png

(2)在 IDEA 中执行生产者程序 CustomProducerCallback()在控制台观察生成的数据。

844fa366a1bb444e88068c82e6b742b5.png

(3)在IDEA控制台观察收到的数据

be1c289dcdb945cbb0163abbc57d27e2.png

相关文章
|
22天前
|
消息中间件 分布式计算 Kafka
Kafka(四)【Kafka 消费者】(4)
Kafka(四)【Kafka 消费者】
|
17天前
|
消息中间件 Kafka
Kafka生产者和消费者相关命令行操作
Kafka生产者和消费者相关命令行操作
18 1
|
2天前
|
消息中间件 监控 安全
Kafka客户端工具:Offset Explorer 使用指南
Kafka客户端工具:Offset Explorer 使用指南
4 0
|
26天前
|
消息中间件 Java Kafka
springboot整合kafka消费者最佳实践
springboot整合kafka消费者最佳实践
61 1
|
26天前
|
消息中间件 负载均衡 监控
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
17 1
|
9天前
|
消息中间件 监控 Java
好文!12个策略解决 Kafka 数据丢失问题
以上这些策略对于解决 kafka 数据丢失问题很有帮助,如果你正在使用 kafka,或者正在学习 kafka,V 哥觉得你都应该把这12种策略收藏起来并消化掉,这对你在大型项目应用中非常有用。欢迎关注威哥爱编程,一起向技术大神进发。
|
16天前
|
消息中间件 缓存 监控
Kafka性能优化策略综述:提升吞吐量与可靠性
Kafka性能优化策略综述:提升吞吐量与可靠性
24 0
|
22天前
|
消息中间件 存储 算法
Kafka(四)【Kafka 消费者】(3)
Kafka(四)【Kafka 消费者】
|
22天前
|
消息中间件 Kafka API
Kafka(四)【Kafka 消费者】(2)
Kafka(四)【Kafka 消费者】
|
22天前
|
消息中间件 存储 Java
Kafka(四)【Kafka 消费者】(1)
Kafka(四)【Kafka 消费者】