消息中间件-RocketMQ技术(一)

简介: 消息中间件-RocketMQ技术(一)

一、为什么我要写RocketMQ呢?

a、公司用的原因:因为刚到新的公司,后期我会结合公司的用到的情况,把涉

及到线上的关于RocketMQ的问题的经验分享大家。

b、自己也想买了一些书关于RocketMQ的书籍,到时总结出来写成文章。

c、听说以前叮咚买菜的公司的中间件是叮咚买菜的CTO用c语言写的,虽然性能挺强,但是好像不能够通用的原因,后期给pass了。现在转为阿里出的RocketMQ。

二、如何去研究一款陌生的中间件系统

研究一款开源中间件,首先我们需要了解它的整体的架构以及如何在开发环境调试源码,从代码入手才能快速熟悉一个开源项目,只有这样才能够抽丝剥茧地理解透彻,了解作者的设计思想和实现原理。

三、如何去获取和调试RocketMQ的源代码

我这边不用eclipse开发工具来搭建RocketMQ源代码了,直接上IntelliJ IDEA。

step1:

在Intellij  IDEA  VCS 菜单中选择 Check from. Version Control,再选择Git,然后

弹出对话框,如下图所示:

step2:

我是基于maven的方式来构建源代码的,所以之后build下就可以了,然后就成了如下图中的样子:

四、调试RocketMQ源码

1、启动NameServer

step1:

需要在如下图中创建一个ROCKETMQ_HOME的RocketMQ的运行主目录:

step2:

在RocketMQ运行的主目录中创建conf,logs,store三个文件夹。

step3:

从RocketMQ distribution部署目录中将broker.conf,logback_broker.xml,logback_namesrv.xml文件复制到自己创建的conf目录下。然后把broker.conf文件内容做下修改如下:

然后另外两个文件只需要修改对应的路径为RocketMQ的运行的主目录就ok了。

2、启动Broker

step1:

需要配置-c属性指定broker配置文件路径,以及RocketMQ主目录,如下图所示:

3、分别启动nameserver和broker,nameserver看控制台,broker看日志是否报错,如下图

上面的步骤都成功的话就代表RocketMQ在本地部署好了源码

五、体验一把发送消息的生产者和消费消息的消费者的代码吧

step1:

生产者的代码如下:


/**

* 实例生产着

*/

public class Producer {

   public static void main(String[] args) throws MQClientException, InterruptedException {

       DefaultMQProducer producer = new DefaultMQProducer("pgroup");

       producer.setNamesrvAddr("127.0.0.1:9876");

       producer.start();

       for (int i = 0; i < 1000; i++) {

           try {

               Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ" + i).getBytes());

               SendResult result = producer.send(msg);

               System.out.println(result);

           } catch (Exception e) {

               e.printStackTrace();

               Thread.sleep(100);

           }

       }

       producer.shutdown();

   }

}

step2:

消费者的代码如下:

/**

* 消费者

*/

public class Consumer {

   public static void main(String[] args) throws InterruptedException, MQClientException {

       DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("xx-consumer");

       consumer.setNamesrvAddr("127.0.0.1:9876");

       consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

       consumer.subscribe("TopicTest","*");

       consumer.registerMessageListener(new MessageListenerConcurrently() {

           @Override

           public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

               System.out.printf("%s Receive New " +

                      "Messages: %s %n", Thread.currentThread().getName(), msgs);

               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

           }

       });

       consumer.start();

 }

}

step3:

生产者运行的结果如下,我贴出一小部分,因为各个线程去消费的结果都是一样的结构,只是值不同。

SendResult [sendStatus=SEND_OK, msgId=2409891E92603FCD047CA03AE754E85F077618B4AAC2486B4B4A03D9, offsetMsgId=C0A82B0900002A9F000000000006DB6E, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=496]

消费端运行的结果如下,这个我也贴出一小部分,因为各个线程去消费的结果都是一样的结构,只是值不同。

ConsumeMessageThread_14 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=226, queueOffset=485, sysFlag=0, bornTimestamp=1618421391164, bornHost=/192.168.43.9:50693, storeTimestamp=1618421391164, storeHost=/192.168.43.9:10911, msgId=C0A82B0900002A9F000000000006B578, commitLogOffset=439672, bodyCRC=185152384, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=500, CONSUME_START_TIME=1618421582967, UNIQ_KEY=2409891E92603FCD047CA03AE754E85F077618B4AAC2486B4B3C03AE, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 57, 52, 50], transactionId='null'}]]

六、RocketMQ的设计理念

1、设计理念如下:

RocketMQ的核心功能:消息发送,消息存储,消息消费。整体设计追求简

单和性能非常高的理念,主要体现如下几个方面:

a、NameServer设计简单

nameserver用来实现元数据的管理,因为Topic路由信息无需在集群之间保持强一致性,最终一致性就可以了。所以nameserver集群之间互不通信。降低了nameserver的复杂度以及对网络的要求也降低了不少。

b、高效的IO存储机制

存储文件设置成文件组,组内单个文件的大小固定,方便引入内存映射机制

主题的消息是顺序写的,提升消息的写性能

引入消息队列文件和索引文件来兼顾消息消费和消息查找

c、容忍设计上的缺陷

RocketMQ设计者的难题:不能同时保证消息一定能被消息消费者消费,并且再保证只消费一次。而是只保证消息被消费者消费,但是设计上是允许消息被重复消费的。这样简化了消息中间件的内核,而且消息发送高可用变得非常简单与高效 ,消息重复问题需要在消费时实现幂等就可以了。


明天继续~~

相关文章
|
7天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
4天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2463 14
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
4天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1499 14
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
|
1月前
|
运维 Cloud Native Devops
一线实战:运维人少,我们从 0 到 1 实践 DevOps 和云原生
上海经证科技有限公司为有效推进软件项目管理和开发工作,选择了阿里云云效作为 DevOps 解决方案。通过云效,实现了从 0 开始,到现在近百个微服务、数百条流水线与应用交付的全面覆盖,有效支撑了敏捷开发流程。
19273 29
|
1月前
|
人工智能 自然语言处理 搜索推荐
阿里云Elasticsearch AI搜索实践
本文介绍了阿里云 Elasticsearch 在AI 搜索方面的技术实践与探索。
18822 20
|
1月前
|
Rust Apache 对象存储
Apache Paimon V0.9最新进展
Apache Paimon V0.9 版本即将发布,此版本带来了多项新特性并解决了关键挑战。Paimon自2022年从Flink社区诞生以来迅速成长,已成为Apache顶级项目,并广泛应用于阿里集团内外的多家企业。
17515 13
Apache Paimon V0.9最新进展
|
6天前
|
编解码 JSON 自然语言处理
通义千问重磅开源Qwen2.5,性能超越Llama
击败Meta,阿里Qwen2.5再登全球开源大模型王座
361 11
|
1月前
|
存储 人工智能 前端开发
AI 网关零代码解决 AI 幻觉问题
本文主要介绍了 AI Agent 的背景,概念,探讨了 AI Agent 网关插件的使用方法,效果以及实现原理。
18697 16
|
2天前
|
算法 Java
JAVA并发编程系列(8)CountDownLatch核心原理
面试中的编程题目“模拟拼团”,我们通过使用CountDownLatch来实现多线程条件下的拼团逻辑。此外,深入解析了CountDownLatch的核心原理及其内部实现机制,特别是`await()`方法的具体工作流程。通过详细分析源码与内部结构,帮助读者更好地理解并发编程的关键概念。
|
2天前
|
SQL 监控 druid
Druid连接池学习
Druid学习笔记,使用Druid进行密码加密。参考文档:https://github.com/alibaba/druid
195 82