实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

引子

先简单的问一下, 你如何解决这样的需求:

                  对一堆数据按某字段排序,获取第100-10条的数据。

假设你面对的数据是个单节点,简单来说,就是一个mysql数据库, 很自然地用

select a from tb order by a limit 100, 10;
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

你面对的是10个甚至100个节点呢
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

按照常理,我们会先把所有节点的前110条数据拉到一个公用节点上,再排序,前文已述。

咱们把数字改一下,要获取第[100万, 100万+10]中间的数据10条数据,这个方法就不能用了。

问题变成:

              对分布在几十个节点上的上亿条数据要求1秒内排序。

上一篇文章的理论 中,我提到让各数据节点活动起来,不再只承担存储的功能,还要相互发现,相互问答,即“你那有这个数据吗”,“你那这个数据排多少名”。

就好像把全服的机器当成了神经网络中的一个个神经元,互相启发,理解新世界,产生乘法功能。

言归正传,接下来,我给大家实现一下这个需求。

咱们把目标按数据源分两种,先对内存型排序,再进行磁盘型排序,咱们这篇是头一种。

统一目标

咱们简单统一下试验目标。

1.有10个节点,每个节点存储100万条数据,都放在缓存中。

2.内存中的数据是排序过的。

“神马? 数据源中的原始数据都是排好序的, 那岂不是很简单,那你这个排序还有什么价值?”

这个问题这段时间一直困扰着我,因为部分同学没看明白前文,而钉我问我鄙视我(微笑)。

“对单节点数据源的排序成本很高吗”,我一般会这么反问,很显然,相比于海量节点的排序,单个节点内的数据排序成本很低。

“每个节点的数据源都是排好序的,所有节点间的数据排序就完成了吗”, 不是的, 中国有首富榜,印度有首富榜,就算中国比印度富一点(如有意见,纯属意淫),要获取中印共榜后的前10名,总不能只用中国的前10了事吧。

架构设计

既然咱们是来开发试验的,而且是分布式节点,咱们就得设计架构。

架构目标

1.各节点能够快速互相发现,互相通信。

2.节点掉线被系统实时发现,并做冗灾。

3.支持运行时新增节点。

试验采用的架构:
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

config:负责节点注册,节点发现。

server:数据源节点,试验中有10台server。

service:对外服务,承接外部http访问,转为server查询,再将返回结果合并发给用户。

试验目标

单次查询与skip的大小无关,不能查询说查询100万以上的排序比查询100条以内的慢。(这一条是不是太屌了)

响应时间控制在100ms以内。

那咱们就开始吧。

开始前先放一下效果吧,怕大家没信心看下去。

效果展示

实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

第一张图:同样是获取100条,分别是从第10条,10000条, 1000万条起。

第二张图:获取从100万开始的1000条数据。

skip 耗时(limit=100) 耗时(limit=1000)
10000 64ms 29ms
10 20ms 38ms
1000000 23ms 58ms

可以看出,请求耗时与skip, limit并没有线型关系,从哪里开始取,取多少条,响应时间都差不多。

也就是说,当你要从分布式存储中获取第100万开始的100条数据,与从第10条开始的100条数据,所得时间相当,而且在100ms以内, 达到实时效果。

语言选择

我选择的语言是c++, 因为我觉得对内存,存储啥的,它要擅长一些。 不过我尽量写的简单通俗一点,并力争在不久后用java来实现开源。

分布式处理

分布式处理的config-service-server以前的文章里已经写过,框架具备自发现,动态扩容的特点,读者想要了解的话,我在以后的文章里可以继续写。

协议设计

外部协议

/topkn&k=1000&n=100
k: skip
n:limit

内部协议
内部节点相互通信频繁, 且是双向的, 因此我采用protobuf协议,方便扩展,速度也是扛扛的。

server功能:

message ServerRequest{
    optional ServerBGQuery    bg = 4;              //service -> server请求, “hey, 我要从100万开始的100条数”
    optional ServerBGIndexReq  bg_index = 5;       //server -> server 问索引,"hey 你那边这个数的索引是多少“
    optional ServerBGIndexSync bg_sync = 6;        //server ->sync 同步查询结果, "hey, 我已经确认了查询结果,我同步给你吧”
}
message ServerResponse{
    optional ServerBGResponse bg = 4;              //server -> service 响应, “hey,这是我查询完的结果,请收”
    optional ServerBGIndexResponse bg_index = 5;   //server -> server 答索引  “hey,这个数在我这边的索引是这个”
}

service

节点流程图
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

service起来服务代理的作用,承接外部http访问,转为server查询,再将返回结果合并发给用户。

接入请求,比较简单,只接收/topkn的get请求

service.handleGetReq("/topkn", [this](const BGCon& con){
     this->OnHttpPostRequestTopKN(con);
});

解析参数:
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

发送请求给后端的server:
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

所用到的协议:

message ServerBGQuery{
    required int64          _s = 1;           //为这次查询取个ID吧
    required int64          flag = 2;         //为这次查询立个flag
    required int64          k = 3;            //skip
    required int64          n = 4;            //limit
}

server返回

void Service::OpBackBG(const BGCon& con, const ServerBGResponse& response)
{
 
    for(auto&& v : response.vs()){
       auto vit = _ctx->_v.find(v.v());
       if(vit == _ctx->_v.end()){
             BigValuePtr bv(new BigValue{v.v(), v.count(), v.index()});
             _ctx->_v.insert(std::make_pair(v.v(), bv));
        }
        else{
             BigValuePtr bv = vit->second;
             bv->count += v.count();
             bv->index += v.index();
        }
    }
}

代码中的response代表某个server,response.vs()是它返回来的数据。

可以看见, service对所有server的数据进行整合。

涉及的协议:

message ServerBGResponse{
    required int64              _s = 1;       //查询的ID
    repeated ServerBGIndex      vs =  9;      //某节点中所包含的数据, “hei, 这里是我在本次查询中包含的数据”
}
 
message ServerBGIndex{
    required int64 v = 1;                     //某个数
    required int64 index = 2;                 //这个数的索引是多少
    required int64 count = 3;                 //这个数有多少个
}

service的业务功能没多少个,流程也很简单,就是起个请求代理与数据合并的功能。

server

server的的节点逻辑用一个图来表示最好不过了。

实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

再来个简图

实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

逻辑处理

server的响应逻辑,可以分成3个,就是实现下面的三条协议。

message ServerRequest{
    optional ServerBGQuery    bg = 4;              //service -> server请求, “hey, 我要从100万开始的100条数”
    optional ServerBGIndexReq  bg_index = 5;       //server -> server 问索引,"hey 你那边这个数的索引是多少“
    optional ServerBGIndexSync bg_sync = 6;        //server ->sync 同步查询结果, "hey, 我已经确认了查询结果,我同步给你吧”
}

对应的处理函数:
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

Top:service发过来, “hey, 我想查一下skip=100万,limit=100的数是哪些”

Count:“hey, 这个数在你那排第几”。

CountBack:"hey, 这个数在我这排第9”

CountSync: “hey, 我已经排完了,我知道service请求的数是这些,你看看”

处理service发来的请求
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

这个看到了咱们的核心函数Guess
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

Guess逻辑

guess部分的代码太长,咱们来看逻辑。

当接受到一次(skip, limit)请求时, 全服系统需要寻找2个索引, skip, skip+limit, 比如当skip=100, limit=10时,全服需要找的是第排序为第100与第110的数,它们之间的数都被包含是结果集中。

这就好办了,我把skip对应的数称为b(begin), skip+limit对应的数称为e(end). 要找到b和e,满足 index(b)=skip, index(e)=skip+limit.

1.将猜测范围锁定为全局, 我们试验中的数据源是长整型,我们把从[0, 0x7ffffffffffffffe]。

2.将猜测范围内的数分成20等分,得到集合V1=[v1, v2, ...v20], 然后得到这些点的全局排序索引I1=[index1, index2....index20]。

index(b) 与index(e) 必然落在I1中的某个区间。

假设 index2

3.针对b: 将猜测范围定在[v2, v3],进行第2步。

4.针对e:将猜测范围定在[v4, v5],进行第2步。

重复上述过程,不断缩小包转圈,直至发现b, e, 满足index(b)=skip, index(e)=skip+limit.
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

实际在处理过程中,会有一些边界值,比如skip太大,所有的数据都满足不了, 则第2步不满足就能发现。

再比如区间太小,不能拆分20等分,那就设步长为1来猜测。

再比如下面的一个切面, skip小于当前最小的索引, 则直接分配最小索引为skip。

实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

测试数据产生

测试数据为每个节点100万条, 每条数据8个字节,可以将这里的数据理解成mysql的索引, 但是实际存储中索引只占很小一部分。
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

运行程序
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

测试:
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

图中的cost time 来自于这里
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

ctx->timeSec_为请求时打的点,因此cost time表示此交请求中service<->server的时间, 即后端处理的时间几乎为0。

实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

好了, 上面的数据是100万条一个节点,我们来看看单节点1亿条的情况。

解决1亿条数据排序

生成测试数据

为了快速生成测试数据,我写了生成程序,咱们看看解析部分:

实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

程序接受2个参数, datacount 数据数量,在此我们传1千万, filecount为文件个数,我们传10。

运行生成程序

我们试着生成一下。
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

生成的数据:
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

总共一亿条数分布在10个节点中。

起动程序后内存迅速被吃满

实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

来在来查询一下100万起的数据:
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

1000万起的数据20条:
实践:几十亿条数据分布在几十个节点上的毫秒级实时排序方法

总共时间在10ms以内,可见查询时间与分布式节点的数据大小没有关系。

结论

当有上亿条数据分布在集群中的大量节点上时,如果各节点上的数据是有序的,我们对节点整个排序时,可进行 猜测=>应答=>同步 的方式进行实时操作,让节点之间实时高效地互动起来, 让它们并行运算直至产生最终结果。

上一篇:mysql复杂查询练习


下一篇:记一种分布式超大规模数据的实时快速排序算法