RocketMq消费者/生产者配置

简介: RocketMq消费者/生产者配置

在我们处理业务逻辑的时候,选择不同的消费模式

广播消费模式

生产者发送的消息会被每个消费者都消费一次。也就是说,如果有m条消息和n个消费者,那么每个消费者都会消费这m条消息,总共会消费mn次。这种模式适用于每个消费者实例都需要独立处理所有消息的场景,比如系统通知和广告消息,以确保每个消费者实例都能收到并处理这些消息。

集群消费模式

而集群消费模式是RocketMQ的默认消费模式。在集群消费模式下,多个消费者实例共同消费同一个消费组的消息。消息会均匀分发给每个消费者实例,每个消费者实例只处理其中的一部分消息。这种模式适用于一组消费者实例协同处理同一个消息队列的场景,比如电商平台的订单消息,多个消费者实例可以同时消费订单消息,每个消费者实例处理其中一部分订单,从而提高消息的处理效率。

引入依赖:

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

消费者:

package com.yun.greedy.modules.util;
 
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.UnsupportedEncodingException;
import java.util.List;
 
/**
 * RocketMq消费者配置
 * @author yangqiusheng
 */
@Slf4j
@Data
public abstract class BaseConsumer {
    private DefaultMQPushConsumer defaultMQPushConsumer;
    private String namesrvAddr;
    protected String consumerGroup;
    protected String topic;
    protected String topicTag;
    protected String consumeFromWhere;
    //消费模式
    protected String messageModel;
    //是否开启消费端
    protected String registFlag;
    //设置编码格式
    protected String encoding;
//    public static String encoding = System.getProperty("file.encoding");
    /**
     * 初始化配置(producerGroup,topic,topicTag)
     */
    protected abstract void initParam();
 
    /**
     * 消费处理
     *
     * @param msg
     */
    protected abstract void onMessage(String msg);
 
    @PostConstruct
    private void init() throws MQClientException {
        initParam();
        //客户端消费是否开启
        if ("0".equals(registFlag)) {
            return;
        }
 
        this.log.info("服务器编码为:" + System.getProperty("file.encoding"));
        log.info("consumerGroup:" + consumerGroup + " namesrvAddr:" + namesrvAddr + " initialize!");
 
        // ConsumerGroupName需要由应用来保证唯一
        defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup);
        defaultMQPushConsumer.setNamesrvAddr(namesrvAddr);
        defaultMQPushConsumer.setInstanceName(String.valueOf(System.currentTimeMillis()));
 
        // 订阅指定Topic下的topicTag
        log.info("consumerGroup:" + consumerGroup + " topic:" + topic + " ,topicTag:" + topicTag);
        defaultMQPushConsumer.subscribe(topic, topicTag);
        //从消费队列最小偏移量开始消费。
        if ("CONSUME_FROM_FIRST_OFFSET".equals(consumeFromWhere)) {
            this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //从队列的最大偏移量开始消费
        } else if ("CONSUME_FROM_LAST_OFFSET".equals(consumeFromWhere)) {
            this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            //从指定时间戳开始消费
        } else if ("CONSUME_FROM_TIMESTAMP".equals(consumeFromWhere)) {
            this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
        } else {
            this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        }
 
        if ("CLUSTERING".equals(this.messageModel)) {//集群
            this.defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        } else if ("BROADCASTING".equals(this.messageModel)) {//广播
            this.defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
        } else {
            this.defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        }
 
        // 设置为集群消费
        log.info("consumerGroup:" + consumerGroup + " consumerModel:" + MessageModel.CLUSTERING);
 
        // 通过匿名消息监听处理消息消费
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
 
            // 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals(topic) && msg.getTags() != null && msg.getTags().equals(topicTag)) {
                    // 执行topic下对应tag的消费逻辑
                    log.info("consumerGroup:" + consumerGroup + " MsgId:" + msg.getMsgId() + " begin to consume!");
                    try {
                        onMessage(new String(msg.getBody(), encoding));
                    } catch (UnsupportedEncodingException e) {
                        log.error("系统不支持消息编码格式:" + encoding);
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    log.info("consumerGroup:" + consumerGroup + " MsgId:" + msg.getMsgId() + " was done!");
                }
                // 如果没有return success ,consumer会重新消费该消息,直到return success
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
 
        // Consumer对象在使用之前必须要调用start初始化,初始化一次即可
        defaultMQPushConsumer.start();
 
        log.info("consumerGroup:" + consumerGroup + " namesrvAddr:" + namesrvAddr + "  start success!");
    }
 
    @PreDestroy
    public void destroy() {
        defaultMQPushConsumer.shutdown();
    }
}

消费者处理:

 
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
 
import java.util.HashMap;
import java.util.Map;
 
/**
 * 规则刷新消费者
 * @author yangqiusheng
 */
@Slf4j
@Service
public class RocketMQCacheConsumer extends BaseConsumer {
    @Value(value = "${rocketmq.registry.namesrvAddr}")
    private String namesrvAddr;
 
 
    @Override
    protected void initParam() {
        super.consumerGroup = MQConstants.MQ_GROUP_NEWRULEENGINE;
        super.topic = MQConstants.MQ_GROUP_TOPIC_CACHE;
        super.topicTag = MQConstants.MQ_GROUP_TOPICTAG_CACHE;
        super.messageModel = MQConstants.MQ_MESSAGEMODEL_BROADCASTING;
        super.consumeFromWhere = MQConstants.MQ_CONSUMEFROMWHERE_CFLO;
        super.encoding = MQConstants.MQ_UTF_8;
        super.setNamesrvAddr(namesrvAddr);
    }
 
    @Override
    protected void onMessage(String msg) {
        log.info("############RocketMQCacheConsumer rocketmq开始接收消息:{}#################", msg);
        try {
            if (CommonFunctions.isNotEmpty(msg)) {
                //消费者逻辑代码在这写
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("RocketMQCacheConsumer    rocketmq刷新缓存失败!", e);
        }
    }
 
}

生产者:

package com.yun.greedy.modules.util;
 
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
 
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.UnsupportedEncodingException;
 
/**
 * RocketMq生产者配置
 * @author yangqiusheng
 */
@Slf4j
@Data
public abstract class BaseProducer {
    private DefaultMQProducer defaultMQProducer;
    protected String producerGroup;
    private String namesrvAddr;
    protected String topic;
    protected String topicTag;
    protected long timeout = 10000;
    //编码格式
    protected String encoding;
//    public static String encoding = System.getProperty("file.encoding");
 
    /**
     * 初始化配置
     */
    protected abstract void initParam();
 
    @PostConstruct
    private void init() throws MQClientException {
        initParam();
        this.log.info("服务器编码为:" + System.getProperty("file.encoding"));
        this.log.info("producerGroup:" + this.producerGroup + " namesrvAddr:" + this.namesrvAddr + " initialize!");
 
        this.defaultMQProducer = new DefaultMQProducer(this.producerGroup);
        this.defaultMQProducer.setNamesrvAddr(this.namesrvAddr);
        this.defaultMQProducer.setInstanceName(String.valueOf(System.currentTimeMillis()));
 
        this.log
                .info("producerGroup:" + this.producerGroup + " topic:" + this.topic + " ,topicTag:" + this.topicTag);
        this.defaultMQProducer.start();
 
        this.log
                .info("producerGroup:" + this.producerGroup + " namesrvAddr:" + this.namesrvAddr + "  start success!");
    }
 
    public void sendMessage(String msg) throws Exception {
        this.sendMessage(null, 0, msg);
    }
 
    /**
     * <p>
     * description:发送消息
     * </p>
     *
     * @param keys
     * @param msg
     * @throws Exception
     * @author ex_liuruili
     * @see
     */
    public void sendMessage(String keys, String msg) throws Exception {
        this.sendMessage(keys, 0, msg);
    }
 
 
    public void sendMessage(String keys, int delayTimeLevel, String msg) throws Exception {
        Message sendMsg;
        try {
            this.log.info("服务器编码为:" + System.getProperty("file.encoding"));
            if (StringUtils.isBlank(keys)) {
                keys = this.topicTag;
            }
            sendMsg = new Message(this.topic, this.topicTag, keys, msg.getBytes(encoding));
            if (delayTimeLevel >= 1 && delayTimeLevel <= 18) {
                sendMsg.setDelayTimeLevel(delayTimeLevel);
            }
 
        } catch (UnsupportedEncodingException e1) {
            throw new Exception("发送MQ消息队列出错!" + e1.getMessage());
        }
        SendResult sendResult = null;
        int count = 1;
        boolean status = false;
        while (count <= 3) {
            try {
                sendResult = this.defaultMQProducer.send(sendMsg, timeout);
                log.info("第" + count + "次消息发送:" + sendResult);
 
                if ((sendResult != null) && (sendResult.getSendStatus() == SendStatus.SEND_OK)) {
                    status = true;
                    break;
                }
            } catch (MQClientException e) {
                this.log.error("MQ发送消息第" + count + "次发送失败!失败原因:" + e.getMessage() + sendResult);
            } catch (RemotingException e) {
                this.log.error("MQ发送消息第" + count + "次发送失败!失败原因:" + e.getMessage() + sendResult);
            } catch (MQBrokerException e) {
                this.log.error("MQ发送消息第" + count + "次发送失败!失败原因:" + e.getMessage() + sendResult);
            } catch (InterruptedException e) {
                this.log.error("MQ发送消息第" + count + "次发送失败!失败原因:" + e.getMessage() + sendResult);
            } finally {
                count++;
            }
        }
 
        if (status != true) {
            this.log.error(
                    "发送消息队列topic:" + this.topic + " topicTag:" + this.topicTag + " message:" + msg + "失败!");
            throw new Exception("发送MQ消息队列出错!");
        }
    }
 
    @PreDestroy
    private void destroy() {
        this.defaultMQProducer.shutdown();
    }
}

生产者实现:

 
 
import cn.axa.ruleengine.common.exception.RRException;
import cn.axa.ruleengine.modules.rocketmq.util.BaseProducer;
import cn.axa.ruleengine.modules.rocketmq.util.MQConstants;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
 
import java.util.Map;
 
/**
 * 生产者
 * @author yangqiusheng
 */
@Slf4j
@Service
public class RocketMQCacheProduct extends BaseProducer {
    @Value(value = "${rocketmq.registry.namesrvAddr}")
    private String namesrvAddr;
 
    @Override
    protected void initParam() {
        super.producerGroup = MQConstants.MQ_GROUP_NEWRULEENGINE;
        super.topic = MQConstants.MQ_GROUP_TOPIC_CACHE;
        super.topicTag = MQConstants.MQ_GROUP_TOPICTAG_CACHE;
        super.encoding = MQConstants.MQ_UTF_8;
        super.setNamesrvAddr(namesrvAddr);
    }
 
    /**
     * 消息发送
     *
     * @param map
     */
    public void sendRuleCache(Map map) {
        try {
            String producerMsg = JSONObject.toJSON(map).toString();
            log.info("####################RocketMQCacheProduct  提供者信息:{}#######################", producerMsg);
            sendMessage(producerMsg);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RRException("RocketMQCacheProduct 提供者上传队列信息失败!" + e.getMessage());
        }
    }
}


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3天前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
3天前
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。
|
3天前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
4天前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
12 0
|
4天前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
12 0
|
4天前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
10 0
|
4天前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
9 0
|
4天前
|
消息中间件 存储 缓存
RocketMQ - 消费者概述
RocketMQ - 消费者概述
12 0
|
21天前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
21天前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。