🍊 RocketMQ的消息类型
那我们来逐一连接下RocketMQ都支持哪些类型的消息:
🎉 同步发送、异步发送以及单向发送
基本样例部分我们使用消息生产者分别通过三种方式发送消息,同步发送、异步发送以及单向发送。
然后使用消费者来消费这些消息。
📝 1、同步发送消息的样例见:org.apache.rocketmq.example.simple.Producer
发送完消息之后,等待消息返回后再继续进行下面的操作。(消息发送最慢)
package org.apache.rocketmq.example.simple; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; //简单样例:同步发送消息 public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("ip:9876"); producer.start(); for (int i = 0; i < 20; i++) try { { Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); //同步传递消息,消息会发给集群中的一个Broker节点。 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
📝 2、异步发送消息的样例见:org.apache.rocketmq.example.simple.AsyncProducer
发完消息之后就去做自己的事情了,但是会给客户端一个回调方法,把消息发送的结果给到客户端。这里引入了一个countDownLatch来保证所有消息回调方法都执行完了再关闭Producer。 所以从这里可以看出,RocketMQ的Producer也是一个服务端,在往Broker发送消息的时候也要作为服务端提供服务。
package org.apache.rocketmq.example.simple; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; //简单样例:异步发送消息 public class AsyncProducer { public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException { DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test"); producer.setNamesrvAddr("ip:9876"); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 100; //由于是异步发送,这里引入一个countDownLatch,保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。 final CountDownLatch countDownLatch = new CountDownLatch(messageCount); for (int i = 0; i < messageCount; i++) { try { final int index = i; Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { countDownLatch.countDown(); System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { countDownLatch.countDown(); System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); System.out.println("消息发送完成"); } catch (Exception e) { e.printStackTrace(); } } countDownLatch.await(5, TimeUnit.SECONDS); producer.shutdown(); } }
📝 3、单向发送消息的样例:
关键点就是使用producer.sendOneWay方式来发送消息,这个方法没有返回值,也没有回调。就是只管把消息发出去就行了。(消息发送最快)
public class OnewayProducer { public static void main(String[] args) throws Exception{ //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses.可以在代码中指定,也可以在环境变量中配置,和jdk配置环境变量类似 producer.setNamesrvAddr("ip:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. //核心:发送消息。没有返回值,发完消息就不管了,不知道有没有发送消息成功 producer.sendOneway(msg); } //Wait for sending to complete Thread.sleep(5000); producer.shutdown(); } }
📝 4、使用消费者消费消息
消费者消费消息有两种模式
一种是消费者主动去Broker上拉取消息的拉模式。
一种是消费者等待Broker把消息推送过来的推模式。
🔥 拉模式的样例见:org.apache.rocketmq.example.simple.PullConsumer
package org.apache.rocketmq.example.simple; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageQueue; import java.util.HashMap; import java.util.Map; import java.util.Set; public class PullConsumer { //偏移量 private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>(); public static void main(String[] args) throws MQClientException { //过期的方法 DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("PullConsumer_1"); consumer.setNamesrvAddr("ip:9876"); consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("Topic_1"); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: %s%n", mq); SINGLE_MQ: while (true) { try { //第一个参数:mq的消息队列,第二个参数:过滤,第三个参数:偏移量,第四个参数:一次拉取多少条消息 PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); System.out.printf("%s%n", pullResult); //自己维护偏移量 putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } //获取偏移量 private static long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSE_TABLE.get(mq); if (offset != null) return offset; return 0; } //存储偏移量 private static void putMessageQueueOffset(MessageQueue mq, long offset) { OFFSE_TABLE.put(mq, offset); } }
上面那种方式需要自己维护偏移量,不够友好,所以下面这种方式改进了
package org.apache.rocketmq.example.simple; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class LitePullConsumerSubscribe { public static volatile boolean running = true; public static void main(String[] args) throws Exception { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test"); litePullConsumer.setNamesrvAddr("ip:9876"); litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); litePullConsumer.subscribe("TopicTest", "*"); litePullConsumer.start(); try { while (running) { //直接去拉就可以了 List<MessageExt> messageExts = litePullConsumer.poll(); System.out.printf("%s%n", messageExts); } } finally { litePullConsumer.shutdown(); } } }
上面那种就是傻瓜式的一次拉取32条,无法定制化拉取指定某一个区间的消息,所以下面这种又进行了定制化调整
package org.apache.rocketmq.example.simple; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import java.util.ArrayList; import java.util.Collection; import java.util.List; public class LitePullConsumerAssign { public static volatile boolean running = true; public static void main(String[] args) throws Exception { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("PullConsumer_1"); litePullConsumer.setAutoCommit(false); litePullConsumer.start(); //拉取这个主题里面的队列 Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("Topic_1"); List<MessageQueue> list = new ArrayList<>(mqSet); //这一步就是过滤一部分队列,取其中一部分的队列 List<MessageQueue> assignList = new ArrayList<>(); for (int i = 0; i < list.size() / 2; i++) { assignList.add(list.get(i)); } //把队列分配给这个消费者客户端 litePullConsumer.assign(assignList); //取第一个队列,从偏移量为10的起点开始消费消息 litePullConsumer.seek(assignList.get(0), 10); try { while (running) { //默认拉取32条 List<MessageExt> messageExts = litePullConsumer.poll(); System.out.printf("%s %n", messageExts); litePullConsumer.commitSync(); } } finally { litePullConsumer.shutdown(); } } }
🔥 推模式的样例见:org.apache.rocketmq.example.simple.PushConsumer
package org.apache.rocketmq.example.simple; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer_1"); consumer.setNamesrvAddr("ip:9876"); consumer.subscribe("Topic_1", "*");//第二个参数就是过滤方式 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //wrong time format 2018_0522_221800 consumer.setConsumeTimestamp("20220117221800"); //注册一个消息监听 consumer.registerMessageListener(new MessageListenerConcurrently() { //消费消息,有消息来了之后就会由broker往这里面推消息 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
通常情况下,用推模式比较简单。实际上RocketMQ的推模式也是由拉模式封装出来的。4.7.1版本中DefaultMQPullConsumerImpl这个消费者类已标记为过期,但是还是可以使用的。替换的类是DefaultLitePullConsumerImpl。
🎉 顺序消息
顺序消息生产者样例见:org.apache.rocketmq.example.ordermessage.Producer
package org.apache.rocketmq.example.ordermessage; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; import java.util.List; public class Producer { public static void main(String[] args) throws UnsupportedEncodingException { try { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("ip:9876"); producer.start(); //业务场景:我有十个订单,每个订单有六个步骤,现在需要按照固定的步骤一条条的发送消息 for (int i = 0; i < 10; i++) { int orderId = i; for(int j = 0 ; j <= 5 ; j ++){ Message msg = new Message("OrderTopicTest", "order_"+orderId, "KEY" + orderId, ("order_"+orderId+" step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET)); //消息队列的选择器 SendResult sendResult = producer.send(msg, new MessageQueueSelector() { //第一个参数:所有的消息,第二个参数:发送的消息,第三个参数:根据什么发送,这里面传的是orderId @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); //获取订单id进行取模,取其中一个消息 return mqs.get(index); } //同一个订单id可以放到同一个队列里面去 }, orderId); System.out.printf("%s%n", sendResult); } } producer.shutdown(); } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { e.printStackTrace(); } } }
顺序消息消费者样例见:org.apache.rocketmq.example.ordermessage.Consumer
package org.apache.rocketmq.example.ordermessage; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); consumer.setNamesrvAddr("ip:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("OrderTopicTest", "*"); //MessageListenerOrderly是可以保证消息顺序消费的,因为它是一个队列一个队列的去拿消息的 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { //自动提交 context.setAutoCommit(true); for(MessageExt msg:msgs){ System.out.println("收到消息内容 "+new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); // MessageListenerConcurrently是保证不了最终消费顺序的,因为他会出现同一个主题拿多个队列的消息,有可能第一个队列拿一大部分,第二个队列拿一小部分,这样是没法保证消息的顺序消费的 // 全局顺序可以通过一个主题里面只有一个队列,来保证在消费端队列里面的消息可以顺序消费。 // consumer.registerMessageListener(new MessageListenerConcurrently() { // @Override // public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // for(MessageExt msg:msgs){ // System.out.println("收到消息内容 "+new String(msg.getBody())); // } // return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // } // }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
验证时,可以启动多个Consumer实例,观察下每一个订单的消息分配以及每个订单下多个步骤的消费顺序。不管订单在多个Consumer实例之前是如何分配的,每个订单下的多条消息顺序都是固定从0~5的。RocketMQ保证的是消息的局部有序,而不是全局有序。
MessageListenerOrderly是可以保证最终消费顺序的,它是一个队列一个队列的去拿消息的
MessageListenerConcurrently是保证不了最终消费顺序的,不保证全局有序,只保证局部有序
先从控制台上看下List mqs是什么。再回看我们的样例,实际上,RocketMQ也只保证了每个OrderID的所有消息有序(发到了同一个queue),而并不能保证所有消息都有序。所以这就涉及到了RocketMQ消息有序的原理。要保证最终消费到的消息是有序的,需要从Producer、Broker、Consumer三个步骤都保证消息有序才行。
首先在发送者端:在默认情况下,消息发送者会采取Round Robin轮询方式把消息发送到不同的MessageQueue(分区队列),而消费者消费的时候也从多个MessageQueue上拉取消息,这种情况下消息是不能保证顺序的。而只有当一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这一组消息有序。而Broker中一个队列内的消息是可以保证有序的。
然后在消费者端:消费者会从多个消息队列上去拿消息。这时虽然每个消息队列上的消息是有序的,但是多个队列之间的消息仍然是乱序的。消费者端要保证消息有序,就需要按队列一个一个来取消息,即取完一个队列的消息后,再去取下一个队列的消息。而给consumer注入的MessageListenerOrderly对象,在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。MessageListenerConcurrently这个消息监听器则不会锁队列,每次都是从多个Message中取一批数据(默认不超过32条)。因此也无法保证消息有序。
RocketMQ 在默认情况下不保证顺序,要保证全局顺序,需要把 Topic 的读写队列数设置为 1,然后生产者和消费者的并发设置也是 1,不能使用多线程。所以这样的话 高并发,高吞吐量的功能完全用不上。
全局顺序消息 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布 和消费。 分区顺序消息 对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分区。同一个分 区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding Key 是顺序消息中用 来区分不同分区的关键字段,和普通消息的 Message Key 是完全不同的概念。
- 全局有序就是无论发的是不是同一个分区,我都可以按照你生产的顺序来消费
- 分区有序就只针对发到同一个分区的消息可以顺序消费
🎉 广播消息
广播消息的消息生产者样例见:org.apache.rocketmq.example.broadcast.PushConsumer
package org.apache.rocketmq.example.broadcast; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import java.util.List; public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setNamesrvAddr("ip:9876"); //重点是这个消费模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Broadcast Consumer Started.%n"); } }
广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。在集群状态(MessageModel.CLUSTERING)下,每一条消息只会被同一个消费者组中的一个实例消费到(这跟kafka和rabbitMQ的集群模式是一样的)。而广播模式则是把消息发给了所有订阅了对应主题的消费者,而不管消费者是不是同一个消费者组。