消息队列之推还是拉,RocketMQ 和 Kafka 是如何做的?(下)

简介: 消息队列之推还是拉,RocketMQ 和 Kafka 是如何做的?(下)

Kafka 中的长轮询


像 Kafka 在拉请求中有参数,可以使得消费者请求在 “长轮询” 中阻塞等待。

简单的说就是消费者去 Broker 拉消息,定义了一个超时时间,也就是说消费者去请求消息,如果有的话马上返回消息,如果没有的话消费者等着直到超时,然后再次发起拉消息请求。

并且 Broker 也得配合,如果消费者请求过来,有消息肯定马上返回,没有消息那就建立一个延迟操作,等条件满足了再返回。

我们来简单的看一下源码,为了突出重点,我会删减一些代码。

先来看消费者端的代码。


image.png


image.png

最后调用的就是 Kafka 包装过的 selector,而最终会调用 Java nio 的 select(timeout)

现在消费者端的代码已经清晰了,我们再来看看 Broker 如何做的

Broker 处理所有请求的入口其实我在之前的文章介绍过,就在 KafkaApis.scala 文件的 handle 方法下,这次的主角就是 handleFetchRequest 。


image.png

image.png


这个炼狱名字取得很有趣,简单的说就是利用我之前文章提到的时间轮,来执行定时任务,例如这里是delayedFetchPurgatory,专门用来处理延迟拉取操作。

我们先简单想一下,这个延迟操作都需要实现哪些方法,首先构建的延迟操作需要有检查机制,来查看消息是否已经到了,然后呢还得有个消息到了之后该执行的方法,还需要有执行完毕之后该干啥的方法,当然还得有个超时之后得干啥的方法。

这几个方法其实对应的就是代码里的 DelayedFetch ,这个类继承了 DelayedOperation 内部有:

  • isCompleted 检查条件是否满足的方法
  • tryComplete 条件满足之后执行的方法
  • onComplete 执行完毕之后调用的方法
  • onExpiration 过期之后需要执行的方法

判断是否过期就是由时间轮来推动判断的,但是总不能等过期的时候再去看消息到了没吧?

这里 Kafka 和 RocketMQ 的机制一样,也会在消息写入的时候提醒这些延迟请求消息来了,具体代码我不贴了, 在 ReplicaManager#appendRecords 方法内部再深入个两方法可以看到。

不过虽说代码不贴,图还是要画一下的。


image.png


小结一下


可以看到 RocketMQ  和 Kafka 都是采用“长轮询”的机制,具体的做法都是通过消费者等待消息,当有消息的时候 Broker 会直接返回消息,如果没有消息都会采取延迟处理的策略,并且为了保证消息的及时性,在对应队列或者分区有新消息到来的时候都会提醒消息来了,及时返回消息。

一句话说就是消费者和 Broker 相互配合,拉取消息请求不满足条件的时候 hold 住,避免了多次频繁的拉取动作,当消息一到就提醒返回。


最后


总的而言推拉模式各有优劣,而我个人觉得一般情况下拉模式更适合于消息队列。

看了这篇文章相信之后面试官问你推还是拉?建议给他个歪嘴笑。

image.png



相关文章
|
1月前
|
消息中间件 存储 缓存
Kafka【基础知识 01】消息队列介绍+Kafka架构及核心概念(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 01】消息队列介绍+Kafka架构及核心概念(图片来源于网络)
92 2
|
1月前
|
消息中间件 存储 Kafka
RabbitMQ、RocketMQ和Kafka全面对决,谁是最佳选择?
1、应用场景 1.RabbitMQ: 适用于易用性和灵活性要求较高的场景 异步任务处理:RabbitMQ提供可靠的消息传递机制,适用于处理异步任务,例如将耗时的任务放入消息队列中,然后由消费者异步处理,提高系统的响应速度和可伸缩性。 解耦系统组件:通过使用RabbitMQ作为消息中间件,不同的系统组件可以通过消息进行解耦,实现松耦合的架构,提高系统的可维护性和灵活性。 事件驱动架构:RabbitMQ的发布-订阅模式可以用于构建事件驱动架构,将系统中的事件作为消息发布到相应的主题,不同的消费者可以订阅感兴趣的主题进行相应的处理。
172 2
|
1月前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
75 0
|
9天前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【4月更文挑战第17天】本文介绍了在Java环境下使用Apache Kafka进行消息队列处理的方法。Kafka是一个分布式流处理平台,采用发布/订阅模型,支持高效的消息生产和消费。文章详细讲解了Kafka的核心概念,包括主题、生产者和消费者,以及消息的存储和消费流程。此外,还展示了Java代码示例,说明如何创建生产者和消费者。最后,讨论了在高并发场景下的优化策略,如分区、消息压缩和批处理。通过理解和应用这些策略,可以构建高性能的消息系统。
|
18天前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
15 0
|
22天前
|
消息中间件 存储 负载均衡
消息队列学习之kafka
【4月更文挑战第2天】消息队列学习之kafka,一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台。
17 2
|
27天前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
48 1
|
1月前
|
消息中间件 存储 中间件
【SpringCloud Stream消息驱动、设计思想以及整合rabbitmq消息队列案例--学习笔记】
【SpringCloud Stream消息驱动、设计思想以及整合rabbitmq消息队列案例--学习笔记】
50 0
|
1月前
|
消息中间件 缓存 API
|
1月前
|
消息中间件 存储 Cloud Native
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程

相关产品

  • 云消息队列 Kafka 版
  • 云消息队列 MQ