Kafka核心之Consumer

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 本文介绍Kafka的核心之Consumer。了解了什么是kafka( https://www.cnblogs.com/tree1123/p/11226880.html)以后学习Kafka核心之消费者,kafka的消费者经过几次版本变化,特别容易混乱,所以一定要搞清楚是哪个版本再研究。

一、旧版本consumer



只有旧版本(0.9以前)才有 high-level consumer 和 low-level consumer之分,很多的文章提到的就是这两个:低阶消费者和高阶消费者,低阶消费者更灵活但是需要自己维护很多东西,高阶就死板一点但是不需要维护太多东西。

high-level consumer就是消费者组。

low-level consumer是单独一个消费者,单个consumer没有什么消费者组的概念,与其他consumer相互之间不关联。


1、low-level consumer


low-level consumer底层实现是

SimpleConsumer 他可以自行管理消费者

Storm的Kafka插件 storm-kafka就是使用了SimpleConsumer

优点是灵活 , 可以从任意位置拿消息 。

如果需要:重复读取数据 只消费部分分区数据 精确消费 就得用这个,

不过必须自己处理位移提交 寻找分区leader broker 处理leader变更。

接口中的方法:

fetch
send  发送请求
getOffsetBefore
commitOffsets
fetchOffsets
earliestOrlatestOffset
close

使用步骤:

参照官网,比较复杂需要好几步来拉取消息。

Find an active Broker and find out which Broker is the leader for your topic and partition

找到活跃的broker 找到哪个broker是你的topic和partition的leader

Determine who the replica Brokers are for your topic and partition

查出replica 的brokers

Build the request defining what data you are interested in

建立请求

Fetch the data

拿数据

Identify and recover from leader changes

leader变化时恢复

也可以查询一些offset等metadata信息,具体代码如下。

//根据指定的分区从主题元数据中找到主副本
SimpleConsumer consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
                        "leaderLookup");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();
String  leader = metaData.leader().host();
//获取分区的offset等信息
//比如获取lastoffset
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
long[] offsets = response.offsets(topic, partition);
long lastoffset = offsets[0];

这个api现在应用不多,除非你有特殊需求,比如要自己写监控,你可能需要更多的元数据信息。


2、high-level consumer


主要使用的类:ConsumerConnector

屏蔽了每个topic的每个Partition的offset的管理(自动读取zookeeper中该Consumer group的last offset)

Broker失败转移,增减Partition Consumer时的负载均衡(当Partiotion和Consumer增减时,Kafka自动负载均衡)

这些功能low-level consumer都需要自己实现的。

主要方法如下:

createMessageStreams
createMessageStreamsByFilter
commitOffsets
setconsumerReblanceListener
shutdown


group通过zookeeper完成核心功能,

zookeeper目录结构如下:

/consumers/groupId/ids/consumre.id

记录该consumer的订阅信息,还被用来监听consumer存活状态。这是一个临时节点,会话失效将会自动删除。

/consumers/groupId/owners/topic/partition

保存consumer各个消费线程的id,执行rebalance时保存。

/consumers/groupId/offsets/topic/partition

保存该group消费指定分区的位移信息。

这个consumer支持多线程设计,只创建一个consumer实例,但如果是多个分区,将会自动创建多个线程消费。

使用步骤:

Properties properties = new Properties();
   properties.put("zookeeper.connect", "ip1:2181,ip2:2181,ip3:2181");//声明zk
   properties.put("group.id", "group03");
   ConsumerConnector  consumer =  Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
   Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
   topicCountMap.put(topic, 1); // 一次从主题中获取一个数据
   Map<String, List<KafkaStream<byte[], byte[]>>>  messageStreams = consumer.createMessageStreams(topicCountMap);
   KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据  如果是多线程在这里处理多分区的情况
   ConsumerIterator<byte[], byte[]> iterator =  stream.iterator();
   while(iterator.hasNext()){
        String message = new String(iterator.next().message());
        System.out.println("接收到: " + message);
   }
//auto.offset.reset 默认值为largest
//从头消费 properties.put("auto.offset.reset", "smallest");

很简单,我们0.9版本之前使用的很多都是他,集成spring的方法等等。不过0.9版本以后新的consumer出现了。


二、新版本consumer



先说一下版本的问题:

Kafka 0.10.0.0之后 增加了 Kafka Streams 所以Kafka1.0开始Streams 就稳定了。

kafka security 0.9.0.0以后 0.10.0.1之后稳定

0.10.1.0之后 新版本consumer稳定

storm有两个连kafka的包:

storm-kafka 使用了旧版本的consumer

storm-kafka-client 使用了新版本consumer

kafka 0.9.0.0废弃了旧版producer和consumer 旧版时scala版 新版用java开发


版本 推荐producer 推荐consumer 原因
0.8.2.2 旧版 旧版 新producer尚不稳定
0.9.0.x 新版 旧版 新producer稳定
0.10.0.x 新版 旧版 新consumer不稳定
0.10.1.0 新版 新版 新consumer稳定
0.10.2.x 新版 新版 都稳定了


旧版本中offset管理依托zookeeper,新版本中不在依靠zookeeper。



语言 包名 主要使用类
旧版本 scala kafka.consumer.* ZookeeperConsumerConnector SimpleConsumer
新版本 java org.apache.kafka.clients.consumer.* KafkaConsumer


新版本的几个核心概念:


consumer group

消费者使用一个消费者组名(group.id)来标记自己,topic的每条消息都只会发送到每个订阅他的消费者组的一个消费者实例上。

1、一个消费者组有若干个消费者。

2、对于同一个group,topic的每条消息只能被发送到group下的一个consumer实例上。

3、topic消息可以被发送到多个group中。

consumer端offset

记录每一个consumer消费的分区的位置

kafka没有把这个放在服务器端,保存在了consumer group中,并定期持久化。

旧版本会把这个offset定期存在zookeeper中:路径是 /consumers/groupid/offsets/topic/partitionid

新版本将offset放在了一个内部topic中:__consumer_offsets(前面两个下划线) 里面有50个分区

所以新版本的consumer就不需要连zookeeper了。

旧版本设置offsets.storage=kafka设置位移提交到这,不常使用。

__consumer_offsets中的结构:key = group.id+topic+partition value=offset

consumer group reblance

单个consumer是没有rebalance的。

他规定了一个consumer group下的所有consumer如何去分配所有的分区。


单线程示例代码:
Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset","earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
      try{
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
         }
        }finally{
          consumer.close();
        }

很简单,1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必须指定);

2、用这些Properties构建consumer对象(KafkaConsumer还有其他构造,可以把序列化传进去);

3、subscribe订阅topic列表(可以用正则订阅Pattern.compile("kafka.*")

使用正则必须指定一个listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 可以重写这个接口来实现 分区变更时的逻辑。如果设置了enable.auto.commit = true 就不用理会这个逻辑。

4、然后循环poll消息(这里的1000是超时设定,如果没有很多数据,也就等一秒);

5、处理消息(打印了offset key value 这里写处理逻辑)。

6、关闭KafkaConsumer(可以传一个timeout值 等待秒数 默认是30)。


Properties详解:


bootstrap.server(最好用主机名不用ip kafka内部用的主机名 除非自己配置了ip)

deserializer 反序列化consumer从broker端获取的是字节数组,还原回对象类型。

默认有十几种:StringDeserializer LongDeserializer DoubleDeserializer。。

也可以自定义:定义serializer格式 创建自定义deserializer类实现Deserializer 接口 重写逻辑


除了四个必传的 bootstrap.server group.id key.deserializer value.deserializer

还有session.timeout.ms "coordinator检测失败的时间"

是检测consumer挂掉的时间 为了可以及时的rebalance 默认是10秒 可以设置更小的值避免消息延迟。

max.poll.interval.ms "consumer处理逻辑最大时间"

处理逻辑比较复杂的时候 可以设置这个值 避免造成不必要的 rebalance ,因为两次poll时间超过了这个参数,kafka认为这个consumer已经跟不上了,会踢出组,而且不能提交offset,就会重复消费。默认是5分钟。

auto.offset.reset "无位移或者位移越界时kafka的应对策略"

所以如果启动了一个group从头消费 成功提交位移后 重启后还是接着消费 这个参数无效

所以3个值的解释是:

earliset 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从最早的位移消费

latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

(注意kafka-0.10.1.X版本之前: auto.offset.reset 的值为smallest,和,largest.(offest保存在zk中) 、

我们这是说的是新版本:kafka-0.10.1.X版本之后: auto.offset.reset 的值更改为:earliest,latest,和none (offest保存在kafka的一个特殊的topic名为:__consumer_offsets里面))

enable.auto.commit 是否自动提交位移

true 自动提交 false需要用户手动提交 有只处理一次需要的 最近设置为false自己控制。

fetch.max.bytes consumer单次获取最大字节数

max.poll.records 单次poll返回的最大消息数

默认500条 如果消费很轻量 可以适当提高这个值 增加消费速度。

hearbeat.interval.ms consumer其他组员感知rabalance的时间

该值必须小于 session.timeout.ms 如果检测到 consumer挂掉 也就根本无法感知rabalance了

connections.max.idle.ms 定期关闭连接的时间

默认是9分钟 可以设置为-1 永不关闭


poll方法详解:


(旧版本:多分区多线程 新版本:一个线程管理多个socket连接)

但新版本KafkaConsumer是双线程的,主线程负责:消息获取,rebalance,coordinator,位移提交等等,

另一个是后台心跳线程。

根据上边的各种配置,poll方法会找到offset,当获取了足够多的可用数据,或者等待时间超过了指定的超时时间,就会返回。

java consumer不是线程安全的,同一个KafkaConsumer用在了多个线程中,将会报Kafka Consumer is not safe for multi-threaded assess异常。可以加一个同步锁进行保护。

poll的超时参数,已经说过1000的话是超时设定,如果没有很多数据,也就等一秒,就返回了,比如定时5秒的将消息写入,就可以将超时参数设置为5000,达到效率最大化。

如果没有定时任务呢,那就设置为 Long.MAX_VALUE 未获取足够多的数据就无限等待。这里要捕获一下WakeupException。


consumer offset详解:


consumer需要定期向kafka提交自己的offset信息。已经学过 新版本将他提交到了一个topic中 __consumer_offsets。

offset有一个更大的作用是实现交付语义:

最多一次 at most once 可能丢失 不会重复

最少一次 at least once 可能重复 不会丢失

精确一次 exactly once 不丢失 不重复 就一次

若consumer在消费之前提交位移 就实现了at most once

若是消费后提交 就实现了 at least once 默认是这个。

consumer的多个位置信息:

上次提交的位置 当前位置 水位 日志最新位移

0 1 。。5 。。10 。。15

上次提交位置:consumer最近一次提交的offset值;

当前位置:consumer上次poll 到了这个位置 但是还没提交;

水位:这是分区日志的管理 consumer无法读取水位以上的消息;

最新位移:也是分区日志的管理 最大的位移值 一定不会比水位小。

新版本的consumer会在broker选一个broker作为consumergroup的coordinator,用于实现组成员管理,消费分配方案,提交位移。如果consumer崩溃,他负责的分区就分配给其他consumer,如果没有做好位移提交就可能重复消费。

多次提交的情况,kafka只关注最新一次的提交。

默认consumer自动提交位移 提交间隔为5秒 可以通过 auto.commit.interval.ms 设置这个间隔。

自动提交可以减少开发,但是可能重复消费,所以需要精准消费时还是要手动提交。设置手动提交 enable.auto.commit = false,然后调用 consumer.commitSync() 或者 consumer.commitAync() Sync为同步方式,阻塞 Aync为异步方式,不会阻塞。这两个方法可以传参,指定为哪个分区提交,这样更合理一些。

(旧版本的自动提交设置是 auto.commit.enable 默认间隔为60秒)


rebalance详解:


rebalance是consumer group如何分配topic的所有分区。

正常情况,比如有10个分区,5个consumer 那么consumer group将为每个consumer 平均分配两个分区。

每个分区只会分给一个consumer实例。有consumer出现问题,会重新执行这个过程,这个过程就是rebalance。

(旧版本通过zookeeper管理rebalance,新版本会选取某个broker为group coordinator来管理)

rebalance的触发条件:

1、有新的consumer加入,或者有consumer离开或者挂掉。

2、group订阅的topic发生变更,比如正则订阅。

3、group订阅的分区数发生变化。

第一个经常出现,不一定是挂掉,也可能是处理太慢,为了避免频繁rebalance,要调整好request.timeout.ms max.poll.records和ma.poll.interval.


rebalance分区策略:

partition.assignment.strategy 设置 自定义分区策略-创建分区器 assignor

range策略(默认),将分区划分为分区段,一次分配给每个consumer。

round-robin策略,轮询分配。

sticky策略(0.11.0.0出现,更优秀),range策略在订阅多个topic时会不均匀。

sticky有两个原则,当两者发生冲突时,第一个目标优先于第二个目标。

  1. 分区的分配要尽可能的均匀;
  2. 分区的分配尽可能的与上次分配的保持相同。

rebalance generation分代机制保证rabalance时重复提交的问题,延迟的offset提交时旧的generation信息会报异常ILLEGAL_GENERATION

rebalance过程:

1、确定coordinator所在的broker,建立socket连接。

确定算法:Math.abs(groupID.hashCode) % offsets.topic.num.partition 参数值(默认50)

寻找__consumer_offset分区50的leader副本所在的broker,该broker即为这个group的coordinator

2、加入组

所有consumer会向coordinator发送JoinGroup请求,收到所有请求后选一个consumer做leader(这个leader是consumer coordinator是broker),coordinator把成员和订阅信息发给coordinator。

3、同步分配方案

leader制定分配方案,通过SyncGroup请求发给coordinator,每个consumer也会发请求返回方案。

kafka也支持offset不提交到__consumer_offset,可以自定义,这时候就需要实现一个监听器ConsumerRebalanceListener,在这里重新处理Rebalance的逻辑。


多线程示例代码:
这里要根据自身需求开发,我这里只举一个简单的例子,就是几个分区就启动几个consumer,一一对应。三个类:Main:public static void main(String[] args) {                String bootstrapServers = "kafka01:9092,kafka02:9092";         String groupId = "test";        String topic = "testtopic";        int consumerNum = 3;        ConsumerGroup cg = new ConsumerGroup(consumerNum,bootstrapServers,groupId,topic);        cg.execute();}import java.util.ArrayList;import java.util.List;public class ConsumerGroup {        private List<ConsumerRunnable> consumers;        public ConsumerGroup(int consumerNum,String bootstrapServers,String groupId,String topic){                consumers = new ArrayList<>(consumerNum);                for(int i=0;i < consumerNum;i++){            ConsumerRunnable ConsumerRunnable = new ConsumerRunnable(bootstrapServers,groupId,topic);            consumers.add(ConsumerRunnable);        }    }        public void execute(){                for(ConsumerRunnable consumerRunnable:consumers){            new Thread(consumerRunnable).start();        }    }}import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;public class ConsumerRunnable implements Runnable{        private final KafkaConsumer<String,String> consumer;        public ConsumerRunnable(String bootstrapServers,String groupId,String topic){                Properties props = new Properties();        props.put("bootstrap.servers", bootstrapServers);        props.put("group.id", groupId);        props.put("enable.auto.commit", "true");        props.put("auto.commit.interval.ms", "1000");        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.put("auto.offset.reset","earliest");        this.consumer = new KafkaConsumer<>(props);        consumer.subscribe(Arrays.asList(topic));    }    @Override    public void run() {        while (true) {            ConsumerRecords<String, String> records = consumer.poll(10);            for (ConsumerRecord<String, String> record : records) {                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());            }        }    }}


standalone consumer

有一些需求,需要指定一个消费者消费某一个分区。彼此之间不干扰,一个standalone consumer崩溃不会影响其他。

类似旧版本的低阶消费者。

示例代码如下:consumer.assign方法订阅分区

public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset","earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        List<TopicPartition> partitions = new ArrayList<>();
        List<PartitionInfo> allpartitions = consumer.partitionsFor("testtopic");
        if(allpartitions!=null && !allpartitions.isEmpty()){
            for(PartitionInfo partitionInfo:allpartitions){
                partitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
            }
            consumer.assign(partitions);
        }
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }

以上为kafka消费者的学习

相关文章
|
8月前
|
消息中间件 Kafka API
kafka Consumer high-level api 之白名单
kafka Consumer high-level api 之白名单
|
2月前
|
消息中间件 Kafka
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
|
3月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
94 4
|
4月前
|
消息中间件 安全 大数据
Kafka多线程Consumer是实现高并发数据处理的有效手段之一
【9月更文挑战第2天】Kafka多线程Consumer是实现高并发数据处理的有效手段之一
378 4
|
5月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
140 4
|
5月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
107 2
|
5月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
96 8
|
5月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
112 7
|
5月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
204 4

热门文章

最新文章