开发者学堂课程【RocketMQ知识精讲与项目实战(第一阶段):事务消息的实现】学习笔记,与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/702/detail/12389
事务消息的实现
内容介绍:
一、创建生产者和消费者
二、设置监听器
三、执行三种情况
一、创建生产者和消费者
通过代码将事务消息实现,先将生产者和消费者先创建出来,然后在 producer 中创建 TransactionMQproducer 也就是事务的消息生产者。
//1.创建消息生产者 producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定 Nameserver.地址producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876");
//3.启动 producer
producer.start();
for (int i
=0
; i
<
10; i++) {
//4.创建消息对象,指定主售 Topic、Tag 和消息体
/**
*参数一,消息主心 Topic
*参数二:消息 Tag
*参数三:消息内容
*/
Message msg = new Message( topic: "base", ta
q
s: "Tag1",(“Hello World"+i).getBytes());
//5.发送消息
SendResult result=producer.send(msg);
//发送状态
Sendstatus status = result.getsendstatus(); System.out.println(“发送结果:"+result);
//线程睡1秒
TimeUnit.SECONDS.sleep(1);
)
//6.关闭生产者 producer
producer,shutdown();
二、设置监听器
启动之前要设置消息事务的监听器,因此要添上这一部分:
(TransactionMQProducer) producer).setTransactionListener();
在流程中第二步消息MQ接收到半消息之后,就可以执行本地事务了,而告诉是提
交还是回滚需要一个入口,所以这个监听器也就是处理本地事务的一个入口。
//添加事务监听器
producer.setTransactionListener(new TransactionListener(){
/**
*在该方法中执行本地事务
*
@param
msg
*
@param arg*
*@
return
*/
@
Overr
i
de
public LocalTransactionstate execute falTransaction(Message msg, object arg
;
}
return null;}
/
**
*
@param
msg
*@
return
*/
@
Overr
i
de
publicLocalTransactionstate
checkLocalTransaction(Messageext msg) {
return null;
}
});
//3.启动producer
Producer
.
start();
在启动消费者之前,要先构建消息对象,将topic改为
transactiontopic,发送消息也需要一并修改。
三、执行三种情况
举例设置三个不同的tag消息“taga”,”tagb“,”tagc”,在执行本地事务处设
置一个判断:有提交、回滚、不做处理三个情况
publicLocalTnansactionstate
executelocalTransaction(Message msg, Object arg)
{
If(
StringUtils.equals("TAGA",msg.getTags()))
{
return localTransactionstate.COMMIT MESSAGE;
}
else if(stringutils.equals(“TAGB",msg.getTags()))
{
return LocalTransactionState.ROLLBACK_MESSAGE;
}
else if(stringutils.equals(“TAGc",msg.getTags()))
{
return LocalTransactionState.UNKNOW;
return LocalTransactionState.UNKNON
;
}
/**
*
@param
msg
*@
return
*/
@
Overr
i
de
public LocalTransactionstate checkLocalTransaction(MessageExt msg)
{
return nul
l
;
}
});
producer.start();
如果MQ没有收到消息的回查,则有:
public LocalTransactionState checkLocalTransacti(MessageExt msg)
{
System.out.println
("消息的Tag:"+msg.getTags());
return LocalTransactionState.COMMIT
_
MESSAGE;
}
在消费者这边,同样要保持主题是一样的,启动生产者和消费者。与原来的区别就是要创建事务的生产者。