1. 相关背景
搜索事业部与计算平台事业部目前使用消息队列主要有以下三种场景:
1. 每天有上万张表需要通过Build Service来构建索引。这些表主要来自主搜索,IGRAPH,Rank Service等业务,且每个表包含的文档数差别很大。总数据量为PB级别,总文档数达万亿级。文档的大小不一,小到几十Byte大到几百KB。在Build Service内部,文档处理与索引构建需要一个消息队列来传送消息。因此在build时,容易产生突发大流量(几百G/秒,几千万条/秒)持续消息写入与读取。
2. 搜索的在线服务如主搜索查询服务,RankService打分服务或IGRAPH服务需要毫秒级的实时文档更新。这些服务引擎基本上是多行多列结构,即每一行是一个完整的服务单元,由多台机器组成,多行提升服务的总能力。大的服务通常包含数百行,所以一条实时消息通常会被消费数百次,在线同时实时消费的机器规模也达上万台。
3. 在线的实时消息主要来自离线实时模型训练,用户的实时浏览、点击、加购行为或者商家的增删改宝贝等。离线训练任务会同时启动几十万个worker对上千张表产生实时消息,写请求每秒达千万次。
对于这几种场景,传统的消息队列(如Kafka等)要同时满足,至少需要成百上千台物理机,且系统还需要做改造来适用于每天上万个topic的增减和几十万的生产者与消费者同时读写消息。另外这些机器的failover管理也是个大问题。现实中,搜索团队所有的机器都是由调度系统统一管理和复用,没有专门的物理机可给消息系统独占使用。
本文将介绍搜索事业部目前使用消息系统swift。主要介绍系统结构和消息可靠传递机制这两方面。最后介绍下swift系统在今年双十一期间的表现。
2. Swift介绍
传统的消息队列通常为消息的安全性,一般先要求消息落盘到本机后才返回成功。这限制的机器的迁移,扩展和复用。因为消息数据只存一两台机器,机器迁移必然导致数据的迁移。传统消息队列要有较高的性能,通常先要解写磁盘的毛刺等io问题。特别是机器与其它应用复用时IO问题并不好解决。所以传统的消息队列一般要求机器独占使用。
计算与存储分离一直是最近年来研究与应用的热点。计算与存储分离带来最大的好处是机器的迁移与调度不再受到数据存储大小与位置的限制。计算资源在调度系统的管理下可以近无限的扩展。存储系统如HDFS,PANGU能提供的PB级的存储空间以及百万级的文件读写。
计算与存储的分离在消息中间件系统中的应用还比较少,主要的问题是传统的分布式文件系统的读写响应latency远大于本机的磁盘。如果要保证消息先落盘,导致整个请求的latency就会飙升,影响整个消息系统的吞吐率。但随着硬件水平的提升如25G/100G网络和RDMA等新技术的出现,分布式文件系统也有了质的飞跃,例如集团盘古2.0等系统为低延时高可靠的存储提供很好的存储平台。
2.1 Swift系统结构
Swift消息系统是在计算与存储分离上的一次尝试。它主要有以下特点:
首先,Swift的每个计算结点都是无状态的,即每个worker上除log记录,不存储任何消息系统相关数据。在swift消息系统中,系统的状态数据存储在zookeeper上,消息的内容则存储在分布式文件系统如HDFS, PANGU等上。
其次,Swift的每个计算结点都是等价的,只要消息系统需要计算资源,就可以通过调度系统不停的申请并提升整个消息系统的服务能力。目前Swift可以跑在Hippo或者Yarn上面。 Swift消息系统每次申请的资源粒度也比较小,可以充当调度系统的碎片利用者。
最后,Swift自身的client与server的消息读写协议,能够保证消息高效可靠的传递。
图 1 SWIFT系统结构图
图1是SWIFT系统结构,其主要分成2种worker: Admin和Broker。Admin和Broker的资源分配与启动都是基于调度系统。目前支持Hippo与Yarn这两种调度系统,这2种worker都会有很多个实例,Broker worker都是等价的,Admin worker则有一个leader,其余的等价,这些worker一般在Docker容器中工作。
Admin角色主要负责:1. Topic的增删改 2. Topic对应物理partition与broker调度 3. Client读写数据时物理partition的定位。4.资源的调整,如broker个数的增减等。
Broker角色主要负责:1.partiton相关的消息的读写 2. Partition相关数据的管理如过期数据的清理等。
2.2 Swift Topic介绍
Swift系统中的topic与其它消息系统的类似,它是一堆相关消息的集合,通常由业务自定义。如图二所示,在swift中topic是由65536个逻辑分区组成,编号是[0 - 65535]。在Swift消息系统内部,topic是由partition组成的,每个partition负责一个range的逻辑partition读写。
在用户层面,用户看不到Swift的物理partition,写消息时要么需要提供一个hash字段(由swift client自动映射到相应的逻辑分区)要么提供一个0-65535的逻辑编号。 swift根据topic下每个partition的服务range,把消息写入相应partition的writer中。Writer可以通过同步与异步方式把消息append到对应的物理partition中。
Topic里物理Partition个数的多少影响整个topic的读写能力,通过逻辑partition与物理partition映射,当topic的服务能力不足时,可以动态的扩展物理partition来提升读写能力。另外,物理partition是Swift的基本调度单元,admin会根据每个broker worker负载,尽可以平衡的调度partition。
图 2 swift topic数据写入示意图
3. 可靠消息读写机制
先前提到传统的消息系统为了保证消息的可靠性,在写消息时需要先落盘,以防机器挂掉时,消息不丢失。Swift也提供类似的模式,但落盘的对象是分布式文件系统如HDFS。这种模式下正常写落盘消息延时的毫秒级,当HDFS压力大时,会变成秒级,所以其性能不太稳定。Swift 设计了一种client与broker之间,broker与HDFS之间的消息写入与确认协议来保证消息高效可靠的写入与持久化,其机制类似TCP的滑动窗口协议。
图3是消息异步安全发送的示意图。Broker在分配到partition进行服务时,会生成一个标记,其由partition的版本号(V),broker加载partition时间戳(S)以及消息持久化的checkpoint (C)组成。Client在向admin定位到partition所在broker的时也会获取partition的版本号(V)。版本号V主要在topic属性发生变化时(例如partition的个数等)会更新。时间戳每次partition发生重加载或调度都会发生变化。
图 3 SWIFT异步安全发送消息示意图
异步安全写消息工作流程如下:
1. 用户通过客户端写入一条消息,client定位到写哪个物理partition,同时把消息写入到对应的buffer中。用户写消息时,还可以给每条消息设置一个递增编号,swift client会自动映射写消息进度与编号的关系。在异步模式的,client会有专门的提交线程与broker进行通信。
2. Client第一次向partition发送消息时,broker会验证partition的版本V0, 匹配后才会接受消息,同时会把三元组(V,S, C)返回。client收到accept消息后,会更新已接受消息的光标和协议的三元组信息。
3. 客户端可以持续的写入消息,同时broker那能partition中的消息做异步持久化,当持久化成功时,会更新持久化信息(Ca)。持久化成功的消息在内存中不会马上删除,只有内存不足时才会被回收。
4. Client的后台发送线程继续工作,发送消息b,同时请求带上了(V0,S0)。
5. Broker端验证(V0,S0),收受消息b,顺便把持久化信息也返回(V0,S0,Ca), client接收到accept信息后,更新已发送的光标到b,同时更新已接受的光标到a。消息a已经持久化成功,在使用的内存将会被writer回收。Writer更新checkpoint (Ca)给用户层,表示消息a已经持久化。
6. 同3一样,client继续写消息c,broker继续持久化消息b。
7. 此时partition发生了调度(例如被分配到了其它机器),其HDFS上的文件消息马上可以读取到,但内存中的消息会被清空。此时partition加载时间戳变成了S1。Client向admin重新定位到partition的服务broker写入的消息c和(V0,S0)。
8. Broker检查client发送的(V0,S0)与自身的(V0,S1)不相等,将拒绝此次消息的写入。主要基于消息在partition内要求保序考虑。此时client还不知道b是否被序列化成功,partition重新被加载b是否被序列化成功的信息也会被丢弃(无状态),所以它也不知道。因Broker返回(V0,S1,C0),要求client重新发送未持久化的所有消息。
9. Client 重置已发送光标到b之前,更新S1并重新发送消息b和c。
10. Broker检验client的(V0,S1)并收受消息b和c,这时消息b会被再次持久化化到HDFS上。Client重新更新已发送光标到c。如果此后无新消息的写入,且buffer中的消息还有未被持久化的,client会发起一次空写操作获取最新的持久化信息。
步骤1-10是异步消息写入的工作方式,用户层可以获取到当前持久化消息的checkpoint,可以自己记录发送进度以便回滚。如果不方便记录发送进度,可以在写完一段数据后,调用flush方法强制把数据从client的buffer放到broker的buffer中。此时消息虽然没有被持久化,但在client与partition各存一份。所以只有在broker与client同时挂掉才出现消息丢失,因此我们认为这种方法也是比较安全的。
Swift partition的写buffer缓存所有写入的消息,只有当空间不足时,消息内存空间才会被回收。对文件上的消息读取,也会以块buffer的方式做缓存。Partition之间的buffer各文件cache buffer都是共享存储,由统一的内存回收模块管理。其保证冷门的partition基本不消耗资源,热门的partition可以充分利用资源。正常情况下,swift的内存可以缓存数分钟的消息,所以消费消息时基本上从内存读取,读的性能也会很高效。在这个协议下,分布式文件系统偶尔抖动也不会影响消息的时效性,实际上文件系统在数分钟内的挂机也不影响消息的实时传递。
4. Swift IN双十一
2017年swift消息系统开始在搜索事业部与计算平台事业部大规模应用,主要场景除主搜索外,还包括Porsche,K-monitor,IGraph, DII, OpenSearch,RankService等业务。另外,Swift在蚂蚁金服,阿里妈妈和神马事业部也有多套swift机群的部署。
双十一当天,同时服务的topic个数均值近万个, partition个数达10万。当天创建与删除的topic近2万个,其主要来自IGRAPH, RankService,DII等业务索引的重建,平时也差不多是这个数量级。 当天Swift消息系统总共写入数万亿条消息,读取数十万亿条消息,读写比3:1。总读写消息内容字节大小数PB,消息的内容是经过Swift client压缩,一般消息压缩率是原始大小的1/4 - 1/2之间。读写的最大QPS与均值都超亿条/秒,读写请求的峰值与均值超千万次/秒。在线与离线读写消息的worker超20万个。 另外,swift日常处理的数据与双十一的数值相差不大。
5. 总结与展望
Swift消息系统经过一年多的不断改进与优化,目前每天能处理PB级与万亿级的消息,但在不久的将来还需要解一些问题:
1. 超大量client如百万级的client写入,涉及到的partition定位与worker的连接问题。当client达百万时,首先碰到的一个问题是连接数不够用,目前离线的一个client写数据会对所有相关加载partition的worker产生连接,如果worker有N个,partition有M个,其连接数达N*M个。其次partition发生调度时,partition的定位瞬间能打爆Admin。
2.每秒百亿级别的消息读写时,如何减少系统处理消息量。Swift目前有client主动合并消息的优化,但能合并的消息数量并不多,能否在broker端进行消息合并与存储。在大规模消息读写时如何降低对HDFS的压力。Swift目前提供内存topic等来尽可能的减少消息落盘,是否有更好的机制也需要探索。
6. 相关职位招聘
发布时间: | 2017-11-17 | 工作地点: | 杭州市 | 工作年限: | 二年以上 |
所属部门: | 搜索事业部 | 学 历: | 本科 | 招聘人数: | 3 |
岗位描述:
参与阿里巴巴集团内部实时消息系统开发,支持每秒万亿级别消息读写,提供高可靠、高性能、高伸缩、低延时的服务,支撑电商、金融、物流、文娱、大数据、人工智能、搜索、广告等各种业务场景。
岗位要求:
1. 精通C/C++语言和数据结构,算法和数据结构基础扎实
2. 学习能力较强,有很好的逻辑思维能力,善于主动思考,对技术有强烈激情
3. 具有优秀的分析和解决实际问题的能力和态度,追求编写优雅的代码,从技术趋势和思路上能影响技术团队;
4. 符合以下条件之一者优先。
(1) 有互联网中间件(数据、消息、服务等)开发经验者优先。
(2) 对hbase/hadoop/cassandra/elasticsearch/rocksdb等开源存储产品的一种或多种熟悉者优先
(3) 对linux内核原理或服务器硬件熟悉者优先