Kafka单线程Consumer及参数详解

简介: Kafka单线程Consumer及参数详解

请使用0.9以后的版本:

示例代码


Properties props = new Properties();
      props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("auto.offset.reset","earliest");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Arrays.asList("foo", "bar"));
    try{
        while (true) {
          ConsumerRecords<String, String> records = consumer.poll(1000);
          for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
          }
       }
        }finally{
          consumer.close();
        }


1、只需要配置kafka的server  groupid  autocommit   序列化   autooffsetreset(其中 bootstrap.server       group.id           key.deserializer         value.deserializer  必须指定);

2、用这些Properties构建consumer对象(KafkaConsumer还有其他构造,可以把序列化传进去);

3、subscribe订阅topic列表(可以用正则订阅Pattern.compile("kafka.*")

使用正则必须指定一个listener    subscribe(Pattern pattern, ConsumerRebalanceListener listener));  可以重写这个接口来实现  分区变更时的逻辑。如果设置了enable.auto.commit  = true   就不用理会这个逻辑。

4、然后循环poll消息(这里的1000是超时设定,如果没有很多数据,也就等一秒);

5、处理消息(打印了offset  key  value  这里写处理逻辑)。

6、关闭KafkaConsumer(可以传一个timeout值 等待秒数   默认是30)。


参数详解


bootstrap.server(最好用主机名不用ip  kafka内部用的主机名  除非自己配置了ip)

deserializer         反序列化consumer从broker端获取的是字节数组,还原回对象类型。

默认有十几种:StringDeserializer  LongDeserializer  DoubleDeserializer。。

也可以自定义:定义serializer格式     创建自定义deserializer类实现Deserializer 接口 重写逻辑


除了四个必传的 bootstrap.server       group.id           key.deserializer         value.deserializer

还有session.timeout.ms       "coordinator检测失败的时间"

是检测consumer挂掉的时间   为了可以及时的rebalance   默认是10秒   可以设置更小的值避免消息延迟。

max.poll.interval.ms     "consumer处理逻辑最大时间"

处理逻辑比较复杂的时候  可以设置这个值  避免造成不必要的  rebalance  ,因为两次poll时间超过了这个参数,kafka认为这个consumer已经跟不上了,会踢出组,而且不能提交offset,就会重复消费。默认是5分钟。

auto.offset.reset  "无位移或者位移越界时kafka的应对策略"

所以如果启动了一个group从头消费  成功提交位移后   重启后还是接着消费  这个参数无效

所以3个值的解释是:

earliset  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从最早的位移消费

latest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据       none  topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

(注意kafka-0.10.1.X版本之前: auto.offset.reset 的值为smallest,和,largest.(offest保存在zk中)  、

我们这是说的是新版本:kafka-0.10.1.X版本之后: auto.offset.reset 的值更改为:earliest,latest,和none (offest保存在kafka的一个特殊的topic名为:__consumer_offsets里面))

enable.auto.commit   是否自动提交位移

true 自动提交     false需要用户手动提交   有只处理一次需要的  最近设置为false自己控制。

fetch.max.bytes   consumer单次获取最大字节数

max.poll.records  单次poll返回的最大消息数

默认500条   如果消费很轻量  可以适当提高这个值   增加消费速度。

hearbeat.interval.ms  consumer其他组员感知rabalance的时间

该值必须小于 session.timeout.ms   如果检测到 consumer挂掉   也就根本无法感知rabalance了

connections.max.idle.ms  定期关闭连接的时间

默认是9分钟  可以设置为-1  永不关闭

相关文章
|
3月前
|
消息中间件 Kafka
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
|
3月前
|
Java
线程池七大参数
核心线程数:线程池中的基本线程数量 最大线程数:当阻塞队列满了之后,逐一启动 最大线程的存活时间:当阻塞队列的任务执行完后,最大线长的回收时间 最大线程的存活时间单位 阻塞队列:当核心线程满后,后面来的任务都进入阻塞队列 线程工厂:用于生产线程
|
4月前
|
消息中间件 SQL 分布式计算
大数据-62 Kafka 高级特性 主题 kafka-topics相关操作参数 KafkaAdminClient 偏移量管理
大数据-62 Kafka 高级特性 主题 kafka-topics相关操作参数 KafkaAdminClient 偏移量管理
56 6
|
4月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
115 4
|
4月前
|
消息中间件 存储 负载均衡
大数据-60 Kafka 高级特性 消息消费01-消费组图例 心跳机制图例 附参数详解与建议值
大数据-60 Kafka 高级特性 消息消费01-消费组图例 心跳机制图例 附参数详解与建议值
86 3
|
5月前
|
消息中间件 Kafka 测试技术
Kafka常用命令大全及kafka-console-consumer.sh及参数说明
该文章汇总了Kafka常用命令,包括集群管理、Topic操作、生产者与消费者的命令行工具使用方法等,适用于Kafka的日常运维和开发需求。
1124 2
|
4月前
|
设计模式 Java 物联网
【多线程-从零开始-玖】内核态,用户态,线程池的参数、使用方法详解
【多线程-从零开始-玖】内核态,用户态,线程池的参数、使用方法详解
88 0
|
5月前
|
消息中间件 安全 大数据
Kafka多线程Consumer是实现高并发数据处理的有效手段之一
【9月更文挑战第2天】Kafka多线程Consumer是实现高并发数据处理的有效手段之一
446 4
|
6月前
|
开发者 图形学 前端开发
绝招放送:彻底解锁Unity UI系统奥秘,五大步骤教你如何缔造令人惊叹的沉浸式游戏体验,从Canvas到动画,一步一个脚印走向大师级UI设计
【8月更文挑战第31天】随着游戏开发技术的进步,UI成为提升游戏体验的关键。本文探讨如何利用Unity的UI系统创建美观且功能丰富的界面,包括Canvas、UI元素及Event System的使用,并通过具体示例代码展示按钮点击事件及淡入淡出动画的实现过程,助力开发者打造沉浸式的游戏体验。
181 0