6.4广播消费
/** * PushConsumer,广播方式订阅消息 * */ public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /** * 缺省的,Consumer的MessageModel就是CLUSTERING模式,也就是同1个Consumer Group内部, * 多个Consumer分摊同1个topic的多个queue,也就是负载均衡。 * 如果你把MessageModel改成BROADCAST模式,那同1个Consumer Group内部也变成了广播, * 此时ConsumerGroup其实就没有区分的意义了。 * 此时,不管是1个Consumer Group,还是多个Consumer Group,对同1个topic的消息,都变成了广播。 */ consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("TopicTest", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); //RocketMQ提供了ack机制,返回消费状态 //CONSUME_SUCCESS 消费成功 //RECONSUME_LATER 消费失败,需要稍后重新消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Broadcast Consumer Started."); } }
7.RocketMQ发送消息的四种方式
7.1可靠的同步
同步传输通常用于响应时间敏感的业务场景。
public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.start(); for (int i = 0; i < 100; i++) { Message msg = new Message("TopicTest" TagA", ("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } producer.shutdown(); }
7.2可靠的异步,速度快//重点在SendCallback这里 异步发送回调,可靠性在于需要根据返回结果在回调里面处理业务。
异步传输通常用于响应时间敏感的业务场景。
public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); for (int i = 0; i < 100; i++) { final int index = i; Message msg = new Message("TopicTest","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } producer.shutdown(); }
7.3单向传输,只管发送,不在意是否成功
应用:单向传输用于要求中等可靠性的情况,如日志采集。
public class OnewayProducer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); producer.start(); for (int i = 0; i < 100; i++) { Message msg = new Message("TopicTest, "TagA" ("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.sendOneway(msg); } producer.shutdown(); }
7.4.事务消息,通过实现TransactionMQProducer,并且编写本地事务监听器。
@Override public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException { if (null == this.transactionCheckListener) { throw new MQClientException("localTransactionBranchCheckListener is null", null); } return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg); }
发送的一些其他说明
默认发送超时为3s。
消息超过4k,即启用消息的压缩。
发送失败,默认重发2次。
消息最大限制为4M,即超过4M会提示发送失败。
8.负载均衡
消费者
Consumer, 缺省的Consumer的MessageModel就是CLUSTERING模式,也就是同1个Consumer Group内部,
多个Consumer分摊同1个topic的多个queue。但是,是否实现负载均衡和调用的API有关
pull和push用法上的基本差别就是:pull是客户端主动去拉取消息,push是注册了一个回调,当有新消息,该回调被调用。
但这还不是2者的最大区别,最大区别是:在pull里面,所有MessageQueue是向我们暴露的,我们需要自己去手动遍历所有的queue;
push(DefaultMQPushConsumer)里面,我们只指定了订阅的topic,而MessageQueue是向我们隐藏的,在其内部做了"负载均衡"。
pull(DefaultMQPullConsumer)的代码,我们手动遍历了所有的queue,没有负载均衡!!!
PushConsumer,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。
但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法
生产者
Producer端负载均衡,每个实例在发消息的时候,默认会轮询所有的message queue发送,
以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下
/** * Producer,发送顺序消息 * 重写queue的负载策略 */ public class Producer { public static void main(String[] args) { try { DefaultMQProducer producer = new DefaultMQProducer("ordermessage"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" }; for (int i = 0; i < 100; i++) { // 订单ID相同的消息要有序 int orderId = i % 10; Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes()); //相同的订单号放在相同的队列里 SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; //mqs默认是4个队列(可以从管理控制台里查看,从界面添加默认16个) int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.println(sendResult); } producer.shutdown(); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
9.消息重试
RocketMQ的消息重试包含了producer发送消息的重试和consumer消息消费的重试。
9.1.producer发送消息重试
发送消息的充实次数区分不同的情况:
同步发送:org.apache.rocketmq.client.producer.DefaultMQProducer#retryTimesWhenSendFailed + 1,默认retryTimesWhenSendFailed是2,所以除了正常调用一次外,发送消息如果失败了会重试2次,立即重试,中间没有单独的间隔时间。
异步发送:不会重试(调用总次数等于1)
消息处理失败之后,该消息会和其他正常的消息一样被broker处理,之所以能重试是因为consumer会把失败的消息发送回broker,broker对于重试的消息做一些特别的处理,供consumer再次发起消费 。
9.2.consumer消费重试
以下原理均只适用于RocketMQ中的PushConsumer即Java客户端中的DefaultPushConsumer
。 若使用了PullConsumer模式,类似的工作如何ack,如何保证消费等均需要使用方自己实现。
9.2.1 exception的情况,一般重复16次 10s、30s、1mins、2mins、3mins等。注意reconsumeTimes这个参数;
9.2.2 超时情况,这种情况MQ会无限制的发送给消费端。这种情况就是Consumer端没有返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,也没有返回ConsumeConcurrentlyStatus.RECONSUME_LATER
9.2.3 消息重试的主要流程:
9.2.3.1consumer消费失败,将消息发送回broker
9.2.3.2broker收到重试消息之后置换topic,存储消息
9.2.3.3consumer会拉取该topic对应的retryTopic的消息
9.2.3.4consumer拉取到retryTopic消息之后,置换到原始的topic,把消息交给listener消费
9.2.4死信队列
如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。
注意点
1.如果业务的回调没有处理好而抛出异常,会认为是消费失败当ConsumeConcurrentlyStatus.RECONSUME_LATER处理。
2.当使用顺序消费的回调MessageListenerOrderly时,由于顺序消费是要前者消费成功才能继续消费,所以没有ConsumeConcurrentlyStatus.RECONSUME_LATER的这个状态,只有ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。
3.重复的时间间隔是可以在配置文件内设置的,由于我这边配置的双master模式,所以在服务器的broker-a.properties和broker-b.properties中分别配置,设置好后务必将之前的数据清理
10.如何保证消息不被重复消费
RocketMq实际上有个consumerOffset的概念,就是每个消息写进去,都有一个consumerOffset,代表他的序号,然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的consumerOffset来继续消费吧。
但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接kill进程了,再重启。这会导致consumer有些消息处理了,但是没来得及提交offset,尴尬了。重启之后,少数消息会再次消费一次。
其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。
给你举个例子吧。假设你有个系统,消费一条往数据库里插入一条,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下已经消费过了,直接扔了,不就保留了一条数据?
一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性
幂等性,我通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。
那所以第二个问题来了,怎么保证消息队列消费的幂等性?
其实还是得结合业务来思考,我这里给几个思路:
(1)比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update一下好吧
(2)比如你是写redis,那没问题了,反正每次都是set,天然幂等性
(3)比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,然后你这里消费到了之后,先根据这个id去比如redis里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个id写redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
还有比如基于数据库的唯一键来保证重复数据不会重复插入多条,我们之前线上系统就有这个问题,就是拿到数据的时候,每次重启可能会有重复,因为kafka消费者还没来得及提交offset,重复数据拿到了以后我们插入的时候,因为有唯一键约束了,所以重复数据只会插入报错,不会导致数据库中出现脏数据
ps:还可以封装一个通用的解决方案
参考:http://jaskey.github.io/blog/2020/06/08/rocketmq-message-dedup/
11.如何保证高可用性(面试常问)
11.1集群化部署 + 数据多副本冗余
如果你们公司是电商平台、外卖平台、社交平台。那么来这么一出,不是会导致公司损失惨重?解决方案如下
集群化部署 + 数据多副本冗余
MQ采用集群模式部署到了2台机器上去,然后生产者给其中一台机器写入一条消息,该机器自动同步复制给另外一台机器。
此时数据在2台机器上,就有2个副本了,那么如果第一台机器宕机了,就不会影响我们
实际上这种MQ集群化部署架构以及数据多副本冗余机制,是非常常见的一种高可用架构。
11.2多副本同步复制强制要求
假如你要是不能保证这一点,比如你就写数据给了其中一台机器,然后他还没来得及复制给另外一台机器呢,直接第一台机器就宕机了。
此时虽然你可以继续基于第二台机器发送消息和消费消息,但是你刚才发送的一条消息就丢失了。
在写数据到其中一台机器的时候,得要求,必须得让那台机器复制数据到另外一台机器了,保证集群里一定有这条数据双副本了,才可以认为本次写成功了,否则认为发送失败
上面说的那一整套的机制,在Kafka里都可以采用,他有对应的一些参数可以配置数据有几个副本,包括你每次写入必须复制到几台机器才可以算成功,否则就要重新发送,以及你的集群剩余机器必须可以承载几个副本才能继续写入数据。
11.3影响消息可靠性的几种情况
(1). Broker 正常关闭
(2). Broker 异常 Crash
(3). OS Crash
(4). 机器掉电,但是能立即恢复供电情冴。
(5). 机器无法开机(可能是 cpu、主板、内存等关键设备损坏)
(6). 磁盘设备损坏。
(1)、 (2)、 (3)、 (4)四种情况都属亍硬件资源可立即恢复情况,RocketMQ 在返四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。
(5)、 (6)属于单点故障,无法恢复,一旦发生,在此单点上的消息全部丢失。 RocketMQ 在返两种情冴下,通过异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,
同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如不 Money 相关的应用。
12.如何解决消息队列的延时以及过期失效问题
12.1消息积压解决方案
1)先修复consumer的问题,确保其恢复消费速度,然后将现有cnosumer都停掉
2)新建一个topic,queue是原来的10倍
3)然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的具有10倍数量queue的topic
4)接着临时征用10倍的机器来部署consumer,每一批consumer均匀的消费queue的数据
5)这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据
6)等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息
12.2消息积压导致的数据丢失解决方案
如果消息在queue中积压超过一定的时间就会被Rocketmq给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。
这个情况下,就不是说要增加consumer消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。我们可以采取一个方案,就是批量重导, 将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入mq里面去,把白天丢的数据给他补回来。也只能是这样了。
13.启动的时候从哪里消费
当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次Pull请求。
如果这个消费进度在Broker并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:
CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息 CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍 CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以 前
所以,社区中经常有人问:“为什么我设了CONSUME_FROM_LAST_OFFSET
,历史的消息还是被消费了”? 原因就在于只有全新的消费组才会使用到这些策略,老的消费组都是按已经存储过的消费进度继续消费。
14.消息ACK机制
14.1批量ack机制潜在的问题
RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个这个消费组在这条queue上的消费进度。
如果某已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的。
每次消息成功后,本地的消费进度会被更新,然后由定时器定时同步到broker,以此持久化消费进度。
但是每次记录消费进度的时候,只会把一批consumer group中最小的offset值为消费进度值
这种方式和传统的一条message单独ack的方式有本质的区别。性能上提升的同时,会带来一个潜在的重复问题——由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2200消费一直没有结束的情况。
在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2200也消费结束了,本地的消费进度才能标记2200消费结束了(注:consumerOffset=2201)。
在这种设计下,就有消费大量重复的风险。如2200在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次
14.2批量ack重复消费场景的解决
实际上对于卡住进度的场景,可以选择弃车保帅的方案:把消息卡住那些消息,先ack掉,让进度前移。但要保证这条消息不会因此丢失,ack之前要把消息sendBack回去,这样这条卡住的消息就会必然重复,但会解决潜在的大量重复的场景。
后来RocketMQ显然也发现了这个问题,RocketMQ在3.5.8之后也是采用这样的方案去解决这个问题
15.推拉模式
首先明确一下推拉模式到底是在讨论消息队列的哪一个步骤,一般而言我们在谈论推拉模式的时候指的是 Comsumer 和 Broker 之间的交互。
推模式
指的是消息从 Broker 推向 Consumer,即 Consumer 被动的接收消息,由 Broker 来主导消息的发送。
推模式的好处:消息实时性高
推模式的缺点:难以适应消费速率
拉模式
指的是 Consumer 主动向 Broker 请求拉取消息,即 Broker 被动的发送消息给 Consumer。
拉模式的好处:可以更合适的进行消息的批量发送
拉模式的缺点:消息延迟
RocketMQ 中的 PushConsumer 其实是拉模式的,只是看起来像推模式而已。
RocketMQ 和 Kafka 都是采用“长轮询”的机制,具体的做法都是通过消费者等待消息,当有消息的时候 Broker 会直接返回消息,如果没有消息都会采取延迟处理的策略,并且为了保证消息的及时性,在对应队列或者分区有新消息到来的时候都会提醒消息来了,及时返回消息。
一句话说就是消费者和 Broker 相互配合,拉取消息请求不满足条件的时候 hold 住,避免了多次频繁的拉取动作,当消息一到就提醒返回。
RocketMQ 中的 PushConsumer 其实是披着拉模式的方法,只是看起来像推模式而已。
16.消息丢失场景分析及MQ内部如何解决
- 生产者产生消息发送给RocketMQ
- RocketMQ接收到了消息之后,必然需要存到磁盘中,否则断电或宕机之后会造成数据的丢失
- 消费者从RocketMQ中获取消息消费,消费成功之后,整个流程结束
- 使用事务机制传输消息
- 同步刷盘替代异步刷盘,Follower备份数据
- 基于mq的消息确认消费机制
缺陷如下:
- 使用事务机制传输消息,会比普通的消息传输多出很多步骤,耗费性能
- 同步刷盘相比异步刷盘,一个是存储在磁盘中,一个存储在内存中,速度完全不是一个数量级
- 主从机构的话,需要Leader将数据同步给Follower
- 消费时无法异步消费,只能等待消费完成再通知RocketMQ消费完成
17.相关文章
http://dy.163.com/v2/article/detail/E35QBB2053168IW.html
https://blog.csdn.net/mr253727942/article/details/55805876?utm_source=tuicool