少强 2017-12-27 17910浏览量
移动互联网时代,微信和微博已经成为这个时代的两大支柱类社交应用。
这两类应用,其中一个是IM产品,一个是Feed流产品,微信的朋友圈也属于Feed流。
如果再细心去发现,会发现基本所有移动App都有Feed流的功能:消息广场、个人关注、通知、新闻聚合和图片分享等等。各种各样的Feed流产品占据了我们生活的方方面面。
IM和Feed流功能已经基本成为所有App标配,如何开发一个IM或者Feed流功能是很多架构师、工程师要面临的问题。虽然是一个常见功能,但仍然是一个巨大的挑战,要考虑的因素非常多,比如:
为了解决上述问题,我们之前推出了三篇文章来阐述:
高并发IM系统架构优化实践:介绍了使用Table Store主键列自增功能后,对传统IM架构带来的巨大冲击,在稳定性,性能,成本,架构复杂度等多个方面都产生了巨大收益。
上述三篇文章推出后,用户反响很好,在各个平台的传播很广,为很多用户提供了设计一款IM和Feed流产品的架构思路,但是从这里到完全实现一个可靠的IM、Feed流系统平台还有很长的路,比如:
针对上述三个问题,在《现代IM系统中消息推送和存储架构的实现》中引入了一个逻辑模型概念:Timeline模型。
在《现代IM系统中消息推送和存储架构的实现》中基于IM系统提出了Timeline模型,进一步会发现Timeline模型适用场景可以更广泛:
IM和Feed流产品完全匹配上述四个特征,所以Timeline模型可以完全适用于IM和Feed流场景中。
下面我们来看看如何在各个场景中使用Timeline:
单聊就是三个Timeline:
群聊就是1 + N个Timeline:
每个用户只有一个同步库Timeline,就算用户A在10个群里面,那么这个10个群的同步消息都是发送给用户A的这一个同步库Timeline中。
发一条朋友圈状态就是1 + N个Timeline:
如果是微博,粉丝可能会达到上亿级别,这时候会比朋友圈稍微复杂些:
大V发一条微博就是 1 + M个Timeline(M << N,N是粉丝数)。
从上面分析可以看出来,不管是IM,还是Feed流产品都可以将底层的存储、同步逻辑抽象成一个对多个Timeline进行读写的模型。
有了Timeline概念模型后,从IM/Feed流应用映射到Timeline就比较容易了,但是从Timeline映射到存储、同步系统仍然很复杂,主要体现在:
这些问题涉及的内容光,细节多,深度大,坑较多等,整体上很繁杂,这一部分在耗费了大量人力之后,结果可能并不理想。
针对上述问题,只要存储系统和推送系统确定后,剩余的工作都是类似的,可以完全将经验封装起来成为一个LIB,将表结构设计,读写方式,隐患等等都解决好,然后供后来者使用,后来者可以不用再关心Timeline到底层存储系统之间的事情了。
所以,我们基于JAVA语言实现了一个TableStore-Timeline LIB,简称Timeline LIB。
目前已经开源在了GitHub上:Timeline@GitHub。
Timeline LIB的结构如下:
整个Timeline分为两层,上层的Timeline层和下层的Store层。
Timeline层,提供最终的读写接口,用户操作的也是Timeline的接口。
Store层,负责存储系统的交互,目前Timeline LIB中提供了DistributeTimelineStore,基于Table Store,同时实现了分布式的存储和同步。后续会继续实现GlobalTimelineStore等。如果有用户有其他系统需求,比如MySQL,Redis,可以通过实现IStore接口来新增MySQLStore和RedisStore。
也欢迎大家将自己实现的Store通过GitHub的PullRequest共享出来。
有了Timeline LIB之后,如果要实现一个IM或者Feed流,只需要创建两种类型Timeline(存储类,同步类),然后调用Timeline的读写接口即可。
接下来,我们看下Timeline LIB的API。
Timeline LIB中面向最终用户的是Timeline类,用于对每个Timeline做读写操作。
Timeline的接口主要分为三类:
写:
读:
范围读:
/**
* Timeline的构造函数。
* @param timelineID 此Timeline对应的ID。唯一标识一个Timeline,需要全局唯一,如果业务场景中需要多个字段才能唯一标识一个TimelineID,此时可以将多个字段拼接成一个字段。
* @param store 此Timeline关联的Store,一般为存储Store或同步Store。实现了IStore接口类的对象,目前LIB中默认实现了DistributeTimelineStore,可以使用此store。除此之外,用户还可以自己实现自己的Store类,用于适配其他系统。
*/
public Timeline(String timelineID, IStore store);
/**
* 写入一个消息到此Timeline中。
* @param message 消息对象,需实现IMessage接口。用户需要通过实现IMessage接口创造符合自己业务场景的消息类,LIB中默认实现了StringMessage类。
* @return 完整的TimelineEntry,包括消息和顺序ID。
*/
public TimelineEntry store(IMessage message);
/**
* 异步写入消息接口。
* @param message 消息对象,需实现IMessage接口。
* @param callback 回调函数。
* @return Future对象,异步模式下,Future和callback需要二选一。
*/
public Future<TimelineEntry> storeAsync(IMessage message, TimelineCallback<IMessage> callback);
/**
* 批量写入消息接口。
* 此接口只是把消息加入到本地的一个buffer中,当buffer满或者超时(默认10s,可配置)才会统一写入。
* 此接口返回时并不一定消息已经写入成功。
* @param message 消息对象,需实现IMessage接口。
*/
public void batch(IMessage message) {
/**
* 同步读取接口,通过制定一个唯一的顺序ID读取目标TimelineEntry。
* @param sequenceID 顺序ID。此顺序ID可由store或scan接口获取到。
* @return 完整的TimelineEntry,包括消息和顺序ID。
*/
public TimelineEntry get(Long sequenceID);
/**
* 异步读取接口,通过制定一个唯一的顺序ID读取目标TimelineEntry。
* @param sequenceID 顺序ID。
* @param callback 读取结束后的回调函数。
* @return Future对象,异步模式下,Future和Callback需要二选一。
*/
public Future<TimelineEntry> getAsync(Long sequenceID, TimelineCallback<Long> callback);
/**
* 顺序读取一段范围内或固定数目的消息,支持逆序,正序。
* @param parameter 顺序读取的参数,包括方向、from、to和maxCount。
* @return TimelineEntry的迭代器,通过迭代器可以遍历到待读取的所有消息。
*/
public Iterator<TimelineEntry> scan(ScanParameter parameter);
在 Maven 工程中使用 Timeline LIB 只需在 pom.xml 中加入相应依赖即可:
<dependency>
<groupId>com.aliyun.openservices.tablestore</groupId>
<artifactId>timeline</artifactId>
<version>1.0.0</version>
</dependency>
使用之前,需要先实现一个满足自己业务特点的Message类,此Message类能表示业务中的一条完整消息。
需要实现IMessage的下列接口:
String getMessageID():
void setMessageID(String messageID):
IMessage newInstance():
byte[] serialize():
void deserialize(byte[] input):
在一个IM或Feed流产品中,一般会有两个子系统,一个是存储系统,一个是同步系统。
需要为这两个系统各自生成一个Store对象。
Store生成好后就可以构造最终的Timeline对象了,Timeline对象分为两类,一类是存储库Timeline,一个是同步库Timeline。
当在IM中发布消息或者Feed流产品中发布状态时,就是对相应存储库Timeline和同步库Timeline的消息写入(store/storeAsync)。
当在IM或Feed流产品中读取最新消息时,就是对相应同步库Timeline的范围读取(scan)。
当在IM或Feed流产品中读取历史消息时,就是对相应存储库Timeline的范围读取(scan)。
如果是推拉结合的微博模式,则读取最新消息时,就是对相应存储库Timeline和同步库Timeline的同时范围读取(scan)。
Timeline LIB中会抛出TimelineException,TimelineException提供了两种接口:getType()和getMessage(),getType()返回此TimelineException的类型,包括了TET_ABORT,TET_RETRY,TET_INVALID_USE,TET_UNKNOWN:
这一节会演示下如何使用Timeline LIB实现IM的群组功能。
构造两个store,一个用来存储,一个用例同步。
IStore store = new DistributeTimelineStore(storeConfig);
IStore sync = new DistributeTimelineStore(syncConfig);
// 构造群成员列表,群成员列表可以存储在Table Store。
List<String> groupMembers = Arrays.asList("user_A", "user_B", "user_C");
user_A发一条群消息:“有人吗”。
// 存储会话消息
Timeline timeline1 = new Timeline("11789671", store);
timeline1.store(new StringMessage("user_A:有人吗"));
// 发送同步消息
for (String user : groupMembers) {
Timeline timeline = new Timeline(user, sync);
timeline.store(new StringMessage("user_A:有人吗"));
}
user_C读取自己最新的同步消息
Timeline timeline = new Timeline("user_C", sync);
ScanParameter scanParameter = ScanParameterBuilder
.scanForward()
.from(last_sequence_id)
.to(Long.MAX_VALUE)
.maxCount(100)
.build();
Iterator<TimelineEntry> iterator = timeline.scan(scanParameter);
while(iterator.hasNext()) {
TimelineEntry entry = iterator.next();
// 处理消息
}
上面的示例演示了如何用Timeline LIB实现IM中的群组功能。其他的朋友圈,微博等也类似,这里就不赘述了。
我们目前在Timeline Samples@GitHub上实现两个场景的实例:
也欢迎大家共享其他场景的实现代码。
我们使用阿里云ECS做了性能测试,效果较理想。
不同接口的写入性能上,批量batch接口最快,其次是异步writeAsync,最后是同步write接口。
在阿里云共享型1核1G的ECS机器上,使用DistributeTimelineStore,Timeline LIB的storeAsync接口可以完成每秒1.2万消息的写入,如果使用batch批量接口,则可以完成每秒5.3万消息的写入。
如果使用一台8核的ECS,只需要3秒钟就可以完成100万条消息的写入。
由于DistributeTimelineStore使用了Table Store作为存储和同步系统,Table Store是阿里云的一款服务化NoSQL服务,支持的TPS在理论上无上限,实际中仅受限于集群大小,所以整个Timeline LIB的写入能力和压力器的CPU成正比。
下面的图展示了不同机型上完成1000万条消息写入的延迟:
在一台8核ECS上只需要27秒就可以完成1000万写入,由于写入能力和CPU成线线性关系,如果用两台16核的,则只需要7秒就可以完成1000万消息的写入。
我们再来看一下scan读取的性能,读取20条1KB长度消息,LIB端延迟一直稳定在3.4ms,Table Store服务端延迟稳定在2ms。
这个量级和能力,可以撑得住目前所有的IM和Feed流产品的压力。
Timeline LIB的想法来源于Table Store的真实场景需求,并且为了用户可以更加简单的使用,增加了主键列自增功能。
目前Timeline LIB的store层实现了DistributeTimelineStore类,DistributeTimelineStore可同时适用于存储store和同步store。
DistributeTimelineStore是基于Table Store的,但是为了便于用户使用其他系统,在Timeline LIB中将Store层独立了出来。
如果用户希望使用其他系统,比如MySQL作为存储系统,可以实现IStore接口构造自己的Store类。我们也欢迎大家提供自己的各种Store层实现,最终希望为社交场景的架构师和开发者提供一套完整的易用性开发框架。
Store的扩展:
更易用性的接口
如果在使用过程中有任何问题或者建议,可以通过下列途径联系我们:
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。