消息队列应用场景:
通过异步处理提高系统性能(减少响应所需时间)
削峰/限流
降低系统耦合性
消息队列对比
Kafka:
- 追求高吞吐量,一开始用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务,大型公司可以选用。
- 如果有日志采集功能,肯定首选 Kafka。
RocketMQ:
- 天生为金融互联网领域而生,对于可靠性要求很高的场景。
- 尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。
- RoketMQ 在稳定性上可能更值得信赖。
- 这些业务场景在阿里双 11 已经经历了多次考验,如果业务有上述并发场景,建议选择 RocketMQ。
RabbitMQ:
- 结合 Erlang 语言本身的并发优势,性能较好,社区活跃度也比较高,但是不利于做二次开发和维护。
- 不过 RabbitMQ 的社区十分活跃,可以解决开发过程中遇到的 Bug。
- 如果数据量没有那么大,小公司优先选择功能比较完备的 RabbitMQ。
基础知识
版本号
版本命名:Scala 2.11 - kafka_2.11-2.1.1
前面的版本号是编译 Kafka 源代码的 Scala 编译器版本。Kafka 服务器端的代码完全由 Scala 语言编写,Scala 同时支持面向对象编程和函数式编程。
真正的 Kafka 版本号实际上是 2.1.1。
前面的 2 表示大版本号,即 Major Version;中间的 1 表示小版本号或次版本号,即 Minor Version;最后的 1 表示修订版本号,也就是 Patch 号。
为什么吞吐量大、速度快?
顺序读写:
Kafka的message是不断追加到本地磁盘文件末尾的,而不是随机的写入。
Page Cache:
通过操作系统的Page Cache,Kafka的读写操作基本上是基于内存的,读写速度得到了极大的提升。
零拷贝,批量读写,批量压缩。
分区分段+索引:
Kafka的message是按topic分类存储的,topic中的数据又是按照一个一个的partition即分区存储到不同broker节点。
每个partition对应了操作系统上的一个文件夹,partition实际上又是按照segment分段存储的。
Kafka又默认为分段后的数据文件建立了索引文件,就是文件系统上的
.index
文件。
基本概念
主题:
- Topic主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
分区:
- Partition,一个有序不变的消息序列,每个主题下可以有多个分区。
消息位移:
- Offset,表示分区中每条消息的位置信息,是一个单调递增切不变的值。
副本:
- Replica,进行数据冗余。
生产者:
- Producer,向主题发送新消息的应用程序。
消费者:
- Consumer,从主题订阅新消息的应用程序。
消费者位移:
- Consumer Offset,表征消费者消费进度。
消费者组:
- Consumer Group,多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐量。
HW
HW (High Watermark)
俗称高水位,它标识了一个特定的消息偏移量(offset
),消费者只能拉取到这个offset
之前的消息。
LEO
LEO (Log End Offset),标识当前日志文件中下一条待写入的消息的offset。
上图中offset为9的位置即为当前日志文件的 LEO,LEO 的大小相当于当前日志分区中最后一条消息的offset值加1。
分区 ISR 集合中的每个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息。
消息模型
消费者
消费者策略
RangeAssignor
:默认消费者策略。
对一个消费者组来说,消费方式是以分区总数除以消费者总数来决定,如果不能整除,往往是从头开始将剩余的分区分配。
RoundRobinAssignor
:对于同一组消费者来说,使用轮训的方式来决定消费者消费的分区,既依次分配一个,直到分区被分配完毕。
StickyAssignor
,是在0.11.x
新增的,保证分配最大程度地平衡,同时保留尽可能多的现有分区分配。
意思就是前面两个当同组内有新的消费者加入或者旧的消费者退出的时候,会从新开始决定消费者消费方式,但是Sticky在同组中有新的消费者加入或者旧的消费者退出时,不会直接开始重构分配策略,而是保留现有消费者消费策略,将退出的消费者所消费的分区平均分配给现有消费者,新增消费者同理,同其他现存消费者的消费策略中分离。
CooperativeStickyAssignor
,它继承了StickyAssignor的逻辑,但允许重构分区策略。
Push和Pull
Kafka消费端是通过主动拉取消息的方式来消费的。
消费者组
Consumer Group是指组内有多个消费者或消费者实例,它们共享一个公共的 ID,这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题的所有分区。
Consumer Group 下可以有一个或多个 Consumer 实例。
Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费,这个分区也可以被其他的 Group 消费。
点对点模型和发布/订阅模型
如果所有实例都属于同一个 Group,那么它实现的就是消息队列模型;如果所有实例分别属于不同的 Group,那么它实现的就是发布/订阅模型。
Consumer实例个数
理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。
位移主题
Consumer 的位移管理机制就是将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets
中。
__consumer_offsets
的主要作用是保存 Kafka 消费者的位移信息。它要求这个提交过程不仅要实现高持久性,还要支持高频的写操作。
位移主题怎么被创建的?
当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题。
位移主题的分区由 Broker 端参数
offsets.topic.num.partitions
设置,默认值是 50,因此 Kafka 会自动创建一个 50 分区的位移主题。
副本数由 Broker 端参数 offsets.topic.replication.factor
设置,它的默认值是 3。
怎么提交位移?
提交位移的方式有两种:自动提交位移和手动提交位移。
Consumer 端有个参数叫 enable.auto.commit
,如果值是 true,则 Consumer 在后台默默地定期提交位移,提交间隔由一个参数 auto.commit.interval.ms
来控制。
问题:只要 Consumer 一直启动着,它就会无限期地向位移主题写入消息。
假设 Consumer 当前消费到了某个主题的最新一条消息,位移是 100,之后该主题没有任何新消息产生,故 Consumer 无消息可消费了,所以位移永远保持在 100。
由于是自动提交位移,位移主题中会不停地写入位移 =100 的消息。
Kafka 使用 Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。
Kafka 提供了专门的后台线程定期巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。后台线程叫 Log Cleaner。
重平衡
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。
比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。
Rebalance的触发条件:
组成员数发生变更:比如有新的 Consumer 实例加入组或者离开组。
订阅主题数发生变更。
订阅主题的分区数发生变更:当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。
可能发生Rebalance的场景
1、未能及时发送心跳,导致 Consumer 被踢出Group而引发的。
#单位ms 设置心跳传送时间几毫秒一次 ,默认是3000ms heartbeat.interval.ms #单位ms 多长时间没有心跳,后连接超时,默认10000ms session.timeout.ms
2、Consumer消费时间过长导致的,默认10分钟。
分区机制
分区策略
分区策略是决定生产者将消息发送到哪个分区的算法。
轮询策略(默认)
顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0。
随机策略
随意地将消息放置到任意一个分区上。
消息键策略
每条消息定义消息Key,同一个 Key 的所有消息都进入到相同的分区里面。
压缩机制
在 Kafka 中,压缩发生在两个地方:生产者端和 Broker 端。
生产者程序中配置 compression.type
参数即表示启用指定类型的压缩算法。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 开启GZIP压缩 props.put("compression.type", "gzip"); Producer<String, String> producer = new KafkaProducer<>(props);
何时解压缩?
当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息。
拦截器
Kafka 拦截器分为生产者拦截器和消费者拦截器。
生产者拦截器允许在发送消息前以及消息提交成功后植入拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。
当前 Kafka 拦截器的设置方法是通过参数配置完成的。生产者和消费者两端有一个相同的参数,名字叫 interceptor.classes
,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类。
假设第一个拦截器的完整类路径是 com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor
,第二个类是com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor
:
Properties props = new Properties(); List<String> interceptors = new ArrayList<>(); interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); ......
副本机制
同一个分区下有多个副本,分散保存在不同的Broker 上,能够对抗部分 Broker 宕机带来的数据不可用。
副本角色
追随者副本是不对外提供服务的,任何一个追随者副本都不能响应消费者和生产者的读写请求,所有的请求都必须由领导者副本来处理,追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
当领导者副本挂掉了,或者说领导者副本所在的 Broker 宕机时,Kafka 依托于ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者,老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。
ISR副本集合
ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的。
Leader 副本天然就在 ISR 中。也就是说,ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本。
Follower 是否与 Leader 同步的标准:
Broker 端参数
replica.lag.time.max.ms
参数。这个参数的含义是Follower 副本能够落后 Leader 副本的最长时间间隔,默认值是 10 秒。只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于Leader 副本中的消息。
倘若该副本后面慢慢地追上了 Leader 的进度,那么它是能够重新被加回ISR 的,ISR 是一个动态调整的集合,而非静态不变的。
Unclean领导者选举
通常来说,非同步副本落后Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。
在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。Broker 端参数
unclean.leader.election.enable
控制是否允许 Unclean 领导者选举。
选举机制
控制器(Broker)选举
控制器就是一个Borker。
在Kafka集群中,有多个Broker节点,但是它们之间需要选举出一个Leader,其他的Broker充当Follower角色。
集群中第一个启动的Broker会通过在Zookeeper中创建临时节点
/controller
来让自己成为控制器。其他Broker启动时也会在Zookeeper中创建临时节点,但是发现节点已经存在,所以它们会收到一个异常,意识到控制器已经存在,那么就会在Zookeeper中创建Watch对象,便于它们收到控制器变更的通知。
优先副本选举
如果一个分区的Leader副本不可用,就意味着整个分区不可用,此时需要从Follower副本中选举出新的Leader副本提供服务。
优先副本
指一个分区所在的AR集合的第一个副本。
比如分区1,它的AR集合是
[2,0,1]
,表示分区1的优先副本就是在Broker2
上。
- 理想情况下,优先副本应该就是Leader副本。
对分区Leader副本进行选举的时候,尽可能让优先副本成为Leader副本。
控制器
控制器主要作用是在 ZooKeeper 的帮助下管理和协调整个 Kafka 集群。
集群中任意一台 Broker 都能充当控制器的角色,但是在运行过程中只能有一个 Broker 成为控制器。
Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller
节点。
第一个成功创建
/controller
节点的 Broker 会被指定为控制器。
控制器的作用:
1.主题管理(创建、删除、增加分区)
2.分区重分配
3.Preferred 领导者选举
4.集群成员管理(新增 Broker、Broker 主动关闭、Broker 宕机)
自动检测新增 Broker、Broker 主动关闭及被动宕机。
- 比如,控制器组件会利用Watch 机制检查 ZooKeeper 的
/brokers/ids
节点下的子节点数量变更。目前,当有新 Broker 启动后,它会在
/brokers
下创建专属的 znode 节点。一旦创建完毕,ZooKeeper 会通过 Watch 机制将消息通知推送给控制器。
5.数据服务
向其他 Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。
控制器故障转移(Failover)
当运行中的控制器突然宕机或意外终止时,Kafka 能够快速地感知到,并立即启用备用控制器来代替之前失败的控制器。
- 该过程是自动完成的。
幂等性
在 0.11.0.0
版本支持增加了对幂等的支持。幂等是针对生产者角度的特性。幂等可以保证上生产者发送的消息,不会丢失,而且不会重复。
开启幂等性配置
只需要把 Producer 的配置 enable.idempotence
设置为 true 即可
props.put(“enable.idempotence”, ture) //或者 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
底层具体实现原理就是用空间换时间的优化思路,即在broker端多存一些字段来标识数据的唯一性。
当Producer发送了具有相同字段值的消息后,broker会进行匹配去重,丢弃重复的数据。
他只能保证单分区上的幂等性,即一个幂等性Producer只能够保证某个topic的一个分区上不出现重复消息,无法实现多分区的幂等。此外,如果Producer重启,也会导致幂等重置。
事务
事务Producer保证消息写入分区的原子性,即这批消息要么全部写入成功,要么全失败。此外,Producer重启回来后,kafka依然保证它们发送消息的精确一次处理。
开启enable.idempotence = true
设置Producer端参数transctional.id
数据的发送需要放在beginTransaction和commitTransaction之间。
Consumer端的代码也需要加上isolation.level
参数,用以处理事务提交的数据。
producer.initTransactions(); try { producer.beginTransaction(); producer.send(record1); producer.send(record2); producer.commitTransaction(); } catch (KafkaException e) { producer.abortTransaction(); }
事务Producer虽然在多分区的数据处理上保证了幂等,但是处理性能上相应的是会有一些下降的。
数据存储
Kafka 消息以 Partition 作为存储单元,每个 Topic 的消息被一个或者多个 Partition 进行管理。
- Partition 是一个有序的,不变的消息队列,消息总是被追加到尾部。
- 一个 Partition 不能被切分成多个散落在多个 Broker 上或者多个磁盘上。
Partition 又划分成多个 Segment 来组织数据。
Segment 在它的下面还有两个组成部分:
- 索引文件:以
.index
后缀结尾,存储当前数据文件的索引。- 数据文件:以
.log
后缀结尾,存储当前索引文件名对应的数据文件。
请求模型
请求到Broker后,也会通过类似于请求转发的组件Acceptor转发到对应的工作线程上,Kafka中被称为网络线程池,一般默认每个Broker上为3个工作线程,可以通过参数 num.network.threads
进行配置。
并且采用轮询的策略,可以很均匀的将请求分发到不同的网络线程中进行处理。
但是实际的处理请求并不是由网络线程池进行处理的,而是会交给后续的IO线程池,当网络线程接受到请求的时候,会将请求写入到共享的请求队列中,而IO线程池会进行异步的处理,默认情况下是8个,可以通过
num.io.threads
进行配置。
常见场景
重复消费
consumer 在消费过程中,应用进程被强制kill掉或发生异常退出。
例如在一次poll500条消息后,消费到200条时,进程被强制kill消费到offset未提交,或出现异常退出导致消费到offset未提交。
下次重启时,依然会重新拉取500消息,造成之前消费到200条消息重复消费了两次。
消费者消费时间过长。
max.poll.interval.ms
参数定义了两次poll的最大间隔,它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起 离开组 的请求,Coordinator 也会开启新一轮 Rebalance。因为上次消费的offset未提交,再次拉取的消息是之前消费过的消息,造成重复消费。
提高消费能力,提高单条消息的处理速度;根据实际场景
max.poll.interval.ms
值设置大一点,避免不必要的rebalance;可适当减小
max.poll.records
的值,默认值是500,可根据实际消息速率适当调小。
消息丢失
消费者程序丢失数据
Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。
假如某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。
最佳配置:
不要使用
producer.send(msg)
,而要使用producer.send(msg, callback)
。设置 acks = all:
- 设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是 已提交。
设置 retries 为一个较大的值。
- 当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了
retries > 0
的 Producer 能够自动重试消息发送,避免消息丢失。设置
unclean.leader.election.enable = false
。设置
replication.factor >= 3
。
- 防止消息丢失的主要机制就是冗余。
设置
min.insync.replicas > 1
。
- 控制的是消息至少要被写入到多少个副本才算是 已提交 。
- 设置成大于 1 可以提升消息持久性。
- 在实际环境中千万不要使用默认值 1。
确保
replication.factor > min.insync.replicas
。
- 如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。
确保消息消费完成再提交。
- Consumer 端有个参数
enable.auto.commit
,最好把它设置成 false,并采用手动提交位移的方式。
消息顺序
乱序场景一
因为一个topic可以有多个partition,kafka只能保证partition内部有序。
1、可以设置topic 有且只有一个partition。
2、根据业务需要,需要顺序的指定为同一个partition。
乱序场景二
对于同一业务进入了同一个消费者组之后,用了多线程来处理消息,会导致消息的乱序。
消费者内部根据线程数量创建等量的内存队列,对于需要顺序的一系列业务数据,根据key或者业务数据,放到同一个内存队列中,然后线程从对应的内存队列中取出并操作。