RocketMQ与MYSQL事务消息整合

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: rocketmq事务消息与mysql事物整合

1、基础理论知识篇“两阶段提交”如果你了解可以跳过这段,当然如果你想深入了解你可以购买相关书籍或去搜索相关资料阅读
  两阶段提交分为 正常提交和异常提交或异常回滚
image

   上面是正常提交的示意图,协调者发起预提交请求,参与者回复成功之后协调者再次发起commit请求,统一提交事物。事物结束。

   如果这两阶段提交过程当中有任何一个请求出现异常就会回滚,如下流程:

image

   异常请求包括预提交 返回预提交的应答,commit请求 等任何一个失败都会导致整个事物回滚。

  二阶段提交的问题
    “二阶段提交”还有一个很严重的问题就是如果commit过程当中失败了 就导致了全部事物失败,代价很大,简单粗暴的处理方式

     还有一个问题是如果 commit过程中网络出现问题 commit没有被整个事物的参与者之一或者多个收到,这个时候就会出现数据不一致现象。

  可能大家会提到 协调者是谁,参与者又是谁那?

           这里简单说下自己的理解

     如果在你的应用程序中你是通过 begin等相关操作语句开始的,比如 你使用了spring的@Transactional注解等,

     那协调者就是你的“应用程序”,参与者就是 mysql或其他支持事物的数据库系统。

     如果你就直接向mysql发送了一条sql语句mysql是自行提交的,那协调者和参与者都是mysql数据库自己

2、这里说下mysql对所谓的“重复数据”提供的相关sql或关键字。

   unique 唯一主键约束

          在sql事物中和应用程序中都可以捕获这个错误码或异常,可以作为幂等判断的一个依据。

   upset 操作,发现唯一主键冲突然后更新相关数据,mongodb有直接使用的sql方法语句

          示例:insert into tablename(column1) values(123) on duplicate key update column1 =column1 +123

    ignore 忽略操作对于多余的操作直接忽略

          insert ignore into tablename(column1)  values(123)

  基础篇说完很多内容如果想深入了解可以自己找资料处理。下面是华丽分割线

3、在我们原有的认知里有一个方案就差那么一点点就可以大面积使用的。

   我们之前可能想过怎样既能发送mq又能写数据库,下面这个方案会分接近我们的愿望。

   我们遵从如下步骤进行代码处理:

   1、开启数据库事物执行相关sql

   2、发送MQ消息

   3、提交数据库事物

   (注意:以上每一步都是在上一步执行成功之后在执行下一步的)

   根据步骤我画出了下面的流程图

image

其实这个流程是有一个漏洞的,如果我把上面的流程图改造为下面的二阶段提交的示意图就会很明显的看出来

image

    不知道大家有么有发现问题,是不是 各种提交和回滚操作都是针对的数据库,而不是MQ。commit数据库事物出现异常就会造成数据不一致现象。

    其实也不用在想有没有其他的流程方案能解决分布式双写问题,只要存在多写问题就存在数据不一致问题的现象,

    所以就出现了3pc Paxos 等协议来解决分布式事物/一致性的问题。
    下面我们开始介绍怎么使用mysql和RocketMQ来实现事物问题

     华丽分割线

4、RocketMQ事物消息的过程

   1、发送MQ的事物消息

   2、事物消息发送成功后会同步触发对应执行本地接口来执行针对mysql数据库的操作

   3、如果有未commit的消息,RocketMQ 的 broker会定时间隔时间来回查数据库事物是否已经提交完成

5、结合RocketMQ的事物消息与Mysql数据库事物的实现思想

image

  如果上面的二阶段提交你已经理解了,你会发现我这里设计的流程(上面图的流程)有点不太一样的地方

    什么地方那?

    MQ事物消息回滚的时候是因为mysql数据库事物没有提交成功而导致的,也就是说如果mysql数据库事务成功了MQ的事务消息是一定要成功的

    否则就会出现事物不一致的现象。

    假如发送MQ的prepare消息成功了,执行mysql事物的操作也成功了,但是偏偏返回给MQ的commit消息丢失了,那这个时候数据库消息并不会回滚。

  所以就有了回查本地事物消息是否成功的操作,来对MQ的消息做个补偿动作实现数据一致性

    理解了二阶段提交以及RocketMQ的事物实现之后你就可以自己设计事物相关操作的执行顺序了

    (这里的流程设计以及包括我的代码实现是以我的理解做出的最佳实践)

6、RocketMQ与Mysql事物结合注意事项

   1、如果应用程序承担协调者的工作就尽量晚开启事物和尽量早的提交数据库事物,事物中的sql对数据竞争多的sql尽量靠后

        因为执行数据库事物会有各种锁操作,减少锁的生命周期,数据库是稀缺资源,大家能省则省

   2、数据库事物最好设置超时时间,超时之后自动解除,最好不超过1分钟

   3、MQ默认1分钟之后回查一次已发送message但未commit的消息,最多回查15次,之后会执行回滚操作

   4、应用程序一定要做好幂等处理(可以参考上面mysql相关语句实现幂等接口)

   5、网络不要太差,否则会造成大量的重试,重试就会影响消息的及时性

   6、适用场景

                单次请求数量小

                每次请求会有数据产生,而不是查询产生的数据(比如 insert操作叫生产数据,select操作不生产数据)

                下游可以接受一定的延迟(这里有两个因素,有应用程序本身和Broker,这里指broker)

                下游服务或系统以接收到的消息为依据做相应的操作

                 MQ消息作为主要信息传递的工具
     下面说下具体代码实现

     华丽分割线

7、实战代码解析

   首先附上源码地址 https://github.com/zygfengyuwuzu/springboot-rocketmq-example

   下面将针对关键代码进行讲解

   首先介绍一下代码目录

image

     了解了上面的代码目录下面说下代码的执行流程

   首先看事物消息生产者的实例对象创建
package rocketmq_example.mqandmysqltraction.producer;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**

  • 生产者和消费者测试的时候记得注掉一中的一个以免观察不出效果
  • */

@Component
public class TransactionProducer {
static Logger logger = LoggerFactory.getLogger(TransactionProducer.class);

public DefaultMQProducer producer = null;

@Autowired
TransactionListener transactionListenerImp;

@PostConstruct
private void init() throws MQClientException {
logger.info("MQ事物生产者初始化开始--------------------------------------------------");
TransactionMQProducer transactionProducer = new TransactionMQProducer("mytestgroup");
// Producer 组名, 多个 Producer 如果属于一 个应用,发送同样的消息,则应该将它们 归为同一组
//transactionProducer.setProducerGroup("mytestgroup");
// Name Server 地址列表
transactionProducer.setNamesrvAddr("10.10.6.71:9876;10.10.6.72:9876");
// 超时时间 这里一定要大于数据库事物执行的超时时间
transactionProducer.setSendMsgTimeout(90000);
//这个线程池作用就是 mqbroker端回调信息的本地处理线程池
ExecutorService executorService = new ThreadPoolExecutor(1, 5, 100, TimeUnit.SECONDS,

new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
 @Override
 public Thread newThread(Runnable r) {
  Thread thread = new Thread(r);
  thread.setName("client-transaction-msg-check-thread");
  return thread;
 }
});

transactionProducer.setExecutorService(executorService);
transactionProducer.setTransactionListener(transactionListenerImp);
producer = transactionProducer;
producer.start();
logger.info("MQ事物生产者初始化结束--------------------------------------------------");
}
public SendResult send(Message me) throws Exception {
return producer.send(me);
}
/**

  • 发送普通消息
  • @param Topic
  • @param Tags
  • @param body
  • @return
  • @throws Exception
    */

public SendResult send(String Topic, String Tags, String body) throws Exception {
Message me = new Message();
// 标示
me.setTopic(Topic);
// 标签
me.setTags(Tags);
// 内容
me.setBody(body.getBytes(RemotingHelper.DEFAULT_CHARSET));
return producer.send(me);
}
/**

  • 发送普通消息
  • @param Topic
  • @param Tags
  • @param key
  • @param body
  • @return
  • @throws Exception
    */

public SendResult send(String Topic, String Tags, String key, String body) throws Exception {
try {
Message me = new Message(Topic, Tags, key, 0, body.getBytes(RemotingHelper.DEFAULT_CHARSET), true);
return producer.send(me);
} catch (Exception e) {
logger.error("发送MQ信息异常Topic{},Tags{},key{},body{}", Topic, Tags, key, body);
throw e;
}
}
@PreDestroy
public void Destroy() {
producer.shutdown();
}
}
  上面的代码我们接收到请求传输过来的数据之后,首先做了MQ消息对象的创建,创建成功之后直接发送MQ事物消息

  事物消息发送成功之后会调用上面设置的接口实现类的TransactionListenerImpl.executeLocalTransaction()这个方法。

  接口实现的方法代码如下:

package rocketmq_example.mqandmysqltraction.producer;

import java.util.List;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import rocketmq_example.mqandmysqltraction.MyTableModel;
import rocketmq_example.mqandmysqltraction.MytableService;

/**

  • 把数据库事物嵌套在mq事物当中不能显示抛出异常
  • @author zyg
    *

*/
@Component
public class TransactionListenerImpl implements TransactionListener {

static Logger logger = LoggerFactory.getLogger(TransactionListenerImpl.class);

@Autowired
MytableService mytableService;

/**

  • 一定要设置执行sql时间,尽量不要超时
  • */

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
logger.info("开始执行本地数据库事物 transactionid:{}", msg.getTransactionId());
LocalTransactionState lts = LocalTransactionState.UNKNOW;
@SuppressWarnings("unchecked")
List mytablelist = (List) arg;
try {
long start=System.currentTimeMillis();
//数据库事物执行时间不要超过mq回查时间 默认15分钟
mytableService.execMytableinsert2(mytablelist, msg.getTransactionId());
logger.info("执行数据库事物耗时:{}",System.currentTimeMillis()-start);
lts = LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
logger.error("数据库事务异常", e);
lts = LocalTransactionState.ROLLBACK_MESSAGE;
}
logger.info("结束执行本地数据库事物 transactionid:{} 返回:{}", msg.getTransactionId(),lts);
return lts;
}

/**

  • 去数据库查询看看是否存在已经成功发送预提交数据而没有commit成功的mq信息
  • 每分钟1次默认15次
  • 这里可以做个计数 让MQ重试5次/5分钟就回滚减轻MQ回查的压力
  • */

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
if (mytableService.existMyTableModelByMsgid(msg.getTransactionId())) {
logger.info("查询到已提交事物 transactionid:{}",msg.getTransactionId());
return LocalTransactionState.COMMIT_MESSAGE;
} else {
logger.info("未查到已提交事物 transactionid:{}",msg.getTransactionId());
return LocalTransactionState.UNKNOW;
}

}

}

 上面代码有两个方法,这里说下两个方法的作用和执行时间

         executeLocalTransaction这个方法是发送完 事物消息 之后同步被调用到的方法,用来执行本地事物操作

         executeLocalTransaction方法有两个参数,第一个是发送成功之后的message消息,在这个方法中包含事物ID其实就是msgid

         第二个参数是object类型的是从dataapi传过来,

         我的代码中没做任何处理直接传递过来了然后直接转化传递给了service层进行事物处理

         这个executeLocalTransaction方法里面为什么要直接返回commit或rollback,

         目的是尽量快的告诉MQ我的数据库事务执行成功了,

         尽快将half消息转为正常消息,已备消费者消费到做业务处理。

         这里完全可以直接返回unknow,等待broker回查来实现commit操作的。但是这样做对回查消息broker造成一定的压力。

  上面代码的第二个方法是提供给broker回调执行的,进行检查本地事务是否成功执行的操作,发起方是broker

         这里面我们接收到broker的回查请求之后直接去数据库查询是否存在broker提供的事务ID的数据

         如果存在返回commit标识,如果不存在返回unknow标识以等待下一次再来回查

  到此我们的一个事务操作就算完成了

  另外大家可以直接查看service层的实现代码,就不一一解释了
package rocketmq_example.mqandmysqltraction;

import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.fasterxml.jackson.databind.ObjectMapper;

@Service
public class MytableService {
static Logger logger = LoggerFactory.getLogger(MytableService.class);

@Autowired
IMytableMapper mytable;

@Autowired
ObjectMapper objMapper;

/**

  • 这里可以显示提交事物 返回boolean 一条一条插入只是为了展现事物的特性 获取所有异常 处理你的业务逻辑等等
  • @param mytablemodels
  • @return
    */

@Transactional(rollbackFor = Exception.class, timeout = 60000)
public List execMytableinsert2(List mytablemodels, String msgid) {

// logger.info("开始执行数据库事物");
List result = new ArrayList();
for (MyTableModel myTableModel : mytablemodels) {
// 插入数据库
myTableModel.setMsgid(msgid);
mytable.insertmytable(myTableModel);
result.add(myTableModel.getId());
}
// logger.info("结束执行数据库事物");
return result;
}

public boolean existMyTableModelById(Integer id) {
MyTableModel myTableModel = mytable.selectMyTableModelById(id);
if (myTableModel != null && null != myTableModel.getId()) {
return true;
}
return false;
}

/**

  • 查询是否存在已经发送过的msgid消息
  • @param msgid
  • @return
    */

public boolean existMyTableModelByMsgid(String msgid) {
int count = mytable.selectMyTableModelByMsgid(msgid);
if (count > 0) {
return true;
}
return false;
}

public void insetmsg(MyTableModel mytablemodel) {
try {
mytable.insertmsgrecord(mytablemodel);

} catch (org.springframework.dao.DuplicateKeyException e) {
logger.error("主键冲突异常被捕获",e);
}
}
}
非常感谢你能看到这里!!!看到这里相信你已经对本篇博客的内容有所了解了!如果有什么问题或者想不通的地方欢迎评论区进行讨论。

如果有不正确的地方恳请指正
image

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
30天前
|
存储 SQL 关系型数据库
MySQL的事务隔离级别
【10月更文挑战第17天】MySQL的事务隔离级别
97 43
|
1月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
1月前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1638 14
|
2月前
|
存储 Oracle 关系型数据库
Oracle和MySQL有哪些区别?从基本特性、技术选型、字段类型、事务、语句等角度详细对比Oracle和MySQL
从基本特性、技术选型、字段类型、事务提交方式、SQL语句、分页方法等方面对比Oracle和MySQL的区别。
523 18
Oracle和MySQL有哪些区别?从基本特性、技术选型、字段类型、事务、语句等角度详细对比Oracle和MySQL
|
1月前
|
SQL 关系型数据库 MySQL
阿里面试:MYSQL 事务ACID,底层原理是什么? 具体是如何实现的?
尼恩,一位40岁的资深架构师,通过其丰富的经验和深厚的技術功底,为众多读者提供了宝贵的面试指导和技术分享。在他的读者交流群中,许多小伙伴获得了来自一线互联网企业的面试机会,并成功应对了诸如事务ACID特性实现、MVCC等相关面试题。尼恩特别整理了这些常见面试题的系统化解答,形成了《MVCC 学习圣经:一次穿透MYSQL MVCC》PDF文档,旨在帮助大家在面试中展示出扎实的技术功底,提高面试成功率。此外,他还编写了《尼恩Java面试宝典》等资料,涵盖了大量面试题和答案,帮助读者全面提升技术面试的表现。这些资料不仅内容详实,而且持续更新,是求职者备战技术面试的宝贵资源。
阿里面试:MYSQL 事务ACID,底层原理是什么? 具体是如何实现的?
|
1月前
|
消息中间件 监控 供应链
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
86 2
|
2月前
|
SQL 关系型数据库 MySQL
MySQL基础:事务
本文详细介绍了数据库事务的概念及操作,包括事务的定义、开启、提交与回滚。事务作为一组不可分割的操作集合,确保了数据的一致性和完整性。文章还探讨了事务的四大特性(原子性、一致性、隔离性、持久性),并分析了并发事务可能引发的问题及其解决方案,如脏读、不可重复读和幻读。最后,详细讲解了不同事务隔离级别的特点和应用场景。
144 4
MySQL基础:事务
|
2月前
|
NoSQL 关系型数据库 Redis
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
|
1月前
|
SQL 关系型数据库 MySQL
【MySQL】索引和事务
【MySQL】索引和事务
55 0
|
2月前
|
SQL Oracle 关系型数据库
详解 MySQL 的事务以及隔离级别
详解 MySQL 的事务以及隔离级别
41 0
下一篇
无影云桌面