开发者社区> 问答> 正文

Consumer 最佳实践有哪几种办法?


Consumer Group,与控制台或者 Demo 中见到的 CID、Group ID 表达同一个意思。如果涉及到具体的参数或者代码,均来源于 Kafka Java 客户端,其它语言客户端原理类似,但具体参数代码可能有差异。
Kafka消费的一般流程是:

  1. Poll数据
  2. 执行消费逻辑
  3. 再次poll数据


负载消费


每个 Consumer Group 可以包含多个消费实例,也即可以启动多个 Kafka Consumer,并把参数 group.id 设置成相同的值。属于同一个 Consumer Group 的消费实例会负载消费订阅的 topic:
[backcolor=transparent]举例:Consumer Group A 订阅了 Topic A,并开启三个消费实例 C1、C2、C3,则发送到 Topic A 的每条消息最终只会传给C1、C2、C3的某一个。Kafka 默认会均匀地把消息传给各个消息实例,以做到消费负载均衡。
Kafka 负载消费的内部原理是,把订阅的 topic 的分区,平均分配给各个消费实例。因此,消费实例的个数不要大于分区的数量。云上分区的数量至少是 16 个,已经足够满足大部分用户的需求,且云上服务会根据容量调整分区数。

多个订阅


一个 Consumer Group 可以订阅多个 topic。一个 topic 也可以被多个 Consumer Group 订阅,且各个 Consumer Group 独立消费 topic 的所有内容
[backcolor=transparent]举例:Consumer Group A 订阅了 Topic A,Consumer Group B 也订阅了 Topic A,则发送到 Topic A 的每条消息,不仅会传一份给 Consumer Group A的消费实例,也会传一份给 Consumer Group B 的消费实例,且这两个过程相互独立,相互没有任何影响。

消费位点


每个Topic会有多个分区,每个分区会统计当前消息的总条数,这个称为最大位点MaxOffset。KafkaConsumer会按顺序依次消费分区内的每条消息,记录已经消费了的消息条数,称为ConsumerOffset。
剩余的未消费的条数(也称为消息堆积量) = MaxOffset - ConsumerOffset

位点提交


Kafka消费者有两个相关参数:
  • enable.auto.commit:默认值为 true。
  • auto.commit.interval.ms: 默认值为1000,也即 1s。

这两个参数组合的结果就是,每次poll时,再拉取数据前会预先做下面这件事:
检查上次提交位点的时间,如果距离当前时间已经超过 auto.commit.interval.ms,则启动位点提交动作。
因此,如果 enable.auto.commit 设置为 true,需要在每次poll时,确保前一次poll出来的数据已经消费完毕,否则可能导致位点跳跃。
如果想自己控制位点提交,则把 enable.auto.commit 设为 false,并调用commit(offsets)函数自行控制位点提交。

消息重复以及消费幂等


Kafka消费的语义是 [backcolor=transparent]At Lease Once, 也就是至少投递一次,保证消息不丢,但是不会保证消息不重复。在出现网络问题、客户端重启时均有可能出现少量重复消息,此时应用消费端,如果对消息重复比较敏感(比如说订单交易类),则应该做到消息幂等。以数据库类应用为例,常用做法是:
  • 发送消息时,传入key作为唯一流水号ID
  • 消费消息时,判断key是否已经消费过,如果已经消费过了,则忽略,如果没消费过,则消费一次

当然,如果应用本身对少量消息重复不敏感,则不需要做此类幂等检查。

消费失败


Kafka是按分区一条一条消息顺序向前消费推进的。如果消费端拿到某条消息后,消费逻辑失败,比如应用服务器出现了脏数据,导致某条消息处理失败,等待人工干预,该怎么办呢?如果失败后,一直尝试再次执行消费逻辑,则有可能造成消费线程阻塞在当前消息,无法向前推进,造成消息堆积。Kafka 自身没有处理失败消息的设计,实践中通常会打印失败的消息、或者存储到某个服务(比如创建一个 topic 专门用来放失败的消息),然后定时check一下失败消息的情况,分析失败原因,根据情况处理。

消费阻塞以及堆积


消费端最常见的问题就是消费堆积,最常造成堆积的原因是:
  • 消费速度跟不上生产速度,此时应该提高消费速度,详情见本文下一节
  • 消费端产生了阻塞

消费端拿到消息后,执行消费逻辑,通常会执行一些远程调用,如果这个时候同步等待结果,则有可能造成一直等待,消费进程无法向前推进。
消费端应该竭力避免堵塞消费线程,如果存在等待调用结果的情况,设置等待的超时时间,超过时间后,作消费失败处理。

消费速度


提高消费速度有两个办法:
  • 增加Consumer实例个数
  • 增加消费线程

增加Consumer实例,可以在进程内直接增加(需要保证每个实例一个线程,否则没有太大意义),也可以部署多个消费实例进程;需要注意的是,实例个数超过分区数量后就不再能提高速度,将会有消费实例不工作;
增加Consumer实例本质上也是增加线程的方式来提升速度,因此更加重要的性能提升方式是增加消费线程,最基本的方式如下:
  1. 定义一个线程池
  2. poll数据
  3. 把数据提交到线程池进行并发处理
  4. 等并发结果返回成功再次poll数据执行


消息过滤


Kafka 自身没有消息过滤的语义。实践中可以采取以下两个办法:
  • 如果过滤的种类不多,可以采取多个Topic的方式达到过滤的目的
  • 如果过滤的种类多,则最好在客户端业务层面自行过滤

实践中根据业务具体情况进行选择,可以综合运用上面两种办法。
[backcolor=transparent]注意:云上 Kafka 会收取 topic 资源占用费,每天2块钱。

消息广播


Kafka自身没有消息广播的语义。可以通过创建不同的 Consumer Group来模拟实现。

订阅关系


同一个 Consumer Group 内,各个消费实例订阅的 topic 最好保持一致,避免给排查问题带来干扰。

展开
收起
猫饭先生 2017-10-27 10:30:21 2030 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
为什么 Apache RocektMQ 是业务消息的首选 立即下载
如何实现应用的持续发布 立即下载
Rocket MQ 使用排查指南 立即下载