既然你说你用rocketmq,那么我问你当集群启动的时候它的工作流程是怎么样的
- 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?
首先我们来看看在消息队列的各个组件中,有哪些组件会出现不幂等
- 生产者已把消息发送到mq,在mq给生产者返回ack的时候网络中断,故生产者未收到确定信息,生产者认为消息未发送成功,但实际情况是,mq已成功接收到了消息,在网络重连后,生产者会重新发送刚才的消息,造成mq接收了重复的消息
- 消费者在消费mq中的消息时,mq已把消息发送给消费者,消费者在给mq返回ack时网络中断,故mq未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息;
解决方案
- 第一个就是生产者,我们必须保证我们只有一个消息发送到了队列中,可以通过一个唯一的id来保证,当然这种情况是非常小的
- 第二个就是消费者,也可利用mq的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过,至于实现方式有很多,redis 数据库等等都行
如何保证消息的顺序消费
- 生产者必须要将所有的消息顺序的写入到一个队列中。
- 然后消费者的话,就只能保证一个消费者,这样的话就能实现顺序消费了,但是顺序消费的坏处就是我们的吞吐量要下降
如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题?
数据的丢失问题,可能出现在生产者、MQ、消费者中,咱们从 RabbitMQ 和 RocketMQ 分别来分析一下吧。
RabbitMQ
- 生产者弄丢了数据
生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。 此时可以选择用 RabbitMQ 提供的事务功能或者是confirm 机制 事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。
- RabbitMQ 弄丢了数据
就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。
- 消费端弄丢了数据
RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。 这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
RocketMQ
可以从三个方面来分析rocket的消息可靠性
- Producer端消息丢失
- producer端防止消息发送失败,可以采用同步阻塞式的发送(也就是发送同步消息),同步的检查Brocker返回的状态是否持久化成功,发送超时或者失败,则会默认重试2次,rocker选择了确保消息一定发送成功,但有可能发生重复投递
- 如果是异步发送消息,会有一个回调接口,当brocker存储成功或者失败的时候,也可以在这里根据返回状态来决定是否需要重试(当然这个是需要我们自己来实现的)
- Brocker端消息丢失
- rocketmq一般都是先把消息写到PageCache中,然后再持久化到磁盘上,数据从pagecache刷新到磁盘有两种方式,同步和异步
- 同步刷盘方式:消息写入内存的 PageCache后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。这种方式可以保证数据绝对安全,但是吞吐量不大。
- 异步刷盘方式(默认):消息写入到内存的 PageCache中,就立刻给客户端返回写操作成功,当 PageCache中的消息积累到一定的量时,触发一次写操作,将 PageCache中的消息写入到磁盘中。这种方式吞吐量大,性能高,但是 PageCache中的数据可能丢失,不能保证数据绝对的安全。
- Cousmer端消息丢失
- cousmer端默认是消息之后自动返回消费成功确认ack,但是这时如果我们的程序执行失败了,数据不就丢失了吗?
- 所以我们可以将自动提交(AutoCommit)消费响应,设置为在代码中手动提交,只有真正消费成功之后再通知brocker消费成功,然后更新消费唯一offset或者删除brocker中的消息
大量消息在 mq 里积压了几个小时了还没解决此时应该怎么办
- 第一条,为啥会出现消息大量积压,是本身我们的生产者的消息产多了,还是我们的消费者出现问题了,先弄清楚原因先
- 第二 如果是我们生产的消息多了,那么我们可以多加几个消费者去消费消息
- 第三,如果说是我们的消费者出现了问题,那么我首先肯定是要修复消费者的bug,但是有一点就是就算我们修复了bug,但是要到生产的流程来说还要花几个小时才能消费完,这时候,我们要零时写一个逻辑,把消费者的耗时逻辑直接确认,然后把消息转到另外一个队列,另外一个队列用10背速度去消费,等转发完成之后,换成正常的消费逻辑,这样就可以尽快的使业务得到正常的使用了。
说说延时队列呗
延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。 其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。 简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
RabbitMQ中的一个高级特性——TTL(Time To Live),当我们有一些特殊的场景,比如注册几天后,没有购买就给他们发优惠卷,这些运营手段,就可以用到这个延时队列了,在rabbitmq里面就是ttl+死信队列来实现的。
然后RocketMQ的延时队列的话用处就不大了,rocketmq实现的延时队列只支持特定的延时时间段,1s,5s,10s,...2h,不能支持任意时间段的延时
深入聊聊RocketMQ呗,因为rabbitmq的源码不是Java,所以不好问,但是RocketMQ的源码还是要大致了解了解
聊聊消息的存储和发送
- 消息存储
磁盘如果使用得当,磁盘的速度完全可以匹配上网络 的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度。
- 消息发送
Linux操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。
一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:
1)read;读取本地文件内容;
2)write;将读取的内容通过网络发送出去。
这两个看似简单的操作,实际进行了4 次数据复制,分别是:
- 从磁盘复制数据到内核态内存;
- 从内核态内存复 制到用户态内存;
- 然后从用户态 内存复制到网络驱动的内核态内存;
- 最后是从网络驱动的内核态内存复 制到网卡中进行传输。
通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过MappedByteBuffer实现的
RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。
这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了
聊聊分布式事务呗
如何解释分布式事务呢?事务大家都知道吧?要么都执行要么都不执行 。在同一个系统中我们可以轻松地实现事务,但是在分布式架构中,我们有很多服务是部署在不同系统之间的,而不同服务之间又需要进行调用。比如此时我下订单然后增加积分,如果保证不了分布式事务的话,就会出现A系统下了订单,但是B系统增加积分失败或者A系统没有下订单,B系统却增加了积分。
如今比较常见的分布式事务实现有 2PC、TCC 和 事务最终一致性,一般我们除了强一致性的场景,一般用的可靠消息最终一致性,那么对于RocketMQ 它是怎么实现的呢?
在 RocketMQ 中使用的是 事务消息加上事务反查机制 来解决分布式事务问题的
在第一步发送的 half 消息 ,它的意思是 在事务提交之前,对于消费者来说,这个消息是不可见的 。
你可以试想一下,如果没有从第5步开始的 事务反查机制 ,如果出现网路波动第4步没有发送成功,这样就会产生 MQ 不知道是不是需要给消费者消费的问题,他就像一个无头苍蝇一样。在 RocketMQ 中就是使用的上述的事务反查来解决的
- 事务消息发送及提交
- 发送消息(half消息)。
- 服务端响应消息写入结果。
- 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
- 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
- 事务补偿
- 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
- Producer收到回查消息,检查回查消息对应的本地事务的状态
- 根据本地事务状态,重新Commit或者Rollback
- 其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
聊聊RocketMQ的底层存储机制
RocketMQ 是如何设计它的存储结构了。我首先想大家介绍 RocketMQ 消息存储架构中的三大角色——CommitLog 、ConsumeQueue 和 IndexFile 。
- CommitLog: 消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
- ConsumeQueue: 消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ 是基于主题 Topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog 文件中根据 Topic 检索消息是非常低效的。Consumer 即可根据 ConsumeQueue 来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset ,消息大小 size 和消息 Tag 的 HashCode 值。consumequeue 文件可以看成是基于 topic 的 commitlog 索引文件,故 consumequeue 文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 consumequeue 文件采取定长设计,每一个条目共20个字节,分别为8字节的 commitlog 物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue文件大小约5.72M;
- IndexFile: IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。
结束
接下来复习下ssm框架