云原生中间件RocketMQ-消费者消费模式之广播模式

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 云原生中间件RocketMQ-消费者消费模式之广播模式

PushConsumer消费模式-广播模式

广播消费: 当使用广播消费模式时, 消息队列 RocketMQ 会将每条消息推送给集群内所有注册过的客户端, 保证消息至少被每台机器消费一次。
image.png
相比于集群模式,广播模式的特点为: 每个消费者都会消费所订阅的Topic + Tag下的所有queue中的所有消息。
适用场景&注意事项

  1. 广播消费模式下不支持顺序消息。
  2. 广播消费模式下不支持重置消费位点。
  3. 每条消息都需要被相同逻辑的多台机器处理。
  4. 广播模式下, 消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次, 但是并不会对消费失败的消息进行失败重投, 因此业务方需要关注消费失败的情况。
  5. 广播模式下, 客户端每一次重启都会从最新消息消费。 客户端在被停止期间发送至服务端的消息将会被自动跳过, 请谨慎选择
  6. 广播模式下, 每条消息都会被大量的客户端重复处理, 因此推荐尽可能使用集群模式。
  7. 目前仅 Java 客户端支持广播模式。
  8. 广播模式下服务端不维护消费进度, 所以消息队列 RocketMQ 控制台不支持消息堆积查询、 消息堆积报警和订阅关系查询功能。
  9. 消费进度在客户端维护, 出现消息重复消费的概率稍大于集群模式。

设置成广播模式相关代码如下:

//设置消费模式为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

至少一次设计理念

在集群模式下,RocketMQ 可以保证Topic + Tag下的消息可以肯定会被整个集群至少消费一次。
在广播模式下,RocketMQ 可以保证至少被每台机器消费一次。
类似于数据库的事务操作,消费者未消费完成不返回ack给RocketMQ。官方对于至少一次的解释如下:
image.png
官方地址:https://github.com/apache/rocketmq/blob/master/docs/cn/features.md

消费过程幂等

RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。

消息存储核心-偏移量Offset

Offset指某个topic下的一条消息在某个MessageQueue里的位置,通过Offset可以进行定位到这条消息。Offset是消息消费进度的核心。

  • message queue 是无限长的数组,一条消息进来下标就会加1,下标就是 offset,消息在某个 MessageQueue 里的位置,通过 offset 的值可以定位到这条消息,或者指示 Consumer 从这条消息开始向后处理。
  • message queue 中的 maxOffset 表示消息的最大 offset,maxOffset 并不是最新的那条消息的 offset,而是最新消息的 offset+1,minOffset 则是现存在的最小 offset。
  • fileReserveTime=48 默认消息存储48小时后,消费会被物理地从磁盘删除,message queue 的 minOffset 也就对应增长。所以比 minOffset 还要小的那些消息已经不在 broker上了,就无法被消费。

Offset的存储实现分为远程文件类型和本地文件类型两种方式。

集群模式-RemoteBrokerOffsetStore解析

默认集群模式clustering,采用远程文件存储Offset。
本质上因为多消费模式,每个Consumer消费所订阅主题的一部分
这种情况需要broker控制offset的值,使用RemoteBrokerOffsetStore。

广播模式-LocalFileOffsetStore解析

  1. 广播模式下,由于每个Consumer都会收到消息且消费
  2. 各个Consumer之间没有任何干扰,独立线程消费
  3. 所以使用LocalFileOffsetStore,也就是把Offset存储到本地

RocketMQ消费者拉取模式-PullConsumer使用

Pull方式主要做了三件事:

  • 获取Message Queue并遍历
  • 维护OffsetStore
  • 根据不同的消息状态做不同的处理

代码案例如下:
DefaultMQPullConsumer拉取:

package com.zjq.rocketmq.consumer.pull;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import com.zjq.rocketmq.constants.Const;
 

public class PullConsumer {
    //Map<key, value>  key为指定的队列,value为这个队列拉取数据的最后位置
    //实际中可以放在redis里面或者持久化记录消费的位置
    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
 
    public static void main(String[] args) throws MQClientException {
        
        String group_name = "test_pull_consumer_name";
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group_name);
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        consumer.start();
        System.err.println("consumer start");
        //    从TopicTest这个主题去获取所有的队列(默认会有4个队列)
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("test_pull_topic");
        //    遍历每一个队列,进行拉取数据
        for (MessageQueue mq : mqs) {
            System.out.println("Consume from the queue: " + mq);
            
            SINGLE_MQ: while (true) {
                try {
                    //    从queue中获取数据,从什么位置开始拉取数据 单次对多拉取32条记录
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.println(pullResult);
                    System.out.println(pullResult.getPullStatus());
                    System.out.println();
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            List<MessageExt> list = pullResult.getMsgFoundList();
                            for(MessageExt msg : list){
                                System.out.println(new String(msg.getBody()));
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            System.out.println("没有新的数据啦...");
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }
 
 
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }
 
 
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }
 
}

MQPullConsumerScheduleService定时拉取:

package com.zjq.rocketmq.consumer.pull;

import java.util.List;

import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullTaskCallback;
import org.apache.rocketmq.client.consumer.PullTaskContext;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import com.zjq.rocketmq.constants.Const;
 
public class PullScheduleService {

    public static void main(String[] args) throws MQClientException {
        
        String group_name = "test_pull_consumer_name";
        
        final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name);
        
        scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        
        scheduleService.setMessageModel(MessageModel.CLUSTERING);
        
        scheduleService.registerPullTaskCallback("test_pull_topic", new PullTaskCallback() {
 
            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
                MQPullConsumer consumer = context.getPullConsumer();
                System.err.println("-------------- queueId: " + mq.getQueueId()  + "-------------");
                try {
                    // 获取从哪里拉取
                    long offset = consumer.fetchConsumeOffset(mq, false);
                    if (offset < 0)
                        offset = 0;
 
                    PullResult pullResult = consumer.pull(mq, "*", offset, 32);
                    switch (pullResult.getPullStatus()) {
                    case FOUND:
                        List<MessageExt> list = pullResult.getMsgFoundList();
                        for(MessageExt msg : list){
                            //消费数据...
                            System.out.println(new String(msg.getBody()));
                        }
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                    }
                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
                    // 设置再过3000ms后重新拉取
                    context.setPullNextDelayTimeMillis(3000);
          
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
 
        scheduleService.start();
    }
}

参考:
https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
物联网 Go 网络性能优化
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式【1月更文挑战第21天】【1月更文挑战第104篇】
124 1
|
3月前
|
消息中间件 中间件 Kafka
原来RocketMQ中间件可以这么玩
消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:
35 0
|
4月前
|
消息中间件 Java
Java操作RabbitMQ单一生产-消费者模式
Java操作RabbitMQ单一生产-消费者模式
32 0
|
14天前
|
消息中间件 存储 监控
写了10000字:全面学习RocketMQ中间件
以上是 V 哥在授课时整理的全部 RocketMQ 的内容,在学习时重点要理解其中的含义,正所谓知其然知其所以然,希望这篇文章可以帮助兄弟们搞清楚RocketMQ的来龙去脉,必竟这是一个非常常用的分布式应用的中间件,好了,今天的内容就分享到这,我靠!已经 00:36分,建议收藏起来,慢慢消化,创作不易,喜欢请点赞转发。
|
25天前
|
消息中间件 RocketMQ
RocketMq消费者/生产者配置
RocketMq消费者/生产者配置
|
28天前
|
消息中间件 供应链 Java
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
43 0
RabbitMQ入门指南(九):消费者可靠性
|
1月前
|
消息中间件 Java BI
RabbitMQ的四种消息传递模式与演示代码
RabbitMQ的四种消息传递模式与演示代码
39 0
|
2月前
|
消息中间件 Java 调度
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
14 1
|
2月前
|
消息中间件 Java RocketMQ
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
13 1
|
2月前
|
消息中间件 存储 NoSQL
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
28 1

相关产品

  • 云消息队列 MQ