在我们处理业务逻辑的时候,选择不同的消费模式
广播消费模式
生产者发送的消息会被每个消费者都消费一次。也就是说,如果有m条消息和n个消费者,那么每个消费者都会消费这m条消息,总共会消费mn次。这种模式适用于每个消费者实例都需要独立处理所有消息的场景,比如系统通知和广告消息,以确保每个消费者实例都能收到并处理这些消息。
集群消费模式
而集群消费模式是RocketMQ的默认消费模式。在集群消费模式下,多个消费者实例共同消费同一个消费组的消息。消息会均匀分发给每个消费者实例,每个消费者实例只处理其中的一部分消息。这种模式适用于一组消费者实例协同处理同一个消息队列的场景,比如电商平台的订单消息,多个消费者实例可以同时消费订单消息,每个消费者实例处理其中一部分订单,从而提高消息的处理效率。
引入依赖:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>
消费者:
package com.yun.greedy.modules.util; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.io.UnsupportedEncodingException; import java.util.List; /** * RocketMq消费者配置 * @author yangqiusheng */ @Slf4j @Data public abstract class BaseConsumer { private DefaultMQPushConsumer defaultMQPushConsumer; private String namesrvAddr; protected String consumerGroup; protected String topic; protected String topicTag; protected String consumeFromWhere; //消费模式 protected String messageModel; //是否开启消费端 protected String registFlag; //设置编码格式 protected String encoding; // public static String encoding = System.getProperty("file.encoding"); /** * 初始化配置(producerGroup,topic,topicTag) */ protected abstract void initParam(); /** * 消费处理 * * @param msg */ protected abstract void onMessage(String msg); @PostConstruct private void init() throws MQClientException { initParam(); //客户端消费是否开启 if ("0".equals(registFlag)) { return; } this.log.info("服务器编码为:" + System.getProperty("file.encoding")); log.info("consumerGroup:" + consumerGroup + " namesrvAddr:" + namesrvAddr + " initialize!"); // ConsumerGroupName需要由应用来保证唯一 defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup); defaultMQPushConsumer.setNamesrvAddr(namesrvAddr); defaultMQPushConsumer.setInstanceName(String.valueOf(System.currentTimeMillis())); // 订阅指定Topic下的topicTag log.info("consumerGroup:" + consumerGroup + " topic:" + topic + " ,topicTag:" + topicTag); defaultMQPushConsumer.subscribe(topic, topicTag); //从消费队列最小偏移量开始消费。 if ("CONSUME_FROM_FIRST_OFFSET".equals(consumeFromWhere)) { this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //从队列的最大偏移量开始消费 } else if ("CONSUME_FROM_LAST_OFFSET".equals(consumeFromWhere)) { this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //从指定时间戳开始消费 } else if ("CONSUME_FROM_TIMESTAMP".equals(consumeFromWhere)) { this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); } else { this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); } if ("CLUSTERING".equals(this.messageModel)) {//集群 this.defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING); } else if ("BROADCASTING".equals(this.messageModel)) {//广播 this.defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING); } else { this.defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING); } // 设置为集群消费 log.info("consumerGroup:" + consumerGroup + " consumerModel:" + MessageModel.CLUSTERING); // 通过匿名消息监听处理消息消费 defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() { // 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt msg = msgs.get(0); if (msg.getTopic().equals(topic) && msg.getTags() != null && msg.getTags().equals(topicTag)) { // 执行topic下对应tag的消费逻辑 log.info("consumerGroup:" + consumerGroup + " MsgId:" + msg.getMsgId() + " begin to consume!"); try { onMessage(new String(msg.getBody(), encoding)); } catch (UnsupportedEncodingException e) { log.error("系统不支持消息编码格式:" + encoding); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } log.info("consumerGroup:" + consumerGroup + " MsgId:" + msg.getMsgId() + " was done!"); } // 如果没有return success ,consumer会重新消费该消息,直到return success return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // Consumer对象在使用之前必须要调用start初始化,初始化一次即可 defaultMQPushConsumer.start(); log.info("consumerGroup:" + consumerGroup + " namesrvAddr:" + namesrvAddr + " start success!"); } @PreDestroy public void destroy() { defaultMQPushConsumer.shutdown(); } }
消费者处理:
import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.util.HashMap; import java.util.Map; /** * 规则刷新消费者 * @author yangqiusheng */ @Slf4j @Service public class RocketMQCacheConsumer extends BaseConsumer { @Value(value = "${rocketmq.registry.namesrvAddr}") private String namesrvAddr; @Override protected void initParam() { super.consumerGroup = MQConstants.MQ_GROUP_NEWRULEENGINE; super.topic = MQConstants.MQ_GROUP_TOPIC_CACHE; super.topicTag = MQConstants.MQ_GROUP_TOPICTAG_CACHE; super.messageModel = MQConstants.MQ_MESSAGEMODEL_BROADCASTING; super.consumeFromWhere = MQConstants.MQ_CONSUMEFROMWHERE_CFLO; super.encoding = MQConstants.MQ_UTF_8; super.setNamesrvAddr(namesrvAddr); } @Override protected void onMessage(String msg) { log.info("############RocketMQCacheConsumer rocketmq开始接收消息:{}#################", msg); try { if (CommonFunctions.isNotEmpty(msg)) { //消费者逻辑代码在这写 } } catch (Exception e) { e.printStackTrace(); log.error("RocketMQCacheConsumer rocketmq刷新缓存失败!", e); } } }
生产者:
package com.yun.greedy.modules.util; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.io.UnsupportedEncodingException; /** * RocketMq生产者配置 * @author yangqiusheng */ @Slf4j @Data public abstract class BaseProducer { private DefaultMQProducer defaultMQProducer; protected String producerGroup; private String namesrvAddr; protected String topic; protected String topicTag; protected long timeout = 10000; //编码格式 protected String encoding; // public static String encoding = System.getProperty("file.encoding"); /** * 初始化配置 */ protected abstract void initParam(); @PostConstruct private void init() throws MQClientException { initParam(); this.log.info("服务器编码为:" + System.getProperty("file.encoding")); this.log.info("producerGroup:" + this.producerGroup + " namesrvAddr:" + this.namesrvAddr + " initialize!"); this.defaultMQProducer = new DefaultMQProducer(this.producerGroup); this.defaultMQProducer.setNamesrvAddr(this.namesrvAddr); this.defaultMQProducer.setInstanceName(String.valueOf(System.currentTimeMillis())); this.log .info("producerGroup:" + this.producerGroup + " topic:" + this.topic + " ,topicTag:" + this.topicTag); this.defaultMQProducer.start(); this.log .info("producerGroup:" + this.producerGroup + " namesrvAddr:" + this.namesrvAddr + " start success!"); } public void sendMessage(String msg) throws Exception { this.sendMessage(null, 0, msg); } /** * <p> * description:发送消息 * </p> * * @param keys * @param msg * @throws Exception * @author ex_liuruili * @see */ public void sendMessage(String keys, String msg) throws Exception { this.sendMessage(keys, 0, msg); } public void sendMessage(String keys, int delayTimeLevel, String msg) throws Exception { Message sendMsg; try { this.log.info("服务器编码为:" + System.getProperty("file.encoding")); if (StringUtils.isBlank(keys)) { keys = this.topicTag; } sendMsg = new Message(this.topic, this.topicTag, keys, msg.getBytes(encoding)); if (delayTimeLevel >= 1 && delayTimeLevel <= 18) { sendMsg.setDelayTimeLevel(delayTimeLevel); } } catch (UnsupportedEncodingException e1) { throw new Exception("发送MQ消息队列出错!" + e1.getMessage()); } SendResult sendResult = null; int count = 1; boolean status = false; while (count <= 3) { try { sendResult = this.defaultMQProducer.send(sendMsg, timeout); log.info("第" + count + "次消息发送:" + sendResult); if ((sendResult != null) && (sendResult.getSendStatus() == SendStatus.SEND_OK)) { status = true; break; } } catch (MQClientException e) { this.log.error("MQ发送消息第" + count + "次发送失败!失败原因:" + e.getMessage() + sendResult); } catch (RemotingException e) { this.log.error("MQ发送消息第" + count + "次发送失败!失败原因:" + e.getMessage() + sendResult); } catch (MQBrokerException e) { this.log.error("MQ发送消息第" + count + "次发送失败!失败原因:" + e.getMessage() + sendResult); } catch (InterruptedException e) { this.log.error("MQ发送消息第" + count + "次发送失败!失败原因:" + e.getMessage() + sendResult); } finally { count++; } } if (status != true) { this.log.error( "发送消息队列topic:" + this.topic + " topicTag:" + this.topicTag + " message:" + msg + "失败!"); throw new Exception("发送MQ消息队列出错!"); } } @PreDestroy private void destroy() { this.defaultMQProducer.shutdown(); } }
生产者实现:
import cn.axa.ruleengine.common.exception.RRException; import cn.axa.ruleengine.modules.rocketmq.util.BaseProducer; import cn.axa.ruleengine.modules.rocketmq.util.MQConstants; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.util.Map; /** * 生产者 * @author yangqiusheng */ @Slf4j @Service public class RocketMQCacheProduct extends BaseProducer { @Value(value = "${rocketmq.registry.namesrvAddr}") private String namesrvAddr; @Override protected void initParam() { super.producerGroup = MQConstants.MQ_GROUP_NEWRULEENGINE; super.topic = MQConstants.MQ_GROUP_TOPIC_CACHE; super.topicTag = MQConstants.MQ_GROUP_TOPICTAG_CACHE; super.encoding = MQConstants.MQ_UTF_8; super.setNamesrvAddr(namesrvAddr); } /** * 消息发送 * * @param map */ public void sendRuleCache(Map map) { try { String producerMsg = JSONObject.toJSON(map).toString(); log.info("####################RocketMQCacheProduct 提供者信息:{}#######################", producerMsg); sendMessage(producerMsg); } catch (Exception e) { e.printStackTrace(); throw new RRException("RocketMQCacheProduct 提供者上传队列信息失败!" + e.getMessage()); } } }