Building on yesterday's code, first define the exchange name and the routing key name:
public interface UserCenterMq {
    /**
     * Exchange name for the user system
     */
    String MQ_EXCHANGE_USER = "user.topic.exchange";

    /**
     * Routing key for sending a red packet
     */
    String ROUTING_KEY_POST_REDPACKET = "post.redpacket";
}
Write the RabbitMQ configuration class:
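import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {
    /**
     * Red packet queue name
     */
    public static final String RED_PACKET_QUEUE = "red.packet.queue";

    /**
     * Declares the queue that receives user registration messages.
     *
     * @return the red packet queue
     */
    @Bean
    public Queue redPacketQueue() {
        return new Queue(RED_PACKET_QUEUE);
    }

    @Bean
    public TopicExchange userTopicExchange() {
        return new TopicExchange(UserCenterMq.MQ_EXCHANGE_USER);
    }

    /**
     * Binds the red packet queue to the user exchange.
     *
     * @return the binding between the queue and the exchange
     */
    @Bean
    public Binding bindingRedPacket() {
        return BindingBuilder.bind(redPacketQueue()).to(userTopicExchange())
                .with(UserCenterMq.ROUTING_KEY_POST_REDPACKET);
    }
}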
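The binding above uses a topic exchange, which routes by comparing the message's routing key with each binding pattern word by word: `*` stands for exactly one word and `#` for zero or more words. The following is a minimal sketch of that matching rule in plain Java, to show why a message published with `post.redpacket` reaches the red packet queue. The class `TopicMatchSketch` and its methods are hypothetical names for illustration only; this is a simplified model, not RabbitMQ's actual implementation.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Simplified model of topic-exchange matching; not RabbitMQ's actual implementation. */
public class TopicMatchSketch {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    public static boolean matches(String pattern, String routingKey) {
        return match(split(pattern), 0, split(routingKey), 0);
    }

    /** Splits a key into its dot-separated words; an empty key has no words. */
    private static List<String> split(String s) {
        if (s.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(s.split("\\."));
    }

    private static boolean match(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return j == k.size();
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // '#' may absorb zero or more words of the key.
            for (int n = j; n <= k.size(); n++) {
                if (match(p, i + 1, k, n)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            return false;
        }
        // '*' matches exactly one word; any other word must be equal.
        if (word.equals("*") || word.equals(k.get(j))) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("post.redpacket", "post.redpacket"));   // true
        System.out.println(matches("post.*", "post.redpacket"));           // true
        System.out.println(matches("post.*", "post.redpacket.vip"));       // false
        System.out.println(matches("post.#", "post.redpacket.vip"));       // true
    }
}
```

Because the binding key here contains no wildcards, the topic exchange behaves like a direct exchange for this queue; wildcards only become useful if further queues later subscribe to a family of user events.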
Modify the transaction listener (send the message to MQ after the transaction has been committed):
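import com.alibaba.fastjson.JSONObject; // assumes fastjson is the JSON library in use
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.PayloadApplicationEvent;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

@Component
public class UserTransactionEventListener {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
    public void beforeCommit(PayloadApplicationEvent<User> event) {
        System.out.println("before commit, id: " + event.getPayload().getId());
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void afterCommit(PayloadApplicationEvent<User> event) {
        System.out.println("after commit, id: " + event.getPayload().getId());
        // Send only after a successful commit (AFTER_COMPLETION also fires after a rollback).
        // The message body is the User object serialized as JSON.
        rabbitTemplate.convertAndSend(UserCenterMq.MQ_EXCHANGE_USER,
                UserCenterMq.ROUTING_KEY_POST_REDPACKET,
                JSONObject.toJSONString(event.getPayload()));
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
    public void afterCompletion(PayloadApplicationEvent<User> event) {
        System.out.println("after completion, id: " + event.getPayload().getId());
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
    public void afterRollback(PayloadApplicationEvent<User> event) {
        System.out.println("after rollback, id: " + event.getPayload().getId());
    }
}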
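The choice of phase matters for where the message is sent: `AFTER_COMPLETION` fires after both a commit and a rollback, whereas `AFTER_COMMIT` fires only after a successful commit. The sketch below is a toy model of that behavior in plain Java, assuming the simple case where the business code either succeeds and commits or throws and rolls back. `TxPhaseSketch`, `firedPhases`, and `wouldSend` are hypothetical names; this is a simplification for illustration, not Spring's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of transaction phases; a simplification, not Spring's implementation. */
public class TxPhaseSketch {

    public enum Phase { BEFORE_COMMIT, AFTER_COMMIT, AFTER_ROLLBACK, AFTER_COMPLETION }

    /**
     * Phases fired for one transaction in this model.
     * businessFails means the transactional work threw and was rolled back.
     */
    public static List<Phase> firedPhases(boolean businessFails) {
        List<Phase> fired = new ArrayList<>();
        if (businessFails) {
            fired.add(Phase.AFTER_ROLLBACK);
        } else {
            fired.add(Phase.BEFORE_COMMIT);
            fired.add(Phase.AFTER_COMMIT);
        }
        // Completion is reported regardless of the outcome.
        fired.add(Phase.AFTER_COMPLETION);
        return fired;
    }

    /** Would a listener bound to sendPhase publish its message for this transaction? */
    public static boolean wouldSend(Phase sendPhase, boolean businessFails) {
        return firedPhases(businessFails).contains(sendPhase);
    }

    public static void main(String[] args) {
        // Rolled-back registration: a sender on AFTER_COMPLETION still fires.
        System.out.println(wouldSend(Phase.AFTER_COMPLETION, true));  // true
        // Rolled-back registration: a sender on AFTER_COMMIT stays silent.
        System.out.println(wouldSend(Phase.AFTER_COMMIT, true));      // false
        // Committed registration: a sender on AFTER_COMMIT fires.
        System.out.println(wouldSend(Phase.AFTER_COMMIT, false));     // true
    }
}
```

In this model, a message published from `AFTER_COMPLETION` could trigger a red packet for a user whose row was never persisted, which is why publishing from `AFTER_COMMIT` is usually the safer choice for this kind of flow.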
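In the RabbitMQ management UI you can see that one message, the serialized user object, has been sent to the queue.
In the red packet module, we listen on this queue to complete the distributed transaction.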
model
import java.io.Serializable;
import lombok.Data;

@Data
public class RedPacket implements Serializable {
    private long redPacketId;
    private long userId;
    private Double redPacketAmount;
}
dao (this uses a red_packet table with 3 columns, one of them auto-increment)
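import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;

@Mapper
public interface RedPacketDao {
    // keyProperty names the Java property that receives the generated key; keyColumn names the table column.
    @Options(useGeneratedKeys = true, keyProperty = "redPacketId", keyColumn = "red_packet_id")
    @Insert("insert into red_packet (user_id,red_packet_amount) values (#{userId},#{redPacketAmount})")
    void add(RedPacket redPacket);
}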
service
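// RedPacketService.java
public interface RedPacketService {
    void add(RedPacket redPacket);
}

// RedPacketServiceImpl.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Transactional
@Service
public class RedPacketServiceImpl implements RedPacketService {
    @Autowired
    private RedPacketDao redPacketDao;

    @Override
    public void add(RedPacket redPacket) {
        redPacketDao.add(redPacket);
    }
}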
RabbitMQ consumer (here, every newly registered user receives one random red packet of less than ten yuan)
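import com.alibaba.fastjson.JSONObject; // assumes fastjson is the JSON library in use
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = RabbitmqConfig.RED_PACKET_QUEUE)
public class PostRedPacketConsumer {
    @Autowired
    private RedPacketService redPacketService;

    @RabbitHandler
    public void postRedPacket(String userStr) {
        // The message body is the JSON string produced by the user module.
        User user = JSONObject.parseObject(userStr, User.class);
        RedPacket redPacket = new RedPacket();
        redPacket.setUserId(user.getId());
        // Random amount in [0, 10)
        redPacket.setRedPacketAmount(Math.random() * 10);
        redPacketService.add(redPacket);
    }
}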
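The consumer draws the amount with `Math.random() * 10`, which yields a `double` with many decimal places and can in principle be `0.0`. If amounts should be whole cents and never zero (an assumption that the original code does not state), one possible approach is to draw an integer number of cents and convert it to a `BigDecimal`. The class `RedPacketAmountSketch` and the method `randomAmount` are hypothetical names used only for this sketch.

```java
import java.math.BigDecimal;
import java.util.Random;

/** Sketch of drawing a red packet amount in whole cents; names are illustrative. */
public class RedPacketAmountSketch {

    /** Returns a random amount from 0.01 to 9.99 inclusive, with two decimal places. */
    public static BigDecimal randomAmount(Random random) {
        // nextInt(999) yields 0..998, so cents is in 1..999.
        int cents = 1 + random.nextInt(999);
        // An unscaled value of cents with scale 2 means cents / 100.
        return BigDecimal.valueOf(cents, 2);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 3; i++) {
            System.out.println(randomAmount(random));
        }
    }
}
```

Using this in the consumer would also mean changing the `redPacketAmount` field from `Double` to `BigDecimal` and mapping it to a decimal column, so treat it as an optional refinement rather than part of the original design.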
After running, the message in the queue has been consumed.
The red packet table has a new row.