引子
先简单的问一下, 你如何解决这样的需求:
对一堆数据按某字段排序,获取第100-10条的数据。
假设你面对的数据是个单节点,简单来说,就是一个mysql数据库, 很自然地用
select a from tb order by a limit 100, 10;
你面对的是10个甚至100个节点呢
按照常理,我们会先把所有节点的前110条数据拉到一个公用节点上,再排序,前文已述。
咱们把数字改一下,要获取第[100万, 100万+10]中间的数据10条数据,这个方法就不能用了。
问题变成:
对分布在几十个节点上的上亿条数据要求1秒内排序。
在上一篇文章的理论 中,我提到让各数据节点活动起来,不再只承担存储的功能,还要相互发现,相互问答,即“你那有这个数据吗”,“你那这个数据排多少名”。
就好像把全服的机器当成了神经网络中的一个个神经元,互相启发,理解新世界,产生乘法功能。
言归正传,接下来,我给大家实现一下这个需求。
咱们把目标按数据源分两种,先对内存型排序,再进行磁盘型排序,咱们这篇是头一种。
统一目标
咱们简单统一下试验目标。
1.有10个节点,每个节点存储100万条数据,都放在缓存中。
2.内存中的数据是排序过的。
“神马? 数据源中的原始数据都是排好序的, 那岂不是很简单,那你这个排序还有什么价值?”
这个问题这段时间一直困扰着我,因为部分同学没看明白前文,而钉我问我鄙视我(微笑)。
“对单节点数据源的排序成本很高吗”,我一般会这么反问,很显然,相比于海量节点的排序,单个节点内的数据排序成本很低。
“每个节点的数据源都是排好序的,所有节点间的数据排序就完成了吗”, 不是的, 中国有首富榜,印度有首富榜,就算中国比印度富一点(如有意见,纯属意淫),要获取中印共榜后的前10名,总不能只用中国的前10了事吧。
架构设计
既然咱们是来开发试验的,而且是分布式节点,咱们就得设计架构。
架构目标
1.各节点能够快速互相发现,互相通信。
2.节点掉线被系统实时发现,并做冗灾。
3.支持运行时新增节点。
试验采用的架构:
config:负责节点注册,节点发现。
server:数据源节点,试验中有10台server。
service:对外服务,承接外部http访问,转为server查询,再将返回结果合并发给用户。
试验目标
单次查询与skip的大小无关,不能查询说查询100万以上的排序比查询100条以内的慢。(这一条是不是太屌了)
响应时间控制在100ms以内。
那咱们就开始吧。
开始前先放一下效果吧,怕大家没信心看下去。
效果展示
第一张图:同样是获取100条,分别是从第10条,10000条, 1000万条起。
第二张图:获取从100万开始的1000条数据。
skip | 耗时(limit=100) | 耗时(limit=1000) |
---|---|---|
10000 | 64ms | 29ms |
10 | 20ms | 38ms |
1000000 | 23ms | 58ms |
可以看出,请求耗时与skip, limit并没有线型关系,从哪里开始取,取多少条,响应时间都差不多。
也就是说,当你要从分布式存储中获取第100万开始的100条数据,与从第10条开始的100条数据,所得时间相当,而且在100ms以内, 达到实时效果。
语言选择
我选择的语言是c++, 因为我觉得对内存,存储啥的,它要擅长一些。 不过我尽量写的简单通俗一点,并力争在不久后用java来实现开源。
分布式处理
分布式处理的config-service-server以前的文章里已经写过,框架具备自发现,动态扩容的特点,读者想要了解的话,我在以后的文章里可以继续写。
协议设计
外部协议
/topkn&k=1000&n=100
k: skip
n:limit
内部协议
内部节点相互通信频繁, 且是双向的, 因此我采用protobuf协议,方便扩展,速度也是扛扛的。
server功能:
message ServerRequest{
optional ServerBGQuery bg = 4; //service -> server请求, “hey, 我要从100万开始的100条数”
optional ServerBGIndexReq bg_index = 5; //server -> server 问索引,"hey 你那边这个数的索引是多少“
optional ServerBGIndexSync bg_sync = 6; //server ->sync 同步查询结果, "hey, 我已经确认了查询结果,我同步给你吧”
}
message ServerResponse{
optional ServerBGResponse bg = 4; //server -> service 响应, “hey,这是我查询完的结果,请收”
optional ServerBGIndexResponse bg_index = 5; //server -> server 答索引 “hey,这个数在我这边的索引是这个”
}
service
节点流程图
service起来服务代理的作用,承接外部http访问,转为server查询,再将返回结果合并发给用户。
接入请求,比较简单,只接收/topkn的get请求
service.handleGetReq("/topkn", [this](const BGCon& con){
this->OnHttpPostRequestTopKN(con);
});
解析参数:
发送请求给后端的server:
所用到的协议:
message ServerBGQuery{
required int64 _s = 1; //为这次查询取个ID吧
required int64 flag = 2; //为这次查询立个flag
required int64 k = 3; //skip
required int64 n = 4; //limit
}
server返回
void Service::OpBackBG(const BGCon& con, const ServerBGResponse& response)
{
for(auto&& v : response.vs()){
auto vit = _ctx->_v.find(v.v());
if(vit == _ctx->_v.end()){
BigValuePtr bv(new BigValue{v.v(), v.count(), v.index()});
_ctx->_v.insert(std::make_pair(v.v(), bv));
}
else{
BigValuePtr bv = vit->second;
bv->count += v.count();
bv->index += v.index();
}
}
}
代码中的response代表某个server,response.vs()是它返回来的数据。
可以看见, service对所有server的数据进行整合。
涉及的协议:
message ServerBGResponse{
required int64 _s = 1; //查询的ID
repeated ServerBGIndex vs = 9; //某节点中所包含的数据, “hei, 这里是我在本次查询中包含的数据”
}
message ServerBGIndex{
required int64 v = 1; //某个数
required int64 index = 2; //这个数的索引是多少
required int64 count = 3; //这个数有多少个
}
service的业务功能没多少个,流程也很简单,就是起个请求代理与数据合并的功能。
server
server的的节点逻辑用一个图来表示最好不过了。
再来个简图
逻辑处理
server的响应逻辑,可以分成3个,就是实现下面的三条协议。
message ServerRequest{
optional ServerBGQuery bg = 4; //service -> server请求, “hey, 我要从100万开始的100条数”
optional ServerBGIndexReq bg_index = 5; //server -> server 问索引,"hey 你那边这个数的索引是多少“
optional ServerBGIndexSync bg_sync = 6; //server ->sync 同步查询结果, "hey, 我已经确认了查询结果,我同步给你吧”
}
对应的处理函数:
Top:service发过来, “hey, 我想查一下skip=100万,limit=100的数是哪些”
Count:“hey, 这个数在你那排第几”。
CountBack:"hey, 这个数在我这排第9”
CountSync: “hey, 我已经排完了,我知道service请求的数是这些,你看看”
处理service发来的请求
这个看到了咱们的核心函数Guess
Guess逻辑
guess部分的代码太长,咱们来看逻辑。
当接受到一次(skip, limit)请求时, 全服系统需要寻找2个索引, skip, skip+limit, 比如当skip=100, limit=10时,全服需要找的是第排序为第100与第110的数,它们之间的数都被包含是结果集中。
这就好办了,我把skip对应的数称为b(begin), skip+limit对应的数称为e(end). 要找到b和e,满足 index(b)=skip, index(e)=skip+limit.
1.将猜测范围锁定为全局, 我们试验中的数据源是长整型,我们把从[0, 0x7ffffffffffffffe]。
2.将猜测范围内的数分成20等分,得到集合V1=[v1, v2, ...v20], 然后得到这些点的全局排序索引I1=[index1, index2....index20]。
index(b) 与index(e) 必然落在I1中的某个区间。
假设 index2
3.针对b: 将猜测范围定在[v2, v3],进行第2步。
4.针对e:将猜测范围定在[v4, v5],进行第2步。
重复上述过程,不断缩小包转圈,直至发现b, e, 满足index(b)=skip, index(e)=skip+limit.
实际在处理过程中,会有一些边界值,比如skip太大,所有的数据都满足不了, 则第2步不满足就能发现。
再比如区间太小,不能拆分20等分,那就设步长为1来猜测。
再比如下面的一个切面, skip小于当前最小的索引, 则直接分配最小索引为skip。
测试数据产生
测试数据为每个节点100万条, 每条数据8个字节,可以将这里的数据理解成mysql的索引, 但是实际存储中索引只占很小一部分。
运行程序
测试:
图中的cost time 来自于这里
ctx->timeSec_为请求时打的点,因此cost time表示此交请求中service<->server的时间, 即后端处理的时间几乎为0。
好了, 上面的数据是100万条一个节点,我们来看看单节点1亿条的情况。
解决1亿条数据排序
生成测试数据
为了快速生成测试数据,我写了生成程序,咱们看看解析部分:
程序接受2个参数, datacount 数据数量,在此我们传1千万, filecount为文件个数,我们传10。
运行生成程序
我们试着生成一下。
生成的数据:
总共一亿条数分布在10个节点中。
起动程序后内存迅速被吃满
来在来查询一下100万起的数据:
1000万起的数据20条:
总共时间在10ms以内,可见查询时间与分布式节点的数据大小没有关系。
结论
当有上亿条数据分布在集群中的大量节点上时,如果各节点上的数据是有序的,我们对节点整个排序时,可进行 猜测=>应答=>同步 的方式进行实时操作,让节点之间实时高效地互动起来, 让它们并行运算直至产生最终结果。