前言
大家好,我是小郭,在当下的分布式服务中,消息队列中间件是一个解决服务之间耦合的利器,今天我们来瞧一瞧开源的RocketMQ消息中间件,他的消费端是如何启动的,以及在使用他的过程中有哪些配置。
启动流程图
Push和Pull的区别
Apache RocketMQ在消费者服务中,为我们提供了Push模式也提供了Pull模式
那他们主要有什么区别呢?
- Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
- Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
BROADCASTING 和 CLUSTERING 模式的区别
BROADCASTING(广播模式):当使用广播消费模式时,RocketMQ 会将每条消息推送给消费组所有的消费者,保证消息至少被每个消费者消费一次。
CLUSTERING(集群模式):当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。
Consumer启动流程
我们直接跑一个官方提供的Demo,大家也可以去官网上去下载源码
public static void main(String[] args) throws InterruptedException, MQClientException { /* * Instantiate with specified consumer group name. * 消费者模式有两种 推和拉 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); consumer.setNamesrvAddr("127.0.0.1:9876"); /* * Specify where to start in case the specific consumer group is a brand-new one. * 指定消费从哪里开始 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /* * Subscribe one more topic to consume. * 设置监听主题以及过滤条件 */ consumer.subscribe("TopicTest999", "*"); /* * Register callback to execute on arrival of messages fetched from brokers. * 注册消息监听器 */ consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //System.out.println("待消费条数:"+ msgs.size()); LOGGER.info("Receive New Messages : {}", Thread.currentThread().getName()); /*try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); }*/ LOGGER.info("success"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /* * Launch the consumer instance. */ consumer.start(); System.out.printf("Consumer Started.%n"); }
前置设置
- 指定namesrvAddr地址
consumer.setNamesrvAddr("127.0.0.1:9876");
- 指定消费从哪里开始
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- CONSUME_FROM_LAST_OFFSET 从最新的消息开始消费
- CONSUME_FROM_FIRST_OFFSET 从最新的位点开始消费
- CONSUME_FROM_TIMESTAMP 从指定的时间戳开始消费
- 指定负载均衡策略
- AllocateMessageQueueAveragely:平均连续分配算法。
- AllocateMessageQueueAveragelyByCircle:平均轮流分配算法。
- AllocateMachineRoomNearby:机房内优先就近分配。
- AllocateMessageQueueByConfig:手动指定,这个通常需要配合配置中心,在消费者启动时,首先先创建 AllocateMessageQueueByConfig 对象,然后根据配置中心的配置,再根据当前的队列信息,进行分配,即该方法不具备队列的自动负载,在 Broker 端进行队列扩容时,无法自动感知,需要手动变更配置。
// AllocateMessageQueueByConfig AllocateMessageQueueByConfig allocateMessageQueueByConfig = new AllocateMessageQueueByConfig(); MessageQueue messageQueue = new MessageQueue(); messageQueue.setBrokerName("broker-a"); messageQueue.setQueueId(2); messageQueue.setTopic("TopicTest"); allocateMessageQueueByConfig.setMessageQueueList(Collections.singletonList(messageQueue)); consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByConfig);
- AllocateMessageQueueByMachineRoom:消费指定机房中的队列,该分配算法首先需要调用该策略的 setConsumeridcs(Set consumerIdCs) 方法,用于设置需要消费的机房,将刷选出来的消息按平均连续分配算法进行队列负载。
// AllocateMessageQueueByMachineRoom AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new AllocateMachineRoomNearby.MachineRoomResolver() { // Broker部署 @Override public String brokerDeployIn(MessageQueue messageQueue) { System.out.println(messageQueue.getBrokerName().split("-")[0]); return messageQueue.getBrokerName().split("-")[0]; } // 消费端部署 @Override public String consumerDeployIn(String clientID) { System.out.println(clientID.split("-")[0]); return clientID.split("-")[0]; } }; consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely(), machineRoomResolver));
- 设置监听主题以及过滤条件
Tag过滤,用于对某个Topic下的消息进行分类,
消息发送到名称为TopicTest999的Topic中,被各个不同的系统所订阅,我们可以利用Tag来区分
consumer.subscribe("TopicTest999", "order"); consumer.subscribe("TopicTest999", "user");
- 注册消息监听器
注册消息监听器的目的就是为了接收消息,RocketMQ本身为我们提供了两种模式
- 并发消费
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { LOGGER.info("Receive New Messages : {}", Thread.currentThread().getName()); /*try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); }*/ LOGGER.info("success"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
- 顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); this.consumeTimes.incrementAndGet(); if ((this.consumeTimes.get() % 2) == 0) { return ConsumeOrderlyStatus.SUCCESS; } else if ((this.consumeTimes.get() % 5) == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } });
他们主要的区别继承MessageListener接口的实现
除了这一些重要的参数以外,RocketMQ为我们提供了其他非常丰富的配置,我总结在了下图
启动流程源码跟踪
需要注意的是,在配置后我们才能去调用启动方法
1. 设置消费者分组后,DefaultMQPushConsumer调用start()启动消费者
入口:org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start
@Override public void start() throws MQClientException { // step 1 设置消费者分组 setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); this.defaultMQPushConsumerImpl.start(); if (null != traceDispatcher) { try { traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } } }
2. 根据serviceState状态启动消费者,当服务未创建时,才能启动成功
在这里主要做了五件事
- 检查核心参数是否都配置了
private void checkConfig() throws MQClientException { // 检查消费者组,是否满足条件 Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup()); if (null == this.defaultMQPushConsumer.getConsumerGroup()) { ... } if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) { ... } if (null == this.defaultMQPushConsumer.getMessageModel()) { ... } if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) { ... } // allocateMessageQueueStrategy // subscription // messageListener // consumeThreadMin // consumeThreadMax // consumeConcurrentlyMaxSpan // pullThresholdForQueue // pullThresholdForTopic // pullThresholdSizeForQueue // pullInterval // consumeMessageBatchMaxSize // pullBatchSize
主要是进行了参数配置的校验,如果一些参数设置不合理的,在这里就会抛出异常,终止了消费者服务的启动,这里的配置对后面的使用会产生一定的影响,所以我们在配置的时候需要更加的谨慎
- 复制订阅信息,生成重试主题
private void copySubscription() throws MQClientException { try { Map<String, String> sub = this.defaultMQPushConsumer.getSubscription(); if (sub != null) { for (final Map.Entry<String, String> entry : sub.entrySet()) { final String topic = entry.getKey(); final String subString = entry.getValue(); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString); // 更新内部订阅关系 this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } if (null == this.messageListenerInner) { this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener(); } // 默认情况下我们是CLUSTERING模式 switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: break; case CLUSTERING: // 创建重试主题 final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL); // 将重试主题放入订阅关系容器中 this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; default: break; } } catch (Exception e) { throw new MQClientException("subscription exception", e); } }
获取配置的订阅关系,因为setSubscription()方法已经被作废,subscription都是为空的,在下面他会去维护一个subscriptionInner
protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>();
subscriptionInner保存了我们在前置配置的时候插入的订阅关系