rocketmq消费源码

简介: rocketmq消费源码

mq消费源码

依赖


com.aliyun.openservices

ons-client


阿里云rocketmq消息队列

参考https://blog.csdn.net/alan_liuyue/article/details/86645887 SpringBoot整合阿里云rocketmq消息队列,创建生产者和消费者实例

进来createConsumer方法


return new ConsumerImpl(ONSUtil.extractProperties(properties));


可以看到阿里云 opensevices提供许多mq的功能,包括创建生产者、创建消费者、批量创建消费者、批量创建生产者、创建顺序Consumer(全家顺序和局部顺序,和上面的并发消费区分开来),顺序消费的使用场景是某些业务必须同步完成的时候可以使用,就是这一个业务线的数据都发到同一个queue中并且使用同一个消费者来消费,这样相对于每个queue就是有序的

最后一个是创建事务生产者,所以做rocketmq包括阿里云那套对常见的方法封装的比较好,这也是这个mq的优势之一

接着回来ConsumerImpl 消费的源码


这个方法中先调用了父类ONSConsumerAbstract中的构造方法,在ONSConsumerAbstract的构造方法中又调用了ONSClientAbstract的构造方法。


ONSClientAbstract的构造方法检查参数,包括连接地址、访问密钥等

private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(r, “ONSClient-UpdateNameServerThread”);
}
});

然后有个线程池,每90秒检测一次nameServer地址是否变更

这块逻辑其实consumer与producer公用的。ProducerImpl也会去调用父类方法super(properties);



DefaultMQPushConsumer,核心代码,ONSConsumerAbstract的构造方法中创建消费者,使用的是push的方式。

回顾一下push pull区别


push方式,consumer把轮询过程封装了,并注册messagelistener监听器,取到消息后,唤醒messagelister的consumerMessage消费,感觉队列被push过来进行订阅。

pull方式,消息自己去取,首先通过打算消费topic拿到messagequeue集合,遍历messagequeue集合,然后针对每个messgequeue批量取消息,一次取完后,记录该队列下一次要取的offset,直到取完了,再换另一个messagequeue.


https://blog.csdn.net/qq_21383435/article/details/101113808

到回DefaultMQPushConsumer之后,配置项设置consumerGroup,实例名称,nameServer地址,消费者线程的最小和最大线程数,包括一些在PropertyKeyConst类中的配置(客户端缓存消息数量,客户端缓存最大内容)。



最后setPostSubscriptionWhenPull设置为false,

consumer.setMessageModel(MessageModel.CLUSTERING)集群模式(消费模式分集群、广播)


总结:消费方式mq ;设置Properties[图片]获取yml里面的访问密钥、密钥、mq服务器地址、设置发送超时时间,单位毫秒 1000=1s;






然后获取消费组进行遍历,以及groupid,然后就可以创建消费者,创建消费者上面说了,

创建方式构造方法,[图片]把这些参数初始化,然后做一些校验和在赋值。

拉取消息的前置和后置处理类的创建

// 为Consumer增加消息轨迹回发模块
String msgTraceSwitch = properties.getProperty(PropertyKeyConst.MsgTraceSwitch);
if (!UtilAll.isBlank(msgTraceSwitch) && (!Boolean.parseBoolean(msgTraceSwitch))) {
log.info(“MQ Client Disable the Trace Hook!”);
} else {
try {
Properties tempProperties = new Properties();
tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey());
tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey());
tempProperties.put(OnsTraceConstants.MaxMsgSize, “128000”);
tempProperties.put(OnsTraceConstants.AsyncBufferSize, “2048”);
tempProperties.put(OnsTraceConstants.MaxBatchNum, “100”);
tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr());
tempProperties.put(OnsTraceConstants.InstanceName, “PID_CLIENT_INNER_TRACE_PRODUCER”);
tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.CONSUMER.name());
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials);
dispatcher.setHostConsumer(defaultMQPushConsumer.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
new OnsConsumeMessageHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error(“system mqtrace hook init failed ,maybe can’t send msg trace data”, e);
}
}

然后回来就是设置pull flase,集群模式。

然后把创建的消费者去订阅消息

consumerBean.subscribe(consumer.getTopic(), consumer.getTag(),

SpringUtil.getBean(consumer.getBeanName()));





subscribeTable放入订阅map,[图片]topic和tag过来消费者订阅消息了,[图片],发送的时候用了new ReentrantLock()锁

public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();
} catch (Exception var5) {
this.log.error(“sendHeartbeatToAllBroker exception”, var5);
} finally {
this.lockHeartbeat.unlock();
}
} else {
this.log.warn(“lock heartBeat, but failed.”);
}
}

https://blog.csdn.net/weixin_27252001/article/details/112730756

push:

理下流程:


首先 new DefaultMQPushConsumer 对象,并指定一个消费组名。

然后设置相关参数,例如 nameSrvAdd、消费失败重试次数、线程数等

通过调用 setConsumeFromWhere 方法指定初次启动时从什么地方消费,默认是最新的消息开始消费。

通过调用 setAllocateMessageQueueStrategy 指定队列负载机制,默认平均分配。

通过调用 registerMessageListener 设置消息监听器,即消息处理逻辑,最终返回 CONSUME_SUCCESS(成功消费)或 RECONSUME_LATER(需要重试)。


pull

首先根据 MQConsumer 的 fetchSubscribeMessageQueues 的方法获取 Topic 的所有队列信息

然后遍历所有队列,依次通过 MQConsuemr 的 PULL 方法从 Broker 端拉取消息。

对拉取的消息进行消费处理

通过调用 MQConsumer 的 updateConsumeOffset 方法更新位点,但需要注意的是这个方法并不是实时向 Broker 提交,而是客户端会启用以线程,默认每隔 5s 向 Broker 集中上报一次。


https://blog.csdn.net/weixin_41098980/article/details/79880957

1.集群消费方式

一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息。例如某个Topic有九条消息,其中一个Consumer Group有三个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息,Consumer不指定消费方式的话默认是集群消费的,适用于大部分消息的业务

2.广播消费方式

一条消息被多个Consumer消费,几十这些Consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer消费一次,广播消费中的ConsumerGroup概念可以认为在消息划分层面没有意义,适用于一些分发消息的场景,比如我订单下单成功了,需要通知财务系统,客服系统等等这种分发的场景,可以通过修改Consumer中的MessageModel来设置消费方式为广播消费


//广播

BROADCASTING(“BROADCASTING”),

//集群

CLUSTERING(“CLUSTERING”);

源码

String messageModel = properties.getProperty(“MessageModel”, “CLUSTERING”);

this.defaultMQPushConsumer.setMessageModel(MessageModel.valueOf(messageModel));


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
Java API 网络架构
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码
1868 0
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
消息中间件 Apache 开发工具
RocketMQ-初体验RocketMQ(08)-IDEA拉取调测RocketMQ源码
RocketMQ-初体验RocketMQ(08)-IDEA拉取调测RocketMQ源码
159 0
|
消息中间件 数据可视化 Go
Rabbitmq 搭建使用案例 [附源码]
Rabbitmq 搭建使用案例 [附源码]
120 0
|
10月前
|
消息中间件 存储 监控
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
338 12
|
10月前
|
消息中间件 存储 Java
深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
184 2
|
消息中间件 中间件 Kafka
RocketMQ源码(二)消息消费的模式到底是Push还是Pull?
RocketMQ源码(二)消息消费的模式到底是Push还是Pull?
339 1
|
11月前
|
传感器 数据可视化 网络协议
DIY可视化整合MQTT生成UniApp源码
DIY可视化整合MQTT生成UniApp源码
195 0
|
消息中间件 Java 调度
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
192 1
|
消息中间件 Java RocketMQ
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
96 1