微服务器集成Rocketmq-消息驱动

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Rocketmq-消息驱动

1、前言

我们都知道,不管是单体项目也好,微服务分布式项目也罢,都逃不脱mq的使用,那么什么是mq?MQ的应用场景有哪些?消息消费有哪些要注意的细节问题?微服务中如何接入MQ?相信大家都会有或多或少的困惑点和不懂点,那这篇文章就带领大家了解Rocketmq-消息驱动。

如果小伙伴们对微服务感兴趣,欢迎订阅微服务专栏:从0-1学习微服务,为了感谢粉丝们的支持,目前限时该专栏限时免费,感谢支持。微服务专栏传送门:
https://blog.csdn.net/weixin_44427181/category_12053421.html?spm=1001.2014.3001.5482

2、MQ简介

2.1 什么是MQ?

在进入MQ讲解之前,什么是MQ?

MQ(Message Queue)是一种跨进程通信机制,用于传递消息。通俗点说,就是一个先进先出的数据结构。

在这里插入图片描述

2.2 MQ的应用场景

2.2.1 异步解耦

异步解耦是消息队列 MQ 的主要特点,主要目的是减少请求响应时间和解耦
主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。

同时,由于使用了消息队列 MQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,这叫做解耦合

案例:

最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法如下:

在这里插入图片描述
此架构下注册、邮件、短信三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。
但是对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,而后续的注册短信和邮件不是即时需要关注的步骤。

所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的消息队列 MQ 中然后马上返 回用户结果,由消息队列 MQ 异步地进行这些操作。架构图如下:

在这里插入图片描述

2.2.2 流量削峰

流量削峰也是消息队列 MQ 的常用场景,一般在秒杀或团队抢购(高并发)活动中使用广泛。

在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解决这些问题,可在应用和下游通知系统之间加入消息队列 MQ。

在这里插入图片描述
秒杀处理流程如下所述:

  1. 用户发起海量秒杀请求到秒杀业务处理系统。
  2. 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列 MQ。
  3. 下游的通知系统订阅消息队列 MQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户。
  4. 用户收到秒杀成功的通知。

2.3 常见的MQ

目前业界有很多MQ产品,比较出名的有下面这些:

ZeroMQ

号称最快的消息队列系统,尤其针对大吞吐量的需求场景。扩展性好,开发比较灵活,采用C语言实现,实际上只是一个socket库的重新封装,如果做为消息队列使用,需要开发大量的代码。ZeroMQ仅 提供非持久性的队列,也就是说如果down机,数据将会丢失。

RabbitMQ

使用erlang语言开发,性能较好,适合于企业级的开发。但是不利于做二次开发和维护。

ActiveMQ

历史悠久的Apache开源项目。已经在很多产品中得到应用,实现了JMS1.1规范,可以和spring-jms轻 松融合,实现了多种协议,支持持久化到数据库,对队列数较多的情况支持不好。

RocketMQ

阿里巴巴的MQ中间件,由java语言开发,性能非常好,能够撑住双十一的大流量,而且使用起来很简单。

Kafka

Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,相对 于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

3、RocketMQ入门

3.1 RocketMQ环境搭建

接下来我们先在linux平台下安装一个RocketMQ的服务:

3.1.1 下载RocketMQ

http://rocketmq.apache.org/release_notes/release-notes-4.4.0/

环境要求

  • Linux 64位操作系统
  • 64bit JDK 1.8+

3.1.2 安装RocketMQ

1、上传文件到Linux系统:

[root@heima rocketmq]# ls /usr/local/src/
rocketmq-all-4.4.0-bin-release.zip

2、解压到安装目录:

[root@heima src]# unzip rocketmq-all-4.4.0-bin-release.zip
[root@heima src]# mv rocketmq-all-4.4.0-bin-release ../rocketmq

3.1.3 启动RocketMQ

1、切换到安装目录

[root@heima rocketmq]# ls
benchmark bin conf lib LICENSE NOTICE README.md

2、启动NameServer

[root@heima rocketmq]# nohup ./bin/mqnamesrv &
[1] 1467
\# 只要进程不报错,就应该是启动成功了,可以查看一下日志
[root@heima rocketmq]# tail -f /root/logs/rocketmqlogs/namesrv.log

3、启动Broker

\# 编辑bin/runbroker.sh 和 bin/runserver.sh文件,修改里面的
\# JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
\# 为JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m" 
[root@heima rocketmq]# nohup bin/mqbroker -n localhost:9876 & 
[root@heima rocketmq]# tail -f /root/logs/rocketmqlogs/broker.log

3.1.4 测试RocketMQ

1、测试消息发送

[root@heima rocketmq]# export NAMESRV_ADDR=localhost:9876
[root@heima rocketmq]# bin/tools.sh
org.apache.rocketmq.example.quickstart.Producer

2、测试消息接收

[root@heima rocketmq]# export NAMESRV_ADDR=localhost:9876
[root@heima rocketmq]# bin/tools.sh
org.apache.rocketmq.example.quickstart.Consumer

3、关闭RocketMQ

[root@heima rocketmq]# bin/mqshutdown broker
[root@heima rocketmq]# bin/mqshutdown namesrv

3.2 RocketMQ的架构及概念

在这里插入图片描述

如上图所示,整体可以分成4个角色,分别是:NameServerBrokerProducerConsumer

  • Broker(邮递员):Broker是RocketMQ的核心,负责消息的接收,存储,投递等功能。
  • NameServer(邮局):消息队列的协调者,Broker向它注册路由信息,同时Producer和Consumer 向其获取路由信息Producer(寄件人)消息的生产者,需要从NameServer获取Broker信息,然后与 Broker建立连接,向Broker发送消息。
  • Consumer(收件人) :消息的消费者,需要从NameServer获取Broker信息,然后与Broker建立连接,从Broker获取消息
  • Topic(地区):用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对Topic来发送和接收消息。Message Queue(邮件)为了提高性能和吞吐量,引入了Message Queue,一个Topic可以设置一个或多个Message Queue,这样消息就可以并行往各个Message Queue发送消息,消费者也可以并行的从多个Message Queue读取消息。
  • Message:Message 是消息的载体。
  • Producer Group:生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。
  • Consumer Group:消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组。

3.3 RocketMQ控制台安装

1、下载

\# 在git上下载下面的工程 rocketmq-console-1.0.0 
https://github.com/apache/rocketmq-externals/releases

2、修改配置文件

\# 修改配置文件 rocketmq-console\src\main\resources\application.properties
server.port=7777 #项目启动后的端口号
 #nameserv的地址,注意防火墙要开启 9876端口
rocketmq.config.namesrvAddr=192.168.109.131:9876

3、打成jar包,并启动

\# 进入控制台项目,将工程打成jar包
mvn clean package -Dmaven.test.skip=true
\# 启动控制台
java -jar target/rocketmq-console-ng-1.0.0.jar

4、访问控制台

在这里插入图片描述

4、案例

接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:

![
](https://ucc.alicdn.com/images/user-upload-01/f4288431663049daba76d82d3b501e8a.png)

4.1 订单微服务发送消息

1、在 shop-order 中添加rocketmq的依赖

<!--rocketmq-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

2、添加配置

rocketmq:
name-server: 192.168.109.131:9876 #rocketMQ服务的地址 producer:
group: shop-order # 生产者组

3、编写测试代码

@RestController
@Slf4j
public class OrderController2 {
    @Autowired
    private OrderService orderService;
    @Autowired
    private ProductService productService;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    //准备买1件商品 @GetMapping("/order/prod/{pid}")
     public Order order(@PathVariable("pid") Integer pid) {
        log.info(">>客户下单,这时候要调用商品微服务查询商品信息"); //通过fegin调用商品微服务
        Product product = productService.findByPid(pid);
        if (product == null) {
            Order order = new Order();
            order.setPname("下单失败");
            return order;
        }
        log.info(">>商品信息,查询结果:" + JSON.toJSONString(product));
         Order order =     new Order();
         order.setUid(1);
         order.setUsername("测试用户"); 
         order.setPid(product.getPid()); 
         order.setPname(product.getPname());
         order.setPprice(product.getPprice());
         order.setNumber(1);
         orderService.save(order);
//下单成功之后,将消息放到mq中 rocketMQTemplate.convertAndSend("order-topic", order);
        return order;
    }
}

4.2 用户微服务订阅消息

1、修改 shop-user 模块配置

<?xml version="1.0" encoding="UTF-8" ?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud-alibaba</artifactId>
        <groupId>com.itheima</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
     <artifactId>shop-user</artifactId>
    <dependencies>
        <dependency>
            <groupId>com.itheima</groupId>
            <artifactId>shop-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-
discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
    </dependencies>
</project>

2、修改主类

@SpringBootApplication
@EnableDiscoveryClient
public class UserApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class, args);
} }

3、修改配置文件

server:
port: 8071
spring:
application:
name: service-user
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql:///shop?
serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
username: root
password: root
jpa:
properties:
hibernate:
hbm2ddl:
auto: update
dialect: org.hibernate.dialect.MySQL5InnoDBDialect
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
rocketmq:
name-server: 192.168.109.131:9876

4、编写消息接收服务

@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic") public class SmsService implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) { 
        log.info("收到一个订单信息{},接下来发送短信", JSON.toJSONString(order));
    } 
}

5、启动服务,执行下单操作,观看后台输出。

5、事务消息

RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致。

事务消息交互流程:

在这里插入图片描述

事务消息发送步骤:

  1. 发送方将半事务消息发送至RocketMQ服务端。
  2. RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到

Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤:

  1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时 间后服务端将对该消息发起消息回查。
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

实体类:

//事物日志
@Entity(name = "shop_txlog")
@Data
public class TxLog {
    @Id
    private String txLogId;
    private String content;
    private Date date;
}

Service类:

@Service
public class OrderServiceImpl4 {
    @Autowired
    private OrderDao orderDao;
    @Autowired
    private TxLogDao txLogDao;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void createOrderBefore(Order order) {
        String txId = UUID.randomUUID().toString();
             //发送半事务消息
        rocketMQTemplate.sendMessageInTransaction( "tx_producer_group","tx_topic",
        MessageBuilder.withPayload(order).setHeader("txId",txId).build(), order
        );
     }
     //本地事务
    @Transactional
    public void createOrder(String txId, Order order) {
         //本地事物代码 
         orderDao.save(order); 
         //记录日志到数据库,回查使用 
         TxLog txLog = new TxLog(); 
         txLog.setTxLogId(txId);
         txLog.setContent("事物测试"); 
         txLog.setDate(new Date()); 
         txLogDao.save(txLog);
         } 
}

事务监听器

@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public class OrderServiceImpl4Listener implements RocketMQLocalTransactionListener {
    @Autowired
    private TxLogDao txLogDao;
    @Autowired
    private OrderServiceImpl4 orderServiceImpl4;
    
    //执行本地事物
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try { //本地事物
            orderServiceImpl4.createOrder((String) msg.getHeaders().get("txId"),(Order) arg);
            return RocketMQLocalTransactionState.COMMIT;
        }
        catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
}

//消息回查 
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { //查询日志记录
        TxLog txLog = txLogDao.findById((String)
        msg.getHeaders().get("txId")).get();
        if (txLog == null) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
}

6、总结

本篇文章介绍了Rocketmq的概念,如何搭建Rocketmq,以及微服务如何集成Rocketmq。

如果小伙伴们对微服务感兴趣,欢迎订阅微服务专栏:从0-1学习微服务,为了感谢粉丝们的支持,目前限时该专栏限时免费,感谢支持。微服务专栏传送门:
https://blog.csdn.net/weixin_44427181/category_12053421.html?spm=1001.2014.3001.5482

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
NoSQL Java Redis
小白版的springboot中集成mqtt服务(超级无敌详细),实现不了掐我头!!!
小白版的springboot中集成mqtt服务(超级无敌详细),实现不了掐我头!!!
293 1
|
4月前
|
消息中间件
SpringCloud Stream集成RabbitMQ
SpringCloud Stream集成RabbitMQ
60 0
|
5月前
|
Java API 网络架构
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码
173 0
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3
|
4月前
|
Java Nacos Docker
在集成nacos时,端口9848报错但服务器的这个端口是开放的
在集成nacos时,端口9848报错但服务器的这个端口是开放的【1月更文挑战第14天】【1月更文挑战第67篇】
257 1
|
5月前
|
Java 数据安全/隐私保护
Neo4j【付诸实践 01】SpringBoot集成报错org.neo4j.driver.exceptions.ClientException:服务器不支持此驱动程序支持的任何协议版本(解决+源代码)
Neo4j【付诸实践 01】SpringBoot集成报错org.neo4j.driver.exceptions.ClientException:服务器不支持此驱动程序支持的任何协议版本(解决+源代码)
92 1
|
3月前
|
Linux 数据安全/隐私保护 Docker
在云服务器上搭建集成开发环境
在云服务器上搭建集成开发环境
|
23天前
|
弹性计算 前端开发 Java
使用阿里云 mqtt serverless 版本超低成本快速实现 webscoket 长链接服务器
使用阿里云 MQTT Serverless 可轻松实现弹性伸缩的 WebSocket 服务,每日成本低至几元。适用于小程序消息推送的 MQTT P2P 模式。前端需注意安全,避免 AK 泄露,采用一机一密方案。后端通过调用 `RegisterDeviceCredential` API 发送消息。示例代码包括 JavaScript 前端连接和 Java 后端发送。
234 0
|
2月前
|
jenkins Java 持续交付
Docker Swarm总结+Jenkins安装配置与集成snarqube和目标服务器(4/5)
Docker Swarm总结+Jenkins安装配置与集成snarqube和目标服务器(4/5)
45 0
|
3月前
|
消息中间件 存储 物联网
|
3月前
|
JSON 物联网 开发工具
MQTT协议问题之如何搭建物联网空调的服务器
MQTT协议是一个轻量级的消息传输协议,设计用于物联网(IoT)环境中设备间的通信;本合集将详细阐述MQTT协议的基本原理、特性以及各种实际应用场景,供用户学习和参考。
79 1