下面就几种场景展示一下使用ananas future的解决方案。
3.使用场景
3.1 按顺序向多个服务器发起请求:链式调用
服务器需要向redis1拉取玩家基础信息,获得基础信息后,又根据其内容,再向redis2请求获取详细信息。在老式C代码中,使用callback我们一般需要保存上下文,而C++11可以利用shared_ptr和lambda模拟闭包以捕获上下文:
//1. 异步获取基础信息 redis_conn1->Get<BasicProfile>("basic_profile_key") .Then([redis_conn2](const BasicProfile& data) { //2. 处理返回的基础信息,异步获取详细信息 return redis_conn2->Get<DetailProfile>("detail_profile_key"); // it return another future }) .Then([client_conn](const DetailProfile& data) { //3. SUCC 处理返回的详细信息,返回给客户端 client_conn->SendPacket(data); }) .OnTimeout(std::chrono::seconds(3), [client_conn]() { std::cout << "请求超时了\n"; //3. FAIL 返回给客户端 client_conn->SendPacket("server timeout error"); }, &this_event_loop);
第一个Get发起请求,并立即返回,使用Then注册callback处理结果,第一个请求返回后,发起第二个Get请求,当第二个请求返回后,再发送给客户端。其中OnTimeout是处理请求超时的情况,如果3s内任意redis没有返回响应,thiseventloop超时回调,向客户端通知。
3.2 同时向多个服务器发起请求,当所有请求返回后,开始处理
仍然沿用上面的例子,条件改为基础信息和详细信息没有关联,可以同时请求,并都发送给客户端:
//1. 异步获取基础信息和详细信息 auto fut1 = redis_conn1->Get<BasicProfile>("basic_profile_key"); auto fut2 = redis_conn2->Get<DetailProfile>("detail_profile_key"); ananas::WhenAll(fut1, fut2) .Then([client_conn](std::tuple<BasicProfile, DetailProfile>& results) { //2. SUCC 返回给客户端 client_conn->SendPacket(std::get<0>(results)); client_conn->SendPacket(std::get<1>(results)); }) .OnTimeout(std::chrono::seconds(3), [client_conn]() { std::cout << "请求超时了\n"; //3. FAIL 返回给客户端 client_conn->SendPacket("server timeout error"); }, &this_event_loop);
WhenAll将所有future的结果收集起来,只有收集完毕,才会执行回调。
3.3 同时向多个服务器发起请求,当某一个请求返回后,开始处理
假如有3个同样的服务器S1,S2,S3,我们想发起100次请求测试,看哪个服务器响应最快。这是使用WhenAny的场景:
struct Statics { std::atomic<int> completes{0}; std::vector<int> firsts; explicit Statics(int n) : firsts(n) { } }; auto stat = std::make_shared<Statics>(3); // 统计每个服务器获得第一的次数 (响应最快) const int kTests = 100; for (int i = 0; i < kTests; ++ i) { std::vector<Future<std::string> > futures; for (int i = 0; i < 3; ++ i) { auto fut = conn[i]->Get<std::string>("ping"); futures.emplace_back(std::move(fut)); } auto anyFut = futures.WhenAny(std::begin(futures), std::end(futures)); anyFut.Then([stat](std::pair<size_t/* fut index*/, std::string>& result) { size_t index = result.first; // 本次,index这个服务器的响应最快 stat->firsts[index] ++; if (stat->completes.fetch_add(1) == kTests - 1) { // 100次测试完成 int quickest = 0; for (int i = 1; i < 3; ++ i) { if (stat->firsts[i] > stat->firsts[quickest]) quickest = i; } printf("The fast server index is %d\n", quickest); } }); }
当3个请求中有任意一个返回(亦即最快的那个服务器),回调函数执行,统计次数。
最终,次数最多的那个服务器基本就是响应最快的。
3.4.同时向多个服务器发起请求,当其中过半请求返回后,开始处理
典型场景是paxos。在第一阶段,proposer尝试发起预提案prepare;当得到多数派acceptors的承诺回包,才可以发起第二阶段,请求提议一个值给acceptors:
// paxos phase1: Proposer发送prepare给Acceptors const paxos::Prepare prepare; std::vector<Future<paxos::Promise> > futures; for (const auto& acceptor : acceptors_) { auto fut = acceptor.SendPrepare(prepare); futures.emplace_back(std::move(fut)); } const int kMajority = static_cast<int>(futures.size() / 2) + 1; // 这里用匿名future即可 WhenN(kMajority, std::begin(futures), std::end(futures)) .Then([](std::vector<paxos::Promise>& results) { printf("提议成功,收到了多数派acceptors的承诺,现在发起第二阶段propose!\n"); // paxos phase2: 选择一个值:SelectValue const auto value = SelectValue(hint_value); // 向acceptors发起提案: // foreach (a in acceptors) // a->SendAccept(ctx_id, value); // 使用ctx-id,保证两阶段使用的是同一个提议id号码 }) .OnTimeout(std::chrono::seconds(3), []() { printf("prepare超时,也许是失败,请增大提议号重试发起!\n"); //increase prepareId and continue send prepare }, &this_eventloop);
3.5 指定Then回调在特定线程执行
在Herb Sutter的提案中,提到了关于指派Then回调函数在特定线程执行的能力。对此,我捏造了这样的一个例子:
假如服务器需要读一个很大的文件,文件是没有非阻塞读的(先不考虑io_sumbit ),read可能需要数百毫秒的时间。如果采取同步读取,势必造成服务器阻塞。我们希望另外开一个IO线程读取,当IO线程读取完成通知我们。 使用future编写代码如下:
// In this_loop thread. // 在另外一个线程读取very_big_file Future<Buffer> ft(ReadFileInSeparateThread(very_big_file)); ft.Then([conn](const Buffer& file_contents) { // SUCCESS : process file_content; conn->SendPacket(file_content); }) .OnTimeout(std::chrono::seconds(3), [=very_big_file]() { // FAILED OR TIMEOUT: printf("Read file %s failed\n", very_big_file); }, &this_loop);
这样的代码是否存在问题?请注意,对于一个tcp连接,send一般来说都不允许多线程调用。callback中的这行语句
conn->SendPacket(file_content);
是在读文件线程中执行的,因此有多线程调用send的危险。
所以我们需要指定该callback在原来的线程执行,很简单,只需要改动一行,调用另外一个Then的重载:
ft.Then(&this_loop, [conn](const Buffer& file_contents) { ...
注意第一个参数this_loop,这样,SendPacket就将在本线程运行,不存在并发错误了。
4.示例:基于future的redis客户端
前面简单介绍了future使用的各种场景,现在以一个完整的例子结束本文:redis客户端。之所以选择实现redis客户端,一是因为redis应用广泛,大家对它很熟悉;二是redis协议简单,且能保证协议应答的有序性,实现起来难度不大,不至于使大家分散注意力。
4.1协议的发送
对于协议打包,我选择了采用inline协议。利用C++11的变长模板参数可以非常容易做到:
// Build redis request from multiple strings, use inline protocol template <typename... Args> std::string BuildRedisRequest(Args&& ...); template <typename STR> std::string BuildRedisRequest(STR&& s) { return std::string(std::forward<STR>(s)) + "\r\n"; } template <typename HEAD, typename... TAIL> std::string BuildRedisRequest(HEAD&& head, TAIL&&... tails) { std::string h(std::forward<HEAD>(head)); return h + " " + BuildRedisRequest(std::forward<TAIL>(tails)...); }
4.2 协议的发送与上下文维护
redis支持pipeline请求,也就是不必要一应一答。因此我们需要为发送出去的请求保存一个上下文。由于请求和应答是严格有序对应的,一定程度上简化了我们的实现。当发出一个请求,需要为此构造一个Promise,这里简单说一下Promise:promise和future是一一对应的,可以理解为生产者操作promise,为其填充value,而消费者操作future,为其注册回调函数,在获得value时这些回调被执行)。这样api可以返回其对应的future,使用者就可以享用fluent的future接口:
// set name first, then get name. ctx->Set("name", "bertyoung").Then( [ctx](const ResponseInfo& rsp) { RedisContext::PrintResponse(rsp); return ctx->Get("name"); // get name, return another future }).Then( RedisContext::PrintResponse );
现在定义挂起的请求上下文:
enum ResponseType { None, Fine, // redis返回OK Error, // 返回错误 String, // redis返回字符串 }; using ResponseInfo = std::pair<ResponseType, std::string>; struct Request { std::vector<std::string> request; ananas::Promise<ResponseInfo> promise; } std::queue<Request> pending_;
每次请求,创建一个Request对象,并加入到pending_队列,queue的先进先出特性和redis协议的有序性配合非常完美:
ananas::Future<ResponseInfo> RedisContext::Get(const std::string& key) { // Redis inline protocol request std::string req_buf = BuildRedisRequest("get", key); hostConn_->SendPacket(req_buf.data(), req_buf.size()); RedisContext::Request req; req.request.push_back("get"); req.request.push_back(key); auto fut = req.promise.GetFuture(); pending_.push(std::move(req)); return fut; }
4.3 处理响应
当解析到完整的redis服务器回包,从pending队列中取出头部的promise,设置值即可:
auto& req = pending_.front();
// 设置promise
req.promise.SetValue(ResponseInfo(type_, content_));
// 弹出已收到响应的请求
pending_.pop();
4.4调用示例
发起两个请求,当请求都返回后,打印:
void WaitMultiRequests(const std::shared_ptr<RedisContext>& ctx) { // issue 2 requests, when they all return, callback auto fut1 = ctx->Set("city", "shenzhen"); auto fut2 = ctx->Set("company", "tencent"); ananas::WhenAll(fut1, fut2).Then( [](std::tuple<ananas::Try<ResponseInfo>, ananas::Try<ResponseInfo> >& results) { std::cout << "All requests returned:\n"; RedisContext::PrintResponse(std::get<0>(results)); RedisContext::PrintResponse(std::get<1>(results)); }); }
5.结语
关于ananas future的使用篇就到这里,后面会带来future的源码分析以及其它模块的使用和实现。