事务消息的实现|学习笔记

简介: 快速学习事务消息的实现

开发者学堂课程【RocketMQ知识精讲与项目实战(第一阶段)事务消息的实现】学习笔记,与课程紧密联系,让用户快速学习知识

课程地址:https://developer.aliyun.com/learning/course/702/detail/12389


事务消息的实现

 

内容介绍:

一、创建生产者和消费者

二、设置监听器

三、执行三种情况

 

一、创建生产者和消费者

通过代码将事务消息实现,先将生产者和消费者先创建出来,然后在 producer 中创建 TransactionMQproducer 也就是事务的消息生产者。

//1.创建消息生产者 producer,并制定生产者组名

DefaultMQProducer producer = new DefaultMQProducer("group1");

//2.指定 Nameserver.地址producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876");

//3.启动 producer

producer.start();

for (int i =0; i <10; i++) {

//4.创建消息对象,指定主售 Topic、Tag 和消息体

/**

*参数一,消息主心 Topic

*参数二:消息 Tag

*参数三:消息内

*/

image.png

Message msg = new Message( topic: "base", taqs: "Tag1",(“Hello World"+i).getBytes());

//5.发送消

SendResult result=producer.send(msg);

//发送状态

Sendstatus status = result.getsendstatus(); System.out.println(“发送结果:"+result);

//线程1秒

TimeUnit.SECONDS.sleep(1);

)

//6.关闭生产者 producer

producer,shutdown();

 

二、设置监听器

启动之前要设置消息事务的监听器,因此要添上这一部分:

(TransactionMQProducer) producer).setTransactionListener();

在流程中第二步消息MQ接收到半消息之后,就可以执行本地事务了,而告诉是提

交还是回滚需要一个入口,所以这个监听器也就是处理本地事务的一个入口。

//加事务监听器

producer.setTransactionListener(new TransactionListener(){

/**

*方法中执行本地事务

*@param msg

*@param arg*

*@return

*/

@Override

public LocalTransactionstate execute falTransaction(Message msg, object arg

}

return null;}

/**

*@param msg

*@return

*/

@Override

publicLocalTransactionstate checkLocalTransaction(Messageext msg) {

return null;

}

});

//3.启动producer

Producer.start();

在启动消费者之前,要先构建消息对象,将topic改为

transactiontopic,发送消息也需要一并修改。

 

三、执行三种情况

举例设置三个不同的tag消息“taga”,”tagb“,”tagc”,在执行本地事务处设

置一个判断:有提交、回滚、不做处理三个情况

publicLocalTnansactionstate executelocalTransaction(Message msg, Object arg) {

If(StringUtils.equals("TAGA",msg.getTags())){

return localTransactionstate.COMMIT MESSAGE;

}else if(stringutils.equals(“TAGB",msg.getTags())){

return LocalTransactionState.ROLLBACK_MESSAGE;

}else if(stringutils.equals(“TAGc",msg.getTags())){

return LocalTransactionState.UNKNOW;

return LocalTransactionState.UNKNON;

}

/**

*@param msg

*@return

*/

@Override

public LocalTransactionstate checkLocalTransaction(MessageExt msg){

return null;

}

});

producer.start();

如果MQ没有收到消息的回查,则有:

public LocalTransactionState checkLocalTransacti(MessageExt msg){

System.out.println("消息的Tag:"+msg.getTags()); 

return LocalTransactionState.COMMIT_MESSAGE;

}

在消费者这边,同样要保持主题是一样的,启动生产者和消费者。与原来的区别就是要创建事务的生产者。

相关文章
|
5月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
2月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
4月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
5月前
|
消息中间件 RocketMQ
MQ与本地事务一致性问题---RocketMQ事务型消息
MQ与本地事务一致性问题---RocketMQ事务型消息
65 2
|
消息中间件 Kafka 数据库
【JavaP6大纲】分布式事务篇:MQ 事务消息
【JavaP6大纲】分布式事务篇:MQ 事务消息
112 0
|
消息中间件 存储 监控
七种常见分布式事务详解(2PC、3PC、TCC、Saga、本地事务表、MQ事务消息、最大努力通知)
七种常见分布式事务详解(2PC、3PC、TCC、Saga、本地事务表、MQ事务消息、最大努力通知)
1671 0
|
消息中间件 存储 网络协议
【视频】事务消息| 学习笔记
快速学习【视频】事务消息
【视频】事务消息| 学习笔记
|
存储 Java 数据库
消息类型-事务消息|学习笔记
快速学习消息类型-事务消息
155 0
消息类型-事务消息|学习笔记
|
消息中间件 存储 网络协议
多类型业务消息专题-事务消息 | 学习笔记
快速学习多类型业务消息专题-事务消息
133 0
多类型业务消息专题-事务消息 | 学习笔记
|
消息中间件 存储 数据库
解析 RocketMQ 业务消息——“事务消息”
本篇文章通过拆解 RocketMQ 事务消息的使用场景、基本原理、实现细节和实战使用,帮助大家更好的理解和使用 RocketMQ 的事务消息。
解析 RocketMQ 业务消息——“事务消息”