给注册用户发红包,RabbitMQ实现(分布式事务2)

简介:

沿用昨天的代码,先定义交换机名称和routing key名称

public interface UserCenterMq {

/**

* 用户系统exchange名
*/

String MQ_EXCHANGE_USER = "user.topic.exchange";

/**

* 发送红包routing key
*/

String ROUTING_KEY_POST_REDPACKET = "post.redpacket";
}
写RabbitMQ的配置文件

@Configuration
public class RabbitmqConfig {

/**

* 红包队列名
*/

public static final String RED_PACKET_QUEUE = "red.packet.queue";

/**

* 声明队列,此队列用来接收用户注册的消息
* 
* @return
*/

@Bean
public Queue redPacketQueue() {

  Queue queue = new Queue(RED_PACKET_QUEUE);

  return queue;

}

@Bean
public TopicExchange userTopicExchange() {

  return new TopicExchange(UserCenterMq.MQ_EXCHANGE_USER);

}

/**

* 将红包队列和用户的exchange做个绑定
* 
* @return
*/

@Bean
public Binding bindingRedPacket() {

  Binding binding = BindingBuilder.bind(redPacketQueue()).to(userTopicExchange())
        .with(UserCenterMq.ROUTING_KEY_POST_REDPACKET);
  return binding;

}
}
修改事务侦听代码(事务确认完成后发送消息到MQ)

@Component
public class UserTransactionEventListener {

@Autowired
private AmqpTemplate rabbitTemplate;

@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void beforeCommit(PayloadApplicationEvent<User> event) {
    System.out.println("before commit, id: " + event.getPayload().getId());
}

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void afterCommit(PayloadApplicationEvent<User> event) {
    System.out.println("after commit, id: " + event.getPayload().getId());
}

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
public void afterCompletion(PayloadApplicationEvent<User> event) {
    System.out.println("after completion, id: " + event.getPayload().getId());
    //事务完成后发送消息,消息为user对象
    rabbitTemplate.convertAndSend(UserCenterMq.MQ_EXCHANGE_USER,UserCenterMq.ROUTING_KEY_POST_REDPACKET, JSONObject.toJSONString(event.getPayload()));
}

@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void afterRollback(PayloadApplicationEvent<User> event) {
    System.out.println("after rollback, id: " + event.getPayload().getId());
}

}
在RabbitMQ的管理界面内可以看到
image
image
image
已经发送了一个消息到该队列,即这个user对象.

在红包模块中,我们来监听这个消息队列完成分布式事务.

model

@Data
public class RedPacket implements Serializable {

private long redPacketId;
private long userId;
private Double redPacketAmount;

}
dao(此处有一个red_packet表,3个字段,1个自增)

@Mapper
public interface RedPacketDao {

@Options(useGeneratedKeys = true, keyProperty = "red_packet_id")
@Insert("insert into red_packet (user_id,red_packet_amount) values (#{userId},#{redPacketAmount})")
void add(RedPacket redPacket);

}
service

public interface RedPacketService {

public void add(RedPacket redPacket);

}
@Transactional
@Service
public class RedPacketServiceImpl implements RedPacketService {

@Autowired
private RedPacketDao redPacketDao;
@Override
public void add(RedPacket redPacket) {
    redPacketDao.add(redPacket);
}

}
RabbitMQ消费者(此处为一注册用户就发一个十块以内的随机红包)

@Component
@RabbitListener(queues = RabbitmqConfig.RED_PACKET_QUEUE)
public class PostRedPacketConsumer {

@Autowired
private RedPacketService redPacketService;

@RabbitHandler
public void postRedPacket(String userStr) {
    User user = JSONObject.parseObject(userStr,User.class);
    RedPacket redPacket = new RedPacket();
    redPacket.setUserId(user.getId());
    redPacket.setRedPacketAmount((double)(Math.random() * 10));
    redPacketService.add(redPacket);
}

}
运行后,该队列被消费掉
image
红包表增加数据
image

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列
|
8月前
|
消息中间件 RocketMQ 微服务
RocketMQ 分布式事务消息实战指南
RocketMQ 分布式事务消息实战指南
641 1
|
27天前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
183 7
|
3月前
|
消息中间件 网络协议 C#
C#使用Socket实现分布式事件总线,不依赖第三方MQ
`CodeWF.EventBus.Socket` 是一个轻量级的、基于Socket的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。
C#使用Socket实现分布式事件总线,不依赖第三方MQ
|
5月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
151 2
|
5月前
|
Java Nacos Docker
"揭秘!Docker部署Seata遇上Nacos,注册成功却报错?这些坑你不得不防!一网打尽解决秘籍,让你的分布式事务稳如老狗!"
【8月更文挑战第15天】在微服务架构中,Nacos搭配Seata确保数据一致性时,Docker部署Seata后可能出现客户端连接错误,如“can not connect to services-server”。此问题多由网络配置不当、配置文件错误或版本不兼容引起。解决策略包括:调整Docker网络设置确保可达性;检查并修正`file.conf`和`registry.conf`中的Nacos地址和端口;验证Seata与Nacos版本兼容性;修改配置后重启服务;参考官方文档和最佳实践进行配置。通过这些步骤,能有效排除故障,保障服务稳定运行。
390 0
|
8月前
|
消息中间件 存储 负载均衡
分布式消息传递新时代:深入了解RabbitMQ_sharding插件的精髓【RabbitMQ 八】
分布式消息传递新时代:深入了解RabbitMQ_sharding插件的精髓【RabbitMQ 八】
159 0
|
7月前
|
消息中间件 中间件 程序员
分布式事务大揭秘:使用MQ实现最终一致性
本文由小米分享,介绍分布式事务中的MQ最终一致性实现,以RocketMQ为例。RocketMQ的事务消息机制包括准备消息、本地事务执行、确认/回滚消息及事务状态检查四个步骤。这种机制通过消息队列协调多系统操作,确保数据最终一致。MQ最终一致性具有系统解耦、提高可用性和灵活事务管理等优点,广泛应用于分布式系统中。文章还讨论了RocketMQ的事务消息处理流程和失败情况下的处理策略,帮助读者理解如何在实际应用中解决分布式事务问题。
480 6
|
7月前
|
消息中间件 监控 调度
构建Python中的分布式系统结合Celery与RabbitMQ
在当今的软件开发中,构建高效的分布式系统是至关重要的。Python作为一种流行的编程语言,提供了许多工具和库来帮助开发人员构建分布式系统。其中,Celery和RabbitMQ是两个强大的工具,它们结合在一起可以为你的Python应用程序提供可靠的异步任务队列和消息传递机制。
|
7月前
|
存储 关系型数据库 Java
技术经验解读:三种分布式事务LCN、Seata、MQ
技术经验解读:三种分布式事务LCN、Seata、MQ
237 0