事务消息的实现|学习笔记

简介: 快速学习事务消息的实现

开发者学堂课程【RocketMQ知识精讲与项目实战(第一阶段)事务消息的实现】学习笔记,与课程紧密联系,让用户快速学习知识

课程地址:https://developer.aliyun.com/learning/course/702/detail/12389


事务消息的实现

 

内容介绍:

一、创建生产者和消费者

二、设置监听器

三、执行三种情况

 

一、创建生产者和消费者

通过代码将事务消息实现,先将生产者和消费者先创建出来,然后在 producer 中创建 TransactionMQproducer 也就是事务的消息生产者。

//1.创建消息生产者 producer,并制定生产者组名

DefaultMQProducer producer = new DefaultMQProducer("group1");

//2.指定 Nameserver.地址producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876");

//3.启动 producer

producer.start();

for (int i =0; i <10; i++) {

//4.创建消息对象,指定主售 Topic、Tag 和消息体

/**

*参数一,消息主心 Topic

*参数二:消息 Tag

*参数三:消息内

*/

image.png

Message msg = new Message( topic: "base", taqs: "Tag1",(“Hello World"+i).getBytes());

//5.发送消

SendResult result=producer.send(msg);

//发送状态

Sendstatus status = result.getsendstatus(); System.out.println(“发送结果:"+result);

//线程1秒

TimeUnit.SECONDS.sleep(1);

)

//6.关闭生产者 producer

producer,shutdown();

 

二、设置监听器

启动之前要设置消息事务的监听器,因此要添上这一部分:

(TransactionMQProducer) producer).setTransactionListener();

在流程中第二步消息MQ接收到半消息之后,就可以执行本地事务了,而告诉是提

交还是回滚需要一个入口,所以这个监听器也就是处理本地事务的一个入口。

//加事务监听器

producer.setTransactionListener(new TransactionListener(){

/**

*方法中执行本地事务

*@param msg

*@param arg*

*@return

*/

@Override

public LocalTransactionstate execute falTransaction(Message msg, object arg

}

return null;}

/**

*@param msg

*@return

*/

@Override

publicLocalTransactionstate checkLocalTransaction(Messageext msg) {

return null;

}

});

//3.启动producer

Producer.start();

在启动消费者之前,要先构建消息对象,将topic改为

transactiontopic,发送消息也需要一并修改。

 

三、执行三种情况

举例设置三个不同的tag消息“taga”,”tagb“,”tagc”,在执行本地事务处设

置一个判断:有提交、回滚、不做处理三个情况

publicLocalTnansactionstate executelocalTransaction(Message msg, Object arg) {

If(StringUtils.equals("TAGA",msg.getTags())){

return localTransactionstate.COMMIT MESSAGE;

}else if(stringutils.equals(“TAGB",msg.getTags())){

return LocalTransactionState.ROLLBACK_MESSAGE;

}else if(stringutils.equals(“TAGc",msg.getTags())){

return LocalTransactionState.UNKNOW;

return LocalTransactionState.UNKNON;

}

/**

*@param msg

*@return

*/

@Override

public LocalTransactionstate checkLocalTransaction(MessageExt msg){

return null;

}

});

producer.start();

如果MQ没有收到消息的回查,则有:

public LocalTransactionState checkLocalTransacti(MessageExt msg){

System.out.println("消息的Tag:"+msg.getTags()); 

return LocalTransactionState.COMMIT_MESSAGE;

}

在消费者这边,同样要保持主题是一样的,启动生产者和消费者。与原来的区别就是要创建事务的生产者。

相关文章
|
18天前
|
消息中间件 存储 Kafka
被问到MQ消息已丢失,该如何处理?
在分布式系统中,消息中间件(如RabbitMQ、Kafka等)用于解耦生产者和消费者,确保数据传输的可靠性和顺序性。尽管有多种措施防止消息丢失,如消息持久化、手动确认机制和重试机制,但消息丢失仍可能发生。本文探讨了四种常见丢失场景及补救措施:1. 生产者发送消息失败;2. 消息在传输过程中丢失;3. 消息中间件内部丢失;4. 消费者未处理完消息前丢失。针对每种场景,提出了相应的解决方案,如消息重发、本地存储、日志记录、高可用配置、死信队列等,以确保系统的可靠性和稳定性。
24 0
|
8月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
8月前
|
消息中间件 Kafka API
Kafka Exactly Once 语义实现原理:幂等性与事务消息
Apache Kafka的Exactly-Once语义确保了消息处理的准确性和一致性。通过幂等性和事务消息,Kafka实现了要么全处理要么全不处理的原子性。文章详细解析了Kafka事务的工作流程,包括生产者的幂等性(通过序列号保证),以及事务消息的提交和回滚过程。Kafka事务提供了ACID保证,但存在性能限制,如额外的RPC请求和单生产者只能执行一个事务。此外,事务适用于同集群内的操作,跨集群时原子性无法保证。了解这些原理有助于开发者更好地利用Kafka事务构建可靠的数据处理系统。
197 3
 Kafka Exactly Once 语义实现原理:幂等性与事务消息
|
7月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
8月前
|
消息中间件 RocketMQ
MQ与本地事务一致性问题---RocketMQ事务型消息
MQ与本地事务一致性问题---RocketMQ事务型消息
95 2
|
消息中间件 存储 Kafka
MQ保证消息幂等机制
MQ保证消息幂等机制
277 0
|
消息中间件 Kafka 测试技术
MQ 学习日志(七) 保证消息消费的顺序性
保证消息消费的顺序性
195 0
|
消息中间件 Kafka 数据库
【JavaP6大纲】分布式事务篇:MQ 事务消息
【JavaP6大纲】分布式事务篇:MQ 事务消息
121 0
|
消息中间件 存储 监控
七种常见分布式事务详解(2PC、3PC、TCC、Saga、本地事务表、MQ事务消息、最大努力通知)
七种常见分布式事务详解(2PC、3PC、TCC、Saga、本地事务表、MQ事务消息、最大努力通知)
2168 0
|
消息中间件 存储 缓存
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务