实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

简介: #引子 先简单的问一下, 你如何解决这样的需求: ``` 对一堆数据按某字段排序,获取第100-10条的数据。 ``` 假设你面对的数据是个单节点,简单来说,就是一个mysql数据库, 很自然地用 select a from tb order by a limit 100, 10; ![imag

引子

先简单的问一下, 你如何解决这样的需求:

                  对一堆数据按某字段排序,获取第100-10条的数据。

假设你面对的数据是个单节点,简单来说,就是一个mysql数据库, 很自然地用

select a from tb order by a limit 100, 10;
image.png

你面对的是10个甚至100个节点呢
image.png

按照常理,我们会先把所有节点的前110条数据拉到一个公用节点上,再排序,前文已述。

咱们把数字改一下,要获取第[100万, 100万+10]中间的数据10条数据,这个方法就不能用了。

问题变成:

              对分布在几十个节点上的上亿条数据要求1秒内排序。

上一篇文章的理论 中,我提到让各数据节点活动起来,不再只承担存储的功能,还要相互发现,相互问答,即“你那有这个数据吗”,“你那这个数据排多少名”。

就好像把全服的机器当成了神经网络中的一个个神经元,互相启发,理解新世界,产生乘法功能。

言归正传,接下来,我给大家实现一下这个需求。

咱们把目标按数据源分两种,先对内存型排序,再进行磁盘型排序,咱们这篇是头一种。

统一目标

咱们简单统一下试验目标。

1.有10个节点,每个节点存储100万条数据,都放在缓存中。

2.内存中的数据是排序过的。

“神马? 数据源中的原始数据都是排好序的, 那岂不是很简单,那你这个排序还有什么价值?”

这个问题这段时间一直困扰着我,因为部分同学没看明白前文,而钉我问我鄙视我(微笑)。

“对单节点数据源的排序成本很高吗”,我一般会这么反问,很显然,相比于海量节点的排序,单个节点内的数据排序成本很低。

“每个节点的数据源都是排好序的,所有节点间的数据排序就完成了吗”, 不是的, 中国有首富榜,印度有首富榜,就算中国比印度富一点(如有意见,纯属意淫),要获取中印共榜后的前10名,总不能只用中国的前10了事吧。

架构设计

既然咱们是来开发试验的,而且是分布式节点,咱们就得设计架构。

架构目标

1.各节点能够快速互相发现,互相通信。

2.节点掉线被系统实时发现,并做冗灾。

3.支持运行时新增节点。

试验采用的架构:
image.png

config:负责节点注册,节点发现。

server:数据源节点,试验中有10台server。

service:对外服务,承接外部http访问,转为server查询,再将返回结果合并发给用户。

试验目标

单次查询与skip的大小无关,不能查询说查询100万以上的排序比查询100条以内的慢。(这一条是不是太屌了)

响应时间控制在100ms以内。

那咱们就开始吧。

开始前先放一下效果吧,怕大家没信心看下去。

效果展示

image.png

image.png

第一张图:同样是获取100条,分别是从第10条,10000条, 1000万条起。

第二张图:获取从100万开始的1000条数据。

skip 耗时(limit=100) 耗时(limit=1000)
10000 64ms 29ms
10 20ms 38ms
1000000 23ms 58ms

可以看出,请求耗时与skip, limit并没有线型关系,从哪里开始取,取多少条,响应时间都差不多。

也就是说,当你要从分布式存储中获取第100万开始的100条数据,与从第10条开始的100条数据,所得时间相当,而且在100ms以内, 达到实时效果。

语言选择

我选择的语言是c++, 因为我觉得对内存,存储啥的,它要擅长一些。 不过我尽量写的简单通俗一点,并力争在不久后用java来实现开源。

分布式处理

分布式处理的config-service-server以前的文章里已经写过,框架具备自发现,动态扩容的特点,读者想要了解的话,我在以后的文章里可以继续写。

协议设计

外部协议

/topkn&k=1000&n=100
k: skip
n:limit

内部协议
内部节点相互通信频繁, 且是双向的, 因此我采用protobuf协议,方便扩展,速度也是扛扛的。

server功能:

message ServerRequest{
    optional ServerBGQuery    bg = 4;              //service -> server请求, “hey, 我要从100万开始的100条数”
    optional ServerBGIndexReq  bg_index = 5;       //server -> server 问索引,"hey 你那边这个数的索引是多少“
    optional ServerBGIndexSync bg_sync = 6;        //server ->sync 同步查询结果, "hey, 我已经确认了查询结果,我同步给你吧”
}
message ServerResponse{
    optional ServerBGResponse bg = 4;              //server -> service 响应, “hey,这是我查询完的结果,请收”
    optional ServerBGIndexResponse bg_index = 5;   //server -> server 答索引  “hey,这个数在我这边的索引是这个”
}

service

节点流程图
image.png

service起来服务代理的作用,承接外部http访问,转为server查询,再将返回结果合并发给用户。

接入请求,比较简单,只接收/topkn的get请求

service.handleGetReq("/topkn", [this](const BGCon& con){
     this->OnHttpPostRequestTopKN(con);
});

解析参数:
image.png

发送请求给后端的server:
image.png

所用到的协议:

message ServerBGQuery{
    required int64          _s = 1;           //为这次查询取个ID吧
    required int64          flag = 2;         //为这次查询立个flag
    required int64          k = 3;            //skip
    required int64          n = 4;            //limit
}

server返回

void Service::OpBackBG(const BGCon& con, const ServerBGResponse& response)
{
 
    for(auto&& v : response.vs()){
       auto vit = _ctx->_v.find(v.v());
       if(vit == _ctx->_v.end()){
             BigValuePtr bv(new BigValue{v.v(), v.count(), v.index()});
             _ctx->_v.insert(std::make_pair(v.v(), bv));
        }
        else{
             BigValuePtr bv = vit->second;
             bv->count += v.count();
             bv->index += v.index();
        }
    }
}

代码中的response代表某个server,response.vs()是它返回来的数据。

可以看见, service对所有server的数据进行整合。

涉及的协议:

message ServerBGResponse{
    required int64              _s = 1;       //查询的ID
    repeated ServerBGIndex      vs =  9;      //某节点中所包含的数据, “hei, 这里是我在本次查询中包含的数据”
}
 
message ServerBGIndex{
    required int64 v = 1;                     //某个数
    required int64 index = 2;                 //这个数的索引是多少
    required int64 count = 3;                 //这个数有多少个
}

service的业务功能没多少个,流程也很简单,就是起个请求代理与数据合并的功能。

server

server的的节点逻辑用一个图来表示最好不过了。

image.png

再来个简图

image.png

逻辑处理

server的响应逻辑,可以分成3个,就是实现下面的三条协议。

message ServerRequest{
    optional ServerBGQuery    bg = 4;              //service -> server请求, “hey, 我要从100万开始的100条数”
    optional ServerBGIndexReq  bg_index = 5;       //server -> server 问索引,"hey 你那边这个数的索引是多少“
    optional ServerBGIndexSync bg_sync = 6;        //server ->sync 同步查询结果, "hey, 我已经确认了查询结果,我同步给你吧”
}

对应的处理函数:
image.png

Top:service发过来, “hey, 我想查一下skip=100万,limit=100的数是哪些”

Count:“hey, 这个数在你那排第几”。

CountBack:"hey, 这个数在我这排第9”

CountSync: “hey, 我已经排完了,我知道service请求的数是这些,你看看”

处理service发来的请求
image.png

这个看到了咱们的核心函数Guess
image.png

Guess逻辑

guess部分的代码太长,咱们来看逻辑。

当接受到一次(skip, limit)请求时, 全服系统需要寻找2个索引, skip, skip+limit, 比如当skip=100, limit=10时,全服需要找的是第排序为第100与第110的数,它们之间的数都被包含是结果集中。

这就好办了,我把skip对应的数称为b(begin), skip+limit对应的数称为e(end). 要找到b和e,满足 index(b)=skip, index(e)=skip+limit.

1.将猜测范围锁定为全局, 我们试验中的数据源是长整型,我们把从[0, 0x7ffffffffffffffe]。

2.将猜测范围内的数分成20等分,得到集合V1=[v1, v2, ...v20], 然后得到这些点的全局排序索引I1=[index1, index2....index20]。

index(b) 与index(e) 必然落在I1中的某个区间。

假设 index2

3.针对b: 将猜测范围定在[v2, v3],进行第2步。

4.针对e:将猜测范围定在[v4, v5],进行第2步。

重复上述过程,不断缩小包转圈,直至发现b, e, 满足index(b)=skip, index(e)=skip+limit.
image.png

实际在处理过程中,会有一些边界值,比如skip太大,所有的数据都满足不了, 则第2步不满足就能发现。

再比如区间太小,不能拆分20等分,那就设步长为1来猜测。

再比如下面的一个切面, skip小于当前最小的索引, 则直接分配最小索引为skip。

image.png

测试数据产生

测试数据为每个节点100万条, 每条数据8个字节,可以将这里的数据理解成mysql的索引, 但是实际存储中索引只占很小一部分。
image.png

运行程序
image.png

测试:
image.png

图中的cost time 来自于这里
image.png

ctx->timeSec_为请求时打的点,因此cost time表示此交请求中service<->server的时间, 即后端处理的时间几乎为0。

image.png

好了, 上面的数据是100万条一个节点,我们来看看单节点1亿条的情况。

解决1亿条数据排序

生成测试数据

为了快速生成测试数据,我写了生成程序,咱们看看解析部分:

image.png

程序接受2个参数, datacount 数据数量,在此我们传1千万, filecount为文件个数,我们传10。

运行生成程序

我们试着生成一下。
image.png

生成的数据:
image.png

总共一亿条数分布在10个节点中。

起动程序后内存迅速被吃满

image.png

来在来查询一下100万起的数据:
image.png

1000万起的数据20条:
image.png

总共时间在10ms以内,可见查询时间与分布式节点的数据大小没有关系。

结论

当有上亿条数据分布在集群中的大量节点上时,如果各节点上的数据是有序的,我们对节点整个排序时,可进行 猜测=>应答=>同步 的方式进行实时操作,让节点之间实时高效地互动起来, 让它们并行运算直至产生最终结果。

目录
相关文章
|
2天前
|
消息中间件 存储 缓存
十万订单每秒热点数据架构优化实践深度解析
【11月更文挑战第20天】随着互联网技术的飞速发展,电子商务平台在高峰时段需要处理海量订单,这对系统的性能、稳定性和扩展性提出了极高的要求。尤其是在“双十一”、“618”等大型促销活动中,每秒需要处理数万甚至数十万笔订单,这对系统的热点数据处理能力构成了严峻挑战。本文将深入探讨如何优化架构以应对每秒十万订单级别的热点数据处理,从历史背景、功能点、业务场景、底层原理以及使用Java模拟示例等多个维度进行剖析。
18 8
|
30天前
|
算法 Java 数据库
美团面试:百亿级分片,如何设计基因算法?
40岁老架构师尼恩在读者群中分享了关于分库分表的基因算法设计,旨在帮助大家应对一线互联网企业的面试题。文章详细介绍了分库分表的背景、分片键的设计目标和建议,以及基因法的具体应用和优缺点。通过系统化的梳理,帮助读者提升架构、设计和开发水平,顺利通过面试。
美团面试:百亿级分片,如何设计基因算法?
|
5月前
|
存储 关系型数据库 分布式数据库
突破大表瓶颈|小鹏汽车使用PolarDB实现百亿级表高频更新和实时分析
PolarDB已经成为小鹏汽车应对TB级别大表标注、分析查询的&quot;利器&quot;。
突破大表瓶颈|小鹏汽车使用PolarDB实现百亿级表高频更新和实时分析
|
5月前
|
消息中间件 监控 druid
思源:秒级体验百亿级数据量监控钻取
思源:秒级体验百亿级数据量监控钻取
|
6月前
|
存储 监控 数据库
改良海量数据存储的若干的手段-转变数据垃圾为黄金
改良海量数据存储的若干的手段-转变数据垃圾为黄金
51 0
|
6月前
|
存储 消息中间件 Java
【亿级数据专题】「高并发架构」盘点本年度探索对外服务的百万请求量的高可靠消息服务设计实现
在深入研究了 **“【亿级数据专题】「高并发架构」盘点本年度探索对外服务的百万请求量的API网关设计实现”** 设计实现后,我们意识到,尽管API网关为服务商提供了高效的数据获取手段,但实时数据的获取仍然是一个亟待解决的问题。
99 1
【亿级数据专题】「高并发架构」盘点本年度探索对外服务的百万请求量的高可靠消息服务设计实现
|
6月前
快速排序 通透百万数据秒级排序
快速排序 通透百万数据秒级排序
41 0
|
6月前
|
存储 新零售 监控
挖掘业务场景的存储更优解
挖掘业务场景的存储更优解
|
存储 关系型数据库 MySQL
太强了!三种方案优化 2000w 数据大表!
太强了!三种方案优化 2000w 数据大表!
162 0