深入浅出分析kafka客户端程序设计 ----- 消费者篇----万字总结(上):https://developer.aliyun.com/article/1393837?spm=a2c6h.24874632.expert-profile.8.6af22f31AcelSW
5 消费Rebalance机制
当kafka遇到如下四种情况的时候,kafka会触发Rebalance机制:
1. 消费组成员发生了变更,比如有新的消费者加入了消费组组或者有消费者宕机。
2. 消费者无法在指定的时间之内完成消息的消费。
3. 消费组订阅的Topic发生了变化。
4. 订阅的Topic的partition发生了变化。
6 其他重要的消费者参数
在 KafkaConsumer 中,除了前面讲的必要的客户端参数,大部分的参数都有合理的默认值,一般我们
也不需要去修改它们。不过了解这些参数可以让我们更好地使用消费者客户端,其中还有一些重要的参
数涉及程序的可用性和性能,如果能够熟练掌握它们,也可以让我们在编写相关的程序时能够更好地进
行性能调优与故障排查。下面挑选一些重要的参数来做细致的讲解
6.1 fetch.min.bytes
该参数用来配置 Consumer 在一次拉取请求(调用 poll() 方法)中能从 Kafka 中拉取的最小数据量,默
认值为1(B)。Kafka 在收到 Consumer 的拉取请求时,如果返回给 Consumer 的数据量小于这个参
数所配置的值,那么它就需要进行等待,直到数据量满足这个参数的配置大小。可以适当调大这个参数
的值以提高一定的吞吐量,不过也会造成额外的延迟(latency),对于延迟敏感的应用可能就不可取
了。
6.2 fetch.max.bytes
该参数与 fetch.min.bytes 参数对应,它用来配置 Consumer 在一次拉取请求中从Kafka中拉取的最大
数据量,默认值为52428800(B),也就是50MB。
如果这个参数设置的值比任何一条写入 Kafka 中的消息要小,那么会不会造成无法消费呢?很多资料对
此参数的解读认为是无法消费的,比如一条消息的大小为10B,而这个参数的值是1(B),既然此参数
设定的值是一次拉取请求中所能拉取的最大数据量,那么显然1B<10B,所以无法拉取。这个观点是错误
的,该参数设定的不是绝对的最大值,如果在第一个非空分区中拉取的第一条消息大于该值,那么该消
息将仍然返回,以确保消费者继续工作。也就是说,上面问题的答案是可以正常消费。
与此相关的,Kafka 中所能接收的最大消息的大小通过服务端参数 message.max.bytes(对应于主题端
参数 max.message.bytes)来设置。
6.3 fetch.max.wait.ms
这个参数也和 fetch.min.bytes 参数有关,如果 Kafka 仅仅参考 fetch.min.bytes 参数的要求,那么有可
能会一直阻塞等待而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms 参数用于指
定 Kafka 的等待时间,默认值为500(ms)。如果 Kafka 中没有足够多的消息而满足不了
fetch.min.bytes 参数的要求,那么最终会等待500ms。这个参数的设定和 Consumer 与 Kafka 之间的
延迟也有关系,如果业务应用对延迟敏感,那么可以适当调小这个参数。
6.4 max.partition.fetch.bytes
这个参数用来配置从每个分区里返回给 Consumer 的最大数据量,默认值为1048576(B),即1MB。
这个参数与 fetch.max.bytes 参数相似,只不过前者用来限制一次拉取中每个分区的消息大小,而后者
用来限制一次拉取中整体消息的大小。同样,如果这个参数设定的值比消息的大小要小,那么也不会造
成无法消费,Kafka 为了保持消费逻辑的正常运转不会对此做强硬的限制。
6.5 max.poll.records
这个参数用来配置 Consumer 在一次拉取请求中拉取的最大消息数,默认值为500(条)。如果消息的
大小都比较小,则可以适当调大这个参数值来提升一定的消费速度。
如果用户的消息处理逻辑很轻量,默认的 500 条消息通常不能满足实际的消息处理速度 。
6.6 connections.max.idle.ms
这个参数用来指定在多久之后关闭闲置的连接,默认值是540000(ms),即9分钟。
6.7 exclude.internal.topics
Kafka 中有两个内部的主题: consumer_offsets 和 transaction_state。exclude.internal.topics 用来指定 Kafka 中的内部主题是否可以向消费者公开,默认值为 true。如果设置为 true,那么只能使用
subscribe(Collection)的方式而不能使用 subscribe(Pattern)的方式来订阅内部主题,设置为 false 则没
有这个限制。
6.8 receive.buffer.bytes
同生产者。
6.9 send.buffer.bytes
同生产者。
6.10 request.timeout.ms
这个参数用来配置 Consumer 等待请求响应的最长时间,默认值为30000(ms)。
6.11 metadata.max.age.ms
这个参数用来配置元数据的过期时间,默认值为300000(ms),即5分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的 broker 加入。
6.12 reconnect.backoff.ms
这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,默认值为50(ms)。这种机制适用于消费者向 broker 发送的所有请求。
6.13 retry.backoff.ms
这个参数用来配置尝试重新发送失败的请求到指定的主题分区之前的等待(退避)时间,避免在某些故障情况下频繁地重复发送,默认值为100(ms)。
6.14 isolation.level
这个参数用来配置消费者的事务隔离级别。字符串类型,有效值为“read_uncommitted”和
“read_committed”,表示消费者所消费到的位置,如果设置为“read_committed”,那么消费者就会忽略事务未提交的消息,即只能消费到LSO(LastStableOffset)的位置,默认情况下为
“read_uncommitted”,即可以消费到 HW(High Watermark)处的位置。
6.15 session.timeout.ms
非常重要的参数之一 !
session.timeout.ms 是 consumer group 检测组内成员发送崩溃的时间 。
假设你设置该参数为 5 分钟,那么当某个 group 成员突然崩攒了(比如被 kill -9 或岩机), 管理group 的 Kafka 组件(即消费者组协调者,也称 group coordinator)有可能需要 5 分钟才能感知到这个崩溃。显然我们想要缩短这个时间,让coordinator 能够更快地检测到 consumer 失败 。这个参数还有另外一重含义 :consumer 消息处理逻辑的最大时间。
倘若 consumer 两次 poll 之间的间隔超过了该参数所设置的阑值,那么 coordinator 就会认为这个consumer 己经追不上组内其他成员的消费进度了,因此会将该 consumer 实例“踢出”组,该consumer 负责的分区也会被分配给其他 consumer。在最好的情况下,这会导致不必要的 rebalance,因为 consumer 需要重新加入 group 。更糟的是,对于那些在被踢出 group 后处理的消息, consumer 都无法提交位移一一这就意味着这些消息在rebalance 之后会被重新消费一遍。如果一条消息或一组消息总是需要花费很长的时间处理,那么consumer 甚至无法执行任何消费,除非用户重新调整参数 。鉴于以上的“窘境”, Kafka 社区于 0.10.1.0 版本对该参数的含义进行了拆分 。 在该版本及以后的版本中, session.timeout.ms 参数被明确
为“ coordinator 检测失败的时间” 。因此在实际使用中,用户可以为该参数设置一个比较小的值,让 coordinator 能够更快地检测consumer 崩溃的情况,从而更快地开启 rebalance,避免造成更大的消费滞后( consumer lag ) 。目前该参数的默认值是 10 秒。
6.16 max.poll.interval.ms
Apache官网max.poll.interval.ms上的解释如下,消费者组中的一员在拉取消息时如果超过了设置的最大拉取时间,则会认为消费者消费消息失败,kafka会重新进行重新负载均衡,以便把消息分配给另一个消费组成员。在一个典型的 consumer 使用场景中,用户对于消息的处理可能需要花费很长时间。这个参数就是用于设置消息处理逻辑的最大时间的 。 假设用户的业务场景中消息处理逻辑是把消息、“落地”到远程数据库中,且这个过程平均处理时间是 2 分钟,那么用户仅需要将 max.poll.interval.ms 设置为稍稍大于 2 分钟的值即可,而不必为 session. neout.ms 也设置这么大的值。通过将该参数设置成实际的逻辑处理时间再结合较低的session.timeout.ms 参数值,consumer group既实现了快速的 consumer 崩溃检测,也保证了复杂的事件处理逻辑不会造成不必要的 rebalance 。
6.17 heartbeat.interval.ms
该参数和 request.timeout.ms 、max.poll.interval.ms 参数是最难理解的 consumer 参数 。 前面己经
讨论了后两个参数的含义,这里解析一下heartbeat.interval.ms的含义及用法 。
从表面上看,该参数似乎是心跳的问隔时间,但既然己经有了上面的 session.timeout.ms 用于设置超
时,为何还要引入这个参数呢?
这里的关键在于要搞清楚 consumer group 的其他成员,如何得知要开启新一轮 rebalance;当coordinator 决定开启新一轮 rebalance 时,它会将这个决定以 REBALANCE_IN_PROGRESS 异常的形
式“塞进” consumer 心跳请求的 response 中,这样其他成员拿到 response 后才能知道它需要重新加入group。显然这个过程越快越好,而heartbeat. interval.ms 就是用来做这件事情的 。比较推荐的做法是设置一个比较低的值,让 group 下的其他 consumer 成员能够更快地感知新一轮rebalance. 开启了。注意,该值必须小于 session.timeout.ms !这很容易理解,毕竟如果 consumer 在session.timeout.ms 这段时间内都不发送心跳, coordinator 就会认为它已经 dead,因此也就没有必要让它知晓 coordinator 的决定了。
7 broker参数详解
broker 端参数需要在 Kafka 目录下的 config/server.properties 文件中进行设置。当前对于绝大多数的
broker 端参数而言, Kafka 尚不支持动态修改一一这就是说,如果要新增、修改,抑或是删除某些
broker 参数的话,需要重启对应的 broker 服务器。
7.1 broker.id
Kafka 使用唯一的一 个整数来标识每个 broker ,这就是 broker.id ;该参数默认是-1 。如果不指定,
Kafka 会自动生成一个唯一值;总之,不管用户指定什么都必须保证该值在 Kafka 集群中是唯一的,不
能与其他 broker 冲突 。
在实际使用中,推荐使用从0开始的数字序列,如0、1、2……
7.2 log.dirs
# A comma separated list of directories under which to store log files log.dirs=/tmp/kafka-logs
该参数指定了 Kafka 持久化消息的目录;非常重要的参数!
若不设置该参数, Kafka 默认使用tmp/kafka-logs 作为消息保存的目录(商用一定要自己设置目录)。
把消息保存在 tmp 录下,生产环境中是不可取的。若待保存的消息数量非常多,那么最好确保该文件夹
下有充足的磁盘空间。
该参数可以设置多个目录,以逗号分隔,比如**/home/kafka1,/home/kafka2** 。在实际使用过程中,
指定多个目录的做法通常是被推荐的,因为这样 Kafka 可以把负载均匀地分配到多个目录下。
若用户机器上有N块物理硬盘,那么设置多 个目录(须挂载在不同磁盘上的目录)是一个很好的选择。N
个磁头可以同时执行写操作,极大地提升了吞吐量。
注意 ,这里的“均匀”是根据目录下的分区数进行比较的,而不是根据实际的磁盘空间。
7.3 zookeeper.connect
同样是非常重要的参数。主要是在zookeeper中,kafka的配置数据,有一个公共开始的根节点;该参数没有默认的值,如果不配置,则使用zookeeper的/目录;
例如:zkl :218l,zk2:2181,zk3:2181/kafka_clusterl;/kafka_cluster就是kafka的配置的目录;配置了,
可以起到很好的隔离效果。这样管理 Kafka 集群将变得更加容易。
多个 Zookeeper 实例
在 Kafka 集群中,可以配置多个 Zookeeper 实例来提高可用性和容错性。Zookeeper 主要用于协调和存储 Kafka 集群的元数据和配置信息。通过配置多个 Zookeeper 实例,可以防止单点故障,并提供更好的集群健壮性。
上述zkl :218l,zk2:2181,zk3:2181/kafka_clusterl 这个就是配置了三个zookeeper
7.4 listeners
broker 监听器的 csv (comma-separated values)列表,格式是[协议://:主机名:端口],[协议]:[// 主机
名:端口]。
该参数主要用于客户端连接 broker 使用,可以认为是 broker 端开放给 clients的监听端口 如果不指定
主机名,则表示绑定默认网卡:如果0.0.0.0,则表示绑定所有网卡。 Kafka 当前支持的协议类型包括
PLAINTEXT、SSL以及 SASL SSL 等。
1.listeners:
1.作用:listeners参数定义了 Kafka Broker 用于接受客户端连接的监听器和端口。
2.配置示例:listeners=PLAINTEXT://:9092,SSL://:9093
3.解释:此配置示例指示 Kafka Broker 使用两个监听器端口:一个是9092端口,使用PLAINTEXT协议(非加密),另一个是9093端口,使用SSL协议(加密通信)。
假设您有一个 Kafka 集群部署在云环境中,每个 Broker 机器有私有IP和公有IP。您希望集群内的 Broker 使用不同的协议和端口与客户端通信,但客户端需要连接到公有IP。
listeners=PLAINTEXT://:9092,SSL://:9093:Broker使用两个监听器端口,一个用于PLAINTEXT通信(9092端口),另一个用于SSL加密通信(9093端口)。
7.5 advertised.listeners
跟 listeners 类似,该参数也是用于发布给 clients 的监昕器;不过该参数主要用于 IaaS 环境,比如云上
的机器通常都配有多块网卡(私网网卡和公网网卡)。
对于这种机器,用户可以设置该参数绑定公网 IP 供外部 clients 使用,然后配置上面的 listeners 来绑定
私网 IP供broker间通信使用。
当然不设置该参数也是可以的,只是云上的机器很容易出现 clients 无法获取数据的问题,原因就是
listeners 绑定的是默认网卡,而默认网卡通常都是绑定私网的。
在实际使用场景中,对于配有多块网卡的机器而言,这个参数通常都是需要配置的。
1.advertised.listeners:
1.作用:advertised.listeners参数用于向客户端公布 Kafka Broker 的监听器信息。
2.配置示例:advertised.listeners=PLAINTEXT://public_ip:9092,SSL://public_ip:9093
3.解释:在云环境或多网络接口的场景中,机器可能拥有私有和公有 IP。该参数的配置示例中,公布给客户端的是公有 IP 地址,这样客户端就可以通过公有 IP 正确访问 Kafka Broker。这对于确保在复杂网络环境中客户端与 Broker 的连接十分重要。
假设您有一个 Kafka 集群部署在云环境中,每个 Broker 机器有私有IP和公有IP。您希望集群内的 Broker 使用不同的协议和端口与客户端通信,但客户端需要连接到公有IP。
advertised.listeners=PLAINTEXT://public_ip:9092,SSL://public_ip:9093:这里的public_ip是您的机器的公有IP地址。客户端通过这些公开的地址访问 Broker,确保能够连接到正确的端口以进行通信。
7.6 unclean.leader.election.enable
是否开启 unclean leader 选举。Kafka 社区在1.0.0 版本才正式将该参数默认值调整为 false ;
即表明如果发生这种情况, Kafka 不允许从剩下存活的非 ISR 副本中选择一个当 leader。因为如果
true,这样做固然可以让 Kafka 继续提供服务给 clients ,但会造成消息数据的丢失,正式环境中,数据
不丢失是基本的业务需求;
番外:ISR解释
ISR 全称是in-sync replica ,翻译过来就是与 leader replica 保持同步的 replica 集合 ;一个partition 可以配置N个replica ,那么这是否就意味着该 partition可以容忍 N-1 replica 失效而不丢失数据呢?
答案是“否”!
Kafka partition 动态维护 一个replica 集合。该集合中的所有 replica 保存的消息日志都与leader
replica 保持同步状态。只有这个集合中的 replica 才能被选举为 leader ,也只有该集合中所有 replica
都接收到了同一条消息, Kafka 才会将该消息置于“己提交”状态,即认为这条消息发送成功。
Kafka 中只要这个集合中至少存在一个 replica ,那些“己提交”状态的消息就不会丢失;
这句话的两个关键点:
ISR 中至少存在一个“活着的”replica;replica “己提 ”消息 。Kafka 对于没有提交成功的消息不做任何保证,只保证在 ISR 存活的情况下“己提交”的消息不会丢失。正常情况下,partition 的所有 replica (含 leader replica )都应该与 leader replica 保持同步,即所有replica 都在 ISR 中。因为各种各样的原因,一小部分 replica 开始落后于 leader replica的进度 。当滞后 一定程度时, Kafka会将这些 replica “踢”出 ISR;相反地,当这replica 重新追上leader进度时候,kafka会再次把它们加回ISR中。
7.7 delete.topic.enable
是否允许 Kafka 删除 topic 。
默认情况下, Kafka 集群允许用户删除topic 及其数据。这样当用户发起删除 topic 操作时, broker
端会执行 topic 删除逻辑。
在实际生产环境中我们发现允许 Kafka 删除 opic 其实是一个很方便的功能,再加上自Kafka 0.0 新增的
ACL 权限特性,以往对于误操作和恶意操作的担心完全消失了,因此设置该参数为 true 是推荐的做法。
7.8 log.retention. {hours|minutes|ms }
这组参数控制了消息数据的留存时间;默认的留存时间是7天(168小时);即 Kafka 只会保存最近 7天的
数据,井自动删除 7天前的数据。
当前较新版本的Kafka 会根据消息的时间戳信息进行留存与否的判断;老版本消息格式没有时间戳,
Kafka 会根据日志文件的最近修改时间( last modified time )进行判断。
三个参数如果同时设置,优先选取ms的设置,minutes次之,hours最后。
7.9 log.retention.bytes
这个参数定义了空间维度上的留存策略;参数默认值是-1,表示 Kafka 永远不会根据消息日志文件总大
小来删除日志。
对于大小超过该参数的分区日志而言, Kafka 会自动清理该分区的过期日志段文件。
7.10 min.insync.replicas
该参数表示kafka存储的最小副本数,producer发送数据的时候,指定了 broker 端必须成功响应
clients 消息发送的最少副本数;
该参数其实是与 producer 端的 acks 参数配合使用的 ;并且min.insync.replicas 也只有在 acks=-1 时
才有意义;acks=-1 表示 producer端寻求最高等级的持久化保证;
假如 broker 端无法满足该条件,则 clients 的消息发送并不会被视为成功。它与 acks 配合使用可以令
Kafka集群达成最高等级的消息持久化;
举 个例子,假设某个topic 的每个分区的副本数是3 ,那么推荐设置该参数为 2,这样我们就能够容忍
一台broker 宕机而不影响服务;若设置参数为3 ,那么只要任何一台 broker 岩机,整个Kafka 集群将
无法继续提供服务。
7.11 num.network threads (fastdfs 网络数据读取)
一个 broker 在后台用于处理网络请求的线程数,默认是3 。
broker启动时会创建多个线程处理来自其他broker和clients 发送过来的各种请求。会将接收到的请求转
发到后面的处理线程中。在真实的环境中,用户需要不断地监控NetworkProcessorAvgldlePercent JMX
指标; 如果该指标持续低于0.3 ;建议适当增加该参数的值。
7.12 num.io.threads (处理我们读取到网络数据)
这个参数就是控制 broker 端实际处理网络请求的线程数,默认值是8;
Kafka broker 默认创建 8个线程以轮询方式不停地监昕转发过来的网络请求井进行实时处理。 Kafka 同
样也为请求处理提供了一个 JMX 监控指标 RequestHandler A vgldlePercent。如果发现该指标持续低于
0.3 ,则可以考虑适当增加该参数的值。
7.13 message.max. bytes
Kafka broker 能够接收的最大消息大小,默认是 977kB;还不到1MB ,可见是非常小的。
在实际使用场景中,突破 1MB 大小的消息十分常见,因此用户有必要综合考虑 Kafka 集群可能处理的
最大消息尺寸井设置该参数值
7.14 log.segment.bytes (分片的概念)
(默认: 1GB) – kafka数据文件的大小,确保这个数值大于一个消息的长度。一般说来使用默认值即可(一
般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。
# The maximum size of a log segment file. When this size is reached a new log segment will be created. #log.segment.bytes=1073741824 # 配置成5M做测试 log.segment.bytes==5242880
8 总结
RdKafka提供了两种消费者API,低级API的Consumer和高级API的KafkaConsumer。
Kafka Consumer使用流程:
(1)创建Kafka配置实例。
(2)创建Topic配置实例。
(3)设置Kafka配置实例Broker属性。
(4)设置Topic配置实例属性。
(5)注册回调函数。
(6)创建Kafka Consumer客户端实例。
(7)创建Topic实例。
(8)订阅主题。
(9)消费消息。
(10)关闭消费者实例。
(11)销毁释放RdKafka资源。