开发者学堂课程【微服务实战-RocketMQ Binder :SCS 介绍及 RocketMQ Binder 的使用 】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/617/detail/9420
SCS 介绍及 RocketMQ Binder 的使用
内容介绍:
一、 场景引入
二、 Spring Cloud Stream 介绍
三、总结
一、场景引入
Message rocketmq:
Message rocketMsg =new
Message(topic:"myTopic "Hello RocketMQ".getBytes());
SendResult sendResult =p roducer.send(rocketMsg);
RabbitMQ:
String message ="Hello Rab bitMQ";
channel.basicPublish(exchan ge:"",routingKey: "myQueue"
props: null, message.get Bytes(charsetName:"UTF-8")
Kafka:
ProducerRecord! ecord =
new ProducerRecord
( topic: "myTopic", key: "my yKey", value: "Hello Kafka");
producer.send(record);
不同消息中间件发送消息是如何编码的?
首先是 RocketMQ ,需要构造 RocketMQ 的一个Message,然后通过 RocketMQ 的Producer 去发送这个 Message。如果是 RabbitMQ,要通过 RabbitMQ 的 Channel 去 publish 它的消息。如果是 Kafka,要构造 producer record,然后通过 Kafka 的 Producer 去发送这个 record。那如果是 active mq 呢?
是不是又是另外一套全新的代码?而且这仅仅是消息的发送。如果涉及到消息的接收,这三种 mq 就是不是又是三套不同的代码?
那有没有一种框架能够屏蔽这些细节,能够用一套全新的代码,一套统一的代码,来完成消息的发送和接收。
从 RocketMQ 切换到 Kafka,或者说从 Kafka 到 RocketMQ ,只需要改配置文件就可以了。代码层面,不需要做任何的修改。答案是可以的。这就是 spring
二、 Spring Cloud Stream介绍
1、概括介绍
Spring 和消息有关的项目有三个。分别是 springMessaging。Spring Integration 以及 spring cut stream。
其中 Spring cutstream 它是依赖于 Spring Messaging 和 Spring Integration。这两个项目对消息做了一些规范的定义。
首先,消息由两部分组成,分别是 Header 和 payload。Header 表示消息头。Payload 表示消息体。消息的发送有一个很重要的概念是 Message Channel, 就是下级通道。所有的消息都会先发送到这个消息通道中,然后消息的接收会使用MessageHandler 去订阅 MessageChannel。直到它把 MessageChannel 中所有消息都消费完毕。
2、演示 Spring Integration 对消息是如何操作的
进入 Spring Initializr 项目(这个项目是一个用于快速构建 spring 部应用的)。它的域名是:start.spring.io。它的依赖只有一个 skipper started integration ,因为不太需要 spring boot 特性,把全部的相关的代码删掉。
3、相关步骤
首先要构造 MessageChannel , 然后使用 method handler(匿名)去订阅 message channel, 订阅到之后,会对消息进行处理。直接在控制台打印。
第三步,构造一个消息:使用 messagebuilder 去构造消息,它的 payload 直接书写为 simplemessage。
最后一步,把这个消息发送到 messagechannel 中,启动程序,发现消息是一个Genericmessage,它的 Payload 是 SimpleMessage。
DirectChannel channel=new DirectChannel();
channel.subscribe(new MessageHandler(){
@Override
public void handleMessage(Message message)throws MessagingException {
System.out.println(message);
});
Message msg=MessageBuilder.withPayload("simple msg").build();
channel.send(msg);
}
Header 有两个属性,一个是 fd ,另一个是 TimeStamp,也可以自定一些 Header(setHeader),如: test ,再进行打印,可以看到 test 的 value 是1。
还可以通过一些拦截器做一些过滤的操作。
比如给 MessageChannel 添加一个拦截器,写个匿名的拦截器,在发送之前,打印一个错误日志(stop message),从位置返回 null,说明是消息发送失败了,这个时候再执行一下,看到了 stop message,消息也没有被消费,因为它返回了 null。
这段逻辑就是简单的一个 Spring Integration 对消息做出处理的这么一个过程。涉及到的消息的构造、消息的订阅、消息的发送。
4、如何使用 RocketMQ Binder
(1)需要添加 maven 依赖
<
groupld>com.alibaba.cloud,spring-cloud-starter-stream-rocketmq
,同时需要注意要注意版本对应
版本对应关系如下:
Spring Cloud Version |
Spring Cloud Alibaba Version |
Spring Cloud Greenwich |
2.0.0.RELEASE |
Spring Cloud Edgware |
1.5.0.RELEASE |
(2)
首先进入 Spring Initializr,设置坐标和对应的依赖,先使用 web starter ,先生成Producer 项目,再生成一个consumer项,同样使用 web starter ,分别解压这两个项,通过 IntelliJ 导入项目,在 producer 的 maven 中引入 RocketMQ 的 starter,由于使用的版本为2.1.6,版本对应2.1.0。
同样为 consumer 加上 RocketMQ Binder,Producer 端需要一个叫 EnableBinding 的注解,选择 Source。
Import org.springframework.cloud.stream.messaging.Source;
@SpringBootApplication
@EnableBinding(Source)
public class RocketMQBindecProducerApplication{
Source 内容说明它是一个接口,它的唯一方法是 output,返回的是 MessageChannel。
先看 producer 端,producer 涉及到消息的发送,使用 CommandLineRunner 来进行消息的发送。
package com.alibaba.cloud.example.rocketmqbinderproducer;
Import ...
@SpringBootApplication
public class RocketngBinderProducerApplication {
public static void main(String[ ] args){
SpringApplication.run(RocketmqBinderProducerApplication.class,
args);
}
public CommandLineRunner commandLineRunner(){
Return new CommandLineRunner(){
@Override
Public void run(String... args) throws Exception{
CommandLineRunner 是一个基础的类,它在应用程序启动完之后会调用 Run 来进行各种你想要的操作,利用功能进行消息的 pass,首先先构造一个 message,使用 MessageChannel 进行消息发送,并发送到 RocketMQ,Messsage 的构造使用 MessageBuilder, Payload(“simple msg”),通过 DirectChannel 消息发送,但是怎么知道对应哪一个 topic,引入一个新的依赖,加入一个新的注解, EnableBinding 会初始化,并加入 Source,使用 Source 中的 output 方法来代替原先的 DirectChannel,这里我们要注入这个 source。因为 Binding 底层会构造 Source 的一个代理。就通过这个 MessageChannel 发送消息。这里使用 EnableBinding 代替原先的 Directchannel。
@SpringBootApplication
@EnableBinding(Source.class)
public class RocketmqBinderProducerApplication {
public static void main(String[] args) {
SpringApplication.run(RocketmqBinderProducerApplication.class,args);
}
@Bean
public CommandLineRunnercommandLineRunner() {
return new CommandLineRunner() {
@Autowired
private Source source;
@Override
public void run(String... args) throws ExcePtion{
// 1.先构造 Message
// 2.使用 MessageChannel 进行消息发送,并发送到RocketMo
// 2.1 使用 EnableBinding 代替原先的 DirectChannel
Message msg = MessageBuilder.withPayload("simple msg").build();
//DirectChannel channel = new DirectChannel();
//channel.send(msg);
source.output().send(msg);
}
知道是哪一个 topic:我们看 output 的方法下面有个 output 的注解,并且对应的这个属性是 OUTPUT,
/**
* Name of the output channel.
*/
String OUTPUT = "output";
/**
*@return output channel
*/
@
Output(Source.OUTPUT)
MessageChannel output();
这个时候就通过 spring.cloud.stream 的一个 Binding 的名叫 output 的配置,里面有个 Destination 的属性就表示的是 topic。用 rmp-binder-test topic 。
所以这个程序是在 CommandLineRunner 中构造消息,然后把它发送到了为名rmp-binder-test 的一个过程。
5、消息的订阅
Packagecom.alibaba.cloud.example.rocketmqbinderconsumer;
import ...
@SpringBootApplication
@EnableBinding(Sink.class)
public class RocketngbinderConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(RocketmqBinderConsumerApplication.class, args);
}
// 1.使用 MessageHandler 订阅 MessageChannel
// 1.1使用 StreamListener 订阅消息
@StreamListener
public void receive(String msg) {
System.out.println(msg);
}
使用 MessageHandler 订阅 MessageChannel 使用 StreamListener 订阅消息,同样需要 EnableBinding 来构造Binding,此时对应接口为 Sink。
进而直接打印消息,同时在运行时改造非 web 项:
Public static void main(String[] args) {
New SpringApplicationBuilder(RocketmqBinderConsumerApplication.class).web(webApplicationType.NONE).run(args);
}
在 producer 端也需要改造,然后先启动 consumer 看到日志中的反应,清除日志后运行 producer 可以看到五条消息被消费,改造 producer 发送50条消息。
@Override
public void run(String... args) throws exception
// 1.先构造 Message
// 2.使用 MessageChannel 进行消息发送,并发送到 RocketMo
//21使用 EnableBinding 代替原先的 DirectChannel
for(int i=0; i<50; i++) {
Message msg=MessageBuilder.withPayload("simple msg").build()
//DirectChannel channel =new DirectChannel();
//channel.send(msg);
source.output().send(msg);
}
能收到所有消息,这时候看 consumer 能看到消费55条消息
三、 总结
消息的发送
@EnableBinding(Source.class)
public class RocketMQProduceApplication{
...
public interface Source {
@Output("output")
MessageChannel output()
}
Message message =
MessageBuilder.withPayload("msg payload")
.setHeader("custom-header", "alibaba")
.build():
Source.output().send(message);
消息的接收
@EnableBinding(Sink.class)
public class
RocketMQConsumerApplication{
...
}
public interface Sink {
@Input("input")
SubscribableChannel input();
@StreamListener("input")
public void receivelnput(String msg){
System.out.println(msg);
}
对应的配置
spring.cloud.stream.bindings.input.desti nation=rmg-binder-test
spring.cloud.stream.bindings.input grou p=rmq-binder-consumer-group
spring.cloud.stream.bindings.output.des tination=rmq-binder-test
1、 消息的发送
引入了一个新的注解叫 enablebinding,它会通过 source 接口自动执行一个代理类,随即只需要注入 source 即可,可以用它内部的 output 方法来对消息进行发送。其中 output 对应 topic 可以在配置里面配置,可以通过前缀,来根据对应的 binding这个名字进行具体的配置。
2、消息的接收
还是使用了 enablebinding 这个注解,代理的这个接口是 sink ,它使用 input 的注解来设置要读取的 topic 内容。
3、最终通过 StreamListener 注解对消息做业务逻辑的处理。