第 4 章 RocketMQ应用
四、事务消息
1 问题引入
这里的一个需求场景是:工行用户A向建行用户B转账 1 万元。
我们可以使用同步消息来处理该需求场景:
- 工行系统发送一个给B增款 1 万元的同步消息M给Broker
- 消息被Broker成功接收后,向工行系统发送成功ACK
- 工行系统收到成功ACK后从用户A中扣款 1 万元
- 建行系统从Broker中获取到消息M
建行系统消费消息M,即向用户B中增加 1 万元
这其中是有问题的:若第 3 步中的扣款操作失败,但消息已经成功发送到了Broker。对于MQ来说,只要消息写入成功,那么这个消息就可以被消费。此时建行系统中用户B增加了 1 万元。出现了数据不一致问题。
2 解决思路
解决思路是,让第 1 、 2 、 3 步具有原子性,要么全部成功,要么全部失败。即消息发送成功后,必须要保证扣款成功。如果扣款失败,则回滚发送成功的消息。而该思路即使用事务消息
。这里要使用分布式事务
解决方案。
使用事务消息来处理该需求场景:
- 事务管理器TM向事务协调器TC发起指令,开启
全局事务
- 工行系统发一个给B增款 1 万元的事务消息M给TC
- TC会向Broker发送
半事务消息prepareHalf
,将消息M预提交
到Broker。此时的建行系统是看不到Broker中的消息M的 - Broker会将预提交执行结果Report给TC。
- 如果预提交失败,则TC会向TM上报预提交失败的响应,全局事务结束;如果预提交成功,TC会调用工行系统的
回调操作
,去完成工行用户A的预扣款
1 万元的操作 - 工行系统会向TC发送预扣款执行结果,即
本地事务
的执行状态 TC收到预扣款执行结果后,会将结果上报给TM。
预扣款执行结果存在三种可能性:
// 描述本地事务执行状态public enum LocalTransactionState { COMMIT_MESSAGE, // 本地事务执行成功 ROLLBACK_MESSAGE, // 本地事务执行失败 UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果}
- TM会根据上报结果向TC发出不同的确认指令
- 若预扣款成功(本地事务状态为COMMIT_MESSAGE),则TM向TC发送Global Commit指令
- 若预扣款失败(本地事务状态为ROLLBACK_MESSAGE),则TM向TC发送Global Rollback指令
- 若现未知状态(本地事务状态为UNKNOW),则会触发工行系统的本地事务状态
回查操作
。回查操作会将回查结果,即COMMIT_MESSAGE或ROLLBACK_MESSAGE Report给TC。TC将结果上报给TM,TM会再向TC发送最终确认指令Global Commit或Global Rollback
- TC在接收到指令后会向Broker与工行系统发出确认指令
- TC接收的若是Global Commit指令,则向Broker与工行系统发送Branch Commit指令。此时Broker中的消息M才可被建行系统看到;此时的工行用户A中的扣款操作才真正被确认
TC接收到的若是Global Rollback指令,则向Broker与工行系统发送Branch Rollback指令。此时Broker中的消息M将被撤销;工行用户A中的扣款操作将被回滚
以上方案就是为了确保消息投递与扣款操作能够在一个事务中,要成功都成功,有一个失败,则全部回滚。
以上方案并不是一个典型的XA模式。因为XA模式中的分支事务是异步的,而事务消息方案中的消息预提交与预扣款操作间是同步的。
3 基础
分布式事务
对于分布式事务,通俗地说就是,一次操作由若干分支操作组成,这些分支操作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支操作要么全部成功,要么全部失败。分布式事务与普通事务一样,就是为了保证操作结果的一致性。
事务消息
RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式。
半事务消息
暂不能投递的消息,发送方已经成功地将消息发送到了Broker,但是Broker未收到最终确认指令,此时该消息被标记成“暂不能投递”状态,即不能被消费者看到。处于该种状态下的消息即半事务消息。
本地事务状态
Producer回调操作
执行的结果为本地事务状态,其会发送给TC,而TC会再发送给TM。TM会根据TC发送来的本地事务状态来决定全局事务确认指令。
// 描述本地事务执行状态
public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事务执行成功
ROLLBACK_MESSAGE, // 本地事务执行失败
UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果
}
消息回查
消息回查,即重新查询本地事务的执行状态。本例就是重新到DB中查看预扣款操作是否执行成功。
注意,消息回查不是重新执行回调操作。回调操作是进行预扣款操作,而消息回查则是查看预扣款操作执行的结果。引发消息回查的原因最常见的有两个:
1)回调操作返回UNKNWON
2)TC没有接收到TM的最终全局事务确认指令
RocketMQ中的消息回查设置
关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置,例如:
- transactionTimeout=20,指定TM在 20 秒内应将最终确认状态发送给TC,否则引发消息回查。默认为 60 秒
- transactionCheckMax=5,指定最多回查 5 次,超过后将丢弃消息并记录错误日志。默认 15 次。
- transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为 10 秒。默认为 60 秒。
4 XA模式三剑客
XA协议
XA(Unix Transaction)是一种分布式事务解决方案,一种分布式事务处理模式,是基于XA协议的。XA协议由Tuxedo(Transaction for Unix has been Extended for Distributed Operation,分布式操作扩展之后的Unix事务系统)首先提出的,并交给X/Open组织,作为资源管理器与事务管理器的接口标准。
XA模式中有三个重要组件:TC、TM、RM。
TC
Transaction Coordinator,事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。
RocketMQ中Broker充当着TC。
TM
Transaction Manager,事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务。它实际是全局事务的发起者。
RocketMQ中事务消息的Producer充当着TM。
RM
Resource Manager,资源管理器。管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
RocketMQ中事务消息的Producer及Broker均是RM。
5 XA模式架构
XA模式是一个典型的2PC,其执行原理如下:
- TM向TC发起指令,开启一个全局事务。
- 根据业务要求,各个RM会逐个向TC注册分支事务,然后TC会逐个向RM发出预执行指令。
- 各个RM在接收到指令后会在进行本地事务预执行。
- RM将预执行结果Report给TC。当然,这个结果可能是成功,也可能是失败。
TC在接收到各个RM的Report后会将汇总结果上报给TM,根据汇总结果TM会向TC发出确认指令。
- 若所有结果都是成功响应,则向TC发送Global Commit指令。
- 只要有结果是失败响应,则向TC发送Global Rollback指令。
TC在接收到指令后再次向RM发送确认指令。
事务消息方案并不是一个典型的XA模式。因为XA模式中的分支事务是异步的,而事务消息方案中的消息预提交与预扣款操作间是同步的。
6 注意
- 事务消息不支持延时消息
- 对于事务消息要做好幂等性检查,因为事务消息可能不止一次被消费(因为存在回滚后再提交的情况)
7 代码举例
定义工行事务监听器
public class ICBCTransactionListener implements TransactionListener {
// 回调操作方法
// 消息预提交成功就会触发该方法的执行,用于完成本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg,
Object arg) {
System.out.println("预提交消息成功:" + msg);
// 假设接收到TAGA的消息就表示扣款操作成功,TAGB的消息表示扣款失败,
// TAGC表示扣款结果不清楚,需要执行消息回查
if (StringUtils.equals("TAGA", msg.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TAGB", msg.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (StringUtils.equals("TAGC", msg.getTags())) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.UNKNOW;
}
// 消息回查方法
// 引发消息回查的原因最常见的有两个:
// 1)回调操作返回UNKNWON
// 2)TC没有接收到TM的最终全局事务确认指令
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("执行消息回查" + msg.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
}
定义事物消息生产者
public class TransactionProducer {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("tpg");
producer.setNamesrvAddr("rocketmqOS:9876");
/**
* 定义一个线程池
* @param corePoolSize 线程池中核心线程数量
* @param maximumPoolSize 线程池中最多线程数
* @param keepAliveTime 这是一个时间。当线程池中线程数量大于核心线程数量是,多余空闲线程的存活时长
* @param unit 时间单位
* @param workQueue 临时存放任务的队列,其参数就是队列的长度
* @param threadFactory 线程工厂
*/
ExecutorService executorService = new ThreadPoolExecutor( 2 , 5 , 100 , TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>( 2000 ), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
// 为生产者指定一个线程池
producer.setExecutorService(executorService);
// 为生产者添加事务监听器
producer.setTransactionListener(new ICBCTransactionListener());
producer.start();
String[] tags = {"TAGA","TAGB","TAGC"};
for (int i = 0 ; i < 3 ; i++) {
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("TTopic", tags[i], body);
// 发送事务消息
// 第二个参数用于指定在执行本地事务时要使用的业务参数
SendResult sendResult = producer.sendMessageInTransaction(msg,null);
System.out.println("发送结果为:" + sendResult.getSendStatus());
}
}
}
定义消费者
直接使用普通消息的SomeConsumer作为消费者即可。
public class SomeConsumer {
public static void main(String[] args) throws MQClientException {
// 定义一个pull消费者
// DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg");
// 定义一个push消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 指定nameServer
consumer.setNamesrvAddr("rocketmqOS:9876");
// 指定从第一条消息开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 指定消费topic与tag
consumer.subscribe("TTopic", "*");
// 指定采用“广播模式”进行消费,默认为“集群模式”
// consumer.setMessageModel(MessageModel.BROADCASTING);
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
// 一旦broker中有了其订阅的消息就会触发该方法的执行,
// 其返回值为当前consumer消费的状态
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 逐条消费消息
for (MessageExt msg : msgs) {
System.out.println(msg);
}
// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 开启消费者消费
consumer.start();
System.out.println("Consumer Started");
}
}
五、批量消息
1 批量发送消息
发送限制
生产者进行消息发送时可以一次发送多条消息,这可以大大提升Producer的发送效率。不过需要注意以下几点:
- 批量发送的消息必须具有相同的Topic
- 批量发送的消息必须具有相同的刷盘策略
- 批量发送的消息不能是延时消息与事务消息
批量发送大小
默认情况下,一批发送的消息总大小不能超过4MB字节。如果想超出该值,有两种解决方案:
- 方案一:将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送
- 方案二:在Producer端与Broker端修改属性
** Producer端需要在发送之前设置Producer的maxMessageSize属性
** Broker端需要修改其加载的配置文件中的maxMessageSize属性
生产者发送的消息大小
生产者通过send()方法发送的Message,并不是直接将Message序列化后发送到网络上的,而是通过这个Message生成了一个字符串发送出去的。这个字符串由四部分构成:Topic、消息Body、消息日志(占 20 字节),及用于描述消息的一堆属性key-value。这些属性中包含例如生产者地址、生产时间、要发送的QueueId等。最终写入到Broker中消息单元中的数据都是来自于这些属性。
2 批量消费消息
修改批量属性
Consumer的MessageListenerConcurrently监听接口的consumeMessage()方法的第一个参数为消息列表,但默认情况下每次只能消费一条消息。若要使其一次可以消费多条消息,则可以通过修改Consumer的consumeMessageBatchMaxSize属性来指定。不过,该值不能超过 32 。因为默认情况下消费者每次可以拉取的消息最多是 32 条。若要修改一次拉取的最大值,则可通过修改Consumer的pullBatchSize属性来指定。
存在的问题
Consumer的pullBatchSize属性与consumeMessageBatchMaxSize属性是否设置的越大越好?当然不是。
- pullBatchSize值设置的越大,Consumer每拉取一次需要的时间就会越长,且在网络上传输出现问题的可能性就越高。若在拉取过程中若出现了问题,那么本批次所有消息都需要全部重新拉取。
- consumeMessageBatchMaxSize值设置的越大,Consumer的消息并发消费能力越低,且这批被消费的消息具有相同的消费结果。因consumeMessageBatchMaxSize指定的一批消息只会使用一个线程进行处理,且在处理过程中只要有一个消息处理异常,则这批消息需要全部重新再次消费处理。
3 代码举例
该批量发送的需求是,不修改最大发送4M的默认值,但要防止发送的批量消息超出4M的限制。
定义消息列表分割器
// 消息列表分割器:其只会处理每条消息的大小不超4M的情况。
// 若存在某条消息,其本身大小大于4M,这个分割器无法处理,
// 其直接将这条消息构成一个子列表返回。并没有再进行分割
public class MessageListSplitter implements Iterator<List<Message>> {
// 指定极限值为4M
private final int SIZE_LIMIT = 4 * 1024 * 1024 ;
// 存放所有要发送的消息
private final List<Message> messages;
// 要进行批量发送消息的小集合起始索引
private int currIndex;
public MessageListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
// 判断当前开始遍历的消息索引要小于消息总数
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
// 记录当前要发送的这一小批次消息列表的大小
int totalSize = 0 ;
for (; nextIndex < messages.size(); nextIndex++) {
// 获取当前遍历的消息
Message message = messages.get(nextIndex);
// 统计当前遍历的message的大小
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20 ;
// 判断当前消息本身是否大于4M
if (tmpSize > SIZE_LIMIT) {
if (nextIndex - currIndex == 0 ) {
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
} // end-for
// 获取当前messages列表的子集合[currIndex, nextIndex)
List<Message> subList = messages.subList(currIndex, nextIndex);
// 下次遍历的开始索引
currIndex = nextIndex;
return subList;
}
}
定义批量消息消费者
public class BatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
// 指定要发送的消息的最大大小,默认是4M
// 不过,仅修改该属性是不行的,还需要同时修改broker加载的配置文件中的
// maxMessageSize属性
// producer.setMaxMessageSize(8 * 1024 * 1024);
producer.start();
// 定义要发送的消息集合
List<Message> messages = new ArrayList<>();
for (int i = 0 ; i < 100 ; i++) {
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("someTopic", "someTag", body);
messages.add(msg);
}
// 定义消息列表分割器,将消息列表分割为多个不超出4M大小的小列表
MessageListSplitter splitter = new MessageListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
定义批量消息生产者
public class BatchConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
consumer.setNamesrvAddr("rocketmqOS:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("someTopicA", "*");
// 指定每次可以消费 10 条消息,默认为 1
consumer.setConsumeMessageBatchMaxSize( 10 );
// 指定每次可以从Broker拉取 40 条消息,默认为 32
consumer.setPullBatchSize( 40 );
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg);
}
// 消费成功的返回结果
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// 消费异常时的返回结果
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
六、消息过滤
消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。
对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤。
1 Tag过滤
通过consumer的subscribe()方法指定要订阅消息的Tag。如果订阅多个Tag的消息,Tag间使用或运算符(双竖线||)连接。
DefaultMQPushConsumer consumer = newDefaultMQPushConsumer("CID_EXAMPLE");consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
2 SQL过滤
SQL过滤是一种通过特定表达式对事先埋入到消息中的用户属性
进行筛选过滤的方式。通过SQL过滤,可以实现对消息的复杂过滤。不过,只有使用PUSH模式
的消费者才能使用SQL过滤。
SQL过滤表达式中支持多种常量类型与运算符。
支持的常量类型:
- 数值:比如: 123 ,3.1415
- 字符:必须用单引号包裹起来,比如:'abc'
- 布尔:TRUE 或 FALSE
- NULL:特殊的常量,表示空
支持的运算符有:
- 数值比较:>,>=,<,<=,BETWEEN,=
- 字符比较:=,<>,IN
- 逻辑运算 :AND,OR,NOT
- NULL判断:IS NULL 或者 IS NOT NULL
默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功能:
enablePropertyFilter = true
在启动Broker时需要指定这个修改过的配置文件。例如对于单机Broker的启动,其修改的配置文件是conf/broker.conf,启动时使用如下命令:
sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
3 代码举例
定义Tag过滤Producer
public class FilterByTagProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
producer.start();
String[] tags = {"myTagA","myTagB","myTagC"};
for (int i = 0 ; i < 10 ; i++) {
byte[] body = ("Hi," + i).getBytes();
String tag = tags[i%tags.length];
Message msg = new Message("myTopic",tag,body);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
producer.shutdown();
}
}
定义Tag过滤Consumer
public class FilterByTagConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("pg");
consumer.setNamesrvAddr("rocketmqOS:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("myTopic", "myTagA || myTagB");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt me:msgs){
System.out.println(me);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
定义SQL过滤Producer
public class FilterBySQLProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
producer.start();
for (int i = 0 ; i < 10 ; i++) {
try {
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("myTopic", "myTag", body);
msg.putUserProperty("age", i + "");
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
定义SQL过滤Consumer
public class FilterBySQLConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("pg");
consumer.setNamesrvAddr("rocketmqOS:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("myTopic", MessageSelector.bySql("age between 0 and 6"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt me:msgs){
System.out.println(me);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}