RocketMq消费者/生产者配置

简介: RocketMq消费者/生产者配置

在我们处理业务逻辑的时候,选择不同的消费模式

广播消费模式

生产者发送的消息会被每个消费者都消费一次。也就是说,如果有m条消息和n个消费者,那么每个消费者都会消费这m条消息,总共会消费mn次。这种模式适用于每个消费者实例都需要独立处理所有消息的场景,比如系统通知和广告消息,以确保每个消费者实例都能收到并处理这些消息。

集群消费模式

而集群消费模式是RocketMQ的默认消费模式。在集群消费模式下,多个消费者实例共同消费同一个消费组的消息。消息会均匀分发给每个消费者实例,每个消费者实例只处理其中的一部分消息。这种模式适用于一组消费者实例协同处理同一个消息队列的场景,比如电商平台的订单消息,多个消费者实例可以同时消费订单消息,每个消费者实例处理其中一部分订单,从而提高消息的处理效率。

引入依赖:

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

消费者:

package com.yun.greedy.modules.util;
 
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.UnsupportedEncodingException;
import java.util.List;
 
/**
 * RocketMq消费者配置
 * @author yangqiusheng
 */
@Slf4j
@Data
public abstract class BaseConsumer {
    private DefaultMQPushConsumer defaultMQPushConsumer;
    private String namesrvAddr;
    protected String consumerGroup;
    protected String topic;
    protected String topicTag;
    protected String consumeFromWhere;
    //消费模式
    protected String messageModel;
    //是否开启消费端
    protected String registFlag;
    //设置编码格式
    protected String encoding;
//    public static String encoding = System.getProperty("file.encoding");
    /**
     * 初始化配置(producerGroup,topic,topicTag)
     */
    protected abstract void initParam();
 
    /**
     * 消费处理
     *
     * @param msg
     */
    protected abstract void onMessage(String msg);
 
    @PostConstruct
    private void init() throws MQClientException {
        initParam();
        //客户端消费是否开启
        if ("0".equals(registFlag)) {
            return;
        }
 
        this.log.info("服务器编码为:" + System.getProperty("file.encoding"));
        log.info("consumerGroup:" + consumerGroup + " namesrvAddr:" + namesrvAddr + " initialize!");
 
        // ConsumerGroupName需要由应用来保证唯一
        defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup);
        defaultMQPushConsumer.setNamesrvAddr(namesrvAddr);
        defaultMQPushConsumer.setInstanceName(String.valueOf(System.currentTimeMillis()));
 
        // 订阅指定Topic下的topicTag
        log.info("consumerGroup:" + consumerGroup + " topic:" + topic + " ,topicTag:" + topicTag);
        defaultMQPushConsumer.subscribe(topic, topicTag);
        //从消费队列最小偏移量开始消费。
        if ("CONSUME_FROM_FIRST_OFFSET".equals(consumeFromWhere)) {
            this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //从队列的最大偏移量开始消费
        } else if ("CONSUME_FROM_LAST_OFFSET".equals(consumeFromWhere)) {
            this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            //从指定时间戳开始消费
        } else if ("CONSUME_FROM_TIMESTAMP".equals(consumeFromWhere)) {
            this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
        } else {
            this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        }
 
        if ("CLUSTERING".equals(this.messageModel)) {//集群
            this.defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        } else if ("BROADCASTING".equals(this.messageModel)) {//广播
            this.defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
        } else {
            this.defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        }
 
        // 设置为集群消费
        log.info("consumerGroup:" + consumerGroup + " consumerModel:" + MessageModel.CLUSTERING);
 
        // 通过匿名消息监听处理消息消费
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
 
            // 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals(topic) && msg.getTags() != null && msg.getTags().equals(topicTag)) {
                    // 执行topic下对应tag的消费逻辑
                    log.info("consumerGroup:" + consumerGroup + " MsgId:" + msg.getMsgId() + " begin to consume!");
                    try {
                        onMessage(new String(msg.getBody(), encoding));
                    } catch (UnsupportedEncodingException e) {
                        log.error("系统不支持消息编码格式:" + encoding);
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    log.info("consumerGroup:" + consumerGroup + " MsgId:" + msg.getMsgId() + " was done!");
                }
                // 如果没有return success ,consumer会重新消费该消息,直到return success
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
 
        // Consumer对象在使用之前必须要调用start初始化,初始化一次即可
        defaultMQPushConsumer.start();
 
        log.info("consumerGroup:" + consumerGroup + " namesrvAddr:" + namesrvAddr + "  start success!");
    }
 
    @PreDestroy
    public void destroy() {
        defaultMQPushConsumer.shutdown();
    }
}

消费者处理:

 
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
 
import java.util.HashMap;
import java.util.Map;
 
/**
 * 规则刷新消费者
 * @author yangqiusheng
 */
@Slf4j
@Service
public class RocketMQCacheConsumer extends BaseConsumer {
    @Value(value = "${rocketmq.registry.namesrvAddr}")
    private String namesrvAddr;
 
 
    @Override
    protected void initParam() {
        super.consumerGroup = MQConstants.MQ_GROUP_NEWRULEENGINE;
        super.topic = MQConstants.MQ_GROUP_TOPIC_CACHE;
        super.topicTag = MQConstants.MQ_GROUP_TOPICTAG_CACHE;
        super.messageModel = MQConstants.MQ_MESSAGEMODEL_BROADCASTING;
        super.consumeFromWhere = MQConstants.MQ_CONSUMEFROMWHERE_CFLO;
        super.encoding = MQConstants.MQ_UTF_8;
        super.setNamesrvAddr(namesrvAddr);
    }
 
    @Override
    protected void onMessage(String msg) {
        log.info("############RocketMQCacheConsumer rocketmq开始接收消息:{}#################", msg);
        try {
            if (CommonFunctions.isNotEmpty(msg)) {
                //消费者逻辑代码在这写
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("RocketMQCacheConsumer    rocketmq刷新缓存失败!", e);
        }
    }
 
}

生产者:

package com.yun.greedy.modules.util;
 
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
 
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.UnsupportedEncodingException;
 
/**
 * RocketMq生产者配置
 * @author yangqiusheng
 */
@Slf4j
@Data
public abstract class BaseProducer {
    private DefaultMQProducer defaultMQProducer;
    protected String producerGroup;
    private String namesrvAddr;
    protected String topic;
    protected String topicTag;
    protected long timeout = 10000;
    //编码格式
    protected String encoding;
//    public static String encoding = System.getProperty("file.encoding");
 
    /**
     * 初始化配置
     */
    protected abstract void initParam();
 
    @PostConstruct
    private void init() throws MQClientException {
        initParam();
        this.log.info("服务器编码为:" + System.getProperty("file.encoding"));
        this.log.info("producerGroup:" + this.producerGroup + " namesrvAddr:" + this.namesrvAddr + " initialize!");
 
        this.defaultMQProducer = new DefaultMQProducer(this.producerGroup);
        this.defaultMQProducer.setNamesrvAddr(this.namesrvAddr);
        this.defaultMQProducer.setInstanceName(String.valueOf(System.currentTimeMillis()));
 
        this.log
                .info("producerGroup:" + this.producerGroup + " topic:" + this.topic + " ,topicTag:" + this.topicTag);
        this.defaultMQProducer.start();
 
        this.log
                .info("producerGroup:" + this.producerGroup + " namesrvAddr:" + this.namesrvAddr + "  start success!");
    }
 
    public void sendMessage(String msg) throws Exception {
        this.sendMessage(null, 0, msg);
    }
 
    /**
     * <p>
     * description:发送消息
     * </p>
     *
     * @param keys
     * @param msg
     * @throws Exception
     * @author ex_liuruili
     * @see
     */
    public void sendMessage(String keys, String msg) throws Exception {
        this.sendMessage(keys, 0, msg);
    }
 
 
    public void sendMessage(String keys, int delayTimeLevel, String msg) throws Exception {
        Message sendMsg;
        try {
            this.log.info("服务器编码为:" + System.getProperty("file.encoding"));
            if (StringUtils.isBlank(keys)) {
                keys = this.topicTag;
            }
            sendMsg = new Message(this.topic, this.topicTag, keys, msg.getBytes(encoding));
            if (delayTimeLevel >= 1 && delayTimeLevel <= 18) {
                sendMsg.setDelayTimeLevel(delayTimeLevel);
            }
 
        } catch (UnsupportedEncodingException e1) {
            throw new Exception("发送MQ消息队列出错!" + e1.getMessage());
        }
        SendResult sendResult = null;
        int count = 1;
        boolean status = false;
        while (count <= 3) {
            try {
                sendResult = this.defaultMQProducer.send(sendMsg, timeout);
                log.info("第" + count + "次消息发送:" + sendResult);
 
                if ((sendResult != null) && (sendResult.getSendStatus() == SendStatus.SEND_OK)) {
                    status = true;
                    break;
                }
            } catch (MQClientException e) {
                this.log.error("MQ发送消息第" + count + "次发送失败!失败原因:" + e.getMessage() + sendResult);
            } catch (RemotingException e) {
                this.log.error("MQ发送消息第" + count + "次发送失败!失败原因:" + e.getMessage() + sendResult);
            } catch (MQBrokerException e) {
                this.log.error("MQ发送消息第" + count + "次发送失败!失败原因:" + e.getMessage() + sendResult);
            } catch (InterruptedException e) {
                this.log.error("MQ发送消息第" + count + "次发送失败!失败原因:" + e.getMessage() + sendResult);
            } finally {
                count++;
            }
        }
 
        if (status != true) {
            this.log.error(
                    "发送消息队列topic:" + this.topic + " topicTag:" + this.topicTag + " message:" + msg + "失败!");
            throw new Exception("发送MQ消息队列出错!");
        }
    }
 
    @PreDestroy
    private void destroy() {
        this.defaultMQProducer.shutdown();
    }
}

生产者实现:

 
 
import cn.axa.ruleengine.common.exception.RRException;
import cn.axa.ruleengine.modules.rocketmq.util.BaseProducer;
import cn.axa.ruleengine.modules.rocketmq.util.MQConstants;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
 
import java.util.Map;
 
/**
 * 生产者
 * @author yangqiusheng
 */
@Slf4j
@Service
public class RocketMQCacheProduct extends BaseProducer {
    @Value(value = "${rocketmq.registry.namesrvAddr}")
    private String namesrvAddr;
 
    @Override
    protected void initParam() {
        super.producerGroup = MQConstants.MQ_GROUP_NEWRULEENGINE;
        super.topic = MQConstants.MQ_GROUP_TOPIC_CACHE;
        super.topicTag = MQConstants.MQ_GROUP_TOPICTAG_CACHE;
        super.encoding = MQConstants.MQ_UTF_8;
        super.setNamesrvAddr(namesrvAddr);
    }
 
    /**
     * 消息发送
     *
     * @param map
     */
    public void sendRuleCache(Map map) {
        try {
            String producerMsg = JSONObject.toJSON(map).toString();
            log.info("####################RocketMQCacheProduct  提供者信息:{}#######################", producerMsg);
            sendMessage(producerMsg);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RRException("RocketMQCacheProduct 提供者上传队列信息失败!" + e.getMessage());
        }
    }
}


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 Linux API
centos7 安装rabbitmq自定义版本及配置
centos7 安装rabbitmq自定义版本及配置
ly~
|
3月前
|
消息中间件 搜索推荐 大数据
一般情况下在 RocketMQ 中添加 access key 的步骤: 一、确定配置文件位置 RocketMQ 的配置文件通常位于安装目录下的 conf 文件夹中。你需要找到 broker.conf 或相关的配置文件。 二、编辑配置文件 打开配置文件,查找与 ACL(访问控制列表)相关的配置部分。 在配置文件中添加以下内容:
大数据广泛应用于商业、金融、医疗和政府等多个领域。在商业上,它支持精准营销、客户细分及流失预测,并优化供应链管理;金融领域则利用大数据进行风险评估、市场预测及欺诈检测;医疗行业通过大数据预测疾病、提供个性化治疗;政府运用大数据进行城市规划和公共安全管理;工业领域则借助大数据进行设备维护、故障预测及质量控制。
ly~
157 2
|
4月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
4月前
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。
|
4月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
4月前
|
消息中间件 SQL 监控
RocketMQ 5.3.0 版本中 Broker IP 配置为 IPv6 的情况
【8月更文第28天】RocketMQ 是一款分布式消息中间件,支持多种消息发布和订阅模式。在 RocketMQ 5.3.0 版本中,Broker 的配置文件 `broker.conf` 允许配置 IPv6 地址。当 Broker 的 `brokerIP1` 配置为 IPv6 地址时,会对 Broker 的启动、消息推送和状态监控等方面产生影响。本文将探讨如何在 RocketMQ 中配置 IPv6 地址,并检查 Broker 的状态。
245 0
|
4月前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
114 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
80 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
67 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
56 0