1. 有序性分类
根据有序范围的不同,可以分为两种消息的有序性:分区有序和全局有序
分区有序
有多个Queue参与,其仅可保证在该Queue分区队列上的消息顺序,称为分区有序
在定义Producer时我们可以指定消息队列选择器,而这个选择器是我们自己实现了MessageQueueSelector接口定义的。在定义选择器的选择算法时,一般需要使用选择key。这个选择key可以是消息key也可以是其它数据。但无论谁做选择key,都不能重复,都是唯一的。
一般性的选择算法是,让选择key(或其hash值)与该Topic所包含的Queue的数量取模,其结果即为选择出的Queue的QueueId。
取模算法存在一个问题:不同选择key与Queue数量取模结果可能会是相同的,即不同选择key的消息可能会出现在相同的Queue,即同一个Consuemr可能会消费到不同选择key的消息。这个问题如何解决?一般性的作法是,从消息中获取到选择key,对其进行判断。若是当前Consumer需要消费的消息,则直接消费,否则,什么也不做。这种做法要求选择key要能够随着消息一起被Consumer获取到。此时使用消息key作为选择key是比较好的做法。
全局有序
当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序,称为全局有序
在创建Topic时指定Queue的数量。有三种指定方式:
在代码中创建Producer时,可以指定其自动创建的Topic的Queue数量
在RocketMQ可视化控制台中手动创建Topic时指定Queue数量
使用mqadmin命令手动创建Topic时指定Queue数量
2. 生产者业务接口
public interface OrderMessageService {
/**
* 发送同步顺序消息
* @param id
* @param message
*/
void sendSyncOrderMessage(String id, String message);
/**
* 发送异步顺序消息
* @param id
* @param message
*/
void sendAsyncOrderMessage(String id, String message);
/**
* 发送单向顺序消息
* @param id
* @param message
*/
void sendOnewayOrderMessage(String id, String message);
}
3. 生产者业务接口实现类
@Service
public class OrderMessageServiceImpl implements OrderMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
private static final Logger logger = LoggerFactory.getLogger(OrderMessageServiceImpl.class);
@Override
public void sendSyncOrderMessage(String id, String message) {
Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object obj) {
Integer uid = Integer.valueOf(String.valueOf(obj));
int index = uid % list.size();
return list.get(index);
}
});
SendResult result = rocketMQTemplate.syncSendOrderly("order-message-topic:sync-tags", strMessage, id);
if (result.getSendStatus() == SendStatus.SEND_OK) {
logger.info("发送同步顺序消息成功!");
} else {
logger.error("发送同步顺序消息失败!消息ID为:{}", result.getMsgId());
}
}
@Override
public void sendAsyncOrderMessage(String id, String message) {
Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object obj) {
Integer uid = (Integer) obj;
int index = uid % list.size();
return list.get(index);
}
});
rocketMQTemplate.asyncSendOrderly("order-message-topic:async-tags", strMessage, id, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
logger.info("发送异步顺序消息成功!消息ID为:{}", sendResult.getMsgId());
}
}
@Override
public void onException(Throwable throwable) {
logger.info("发送异步顺序消息失败!失败原因为:{}", throwable.getMessage());
}
});
}
@Override
public void sendOnewayOrderMessage(String id, String message) {
Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object obj) {
Integer uid = (Integer) obj;
int index = uid % list.size();
return list.get(index);
}
});
rocketMQTemplate.sendOneWayOrderly("order-message-topic:oneway-tags", strMessage, id);
}
}
4. 消费者类
@Component
@RocketMQMessageListener(topic = "order-message-topic", consumerGroup = "order-consumer-group", consumeMode = ConsumeMode.ORDERLY)
public class OrderMessageListener implements RocketMQListener<String> {
private static final Logger logger = LoggerFactory.getLogger(OrderMessageListener.class);
@Override
public void onMessage(String message) {
logger.info("接收到顺序消息为:{}", message);
}
}
5. 测试
@Test
void orderMessage() {
for (int i = 1; i < 5; i++) {
orderMessageService.sendSyncOrderMessage(String.valueOf(i), "hello" + i);
}
}