Rocketmq-消息驱动

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Rocketmq-消息驱动

1、前言

我们都知道,不管是单体项目也好,微服务分布式项目也罢,都逃不脱mq的使用,那么什么是mq?MQ的应用场景有哪些?消息消费有哪些要注意的细节问题?微服务中如何接入MQ?相信大家都会有或多或少的困惑点和不懂点,那这篇文章就带领大家了解Rocketmq-消息驱动。

如果小伙伴们对微服务感兴趣,欢迎订阅微服务专栏:从0-1学习微服务,为了感谢粉丝们的支持,目前限时该专栏限时免费,感谢支持。微服务专栏传送门:
https://blog.csdn.net/weixin_44427181/category_12053421.html?spm=1001.2014.3001.5482

2、MQ简介

2.1 什么是MQ?

在进入MQ讲解之前,什么是MQ?

MQ(Message Queue)是一种跨进程通信机制,用于传递消息。通俗点说,就是一个先进先出的数据结构。

在这里插入图片描述

2.2 MQ的应用场景

2.2.1 异步解耦

异步解耦是消息队列 MQ 的主要特点,主要目的是减少请求响应时间和解耦
主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。

同时,由于使用了消息队列 MQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,这叫做解耦合

案例:

最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法如下:

在这里插入图片描述
此架构下注册、邮件、短信三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。
但是对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,而后续的注册短信和邮件不是即时需要关注的步骤。

所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的消息队列 MQ 中然后马上返 回用户结果,由消息队列 MQ 异步地进行这些操作。架构图如下:

在这里插入图片描述

2.2.2 流量削峰

流量削峰也是消息队列 MQ 的常用场景,一般在秒杀或团队抢购(高并发)活动中使用广泛。

在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解决这些问题,可在应用和下游通知系统之间加入消息队列 MQ。

在这里插入图片描述
秒杀处理流程如下所述:

  1. 用户发起海量秒杀请求到秒杀业务处理系统。
  2. 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列 MQ。
  3. 下游的通知系统订阅消息队列 MQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户。
  4. 用户收到秒杀成功的通知。

2.3 常见的MQ

目前业界有很多MQ产品,比较出名的有下面这些:

ZeroMQ

号称最快的消息队列系统,尤其针对大吞吐量的需求场景。扩展性好,开发比较灵活,采用C语言实现,实际上只是一个socket库的重新封装,如果做为消息队列使用,需要开发大量的代码。ZeroMQ仅 提供非持久性的队列,也就是说如果down机,数据将会丢失。

RabbitMQ

使用erlang语言开发,性能较好,适合于企业级的开发。但是不利于做二次开发和维护。

ActiveMQ

历史悠久的Apache开源项目。已经在很多产品中得到应用,实现了JMS1.1规范,可以和spring-jms轻 松融合,实现了多种协议,支持持久化到数据库,对队列数较多的情况支持不好。

RocketMQ

阿里巴巴的MQ中间件,由java语言开发,性能非常好,能够撑住双十一的大流量,而且使用起来很简单。

Kafka

Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,相对 于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

3、RocketMQ入门

3.1 RocketMQ环境搭建

接下来我们先在linux平台下安装一个RocketMQ的服务:

3.1.1 下载RocketMQ

http://rocketmq.apache.org/release_notes/release-notes-4.4.0/

环境要求

  • Linux 64位操作系统
  • 64bit JDK 1.8+

3.1.2 安装RocketMQ

1、上传文件到Linux系统:

[root@heima rocketmq]# ls /usr/local/src/
rocketmq-all-4.4.0-bin-release.zip

2、解压到安装目录:

[root@heima src]# unzip rocketmq-all-4.4.0-bin-release.zip
[root@heima src]# mv rocketmq-all-4.4.0-bin-release ../rocketmq

3.1.3 启动RocketMQ

1、切换到安装目录

[root@heima rocketmq]# ls
benchmark bin conf lib LICENSE NOTICE README.md

2、启动NameServer

[root@heima rocketmq]# nohup ./bin/mqnamesrv &
[1] 1467
\# 只要进程不报错,就应该是启动成功了,可以查看一下日志
[root@heima rocketmq]# tail -f /root/logs/rocketmqlogs/namesrv.log

3、启动Broker

\# 编辑bin/runbroker.sh 和 bin/runserver.sh文件,修改里面的
\# JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
\# 为JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m" 
[root@heima rocketmq]# nohup bin/mqbroker -n localhost:9876 & 
[root@heima rocketmq]# tail -f /root/logs/rocketmqlogs/broker.log

3.1.4 测试RocketMQ

1、测试消息发送

[root@heima rocketmq]# export NAMESRV_ADDR=localhost:9876
[root@heima rocketmq]# bin/tools.sh
org.apache.rocketmq.example.quickstart.Producer

2、测试消息接收

[root@heima rocketmq]# export NAMESRV_ADDR=localhost:9876
[root@heima rocketmq]# bin/tools.sh
org.apache.rocketmq.example.quickstart.Consumer

3、关闭RocketMQ

[root@heima rocketmq]# bin/mqshutdown broker
[root@heima rocketmq]# bin/mqshutdown namesrv

3.2 RocketMQ的架构及概念

在这里插入图片描述

如上图所示,整体可以分成4个角色,分别是:NameServerBrokerProducerConsumer

  • Broker(邮递员):Broker是RocketMQ的核心,负责消息的接收,存储,投递等功能。
  • NameServer(邮局):消息队列的协调者,Broker向它注册路由信息,同时Producer和Consumer 向其获取路由信息Producer(寄件人)消息的生产者,需要从NameServer获取Broker信息,然后与 Broker建立连接,向Broker发送消息。
  • Consumer(收件人) :消息的消费者,需要从NameServer获取Broker信息,然后与Broker建立连接,从Broker获取消息
  • Topic(地区):用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对Topic来发送和接收消息。Message Queue(邮件)为了提高性能和吞吐量,引入了Message Queue,一个Topic可以设置一个或多个Message Queue,这样消息就可以并行往各个Message Queue发送消息,消费者也可以并行的从多个Message Queue读取消息。
  • Message:Message 是消息的载体。
  • Producer Group:生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。
  • Consumer Group:消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组。

3.3 RocketMQ控制台安装

1、下载

\# 在git上下载下面的工程 rocketmq-console-1.0.0 
https://github.com/apache/rocketmq-externals/releases

2、修改配置文件

\# 修改配置文件 rocketmq-console\src\main\resources\application.properties
server.port=7777 #项目启动后的端口号
 #nameserv的地址,注意防火墙要开启 9876端口
rocketmq.config.namesrvAddr=192.168.109.131:9876

3、打成jar包,并启动

\# 进入控制台项目,将工程打成jar包
mvn clean package -Dmaven.test.skip=true
\# 启动控制台
java -jar target/rocketmq-console-ng-1.0.0.jar

4、访问控制台

在这里插入图片描述

4、案例

接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:

![
](https://ucc.alicdn.com/images/user-upload-01/f4288431663049daba76d82d3b501e8a.png)

4.1 订单微服务发送消息

1、在 shop-order 中添加rocketmq的依赖

<!--rocketmq-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

2、添加配置

rocketmq:
name-server: 192.168.109.131:9876 #rocketMQ服务的地址 producer:
group: shop-order # 生产者组

3、编写测试代码

@RestController
@Slf4j
public class OrderController2 {
    @Autowired
    private OrderService orderService;
    @Autowired
    private ProductService productService;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    //准备买1件商品 @GetMapping("/order/prod/{pid}")
     public Order order(@PathVariable("pid") Integer pid) {
        log.info(">>客户下单,这时候要调用商品微服务查询商品信息"); //通过fegin调用商品微服务
        Product product = productService.findByPid(pid);
        if (product == null) {
            Order order = new Order();
            order.setPname("下单失败");
            return order;
        }
        log.info(">>商品信息,查询结果:" + JSON.toJSONString(product));
         Order order =     new Order();
         order.setUid(1);
         order.setUsername("测试用户"); 
         order.setPid(product.getPid()); 
         order.setPname(product.getPname());
         order.setPprice(product.getPprice());
         order.setNumber(1);
         orderService.save(order);
//下单成功之后,将消息放到mq中 rocketMQTemplate.convertAndSend("order-topic", order);
        return order;
    }
}

4.2 用户微服务订阅消息

1、修改 shop-user 模块配置

<?xml version="1.0" encoding="UTF-8" ?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud-alibaba</artifactId>
        <groupId>com.itheima</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
     <artifactId>shop-user</artifactId>
    <dependencies>
        <dependency>
            <groupId>com.itheima</groupId>
            <artifactId>shop-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-
discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
    </dependencies>
</project>

2、修改主类

@SpringBootApplication
@EnableDiscoveryClient
public class UserApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class, args);
} }

3、修改配置文件

server:
port: 8071
spring:
application:
name: service-user
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql:///shop?
serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
username: root
password: root
jpa:
properties:
hibernate:
hbm2ddl:
auto: update
dialect: org.hibernate.dialect.MySQL5InnoDBDialect
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
rocketmq:
name-server: 192.168.109.131:9876

4、编写消息接收服务

@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic") public class SmsService implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) { 
        log.info("收到一个订单信息{},接下来发送短信", JSON.toJSONString(order));
    } 
}

5、启动服务,执行下单操作,观看后台输出。

5、事务消息

RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致。

事务消息交互流程:

在这里插入图片描述

事务消息发送步骤:

  1. 发送方将半事务消息发送至RocketMQ服务端。
  2. RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到

Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤:

  1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时 间后服务端将对该消息发起消息回查。
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

实体类:

//事物日志
@Entity(name = "shop_txlog")
@Data
public class TxLog {
    @Id
    private String txLogId;
    private String content;
    private Date date;
}

Service类:

@Service
public class OrderServiceImpl4 {
    @Autowired
    private OrderDao orderDao;
    @Autowired
    private TxLogDao txLogDao;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void createOrderBefore(Order order) {
        String txId = UUID.randomUUID().toString();
             //发送半事务消息
        rocketMQTemplate.sendMessageInTransaction( "tx_producer_group","tx_topic",
        MessageBuilder.withPayload(order).setHeader("txId",txId).build(), order
        );
     }
     //本地事务
    @Transactional
    public void createOrder(String txId, Order order) {
         //本地事物代码 
         orderDao.save(order); 
         //记录日志到数据库,回查使用 
         TxLog txLog = new TxLog(); 
         txLog.setTxLogId(txId);
         txLog.setContent("事物测试"); 
         txLog.setDate(new Date()); 
         txLogDao.save(txLog);
         } 
}

事务监听器

@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public class OrderServiceImpl4Listener implements RocketMQLocalTransactionListener {
    @Autowired
    private TxLogDao txLogDao;
    @Autowired
    private OrderServiceImpl4 orderServiceImpl4;
    
    //执行本地事物
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try { //本地事物
            orderServiceImpl4.createOrder((String) msg.getHeaders().get("txId"),(Order) arg);
            return RocketMQLocalTransactionState.COMMIT;
        }
        catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
}

//消息回查 
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { //查询日志记录
        TxLog txLog = txLogDao.findById((String)
        msg.getHeaders().get("txId")).get();
        if (txLog == null) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
}

6、总结

本篇文章介绍了Rocketmq的概念,如何搭建Rocketmq,以及微服务如何集成Rocketmq。

如果小伙伴们对微服务感兴趣,欢迎订阅微服务专栏:从0-1学习微服务,为了感谢粉丝们的支持,目前限时该专栏限时免费,感谢支持。微服务专栏传送门:
https://blog.csdn.net/weixin_44427181/category_12053421.html?spm=1001.2014.3001.5482

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 Java
|
消息中间件 Kubernetes Cloud Native
基于 RocketMQ+Knative 驱动云原生Serverless 应用
想必大家都比较了解 RocketMQ 消息服务,那么RocketMQ 与 Serverless 结合会碰撞怎样的火花呢?那我们今天介绍一下如何基于 RocketMQ + Knative 驱动云原生 Serverless 应用 。
3729 0
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
676 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67640 2
3 张图带你彻底理解 RocketMQ 事务消息
|
消息中间件 NoSQL 关系型数据库
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
实战:如何防止mq消费方消息重复消费 如果因为网络延迟等原因,mq无法及时接收到消费方的应答,导致mq重试。(计算机网络)。在重试过程中造成重复消费的问题
2564 1
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
|
消息中间件 Java uml
5张图带你理解 RocketMQ 顺序消息实现机制
5张图带你理解 RocketMQ 顺序消息实现机制
491 1
5张图带你理解 RocketMQ 顺序消息实现机制
|
消息中间件 缓存 算法
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
222 0
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
|
消息中间件 缓存 数据库
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
392 0
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
|
消息中间件 存储 负载均衡
【消息中间件】默认的RocketMQ消息消费者是如何启动的?(下)
在当下的分布式服务中,消息队列中间件是一个解决服务之间耦合的利器,今天我们来瞧一瞧开源的RocketMQ消息中间件,他的消费端是如何启动的,以及在使用他的过程中有哪些配置。
|
消息中间件 负载均衡 算法
【消息中间件】默认的RocketMQ消息消费者是如何启动的?(上)
在当下的分布式服务中,消息队列中间件是一个解决服务之间耦合的利器,今天我们来瞧一瞧开源的RocketMQ消息中间件,他的消费端是如何启动的,以及在使用他的过程中有哪些配置。