2021-Java后端工程师面试指南-(消息队列)(下)

简介: 前言文本已收录至我的GitHub仓库,欢迎Star:github.com/bin39232820…种一棵树最好的时间是十年前,其次是现在

既然你说你用rocketmq,那么我问你当集群启动的时候它的工作流程是怎么样的

  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。


如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?

首先我们来看看在消息队列的各个组件中,有哪些组件会出现不幂等

  • 生产者已把消息发送到mq,在mq给生产者返回ack的时候网络中断,故生产者未收到确定信息,生产者认为消息未发送成功,但实际情况是,mq已成功接收到了消息,在网络重连后,生产者会重新发送刚才的消息,造成mq接收了重复的消息
  • 消费者在消费mq中的消息时,mq已把消息发送给消费者,消费者在给mq返回ack时网络中断,故mq未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息;

解决方案

  • 第一个就是生产者,我们必须保证我们只有一个消息发送到了队列中,可以通过一个唯一的id来保证,当然这种情况是非常小的
  • 第二个就是消费者,也可利用mq的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过,至于实现方式有很多,redis 数据库等等都行


如何保证消息的顺序消费

  • 生产者必须要将所有的消息顺序的写入到一个队列中。
  • 然后消费者的话,就只能保证一个消费者,这样的话就能实现顺序消费了,但是顺序消费的坏处就是我们的吞吐量要下降


如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题?

数据的丢失问题,可能出现在生产者、MQ、消费者中,咱们从 RabbitMQ 和 RocketMQ 分别来分析一下吧。

RabbitMQ

网络异常,图片无法展示
|


  • 生产者弄丢了数据

生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。 此时可以选择用 RabbitMQ 提供的事务功能或者是confirm 机制 事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。

  • RabbitMQ 弄丢了数据

就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。

  • 消费端弄丢了数据

RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。 这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。


RocketMQ

可以从三个方面来分析rocket的消息可靠性

  • Producer端消息丢失
  • producer端防止消息发送失败,可以采用同步阻塞式的发送(也就是发送同步消息),同步的检查Brocker返回的状态是否持久化成功,发送超时或者失败,则会默认重试2次,rocker选择了确保消息一定发送成功,但有可能发生重复投递
  • 如果是异步发送消息,会有一个回调接口,当brocker存储成功或者失败的时候,也可以在这里根据返回状态来决定是否需要重试(当然这个是需要我们自己来实现的)
  • Brocker端消息丢失
  • rocketmq一般都是先把消息写到PageCache中,然后再持久化到磁盘上,数据从pagecache刷新到磁盘有两种方式,同步和异步
  • 同步刷盘方式:消息写入内存的 PageCache后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。这种方式可以保证数据绝对安全,但是吞吐量不大。
  • 异步刷盘方式(默认):消息写入到内存的 PageCache中,就立刻给客户端返回写操作成功,当 PageCache中的消息积累到一定的量时,触发一次写操作,将 PageCache中的消息写入到磁盘中。这种方式吞吐量大,性能高,但是 PageCache中的数据可能丢失,不能保证数据绝对的安全。
  • Cousmer端消息丢失
  • cousmer端默认是消息之后自动返回消费成功确认ack,但是这时如果我们的程序执行失败了,数据不就丢失了吗?
  • 所以我们可以将自动提交(AutoCommit)消费响应,设置为在代码中手动提交,只有真正消费成功之后再通知brocker消费成功,然后更新消费唯一offset或者删除brocker中的消息


大量消息在 mq 里积压了几个小时了还没解决此时应该怎么办

  • 第一条,为啥会出现消息大量积压,是本身我们的生产者的消息产多了,还是我们的消费者出现问题了,先弄清楚原因先
  • 第二 如果是我们生产的消息多了,那么我们可以多加几个消费者去消费消息
  • 第三,如果说是我们的消费者出现了问题,那么我首先肯定是要修复消费者的bug,但是有一点就是就算我们修复了bug,但是要到生产的流程来说还要花几个小时才能消费完,这时候,我们要零时写一个逻辑,把消费者的耗时逻辑直接确认,然后把消息转到另外一个队列,另外一个队列用10背速度去消费,等转发完成之后,换成正常的消费逻辑,这样就可以尽快的使业务得到正常的使用了。


说说延时队列呗

延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。 其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。 简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

RabbitMQ中的一个高级特性——TTL(Time To Live),当我们有一些特殊的场景,比如注册几天后,没有购买就给他们发优惠卷,这些运营手段,就可以用到这个延时队列了,在rabbitmq里面就是ttl+死信队列来实现的。


网络异常,图片无法展示
|

然后RocketMQ的延时队列的话用处就不大了,rocketmq实现的延时队列只支持特定的延时时间段,1s,5s,10s,...2h,不能支持任意时间段的延时


深入聊聊RocketMQ呗,因为rabbitmq的源码不是Java,所以不好问,但是RocketMQ的源码还是要大致了解了解


聊聊消息的存储和发送

  • 消息存储

磁盘如果使用得当,磁盘的速度完全可以匹配上网络 的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度。

  • 消息发送

Linux操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。

一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:

1)read;读取本地文件内容;

2)write;将读取的内容通过网络发送出去。


这两个看似简单的操作,实际进行了4 次数据复制,分别是:

  1. 从磁盘复制数据到内核态内存;
  2. 从内核态内存复 制到用户态内存;
  3. 然后从用户态 内存复制到网络驱动的内核态内存;
  4. 最后是从网络驱动的内核态内存复 制到网卡中进行传输。

网络异常,图片无法展示
|

通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过MappedByteBuffer实现的

RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。

这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了


聊聊分布式事务呗

如何解释分布式事务呢?事务大家都知道吧?要么都执行要么都不执行 。在同一个系统中我们可以轻松地实现事务,但是在分布式架构中,我们有很多服务是部署在不同系统之间的,而不同服务之间又需要进行调用。比如此时我下订单然后增加积分,如果保证不了分布式事务的话,就会出现A系统下了订单,但是B系统增加积分失败或者A系统没有下订单,B系统却增加了积分。

如今比较常见的分布式事务实现有 2PC、TCC 和 事务最终一致性,一般我们除了强一致性的场景,一般用的可靠消息最终一致性,那么对于RocketMQ 它是怎么实现的呢?

在 RocketMQ 中使用的是 事务消息加上事务反查机制 来解决分布式事务问题的


网络异常,图片无法展示
|

在第一步发送的 half 消息 ,它的意思是 在事务提交之前,对于消费者来说,这个消息是不可见的 。

你可以试想一下,如果没有从第5步开始的 事务反查机制 ,如果出现网路波动第4步没有发送成功,这样就会产生 MQ 不知道是不是需要给消费者消费的问题,他就像一个无头苍蝇一样。在 RocketMQ 中就是使用的上述的事务反查来解决的


  • 事务消息发送及提交
  • 发送消息(half消息)。
  • 服务端响应消息写入结果。
  • 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
  • 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
  • 事务补偿
  • 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
  • Producer收到回查消息,检查回查消息对应的本地事务的状态
  • 根据本地事务状态,重新Commit或者Rollback
  • 其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。


聊聊RocketMQ的底层存储机制

RocketMQ 是如何设计它的存储结构了。我首先想大家介绍 RocketMQ 消息存储架构中的三大角色——CommitLog 、ConsumeQueue 和 IndexFile 。

  • CommitLog: 消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
  • ConsumeQueue: 消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ 是基于主题 Topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog 文件中根据 Topic 检索消息是非常低效的。Consumer 即可根据 ConsumeQueue 来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset ,消息大小 size 和消息 Tag 的 HashCode 值。consumequeue 文件可以看成是基于 topic 的 commitlog 索引文件,故 consumequeue 文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 consumequeue 文件采取定长设计,每一个条目共20个字节,分别为8字节的 commitlog 物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue文件大小约5.72M;
  • IndexFile: IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。


结束


接下来复习下ssm框架

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
1月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
67 2
|
18天前
|
Java 程序员
Java社招面试题:& 和 && 的区别,HR的套路险些让我翻车!
小米,29岁程序员,分享了一次面试经历,详细解析了Java中&和&&的区别及应用场景,展示了扎实的基础知识和良好的应变能力,最终成功获得Offer。
48 14
|
29天前
|
存储 缓存 算法
面试官:单核 CPU 支持 Java 多线程吗?为什么?被问懵了!
本文介绍了多线程环境下的几个关键概念,包括时间片、超线程、上下文切换及其影响因素,以及线程调度的两种方式——抢占式调度和协同式调度。文章还讨论了减少上下文切换次数以提高多线程程序效率的方法,如无锁并发编程、使用CAS算法等,并提出了合理的线程数量配置策略,以平衡CPU利用率和线程切换开销。
面试官:单核 CPU 支持 Java 多线程吗?为什么?被问懵了!
|
1月前
|
存储 算法 Java
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
本文详解自旋锁的概念、优缺点、使用场景及Java实现。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
|
1月前
|
存储 缓存 Oracle
Java I/O流面试之道
NIO的出现在于提高IO的速度,它相比传统的输入/输出流速度更快。NIO通过管道Channel和缓冲器Buffer来处理数据,可以把管道当成一个矿藏,缓冲器就是矿藏里的卡车。程序通过管道里的缓冲器进行数据交互,而不直接处理数据。程序要么从缓冲器获取数据,要么输入数据到缓冲器。
Java I/O流面试之道
|
23天前
|
Java 编译器 程序员
Java面试高频题:用最优解法算出2乘以8!
本文探讨了面试中一个看似简单的数学问题——如何高效计算2×8。从直接使用乘法、位运算优化、编译器优化、加法实现到大整数场景下的处理,全面解析了不同方法的原理和适用场景,帮助读者深入理解计算效率优化的重要性。
29 6
|
1月前
|
存储 缓存 Java
大厂面试必看!Java基本数据类型和包装类的那些坑
本文介绍了Java中的基本数据类型和包装类,包括整数类型、浮点数类型、字符类型和布尔类型。详细讲解了每种类型的特性和应用场景,并探讨了包装类的引入原因、装箱与拆箱机制以及缓存机制。最后总结了面试中常见的相关考点,帮助读者更好地理解和应对面试中的问题。
54 4
|
1月前
|
存储 Java 程序员
Java基础的灵魂——Object类方法详解(社招面试不踩坑)
本文介绍了Java中`Object`类的几个重要方法,包括`toString`、`equals`、`hashCode`、`finalize`、`clone`、`getClass`、`notify`和`wait`。这些方法是面试中的常考点,掌握它们有助于理解Java对象的行为和实现多线程编程。作者通过具体示例和应用场景,详细解析了每个方法的作用和重写技巧,帮助读者更好地应对面试和技术开发。
107 4
|
1月前
|
算法 Java
JAVA 二叉树面试题
JAVA 二叉树面试题
18 0
|
4月前
|
存储 Java
【IO面试题 四】、介绍一下Java的序列化与反序列化
Java的序列化与反序列化允许对象通过实现Serializable接口转换成字节序列并存储或传输,之后可以通过ObjectInputStream和ObjectOutputStream的方法将这些字节序列恢复成对象。