Facebook 的 C++ 11 组件库 Folly Futures

简介:

Futures 是一种通过自然的、可组合的方式表达异步计算的模式。这篇博文介绍了我们在 Facebook 中使用的一种适用于 C++11 的 futures 实现:Folly Futures。

为什么要使用异步?

想象一个服务 A 正在与服务 B 交互的场景。如果 A 被锁定到 B 回复后才能继续进行其他操作,则 A 是同步的。此时 A 所在的线程是空闲的,它不能为其他的请求提供服务。线程会变得非常笨重-切换线程是低效的,因为这需要耗费可观的内存,如果你进行了大量这样的操作,操作系统会因此陷入困境。这样做的结果就是白白浪费了资源,降低了生产力,增加了等待时间(因为请求都在队列中等待服务)。

image

如果将服务 A 做成异步的,会变得更有效率,这意味着当 B 在忙着运算的时候,A 可以转进去处理其他请求。当 B 计算完毕得出结果后,A 获取这个结果并结束请求。

image

同步代码与异步代码的比较

让我们考虑一个函数 fooSync,这个函数使用完全同步的方式完成基本计算 foo,同时用另一个函数 fooAsync 异步地在做同样的工作。fooAsync 需要提供一个输入和一个能在结果可用时调用的回调函数。

template <typename T> using Callback = std::function<void(T)>;

Output fooSync(Input);
void fooAsync(Input, Callback<Output>);

这是一种传统的异步计算表达方式。(老版本的 C/C++ 异步库会提供一个函数指针和一个 void* 类型的上下文参数,但现在 C++11 支持隐蔽功能,已经不再需要显式提供上下文参数)

传统的异步代码比同步代码更为有效,但它的可读性不高。对比同一个函数的同步和异步版本,它们都实现了一个 multiFoo 运算,这个运算为输入向量(vector)中的每一个元素执行 foo 操作:

using std::vector;vector<Output> multiFooSync(vector<Input> inputs) {
  vector<Output> outputs;
  for (auto input : inputs) {
    outputs.push_back(fooSync(input));
  }
  return outputs;}
void multiFooAsync(
  vector<Input> inputs,
  Callback<vector<Output>> callback){
  struct Context {
    vector<Output> outputs;
    std::mutex lock;
    size_t remaining;
  };
  auto context = std::make_shared<Context>();
  context->remaining = inputs.size();
    
  for (auto input : inputs) {
    fooAsync(
      input,
      [=](Output output) {
        std::lock_guard<std::mutex> guard(context->lock);
        context->outputs->push_back(output);
        if (--context->remaining == 0) {
          callback(context->outputs);
        }
      });
  }}

异步的版本要复杂得多。它需要关注很多方面,如设置一个共享的上下文对象、线程的安全性以及簿记工作,因此它必须要指定全部的计算在什么时候完成。更糟糕的是(尽管在这个例子中体现得并不明显)这使得代码执行的次序关系(computation graph)变得复杂,跟踪执行路径变得极为困难。程序员需要对整个服务的状态机和这个状态机接收不同输入时的不同行为建立一套思维模式,并且当代码中的某一处不能体现流程时要找到应该去检查的地方。这种状况也被亲切地称为“回调地狱”。

Futures

Future 是一个用来表示异步计算结果(未必可用)的对象。当计算完成,future 会持有一个值或是一个异常。例如:

#include <folly/futures/Future.h> 
using folly::Future;

// Do foo asynchronously; immediately return a Future for the output
Future<Output> fooFuture(Input);

Future<Output> f = fooFuture(input);
// f may not have a value (or exception) yet. But eventually it will.
f.isReady();  // Maybe, maybe not.
f.wait();     // You can synchronously wait for futures to become ready.
f.isReady();  // Now this is guaranteed to be true.
Output o = f.value();  // If f holds an exception, this will throw that exception.

到目前为止,我们还没有做任何 std::future 不能做的事情。但是 future 模式中的一个强有力的方面就是可以做到连锁回调,std::future 目前尚不支持此功能。我们通过方法 Future::then 来表达这个功能:

Future<double> f = 
  fooFuture(input)
  .then([](Output o) {
    return o * M_PI;
  })
  .onError([](std::exception const& e) {
    cerr << "Oh bother, " << e.what()
      << ". Returning pi instead." << endl;
    return M_PI;
  });// get() first waits, and then returns the valuecout << "Result: " << f.get() << endl;

在这里我们像使用 onError 一样使用连接起来的 then 去接住可能引发的任何异常。可以将 future 连接起来是一个重要的能力,它允许我们编写串行和并行的计算,并将它们表达在同一个地方,并为之提供明晰的错误处理。

串行功能组成

如果你想要按顺序异步计算 a、b、c 和 d,使用传统的回调方式编程就会陷入“回调地狱”- 或者,你使用的语言具备一流的匿名函数(如 C++11),结果可能是“回调金字塔”:

// the callback pyramid is syntactically annoying
void asyncA(Output, Callback<OutputA>);
void asyncB(OutputA, Callback<OutputB>);
void asyncC(OutputB, Callback<OutputC>);
void asyncD(OutputC, Callback<OutputD>);
auto result = std::make_shared<double>();
fooAsync(input, [=](Output output) {
  // ...
  asyncA(output, [=](OutputA outputA) {
    // ...
    asyncB(outputA, [=](OutputB outputB) {
      // ...
      asyncC(outputB, [=](OutputC outputC) {
        // ...
        asyncD(outputC, [=](OutputD outputD) {
          *result = outputD * M_PI;
        });
      });
    });
  });
});
// As an exercise for the masochistic reader, express the same thing without
// lambdas. The result is called callback hell.

有了 futures,顺序地使用then组合它们,代码就会变得干净整洁:

Future<OutputA> futureA(Output);
Future<OutputB> futureB(OutputA);
Future<OutputC> futureC(OutputB);

// then() automatically lifts values (and exceptions) into a Future.

OutputD d(OutputC) {
  if (somethingExceptional) throw anException;
  return OutputD();}Future<double> fut =
  fooFuture(input)
  .then(futureA)
  .then(futureB)
  .then(futureC)
  .then(d)
  .then([](OutputD outputD) { // lambdas are ok too
    return outputD * M_PI;
  });

并行功能组成

再回到我们那个 multiFoo 的例子。下面是它在 future 中的样子:

using folly::futures::collect;

Future<vector<Output>> multiFooFuture(vector<Input> inputs) {
  vector<Future<Output>> futures;
  for (auto input : inputs) {
    futures.push_back(fooFuture(input));
  }
  return collect(futures);}

collect 是一种我们提供的构建块(compositional building block),它以 Future 为输入并返回一个 Future>,这会在所有的 futures 完成后完成。(collect 的实现依赖于-你猜得到-then)有很多其他的构建块,包括:collectAny、collectN、map 和 reduce。

请注意这个代码为什么会看上去与同步版本的 multiFooSync 非常相似,我们不需要担心上下文或线程安全的问题。这些问题都由框架解决,它们对我们而言是透明的。

执行上下文

其他一些语言里的 futures 框架提供了一个线程池用于执行回调函数,你除了要知道上下文在另外一个线程中执行,不需要关注任何多余的细节。但是 C++ 的开发者们倾向于编写 C++ 代码,因为他们需要控制底层细节来实现性能优化,Facebook 也不例外。因此我们使用简单的 Executor接口提供了一个灵活的机制来明确控制回调上下文的执行:

struct Executor {
  using Func = std::function<void()>;
  virtual void add(Func) = 0;};

你可以向 then 函数传入一个 executor 来命令它的回调会通过 executor 执行。

a(input).then(executor, b);

在这段代码中,b 将会通过 executor 执行,b 可能是一个特定的线程、一个线程池、或是一些更有趣的东西。本方法的一个常见的用例是将 CPU 从 I/O 线程中解放出来,以避免队列中其他请求的排队时间。

Futures 意味着你再也不用忘记说对不起

传统的回调代码有一个普遍的问题,即不易对错误或异常情况的调用进行跟踪。程序员在检查错误和采取恰当措施上必须做到严于律己(即使是超人也要这样),更不要说当一场被意外抛出的情况了。Futures 使用包含一个值和一个异常的方式来解决这个问题,这些异常就像你希望的那样与 futures融合在了一起,除非它留在 future 单元里直到被 onErorr 接住,或是被同步地,例如,赋值或取值。这使得我们很难(但不是不可能)跟丢一个应该被接住的错误。

使用 Promise

我们已经大致看过了 futures 的使用方法,下面来说说我们该如何制作它们。如果你需要将一个值传入到 Future,使用 makeFuture:

using folly::makeFuture;
std::runtime_error greatScott("Great Scott!");
Future<double> future = makeFuture(1.21e9);
Future<double> future = makeFuture<double>(greatScott);

但如果你要包装一个异步操作,你需要使用 Promise:

using folly::Promise;
Promise<double> promise;
Future<double> future = promise.getFuture();

当你准备好为 promise 赋值的时候,使用 setValue、setException 或是 setWith:

promise.setValue(1.21e9);
promise.setException(greatScott);
promise.setWith([]{
  if (year == 1955 || year == 1885) throw greatScott;
  return 1.21e9;
});

总之,我们通过生成另一个线程,将一个长期运行的同步操作转换为异步操作,如下面代码所示:

double getEnergySync(int year) {
  auto reactor = ReactorFactory::getReactor(year);
  if (!reactor) // It must be 1955 or 1885
    throw greatScott;
  return reactor->getGigawatts(1.21);
}
Future<double> getEnergy(int year) {
  auto promise = make_shared<Promise<double>>();
  std::thread([=]{
    promise->setWith(std::bind(getEnergySync, year));
  }).detach();
  
  return promise->getFuture();
}

通常你不需要 promise,即使乍一看这像是你做的。举例来说,如果你的线程池中已经有了一个 executor 或是可以很轻易地获取它,那么这样做会更简单:

Future<double> future = folly::via(executor, std::bind(getEnergySync, year));

用例学习

我们提供了两个案例来解释如何在 Facebook 和 Instagram 中使用 future 来改善延迟、鲁棒性与代码的可读性。

Instagram 使用 futures 将他们推荐服务的基础结构由同步转换为异步,以此改善他们的系统。其结果是尾延迟(tail latency)得以显著下降,并仅用十分之一不到的服务器就实现了相同的吞吐量。他们把这些改动及相关改动带来的益处进行了记录,更多细节可以参考他们的博客。

下一个案例是一个真正的服务,它是 Facebook 新闻递送(News Feed)的一个组成部分。这个服务有一个两阶段的叶聚合模式(leaf-aggregate pattern),请求(request)会被分解成多个叶请求将碎片分配到不同的叶服务器,我们在做同样的事情,但根据第一次聚合的结果分配的碎片会变得不同。最终,我们获取两组结果集并将它们简化为一个单一的响应(response)。

image

下面是相关代码的简化版本:

Future<vector<LeafResponse>> fanout(
  const map<Leaf, LeafReq>& leafToReqMap,
  chrono::milliseconds timeout)
{
  vector<Future<LeafResponse>> leafFutures;
  for (const auto& kv : leafToReqMap) {
    const auto& leaf = kv.first;
    const auto& leafReq = kv.second;
    leafFutures.push_back(
      // Get the client for this leaf and do the async RPC
      getClient(leaf)->futureLeafRPC(leafReq)
      // If the request times out, use an empty response and move on.
      .onTimeout(timeout, [=] { return LeafResponse(); })
      // If there's an error (e.g. RPC exception),
      // use an empty response and move on.
      .onError([=](const exception& e) { return LeafResponse(); }));
  }
  // Collect all the individual leaf requests into one Future
  return collect(leafFutures);
}
// Some sharding function; possibly dependent on previous responses.
map<Leaf, LeafReq> buildLeafToReqMap(
    const Request& request,
    const vector<LeafResponse>& responses);
// This function assembles our final response.
Response assembleResponse(
    const Request& request,
    const vector<LeafResponse>& firstFanoutResponses,
    const vector<LeafResponse>& secondFanoutResponses);
Future<Response> twoStageFanout(shared_ptr<Request> request) {
  // Stage 1: first fanout
  return fanout(buildLeafToReqMap(*request, {}),
                FIRST_FANOUT_TIMEOUT_MS)
  // Stage 2: With the first fanout completed, initiate the second fanout.
  .then([=](vector<LeafResponse>& responses) {
    auto firstFanoutResponses =
      std::make_shared<vector<LeafResponse>>(std::move(responses));
    
    // This time, sharding is dependent on the first fanout.
    return fanout(buildLeafToReqMap(*request, *firstFanoutResponses),
                  SECOND_FANOUT_TIMEOUT_MS)
    
    // Stage 3: Assemble and return the final response.
    .then([=](const vector<LeafResponse>& secondFanoutResponses) {
      return assembleResponse(*request, *firstFanoutResponses, secondFanoutResponses);
    });
  });
}

该服务的历史版本中曾使用只允许整体超时的异步框架,同时使用了传统的“回调地狱”模式。是 Futures 让这个服务自然地表达了异步计算,并使用有粒度的超时以便在某些部分运行过慢时采取更积极的行动。其结果是,服务的平均延迟减少了三分之二,尾延迟减少到原来的十分之一,总体超时错误明显减少。代码变得更加易读和推理,作为结果,代码还变得更易维护。

当开发人员拥有了帮助他们更好理解和表达异步操作的工具时,他们可以写出更易于维护的低延迟服务。

结论

Folly Futures 为 C++11 带来了健壮的、强大的、高性能的 futures。我们希望你会喜欢上它(就像我们一样)。如果你想了解更多信息,可以查阅相关文档、文档块以及 GitHub 上的代码。

致谢

Folly Futures 制作团队的成员包括 Hans Fugal,Dave Watson,James Sedgwick,Hannes Roth 和 Blake Mantheny,还有许多其他志同道合的贡献者。我们要感谢 Twitter,特别是 Marius,他在 Facebook 关于 Finagle 和 Futures 的技术讲座,激发了这个项目的创作灵感。

文章转载自 开源中国社区[https://www.oschina.net]

相关文章
|
7天前
|
消息中间件 存储 开发工具
消息队列 MQ产品使用合集之C++如何使用Paho MQTT库进行连接、发布和订阅消息
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
25天前
|
存储 自然语言处理 安全
C++ STL标准库 《string原理与实战分析》
C++ STL标准库 《string原理与实战分析》
24 0
|
6天前
|
存储 安全 Linux
网络请求的高效处理:C++ libmicrohttpd库详解
网络请求的高效处理:C++ libmicrohttpd库详解
|
16天前
|
存储 算法 C++
C++一分钟之-标准模板库(STL)简介
【6月更文挑战第21天】C++ STL是高效通用的算法和数据结构集,简化编程任务。核心包括容器(如vector、list)、迭代器、算法(如sort、find)和适配器。常见问题涉及内存泄漏、迭代器失效、效率和算法误用。通过示例展示了如何排序、遍历和查找元素。掌握STL能提升效率,学习过程需注意常见陷阱。
24 4
|
4天前
|
存储 算法 程序员
C++基础知识(八:STL标准库(Vectors和list))
C++ STL (Standard Template Library标准模板库) 是通用类模板和算法的集合,它提供给程序员一些标准的数据结构的实现如 queues(队列), lists(链表), 和 stacks(栈)等. STL容器的提供是为了让开发者可以更高效率的去开发,同时我们应该也需要知道他们的底层实现,这样在出现错误的时候我们才知道一些原因,才可以更好的去解决问题。
|
4天前
|
算法 前端开发 C++
C++基础知识(八:STL标准库 deque )
deque在C++的STL(Standard Template Library)中是一个非常强大的容器,它的全称是“Double-Ended Queue”,即双端队列。deque结合了数组和链表的优点,提供了在两端进行高效插入和删除操作的能力,同时保持了随机访问的特性。
|
4天前
|
存储 C++ 索引
C++基础知识(八:STL标准库 Map和multimap )
C++ 标准模板库(STL)中的 map 容器是一种非常有用的关联容器,用于存储键值对(key-value pairs)。在 map 中,每个元素都由一个键和一个值组成,其中键是唯一的,而值则可以重复。
|
11天前
|
域名解析 网络协议 程序员
程序员必知:【转】adns解析库——域名解析实例(C++、linux)
程序员必知:【转】adns解析库——域名解析实例(C++、linux)
17 0
|
11天前
|
C++ 容器
C++ STL标准库 《map容器详解》
C++ STL标准库 《map容器详解》
11 0
|
11天前
|
存储 C++ 容器
C++ STL标准库 《map容器详解》
C++ STL标准库 《map容器详解》
14 0