SCS 介绍及 RocketMQ Binder 的使用 | 学习笔记

简介: 快速学习 SCS 介绍及 RocketMQ Binder 的使用,介绍了 SCS 介绍及 RocketMQ Binder 的使用系统机制, 以及在实际应用过程中如何使用。

开发者学堂课程【微服务实战-RocketMQ Binder SCS 介绍及 RocketMQ Binder 的使用 】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/617/detail/9420


SCS 介绍及 RocketMQ Binder 的使用


内容介绍:

一、 场景引入

二、 Spring Cloud Stream 介绍

三、总结

 

一、场景引入

Message rocketmq:

Message rocketMsg =new

Message(topic:"myTopic "Hello RocketMQ".getBytes());

SendResult sendResult =p roducer.send(rocketMsg);

RabbitMQ:

String message ="Hello Rab bitMQ";

channel.basicPublish(exchan ge:"",routingKey: "myQueue"

props: null, message.get Bytes(charsetName:"UTF-8")

Kafka:

ProducerRecord! ecord =

new ProducerRecord

( topic: "myTopic", key: "my yKey", value: "Hello Kafka");

producer.send(record);

不同消息中间件发送消息是如何编码的?

首先是 RocketMQ ,需要构造 RocketMQ  的一个Message,然后通过 RocketMQ 的Producer 去发送这个 Message。如果是 RabbitMQ,要通过 RabbitMQ Channel publish 的消息。如果是 Kafka,要构造 producer record,然后通过 Kafka 的 Producer 去发送这个 record。那如果是 active mq 呢?

是不是又是另外一套全新的代码?而且这仅仅是消息的发送。如果涉及到消息的接收,这三种 mq 就是不是又是三套不同的代码? 

那有没有一种框架能够屏蔽这些细节,能够用一套全新的代码,一套统一的代码,来完成消息的发送和接收。

RocketMQ 切换到 Kafka,或者说从 Kafka 到 RocketMQ ,只需要改配置文件就可以了。代码层面,不需要做任何的修改。答案是可以的。这就spring

 

二、 Spring Cloud Stream介绍

image.png

1、概括介绍

Spring 和消息有关的项目有三个。分别是 springMessaging。Spring Integration 以及 spring cut stream。

Spring cutstream 它是依赖于 Spring Messaging Spring Integration。这两个项目对消息做了一些规范的定义。

首先,消息两部分组成,分别是 Header 和 payload。Header 表示消息头。Payload 表示消息体。消息的发送一个很重要的概念是 Message Channel, 就是下级通道。所有的消息都会发送到这个消息通道中,然后消息的接收会使用MessageHandler 去订阅 MessageChannel。直到它把 MessageChannel 所有消息都消费完毕。

2、演示 Spring Integration 对消息是如何操作的

进入 Spring Initializr 项目(这个项目是一个用于快速构建 spring 部应用的)的域名是start.spring.io。它的依赖只有一个 skipper started integration ,因为不太需要 spring boot 特性,把全部的相关的代码删掉。

3、相关步骤

首先要构造 MessageChannel , 然后使用 method handler(匿名)去订阅 message channel, 订阅到之后,会对消息进行处理。直接控制台打印。

第三步,构造一个消息:使用 messagebuilder 去构造消息,它的 payload 直接书写为 simplemessage

最后一把这个消息发送到 messagechannel 中,启动程序,发现消息是一个Genericmessage,它的 Payload SimpleMessage。

DirectChannel channel=new DirectChannel();  

channel.subscribe(new MessageHandler(){

@Override

public void handleMessage(Message message)throws MessagingException {

System.out.println(message);

});

Message msg=MessageBuilder.withPayload("simple msg").build();

channel.send(msg);

} 

Header 有两个属性,一个是 fd ,另一个是 TimeStamp,也可以自定一些 Header(setHeader),如: test ,再进行打印,可以看到 test 的 value 是1。

还可以通过一些拦截器做一些过滤的操作。

比如给 MessageChannel 添加一个拦截器,写个匿名的拦截器,在发送之前,打印一个错误日志(stop message),从位置返null,说明是消息发送失败了这个时候再执行一下,看到stop message消息也没有被消费,因为它返回了 null

这段逻辑就是简单的一个 Spring Integration 对消息做出处理的这么一个过程。涉及到的消息的构造消息订阅、消息的发送。

image.png

4、如何使用 RocketMQ Binder

(1)需要添加 maven 依赖

<groupld>com.alibaba.cloud,spring-cloud-starter-stream-rocketmq,同时需要注意要注意版本对应

版本对应关系如下:

Spring Cloud Version

 Spring Cloud Alibaba Version

Spring Cloud Greenwich

2.0.0.RELEASE

Spring Cloud Edgware

1.5.0.RELEASE

(2)

image.png

首先进入 Spring Initializr,设置坐标和对应的依赖,先使用 web starter ,先生成Producer 项目,再生成一个consumer项,同样使用 web starter ,分别解压这两个项,通过 IntelliJ 导入项目,在 producer 的 maven 中引入 RocketMQ 的 starter,由于使用的版本为2.1.6,版本对应2.1.0

image.png

同样为 consumer 加上 RocketMQ Binder,Producer 端需要一个叫 EnableBinding 的注解,选择 Source。

Import org.springframework.cloud.stream.messaging.Source;

@SpringBootApplication

@EnableBinding(Source)

public class RocketMQBindecProducerApplication{

Source 内容说明它是一个接口,它的唯一方法是 output,返回的是 MessageChannel。

先看 producer 端,producer 涉及到消息的发送,使用 CommandLineRunner 来进行消息的发送。

package com.alibaba.cloud.example.rocketmqbinderproducer;

Import ...

@SpringBootApplication

public class RocketngBinderProducerApplication {

public static void main(String[ ] args){

SpringApplication.run(RocketmqBinderProducerApplication.class,args);

}

public CommandLineRunner commandLineRunner(){

Return new CommandLineRunner(){

@Override

Public void run(String... args) throws Exception{

CommandLineRunner 是一个基础的类,它在应用程序启动完之后会调用 Run 来进行各种你想要的操作,利用功能进行消息的 pass,首先先构造一个 message,使用 MessageChannel 进行消息发送,并发送到 RocketMQ,Messsage 的构造使用 MessageBuilder, Payload(“simple msg”),通过 DirectChannel 消息发送,但是怎么知道对应哪一个 topic,引入一个新的依赖,加入一个新的注解, EnableBinding 会初始化,并加入 Source,使用 Source 中的 output 方法来代替原先的 DirectChannel,这里我们要注入这个 source。因为 Binding 底层构造  Source  一个代理。就通过这个 MessageChannel 发送消息。这里使用 EnableBinding 代替原先的 Directchannel

@SpringBootApplication

@EnableBinding(Source.class)

public class RocketmqBinderProducerApplication {

public static void main(String[] args) {

SpringApplication.run(RocketmqBinderProducerApplication.class,args);

}

@Bean

public CommandLineRunnercommandLineRunner() {

return new CommandLineRunner() {

@Autowired

private Source source;

@Override

public void run(String... args)  throws  ExcePtion{

// 1.先构造 Message

// 2.使用 MessageChannel 进行消息发送,并发送到RocketMo

// 2.1 使用 EnableBinding 代替原先的 DirectChannel

Message msg = MessageBuilder.withPayload("simple msg").build();

//DirectChannel channel = new DirectChannel();

//channel.send(msg);

source.output().send(msg);

}

知道是哪一个 topic:我们看 output 的方法下面有个 output 的注解,并且对应的这个属性是 OUTPUT

/**

* Name of the output channel.

*/

String OUTPUT = "output";

/**

*@return output channel

*/

@Output(Source.OUTPUT)

MessageChannel output();

这个时候就通过  spring.cloud.stream  的一个  Binding 的名叫  output  配置里面有个 Destination 的属性就表示的是 topic。用 rmp-binder-test topic 。

所以这个程序是在 CommandLineRunner 中构造消息,然后把它发送到了为名rmp-binder-test 的一个过程。

5、消息的订阅

Packagecom.alibaba.cloud.example.rocketmqbinderconsumer;

import  ...

@SpringBootApplication

@EnableBinding(Sink.class)

public class RocketngbinderConsumerApplication {

public static void main(String[] args) {

SpringApplication.run(RocketmqBinderConsumerApplication.class, args);

}

// 1.使用 MessageHandler 订阅 MessageChannel

// 1.1使用 StreamListener 订阅消息

@StreamListener

public void receive(String msg) {

System.out.println(msg);

}

使用 MessageHandler 订阅 MessageChannel 使用 StreamListener 订阅消息,同样需要 EnableBinding 来构造Binding,此时对应接口为 Sink。

进而直接打印消息,同时在运行时改造非 web 项:

Public static void main(String[] args) {

New SpringApplicationBuilder(RocketmqBinderConsumerApplication.class).web(webApplicationType.NONE).run(args);

}

在 producer 端也需要改造,然后先启动 consumer 看到日志中的反应,清除日志后运行 producer 可以看到五条消息被消费,改造 producer 发送50条消息。

@Override

public void run(String... args)  throws  exception

// 1.先构造 Message

// 2.使用 MessageChannel 进行消息发送并发送到 RocketMo

//21使用 EnableBinding 代替原先的 DirectChannel

for(int  i=0;  i<50;  i++) {

Message msg=MessageBuilder.withPayload("simple msg").build()

//DirectChannel channel =new DirectChannel();

//channel.send(msg);

source.output().send(msg);

}

能收到所有消息,这时候看 consumer 能看到消费55条消息


三、 总结

消息的发送 

@EnableBinding(Source.class)

public class RocketMQProduceApplication{

...

public interface Source {

@Output("output")

MessageChannel output()

}

Message message =

MessageBuilder.withPayload("msg payload")

.setHeader("custom-header", "alibaba")

.build():

Source.output().send(message);

消息的接收

@EnableBinding(Sink.class)

public class

RocketMQConsumerApplication{

...

}

public interface Sink {

@Input("input")

SubscribableChannel input();

@StreamListener("input")

public void receivelnput(String msg){

System.out.println(msg);

}

对应的配置

spring.cloud.stream.bindings.input.desti nation=rmg-binder-test

spring.cloud.stream.bindings.input grou p=rmq-binder-consumer-group

spring.cloud.stream.bindings.output.des tination=rmq-binder-test

1、 消息的发送

引入了一个新的注解叫 enablebinding,它会通过 source 接口自动执行一个代理类,随即只需要注入 source 即可,可以用它内部的 output 方法来对消息进行发送。其中 output topic 可以在配置里面配置,可以通过前缀,来根据对应的 binding这个名字进行具体的配置。

2、消息的接收

还是使用了 enablebinding 这个注解,代理的这个接口是 sink ,它使用 input 的注解来设置要读取的 topic 内容。

3、最终通过 StreamListener 注解消息做业务逻辑的处理

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 存储 算法
RocketMQ学习笔记
RocketMQ学习笔记
155 0
|
6月前
|
传感器 网络协议 中间件
Mqtt学习笔记--交叉编译移植(1)
Mqtt学习笔记--交叉编译移植(1)
123 0
|
消息中间件 存储 缓存
RibbitMQ学习笔记之MQ练习(三)
RibbitMQ学习笔记之MQ练习
49 0
|
消息中间件 网络协议 数据中心
RabbmitMQ学习笔记-RabbitMQ集群架构模式
RabbmitMQ学习笔记-RabbitMQ集群架构模式
84 0
|
消息中间件 Java
RabbmitMQ学习笔记-RabbitMQ与SpringBoot2.0整合实战
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
116 0
|
消息中间件 中间件
RibbitMQ学习笔记之MQ发布确认
RibbitMQ学习笔记之MQ发布确认
57 0
|
消息中间件 网络协议
RibbitMQ学习笔记之MQ练习(二)
RibbitMQ学习笔记之MQ练习
34 0
|
消息中间件 网络协议 Java
RibbitMQ学习笔记之MQ练习(一)
RibbitMQ学习笔记之MQ练习
78 0
|
消息中间件 存储 网络协议
RibbitMQ学习笔记之MQ 的相关概念
RibbitMQ学习笔记之MQ 的相关概念
82 0
|
消息中间件 存储 缓存
RocketMQ 5.0 可观测能力升级: Tracing 链路追踪介绍|学习笔记
快速学习 RocketMQ 5.0 可观测能力升级: Tracing 链路追踪介绍
824 0
RocketMQ 5.0 可观测能力升级: Tracing 链路追踪介绍|学习笔记