消息队列设计精要总结之高级特性篇
队列高级特性设计
上面都是些消息队列基本功能的实现,下面来看一些关于消息队列特性相关的内容,不管可靠投递/消息丢失与重复以及事务乃至于性能,不是每个消息队列都会照顾到,所以要依照业务的需求,来仔细衡量各种特性实现的成本,利弊,最终做出最为合理的设计。
可靠投递(最终一致性)
这是个激动人心的话题,完全不丢消息,究竟可不可能?答案是,完全可能,前提是消息可能会重复,并且,在异常情况下,要接受消息的延迟。
1 完全不丢消息的方案:方案说简单也简单,就是每当要发生不可靠的事情(RPC等)之前,先将消息落地,然后发送。当失败或者不知道成功失败(比如超时)时,消息状态是待发送,定时任务不停轮询所有待发送消息,最终一定可以送达。具体来说:
- producer往broker发送消息之前,需要做一次落地。
- 请求到server后,server确保数据落地后再告诉客户端发送成功。
- 支持广播的消息队列需要对每个待发送的endpoint,持久化一个发送状态,直到所有endpoint状态都OK才可删除消息。
2 消息丢失与消息重复:对于各种不确定(超时、down机、消息没有送达、送达后数据没落地、数据落地了回复没收到),其实对于发送方来说,都是一件事情,就是消息没有送达。
3 重复和丢失,必须接受一个:重推消息所面临的问题就是消息重复。重复和丢失就像两个噩梦,你必须要面对一个。好在消息重复还有处理的机会,消息丢失再想找回就难了。(但是重复有处理方案,丢失却是不能忍受的)
Anyway,作为一个成熟的消息队列,应该尽量在各个环节减少重复投递的可能性,不能因为重复有解决方案就放纵的乱投递。
3 根据业务场景决定处理模式:最后说一句,不是所有的系统都要求最终一致性或者可靠投递,比如一个论坛系统、一个招聘系统。一个重复的简历或话题被发布,可能比丢失了一个发布显得更让用户无法接受。不断重复一句话,任何基础组件要服务于业务场景。消费确认
1 消费者接收消息:当broker把消息投递给消费者后,消费者可以立即响应我收到了这个消息。
但收到了这个消息只是第一步,我能不能处理这个消息却不一定。
或许因为消费能力的问题,系统的负荷已经不能处理这个消息。
或者是刚才状态机里面提到的消息不是我想要接收的消息,主动要求重发。
2 消息送达与处理的解耦:把消息的送达和消息的处理分开,这样才真正的实现了消息队列的本质-解耦。所以,允许消费者主动进行消费确认是必要的。当然,对于没有特殊逻辑的消息,默认Auto Ack也是可以的,但一定要允许消费方主动ack。
3 处理ack和reject的方式:对于正确消费ack的,没什么特殊的。但是对于reject和error,需要特别说明。reject这件事情,往往业务方是无法感知到的,系统的流量和健康状况的评估,以及处理能力的评估是一件非常复杂的事情。举个极端的例子,收到一个消息开始build索引,可能这个消息要处理半个小时,但消息量却是非常的小。(意思就是,消息虽小,但是要得到reject的结果却需要等待任务执行玩才能知道,这样非常低效,所以这些消息建议直接拒绝,等待重发)
所以reject这块建议做成滑动窗口/线程池类似的模型来控制(需要保留这些被拒绝的消息,然后通过窗口进行重发,直到这些暂存消息被回复完毕),消费能力不匹配的时候,直接拒绝,过一段时间重发,减少业务的负担。
4 业务出错需要主动ack error:但业务出错这件事情是只有业务方自己知道的,就像上文提到的状态机等等。这时应该允许业务方主动ack error,并可以与broker约定下次投递的时间。
重复消息和顺序消息
1 顺序消息要求:上文谈到重复消息是不可能100%避免的,除非可以允许丢失,那么,顺序消息能否100%满足呢? 答案是可以,但条件更为苛刻:
- 允许消息丢失。
- 从发送方到服务方到接受者都是单点单线程。
所以绝对的顺序消息基本上是不能实现的,当然在METAQ/Kafka等pull模型的消息队列中,单线程生产/消费,排除消息丢失,也是一种顺序消息的解决方案。
谈到重复消息,主要是两个话题:
- 如何鉴别消息重复,并幂等的处理重复消息。
- 一个消息队列如何尽量减少重复消息的投递。
3 唯一消息id的实现:先来看看第一个话题,每一个消息应该有它的唯一身份。不管是业务方自定义的,还是根据IP/PID/时间戳生成的MessageId,如果有地方记录这个MessageId,消息到来是能够进行比对就能完成重复的鉴定。
数据库的唯一键/bloom filter/分布式KV中的key,都是不错的选择。由于消息不能被永久存储,所以理论上都存在消息从持久化存储移除的瞬间上游还在投递的可能(上游因种种原因投递失败,不停重试,都到了下游清理消息的时间)。这种事情都是异常情况下才会发生的,毕竟是小众情况。两分钟消息都还没送达,多送一次又能怎样呢?幂等的处理消息是一门艺术,因为种种原因重复消息或者错乱的消息还是来到了,说两种通用的解决方案:
- 版本号。
- 状态机。
版本号解决消息重复(记录最新版本号,每次只接受更新的消息)
1 举个简单的例子,一个产品的状态有上线/下线状态。如果消息1是下线,消息2是上线。不巧消息1判重失败,被投递了两次,且第二次发生在2之后,如果不做重复性判断,显然最终状态是错误的。
但是,如果每个消息自带一个版本号。上游发送的时候,标记消息1版本号是1,消息2版本号是2。如果再发送下线消息,则版本号标记为3。下游对于每次消息的处理,同时维护一个版本号。
每次只接受比当前版本号大的消息。初始版本为0,当消息1到达时,将版本号更新为1。消息2到来时,因为版本号>1.可以接收,同时更新版本号为2.当另一条下线消息到来时,如果版本号是3.则是真实的下线消息。如果是1,则是重复投递的消息。
2 解决消息乱序的方法(类似TCP的数据报有序处理方式):如果业务方只关心消息重复不重复,那么问题就已经解决了。但很多时候另一个头疼的问题来了,就是消息顺序如果和想象的顺序不一致。比如应该的顺序是12,到来的顺序是21。则最后会发生状态错误。
如果到来的顺序是21,则先把2存起来,待2到来后,再处理1,这样重复性和顺序性要求就都达到了。
状态机
1 使用版本号的弊病:基于版本号来处理重复和顺序消息听起来是个不错的主意,但凡事总有瑕疵。使用版本号的最大问题是:
- 对发送方必须要求消息带业务版本号。
- 下游必须存储消息的版本号,对于要严格保证顺序的。
还不能只存储最新的版本号的消息,要把乱序到来的消息都存储起来。而且必须要对此做出处理。试想一个永不过期的"session",比如一个物品的状态,会不停流转于上下线。那么中间环节的所有存储
就必须保留,直到在某个版本号之前的版本一个不丢的到来,成本太高。
2 状态机解决方案:就刚才的场景看,如果消息没有版本号,该怎么解决呢?业务方只需要自己维护一个状态机,定义各种状态的流转关系。例如,"下线"状态只允许接收"上线"消息,“上线”状态只能接收“下线消息”,如果上线收到上线消息,或者下线收到下线消息,在消息不丢失和上游业务正确的前提下。
要么是消息发重了,要么是顺序到达反了。这时消费者只需要把“我不能处理这个消息”告诉投递者,要求投递者过一段时间重发即可。而且重发一定要有次数限制,比如5次,避免死循环,就解决了。
3 举例子说明,假设产品本身状态是下线,1是上线消息,2是下线消息,3是上线消息,正常情况下,消息应该的到来顺序是123,但实际情况下收到的消息状态变成了3123。
那么下游收到3消息的时候,判断状态机流转是下线->上线,可以接收消息。然后收到消息1,发现是上线->上线,拒绝接收,要求重发。然后收到消息2,状态是上线->下线,于是接收这个消息。
此时无论重发的消息1或者3到来,还是可以接收。另外的重发,在一定次数拒绝后停止重发,业务正确。
中间件对于重复消息的处理
回归到消息队列的话题来讲。上述通用的版本号/状态机/ID判重解决方案里,哪些是消息队列该做的、哪些是消息队列不该做业务方处理的呢?其实这里没有一个完全严格的定义,但回到我们的出发点,我们保证不丢失消息的情况下尽量少重复消息,消费顺序不保证。那么重复消息下和乱序消息下业务的正确,应该是由消费方保证的,我们要做的是减少消息发送的重复。
我们无法定义业务方的业务版本号/状态机,如果API里强制需要指定版本号,则显得过于绑架客户了。况且,在消费方维护这么多状态,就涉及到一个消费方的消息落地/多机间的同步消费状态问题,复杂度指数级上升,而且只能解决部分问题。
减少重复消息的关键步骤:
- broker记录MessageId,直到投递成功后清除,重复的ID到来不做处理,这样只要发送者在清除周期内能够感知到消息投递成功,就基本不会在server端产生重复消息。
- 对于server投递到consumer的消息,由于不确定对端是在处理过程中还是消息发送丢失的情况下,有必要记录下投递的IP地址。决定重发之前询问这个IP,消息处理成功了吗?如果询问无果,再重发。
事务
持久性是事务的一个特性,然而只满足持久性却不一定能满足事务的特性。还是拿扣钱/加钱的例子讲。满足事务的一致性特征,则必须要么都不进行,要么都能成功。
解决方案从大方向上有两种:
- 两阶段提交,分布式事务。
- 本地事务,本地落地,补偿发送。
1 分布式事务强一致性:分布式事务存在的最大问题是成本太高,两阶段提交协议,对于仲裁down机或者单点故障,几乎是一个无解的黑洞。对于交易密集型或者I/O密集型的应用,没有办法承受这么高的网络延迟,系统复杂性。
并且成熟的分布式事务一定构建与比较靠谱的商用DB和商用中间件上,成本也太高。
2 本地事务+消息方案:那如何使用本地事务解决分布式事务的问题呢?以本地和业务在一个数据库实例中建表为例子,与扣钱的业务操作同一个事务里,将消息插入本地数据库。如果消息入库失败,则业务回滚;如果消息入库成功,事务提交。
然后发送消息(注意这里可以实时发送,不需要等定时任务检出,以提高消息实时性)。以后的问题就是前文的最终一致性问题所提到的了,只要消息没有发送成功,就一直靠定时任务重试。
3 :本地事务不包含rpc:这里有一个关键的点,本地事务做的,是业务落地和消息落地的事务,而不是业务落地和RPC成功的事务。这里很多人容易混淆,如果是后者,无疑是事务嵌套RPC,是大忌,会有长事务死锁等各种风险。
4 :本地事务要求业务方维护消息表:而消息只要成功落地,很大程度上就没有丢失的风险(磁盘物理损坏除外)。而消息只要投递到服务端确认后本地才做删除,就完成了producer->broker的可靠投递,并且当消息存储异常时,业务也是可以回滚的。
本地事务存在两个最大的使用障碍:
- 配置较为复杂,“绑架”业务方,必须本地数据库实例提供一个库表。
- 对于消息延迟高敏感的业务不适用。
话说回来,不是每个业务都需要强事务的。扣钱和加钱需要事务保证,但下单和生成短信却不需要事务,不能因为要求发短信的消息存储投递失败而要求下单业务回滚。
5 :消息类型选择:所以,一个完整的消息队列应该定义清楚自己可以投递的消息类型,如事务型消息,本地非持久型消息,以及服务端不落地的非可靠消息等。对不同的业务场景做不同的选择。另外事务的使用应该尽量低成本、透明化,可以依托于现有的成熟框架,如Spring的声明式事务做扩展。业务方只需要使用@Transactional标签即可。
性能相关
异步/同步
首先澄清一个概念,异步,同步和oneway是三件事。
异步,归根结底你还是需要关心结果的,但可能不是当时的时间点关心,可以用轮询或者回调等方式处理结果;
同步是需要当时关心的结果的;
而oneway是发出去就不管死活的方式,这种对于某些完全对可靠性没有要求的场景还是适用的,但不是我们重点讨论的范畴。
回归来看,任何的RPC都是存在客户端异步与服务端异步的,而且是可以任意组合的:客户端同步对服务端异步,客户端异步对服务端异步,客户端同步对服务端同步,客户端异步对服务端同步。
1 客户端的同步异步:对于客户端来说,同步与异步主要是拿到一个Result,还是Future(Listenable)的区别。实现方式可以是线程池,NIO或者其他事件机制,这里先不展开讲。2 服务端的同步异步: 服务端异步可能稍微难理解一点,这个是需要RPC协议支持的。参考servlet 3.0规范,服务端可以吐一个future给客户端,并且在future done的时候通知客户端。整个过程可以参考下面的代码:
客户端同步服务端异步。
1 Future<Result> future = request(server);//server立刻返回future 2 synchronized(future){ 3 while(!future.isDone()){ 4 future.wait();//server处理结束后会notify这个future,并修改isdone标志 5 } 6 } 7 return future.get();
客户端同步服务端同步。
1 Result result = request(server);
客户端异步服务端同步(这里用线程池的方式)。
1 Future<Result> future = executor.submit(new Callable(){public void call<Result>(){ 2 result = request(server); 3 }}) 4 return future;
客户端异步服务端异步。
1 Future<Result> future = request(server);//server立刻返回future 2 return future
上面说了这么多,其实是想让大家脱离两个误区:
- RPC只有客户端能做异步,服务端不能。
- 异步只能通过线程池。
那么,服务端使用异步最大的好处是什么呢?说到底,是解放了线程和I/O。试想服务端有一堆I/O等待处理,如果每个请求都需要同步响应,每条消息都需要结果立刻返回,那么就几乎没法做I/O合并
来看第二个误区,返回future的方式不一定只有线程池。换句话说,可以在线程池里面进行同步操作,也可以进行异步操作,也可以不使用线程池使用异步操作(NIO、事件)。
批量发送消息与处理消息
谈到批量就不得不提生产者消费者模型。但生产者消费者模型中最大的痛点是:消费者到底应该何时进行消费。大处着眼来看,消费动作都是事件驱动的。主要事件包括:
- 攒够了一定数量。
- 到达了一定时间。
- 队列里有新的数据到来。
对于及时性要求高的数据,可用采用方式3来完成,比如客户端向服务端投递数据。只要队列有数据,就把队列中的所有数据刷出,否则将自己挂起,等待新数据的到来。
在第一次把队列数据往外刷的过程中,又积攒了一部分数据,第二次又可以形成一个批量。伪代码如下:
1 Executor executor = Executors.newFixedThreadPool(4); 2 final BlockingQueue<Message> queue = new ArrayBlockingQueue<>(); 3 private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,故做成全局的 4 public void run(){ 5 List<Message> messages = new ArrayList<>(20); 6 queue.drainTo(messages,20); 7 doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会囤新的消息 8 } 9 }); 10 public void send(Message message){ 11 queue.offer(message); 12 executor.submit(task) 13 }
这种方式是消息延迟和批量的一个比较好的平衡,但优先响应低延迟。延迟的最高程度由上一次发送的等待时间决定。但可能造成的问题是发送过快的话批量的大小不够满足性能的极致。
1 Executor executor = Executors.newFixedThreadPool(4); 2 final BlockingQueue<Message> queue = new ArrayBlockingQueue<>(); 3 volatile long last = System.currentMills(); 4 Executors.newSingleThreadScheduledExecutor().submit(new Runnable(){ 5 flush(); 6 },500,500,TimeUnits.MILLS); 7 private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,顾做成全局的。 8 public void run(){ 9 List<Message> messages = new ArrayList<>(20); 10 queue.drainTo(messages,20); 11 doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会屯新的消息。 12 } 13 }); 14 public void send(Message message){ 15 last = System.currentMills(); 16 queue.offer(message); 17 flush(); 18 } 19 private void flush(){ 20 if(queue.size>200||System.currentMills()-last>200){ 21 executor.submit(task) 22 } 23 }
相反对于可以用适量的延迟来换取高性能的场景来说,用定时/定量二选一的方式可能会更为理想,既到达一定数量才发送,但如果数量一直达不到,也不能干等,有一个时间上限。
具体说来,在上文的submit之前,多判断一个时间和数量,并且Runnable内部维护一个定时器,避免没有新任务到来时旧的任务永远没有机会触发发送条件。对于server端的数据落地,使用这种方式就非常方便。
为啥需要批量:
最后啰嗦几句,曾经有人问我,为什么网络请求小包合并成大包会提高性能?主要原因有两个:
- 减少无谓的请求头,如果你每个请求只有几字节,而头却有几十字节,无疑效率非常低下。
- 减少回复的ack包个数。把请求合并后,ack包数量必然减少,确认和重发的成本就会降低。
push还是pull
上文提到的消息队列,大多是针对push模型的设计。现在市面上有很多经典的也比较成熟的pull模型的消息队列,如Kafka、MetaQ等。这跟JMS中传统的push方式有很大的区别,可谓另辟蹊径。
我们简要分析下push和pull模型各自存在的利弊。
push弊病:慢消费
慢消费无疑是push模型最大的致命伤,穿成流水线来看,如果消费者的速度比发送者的速度慢很多,势必造成消息在broker的堆积。假设这些消息都是有用的无法丢弃的,消息就要一直在broker端保存。当然这还不是最致命的,最致命的是broker给consumer推送一堆consumer无法处理的消息,consumer不是reject就是error,然后来回踢皮球。
反观pull模式,consumer可以按需消费,不用担心自己处理不了的消息来骚扰自己,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于建立索引等慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适。
pull缺陷:消息延迟与忙等
这是pull模式最大的短板。由于主动权在消费方,消费方无法准确地决定何时去拉取最新的消息。如果一次pull取到消息了还可以继续去pull,如果没有pull取到则需要等待一段时间重新pull。
但等待多久就很难判定了。你可能会说,我可以有xx动态pull取时间调整算法,但问题的本质在于,有没有消息到来这件事情决定权不在消费方。也许1分钟内连续来了1000条消息,然后半个小时没有新消息产生,
可能你的算法算出下次最有可能到来的时间点是31分钟之后,或者60分钟之后,结果下条消息10分钟后到了,是不是很让人沮丧?
即使这样,依然存在延迟问题:假设40ms到80ms之间的50ms消息到来,消息就延迟了30ms,而且对于半个小时来一次的消息,这些开销就是白白浪费的。
RocketMq的优化方法:长轮询
在阿里的RocketMq里,有一种优化的做法-长轮询,来平衡推拉模型各自的缺点。基本思路是:消费者如果尝试拉取失败,不是直接return,而是把连接挂在那里wait,服务端如果有新的消息到来,把连接notify起来,这也是不错的思路。但海量的长连接block对系统的开销还是不容小觑的,还是要合理的评估时间间隔,给wait加一个时间上限比较好~
push和pull实现顺序消息的优劣
1 push:如果push模式的消息队列,支持分区,单分区只支持一个消费者消费,并且消费者只有确认一个消息消费后才能push送另外一个消息,还要发送者保证全局顺序唯一,听起来也能做顺序消息,但成本太高了,尤其是必须每个消息消费确认后(这样的成本太高了)才能发下一条消息,这对于本身堆积能力和慢消费就是瓶颈的push模式的消息队列,简直是一场灾难。
2 pull:反观pull模式,如果想做到全局顺序消息,就相对容易很多:(例如kafka,pull以后记录偏移量即可保证顺序,当然只是单个partition的有序)
- producer对应partition,并且单线程。
- consumer对应partition,消费确认(或批量确认),继续消费即可。
所以对于日志推送这种最好全局有序,但允许出现小误差的场景,pull模式非常合适。如果你不想看到通篇乱套的日志~~
Anyway,需要顺序消息的场景还是比较有限的而且成本太高,请慎重考虑。
总结
本文从为何使用消息队列开始讲起,然后主要介绍了如何从零开始设计一个消息队列,包括RPC、事务、最终一致性、广播、消息确认等关键问题。并对消息队列的push、pull模型做了简要分析,最后从批量和异步角度,分析了消息队列性能优化的思路。下篇会着重介绍一些高级话题,如存储系统的设计、流控和错峰的设计、公平调度等。希望通过这些,让大家对消息队列有个提纲挈领的整体认识,并给自主开发消息队列提供思路。另外,本文主要是源自自己在开发消息队列中的思考和读源码时的体会,比较不"官方",也难免会存在一些漏洞,欢迎大家多多交流。
微信公众号【Java技术江湖】一位阿里 Java 工程师的技术小站。(关注公众号后回复”Java“即可领取 Java基础、进阶、项目和架构师等免费学习资料,更有数据库、分布式、微服务等热门技术学习视频,内容丰富,兼顾原理和实践,另外也将赠送作者原创的Java学习指南、Java程序员面试指南等干货资源)