正文
一、原理
1、生产者投递事务消息到Broker中,设置该消息为半消息,不可以被消费。
2、broker在刷盘成功之后返回ack给生产者。
3、生产者执行本地事务。
4、生产者将本地事务执行结果,告知Broker。
5、如果事务执行成功,则将半消息设置成可以消费,然后消费者进行消费;如果本地事务执行失败,则将半消息删除,进行回滚。
6、如果由于网络原因或者其他原因,Broker一直没有收到本地事务执行的结果,则Broker每隔60s主动获取本地事务执行的结果,如果获取到则设置半消息可以消费,反之继续重试。
二、代码
生产者代码
package com.xiaojie.rocket.producer; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * @author xiaojie * @version 1.0 * @description: 订单生产者 * @date 2021/11/14 23:25 */ @Component @Slf4j public class OrderProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public TransactionSendResult sendSyncMessage(String msg, String destination, String tag) { log.info("【发送消息】:{}........", msg); Message<String> message = MessageBuilder.withPayload(msg).build(); TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(destination, message,null); log.info("【发送状态】:{}", result.getLocalTransactionState()); return result; } }
生产者监听
package com.xiaojie.rocket.listener;

import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.xiaojie.rocket.mapper.OrderMapper;
import com.xiaojie.rocket.pojo.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/**
 * Transaction listener for the order producer: runs the local transaction
 * (inserting the order) and answers transaction state checks.
 *
 * @author xiaojie
 * @version 1.0
 * @date 2021/11/14 23:59
 */
@Slf4j
@Component
@RocketMQTransactionListener
public class ProducerListener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderMapper orderMapper;

    /**
     * Executes the local transaction: parses the order from the message payload
     * and inserts it through {@code orderMapper}.
     *
     * @param message the message whose payload holds the order JSON
     * @param o       extra argument supplied by the sender (unused)
     * @return {@code COMMIT} when the insert reports at least one affected row;
     *         {@code ROLLBACK} when the message is null, the insert affects no
     *         rows, or any exception is raised
     * @author xiaojie
     * @date 2021/11/15 0:05
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        if (message == null) {
            // Nothing was written locally, so rolling back is safe. Returning null
            // (the previous behaviour) is not a valid transaction state.
            log.error("executeLocalTransaction received a null message; rolling back");
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        try {
            Order order = parseOrder(message);
            int insert = orderMapper.insert(order);
            // Transaction succeeded only if a row was actually written.
            return insert > 0
                    ? RocketMQLocalTransactionState.COMMIT
                    : RocketMQLocalTransactionState.ROLLBACK;
        } catch (Exception e) {
            log.error("Local transaction failed; rolling back", e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * Checks the local transaction by looking the order up in the database.
     *
     * @param message the message whose payload holds the order JSON
     * @return {@code COMMIT} when a matching order row is found; {@code UNKNOWN}
     *         when the message is null, the order id is missing, no row is found,
     *         or any exception is raised
     * @author xiaojie
     * @date 2021/11/15 0:06
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        if (message == null) {
            // A null message may be caused by a network problem: do not delete
            // anything, report UNKNOWN so the check is retried.
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        try {
            Order order = parseOrder(message);
            Object orderId = order.getOrderid();
            if (orderId == null) {
                // An empty probe would produce a query without a WHERE clause.
                log.error("Order in checked message has no order id; state unknown");
                return RocketMQLocalTransactionState.UNKNOWN;
            }
            // QueryWrapper's constructor expects an entity whose non-null fields
            // become equality conditions, so wrap the id in a probe entity instead
            // of passing the raw id value.
            // NOTE(review): assumes Order has a no-arg constructor and setOrderid — confirm.
            Order probe = new Order();
            probe.setOrderid(order.getOrderid());
            QueryWrapper<Order> queryWrapper = new QueryWrapper<>(probe);
            Order dbOrder = orderMapper.selectOne(queryWrapper);
            if (dbOrder == null) {
                return RocketMQLocalTransactionState.UNKNOWN;
            }
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("Local transaction check failed; state unknown", e);
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    /** Decodes the byte[] payload as UTF-8 JSON and parses it into an {@link Order}. */
    private Order parseOrder(Message message) {
        String msg = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
        log.info("发送的消息是message>>>>>>>>>{}", msg);
        Order order = JSONObject.parseObject(msg, Order.class);
        if (order == null) {
            throw new IllegalArgumentException("Message payload does not contain an order: " + msg);
        }
        return order;
    }
}
消费者
package com.xiaojie.rocket.consumer; import com.alibaba.fastjson.JSONObject; import com.xiaojie.rocket.mapper.DispatchMapper; import com.xiaojie.rocket.pojo.Dispatch; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author xiaojie * @version 1.0 * @description: 派单消费者 * @date 2021/11/15 0:23 */ @Component @RocketMQMessageListener(consumerGroup = "order-consumer", topic = "order-topic-test") @Slf4j public class DispatchConsumer implements RocketMQListener<String> { @Autowired private DispatchMapper dispatchMapper; @Override public void onMessage(String msg) { log.info(">>>>>>>>>>>>>>>>>>>>>",msg); JSONObject jsonObject = JSONObject.parseObject(msg); String orderId = jsonObject.getString("orderId"); // 计算分配的快递员id Dispatch dispatch=new Dispatch(); dispatch.setOrderid(orderId); //经过一系列的算法得到送餐时间为30分钟 dispatch.setSendtime(30*60L); dispatch.setRiderid(1000012L); dispatch.setUserid(15672L); // 3.插入我们的数据库 int result = dispatchMapper.insert(dispatch); } }