1.环境
RocketMQ版本:4.7.1
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency>
2.生产者、消费者的模式
生产者有三种生产模式:
- 同步,生产者等待broker的响应后再往下走。
- 异步,生产者不等待broker的响应,继续往下走,broker的响应通过事件监听的方式,触发回调函数,通知生产者。
- 单向,生产者只管发,不管broker的响应,这种模式没办法做消息丢失后的补救
消费者有两种消费模式:
- 主动拉取
- 等待推送
生产者示例:
public static void main(String[] args) throws MQClientException, InterruptedException { //创建消费者,创建的时候可以指定该消费者属于哪个消费者组 DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); //指定name server的地址 producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); producer.start(); //发送一千条信息 for (int i = 0; i < 1000; i++) { try { //消息,topic为TopicTest,后面跟的一串是tag Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //同步发送 SendResult sendResult = producer.send(msg); /**异步发送,通过自定义回调函数的方式来触发响应 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { countDownLatch.countDown(); System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { countDownLatch.countDown(); System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }**/ System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); }
消费者示例:
推模式:
public static void main(String[] args) throws InterruptedException, MQClientException { //创建消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); //设置name server consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); }
拉模式:
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a"); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: %s%n", mq); SINGLE_MQ: while (true) { try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); System.out.printf("%s%n", pullResult); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSE_TABLE.get(mq); if (offset != null) return offset; return 0; } private static void putMessageQueueOffset(MessageQueue mq, long offset) { OFFSE_TABLE.put(mq, offset); } }
3.顺序消息
有些场景中我们需要保证消息的有序性,比如以下场景:
1.开通会员,将积分初始化为10分
2.完成赚取积分的操作,+5分
3.完成违规操作,-10分
最后剩余积分5分
如果消息乱序为321,最后积分为10分。
生产者将消息顺序的发到一个MessageQueue上,然后消费者去一个MessageQueue上拿消息,就能保证消息的顺序性。
RocketMQ支持生产者、消费者在生产、消费的时候指定MessageQueue,但是具体怎样利用这一点来实现顺序消费,需要开发人员去手写。
生产者:
for (int i = 0; i < 100; i++) { for(int j=0;i<5;i++) { //每一个订单用同一个orderId int orderId = i; Message msg = new Message("TopicTest","order_"+orderId, "KEY" + orderId, ("order_" + orderId+"step"+j).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { /** * 回调函数 * @param mqs broker中的所有MessageQueue * @param msg 发送的消息 * @param arg 就是传过来的orderId * @return */ @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; //每个订单的message用同一个orderId,必然会选中同一个MessageQueue,自然会顺序存进去 int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } }
消费者:
//MessageListenerOrderly这种类型的监听器,会让consumer一直去读一个messagequeue的内容,一直读完为止 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); for (MessageExt msg : msgs) { System.out.println("收到的消息内容:"+new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } });
4.广播消息
默认情况下,topic中的一条消息只会被一个消费者所消费。广播模式,topic中的一条消息,可以被订阅该topic的所有消费者消费。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //设置为广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("TopicTest", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Broadcast Consumer Started.%n"); }
5.延迟消息
延迟消息,即指定消息在MQ的一个驻留时间,过多少时间后,过了驻留时间,消费者才能消费到消息。
延迟消息可以用来做定时任务。
API上来说,就是在生产者端,给消息指定一个延迟级别即可:
//messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h //3即第三个等级,10s msg.setDelayTimeLevel(3);
很明显上面的延迟级别都是定死的,如果我们要用来做定时任务,需要自定义延迟级别。开源版的RocketMQ不支持自定义延迟级别,只有商业版(阿里云上部署的RocketMQ)才支持。
开源版的RocketMQ只能自己去改源码,所以这里也是各个公司做自己定制化的RocketMQ时,一个改造的重点
6.批量消息
RocketMQ支持生产者批量发送消息,以此去减少生产者的网络IO,批量消息只和生产者有关,消费者仍然是按照一条一条的去消费的。
public class SimpleBatchProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName"); producer.start(); //If you just send messages of no more than 1MiB at a time, it is easy to use batch //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support String topic = "BatchTest"; List<Message> messages = new ArrayList<>(); messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes())); messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes())); messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes())); producer.send(messages); } }
7.过滤消息
通过tag,消费者可以消费同一个topic下自己感兴趣的消息
生产者:
public class TagFilterProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC"}; for (int i = 0; i < 60; i++) { Message msg = new Message("TagFilterTest", tags[i % tags.length], "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } producer.shutdown(); }
消费者:
public class TagFilterConsumer { public static void main(String[] args) throws InterruptedException, MQClientException, IOException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); consumer.subscribe("TagFilterTest", "TagA || TagC"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
8.事务消息
事务消息,即发送到MQ的消息能和本地事务一起被回滚。
MQ消息和业务之间其实会存在类似于分布式事务的一致性问题,举个例子:
以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。很明显这几个步骤的调用都应该是在一个事务中的,这些调用都可以用MQ做成移步的,那么问题就来了。这些业务中一旦有一个业务失败,其它业务应该同时失败,但是消息已经发出去了怎么办?
这时候事务消息就派上用场了。
事务消息的生产者在发送消息时,会将消息转为一个half(半消息),并存入RocketMQ内部的一个RMQ_SYS_TRANS_HALF_TOPIC这个Topic,这个Topic对消费者是不可见的,当本地事务执行成功后会向MQ发送commit信号,MQ再将消息转为原本的Topic,当本地事务执行失败后会向MQ发送roll_back信号,MQ会丢弃对应消息。如果本地事务没有执行完,可以向MQ发送一个unknown信号,MQ收到unknow信号后,会让对应的消息等待,并且定期回查状态为unknown的消息。
自定义本地事务的提交逻辑和检查逻辑:
public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; default: return LocalTransactionState.COMMIT_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }
生产者:
public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); } }