【SpringCloud-Alibaba系列教程】15.RocketMQ消息

简介: RocketMQ消息类型介绍以及使用。

1.普通消息

普通消息是指消息队列RocketMQ版中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。

同步发送

原理
同步发送是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。sync
4c394b65caa0bbe99e440ae48eb73210_p71666.png
应用场景
此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。

@Autowired
    private RocketMQTemplate rocketMQTemplate;
    //同步消息
    @Test
    public void testSyncSend(){
        //参数一: topic 添加tag 可以使用topic:tag
        //参数二: 消息内容
        SendResult sendResult = rocketMQTemplate.syncSend("test-topic-1:tag","这是一条同步消息");
        System.out.println(sendResult);
    }

异步发送

原理
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息队列RocketMQ版的异步发送,需要您实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
de0922094498fc02923a253b06dd5279_p71667.png
应用场景
异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

//异步消息
    @Test
    public void testAyncSend() throws InterruptedException{
            //参数一: topic 添加tag 可以使用topic:tag
            //参数二: 消息内容
            //参数三: 回调结果,处理返回结果
         rocketMQTemplate.asyncSend("test-topic-1:tag", "这是一条异步消息", new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
                 System.out.println(sendResult);
             }

             @Override
             public void onException(Throwable throwable) {
                     System.out.println(throwable);
             }
         });
         System.out.println("=========================");
            Thread.sleep(30000000000L);
    }

单向发送

原理
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
094ee027adcc31e9980922e5aac9a1c7_p71668.png
应用场景
适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

//单向消息
    @Test
    public void testOneWaySend(){
        //参数一: topic 添加tag 可以使用topic:tag
        //参数二: 消息内容
        rocketMQTemplate.sendOneWay("test-topic-1:tag","这是一条单向消息");
    }

2.顺序消息

顺序消息分为两类:
全局顺序:对于指定的一个Topic,所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和消费。
具体就是在后面添加一个Orderly

//单向顺序消息
    @Test
    public void sendOneWayOrderly(){
        //参数一: topic 添加tag 可以使用topic:tag
        //参数二: 消息内容
        //参数三: 发送到哪个队列
        rocketMQTemplate.sendOneWayOrderly("test-topic-1:tag","这是一条单向消息","xxx");
    }
    //同步顺序消息
    @Test
    public void testSyncSendOrderly(){
        //参数一: topic 添加tag 可以使用topic:tag
        //参数二: 消息内容
        //参数三: 发送到哪个队列
        rocketMQTemplate.syncSendOrderly("test-topic-1:tag","这是一条单向消息","xxx");
    }
    //异步顺序消息
    @Test
    public void testAyncSendOrderly() throws InterruptedException {
        //参数一: topic 添加tag 可以使用topic:tag
        //参数二: 消息内容
        //参数三: 回调结果,处理返回结果
        rocketMQTemplate.asyncSendOrderly("test-topic-1:tag", 10000, "这是一条异步消息", new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println(sendResult);
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        System.out.println(throwable);
                    }
                });
        System.out.println("=========================");
        Thread.sleep(30000000000L);
    }

# 3.事务消息
通过消息队列RocketMQ版事务消息,能达到分布式事务的最终一致。

交互流程

事务消息交互流程如下图所示。
05b5136dc0287f3e62b52ac6f61b94fd_p69402.png
首先在我们之前代码中添加相关Service类

![image.png](https://ucc.alicdn.com/pic/developer-ecology/8b95e6e12d5542a8a22bd482a7d16724.png)
@Service
@Slf4j
public class OrderServiceImplRocketMQ  {
    @Autowired
    private OrderDao orderDao;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private TxLogDao txLogDao;
    public void createOrderBefore(Order order) {
        String txId= UUID.randomUUID().toString();
        rocketMQTemplate.sendMessageInTransaction("tx_producer_group", "tx_topic", MessageBuilder.withPayload(order).setHeader("txId",txId).build(), order);

    }
    @Transactional
    public void createOrder(String txId,Order order) {
        orderDao.save(order);
        TxLog txLog = new TxLog();
        txLog.setTxId(txId);
        txLog.setDate(new Date());
        //记录事务日志
        txLogDao.save(txLog);
    }
}

两个概念:
半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了RocketMQ服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成"暂不能投递"状态,处于该种状态下的消息即半事务消息。
消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ服务端通过扫描发现某条消息长期处于"半事务消息"时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。
编写Listenter
image.png
这样实现了事务。
后期会在这个项目上不断添加,喜欢的请点个start~
项目源码参考一下分支220311_xgc_rocketMQ
Gitee:https://gitee.com/coderxgc/springcloud-alibaba
GitHub:https://github.com/coderxgc/springcloud-alibaba

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
消息中间件 Java RocketMQ
RocketMQ实战教程之RocketMQ安装
这是一篇关于RocketMQ安装的实战教程,主要介绍了在CentOS系统上使用传统安装和Docker两种方式安装RocketMQ。首先,系统需要是64位,并且已经安装了JDK 1.8。传统安装包括下载安装包,解压并启动NameServer和Broker。Docker安装则涉及安装docker和docker-compose,然后通过docker-compose.yaml文件配置并启动服务。教程还提供了启动命令和解决问题的提示。
|
1月前
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
22 0
rabbitmq基础教程(ui,java,springamqp)
|
6月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
2月前
|
消息中间件 存储 Java
SpringCloud基础4——RabbitMQ和SpringAMQP
消息队列MQ、RabbitMQ、SpringAMQP高级消息队列协议、发布/订阅模型、fanout、direct、topic模式
SpringCloud基础4——RabbitMQ和SpringAMQP
|
3月前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
196 2
|
3月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
65 0
|
3月前
|
消息中间件 监控 Ubuntu
RabbitMQ安装配置,超详细版教程
以上步骤为您提供了在Linux环境下安装RabbitMQ的详细过程。安装Erlang作为基础,然后通过添加官方源并安装RabbitMQ本身,最后对服务进行配置并启用Web管理界面。这些步骤操作简单直观,只需要跟随上述指南,即可在短时间内将RabbitMQ服务器运行起来,并进行进一步的配置和管理。不要忘记硬件和网络资源对性能的影响,确保RabbitMQ能够满足您的应用需求。
246 0
|
6月前
|
消息中间件 存储 Apache
RocketMQ实战教程之常见概念和模型
Apache RocketMQ 实战教程介绍了其核心概念和模型。消息是基本的数据传输单元,主题是消息的分类容器,支持字节、数字和短划线命名,最长64个字符。消息类型包括普通、顺序、事务和定时/延时消息。消息队列是实际存储和传输消息的容器,是主题的分区。消费者分组是一组行为一致的消费者的逻辑集合,也有命名限制。此外,文档还提到了一些使用约束和建议,如主题和消费者组名的命名规则,消息大小限制,请求超时时间等。RocketMQ 提供了多种消息模型,包括发布/订阅模型,有助于理解和优化消息处理。
|
6月前
|
消息中间件 存储 Java
RocketMQ实战教程之NameServer与BrokerServer
这是一个关于RocketMQ实战教程的概要,主要讨论NameServer和BrokerServer的角色。NameServer负责管理所有BrokerServer,而BrokerServer存储和传输消息。生产者和消费者通过NameServer找到合适的Broker进行交互,不需要直接知道Broker的具体信息。工作流程包括生产者向NameServer查询后发送消息到Broker,以及消费者同样通过NameServer获取消息进行消费。这种设计类似于服务注册中心的概念,便于系统扩展和集群管理。
|
5月前
|
JSON Java Spring
实战SpringCloud响应式微服务系列教程(第八章)构建响应式RESTful服务
实战SpringCloud响应式微服务系列教程(第八章)构建响应式RESTful服务