Kafka(四)【Kafka 消费者】(1)

简介: Kafka(四)【Kafka 消费者】

前言

       截止昨天 Flink 第一遍是过了,当然得深入复习,把相关的书都看一遍。那么今天开始就得同样抓紧把 Kafka 、Flume 过完第一遍,完了看看相关的书。最后用这些先做一个小的项目。至于剩下的时间,就是每天离线数仓、实时数仓的学了,必须掌握到 70~90%。虽然不知道结果怎么样,但是无路可退了,那就肝到底吧。

       此外还有 SSM、SpringBoot 也是需要掌握的,好在兴趣使然,没有多大压力。

Kafka 消费者

1、消费方式

       Kafka 消费者使用 pull 的方式从 broker 主动拉取数据,而不是让 broker 去主动把数据 push(推/主动发送)给消费者,因为毕竟每个消费者的速度是不同的,最好还是根据消费者自己的性能来获取数据。

2、消费者工作流程

2.1、消费者总体工作流程

  • 每个消费者可以消费多个分区,但是一个分区的数据只能被一个消费者组里的一个消费者消费。

如果消费者在消费完某个数据之后挂掉了,有后续新的消费者代替它,那么新的小肥猪怎么继续消费?

其实,在消费者读取数据的 offset 是会被保存在 Kafka broker 系统主题中的,也就是说,即使消费者挂了,下一个消费者可以从 broker  的系统主题里获得上次消费的 offset ,然后接着继续消费。(旧版本 offset 是存储在 zookeeper 中的,但是当消费者非常多的时候可能会造成大量的网络交互;)

2.2、消费者组原理

2.2.1、消费者组

consumer group:消费者组,由多个 consumer 组成。形成一个消费者组的条件是所有消费者的 groupid 相同。

  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内的一个消费者消费(防止数据被重复消费)。
  • 消费者组之间互不影响。所有消费者都属于某个消费者,即消费者组是逻辑上的一个订阅者。
  • 如果消费者组的组员数量 > 分区数量,则就会有多余的消费者闲置。
2.2.2、消费者组初始化流程

  • coordinator:辅助实现消费者组的初始化和分区的分配。coordinator 节点选择 = groupid.hashcode()%50 (goupid 是我们自己写代码的时候指定的,50 是 _consumer_offsets 的分区数量默认是 50)

选定 coordinator 后:

  1. 消费者组内的每个消费者都会向 coordinator 发送一个 JoinGroup 请求(请求加入到 groupid 这个组)
  2. coordinator 会从发来请求的所有消费者中随机选择一个作为消费者组中的 leader。
  3. coordinator 会把自己从这些消费者中收集到的请求中的信息都发送给这个 leader,也就是说,coordinator 只是辅助消费者组的分区选择,真正的分区分配是由 leader 完成的。
  4. leader 会定制一个消费方案。
  5. 制定好消费方案后,leader 会把制定好的计划发送给 coordinator。
  6. coordinator 然后把消费方案下发给每个消费者。
  7. 每个消费者都会和 coordinator 保持心跳(默认 3s),一旦超时(45s)该消费者就会被移除,并触发再平衡;或者消费者处理时间太长(5分钟)也会触发再平衡

消费者消费数据的条件:

  • fetch.min.bytes 每批次最小抓取字节数:只要达到该字节数就进行返回
  • fetch.max.wait.ms 一批数据最小值未到达的超时时间 :即使没有达到最小字节数,当等待时间达到该值时也会进行返回
  • fetch.max.bytes 每批次最大抓取字节数

消费者消费的参数:

  • max.poll.records 每次拉取的最大消息数,默认 500 条

此外,消费者可以和生产者一样在拦截器这里对数据进行处理。

2.3、消费者重要参数

参数名称

描述

bootstrap.servers

Kafka集群建立初始连接用到的host/port列表。

key.deserializer和value.deserializer

指定接收消息的key和value的反序列化类型。一定要写全类名。

group.id

标记消费者所属的消费者组。

enable.auto.commit

默认值为true,消费者会自动周期性地向服务器提交偏移量。

auto.commit.interval.ms

如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。

auto.offset.reset

当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。

offsets.topic.num.partitions

__consumer_offsets的分区数,默认是50个分区。

heartbeat.interval.ms

Kafka消费者和coordinator之间的心跳时间,默认3s。

该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。

session.timeout.ms

Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms

消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。

fetch.min.bytes

默认1个字节。消费者获取服务器端一批消息最小的字节数。

fetch.max.wait.ms

默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。

fetch.max.bytes

默认Default: 5242880050 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。

max.poll.records

一次poll拉取数据返回消息的最大条数,默认是500条。

Kafka(四)【Kafka 消费者】(2)https://developer.aliyun.com/article/1532345

相关文章
|
1月前
|
消息中间件 负载均衡 Kafka
【Kafka面试演练】那Kafka消费者手动提交、自动提交有什么区别?
嗯嗯Ok。分区的作用主要就是为了提高Kafka处理消息吞吐量。每一个topic会被分为多个分区。假如同一个topic下有n个分区、n个消费者,这样的话每个分区就会发送消息给对应的一个消费者,这样n个消费者负载均衡地处理消息。同时生产者会发送消息给不同分区,每个分区分给不同的brocker处理,让集群平坦压力,这样大大提高了Kafka的吞吐量。面试官思考中…
125 4
|
1月前
|
消息中间件 安全 Kafka
深度解析Kafka中消费者的奥秘
深度解析Kafka中消费者的奥秘
54 0
|
1月前
|
消息中间件 Java Kafka
关于kafka消费者超时配置
关于kafka消费者超时配置
175 2
|
1月前
|
消息中间件 分布式计算 Java
探究Kafka原理-3.生产者消费者API原理解析(上)
探究Kafka原理-3.生产者消费者API原理解析
48 0
|
21天前
|
消息中间件 分布式计算 Kafka
Kafka(四)【Kafka 消费者】(4)
Kafka(四)【Kafka 消费者】
|
15天前
|
消息中间件 Kafka
Kafka生产者和消费者相关命令行操作
Kafka生产者和消费者相关命令行操作
18 1
|
24天前
|
消息中间件 Java Kafka
springboot整合kafka消费者最佳实践
springboot整合kafka消费者最佳实践
58 1
|
24天前
|
消息中间件 负载均衡 监控
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
17 1
|
21天前
|
消息中间件 存储 算法
Kafka(四)【Kafka 消费者】(3)
Kafka(四)【Kafka 消费者】
|
21天前
|
消息中间件 Kafka API
Kafka(四)【Kafka 消费者】(2)
Kafka(四)【Kafka 消费者】

热门文章

最新文章