velox背景
velox是Meta的统一的计算引擎,主要使用在Presto、Spark等,velox是由C++实现的向量化计算引擎,其执行引擎包含Task、Driver、Operator等概念;执行引擎有内到外执行,Driver与运行线程对应。Operator执行时使用火山模型-拉的模式依次执行。
velox将Plan转换为由PlanNode组成的一棵树,然后将PlanNode转换为Operator,Operator作为基础的算子,其基类主要定义了addInput、IsBlocked、getOutput等接口来满足数据的处理和流动。
velox表达式
以FilterProject的Operator为例,在Operator中会使用有一个 std::unique_ptr<ExprSet> exprs_的变量,用来执行过滤和投影的计算。ExprSet是FilterProject计算的核心,本文主要研究下ExprSet如何执行计算。
ExprSet是对Expr的封装,Expr表示velox中可执行的表达式。
本文以 cast(a as bigint) > 1 表达式为例,来介绍如何实现Velox表达式的执行,其中包含一些源码引用。
计算目标:RowVector
velox是向量化计算引擎,velox表达式的计算目标是向量,向量在velox用Vector来表示,出于性能和内存占用的考虑,velox有多种编码的Vector来适配不同的场景,例如FlatVector、SimpleVector、DictionaryVector等。
velox中还有一种表示多列向量的结构RowVector;RowVector逻辑看做是列式表模型;在存储上,它是包含列向量Vector的数组;childrens的size对应列的个数。每个列向量的类型可以是FlatVector,也可以是DictionaryVector等。
下面是一个RowVector格式示例,包含三列,类型分别为INTERGER、VARCHAR、VARCHAR。
计算过程
接下来以一个三列的RowVector作为示例,RowVector的逻辑值如下所示,
a<string> |
b<int> |
c<string> |
"2" |
3 |
"a" |
"a5" |
0 |
"b" |
null |
4 |
"c" |
"-1" |
4 |
"d" |
本文会使用给定表达式:cast(a as bigint) > 1,来调研velox的内部实现。
接下来先抛出几个问题,通过源码的方式来逐步回答如下问题
- 表达式是怎么表示的,又是如何执行?
- 是逐行执行,还是列批量执行?
- 输入有a、b、c三列,在计算的过程中是否会用到b、c两列?也就是b、c会占用多余的内存?
- 如果列a是Dictionary编码,表达式计算会将a物化后计算吗?针对不同的编码有没有优化
- 如果列a中转换失败,表达式计算会崩溃吗?抛出exception还是结果为null
- 在执行的过程中,velox还有哪些优化措施?
表达式数据结构及执行流程
与其他语言的表达式一样,表达式往往使用一棵树来描述,表达式树的静态节点继承自core::ITypedExpr,包含五种节点类型
节点类型 |
作用 |
FieldAccessTypedExpr |
表示RowVector中的某一列,作为表达式的叶子节点 |
ConstantTypedExpr |
表示常量值,作为叶子节点 |
CallTypedExpr |
|
CastTypedExpr |
转换类型 |
LambdaTypedExpr |
lamda表达式,作为叶子节点 |
对于cast(a as bigint) > 1表达式,其对应的表达式树(编译前)如下:
velox对于表达式执行,主要包括表达式编译和执行两部分;表达式编译的过程类似PlanNode转换为Operator的过程,即把执行计划中静态的表达式转换为可执行的表达式实例。
编译
其中表达式编译前是core::ITypedExpr,编译后使用exec::Expr类型表示。
执行
执行过程:使用深度遍历执行即可,因为父节点依赖子节点的执行结果;
Expr:type_表示返回的类型,inputs_表示其孩子节点,如果当前表达式是函数,vectorFunction_表示对应函数的指针。
class Expr { ... private: const TypePtr type_; const std::vector<std::shared_ptr<Expr>> inputs_; const std::string name_; const std::shared_ptr<VectorFunction> vectorFunction_; const bool specialForm_; const bool supportsFlatNoNullsFastPath_; std::vector<VectorPtr> inputValues_; }
执行实现
执行主要是使用Expr::eval方法进行,函数前面如下:
- 其中rows表示那些行需要参与计算
- context包含输入的RowVector和内存池相关的上下文
- result表示表达式执行后的结果,类型为VectorPtr
class Expr { ... public: void eval( const SelectivityVector& rows, EvalCtx& context, VectorPtr& result, const ExprSet* FOLLY_NULLABLE parentExprSet = nullptr); ... }
EvalCtx的结构这里简单列下其主要成员:
- 其中row_表示表达式的输入;
- peeledFields_和peeledEncoding_与剥离逻辑有关,后面在做介绍。
class EvalCtx { const RowVector* FOLLY_NULLABLE row_; bool inputFlatNoNulls_; // Corresponds 1:1 to children of 'row_'. Set to an inner vector // after removing dictionary/sequence wrappers. std::vector<VectorPtr> peeledFields_; // Set if peeling was successful, that is, common encodings from inputs were // peeled off. std::shared_ptr<PeeledEncoding> peeledEncoding_; }
回到Expr::eval方法,其主要调用栈如下:
- eval
- evalEncodings
- evalWithNulls
- evalAllImpl
- if (isSpecialForm())
- evalSpecialFormWithStats(rows, context, result);
- return;
- evalArgsDefaultNulls
- for (int32_t i = 0; i < inputs_.size(); ++i)
- inputs_[i]->eval(remainingRows.rows(), context, inputValues_[i]);
- applyFunction
从调用的顺序可以看出,velox的表达式执行总体是一个后序遍历的框架,先执行孩子节点的表达式计算,再执行当前节点的applyFunction。
按理说,一个后序遍历执行下每个表达式逻辑不会很复杂,直接在将每个孩子节点的表达式结果放在inputValues_递归调用就可以,为什么中间还有evalEncoding、evalWithNulls、这些中间过程呢?事实上出于性能的考虑、velox对于特定的场景进行了极致的优化。接下来会将前文提到的问题与这些优化进行结合来描述,揭开表达式执行的面纱。
表达式执行细节
evalEncodings实现
在evalEncodings的实现中,首先介绍下DictionaryVector,然后介绍对于DictionaryVector编码的Vector如何进行编码剥离和如何进行剥离。
DictionaryVector简介
velox中大量用到一种Vector类型:DictionaryVector,DictionaryVector是一种字典编码。其背后实现是包含一个dictionaryValues_成员作为内部Vector,indices_记录每一行数据对应内部Vector的字段索引,在有重复值的场景下较为有用。
- 好处:存储使占用内存空间小,计算时可以只对dictionaryValues_操作,减少重复计算。
- 坏处:对外层Vector取值时需要decode出来,decode的过程也是通过indices_查找内部Vector的值的过程;同时,Dictionary支持多层嵌套,这种情况下想要获取某一行的值,需要一层一层拨开最内层的vector,其性能可想而知。
为了便于对DictionaryVector取值,velox提供了DecodedVector类,支持将DictionaryVector“物化”,其实现正是一层层剥离出来DictionaryVector的最内层Vector。
为什么要剥离?
在表达式计算中假如a列是Dict(Flat)的类型,对于只有一次,假如a的最内层Vector的长度是3,值为["2"、"3"、"5"];a的长度为1000,值为["2", "3", "3", "3", "5"...],其取值范围仅限于"2","3","5";
在执行cast(a as bigint)时,直观的逻辑是遍历a,循环1000次,执行cast(a as bigint);但是这样不是最高效的;
事实上只需要对内层Vector执行计算,只需要循环3次即可,不需要对1000个物化后的值进行计算,这也是evalEncodings存在的意义,在多层的情况下,比如Dict(Dict(Dict(Flat))),先物化后计算会更加浪费计算资源;除了DictionaryVector,还有ConstantVector编码也有类似的问题;这里以DictionaryVector的剥离为例:
evalEncodings主要做的事情,是将特殊编码的Vecctor如DictionaryVector背后的值拿出来,而不是直接对外层的逻辑值进行计算(以避免可能的物化代价),其过程具体来说:
- 对每个特定字段,判断是否为Flat类型,如果不是Flat,对encoding进行剥离,得到剥离后的vector和encoding。
- 对于剥离后的最内层vector进行计算,得到结果。
- 然后将上述结果使用第一步中的encoding进行重新封装。
剥离实现
剥离的过程主要使用了PeeledEncoding::peel方法,最后得到VectorPtr的数组,包含的是内层的Vector。
std::vector<VectorPtr> peeledVectors; auto peeledEncoding = PeeledEncoding::peel( vectorsToPeel, rowsToPeel, localDecoded, propagatesNulls_, peeledVectors);
实现过程是一个do while循环,通过逐个字段(5行)、逐层(20行)剥离,直到最内层不为DICTIONARY编码(13行),完整实现还会有Const类型的处理,这里隐去细节,关注主要逻辑。
do { peeled = true; BufferPtr firstIndices; maybePeeled.resize(numFields); for (int fieldIndex = 0; fieldIndex < numFields; fieldIndex++) { auto leaf = peeledVectors.empty() ? vectorsToPeel[fieldIndex] : peeledVectors[fieldIndex]; if (leaf == nullptr) { continue; } ... auto encoding = leaf->encoding(); if (encoding == VectorEncoding::Simple::DICTIONARY) { ... setPeeled(leaf->valueVector(), fieldIndex, maybePeeled); } else { ... } } if (peeled) { ++numLevels; peeledVectors = std::move(maybePeeled); } } while (peeled && nonConstant);
最终得到的peeledVectors数组,元素按照字段的序号,最终会放在ExprCtx的peeledFields_中。
ExprCtx怎么用这个剥离后的vector呢?注意到ExprCtx有一个getField方法,是用来获取特定列的vector用于计算;接下来是找到调用getField的地方。
const VectorPtr& EvalCtx::getField(int32_t index) const { const VectorPtr* field; if (!peeledFields_.empty()) { field = &peeledFields_[index]; } else { field = &row_->childAt(index); } ... return *field; }
回到最开始的表达式执行流程,在执行evalAllImpl时,前面有一句
if (isSpecialForm()) { evalSpecialFormWithStats(rows, context, result); return; }
在我们的cast(a as bigint) > 1 表达式中,其中a对应的执行表达式是FieldReference,其满足isSpecailForm()
class FieldReference : public SpecialForm
所以在执行到FieldReference时(FieldReference是叶子节点,后序遍历会先执行),会调用evalSpecailForm,其实现中会调用到context.getField(index_)(12行)。
通过以上可以看出在获取RowVector的字段取值时,会使用剥离后的内层Vector进行计算。
if (inputs_.empty()) { row = context.row(); } else { // ... } if (index_ == -1) { auto rowType = dynamic_cast<const RowType*>(row->type().get()); VELOX_CHECK(rowType); index_ = rowType->getChildIdx(field_); } VectorPtr child = inputs_.empty() ? context.getField(index_) : row->childAt(index_); // ...
回顾下整个过程:
- 在eval的最开始先使用了evalEncodings来完成剥离,剥离后的结果放在了context中;
- 然后调用evalAllImpl中遍历每一个叶子节点,FieldReference作为叶子节点被执行时,使用的已经是剥离后的结果。
- 同时从12行也解决了我们一个疑问, cast(a as bigint) > 1会不会用到b/c字段、答案是不会,只会取index_对应的值;在剥离的过程中会不会用到呢?答案也是不会,因为distinct_fields是根据表达式来计算,而不是输入内容,表达式里面只有a,所以distinct_fields只会剥离a。
- 在计算完剥离的数据后,velox还会将原来的encoding在wrap到计算结果中,例如cast(a as bigint)真正执行了3次,真正外部需要的是1000个结果,需要要用wrap encoding。
evalWithNulls实现
evalWithNulls顾名思义,是要对null值进行处理,为什么要处理null?总所周知,在SQL中用到的大部分函数对于输入为null的数据,结果也是确定的null,例如 1+null的结果是null;
这种情况下只需要判断表达式的某一行输入是否为null,没必要真正的执行表达式计算。接下来看下velox的evalWithNulls的具体实现流程:
- 判断每一列是否有null值(6行)
- 如果有null值(12行),将为null的行去除(14行)后交给evalAll处理,evalAll只对非null的行进行处理(17行)
- 处理完以后,在将null值补充到结果中(20行)
if (propagatesNulls_ && !skipFieldDependentOptimizations()) { bool mayHaveNulls = false; for (auto* field : distinctFields_) { const auto& vector = context.getField(field->index(context)); //... if (vector->mayHaveNulls()) { mayHaveNulls = true; break; } } if (mayHaveNulls) { LocalSelectivityVector nonNullHolder(context); if (removeSureNulls(rows, context, nonNullHolder)) { ScopedVarSetter noMoreNulls(context.mutableNullsPruned(), true); if (nonNullHolder.get()->hasSelections()) { evalAll(*nonNullHolder.get(), context, result); } auto rawNonNulls = nonNullHolder.get()->asRange().bits(); addNulls(rows, rawNonNulls, context, result); return; } } }
可以看出,velox是简单的将null的行去除,以达到不计算null行的效果。
applyFunction实现
在表达式的所有子节点执行完,会执行applyFunction,说明当前表达式节点是一个函数调用,接下来看下其核心实现:
- 其中包括对ascii字符的优化处理,如果输入全都是ascii,输出也是ascii,则使用函数的callAscii进行更高效的处理。
- 然后是核心(18行)调用vectorFunction_->apply来对结果进行处理
- 输入是inputValues_数组,该数组长度与函数的表达式孩子节点数相等,作为函数的参数(在上述执行流程中,遍历子节点执行时,结果放在了inputValues_)。
- result为输出,结果为VectorPtr
- 从这里可以看出vectorFunction_的输入参数是列向量,而非一行行数据传进去。
void Expr::applyFunction( const SelectivityVector& rows, EvalCtx& context, VectorPtr& result) { stats_.numProcessedVectors += 1; stats_.numProcessedRows += rows.countSelected(); auto timer = cpuWallTimer(); std::optional<bool> isAscii = std::nullopt; if (FLAGS_enable_expr_ascii_optimization) { computeIsAsciiForInputs(vectorFunction_.get(), inputValues_, rows); isAscii = type()->isVarchar() ? computeIsAsciiForResult(vectorFunction_.get(), inputValues_, rows) : std::nullopt; } try { vectorFunction_->apply(rows, inputValues_, type(), context, result); } catch (const VeloxException& ve) { throw; } catch (const std::exception& e) { VELOX_USER_FAIL(e.what()); } // ... }
VectorFunction是什么?
从VectorFunction的定义可以看出,apply的输入参数是列向量的列表,在实现VectorFunction时只要实现对VectorFunction的继承即可。
class VectorFunction { // ... virtual void apply( const SelectivityVector& rows, std::vector<VectorPtr>& args, // Not using const ref so we can reuse args const TypePtr& outputType, EvalCtx& context, VectorPtr& result) const = 0; }
但是是否所有的velox函数都是通过继承VectorFunction来实现呢?答案是否定的,每个函数在实现时参数都要处理列向量,还是比较复杂的,大部分的函数只需要实现单行的处理逻辑就可以了,其他行遍历执行即可,这种函数在velox中称为SimpleFunction。在一些列向量作为输入优势明显的场景下:比如聚合求值、列向量为Const编码、列向量为Dictionary编码等,可以将函数实现为VectorFunction。
velox的大部分函数是SimpleFunction,实现单行处理的逻辑,最简单的场景下只需要实现call函数即可,
template <typename T> struct CeilFunction { template <typename TOutput, typename TInput = TOutput> FOLLY_ALWAYS_INLINE void call(TOutput& result, const TInput& a) { if constexpr (std::is_integral_v<TInput>) { result = a; } else { result = ceil(a); } } };
以上是SimpleFunction的最简单形式,SimpleFunction虽然是行处理,但是velox依然支持很多函数实现方面的优化:
- Null处理,大部分函数支持null进null出,如果函数希望对于null返回其他值,可以重写callNullable方法,同时还有callNullFree的语法糖。
- 确定性:一个函数的输入固定后,输出是确定的,如果希望是不确定性行为,可以设置is_deterministic,比如返回随机数等。
- static constexpr bool is_deterministic = false;
- Ascii字符快速处理:支持实现callAscii方法,来高效处理输入进是ascii编码的情况。
- 字符串零拷贝:通过设置reuse_strings_from_arg,支持重用输入字符串。
- static constexpr int32_t reuse_strings_from_arg = 0;
最后一个问题:SimpleFunction是怎么转化为VectorFunction的,毕竟expr中使用的都是VectorFunction,velox是通过一个simpleFunctionAdapter来实现,在注册SimpleFunction函数时,会用到SimpleFunctionAdapterFactoryImpl
// This function should be called once and alone. template <typename UDFHolder> void registerSimpleFunction(const std::vector<std::string>& names) { mutableSimpleFunctions() .registerFunction<SimpleFunctionAdapterFactoryImpl<UDFHolder>>(names); }
来看SimpleFunctionAdapterFactoryImpl的实现:
template <typename UDFHolder> class SimpleFunctionAdapterFactoryImpl : public SimpleFunctionAdapterFactory { public: // Exposed for use in FunctionRegistry using Metadata = typename UDFHolder::Metadata; explicit SimpleFunctionAdapterFactoryImpl() {} std::unique_ptr<VectorFunction> createVectorFunction( const core::QueryConfig& config, const std::vector<VectorPtr>& constantInputs) const override { return std::make_unique<SimpleFunctionAdapter<UDFHolder>>( config, constantInputs); } };
可以看出来在createVectorFunction中实现了SimpleFunction到VectorFunction的转化;
这个转化是在哪里产生呢?是在Expr的构造过程中,在ExprPtr compileExpression的编译过程中,simpleFunction会被变换成VectorFunction,放入Expr中
auto simpleFunctionEntry = simpleFunctions().resolveFunction(call->name(), inputTypes)) { VELOX_USER_CHECK( resultType->equivalent(*simpleFunctionEntry->type().get()), "Found incompatible return types for '{}' ({} vs. {}) " "for input types ({}).", call->name(), simpleFunctionEntry->type(), resultType, folly::join(", ", inputTypes)); auto func = simpleFunctionEntry->createFunction()->createVectorFunction( config, getConstantInputs(compiledInputs)); result = std::make_shared<Expr>( resultType, std::move(compiledInputs), std::move(func), call->name(), trackCpuUsage);
失败处理
还有一个问题,如果列a中转换失败,表达式计算会崩溃吗?抛出exception还是结果为null
在表达式中,如果cast(a as bigint),如果a是字符串,转换失败会发生什么?直接来看源码,Cast对应的表达式是CastExpr
class CastExpr : public SpecialForm
接下来看下其evalSpecialForm实现,在CastExpr的转换中多次调用了context.applyToSelectedNoThrow,看函数名字应该是不会抛出exception。
context.applyToSelectedNoThrow(rows, [&](int row) { // ... }
事实是这样吗?接下来看其实现,确实handle了exception;
template <typename Callable> void applyToSelectedNoThrow(const SelectivityVector& rows, Callable func) { rows.template applyToSelected([&](auto row) INLINE_LAMBDA { try { func(row); } catch (const std::exception& e) { setError(row, std::current_exception()); } }); }
看setError的实现:第5行其实抛出了exception,这里是根据EvalCtx的throwOnError_字段进行判断,如果throwOnError_ = true,会抛出exception;否则在addError中会设置错误信息。
void EvalCtx::setError( vector_size_t index, const std::exception_ptr& exceptionPtr) { if (throwOnError_) { throwError(exceptionPtr); } addError(index, toVeloxException(exceptionPtr), errors_); }
在Expr.h中 bool throwOnError_{true}; 可以看到默认值是true,所以在cast失败时,会抛出exception;如果希望不抛出exception,可以通过ScopedVarSetter设置,在TryExpr.cpp的实现中,我们看到也有类似的调用,设置后,被try包裹的表达式不会抛出exception。
ScopedVarSetter throwOnError(context.mutableThrowOnError(), false);
TryExpr会通过context.errors() 获取表达式的错误,在处理错误的过程中,会将结果设置为null。
一个Expr就是全部吗?
事实上,在velox中,还有一个ExprSet的类,存储了Expr的列表,ExprSet也有一个eval方法,会依次调用Expr列表中eval方法,优点是,多个Expr处理时,可以对公共子表达式只处理一次,这里不在赘述。
在实际的Operator的应用中,ExprSet使用的比较多,而不是直接使用Expr,比如FilterProject这个Operator,使用ExprSet同时存储了Filter的一个Expr,和Project对应的多个Expr。
在velox整个表达式的实现过程中,velox对于不同的场景做了特定的优化,值得学习。表达式执行的过程中“见招拆招”,对于不同的输入,选择更高效的执行路径。velox源码中还有很多细节的处理,限于篇幅和水平,还有一些相关的概念没有涉及到,比如公共子表达式探测、And/OR表达式拍平、常量表达式折叠、SIMD等。
参考
https://github.com/facebookincubator/velox
https://facebookincubator.github.io/velox/develop/expression-evaluation.html
https://facebookincubator.github.io/velox/develop/scalar-functions.html