开发者学堂课程【全面讲解 Spring Cloud Alibaba 技术栈(知识精讲+项目实战)第四阶段:消息类型-普通消息】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/686/detail/11909
消息类型-普通消息
普通消息
发送不同类型的消息在 RocketMQ 中根据类型可以将消息分为三类,第一个普通消息,第二个顺序消息,第三个事务消息。
RocketMQ 提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送。
可靠是能够从 RocketMQ 里得到消息的发送结果,同步异步是结果是否要等到返回之后才进行发送,可靠同步发送指的是向 RocketMQ 发送一个消息,直到 RocketMQ 返回结果以后才进行消息的下一次发送。异步是向 RocketMQ 发送一个消息,不管返回结果继续发送下一个消息,消息的返回结果会回调的形式回调。单向发送只负责发送消息,不会等待服务器的回应,也没有设置任何的回调
1、可靠同步发送
同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。(适用于必须拿到返回结果下一次发送的场景)
此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
2、可靠异步发送
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送方通过回调
接口接收服务器响应,并对响应结果进行处理。
异步发送一般用干链路耗时较长,对RT响应时间较为敏威的业务场量,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。(转码时间很长,不会等待转码成功以后再继续而是接着去转码下一个,转码成功以后通过回调函数来通知。)
3、单向发送
单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。
适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
<!--依赖-->
<dependency>
<groupId>orq.springframework.boot</qroup
I
d>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupid>
<artifactId>junit</artifactid>
</dependency>
//测试
@Runwith(SpringRunner.class)
@SpringBootTest(classes=OrderApplication.class) public
class MessageTypeTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
//同步消息
@Test
public void testsyncSend
()
{
//参数一:topic, 如果想添加tag 可以使用"topic:tag"的写法
/
/参数二:消息内容
SendResult sendResult=
rocketMQTemplate.syncSend("test-topic-1","这是一条同步消息"
System.out.println(sendResult);
}
//异步消息
@Test
public void testAsyncsend() throws InterruptedException{
public void testsyncSendMsq
)
{
//参数一:topic,如果想添加
tag 可以使用
"topic:tag"
的写法
//参数二:消息内容
//参数三: 回调函数,处理返回结果
rocketMQTemp1ate.asyncSend("test-topic-1","这是一条异步消息",new SendCa11back(){
@Override
public void onSuccess(SendResult sendResult){
system.out.print1n(sendResu1t);
}
@
Override
public void onException(Throwable throwable){
System.out.print1n(throwable);
}
});
//让线程不要终止
Thread.sleep(30000000);
}
//单向消息
@Test
public void testoneway() {
rocketMQTemplate.sendOneWay("test-topic-1","这是一条单向消息");
}
2、发送
(1)可靠同步消息
采用 Spring boot 提供的方式来做测试,首先为工程添加两个依赖,一个是 Spring boot 提供的单元测试,一个是 junit。
<dependenay>
<groupId>org.springframework.cloud</groupid>
<artifactId>spring-cloud-starter-openfeign</artifaatid>
</dependency>
<!--sentine1-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</
artifactId></dependency>
<!--rocketmg-->
<dependency>
<groupId>org.apache.rocketmg</groupid>
<artifactId>rocketmq-spring-boot-starter</artifactid>
<version>2.0.2</version>
</dependency>
<dependenay>
<groupId>org.apache.rocketmq</
groupId><artifactId>rocketmg-client</artifactid>
<version>4.4.0</version>
</dependency>
<!--依赖-->
<dependency>
<groupId>org.springfranework.boot</groupid>
<artifactId>spring-boot-starter-test</artifactid>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactid>
</dependency>
<
/dependencies>
编写一个测试类
package com.itheima.test;
import com.itheima.orderApplication;
import orgjunit.runner.RunWith;
import
orq.springframework.boot.test.context.SpringBootTest;
import
org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(classes =OrderApplication.class) public
class MessageTypeTest
{
@Autowired
private RocketMQTemplate rocketMQTemplate
//同步消息
@Test
public void testsyncsend() {
//第一个地址第二个消息体,可以给超时时间
//参数一:top
//参数二:消息体
//参数三:超时时间
SendResult result =
rocketMQTemplate.syncSend(destination:"test-topic-1",
payload:"这是一条同步消息",timeout:10000);
system.out.println(result);
}
}
运行一下,消息是否发送,能否拿到响应结果:
SendResult [sendstatus=SEND
OK,msgId=C0A82B903E0858644
D46992D7AAA0000,
offsetMsaId=C0A86D8300002A9F000000
00000B3E8F
,
消息能够发送成功第一是能拿到结果,第二是通过控制台能看到主题下面有一条消息。
通过控制台看一下,选择相应的主题:
点击搜索
已经有一个消息
Topic 标签是空的,可以设置标签,在 rocketmq template里面是允许在后边加一个冒号。
//同步消息
@Test
public void testsyncsend() {
//参数一:topic:tag
//参数二:消息体
//参数三:超时时间
SendResult resultm
rocketMQTemplate.syncSend( destination:"test-
topic-1:tag",payload:"这是一条同步消息",timeout:10000);
System.out.println(result);
}
}
运行一下:
SendResult[sendstatus=SEND OK,
msgId=C0A82B90736058644 D46992F1E950000,
offsetMsgId=C0A86D830000
发送完毕,再次点击搜索
第二个消息有了 tag
同步消息是可以拿到结果的,拿到结果以后才会继续往下走,拿到之前阻塞位置阻碍结果传递
2、异步可靠消息
//异步消息
@Test
public void testAsyncsend() throws InterruptedException {
//参数一:topic:tag
//参数二:消息体
//参数三:回调
rocketMQTemplate.asyncSend(destination:"test-topic-1",
payload:"这是一条异步消息",newSendCal1back(){
//成功响应的回调
@Override
public void onsuccess(SendResult result){
System.out.println(result);
}
//异常响应的回调
@Override
public void onException(Throwable throwable){
System.out.println(throwable);
}
});
//直接写是拿不到结果的,一旦方法暂时运行到位置,主方法就结束,因为里边没有东西,主方法一旦结束回调就找不到,进程已经死掉了,所以必须控制不能退出。
//休眠,保证回调时可以运行
System.out.println("
===============
");
Thread.sleep(millis:300000000);
}
}
运行结果,主要关注回调和=谁先打出来:
===============
SendResult [sendstatus=SEND OK,
msgId=C0A82B908AF85864
4D46993435FF0000,
offsetMsgId=C0A86D8300002A9F0000000
0000B438
证明执行完之后不会等结果,结果以回调的形式后来设置过来的
通过控制台看异步消息:
3、单向发送
//单向消息
@Test
public void testoneWay() {
rocketMOTemplate.sendOneWav( destination: "test-
topic-1". pavload:"这是一条
单向
消息")
}
}
点进去发现没有任何返回值
运行一下结果:
看不到任何结果,因为没有任何的输出
可以通过控制台查看,单向消息。