前言
在消息系统中,消费者消费消息有拉和推
消息两种实现方式,拉消息
是消费者主动向消息服务器发送拉消息请求
,消息服务器将消息返回给消费者,而推消息
是消息服务器主动向消费者推送消息
的形式,这两种消息消费实现各有各的优势和劣势。
在RocketMQ中,有两种消费者客户端,一种是Push模式消费者, 一种是Pull模式消费者,这两个其实都是表现,在RocketMQ底层实现中采用长轮询的机制
来实现消息拉取消费功能。
长轮询模式兼顾了拉和推消息的优势。本文将从源码层面分析RocketMQ的消费者角色的工作原理。从整体看下org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
消费者实现类在消费客户端的启动流程,RocketMQ的消费者客户端即支持拉又支持推,但是底层都是基于长轮询的实现。
既然内核底层是长轮训模式,为什么RocketMQ会有Push,Pull客户端呢?
Push只是在客户端,RocketMQ客户端组件将消息拉取到本地后,自动回调我们业务方的业务逻辑执行消费。
Pull模式,其实也是Rocketmq会将消息拉取到本地,我们的业务代码主动从本地读消息进行消费。
这两种客户端对我们开发人员更加友好,可以自己选择最合适的消费方式。
Push消费者启动流程
看源码前,我们可以思考下,消费者能够从broker拉取到消息主要需要做哪些事情?
1、首先肯定需要和Broker创建连接
,RocketMQ底层使用Netty组件通信,肯定会使用Netty和Broker建立连接。
2、和Broker建立连接之后,需要告诉Broker拉取Topic
下哪个队列
的消息,这里就需要用到消费者负载均衡机制
了。
3、向Broker拉取消息,还有一个关键的属性,那就是消息拉取偏移量offset
, 消费者需要知道下次从哪个偏移量开始拉取消息。在RocketMQ中广播消息模式是将偏移量存储在消费者本地, 集群消息模式是将消息偏移量存储在Broker。因此消费者启动时还需要加载好各个topic的偏移量
分析了上面主线逻辑之后,我们可以看看源码是如何做的。
消费者启动流程在源码 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
加载偏移量存储服务。
偏移量根据消费模式有两个策略,如果是广播��息偏移量存储在本地
,如果是集群消息则存储在broker
。
消费者调用业务逻辑执行消费逻辑
消费者实例启动,里面的主逻辑非常清晰。
org.apache.rocketmq.client.impl.factory.MQClientInstance#start
public void start() throws MQClientException {
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// 启动netty客户端,和Broker建立通信连接
this.mQClientAPIImpl.start();
// 定时一些任务
this.startScheduledTask();
// 启动拉消息的服务,通过线程异步拉取
this.pullMessageService.start();
// 启动负载均衡服务
this.rebalanceService.start();
// 启动内置消息发送服务 this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
//更新消费者启动状态
this.serviceState = ServiceState.RUNNING;
}
通过上面的步骤消费者实例就启动完成了。
我们发现,消费者客户端主要包含了几个核心组件服务
1、偏移量加载
服务 => 记录消息消费进度
2、消息拉取
服务 => 建立连接,拉取消息
3、负载均衡
服务 => 分配消费队列
4、消息消费
服务 => 回调消费消息业务逻辑
和我们一开始的想的没有很大差别。 这里体现了单一职责设计原则
的设计思想,不同的类专门负责各自不同的职责。
看到这里,我们就把消费者客户端启动过程的主线捋清楚了。
客户端拉取消息请求发起
我们接着看最主要的流程,拉取消息服务PullMessageService
, 看看他做了哪些事情。
点进去看发现他是一个线程任务。自然我们就会想到去看他的run方法实现,因此我们可以猜测,他是通过异步拉取消息的。
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
从阻塞队列拿拉数据请求
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
每个拉取请求里包含了对应的消费者组,消息队列,拉取偏移量,从这些信息就可以知道要拉取哪个topic
下面哪个队列
,哪个偏移量
开始的消息了。
public class PullRequest {
private String consumerGroup;
private MessageQueue messageQueue;
private ProcessQueue processQueue;
private long nextOffset;
private boolean lockedFirst = false;
在org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl
中会发起拉消息请求,将消费者组,topic,队列号,偏移量传给broker。
我们可以注意到,上面还有两个重要的参数brokerSuspendMaxTimeMillis
,timeoutMillis
这两个个暂停时间参数就和大名鼎鼎的长轮询机制有着密切的关系。
brokerSuspendMaxTimeMillis
参数表示broker在没有消息拉取的时候会暂停等待多久返回给消费者客户端,
timeoutMillis
则是表示消费者客户端最长等待多久broker返回消息的。
在消费者客户端,我们可以看到timeoutMillis
的使用,控制了请求超时时间。
看看默认的超时时间配置
brokerSuspendMaxTimeMillis
默认15s, 也就是broker会在没有消息拉取停15s,再返回客户端
timeoutMillis
默认30s,也就是消费者客户端会等待broker30s。
下文我们还可以看到broker端对拉取消息的处理时对参数brokerSuspendMaxTimeMillis
的应用。
消费者实例已经发起拉取消息请求了,broker把消息返回后,消费者做什么呢?
消费者需要对消息内容解码,然后将消息分发给我们写的业务代码执行。
这里就会把拉取到消息,通过任务的形式投递给消息消费处理线程池,线程池把消息转交给业务类处理。
我们经常会写个Listerner来监听RocketMQ消息,这种消费消息的方式,RocketMQ就是在Push模式中实现的。
消费完消息,还需要更新最新的消息消费偏移量,在这里RocketMQ会同步更新内存中的偏移量
,不会直接远程调用Broker。
看到这几个点,RocketMQ中,拉取消息支持异步
,消费消息异步
,还有偏移量更新到Brocker是异步
,都是异步的,这种就像io线程和业务线程做了隔离的设计思想,如果要说RocketMQ性能高的原因,这个点也可以说。
看完上面的主线逻辑,我们其实已经能够说明白RocketMQ消费者端的主流程了。
下面看看Broker端处理拉取消息请求的流程
Broker端处理拉取消息
Broker端有个专门处理类处理拉取消息请求,PullMessageProcessor
。
我们也可以想下,RocketMQ拉取消息时需要做哪些事情?
1、因为消息是存储在CommitLog中,CommitLog对应的是一个一个的文件,怎么定位到具体的文件呢?也就是怎么定位到具体的CommitLog?
2、我们订阅消息消息,可能会使用不同的tag,tag会传到Broker端,因此Broker端需要做tag过滤
RocketMQ如何定位CommitLog?
在Broker端,会先根据topic和队列号定位到ConsumeQueue
。
ConsumeQueue
是Commitlog文件的索引,里面缓存了所有的Commitlog文件信息和物理偏移量信息
通过ConsumeQueue就可以知道的CommitLog的物理偏移量了。
消息查询前会根据tag的hash值进行过滤,这里是不是经常问消息tag是在哪过滤的? 消息的tag的会以hash
值形式存储在ConsumeQueue,由于ConsumeQueue是索引
,所以Broker端查询消息前会进行过滤。
执行完消息过滤,就可以真正的查询消息啦。查询消息就是返回了ByteBuffer
。
到这里,消息查询出来了,但是长轮询
处理在哪呢?在拉取结果处理的地方。
Broker将当前请求加入等待队列,response置为null, 这样不会立即把响应写到消费者客户端。
长轮询机制
的核心实现马上就要揭开神秘面纱了。 Broker通过将当前请求的快照信息存储下来。
每5秒检查一次每个等待的请求队列里是否有新消息,有新消息就会返回消息给消费者客户端,或者超时了也不会再等待。
看到这里,Broker处理消息拉取逻辑也清晰了。
总结
本文分析了RocketMQ消费消息逻辑的实现原理,我们平时写消费者逻辑应该是比较多的,消费消息本质上是我们应用程序和Broker服务进行rpc通信,交换业务请求和消息的过程。
我们发现RocketMQ在代码设计上有几个比较突出的优点,值得我们学习和借鉴
1、代码的单一职责化
很明显,各个类都各司其职,专门处理不同领域的逻辑。
2、线程池的使用
,将消息拉取,消息偏移量同步,消息长轮询机制等都通过线程池,异步处理,同时也把消息拉取和消息消费进行了解耦
,确实非常妙。