简介
Velox作为计算引擎,可以被Presto、Spark嵌入使用,Velox内部在Operator数据传递中使用的数据结构是列式结构RowVector,而Presto和Spark在计算节点之间也有对应的数据结构SerializedPage和UnsafeRow。在Presto、Spark使用Velox计算引擎的过程中需要处理两种数据格式的转换。
对于这种情况Velox提供了VectorSerde接口,同时提供了注册方法:registerVectorSerde,在不同的系统对接时,实现了相应的VectorSerde接口并注册,就可以将外部系统与Velox的RowVector实现互转。
接下来本文主要介绍VectorSerde接口的两种实现,PrestoVectorSerde和UnsafeRowVectorSerde,分别用来实现RowVector与Presto的SerializedPage、RowVector与Spark UnsafeRow的转换,同时SerailzedPage和UnsafeRow分别是列模型和行模型,两种实现也具有一定的代表性。
接下来会先介绍下Velox在序列化过程中内存管理相关的类StreamArena,ByteStream等基础实现,然后介绍PrestoVectorSerde和UnsafeRowVectorSerde的实现。
内存管理相关类
StreamArena概览
首先直接来通过这种图来看下内存相关的概念:
- page是velox中定义的内存页,大小为由AllocationTraits::kPageSize(4K)定义,也是StreamArena申请的最小单位。
- PageRun是指多个连续的page,PageRun提供data()方法,指向PageRun的首地址。
- Allocation存储了PageRun的数组,Allocation由MemoryPool::allocateNonContiguous方法得到,该可以指定申请的页的数量。
- MemoryPool是velox的内存管理接口,以树状的形式管理各个层次的内存申请和释放,在query、task、node、operator层次可以进行分别管理,本文暂不赘述。
- Allocation的析构函数会直接释放Allocation中申请的内存。
- StreamArena是元素为Allocation的vector,暴露两个内存申请的接口:
- newRange:指定需要的字节数,背后通过memoryPool申请到相关的大小。需要指出的是newRange申请到内存都是page的倍数,但不会超过一个PageRun的大小。
- newTinyRange:相对newRange来说,没有通过memoryPool申请,而是直接在StreamArena中通过类型为vector<string>的成员变量来申请,这类内存往往比较小,不需要动辄4K的页的大小来申请。
- newRange和newTinyRange申请到的内存都以ByteRange来执行,ByteRange主要包括指针buffer和长度size,分别指向申请到的内存的开始位置和大小。
Allocation & PageRun实现
PageRun结构比较简单,构造函数中指定了头地址。
Allocation包含了PageRun的vector。
class PageRun { public: ... PageRun(void* address, MachinePageCount numPages) { auto word = reinterpret_cast<uint64_t>(address); // NOLINT data_ = word | (static_cast<uint64_t>(numPages) << kPointerSignificantBits); } template <typename T = uint8_t> T* data() const { return reinterpret_cast<T*>(data_ & kPointerMask); // NOLINT } //... private: uint64_t data_; } class Allocation { MemoryPool* pool_{nullptr}; std::vector<PageRun> runs_; }
StreamArena实现
- StreamArena通过allocations_保存申请到的内存Allocation,通过currentRun_和currentPage_用来指向需要当前用到的Allocation的PageRun和Page的索引:
- 在newRange申请的过程中,会检查当前PageRun的索引是否超过了当前Allocation的PageRun总数,如果超过了需要申请新的Allocation。
- 在当前PageRun中查找合适数量的Page,然后赋值给range的buffer和size。
- tinyRanges_是string的vector:
- 在newTinyRange时,直接vector在中申请新的string,设置特定大小,然后赋值给range的buffer和size。
class StreamArena { //... private: // All allocations. std::vector<std::unique_ptr<memory::Allocation>> allocations_; // The allocation from which pages are given out. Moved to 'allocations_' when used up. memory::Allocation allocation_; int32_t currentRun_ = 0; int32_t currentPage_ = 0; memory::MachinePageCount allocationQuantum_ = 2; std::vector<std::string> tinyRanges_; } void StreamArena::newRange(int32_t bytes, ByteRange* range) { VELOX_CHECK_GT(bytes, 0); memory::MachinePageCount numPages = bits::roundUp(bytes, memory::AllocationTraits::kPageSize) / memory::AllocationTraits::kPageSize; int32_t numRuns = allocation_.numRuns(); if (currentRun_ >= numRuns) { if (numRuns) { allocations_.push_back( std::make_unique<memory::Allocation>(std::move(allocation_))); } pool_->allocateNonContiguous( std::max(allocationQuantum_, numPages), allocation_); currentRun_ = 0; currentPage_ = 0; size_ += allocation_.byteSize(); } auto run = allocation_.runAt(currentRun_); int32_t available = run.numPages() - currentPage_; range->buffer = run.data() + memory::AllocationTraits::kPageSize * currentPage_; range->size = std::min<int32_t>(numPages, available) * memory::AllocationTraits::kPageSize; range->position = 0; currentPage_ += std::min<int32_t>(available, numPages); if (currentPage_ == run.numPages()) { ++currentRun_; currentPage_ = 0; } } void StreamArena::newTinyRange(int32_t bytes, ByteRange* range) { tinyRanges_.emplace_back(); tinyRanges_.back().resize(bytes); range->position = 0; range->buffer = reinterpret_cast<uint8_t*>(tinyRanges_.back().data()); range->size = bytes; }
ByteStream概览
上面介绍了StreamArena可以用来申请内存并保存在ByteRange中,在序列化/反序列化的过程中主要用到了ByteStream这个类,ByteStream用来直接对特定类型的数据进行读写,内部仍然存储ByteRange,主要方法如下:
- append系列泛型方法提供了对不同类型的数据写入,数据最终被写入到ByteRange中。
- read系列泛型方法提供了对特定类型的读取,读取的源都是ByteRange。
序列化接口
VeloxSerde
VectorSerde是用来做RowVector的序列化和反序列化的接口,从VectorSerde的结构来看:
- 序列化:没有直接提供serialize方法,而是使用VectorSerializer来做Vector的序列化,提供了createSerializer方法来创建VectorSerializer。在创建VectorSerializer时:
- 会指定RowVector的类型type,包含每一列的名称和类型。
- 指定需要序列化的行数
- 指定用到的StreamArena,在序列化的过程中,中间结果需要存储在StreamArena中。
- 反序列化:使用deserialize方法来做Vector的反序列化,从ByteStream中反序列化到RowVector中。
class VectorSerde { public: //... virtual std::unique_ptr<VectorSerializer> createSerializer( RowTypePtr type, int32_t numRows, StreamArena* streamArena, const Options* options = nullptr) = 0; virtual void deserialize( ByteStream* source, velox::memory::MemoryPool* pool, RowTypePtr type, RowVectorPtr* result, const Options* options = nullptr) = 0; };
VectorSerializer
VectorSerializer没有直接提供serialize方法,而是提供了append方法进行序列化,在使用VectorSerializer完成序列化主要包含两步,首先是append数据,然后flush结果到OutputStream中。
- append方法可以将一部分数据行添加到serializer的“内部的存储”中。
- flush方法将“内部的存储”写到OutputStream中。
class VectorSerializer { public: virtual ~VectorSerializer() = default; /// Serialize a subset of rows in a vector. virtual void append( const RowVectorPtr& vector, const folly::Range<const IndexRange*>& ranges) = 0; /// Serialize all rows in a vector. void append(const RowVectorPtr& vector); /// Write serialized data to 'stream'. virtual void flush(OutputStream* stream) = 0; };
在外部系统数据结构与Velox的RowVector相互转换的过程中需要同时实现VectorSerde和VectorSerializer两个接口,例如:
- 针对Presto SerializedPage的PrestoVectorSerde和PrestoVectorSerializer。
- 针对Spark UnsafeRow的UnsafeRowVectorSerde和UnsafeRowVectorSerializer。
接下来就两种实现分别进行解析。
PrestoVectorSerde
在探究其实现的过程中,也需要关注内存的申请和拷贝情况。
SerializedPage
在介绍序列化和反序列化之前,先简单介绍下SerializedPage的结构,参考SerializedPage Wire Format。
总体结构
格式包括Header、列数、列;其中行数保存在Header中。
列结构
列结构中也包含列Header、Null Flags、实际值。
- 列header:表示列的类型
- Null Flags:使用一个字节表示是否有nullFlag,使用每一个bit标识是否某一行是否为null。
- 列内容,列内容根据列的类型可以分为如下几种类型:
- 定长列:主要针对BYTE、INT、SHORT、LONG、INT128等基础类型编码。与Velox的定长列FlatVector不同的是,FlatVector对于null的行也会占用空间,两者各有优劣。
- 变长列:主要针对VARCHAR等类型,每一行的值不固定,相对于定长列,需要多一个offsets存储,指定每一行数据的开始位置。
- 复合列:包括ARRAY、MAP、ROW三种类型,复合类型的存储包括基础类型的列。
- 定长列示例:
- 变长列示例:
序列化实现
PrestoVectorSerializer
PrestoVectorSerializer的实现比较清晰:
- 在构造时会根据RowVector的列来创建对应列的VectorStream,VectorStream是用来存储每一列的数据序列化的结果。
- 在append时会创建按列进行序列化,主要工作在serializeColumn中实现。
这里留几个问题,接下来一一进行研究。
- VectorStream是如何存储数据的?存储的是RowVector中数据的指针还是拷贝内存?
- serializeColumn对于RowVector每一列有没有特化处理,比如某一列是ConstantVector、DictionaryVector等怎么处理?
class PrestoVectorSerializer : public VectorSerializer { public: PrestoVectorSerializer( std::shared_ptr<const RowType> rowType, int32_t numRows, StreamArena* streamArena, bool useLosslessTimestamp) { auto types = rowType->children(); auto numTypes = types.size(); streams_.resize(numTypes); for (int i = 0; i < numTypes; i++) { streams_[i] = std::make_unique<VectorStream>( types[i], streamArena, numRows, useLosslessTimestamp); } } void append( const RowVectorPtr& vector, const folly::Range<const IndexRange*>& ranges) override { auto newRows = rangesTotalSize(ranges); if (newRows > 0) { numRows_ += newRows; for (int32_t i = 0; i < vector->childrenSize(); ++i) { serializeColumn(vector->childAt(i).get(), ranges, streams_[i].get()); } } } void flush(OutputStream* out) override { flushInternal(numRows_, false /*rle*/, out); } //... private: ... int32_t numRows_{0}; std::vector<std::unique_ptr<VectorStream>> streams_; }; } // namespace
VectorStream
VectorStream是序列化SerializedPage做准备,其成员存储了SerializedPage结构中所需的Column Header、Null Flags、长度等;同时由于SeralizedPage也支持ROW、ARRAY、MAP类型的encoding,与此对应,VectorStream对于这三种类型使用了嵌套的VectorStream来表示。
初始化内存
- children_:children表示ROW、ARRAY、MAP类型的子VectorStream,对应SerializedPage的三种类型。
- header_:用来存储每种数据类型的头信息。
- nulls_:用来存储null的值。
- lengths_:用来存储长度。
- values_:用来存储实际的值。
class VectorStream { //... private: const TypePtr type_; //... ByteRange header_; ByteStream nulls_; ByteStream lengths_; ByteStream values_; std::vector<std::unique_ptr<VectorStream>> children_; };
在VectorStream构造时,会对上述成员进行初始化,调用ByteStream::startWrite方法进行内存申请,根据RowVector中每列的数据类型,进行内存预申请。
下面将重要的字段初始化代码列出:代码中的lengths_对应SerializedPage中的offsets
- 对于ROW、ARRAY、MAP类型除了lengths_初始化,也需要进行childrens_初始化,因为是嵌套结构
- 对于VARCHAR、VARBINARY类型,进行了lengths_和values_的初始化;
class VectorStream { public: VectorStream( const TypePtr type, StreamArena* streamArena, int32_t initialNumRows, bool useLosslessTimestamp) : type_(type), useLosslessTimestamp_(useLosslessTimestamp), nulls_(streamArena, true, true), lengths_(streamArena), values_(streamArena) { //... if (initialNumRows > 0) { switch (type_->kind()) { case TypeKind::ROW: if (isTimestampWithTimeZoneType(type_)) { values_.startWrite(initialNumRows * 4); break; } [[fallthrough]]; case TypeKind::ARRAY: case TypeKind::MAP: hasLengths_ = true; lengths_.startWrite(initialNumRows * sizeof(vector_size_t)); children_.resize(type_->size()); for (int32_t i = 0; i < type_->size(); ++i) { children_[i] = std::make_unique<VectorStream>( type_->childAt(i), streamArena, initialNumRows, useLosslessTimestamp); } break; case TypeKind::VARCHAR: case TypeKind::VARBINARY: hasLengths_ = true; lengths_.startWrite(initialNumRows * sizeof(vector_size_t)); values_.startWrite(initialNumRows * 10); break; default:; values_.startWrite(initialNumRows * 4); break; } } } private: const TypePtr type_; //... ByteRange header_; ByteStream nulls_; ByteStream lengths_; ByteStream values_; std::vector<std::unique_ptr<VectorStream>> children_; };
序列化
序列化的Append方法,最终会对每一列调用serializeColumn方法,该方法实现中针对每一种列类型进行专门处理,例如对于Flat类型调用了serializeFlatVector。
针对每一种序列化的主要流程是根据提出数据的格式,写入VectorStream的lengths_、values_中,对于复合类型,进行递归调用。
需要注意的是,数据是会被写到StreamArena的内存时,使用了内存拷贝,即使是String类型。也就说在StreamArena中存储的完整的拷贝后的RowVector的数据。
void serializeColumn( const BaseVector* vector, const folly::Range<const IndexRange*>& ranges, VectorStream* stream) { switch (vector->encoding()) { case VectorEncoding::Simple::FLAT: VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( serializeFlatVector, vector->typeKind(), vector, ranges, stream); break; case VectorEncoding::Simple::CONSTANT: VELOX_DYNAMIC_TYPE_DISPATCH_ALL( serializeConstantVector, vector->typeKind(), vector, ranges, stream); break; case VectorEncoding::Simple::BIASED: switch (vector->typeKind()) { case TypeKind::SMALLINT: serializeBiasVector<int16_t>(vector, ranges, stream); break; case TypeKind::INTEGER: serializeBiasVector<int32_t>(vector, ranges, stream); break; case TypeKind::BIGINT: serializeBiasVector<int64_t>(vector, ranges, stream); break; default: throw std::invalid_argument("Invalid biased vector type"); } break; case VectorEncoding::Simple::ROW: serializeRowVector(vector, ranges, stream); break; case VectorEncoding::Simple::ARRAY: serializeArrayVector(vector, ranges, stream); break; case VectorEncoding::Simple::MAP: serializeMapVector(vector, ranges, stream); break; case VectorEncoding::Simple::LAZY: serializeColumn(vector->loadedVector(), ranges, stream); break; default: serializeWrapped(vector, ranges, stream); } }
- flush方法
serializeColumn是将数据写入StreamArena的内存中,flush方法可以真正的将数据严格按照SerializedPage的格式写入OutputStream。其中ROW、ARRAY、MAP类型调用了递归调用了children的flush方法。
void flush(OutputStream* out) { out->write(reinterpret_cast<char*>(header_.buffer), header_.size); switch (type_->kind()) { case TypeKind::ROW: //... case TypeKind::ARRAY: //... case TypeKind::MAP: //... case TypeKind::VARCHAR: case TypeKind::VARBINARY: //... default: //... } }
在PrestoVectorSerializer的flush方法中,会按照列的个数,逐个调用对应的VectorStream的flush方法。
反序列化实现
反序列化是指从SerializePage二进制格式转化成RowVector;
- 从函数签名可以看出,需要传入输出类型RowType,说明在反序列化之前已经知道了RowVector有哪些列。
- 反序列化的源头是ByteStream,有一组ByteRange组成。
void deserialize( ByteStream* source, velox::memory::MemoryPool* pool, std::shared_ptr<const RowType> type, std::shared_ptr<RowVector>* result) override;
反序列化的核心方法是readColumns,读取每一列的数据到result的children中。
auto children = &(*result)->children(); auto childTypes = type->as<TypeKind::ROW>().children(); readColumns(source, pool, childTypes, children);
接下来看下readColumns的核心实现:
- 在readColumns中首先定义了针对不同数据类型的读取函数:对于简单类型,都使用了read函数,对于复杂类型使用了单独的读取方法。
- 然后按照每一列的类型,逐个调用其对应类型的读取方法。
void readColumns( ByteStream* source, velox::memory::MemoryPool* pool, const std::vector<TypePtr>& types, std::vector<VectorPtr>* result) { static std::unordered_map< TypeKind, std::function<void( ByteStream * source, std::shared_ptr<const Type> type, velox::memory::MemoryPool * pool, VectorPtr * result)>> readers = { {TypeKind::BOOLEAN, &read<bool>}, {TypeKind::TINYINT, &read<int8_t>}, {TypeKind::SMALLINT, &read<int16_t>}, {TypeKind::INTEGER, &read<int32_t>}, {TypeKind::BIGINT, &read<int64_t>}, {TypeKind::REAL, &read<float>}, {TypeKind::DOUBLE, &read<double>}, {TypeKind::TIMESTAMP, &read<Timestamp>}, {TypeKind::DATE, &read<Date>}, {TypeKind::VARCHAR, &read<StringView>}, {TypeKind::VARBINARY, &read<StringView>}, {TypeKind::ARRAY, &readArrayVector}, {TypeKind::MAP, &readMapVector}, {TypeKind::ROW, &readRowVector}, {TypeKind::UNKNOWN, &read<UnknownValue>}}; for (int32_t i = 0; i < types.size(); ++i) { auto it = readers.find(types[i]->kind()); //... it->second(source, types[i], pool, &(*result)[i]); } }
接下来分别以简单类型和复杂类型来举例说明:
- 简单类型:首先读取size,然后根据size大小创建特定大小的FlatVector,最后对flatVector进行赋值。
template <typename T> void read( ByteStream* source, std::shared_ptr<const Type> type, velox::memory::MemoryPool* pool, VectorPtr* result) { int32_t size = source->read<int32_t>(); if (*result && result->unique()) { (*result)->resize(size); } else { *result = BaseVector::create(type, size, pool); } auto flatResult = (*result)->asFlatVector<T>(); auto nullCount = readNulls(source, size, flatResult); BufferPtr values = flatResult->mutableValues(size); readValues<T>(source, size, flatResult->nulls(), nullCount, values); }
- 复杂类型:以MapVector为例,首先会将mapVector的key和value提取出来作为children,通过递归调用readColumns将值写入到children中,然后将key和value的Vector设置到MapVector中。其他复杂类型同理,也是将children先进行序列化,然后children赋值到复杂类型中。
void readMapVector( ByteStream* source, std::shared_ptr<const Type> type, velox::memory::MemoryPool* pool, VectorPtr* result) { MapVector* mapVector = (*result && result->unique()) ? (*result)->as<MapVector>() : nullptr; std::vector<TypePtr> childTypes = {type->childAt(0), type->childAt(1)}; std::vector<VectorPtr> children(2); if (mapVector) { children[0] = mapVector->mapKeys(); children[1] = mapVector->mapValues(); } readColumns(source, pool, childTypes, &children); //... mapVector->setKeysAndValues(children[0], children[1]); //... }
同样,在反序列化的过程中,也会将数据从ByteStream中数据复制到Vector中,同样发生了内存复制。
UnsafeRowVectorSerde
UnsafeRow结构
多行
UnsafeRow用在Spark中,是二进制的行存格式,数据是逐行排放,多行的数据二进制表示如下:
- 开头是行的占用空间大小,接着是UnsafeRow的二进制内容;后面依次存储下一行的数据。
UnsafeRow
- UnsafeRow代表一行数据,每行数据会有多个列,对于不同类型,UnsafeRow定义了null flags、定长数据、变长数据三部分;
- 如果多列的简单类型字段比如bool、int、short,因为简单类型可以被8个字节覆盖,所以直接按列存入定长数据区;
- 如果8个字节不能覆盖,比如string类型,定长数据区会存储每列string的指针和长度,在变长区存储实际的字符串的值。
- 对于列是Array、Map、Struct类型,主要的区别都在变长区,可以看下图示意,比如Map类型,会在变长区先存储keys的长度,然后是key的字段数量、null flag、key的实际值,接着是value的字段数量、null flag、value的实际值;
序列化接口
UnsafeRowVectorSerializer
同样,UnsafeRowVectorSerde继承自VectorSerde,对于序列化主要关注UnsafeRowVectorSerializer的实现。主要流程如下:
- 根据输入的RowVector构造UnsafeRowFast;
- 计算出RowVector的总大小,然后申请相应的内存大小,申请后的内存保存在buffers_中:
- 注意:申请到的内存的生命周期、buffers_生命周期、UnsafeRowVectorSerializer的生命周期是同步的。
- 然后通过逐行的方式调用UnsafeRow.serialize方法,将Vector内容序列化到指定的内存位置。这个过程中使用了内存拷贝的方式。
- flush方法比较简单,直接将buffers_中的内存写出去。buffers_中的格式直接对应UnsafeRow的二进制格式。
class UnsafeRowVectorSerializer : public VectorSerializer { public: using TRowSize = uint32_t; explicit UnsafeRowVectorSerializer(StreamArena* streamArena) : pool_{streamArena->pool()} {} void append( const RowVectorPtr& vector, const folly::Range<const IndexRange*>& ranges) override { size_t totalSize = 0; row::UnsafeRowFast unsafeRow(vector); totalSize = //... calculate total size BufferPtr buffer = AlignedBuffer::allocate<char>(totalSize, pool_, 0); auto rawBuffer = buffer->asMutable<char>(); buffers_.push_back(std::move(buffer)); size_t offset = 0; for (auto& range : ranges) { for (auto i = range.begin; i < range.begin + range.size; ++i) { // Write row data. TRowSize size = unsafeRow.serialize(i, rawBuffer + offset + sizeof(TRowSize)); // Write raw size. Needs to be in big endian order. *(TRowSize*)(rawBuffer + offset) = folly::Endian::big(size); offset += sizeof(TRowSize) + size; } } } void flush(OutputStream* stream) override { for (const auto& buffer : buffers_) { stream->write(buffer->as<char>(), buffer->size()); } buffers_.clear(); } private: memory::MemoryPool* const FOLLY_NONNULL pool_; std::vector<BufferPtr> buffers_; };
UnsafeRowFast
UnsafeRowFast接受RowVector,会将RowVector转换为DecodedVector。
- DecodedVector decoded_用来存储RowVector decode后的结果,避免在运算过程中一层层的剥离vector。
- std::vector<UnsafeRowFast> children_;用于ARRAY, MAP 和 ROW三种类型,用来存储三种类型对应的子UnsafeRowFast,这种嵌套用法比较常用,在前面的VectorStream中也多次用到。
- initialize方法会用来初始化decoded_和children_。
- serialize是真正将特定行的数据,写入buffer中。
class UnsafeRowFast { public: explicit UnsafeRowFast(const RowVectorPtr& vector); ... /// Serializes row at specified index into 'buffer'. /// 'buffer' must have sufficient capacity and set to all zeros. int32_t serialize(vector_size_t index, char* buffer); protected: explicit UnsafeRowFast(const VectorPtr& vector); void initialize(const TypePtr& type); private: const TypeKind typeKind_; DecodedVector decoded_; /// ARRAY, MAP and ROW types only. std::vector<UnsafeRowFast> children_; std::vector<bool> childIsFixedWidth_; };
children_和decoded_实现是在initialize函数中实现的,具体如下:
- 对于ArrayVector类型,直接将elements设置为children_;
- 对于MapVector类型,将mapKeys和mapValues设置为children_;
- 对于RowVector类型,将每一列设置为children_;
- 对于其他简单类型,主要设置valueBytes_等字段。
void UnsafeRowFast::initialize(const TypePtr& type) { auto base = decoded_.base(); switch (typeKind_) { case TypeKind::ARRAY: { auto arrayBase = base->as<ArrayVector>(); children_.push_back(UnsafeRowFast(arrayBase->elements())); //... break; } case TypeKind::MAP: { auto mapBase = base->as<MapVector>(); children_.push_back(UnsafeRowFast(mapBase->mapKeys())); children_.push_back(UnsafeRowFast(mapBase->mapValues())); //... break; } case TypeKind::ROW: { auto rowBase = base->as<RowVector>(); for (const auto& child : rowBase->children()) { children_.push_back(UnsafeRowFast(child)); } //... break; } case TypeKind::BOOLEAN: valueBytes_ = 1; fixedWidthTypeKind_ = true; break; case TypeKind::TINYINT: FOLLY_FALLTHROUGH; case TypeKind::SMALLINT: FOLLY_FALLTHROUGH; case TypeKind::INTEGER: FOLLY_FALLTHROUGH; case TypeKind::BIGINT: FOLLY_FALLTHROUGH; case TypeKind::REAL: FOLLY_FALLTHROUGH; case TypeKind::DOUBLE: FOLLY_FALLTHROUGH; case TypeKind::DATE: case TypeKind::UNKNOWN: valueBytes_ = type->cppSizeInBytes(); fixedWidthTypeKind_ = true; supportsBulkCopy_ = decoded_.isIdentityMapping(); break; case TypeKind::TIMESTAMP: valueBytes_ = sizeof(int64_t); fixedWidthTypeKind_ = true; break; case TypeKind::VARCHAR: FOLLY_FALLTHROUGH; case TypeKind::VARBINARY: // Nothing to do. break; default: VELOX_UNSUPPORTED("Unsupported type: {}", type->toString()); } }
序列化的过程,主要调用了serializeRow方法,通过遍历children,也用来区分定长列和非定长列:
- 如果是定长列,调用serializeFixedWidth方法序列化。
- 如果是非定长列,调用serializeVariableWidth方法,同时会将size和offset存入定长区。
int32_t UnsafeRowFast::serializeRow(vector_size_t index, char* buffer) { auto childIndex = decoded_.index(index); int64_t variableWidthOffset = rowNullBytes_ + kFieldWidth * children_.size(); for (auto i = 0; i < children_.size(); ++i) { auto& child = children_[i]; // Write null bit. if (child.isNullAt(childIndex)) { bits::setBit(buffer, i, true); continue; } // Write value. if (childIsFixedWidth_[i]) { child.serializeFixedWidth( childIndex, buffer + rowNullBytes_ + i * kFieldWidth); } else { auto size = child.serializeVariableWidth( childIndex, buffer + variableWidthOffset); // Write size and offset. uint64_t sizeAndOffset = variableWidthOffset << 32 | size; reinterpret_cast<uint64_t*>(buffer + rowNullBytes_)[i] = sizeAndOffset; variableWidthOffset += alignBytes(size); } } return variableWidthOffset; }
接下来看下定长列和非定长列的具体实现:
- 定长列:直接通过memcpy将decoded数据写入到buffer中。
- 非定长列:
- 对于varchar/varbinary类型,根据字符串长度memcpy。
- 对于ARRAY、ROW、MAP分别调用子方法进行序列化,实际上这些方法最后都会调用到serializeFixedWidth和serializeVariableWidth方法,具体实现不再赘述。
void UnsafeRowFast::serializeFixedWidth( vector_size_t offset, vector_size_t size, char* buffer) { VELOX_DCHECK(supportsBulkCopy_); // decoded_.data<char>() can be null if all values are null. if (decoded_.data<char>()) { memcpy( buffer, decoded_.data<char>() + decoded_.index(offset) * valueBytes_, valueBytes_ * size); } } int32_t UnsafeRowFast::serializeVariableWidth( vector_size_t index, char* buffer) { switch (typeKind_) { case TypeKind::VARCHAR: FOLLY_FALLTHROUGH; case TypeKind::VARBINARY: { auto value = decoded_.valueAt<StringView>(index); memcpy(buffer, value.data(), value.size()); return value.size(); } case TypeKind::ARRAY: return serializeArray(index, buffer); case TypeKind::MAP: return serializeMap(index, buffer); case TypeKind::ROW: return serializeRow(index, buffer); default: VELOX_UNREACHABLE( "Unexpected type kind: {}", mapTypeKindToName(typeKind_)); }; }
反序列化接口
UnsafeRowDeserializer
反序列化用到了UnsafeRowDeserializer::deserialize方法,接受string_view的vector,vector的每个元素代表一行数据,反序列化过程中主要调用了convertToVectors方法
static VectorPtr deserialize( const std::vector<std::optional<std::string_view>>& data, const TypePtr& type, memory::MemoryPool* pool) { return convertToVectors(getBatchIteratorPtr(data, type), pool); }
- convertToVectors方法接受一个DataBatchIterator,然后根据数据类型,调用不同的方法进行反序列化。
static VectorPtr convertToVectors( const DataBatchIteratorPtr& dataIterator, memory::MemoryPool* pool) { const TypePtr& type = dataIterator->type(); if (type->isPrimitiveType()) { return convertPrimitiveIteratorsToVectors(dataIterator, pool); } else if (type->isRow()) { return convertStructIteratorsToVectors(dataIterator, pool); } else if (type->isArray()) { return convertArrayIteratorsToVectors(dataIterator, pool); } else if (type->isMap()) { return convertMapIteratorsToVectors(dataIterator, pool); } else { VELOX_NYI("Unsupported data iterators type"); } }
getBatchIteratorPtr 根据不同的数据类型,会生成不同的DataBatchIterator
inline DataBatchIteratorPtr getBatchIteratorPtr( const std::vector<std::optional<std::string_view>>& data, const TypePtr& type) { if (type->isPrimitiveType()) { return std::make_shared<PrimitiveBatchIterator>(data, type); } else if (type->isRow()) { return std::make_shared<StructBatchIterator>(data, type); } else if (type->isArray()) { return std::make_shared<ArrayBatchIterator>(data, type); } else if (type->isMap()) { return std::make_shared<MapBatchIterator>(data, type); } VELOX_NYI("Unknown data type " + type->toString()); return nullptr; }
convertToVectors中以convertPrimitiveIteratorsToVectors为例,最终会调用createFlatVector
- 首先将iterator转换为PrimitiveBatchIterator,获取行数,构建对应的FlatVector
- 然后逐行调用iterator->next()方法对数据进行迭代,在迭代的过程根据类型区分
- StringView类型,需要读取变长区的数据
- 其他Primitive类型,读取定长区数据
template <TypeKind Kind> static VectorPtr createFlatVector( const DataBatchIteratorPtr& dataIterator, const TypePtr& type, memory::MemoryPool* pool) { auto iterator = std::dynamic_pointer_cast<PrimitiveBatchIterator>(dataIterator); size_t size = iterator->numRows(); auto vector = BaseVector::create(type, size, pool); using T = typename TypeTraits<Kind>::NativeType; using TypeTraits = ScalarTraits<Kind>; auto* flatResult = vector->asFlatVector<T>(); for (int32_t i = 0; i < size; ++i) { if (iterator->isNull(i)) { vector->setNull(i, true); iterator->next(); } else { vector->setNull(i, false); if constexpr (std::is_same_v<T, StringView>) { StringView val = UnsafeRowPrimitiveBatchDeserializer::deserializeStringView( iterator->next().value()); TypeTraits::set(flatResult, i, val); } else { typename TypeTraits::SerializedType val = UnsafeRowPrimitiveBatchDeserializer::deserializeFixedWidth< typename TypeTraits::SerializedType>( iterator->next().value()); TypeTraits::set(flatResult, i, val); } } } return vector; }
对于ROW、ARRAY、MAP类型分别会调用convertStructIteratorsToVectors、convertArrayIteratorsToVectors、convertMapIteratorsToVectors原理类似,按照逐行的方式进行解析,并构建对应的Vector。
总结
对于实现了VectorSerde的类,可以通过registerVectorSerde方法注册到系统中,系统就可以根据getNamedVectorSerde来找到对应的VectorSerda实现类来进行外部数据格式的序列化和反序列化;这里做些简单总结:
- VectorSerde是Velox引擎对外沟通的接口,通过接口的形式来保持系统的扩展性,在不侵入velox引擎的情况下,外部可以根据VectorSerde进行实现并注册,来完成外部数据的写入和读出。
- 在使用Velox的Vector时,需要清楚Vector的各种编码类型,尤其是列式场景下,针对不同的编码可以有对应的“Fast Path”执行路径,以提高执行效率。
- 在序列化和反序列化的过程中,可以看出实现中多次出现内存拷贝的情况,个人认为是因为在Serde执行的过程中,原数据的生命周期已经结束,如果使用内存引用的方式,可能并不安全。这部分逻辑待继续调研。
- 在StreamArena实现的过程中,也考虑了小内存的申请,对于小于一页4K的大小的需求,直接使用vector<string>来申请内存。
- 同时在上述Velox的两种序列化方式中,都是Schema固定的场景:列的数量和格式都是固定,velox本身、Presto、Spark都是列确定的模式;对于SchemaFree(每一行的列不固定)的场景下,文中没有涉及。
参考
https://prestodb.io/docs/current/develop/serialized-page.html
https://facebookincubator.github.io/velox/develop/serde/unsaferow.html
https://github.com/facebookincubator/velox