RocketMQ工程代码分析|学习笔记

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 快速学习RocketMQ工程代码分析

开发者学堂课程【高校精品课-厦门大学 -JavaEE 平台技术RocketMQ工程代码分析学习笔记,与课程紧密联系,让用户快速学习知识

课程地址:https://developer.aliyun.com/learning/course/80/detail/15955


RocketMQ工程代码分析


通过例子来看 rocket MQ的这个程序怎样去写,在这里中间,我们首先会告诉大家如何去安装启动这个 rocket MQ的 name server和 beiker,以及去看这个 never broke 的这个运行状况,第二我们会展示一个简单的生产者和消费者的例子,我们利用了这个 rockman kill的 spring boot的 starter,可以非常快的来实现一个这个生产者和消费者。

第三个部分,我们用了这个 rocket MQ的在阿帕奇基金会上头,在阿帕奇上面的一个这个孵化的项目叫做 rocket MQ的 external,这里头它有很多的东西,我们用的是其中的这个 rocking to console这个工程,通过这个工程我们可以用 web去看一下在这个 rocket MQ的这个博客上面的一些相关的信息,就可以去看看到我们在前面的所讲的这个 Rocket MQ的队列是怎样被管理的,以及队列中间的消息是怎样分配给不同的消费者。

image.png

我们下面来看一下这个生产者和消费者是怎样来运行,这个代码在JAVA1PLATFORM 的 rocket MQ底下,一个工程叫做 rocket MQ DEMO,在这个过程中间,我们写了两个消费者和生产者的代码,首先来看一下这个生产者的部分,生产者的部分我们在这个工程中间,因为我们引入了这个 rocket MQ的

starter,它使得我们的生产者和消费者的代码就写起来非常的简单。

三者代码的这个部分,我们可以看到我们定义了一个 service,这个 service里面写了两个方法,分别用来产生这个日志的消息和我们支付的那个延时的消息。在这个日志这边,我们写的是这个 message。那这里可以看到说我们的消息的发送的消息的格式,主要是殉情的消息,所以我们把数据通过这个 jackson的 you tell把它转成了一个json格式的数据,然后用 message的 message build,把它构造成为了这个我们的消息这里头,加上了这个消息的头,然后就利用这个 rocket mq template把它发送出去,那第一个在这个日志的消息,这边我们发送的是单向的消息,因为我们不需要知道他的回执,直接发出去就好了,再发送单向消息的时候,我们用了方法的第一个参数是它的这个 topic和它的 tag,冒号后面的是这个 tag,然后第二个参数就是它的消息,因为不需要知道回执,直接发出去就好,这是第一个方法,第二个方法是想发送一个延时的支付消息,当一个订单完成之后,30分钟内需要支付完成,没有完成就需要把订单取消掉,到这我们用 rocket mq的延时发送的这样的一种方式,发送了一个消息,让他在十秒之后再来发送,这样我们可以比较快的来看到这样的一个效果,在 rocket mq中间其实他没有专门的发送延时的这个方法,他就是用的这个异步发送的方式,在异步发送的方式的这个方法里头,这个template 的这个 st这个方法里头,

他第一个参数是它的这个 topic,我们没有定义它的 tag,

第二个是他的这个消息,我们看到消息的方式我们依然用 sg build把它构造出来这个消息,

第三个参数至少得回调函数,因为异步的这个调用,他需要知道是否发送成功了还是没成功,在这个回调函数里头,我们定义两个方法,一个是 access,一个是 an exception,这个 an success就是回调成功,我们会打一个日志出来,exception就是发送没成功,也会打出一个日志出来,后面两个参数是 timeout,最后一个是delaylevel,设置的是3,应该是10秒,间隔是固定的,所以十秒之后,这个消息他真正会发到这个消费者对中间,生产者这边我们可以看到我们定义的是一个 service对象,然后利用 rocket MQ template 把它发送出去,消费者这一边在利用这个

starter rocket MQ started。

定义消费者来说也非常的简单,我们这里定义了两个消费者,一个是用来消费日志类消息的这个消费者,就是订阅了这个 lock topic这个 topic的这个消费者,另外一个是定义了这个topic的这个消费者,

image.png

我们首先来看这个 lock topic 的这个消费者,这里我们写在这个 rocket mq的start里头,我们要定一个消费者需要去实现的这个 rocket mq listener的这个方法,这个方法的后面的这个泛型其实是指定这个消息的类型,我现在用的是属性的消息,第二个接口大家看到的是这个黑色的生命周期接口,因为我们后面有一个place的方法,在vs在启动之前可以打一个日志出来,对于第一个方法来说这个 rocket mq listener里头 其实就一个方法 on success,所以当消费者收到了消息以后,调用这个方法,把消息发送过来,因为我们定义的类型是 string,我们这里把这个 string型的转成了json的一个对象,这是这个 listener的定义,那这个listener到底订阅了哪一个topic,我们在上面可以看到她用了一个 rocket mq的这样的一个注解,定义了他订阅的 topic是什么,它的tag是什么,这个 select expression就是他的type,那定义了它的这个消费的模式,它用的是这个并行消费的模式,有多少个现成的消费以及他所属的这个消费者组是哪一个组,最后不要忘记这个定义成了一个service,因为它需要让 spring的容器,把它构造出来成为一个bin对象,这是第一个这个log的消费。

image.png

第二个是 Payconsumerlistener,定义我们就没有去实现它的生命周期的那个接口,所以说大家可以看到之前的前面第一个 rock you listen的这个接口,所以他只有一个方法就是 string message,同样的我们把这个类型是 string拿出来以后,变成对象,把它输出来,而且我们输出了一个时间,大家可以看一下这个他拿到这个消息的时间和他的这个发送的时间是不是真的差了十秒,前面那个 rocket mq同样可以看到它定义了他订阅的消息是 old-pay-topic的这个 topic消费的模式,也是并行的十个现成,以及他的消费组,消费者组是这个 pay-group,这是两个这个消费者,

image.png

最后我们来看一下它的这个主程序,这里跟之前手续不一样,我们实现了一个这个command line,因为我们需要再跑起来,所以要把这些方法调用起来,所以我们实现了一个 command line runner,那在这里头我们可以看到在这里头是实现了这个 command center的一个接口中间所定义的方法叫做 run ,run里头大家可以看到我们首先第一个是十六的log对象,然后通过我们 rocket MQ service的这个 bin对象发出,调用他的这个 set log Message把这个发出去,然后第二个是我们定义了一个 orderID的这个值,然后通过这个 setorderPayMessage这个方法,把它发送出去。这个的定义非常的简单,其实里头主要是加入了我们的这个 rocket MQ的starter,其他的跟我们正常的文件没有什么太大的区别。我们加入了一个这个

rocket spring boot starter这样的一个 starter。这是我们的这个工程。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
消息中间件 存储 算法
RocketMQ学习笔记
RocketMQ学习笔记
173 0
|
8月前
|
传感器 网络协议 中间件
Mqtt学习笔记--交叉编译移植(1)
Mqtt学习笔记--交叉编译移植(1)
155 0
|
消息中间件 存储 缓存
RibbitMQ学习笔记之MQ练习(三)
RibbitMQ学习笔记之MQ练习
56 0
|
消息中间件 网络协议 数据中心
RabbmitMQ学习笔记-RabbitMQ集群架构模式
RabbmitMQ学习笔记-RabbitMQ集群架构模式
96 0
|
消息中间件 Java
RabbmitMQ学习笔记-RabbitMQ与SpringBoot2.0整合实战
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
122 0
|
消息中间件 中间件
RibbitMQ学习笔记之MQ发布确认
RibbitMQ学习笔记之MQ发布确认
62 0
|
消息中间件 网络协议
RibbitMQ学习笔记之MQ练习(二)
RibbitMQ学习笔记之MQ练习
42 0
|
消息中间件 网络协议 Java
RibbitMQ学习笔记之MQ练习(一)
RibbitMQ学习笔记之MQ练习
90 0
|
消息中间件 存储 网络协议
RibbitMQ学习笔记之MQ 的相关概念
RibbitMQ学习笔记之MQ 的相关概念
90 0
|
消息中间件 存储 缓存
RocketMQ 5.0 可观测能力升级: Tracing 链路追踪介绍|学习笔记
快速学习 RocketMQ 5.0 可观测能力升级: Tracing 链路追踪介绍
846 0
RocketMQ 5.0 可观测能力升级: Tracing 链路追踪介绍|学习笔记