1、消费者
1.1 Kafka消费方式
1、pull(拉)模式:consumer采用从broker中主动拉取数据。
2、push(推)模式:Kafka没有采用这种方式。因为broker决定消息发生速率,很难适应所有消费者的消费速率。例如推送的速度是50M/s,Consumer1、Consumer2就来不及处理消息。
pull模式不足之处是如果Kafka没有数据,消费者可能会陷入循环中,一直返回空数据。
1.2 Kafka消费者工作流程
1.2.1 消费者总体工作流程
1.2.2 消费者组原理
Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
1、消费者组初始化流程
(1)coordinator:辅助实现消费者组的初始化和分区的分配
coordinator节点选择=groupid的hashcode值%50(_consumer_offsets的分区数量)
例如:groupid的hashcode值=1,1%50=1,那么_consumer_offsets主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。
2、消费者组详细消费流程
1.2.3 消费者重要参数
1.3 消费者API
1.3.1 独立消费者案例(订阅主题)
1、需求
创建一个独立的消费者,消费first主题中的数据
注意:在消费者API代码中必须配置消费者组id。命令行启动消费者不填写消费者组id会被自动填写随机的消费者组id。
2、实现步骤
(1)创建包名:com.zhm.consumer
(2)编写代码
package org.zhm.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties; /** * @ClassName CustomConsumer * @Description TODO * @Author Zouhuiming * @Date 2023/6/14 9:08 * @Version 1.0 */ public class CustomConsumer { public static void main(String[] args) { //1、创建消费者的配置对象 Properties properties=new Properties(); //2、给消费者配置对象添加参数 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); //配置反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); //配置消费者组(组内任意起名) 必须 properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2"); // //修改分区分配策略 // properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor"); //测试:修改分区分配策略 ArrayList<String> startegys=new ArrayList<>(); startegys.add("org.apache.kafka.clients.consumer.StickyAssignor"); properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,startegys); //3、创建消费者对象 KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties); //注册要消费的主题(可以消费多个主题) ArrayList<String> topics=new ArrayList<>(); topics.add("first"); kafkaConsumer.subscribe(topics); //拉取数据打印 while (true){ //设置1s中消费一批数据 ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1)); //打印消费到的数据 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } } }
3、测试
(1)在IDEA中执行消费者程序
(2)在 IDEA 中执行生产者程序 CustomProducerCallback()在控制台观察生成的数据。
(3)在IDEA控制台观察收到的数据