这次分享,你将获得消息中间件到底是什么?
同步调用
A、B、C三个系统,实现一个功能的调用链是:A调用B,B又调用C,A要返回结果,必须等B返回,B又等C返回,这种模式其实就是所谓的“同步调用”。
依托消息中间件如何实现异步?
1、引入了MQ后,用来的依赖关系转移了,从系统之间的依赖,变成系统都依赖MQ
A调用B,只需要向MQ发送一条消息,A就认为自己的工作完成了。不用像之前调用B一样等着B处理一堆的业务逻辑和数据库操作。
B会从MQ中读取A发送的特定消息,完成自己该做的事情。
这样就实现了A异步调用B
2、系统B什么时候收到通知,B什么时候去干自己的事情,这个A系统不需要去关心。
MQ是用来干嘛的?有什么作用?
作用:异步提升性能、系统解耦、流量消峰
提升性能:
引入MQ之前,用户同步调用耗时220ms
一个请求调用了A、B两个系统,执行业务逻辑各需要20 、200毫秒,那么处理这个请求一共需要220毫秒
引入MQ之后的效果,耗时20ms
引入MQ后:发送消息给MQ的速度是很快的(没有业务逻辑、没有数据库操作),所以引入MQ后,20多毫秒就可以返回结果给用户了。
系统解耦:
引入MQ前
系统A和系统B通过同步调用的模式耦合在了一起,一旦系统B出现故障,很可能会影响系统A也有故障
而且系统A还得去关心系统B的故障,去处理对应的异常,这是很麻烦的。
引入MQ之后
引入MQ后:B如果出现了故障,对系统A根本没影响,系统A也感觉不到,B自己处理自己的问题!
流量消峰:
引入MQ之前
如果高并发访问系统A(A没有数据库操作),A调用B(B有数据库操作),那么瓶颈在B,因为数据库操作是比较耗时的。
同样的机器配置下,如果数据库可以抗每秒6000请求,MQ至少可以抗每秒几万请求。因为数据库复杂,需要支持事务、复杂的SQL查询等
引入MQ后,扛住高并发的写QPS
引入MQ后:A系统依赖支持高并发的MQ,B也依赖MQ,此时B可以用自己的合适的速度访问MQ,即B系统流量被消峰了。整个系统的性能由A决定,而不速度慢的B决定
我们为什么选择RocketMQ消息中间件?
常用mq介绍及对比
RabbitMQ – Erlang开发,社区大,功能丰富
Kafka – Scala开发,支持批量发送消费,适合日志分发
RocketMQ – Java开发,原理跟Kafka类似,但是更适合做业务MQ,阿里巴巴开源,经过大规模校验
ActiveMQ、ZeroMQ等等很多
RocketMQ各⻆⾊以及概念术语
Producer
⽣产者。发送消息的客户端⻆⾊。发送消息的时候需要指定Topic。
Consumer
消费者。消费消息的客户端⻆⾊。通常是后台处理异步消费的系统。
RocketMQ中Consumer有两种实现:PushConsumer和PullConsumer。
PushConsumer
推送模式(虽然RocketMQ使⽤的是⻓轮询)的消费者。消息的能及时被消
费。使⽤⾮常简单,内部已处理如线程池消费、流控、负载均衡、异常处理
等等的各种场景。
PullConsumer
拉取模式的消费者。应⽤主动控制拉取的时机,怎么拉取,怎么消费等。主
动权更⾼。但要⾃⼰处理各种场景。
Topic
标识⼀类消息的逻辑名字,消息的逻辑管理单位。⽆论消息⽣产还是消费,
都需要指定Topic。
Tag
RocketMQ⽀持给在发送的时候给topic打tag,同⼀个topic的消息虽然逻辑管
理是⼀样的。但是消费topic1的时候,如果你订阅的时候指定的是tagA,那么
tagB的消息将不会投递。
Message Queue
简称Queue或Q。消息物理管理单位。⼀个Topic将有若⼲个Q。若Topic同时
创建在不同的Broker,则不同的broker上都有若⼲Q,消息将物理地存储落在
不同Broker结点上,具有⽔平扩展的能⼒。
RocketMQ物理部署结构
NameServer
NameServer的主要作用是为消息的生产者和消息消费者提供关于主题Topic的路由信息,那么NameServer需要存储路由的基础信息,还要管理Broker节点,包括路由注册、路由删除等。
**topicQueueTable:**Topic消息队列路由信息,消息发送时根据路由表进行负载均衡
**brokerAddrTable:**Broker基础信息,包括brokerName、所属集群名称、主备Broker地址
**clusterAddrTable:**Broker集群信息,存储集群中所有Broker名称
**brokerLiveTable:**Broker状态信息,NameServer每次收到心跳包是会替换该信息
**filterServerTable:**Broker上的FilterServer列表,用于类模式消息过滤。
RocketMQ基于定于发布机制,一个Topic拥有多个消息队列,一个Broker为每一个主题创建4个读队列和4个写队列。多个Broker组成一个集群,集群由相同的多台Broker组成Master-Slave架构,brokerId为0代表Master,大于0为Slave。BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间。
总结:
Name Server 路由发现与删除机制就介绍到这里了,我们会发现这种设计会存在这样
种情况: NameServer 需要等 Broker 失效至少 120s 才能将该 Broker 从路由表中移除掉,那
如果在 Broker 故障期间,消息生产者 Producer 根据主题获取到的路由信息包含已经看机的
Broker ,会导致消息发送失败,那这种情况怎么办?
Producer流程
发送消息负载均衡
Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图:
图中箭头线条上的标号代表顺序,发布方会把第一条消息发送至 Queue 0,然后第二条消息发送至 Queue 1,以此类推。
- 默认发送超时时间为3秒
- 默认发送的数据传输方式为同步,即等待数据的返回
- 默认消息发送失败重试次数为3次
- 在3+1秒内,重试3次,如果还不成功将会抛出异常,但是因为一次发送的超时时间为3秒,如果超时可能只能重试2次
消息发送流程
总结:
1 )消息生产者启动流程
2)消息队列负载机制
3)消息发送异常机制
事务消息流程
·事务消息实现思想
prepare/commit/rollback
·事务消息发送流程
·事务消息提交或回滚
·事务消息回查事务状态
如下图所示:
发送顺序消息流程
Consumer流程
订阅消息Rebalance负载均衡
定时消息原理
消息存储设计
RocketMQ的消息存储是由consume queue和commit log配合完成的。
1、Consume Queue
consume queue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commit log上的位置。
我们可以在配置中指定consumequeue与commitlog存储的目录
每个topic下的每个queue都有一个对应的consumequeue文件,比如:
${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}
Consume Queue文件组织,如图所示:
2、commitlog
存储路径默认为$HOME/store/commitlog,可以在broker启动时通过storePathCommitLog设置,存储的文件如下图所示,默认大小为1G:
RocketMQ的最佳实践
1、一个应用尽可能用一个 Topic,消息子类型用 tags 来标识
2、send 消息方法,只要不抛异常,就代表发送成功。
3、对于消息不可丢失应用,务必要有消息重发机制
4、消费过程要做到幂等(即消费端去重)
5、消费打印日志
消息重复的常见问题
1、producer发送端相关:
2、broker master 挂了,client 是OK的:
3、broker OK的,client也挂了:
4、broker 挂了,client也挂了
5、nameserver挂了的影响
架构设计亮点:
1、master broker是如何将消息同步到slave broker?
Broker一般是`Master-Slave`模式部署,就是一个Master对应一个Slave
Slave Broker不停的发送请求到Master Broker去拉取消息
Slave Broker也会向所有的NameServer进行注册和默认30s发送心跳包
2、master broker 挂了怎么办
3、基于Broker读写分离架构读取消息的原理
4、CommitLog基于os cache实现写入性能优化
5、ConsumeQueue基于os cache实现读取性能优化的原理
6、基于Dledger实现RocketMQ高可用自动切换
若有收获,就点个赞吧