详谈C++11新特性之future及开源项目ananas(folly,std c++11和ananas的future各自的区别是?)(而)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 详谈C++11新特性之future及开源项目ananas(folly,std c++11和ananas的future各自的区别是?)

下面就几种场景展示一下使用ananas future的解决方案。

3.使用场景


3.1 按顺序向多个服务器发起请求:链式调用


服务器需要向redis1拉取玩家基础信息,获得基础信息后,又根据其内容,再向redis2请求获取详细信息。在老式C代码中,使用callback我们一般需要保存上下文,而C++11可以利用shared_ptr和lambda模拟闭包以捕获上下文:


//1. 异步获取基础信息
redis_conn1->Get<BasicProfile>("basic_profile_key")
.Then([redis_conn2](const BasicProfile& data) {
    //2. 处理返回的基础信息,异步获取详细信息                           
    return redis_conn2->Get<DetailProfile>("detail_profile_key"); 
    // it return another future
})
.Then([client_conn](const DetailProfile& data) {
    //3. SUCC 处理返回的详细信息,返回给客户端
    client_conn->SendPacket(data);
})
.OnTimeout(std::chrono::seconds(3), [client_conn]() {
    std::cout << "请求超时了\n";
    //3. FAIL 返回给客户端
    client_conn->SendPacket("server timeout error");
}, &this_event_loop);

第一个Get发起请求,并立即返回,使用Then注册callback处理结果,第一个请求返回后,发起第二个Get请求,当第二个请求返回后,再发送给客户端。其中OnTimeout是处理请求超时的情况,如果3s内任意redis没有返回响应,thiseventloop超时回调,向客户端通知。


3.2 同时向多个服务器发起请求,当所有请求返回后,开始处理


仍然沿用上面的例子,条件改为基础信息和详细信息没有关联,可以同时请求,并都发送给客户端:


//1. 异步获取基础信息和详细信息
auto fut1 = redis_conn1->Get<BasicProfile>("basic_profile_key");
auto fut2 = redis_conn2->Get<DetailProfile>("detail_profile_key");
ananas::WhenAll(fut1, fut2)
.Then([client_conn](std::tuple<BasicProfile, DetailProfile>& results) {
    //2. SUCC 返回给客户端
    client_conn->SendPacket(std::get<0>(results));
    client_conn->SendPacket(std::get<1>(results));
})
.OnTimeout(std::chrono::seconds(3), [client_conn]() {
    std::cout << "请求超时了\n";
    //3. FAIL 返回给客户端
    client_conn->SendPacket("server timeout error");
}, &this_event_loop);

WhenAll将所有future的结果收集起来,只有收集完毕,才会执行回调。


3.3 同时向多个服务器发起请求,当某一个请求返回后,开始处理


假如有3个同样的服务器S1,S2,S3,我们想发起100次请求测试,看哪个服务器响应最快。这是使用WhenAny的场景:


 

struct Statics
    {
        std::atomic<int> completes{0};
        std::vector<int>  firsts;
        explicit Statics(int n) : 
            firsts(n)
        { } 
    };  
    auto stat = std::make_shared<Statics>(3); // 统计每个服务器获得第一的次数 (响应最快)
    const int kTests = 100;
    for (int i = 0; i < kTests; ++ i)
    {   
        std::vector<Future<std::string> > futures;
        for (int i = 0; i < 3; ++ i)
        {   
            auto fut = conn[i]->Get<std::string>("ping");
            futures.emplace_back(std::move(fut));
        }   
        auto anyFut = futures.WhenAny(std::begin(futures), std::end(futures));
        anyFut.Then([stat](std::pair<size_t/* fut index*/, std::string>& result) {
            size_t index = result.first;
            // 本次,index这个服务器的响应最快
            stat->firsts[index] ++; 
            if (stat->completes.fetch_add(1) == kTests - 1) {
                // 100次测试完成 
                int quickest = 0;
                for (int i = 1; i < 3; ++ i)
                {   
                    if (stat->firsts[i] > stat->firsts[quickest])
                        quickest = i;
                }   
                printf("The fast server index is %d\n", quickest);
            }   
        });
    }

当3个请求中有任意一个返回(亦即最快的那个服务器),回调函数执行,统计次数。


最终,次数最多的那个服务器基本就是响应最快的。


3.4.同时向多个服务器发起请求,当其中过半请求返回后,开始处理


典型场景是paxos。在第一阶段,proposer尝试发起预提案prepare;当得到多数派acceptors的承诺回包,才可以发起第二阶段,请求提议一个值给acceptors:


// paxos phase1: Proposer发送prepare给Acceptors
const paxos::Prepare prepare;
std::vector<Future<paxos::Promise> > futures;
for (const auto& acceptor : acceptors_)
{
    auto fut = acceptor.SendPrepare(prepare);
    futures.emplace_back(std::move(fut));
}
const int kMajority = static_cast<int>(futures.size() / 2) + 1;
// 这里用匿名future即可
WhenN(kMajority, std::begin(futures), std::end(futures))
.Then([](std::vector<paxos::Promise>& results) {
    printf("提议成功,收到了多数派acceptors的承诺,现在发起第二阶段propose!\n");
    // paxos phase2: 选择一个值:SelectValue
    const auto value = SelectValue(hint_value);
    // 向acceptors发起提案:
    // foreach (a in acceptors)
    //   a->SendAccept(ctx_id, value); // 使用ctx-id,保证两阶段使用的是同一个提议id号码
})
.OnTimeout(std::chrono::seconds(3), []() {
    printf("prepare超时,也许是失败,请增大提议号重试发起!\n");
    //increase prepareId and  continue send prepare
},
&this_eventloop);

3.5 指定Then回调在特定线程执行


在Herb Sutter的提案中,提到了关于指派Then回调函数在特定线程执行的能力。对此,我捏造了这样的一个例子:


假如服务器需要读一个很大的文件,文件是没有非阻塞读的(先不考虑io_sumbit ),read可能需要数百毫秒的时间。如果采取同步读取,势必造成服务器阻塞。我们希望另外开一个IO线程读取,当IO线程读取完成通知我们。 使用future编写代码如下:

// In this_loop thread.
// 在另外一个线程读取very_big_file
Future<Buffer> ft(ReadFileInSeparateThread(very_big_file));
ft.Then([conn](const Buffer& file_contents) {
    // SUCCESS : process file_content; 
    conn->SendPacket(file_content);
})
.OnTimeout(std::chrono::seconds(3), [=very_big_file]() {
    // FAILED OR TIMEOUT: 
    printf("Read file %s failed\n", very_big_file); 
},
&this_loop);

这样的代码是否存在问题?请注意,对于一个tcp连接,send一般来说都不允许多线程调用。callback中的这行语句


conn->SendPacket(file_content);

是在读文件线程中执行的,因此有多线程调用send的危险。


所以我们需要指定该callback在原来的线程执行,很简单,只需要改动一行,调用另外一个Then的重载:


ft.Then(&this_loop, [conn](const Buffer& file_contents) { ...

注意第一个参数this_loop,这样,SendPacket就将在本线程运行,不存在并发错误了。


4.示例:基于future的redis客户端


前面简单介绍了future使用的各种场景,现在以一个完整的例子结束本文:redis客户端。之所以选择实现redis客户端,一是因为redis应用广泛,大家对它很熟悉;二是redis协议简单,且能保证协议应答的有序性,实现起来难度不大,不至于使大家分散注意力。


4.1协议的发送


对于协议打包,我选择了采用inline协议。利用C++11的变长模板参数可以非常容易做到:

// Build redis request from multiple strings, use inline protocol 
template <typename... Args>
std::string BuildRedisRequest(Args&& ...);
template <typename STR>
std::string BuildRedisRequest(STR&& s)
{
    return std::string(std::forward<STR>(s)) + "\r\n";
}
template <typename HEAD, typename... TAIL>
std::string BuildRedisRequest(HEAD&& head, TAIL&&... tails)
{
    std::string h(std::forward<HEAD>(head));
    return h + " " + BuildRedisRequest(std::forward<TAIL>(tails)...);
}


4.2 协议的发送与上下文维护


redis支持pipeline请求,也就是不必要一应一答。因此我们需要为发送出去的请求保存一个上下文。由于请求和应答是严格有序对应的,一定程度上简化了我们的实现。当发出一个请求,需要为此构造一个Promise,这里简单说一下Promise:promise和future是一一对应的,可以理解为生产者操作promise,为其填充value,而消费者操作future,为其注册回调函数,在获得value时这些回调被执行)。这样api可以返回其对应的future,使用者就可以享用fluent的future接口:

// set name first, then get name.
    ctx->Set("name", "bertyoung").Then(
            [ctx](const ResponseInfo& rsp) {
                RedisContext::PrintResponse(rsp);
                return ctx->Get("name"); // get name, return another future
            }).Then(
                RedisContext::PrintResponse
            );

 

现在定义挂起的请求上下文:

enum ResponseType
{
    None,
    Fine, // redis返回OK
    Error, // 返回错误
    String, // redis返回字符串
};
using ResponseInfo = std::pair<ResponseType, std::string>;
struct Request
{
       std::vector<std::string> request;
       ananas::Promise<ResponseInfo> promise;
}
std::queue<Request> pending_;


每次请求,创建一个Request对象,并加入到pending_队列,queue的先进先出特性和redis协议的有序性配合非常完美:


ananas::Future<ResponseInfo>
RedisContext::Get(const std::string& key)
{
    // Redis inline protocol request
    std::string req_buf = BuildRedisRequest("get", key);
    hostConn_->SendPacket(req_buf.data(), req_buf.size());
    RedisContext::Request req;
    req.request.push_back("get");
    req.request.push_back(key);
    auto fut = req.promise.GetFuture();
    pending_.push(std::move(req));
    return fut;
}

4.3 处理响应


当解析到完整的redis服务器回包,从pending队列中取出头部的promise,设置值即可:


auto& req = pending_.front();

// 设置promise      

req.promise.SetValue(ResponseInfo(type_, content_));

// 弹出已收到响应的请求

pending_.pop();

4.4调用示例


发起两个请求,当请求都返回后,打印:


void WaitMultiRequests(const std::shared_ptr<RedisContext>& ctx)
{
    // issue 2 requests, when they all return, callback
    auto fut1 = ctx->Set("city", "shenzhen");
    auto fut2 = ctx->Set("company", "tencent");
    ananas::WhenAll(fut1, fut2).Then(
                    [](std::tuple<ananas::Try<ResponseInfo>,
                                  ananas::Try<ResponseInfo> >& results) {
                        std::cout << "All requests returned:\n";
                        RedisContext::PrintResponse(std::get<0>(results));
                        RedisContext::PrintResponse(std::get<1>(results));
            }); 
}

5.结语


关于ananas future的使用篇就到这里,后面会带来future的源码分析以及其它模块的使用和实现。



 


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
16天前
|
存储 安全 编译器
在 C++中,引用和指针的区别
在C++中,引用和指针都是用于间接访问对象的工具,但它们有显著区别。引用是对象的别名,必须在定义时初始化且不可重新绑定;指针是一个变量,可以指向不同对象,也可为空。引用更安全,指针更灵活。
|
1月前
|
C语言 C++
C 语言的关键字 static 和 C++ 的关键字 static 有什么区别
在C语言中,`static`关键字主要用于变量声明,使得该变量的作用域被限制在其被声明的函数内部,且在整个程序运行期间保留其值。而在C++中,除了继承了C的特性外,`static`还可以用于类成员,使该成员被所有类实例共享,同时在类外进行初始化。这使得C++中的`static`具有更广泛的应用场景,不仅限于控制变量的作用域和生存期。
46 10
|
1月前
|
C语言 C++
实现两个变量值的互换[C语言和C++的区别]
实现两个变量值的互换[C语言和C++的区别]
17 0
|
2月前
|
安全 C++
C++: std::once_flag 和 std::call_once
`std::once_flag` 和 `std::call_once` 是 C++11 引入的同步原语,确保某个函数在多线程环境中仅执行一次。
|
3月前
|
存储 编译器 C语言
C++内存管理(区别C语言)深度对比
C++内存管理(区别C语言)深度对比
82 5
|
3月前
|
存储 程序员 C++
【C++小知识】基于范围的for循环(C++11)
【C++小知识】基于范围的for循环(C++11)
|
3月前
|
编译器 C语言 C++
【C++关键字】指针空值nullptr(C++11)
【C++关键字】指针空值nullptr(C++11)
|
3月前
|
存储 编译器 C++
【C++关键字】auto的使用(C++11)
【C++关键字】auto的使用(C++11)
|
3月前
|
缓存 C++ Windows
Inno setup 脚本判断 Microsoft Visual C++ Redistributable 不同版本区别
Inno setup 脚本判断 Microsoft Visual C++ Redistributable 不同版本区别
|
23天前
|
存储 编译器 对象存储
【C++打怪之路Lv5】-- 类和对象(下)
【C++打怪之路Lv5】-- 类和对象(下)
21 4