RcoketMQ内部机制和应用场景的分享(二)

本文涉及的产品
网络型负载均衡 NLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
传统型负载均衡 CLB,每月750个小时 15LCU
简介: RcoketMQ内部机制和应用场景的分享

6.4广播消费

/**
 * PushConsumer,广播方式订阅消息
 * 
 */
public class PushConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
         /**
          * 缺省的,Consumer的MessageModel就是CLUSTERING模式,也就是同1个Consumer Group内部,
          * 多个Consumer分摊同1个topic的多个queue,也就是负载均衡。
          * 如果你把MessageModel改成BROADCAST模式,那同1个Consumer Group内部也变成了广播,
          * 此时ConsumerGroup其实就没有区分的意义了。
          * 此时,不管是1个Consumer Group,还是多个Consumer Group,对同1个topic的消息,都变成了广播。
          */
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                 //RocketMQ提供了ack机制,返回消费状态  
                 //CONSUME_SUCCESS 消费成功  
                 //RECONSUME_LATER 消费失败,需要稍后重新消费  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Broadcast Consumer Started.");
    }
}

7.RocketMQ发送消息的四方式

7.1可靠的同步

同步传输通常用于响应时间敏感的业务场景。

public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
   producer.start();
   for (int i = 0; i < 100; i++) {
         Message msg = new Message("TopicTest" TagA",
          ("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));
         SendResult sendResult = producer.send(msg);
         System.out.printf("%s%n", sendResult);
   }
   producer.shutdown();
}

7.2可靠的异步,速度快//重点在SendCallback这里 异步发送回调,可靠性在于需要根据返回结果在回调里面处理业务。

   异步传输通常用于响应时间敏感的业务场景。

public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
  final int index = i;
  Message msg = new Message("TopicTest","TagA","OrderID188","Hello 
                world".getBytes(RemotingHelper.DEFAULT_CHARSET));
  producer.send(msg, new SendCallback() {
     @Override
     public void onSuccess(SendResult sendResult) {
           System.out.printf("%-10d OK %s %n", index,
           sendResult.getMsgId());
     }
    @Override
    public void onException(Throwable e) {
       System.out.printf("%-10d Exception %s %n", index, e);
       e.printStackTrace();
   }
  });
 }
 producer.shutdown();
}

7.3单向传输,只管发送,不在意是否成功

    应用:单向传输用于要求中等可靠性的情况,如日志采集。

public class OnewayProducer {
public static void main(String[] args) throws Exception{
  DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
  producer.start();
  for (int i = 0; i < 100; i++) {
    Message msg = new Message("TopicTest, "TagA" ("Hello 
     RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
     producer.sendOneway(msg);
  }
  producer.shutdown();
}

7.4.事务消息,通过实现TransactionMQProducer,并且编写本地事务监听器。

@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                          final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
        if (null == this.transactionCheckListener) {
            throw new MQClientException("localTransactionBranchCheckListener is null", null);
        }
        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
}

发送的一些其他说明

默认发送超时为3s。

消息超过4k,即启用消息的压缩。

发送失败,默认重发2次。

消息最大限制为4M,即超过4M会提示发送失败。

8.负载均衡

消费者

Consumer, 缺省的Consumer的MessageModel就是CLUSTERING模式,也就是同1个Consumer Group内部,

多个Consumer分摊同1个topic的多个queue。但是,是否实现负载均衡和调用的API有关

pull和push用法上的基本差别就是:pull是客户端主动去拉取消息,push是注册了一个回调,当有新消息,该回调被调用。

但这还不是2者的最大区别,最大区别是:在pull里面,所有MessageQueue是向我们暴露的,我们需要自己去手动遍历所有的queue;

push(DefaultMQPushConsumer)里面,我们只指定了订阅的topic,而MessageQueue是向我们隐藏的,在其内部做了"负载均衡"

pull(DefaultMQPullConsumer)的代码,我们手动遍历了所有的queue,没有负载均衡!!!

PushConsumer,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。

但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法

生产者

Producer端负载均衡,每个实例在发消息的时候,默认会轮询所有的message queue发送,

以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下

/**
 *  Producer,发送顺序消息
 *  重写queue的负载策略
 */
public class Producer {
    public static void main(String[] args) {
        try {
          DefaultMQProducer producer = new DefaultMQProducer("ordermessage");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
            String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" };
            for (int i = 0; i < 100; i++) {
                // 订单ID相同的消息要有序
                int orderId = i % 10;
                Message msg =
                        new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                            ("Hello RocketMQ " + i).getBytes());
                //相同的订单号放在相同的队列里
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        //mqs默认是4个队列(可以从管理控制台里查看,从界面添加默认16个)
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);
                System.out.println(sendResult);
            }
            producer.shutdown();
        }
        catch (MQClientException e) {
            e.printStackTrace();
        }
        catch (RemotingException e) {
            e.printStackTrace();
        }
        catch (MQBrokerException e) {
            e.printStackTrace();
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

9.消息重试

RocketMQ的消息重试包含了producer发送消息的重试和consumer消息消费的重试。

9.1.producer发送消息重试

发送消息的充实次数区分不同的情况:

同步发送:org.apache.rocketmq.client.producer.DefaultMQProducer#retryTimesWhenSendFailed + 1,默认retryTimesWhenSendFailed是2,所以除了正常调用一次外,发送消息如果失败了会重试2次,立即重试,中间没有单独的间隔时间。

异步发送:不会重试(调用总次数等于1)

消息处理失败之后,该消息会和其他正常的消息一样被broker处理,之所以能重试是因为consumer会把失败的消息发送回broker,broker对于重试的消息做一些特别的处理,供consumer再次发起消费 。

9.2.consumer消费重试

以下原理均只适用于RocketMQ中的PushConsumer即Java客户端中的DefaultPushConsumer。 若使用了PullConsumer模式,类似的工作如何ack,如何保证消费等均需要使用方自己实现。

 9.2.1 exception的情况,一般重复16次 10s、30s、1mins、2mins、3mins等。注意reconsumeTimes这个参数;

   9.2.2 超时情况,这种情况MQ会无限制的发送给消费端。这种情况就是Consumer端没有返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,也没有返回ConsumeConcurrentlyStatus.RECONSUME_LATER

   9.2.3 消息重试的主要流程:

      9.2.3.1consumer消费失败,将消息发送回broker

      9.2.3.2broker收到重试消息之后置换topic,存储消息

      9.2.3.3consumer会拉取该topic对应的retryTopic的消息

      9.2.3.4consumer拉取到retryTopic消息之后,置换到原始的topic,把消息交给listener消费

  9.2.4死信队列

     如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。

注意点

1.如果业务的回调没有处理好而抛出异常,会认为是消费失败当ConsumeConcurrentlyStatus.RECONSUME_LATER处理。

2.当使用顺序消费的回调MessageListenerOrderly时,由于顺序消费是要前者消费成功才能继续消费,所以没有ConsumeConcurrentlyStatus.RECONSUME_LATER的这个状态,只有ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。

3.重复的时间间隔是可以在配置文件内设置的,由于我这边配置的双master模式,所以在服务器的broker-a.properties和broker-b.properties中分别配置,设置好后务必将之前的数据清理

10.如何保证消息不被重复消费

RocketMq实际上有个consumerOffset的概念,就是每个消息写进去,都有一个consumerOffset,代表他的序号,然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的consumerOffset来继续消费吧。

但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接kill进程了,再重启。这会导致consumer有些消息处理了,但是没来得及提交offset,尴尬了。重启之后,少数消息会再次消费一次。

其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性

给你举个例子吧。假设你有个系统,消费一条往数据库里插入一条,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下已经消费过了,直接扔了,不就保留了一条数据?

一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性

幂等性,我通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。

那所以第二个问题来了,怎么保证消息队列消费的幂等性?

其实还是得结合业务来思考,我这里给几个思路:

(1)比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update一下好吧

(2)比如你是写redis,那没问题了,反正每次都是set,天然幂等性

(3)比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,然后你这里消费到了之后,先根据这个id去比如redis里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个id写redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。

还有比如基于数据库的唯一键来保证重复数据不会重复插入多条,我们之前线上系统就有这个问题,就是拿到数据的时候,每次重启可能会有重复,因为kafka消费者还没来得及提交offset,重复数据拿到了以后我们插入的时候,因为有唯一键约束了,所以重复数据只会插入报错,不会导致数据库中出现脏数据

ps:还可以封装一个通用的解决方案

参考:http://jaskey.github.io/blog/2020/06/08/rocketmq-message-dedup/

11.如何保证高可用性(面试常问)

11.1集群化部署 + 数据多副本冗余

如果你们公司是电商平台、外卖平台、社交平台。那么来这么一出,不是会导致公司损失惨重?解决方案如下

集群化部署 + 数据多副本冗余

MQ采用集群模式部署到了2台机器上去,然后生产者给其中一台机器写入一条消息,该机器自动同步复制给另外一台机器。

此时数据在2台机器上,就有2个副本了,那么如果第一台机器宕机了,就不会影响我们

实际上这种MQ集群化部署架构以及数据多副本冗余机制,是非常常见的一种高可用架构。

11.2多副本同步复制强制要求

假如你要是不能保证这一点,比如你就写数据给了其中一台机器,然后他还没来得及复制给另外一台机器呢,直接第一台机器就宕机了。

此时虽然你可以继续基于第二台机器发送消息和消费消息,但是你刚才发送的一条消息就丢失了。

在写数据到其中一台机器的时候,得要求,必须得让那台机器复制数据到另外一台机器了,保证集群里一定有这条数据双副本了,才可以认为本次写成功了,否则认为发送失败

上面说的那一整套的机制,在Kafka里都可以采用,他有对应的一些参数可以配置数据有几个副本,包括你每次写入必须复制到几台机器才可以算成功,否则就要重新发送,以及你的集群剩余机器必须可以承载几个副本才能继续写入数据

11.3影响消息可靠性的几种情况

(1). Broker 正常关闭

(2). Broker 异常 Crash

(3). OS Crash

(4). 机器掉电,但是能立即恢复供电情冴。

(5). 机器无法开机(可能是 cpu、主板、内存等关键设备损坏)

(6). 磁盘设备损坏。

(1)、 (2)、 (3)、 (4)四种情况都属亍硬件资源可立即恢复情况,RocketMQ 在返四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。

(5)、 (6)属于单点故障,无法恢复,一旦发生,在此单点上的消息全部丢失。 RocketMQ 在返两种情冴下,通过异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,

同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如不 Money 相关的应用。

12.如何解决消息队列的延时以及过期失效问题

12.1消息积压解决方案

1)先修复consumer的问题,确保其恢复消费速度,然后将现有cnosumer都停掉

2)新建一个topic,queue是原来的10倍

3)然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的具有10倍数量queue的topic

4)接着临时征用10倍的机器来部署consumer,每一批consumer均匀的消费queue的数据

5)这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据

6)等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息

12.2消息积压导致的数据丢失解决方案

如果消息在queue中积压超过一定的时间就会被Rocketmq给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。

这个情况下,就不是说要增加consumer消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。我们可以采取一个方案,就是批量重导, 将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入mq里面去,把白天丢的数据给他补回来。也只能是这样了。

13.启动的时候从哪里消费

当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次Pull请求。

如果这个消费进度在Broker并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:

CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息
CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以

所以,社区中经常有人问:“为什么我设了CONSUME_FROM_LAST_OFFSET,历史的消息还是被消费了”? 原因就在于只有全新的消费组才会使用到这些策略,老的消费组都是按已经存储过的消费进度继续消费。

14.消息ACK机制

14.1批量ack机制潜在的问题

RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个这个消费组在这条queue上的消费进度。

如果某已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的。

每次消息成功后,本地的消费进度会被更新,然后由定时器定时同步到broker,以此持久化消费进度

但是每次记录消费进度的时候,只会把一批consumer group中最小的offset值为消费进度值

这种方式和传统的一条message单独ack的方式有本质的区别。性能上提升的同时,会带来一个潜在的重复问题——由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2200消费一直没有结束的情况。

在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2200也消费结束了,本地的消费进度才能标记2200消费结束了(注:consumerOffset=2201)。

在这种设计下,就有消费大量重复的风险。如2200在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次

14.2批量ack重复消费场景的解决

实际上对于卡住进度的场景,可以选择弃车保帅的方案:把消息卡住那些消息,先ack掉,让进度前移。但要保证这条消息不会因此丢失,ack之前要把消息sendBack回去,这样这条卡住的消息就会必然重复,但会解决潜在的大量重复的场景

后来RocketMQ显然也发现了这个问题,RocketMQ在3.5.8之后也是采用这样的方案去解决这个问题

15.推拉模式

首先明确一下推拉模式到底是在讨论消息队列的哪一个步骤,一般而言我们在谈论推拉模式的时候指的是 Comsumer 和 Broker 之间的交互。

推模式

指的是消息从 Broker 推向 Consumer,即 Consumer 被动的接收消息,由 Broker 来主导消息的发送。

推模式的好处:消息实时性高

推模式的缺点:难以适应消费速率

拉模式

指的是 Consumer 主动向 Broker 请求拉取消息,即 Broker 被动的发送消息给 Consumer。

拉模式的好处:可以更合适的进行消息的批量发送

拉模式的缺点:消息延迟

RocketMQ 中的 PushConsumer 其实是拉模式的,只是看起来像推模式而已。

RocketMQ  和 Kafka 都是采用“长轮询”的机制,具体的做法都是通过消费者等待消息,当有消息的时候 Broker 会直接返回消息,如果没有消息都会采取延迟处理的策略,并且为了保证消息的及时性,在对应队列或者分区有新消息到来的时候都会提醒消息来了,及时返回消息。

一句话说就是消费者和 Broker 相互配合,拉取消息请求不满足条件的时候 hold 住,避免了多次频繁的拉取动作,当消息一到就提醒返回。

RocketMQ 中的 PushConsumer 其实是披着拉模式的方法,只是看起来像推模式而已。

16.消息丢失场景分析及MQ内部如何解决

  • 生产者产生消息发送给RocketMQ
  • RocketMQ接收到了消息之后,必然需要存到磁盘中,否则断电或宕机之后会造成数据的丢失
  • 消费者从RocketMQ中获取消息消费,消费成功之后,整个流程结束

这三种场景都可能会产生消息的丢失,解决方案如下:

  • 使用事务机制传输消息
  • 同步刷盘替代异步刷盘,Follower备份数据
  • 基于mq的消息确认消费机制

缺陷如下:

  • 使用事务机制传输消息,会比普通的消息传输多出很多步骤,耗费性能
  • 同步刷盘相比异步刷盘,一个是存储在磁盘中,一个存储在内存中,速度完全不是一个数量级
  • 主从机构的话,需要Leader将数据同步给Follower
  • 消费时无法异步消费,只能等待消费完成再通知RocketMQ消费完成

17.相关文章

http://dy.163.com/v2/article/detail/E35QBB2053168IW.html

https://blog.csdn.net/mr253727942/article/details/55805876?utm_source=tuicool

https://blog.csdn.net/linuxheik/article/details/79579329

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
8月前
|
安全 中间件 数据安全/隐私保护
中间件的定义,包括它的功能、应用场景以及优势。
中间件是位于操作系统和应用软件间的系统软件,提供数据交换、应用集成、流程管理和安全保障等服务。常用于分布式系统、微服务架构和企业级应用,实现高效、低耦合的系统运行。其优势在于降低开发成本、提升系统性能、简化扩展和维护。中间件推动了软件技术的发展和创新。
1020 1
|
8月前
|
安全 Java 编译器
深入探讨Java反射:解析机制与应用场景
反射是Java的一种强大而灵活的特性,它允许程序在运行时获取类的信息、构造对象、调用方法和访问字段。在Java中,每个类都有一个对应的Class对象,通过这个对象,我们可以了解类的结构和行为。
200 1
 深入探讨Java反射:解析机制与应用场景
|
3月前
|
供应链 数据库
数据库事务安全性控制有什么应用场景吗
【10月更文挑战第15天】数据库事务安全性控制有什么应用场景吗
|
5月前
|
编解码 前端开发 JavaScript
动态组件有哪些常见的应用场景呢
【8月更文挑战第30天】动态组件有哪些常见的应用场景呢
123 1
|
8月前
|
存储 缓存 中间件
中间件Cache-Aside策略特别适合“读多”的应用场景
【5月更文挑战第8天】中间件Cache-Aside策略特别适合“读多”的应用场景
58 2
|
8月前
|
存储 数据采集 弹性计算
日志服务的典型应用场景
日志服务的典型应用场景
138 3
|
8月前
|
消息中间件 存储 监控
【ZeroMQ的SUB视角】深入探讨订阅者模式、C++编程实践与底层机制
【ZeroMQ的SUB视角】深入探讨订阅者模式、C++编程实践与底层机制
887 1
|
8月前
|
存储 程序员 C++
C++容器初始化方式详解:优缺点、性能与应用场景
C++容器初始化方式详解:优缺点、性能与应用场景
126 0
|
缓存 Java
Java线程池创建方式和应用场景
Java线程池创建方式和应用场景
104 0
|
消息中间件 存储 Kafka
RcoketMQ内部机制和应用场景的分享(一)
RcoketMQ内部机制和应用场景的分享
109 0

热门文章

最新文章