写这篇文章是首先我之前也是做过消息中心的需求,当时3y开源了它的项目Austin,当时觉得挺好的,之后我负责的消息中心重构了一版,现在想想也挺开心的。所以想把之前学习的整理了下来。
开源项目Austin的地址:https://gitee.com/zhongfucheng/austin,以下内容基于Austin,同时带上自己的一些思考。更多细节请参考开源项目Austin,作者是3y,感谢作者开源这么好的项目。以下内容基于Austin。
一、首先我们来说思考一下,如果实现一个消息中心?
1.实现的原则
方便业务方调用,同时调用的成本相对来说较小,统一的接口发送各种类型消息,对消息生命周期全链路追踪。也即实现一个消息中心,需要对来自各个渠道的消息进行统一接口的提供,方便排错。同时方便后续的迭代开发。同时实现熔断限流的功能。
2.消息中心提供的能力
邮件、短信、IM、微信服务号、微信小程序、企业微信、钉钉、飞书等
3.实现的模型
业务方通过消息中心,能够实现对短信、邮件、IM、微信类、飞书等消息的正确发送
4.具体的实现方式
1).首先它具有多种渠道,因此我们需要对此维护一个渠道的相关账号信息
2).由于发送的消息不同于消息中间件的消息,比如短信、邮件等,需要先准备好模板,因此需要一个模板表
3).由于存在审计信息需要记录,发送消息的人发送的消息记录
4).由于存在审计信息需要记录,因此这里需要准备一张消息接收者接收消息的情况
5).对失败的消息进行一些补充或者重试,此时需要起一些定时任务或者在代码中进行重试,这个具体需要根据业务的需求进行
6).对失败和成功的消息做指标的统计,方便业务查询哪些消息发送成功了,哪些消息发送失败了,如果失败了,可以进行手动重试与定时任务重试,这个具体根据业务方的需求进行
也即总结起来就是:
实现多渠道统一接口 模板信息的准备与填充 审计信息的记录 成功失败指标统计 失败重试与提示
5.实时之外延迟队列=>不定时任务—延迟队列
除了实现这些功能外,思考一下,如果业务通常希望今天填充的信息,明天能够在某时某点能够进行发送,而且每次可能都是不固定的。此时就不再单单是发送了,需要考虑延迟的情况,也即结合延迟队列实现对短信的方式或者邮件等等。
延迟队列的实现,网上有很多实现的方案,这里主要介绍三个实现的方案:
1.基于开源版本的RocketMQ的延迟队列实现,其在5.0之后是支持任意精度的延迟消息队列的,在5.0之前它不支持任意精度的延迟消息队列。其实现原理是通过时间轮的方式实现的。
5.0之前,通过将消息的主题修改SCHEDULE_TOPIC_XXXX,并投递到延迟队列,比较局限只支持18个等级,精度有限。时间轮是一种算法, 用来管理和调度固定时间间隔内的事件。它将固定时间间隔分成若干个时间槽,并将事件映射到相应的时间槽上。在时间轮中,每个时间槽都会按顺序处理它所映射的事件,从而实现对固定时间间隔内的事件的调度。
支持的RIP:[RIP-43] Support Timing Messages with Arbitrary Time Delay
2.基于xxl-job实现,这个是开源项目Austin主推的方式
方案:万物皆扫表,针对这次需求(晚上发的消息,次日早上发送),就不需要上延时队列,因为austin已经接入了分布式定时任务框架了(对应的实现是xxl-job),只要把晚上的接收到的消息扔进Redis list,然后启个定时任务(每天早上9点)轮询该list是否有数据,如果有再重新做处理就完事了。当然它这种也不属于任意精度的。只能说是定时
3.基于Redis实现,思想还是轮询。基于Redis 实现一个轻量级的延迟队列。
6.实现业务之外的考虑-防重
业务方使用不当,不小心连续推送了两次,如果没有任何限制,那就真的下发了两次。试想下,如果你点了下验证码,如果收到了两条一模一样的短信,你是什么感受?作为平台需要有兜底的功能(尽可能避免由于业务使用不恰当,导致出现事故)
7.实现业务之外的考虑-防重-统计量化监控限流
前面说到对相关指标进行统计量化,仅仅是做这些是做了成功失败的统计,做这些是不够的。第三方接口一般都会有限流的,比如在腾讯云官网上看到对发送接口有3000QPS的限制。我们是需要知道现在各种类型的消息的发送情况是怎么样的,是否有限流的操作。如果限流了,是不是可以告诉业务方可能是原因目前发送量过大导致触发限流。系统上有完备的监控,你知道了各种的系统指标数据,自己才不会慌。比如此时产生的白名单、黑名单信息记录。同时实现熔断限流的相关措施。
8.实现业务之外的考虑-防重-花销统计
接入的短信后台都有对应的统计,但我们量大的话是需要「对账」,以我们的发送记录与回执统计跟短信的后台进行统计。austin使用的方案是基于hive实现。将短信的发送和回执数据导入到Hive,每个月跑一次Hive脚本统计进行对账
9.实现业务之外-供应商的服务挂掉了怎么办
比如某个短信渠道商挂了怎么办?短信需要接入多个渠道商,调用接口失败需要继续调用其他渠道商,支持动态分配渠道商的流量(一旦有提前预警,直接切换渠道商)
实现业务之外需要实现的功能
延迟队列 防重 统计量化监控限流 花销统计 多渠道服务
10.实现业务之外-可配置
相关参数信息可配置化,这里可以看到其使用了nacos进行相关参数配置化。
二、Austin的实现
首先准备好对应的账号信息、渠道信息、模板信息,然后进行消息的发送。
实现流程
austin-web:为前端项目提供相关接口 可以调用austin-web创建发送消息 austin-cron: 承载着定时任务的工作 austin-api: 接受到发送消息请求,直接将请求进MQ austin-handler: 消费MQ消息后,由各类handler进行发送处理 1.调用消息推送接口 2.接受消息推送接口返回信息 3.保存消息推送接口返回信息至mysql 4.从推送消息第三方拉去,推送消息回执 5.将回执信息保存到mysql中 austin-stream:处理handler产生的日志数据 austin-support: pipeline的支持、获取配置、mq的封装等,主要是对项目的支持 austin-conmmon:公共类 austin-data-house:对数据的处理,hive、flink