一、为什么我要写RocketMQ呢?
a、公司用的原因:因为刚到新的公司,后期我会结合公司的用到的情况,把涉
及到线上的关于RocketMQ的问题的经验分享大家。
b、自己也想买了一些书关于RocketMQ的书籍,到时总结出来写成文章。
c、听说以前叮咚买菜的公司的中间件是叮咚买菜的CTO用c语言写的,虽然性能挺强,但是好像不能够通用的原因,后期给pass了。现在转为阿里出的RocketMQ。
二、如何去研究一款陌生的中间件系统
研究一款开源中间件,首先我们需要了解它的整体的架构以及如何在开发环境调试源码,从代码入手才能快速熟悉一个开源项目,只有这样才能够抽丝剥茧地理解透彻,了解作者的设计思想和实现原理。
三、如何去获取和调试RocketMQ的源代码
我这边不用eclipse开发工具来搭建RocketMQ源代码了,直接上IntelliJ IDEA。
step1:
在Intellij IDEA VCS 菜单中选择 Check from. Version Control,再选择Git,然后
弹出对话框,如下图所示:
step2:
我是基于maven的方式来构建源代码的,所以之后build下就可以了,然后就成了如下图中的样子:
四、调试RocketMQ源码
1、启动NameServer
step1:
需要在如下图中创建一个ROCKETMQ_HOME的RocketMQ的运行主目录:
step2:
在RocketMQ运行的主目录中创建conf,logs,store三个文件夹。
step3:
从RocketMQ distribution部署目录中将broker.conf,logback_broker.xml,logback_namesrv.xml文件复制到自己创建的conf目录下。然后把broker.conf文件内容做下修改如下:
然后另外两个文件只需要修改对应的路径为RocketMQ的运行的主目录就ok了。
2、启动Broker
step1:
需要配置-c属性指定broker配置文件路径,以及RocketMQ主目录,如下图所示:
3、分别启动nameserver和broker,nameserver看控制台,broker看日志是否报错,如下图
上面的步骤都成功的话就代表RocketMQ在本地部署好了源码
五、体验一把发送消息的生产者和消费消息的消费者的代码吧
step1:
生产者的代码如下:
/**
* 实例生产着
*/
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("pgroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1000; i++) {
try {
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ" + i).getBytes());
SendResult result = producer.send(msg);
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(100);
}
}
producer.shutdown();
}
}
step2:
消费者的代码如下:
/**
* 消费者
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("xx-consumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New " +
"Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
step3:
生产者运行的结果如下,我贴出一小部分,因为各个线程去消费的结果都是一样的结构,只是值不同。
SendResult [sendStatus=SEND_OK, msgId=2409891E92603FCD047CA03AE754E85F077618B4AAC2486B4B4A03D9, offsetMsgId=C0A82B0900002A9F000000000006DB6E, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=496]
消费端运行的结果如下,这个我也贴出一小部分,因为各个线程去消费的结果都是一样的结构,只是值不同。
ConsumeMessageThread_14 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=226, queueOffset=485, sysFlag=0, bornTimestamp=1618421391164, bornHost=/192.168.43.9:50693, storeTimestamp=1618421391164, storeHost=/192.168.43.9:10911, msgId=C0A82B0900002A9F000000000006B578, commitLogOffset=439672, bodyCRC=185152384, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=500, CONSUME_START_TIME=1618421582967, UNIQ_KEY=2409891E92603FCD047CA03AE754E85F077618B4AAC2486B4B3C03AE, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 57, 52, 50], transactionId='null'}]]
六、RocketMQ的设计理念
1、设计理念如下:
RocketMQ的核心功能:消息发送,消息存储,消息消费。整体设计追求简
单和性能非常高的理念,主要体现如下几个方面:
a、NameServer设计简单
nameserver用来实现元数据的管理,因为Topic路由信息无需在集群之间保持强一致性,最终一致性就可以了。所以nameserver集群之间互不通信。降低了nameserver的复杂度以及对网络的要求也降低了不少。
b、高效的IO存储机制
存储文件设置成文件组,组内单个文件的大小固定,方便引入内存映射机制
主题的消息是顺序写的,提升消息的写性能
引入消息队列文件和索引文件来兼顾消息消费和消息查找
c、容忍设计上的缺陷
RocketMQ设计者的难题:不能同时保证消息一定能被消息消费者消费,并且再保证只消费一次。而是只保证消息被消费者消费,但是设计上是允许消息被重复消费的。这样简化了消息中间件的内核,而且消息发送高可用变得非常简单与高效 ,消息重复问题需要在消费时实现幂等就可以了。
明天继续~~