开发者学堂课程【全面讲解 Spring Cloud Alibaba 技术栈(知识精讲+项目实战)第四阶段:消息类型-事务消息】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/686/detail/11911
消息类型-事务消息
事务消息
RocketMQ 提供了事务消息,通过事务消息就能达到分布式事务的最终一致。
- 事务消息交互流程
首先关注三个地方,第一个消息发送方,第二个服务端,第三个消息订阅方,三块内容分别对应到前面学习的消息生产者,RocketMQ 服务器以及消息消费者。
第一步是由消息发送方向服务端发送一个半事务消息,消息发送方将半事务消息发送给服务端以后,服务端会返给发送方一个半事务消息发送成功的标识,接下来发送方会去执行本地事务,本地事务有两种执行结果,要么成功,要么失败,会根据成功或者失败提交一个Commit 或者 Rollback 的标识到服务端,标识是对于半事务消息的二次确认。接下来服务端会根据 Commit 或者 Rollback 执行仓的操作,如果接触到了commit 意味本地事务执行成功,接下来会将消息投递给消息订阅。如果接到的是 Rollback ,意味着本地事务失败,不会将消息在消息订阅方投递,而是暂时存储三天以后就删除掉了,这是一个流程。
接下来是另外一个流程浅黄色的线。当服务端在接收到半事务消息以后,一直收不到二次确认时,就会执行消息的回查,消息发送包会去检查本地事务是什么状态,然后根据本地事务的状态去执行 Commit 或者 Rollback 提交到服务端,接下来服务端根据 Commit 再来重复提交。
两个步骤要么投递,要么不投递,这就是事物消息的交互流程。
2、两个概念
(1)半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了 RocketMQ 服务端,但是服务端未收到生产者对该消息的二次确认(4流程),此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息,
(2)消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失(1过来,4过不来),RocketMQ 服务端通过扫描发现某条消息长期外干“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。
写代码:
首先写代码是第一部分发送半事务消息。新开一个 service 单独来做测试,找到 order 然后 service-impl。
package com.itheima.service.impl;
Import
...
@
Service
public class orderserviceImp14{
@Autowired
private OrderDao orderDao;
@Autowired
private RocketMOTemplate rocketMOTemplate;
public void createorderBefore(
Order order
){
//要求传递四个参数,第一个事物生产者组的名字(txProducerGroup),第二个主题(destination),第三个消息(message),第四个参数(arg)
//发送半事务消息
rocketMQTemplate.sendMessageInTransaction
(
txProducerGroup: "tx producer group",
destination:"tx topic",
MessageBuilder.withPayload(order).build(),
order
);
}
}
半事务消息发送成功以后,要执行后面的步骤第二个是由服务端调我们,所以这一步没有关系,要执行本地事务第三步是提前准备好一个方法,当第二步发送成功,回调回来以后就去执行它。
单独开一个类
从步骤一创建一个本地事务,再写一个方法:
@Transactional
public void createorder(Order order) {
orderDao.save(order);
调用本地事务之后,可以把 server 调进来:
package com.itheima.service.impl;
import ...
@
service
publia class orderserviceImpl4Listener implements
RocketMQ
LocalTransactionListener {
@
Autowired
private orderServiceImp14 orderServiceImp14;
//执行本地事物
@
Override
public RocketMQLocalTransactionState executeLocalTransa
ction( Message msg, Object arg){
//订单可以从上面进行接收,有 massage、object,message对应第一步
MessageBuilder.withPayload(order).build(),,arg 对应 order
//try catch 代表成功或者失败
try {
//本地事物
Order order=(order) arg;
orderServiceImp14.createOrder(order);
return RocketMolocalTransactionstate.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionstate.ROLLBACK
}
}
消息回查如何知道消息到底是否成功或者失败,要加入一张表来记录是否成功,这张表一般叫 tx log 也是事物的日志表。
先创建一个实体类:
package com.itheima.domain;
import lombok.Data;
import javax.persistence.Entity;
import javax.persistence.Id;
import java.util.Date;
//消息事物状态记录
@Entity(name = "shop txlog")
@Data
public class TxLog {
@Id
private string txid; private Date date;
}
首先准备一下 though ,找到 order 位置,Dao 拷贝了 Tx logDao
package com.itheima.dao
import ...
public interface TxLogDao extends JpaRepository
string> {
事物的成功与失败是根据本地事务来算的,本地事务写在了单独的方法
private orderDao orderDao;
@Autowired
private TxLogDao txLogDao;
@Autowired
private RocketMQTemplate rocketMOTemplate;
public void createorderBefore(Order order){
String txId=UUID.randomUUID().toString();
//发送半事务消息
rocketMOTemplate.sendMessageInTransaction(
txProducerGroup:"txproducer group",
destination:"tx topic",
MessageBuilder.withPayload(order).setHeader( headerName: "txid",txid).build(),
//txid 可以通过 msg 接收
order
;}
}
//如果执行成功就成功,失败就失败
@Transactional
publia void createorder(Order order){
orderDao.save(order);
TxLog txLog=newTxLog();
txLog.setTxId);
txLog.setDate(newDate());
txLogDao.save(txLog)
}
txid 可以通过 msg 接收:
p
rivate OrderServiceImp14 orderServiceImp14;
//执行本地事物
@Override
public RocketMQLocalTransactionState
executeLocalTransacti
on(Message msg, Object arg)
{
String txId=(String)msggetHeaders().get("txId");
try {
//本地事物
order order=(order)ar
g
;
orderserviceImp14.createorder(txIdorder);
return RocketMolocalTransactionState.COMMTT:
}catch(Exception e){
return RocketMQLocalTransactionStateROLLBACK}
}
}
//消息回查
@Override
public RocketMOLocalTransactionState
checkLocalTransaction
(Message ms
g
){
return null;
}
}
OrderServicelmp14.Java 可以接 ID:
@
Transactional
public void createorder(string txid, Order order){
//保存订单
orderDao.save(order);
TxLog txLog=new TxLog();
txLog.setTxId(txId);
txLog.setDate(new Date(());
//记录事务日志
txLogDao.save(txLog);
}
}
因为是用本地事务控制,两个一定是同时成功同时失败的,对于消息回答的时候可以去数据库表里查,如果能查到说明肯定是保存成功,如果查不到肯定是保存失败。
写一下回查:
Import
...
@service
@
RocketMQTransactionListener(txProducerGroup =
"tx
_
produ
cer group")
public class orderserviceImpl4Listener implements
RocketMQ
LocalTransactionListener {
@Autowired
private OrderServiceImp14 orderServiceImp14;
@
Autowired
private TxLogDao txlogDao;
//执行本地事物
@
override
public RocketMQLocalTransactionState
executeLocalTransacti
on(Message msg, Object arq)(
S
tring txId=(String)msggetHeaders().get("txId");
try {
//本地事物
Order order=(Order)arg;
orderServiceImp14.createOrder(txIdorder);
return RocketMolocalTransactionState.COMMIT:}catch
(Exce
ptione){
return RocketMolocalTransactionStateROLLBACK;
}
}
//消息回查
@
Override
public RocketMQLocalTransactionstate
checklocalTransaction
(Message msg)
string txId=(String)msg.getHeaders().get("txid");
TxLog txLog=txLogDao.findById(txId).get();
if (txloq != nu
ll
){
//本地事物(订单)成功了
return RocketMoLocalTransactionState.COMMIT
}
else{
return RocketMolocalTransactionState.ROLLBACK;
}
}
}
首先消息回答会调用到方法里面去查本地事务状态(6),查完以后上报给服务端(7)。
调用代码:
把第一个 Controller 写上4
package com.itheima.controller;
Import...
@RestController
@slf4j
publia class ordercontroller4 {
@Autowired
private RestTemplate restTemplate;
@Autowired
private OrderServiceImp14 orderService
@Autowired
private productService productService;
//下单--fegin
@RequestMapping("/order/prod/{pid}")
public order order(CPathVariable("pid") Inteer pid) {
loq.info("接收到1)号商品的下单请求,接下来调用商品微服务查询此商品信息",pid)
//调用商品微服务,查询商品信息
Product product=productservicefindByPid(pid);
if (product.getPid()==-100){
Order order=newOrder(); order.setoid(-100L);
order.setPname("下单失败"); return order;
}
log.info("查询到(}号商品的信息,内容是:1}",pid,
JSON.tojsonstring(product));
//下单(创建订单)
Order order=newOrder();
order.setUid(1);
order.setUsername("测试用户");
order.set
P
id(pid);
order.setPname(product.getPname(());
order.setPprice(product.getPprice()):
order.setNumber(1);
orderService.createOrderBefore(order);
Iog.info("创建订单成功订单信息为1",
JSON.toJsoNstring(order));
return order;
}
启动微服务看效果,运行
Product 启动完毕没有问题
Order没有问题
在 OrderServicelmpl4.java 打断点,第一个打在发送半事务消息的地方 String txId=UUID.randomUUID().toString();
OrderServicelmpl4Listener.java 找到成功以后回调的地方,执行本地事务 String txId=(String)msg.getHeaders().get(“txId”);
回查的地方也打一个 String txId=(String)msg.getHeaders().get(“txId”);
刷新发送半事务消息
已经开始执行本地事务,意味着半消息事务发送成功开始回调
处理本地事务系统崩溃,因为想复现消息回查,把进程杀掉就可以停止。
Terminal 进程号:
键入“TASKKILL/”以了解用法
C: \workspace\ws\springcloud-alibaba>
jps
14368
34416 jar
42160 OrderApplication
43072 Jps
33428 jar
46708 ProductApplication
25160 KotlinCompileDaemon
27368 jar
43112 Launcher
34668 nacos-server.jar
C:\workspace\ws\springcloud-alibaba>
taskkill-F/pid 42160
成功:已终止 PID 为 42160 的进程。
C:\workspacelws\springcloud-alibaba>
重新启动看是否进行回查,步骤相当于调用本地事务完成之后一直没有发生4,应该手动调用5来查消息。
已经基本成功