消息类型-事务消息|学习笔记

简介: 快速学习消息类型-事务消息

开发者学堂课程【全面讲解 Spring Cloud Alibaba 技术栈(知识精讲+项目实战)第四阶段消息类型-事务消息】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/686/detail/11911


消息类型-事务消息


事务消息

RocketMQ 提供了事务消息,通过事务消息就能达到分布式事务的最终一致。

  1. 事务消息交互流程

image.png

首先关注三个地方,第一个消息发送方,第二个服务端,第三个消息订阅方,三块内容别对应到前面学习的消息生产者,RocketMQ 服务器以及消息消费者。

第一步是由消息发送方向服务端发送一个消息,消息发送方将半消息发送给服务端以后,服务端会返给发送方一个半事务消息发送成功的标识,接下来发送方会去执行本地事务,本地事务有两种执行结果,要么成功,要么失败,会根据成功或者失败提交一个Commit 或者 Rollback 的标识到服务端,标识是对于半消息的二次确认。接下来服务端会根据 Commit 或者 Rollback 执行仓的操作,如果接触到了commit 意味本地事务执行成功,接下来会将消息投递给消息订阅。如果接到的是 Rollback ,意味着本地事务失败,不会将消息在消息订阅方投递,而是暂时存储三天以后就删除掉了,这是一个流程。

接下来是另外一个流程浅黄色的线。当服务端在接收到半消息以后,一直收不到二次确认时,就会执行消息的回查,消息发送包会去检查本地事务是什么状态,然后根据本地事务的状态去执行 Commit 或者 Rollback 提交到服务端,接下来服务端根据 Commit 再来重复提交。

两个步骤要么投递,要么不投递,这就是事物消息的交互流程。

2、两个概念

(1)半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了 RocketMQ 服务端,但是服务端未收到生产者对该消息的二次确认(4流程),此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息,

(2)消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失(1过来,4过不来),RocketMQ 服务端通扫描发现某条消息长期外干“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。

写代码:

首先写代码是第一部分发送事务消息。新开一个 service 单独来做测试,找到 order 然后 service-impl。

image.png

package com.itheima.service.impl;

Import...

@Service

public class orderserviceImp14{

@Autowired

private OrderDao orderDao;

@Autowired

private RocketMOTemplate rocketMOTemplate;

public void createorderBefore(Order order){

//要求传递四个参数,第一个事物生产者组的名字(txProducerGroup),第二个主题(destination),第三个消息(message),第四个参数(arg)

//发送半事务消息

rocketMQTemplate.sendMessageInTransaction

txProducerGroup: "tx producer group",

destination:"tx topic",

MessageBuilder.withPayload(order).build(),

order

);

}

}

半事务消息发送成功以后,要执行后面的步骤第二个是由服务端调我们,所以这一步没有关系,要执行本地事务第三步是提前准备好一个方法,当第二步发送成功,回调回来以后就去执行它。

单独开一个类

image.png

从步骤一创建一个本地事务,再写一个方法:

@Transactional

public void createorder(Order order) {

orderDao.save(order);

调用本地事务之后,可以把 server 调进来:

package com.itheima.service.impl;

import ...

@service

publia class orderserviceImpl4Listener implements

RocketMQ LocalTransactionListener {

@Autowired

private orderServiceImp14 orderServiceImp14;

//执行本地事物

@Override

public RocketMQLocalTransactionState executeLocalTransa

ction( Message msg, Object arg){

//订单可以从上面进行接收,有 massage、object,message对应第一步

MessageBuilder.withPayload(order).build(),,arg 对应 order

//try catch 代表成功或者失败

try {

//本地事物

Order order=(order) arg;

orderServiceImp14.createOrder(order);

return RocketMolocalTransactionstate.COMMIT;

} catch (Exception e) {

return RocketMQLocalTransactionstate.ROLLBACK

}

}

消息回查如何知道消息到底是否成功或者失败,要加入一张表来记录是否成功,这张表一般叫 tx log 也是事物的日志表。

先创建一个实体类:

image.png

image.png

package com.itheima.domain;

import lombok.Data;

import javax.persistence.Entity;

import javax.persistence.Id;

import java.util.Date;

//消息事物状态记录

@Entity(name = "shop txlog")

@Data

public class TxLog {

@Id

private string txid; private Date date;

}

首先准备一下 though ,找到 order 位置,Dao 拷贝了 Tx logDao

image.png

package com.itheima.dao

import ...

public interface TxLogDao extends JpaRepository

string> {

事物的成功与失败是根据本地事务来算的,本地事务写在了单独的方法

private orderDao orderDao;

@Autowired

private TxLogDao txLogDao;

@Autowired

private RocketMQTemplate rocketMOTemplate;

public void createorderBefore(Order order){

String txId=UUID.randomUUID().toString();

//发送半事务消息

rocketMOTemplate.sendMessageInTransaction(

txProducerGroup:"txproducer group",

destination:"tx topic",

MessageBuilder.withPayload(order).setHeader( headerName: "txid",txid).build(),

//txid 可以通过 msg 接收

order

;}

}

//如果执行成功就成功,失败就失败

@Transactional

publia void createorder(Order order){

orderDao.save(order);

TxLog txLog=newTxLog();

txLog.setTxId);

txLog.setDate(newDate());

txLogDao.save(txLog)

}

txid 可以通过 msg 接收:

private OrderServiceImp14 orderServiceImp14;

//执行本地事物

@Override

public RocketMQLocalTransactionState

executeLocalTransacti on(Message msg, Object arg){

String txId=(String)msggetHeaders().get("txId");

try {

//本地事物

order order=(order)arg;

orderserviceImp14.createorder(txIdorder);

return RocketMolocalTransactionState.COMMTT:

}catch(Exception e){

return RocketMQLocalTransactionStateROLLBACK}

}

}

//消息回查

@Override

public RocketMOLocalTransactionState

checkLocalTransaction (Message msg){

return null;

}

}

OrderServicelmp14.Java 可以接 ID:

@Transactional

public void createorder(string txid, Order order){

//保存订单

orderDao.save(order);

TxLog txLog=new TxLog();

txLog.setTxId(txId);

txLog.setDate(new Date(());

//记录事务日志

txLogDao.save(txLog);

}

}

因为是用本地事务控制,两个一定是同时成功同时失败的,对于消息回答的时候可以去数据库表里查如果能查到说明肯定是保存成功,如果查不到肯定是保存失败

写一下回查:

Import...

@service

@RocketMQTransactionListener(txProducerGroup =

"tx_produ cer group")

public class orderserviceImpl4Listener implements

RocketMQ LocalTransactionListener {

@Autowired

private OrderServiceImp14 orderServiceImp14;

@Autowired

private TxLogDao txlogDao;

//执行本地事物

@override

public RocketMQLocalTransactionState

executeLocalTransacti on(Message msg, Object arq)(

String txId=(String)msggetHeaders().get("txId");

try {

//本地事物

Order order=(Order)arg;

orderServiceImp14.createOrder(txIdorder);

return RocketMolocalTransactionState.COMMIT:}catch

(Exce ptione){

return RocketMolocalTransactionStateROLLBACK;

}

}

//消息回查

@Override

public RocketMQLocalTransactionstate

checklocalTransaction (Message msg)

string txId=(String)msg.getHeaders().get("txid");

TxLog txLog=txLogDao.findById(txId).get();

if (txloq != null){

//本地事物(订单)成功了

return RocketMoLocalTransactionState.COMMIT

}else{

return RocketMolocalTransactionState.ROLLBACK;

}

}

}

首先消息回答会调用到方法里面去查本地事务状态(6),查完以后上报给服务端(7)。

调用代码:

把第一个 Controller 写上4

image.png

package com.itheima.controller;

Import...

@RestController

@slf4j

publia class ordercontroller4 {

@Autowired

private RestTemplate restTemplate;

@Autowired

private OrderServiceImp14 orderService

@Autowired

private productService productService;

//下单--fegin

@RequestMapping("/order/prod/{pid}")

public order order(CPathVariable("pid") Inteer pid) {

loq.info("接收到1)号商品的下单请求,接下来调用商品微服务查询此商品信息",pid)

//调用商品微服务,查询商品信息

Product product=productservicefindByPid(pid);

if (product.getPid()==-100){

Order order=newOrder(); order.setoid(-100L);

order.setPname("下单失败"); return order;

}

log.info("查询到(}号商品的信息,内容是:1}",pid,

JSON.tojsonstring(product));

//下单(创建订单)

Order order=newOrder();

order.setUid(1);

order.setUsername("测试用户");

order.setPid(pid);

order.setPname(product.getPname(());

order.setPprice(product.getPprice()):

order.setNumber(1);

orderService.createOrderBefore(order);

Iog.info("创建订单成功订单信息为1",

JSON.toJsoNstring(order));

return order;

}

启动微服务看效果,运行

Product 启动完毕没有问题

image.png

Order没有问题

image.png

在 OrderServicelmpl4.java 打断点,第一个打在发送半事务消息的地方 String txId=UUID.randomUUID().toString();

OrderServicelmpl4Listener.java 找到成功以后回调的地方,执行本地事务 String txId=(String)msg.getHeaders().get(“txId”);

回查的地方也打一个 String txId=(String)msg.getHeaders().get(“txId”);

刷新发送半事务消息

image.png

已经开始执行本地事务,意味着半消息事务发送成功开始回调

image.png

处理本地事务系统崩溃,因为想复现消息回查,把进程杀掉就可以停止。

Terminal 进程号:

键入“TASKKILL/以了解用法

C: \workspace\ws\springcloud-alibaba>jps

14368

34416 jar

42160 OrderApplication

43072 Jps

33428 jar

46708 ProductApplication

25160 KotlinCompileDaemon

27368 jar

43112 Launcher

34668 nacos-server.jar

C:\workspace\ws\springcloud-alibaba>taskkill-F/pid 42160

成功:已终止 PID 为 42160 的进程。

C:\workspacelws\springcloud-alibaba>

重新启动看是否进行回查,步骤相当于调用本地事务完成之后一直没有发生4,应该手动调用5来查消息。

已经基本成功

image.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 SQL RocketMQ
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
81 1
|
消息中间件
RabbitMQ如何支持事务性消息的发送和接收
RabbitMQ消息的发送和接收
222 0
|
7月前
|
消息中间件 存储 Kafka
几种 MQ 顺序消息的实现方式
几种 MQ 顺序消息的实现方式
|
消息中间件 存储 缓存
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
|
消息中间件 存储 网络协议
多类型业务消息专题-事务消息 | 学习笔记
快速学习多类型业务消息专题-事务消息
142 0
多类型业务消息专题-事务消息 | 学习笔记
|
编解码 Java 测试技术
消息类型-普通消息|学习笔记
快速学习消息类型-普通消息
169 0
消息类型-普通消息|学习笔记
|
消息中间件 存储 网络协议
【视频】事务消息| 学习笔记
快速学习【视频】事务消息
【视频】事务消息| 学习笔记
|
消息中间件 Java 开发者
消息类型-顺序消息|学习笔记
快速学习消息类型-顺序消息
122 0
消息类型-顺序消息|学习笔记
|
消息中间件 运维 监控
多类型业务消息专题-普通消息 | 学习笔记(一)
快速学习多类型业务消息专题-普通消息
166 0
 多类型业务消息专题-普通消息 | 学习笔记(一)
|
存储 消息中间件 Linux
多类型业务消息专题-顺序消息 | 学习笔记
快速学习多类型业务消息专题-顺序消息
多类型业务消息专题-顺序消息 | 学习笔记