从源码看Velox如何做序列化

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 从源码角度分析Velox做序列化和反序列化的过程

简介

Velox作为计算引擎,可以被Presto、Spark嵌入使用,Velox内部在Operator数据传递中使用的数据结构是列式结构RowVector,而Presto和Spark在计算节点之间也有对应的数据结构SerializedPage和UnsafeRow。在Presto、Spark使用Velox计算引擎的过程中需要处理两种数据格式的转换。

对于这种情况Velox提供了VectorSerde接口,同时提供了注册方法:registerVectorSerde,在不同的系统对接时,实现了相应的VectorSerde接口并注册,就可以将外部系统与Velox的RowVector实现互转。

接下来本文主要介绍VectorSerde接口的两种实现,PrestoVectorSerde和UnsafeRowVectorSerde,分别用来实现RowVector与Presto的SerializedPage、RowVector与Spark UnsafeRow的转换,同时SerailzedPage和UnsafeRow分别是列模型和行模型,两种实现也具有一定的代表性。

接下来会先介绍下Velox在序列化过程中内存管理相关的类StreamArena,ByteStream等基础实现,然后介绍PrestoVectorSerde和UnsafeRowVectorSerde的实现。

内存管理相关类

StreamArena概览

首先直接来通过这种图来看下内存相关的概念:

  • page是velox中定义的内存页,大小为由AllocationTraits::kPageSize(4K)定义,也是StreamArena申请的最小单位。
  • PageRun是指多个连续的page,PageRun提供data()方法,指向PageRun的首地址。
  • Allocation存储了PageRun的数组,Allocation由MemoryPool::allocateNonContiguous方法得到,该可以指定申请的页的数量。
  • MemoryPool是velox的内存管理接口,以树状的形式管理各个层次的内存申请和释放,在query、task、node、operator层次可以进行分别管理,本文暂不赘述。
  • Allocation的析构函数会直接释放Allocation中申请的内存。
  • StreamArena是元素为Allocation的vector,暴露两个内存申请的接口:
  • newRange:指定需要的字节数,背后通过memoryPool申请到相关的大小。需要指出的是newRange申请到内存都是page的倍数,但不会超过一个PageRun的大小。
  • newTinyRange:相对newRange来说,没有通过memoryPool申请,而是直接在StreamArena中通过类型为vector<string>的成员变量来申请,这类内存往往比较小,不需要动辄4K的页的大小来申请。
  • newRange和newTinyRange申请到的内存都以ByteRange来执行,ByteRange主要包括指针buffer和长度size,分别指向申请到的内存的开始位置和大小。

Allocation & PageRun实现

PageRun结构比较简单,构造函数中指定了头地址。

Allocation包含了PageRun的vector。

class PageRun {
   public:
...
    PageRun(void* address, MachinePageCount numPages) {
      auto word = reinterpret_cast<uint64_t>(address); // NOLINT
      data_ =
          word | (static_cast<uint64_t>(numPages) << kPointerSignificantBits);
    }
    template <typename T = uint8_t>
    T* data() const {
      return reinterpret_cast<T*>(data_ & kPointerMask); // NOLINT
    }
//...
  private:
    uint64_t data_;
}
class Allocation {
  MemoryPool* pool_{nullptr};
  std::vector<PageRun> runs_;
}

StreamArena实现

  • StreamArena通过allocations_保存申请到的内存Allocation,通过currentRun_和currentPage_用来指向需要当前用到的Allocation的PageRun和Page的索引:
  • 在newRange申请的过程中,会检查当前PageRun的索引是否超过了当前Allocation的PageRun总数,如果超过了需要申请新的Allocation。
  • 在当前PageRun中查找合适数量的Page,然后赋值给range的buffer和size。
  • tinyRanges_是string的vector:
  • 在newTinyRange时,直接vector在中申请新的string,设置特定大小,然后赋值给range的buffer和size。
class StreamArena {
//...
 private:
  // All allocations.
  std::vector<std::unique_ptr<memory::Allocation>> allocations_;
  // The allocation from which pages are given out. Moved to 'allocations_' when used up.
  memory::Allocation allocation_;
  int32_t currentRun_ = 0;
  int32_t currentPage_ = 0;
  memory::MachinePageCount allocationQuantum_ = 2;
  std::vector<std::string> tinyRanges_;
}
void StreamArena::newRange(int32_t bytes, ByteRange* range) {
  VELOX_CHECK_GT(bytes, 0);
  memory::MachinePageCount numPages =
      bits::roundUp(bytes, memory::AllocationTraits::kPageSize) /
      memory::AllocationTraits::kPageSize;
  int32_t numRuns = allocation_.numRuns();
  if (currentRun_ >= numRuns) {
    if (numRuns) {
      allocations_.push_back(
          std::make_unique<memory::Allocation>(std::move(allocation_)));
    }
    pool_->allocateNonContiguous(
        std::max(allocationQuantum_, numPages), allocation_);
    currentRun_ = 0;
    currentPage_ = 0;
    size_ += allocation_.byteSize();
  }
  auto run = allocation_.runAt(currentRun_);
  int32_t available = run.numPages() - currentPage_;
  range->buffer =
      run.data() + memory::AllocationTraits::kPageSize * currentPage_;
  range->size = std::min<int32_t>(numPages, available) *
      memory::AllocationTraits::kPageSize;
  range->position = 0;
  currentPage_ += std::min<int32_t>(available, numPages);
  if (currentPage_ == run.numPages()) {
    ++currentRun_;
    currentPage_ = 0;
  }
}
void StreamArena::newTinyRange(int32_t bytes, ByteRange* range) {
  tinyRanges_.emplace_back();
  tinyRanges_.back().resize(bytes);
  range->position = 0;
  range->buffer = reinterpret_cast<uint8_t*>(tinyRanges_.back().data());
  range->size = bytes;
}

ByteStream概览

上面介绍了StreamArena可以用来申请内存并保存在ByteRange中,在序列化/反序列化的过程中主要用到了ByteStream这个类,ByteStream用来直接对特定类型的数据进行读写,内部仍然存储ByteRange,主要方法如下:

  • append系列泛型方法提供了对不同类型的数据写入,数据最终被写入到ByteRange中。
  • read系列泛型方法提供了对特定类型的读取,读取的源都是ByteRange。

序列化接口

VeloxSerde

VectorSerde是用来做RowVector的序列化和反序列化的接口,从VectorSerde的结构来看:

  • 序列化:没有直接提供serialize方法,而是使用VectorSerializer来做Vector的序列化,提供了createSerializer方法来创建VectorSerializer。在创建VectorSerializer时:
  • 会指定RowVector的类型type,包含每一列的名称和类型。
  • 指定需要序列化的行数
  • 指定用到的StreamArena,在序列化的过程中,中间结果需要存储在StreamArena中。
  • 反序列化:使用deserialize方法来做Vector的反序列化,从ByteStream中反序列化到RowVector中。
class VectorSerde {
 public:
//...
  virtual std::unique_ptr<VectorSerializer> createSerializer(
      RowTypePtr type,
      int32_t numRows,
      StreamArena* streamArena,
      const Options* options = nullptr) = 0;
  virtual void deserialize(
      ByteStream* source,
      velox::memory::MemoryPool* pool,
      RowTypePtr type,
      RowVectorPtr* result,
      const Options* options = nullptr) = 0;
};

VectorSerializer

VectorSerializer没有直接提供serialize方法,而是提供了append方法进行序列化,在使用VectorSerializer完成序列化主要包含两步,首先是append数据,然后flush结果到OutputStream中。

  • append方法可以将一部分数据行添加到serializer的“内部的存储”中。
  • flush方法将“内部的存储”写到OutputStream中。
class VectorSerializer {
 public:
  virtual ~VectorSerializer() = default;
  /// Serialize a subset of rows in a vector.
  virtual void append(
      const RowVectorPtr& vector,
      const folly::Range<const IndexRange*>& ranges) = 0;
  /// Serialize all rows in a vector.
  void append(const RowVectorPtr& vector);
  /// Write serialized data to 'stream'.
  virtual void flush(OutputStream* stream) = 0;
};

在外部系统数据结构与Velox的RowVector相互转换的过程中需要同时实现VectorSerde和VectorSerializer两个接口,例如:

  • 针对Presto SerializedPage的PrestoVectorSerde和PrestoVectorSerializer。
  • 针对Spark UnsafeRow的UnsafeRowVectorSerde和UnsafeRowVectorSerializer。

接下来就两种实现分别进行解析。

PrestoVectorSerde

在探究其实现的过程中,也需要关注内存的申请和拷贝情况。

SerializedPage

在介绍序列化和反序列化之前,先简单介绍下SerializedPage的结构,参考SerializedPage Wire Format

总体结构

格式包括Header、列数、列;其中行数保存在Header中。

结构

列结构中也包含列Header、Null Flags、实际值。

  • 列header:表示列的类型

  • Null Flags:使用一个字节表示是否有nullFlag,使用每一个bit标识是否某一行是否为null。

  • 列内容,列内容根据列的类型可以分为如下几种类型:
  • 定长列:主要针对BYTE、INT、SHORT、LONG、INT128等基础类型编码。与Velox的定长列FlatVector不同的是,FlatVector对于null的行也会占用空间,两者各有优劣。
  • 变长列:主要针对VARCHAR等类型,每一行的值不固定,相对于定长列,需要多一个offsets存储,指定每一行数据的开始位置。
  • 复合列:包括ARRAY、MAP、ROW三种类型,复合类型的存储包括基础类型的列。
  • 定长列示例:

  • 变长列示例:

序列化实现

PrestoVectorSerializer

PrestoVectorSerializer的实现比较清晰:

  • 在构造时会根据RowVector的列来创建对应列的VectorStream,VectorStream是用来存储每一列的数据序列化的结果。
  • 在append时会创建按列进行序列化,主要工作在serializeColumn中实现。

这里留几个问题,接下来一一进行研究。

  • VectorStream是如何存储数据的?存储的是RowVector中数据的指针还是拷贝内存?
  • serializeColumn对于RowVector每一列有没有特化处理,比如某一列是ConstantVector、DictionaryVector等怎么处理?
class PrestoVectorSerializer : public VectorSerializer {
 public:
  PrestoVectorSerializer(
      std::shared_ptr<const RowType> rowType,
      int32_t numRows,
      StreamArena* streamArena,
      bool useLosslessTimestamp) {
    auto types = rowType->children();
    auto numTypes = types.size();
    streams_.resize(numTypes);
    for (int i = 0; i < numTypes; i++) {
      streams_[i] = std::make_unique<VectorStream>(
          types[i], streamArena, numRows, useLosslessTimestamp);
    }
  }
  void append(
      const RowVectorPtr& vector,
      const folly::Range<const IndexRange*>& ranges) override {
    auto newRows = rangesTotalSize(ranges);
    if (newRows > 0) {
      numRows_ += newRows;
      for (int32_t i = 0; i < vector->childrenSize(); ++i) {
        serializeColumn(vector->childAt(i).get(), ranges, streams_[i].get());
      }
    }
  }
  void flush(OutputStream* out) override {
    flushInternal(numRows_, false /*rle*/, out);
  }
//...
 private:
  ...
  int32_t numRows_{0};
  std::vector<std::unique_ptr<VectorStream>> streams_;
};
} // namespace

VectorStream

VectorStream是序列化SerializedPage做准备,其成员存储了SerializedPage结构中所需的Column Header、Null Flags、长度等;同时由于SeralizedPage也支持ROW、ARRAY、MAP类型的encoding,与此对应,VectorStream对于这三种类型使用了嵌套的VectorStream来表示。

初始化内存
  • children_:children表示ROW、ARRAY、MAP类型的子VectorStream,对应SerializedPage的三种类型。
  • header_:用来存储每种数据类型的头信息。
  • nulls_:用来存储null的值。
  • lengths_:用来存储长度。
  • values_:用来存储实际的值。
class VectorStream {
//...
private:
  const TypePtr type_;
//...
  ByteRange header_;
  ByteStream nulls_;
  ByteStream lengths_;
  ByteStream values_;
  std::vector<std::unique_ptr<VectorStream>> children_;
};

在VectorStream构造时,会对上述成员进行初始化,调用ByteStream::startWrite方法进行内存申请,根据RowVector中每列的数据类型,进行内存预申请。

下面将重要的字段初始化代码列出:代码中的lengths_对应SerializedPage中的offsets

  • 对于ROW、ARRAY、MAP类型除了lengths_初始化,也需要进行childrens_初始化,因为是嵌套结构
  • 对于VARCHAR、VARBINARY类型,进行了lengths_和values_的初始化;
class VectorStream {
 public:
  VectorStream(
      const TypePtr type,
      StreamArena* streamArena,
      int32_t initialNumRows,
      bool useLosslessTimestamp)
      : type_(type),
        useLosslessTimestamp_(useLosslessTimestamp),
        nulls_(streamArena, true, true),
        lengths_(streamArena),
        values_(streamArena) {
//...
    if (initialNumRows > 0) {
      switch (type_->kind()) {
        case TypeKind::ROW:
          if (isTimestampWithTimeZoneType(type_)) {
            values_.startWrite(initialNumRows * 4);
            break;
          }
          [[fallthrough]];
        case TypeKind::ARRAY:
        case TypeKind::MAP:
          hasLengths_ = true;
          lengths_.startWrite(initialNumRows * sizeof(vector_size_t));
          children_.resize(type_->size());
          for (int32_t i = 0; i < type_->size(); ++i) {
            children_[i] = std::make_unique<VectorStream>(
                type_->childAt(i),
                streamArena,
                initialNumRows,
                useLosslessTimestamp);
          }
          break;
        case TypeKind::VARCHAR:
        case TypeKind::VARBINARY:
          hasLengths_ = true;
          lengths_.startWrite(initialNumRows * sizeof(vector_size_t));
          values_.startWrite(initialNumRows * 10);
          break;
        default:;
          values_.startWrite(initialNumRows * 4);
          break;
      }
    }
  }
private:
  const TypePtr type_;
//...
  ByteRange header_;
  ByteStream nulls_;
  ByteStream lengths_;
  ByteStream values_;
  std::vector<std::unique_ptr<VectorStream>> children_;
};
序列化

序列化的Append方法,最终会对每一列调用serializeColumn方法,该方法实现中针对每一种列类型进行专门处理,例如对于Flat类型调用了serializeFlatVector。

针对每一种序列化的主要流程是根据提出数据的格式,写入VectorStream的lengths_、values_中,对于复合类型,进行递归调用。

需要注意的是,数据是会被写到StreamArena的内存时,使用了内存拷贝,即使是String类型。也就说在StreamArena中存储的完整的拷贝后的RowVector的数据。

void serializeColumn(
    const BaseVector* vector,
    const folly::Range<const IndexRange*>& ranges,
    VectorStream* stream) {
  switch (vector->encoding()) {
    case VectorEncoding::Simple::FLAT:
      VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
          serializeFlatVector, vector->typeKind(), vector, ranges, stream);
      break;
    case VectorEncoding::Simple::CONSTANT:
      VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
          serializeConstantVector, vector->typeKind(), vector, ranges, stream);
      break;
    case VectorEncoding::Simple::BIASED:
      switch (vector->typeKind()) {
        case TypeKind::SMALLINT:
          serializeBiasVector<int16_t>(vector, ranges, stream);
          break;
        case TypeKind::INTEGER:
          serializeBiasVector<int32_t>(vector, ranges, stream);
          break;
        case TypeKind::BIGINT:
          serializeBiasVector<int64_t>(vector, ranges, stream);
          break;
        default:
          throw std::invalid_argument("Invalid biased vector type");
      }
      break;
    case VectorEncoding::Simple::ROW:
      serializeRowVector(vector, ranges, stream);
      break;
    case VectorEncoding::Simple::ARRAY:
      serializeArrayVector(vector, ranges, stream);
      break;
    case VectorEncoding::Simple::MAP:
      serializeMapVector(vector, ranges, stream);
      break;
    case VectorEncoding::Simple::LAZY:
      serializeColumn(vector->loadedVector(), ranges, stream);
      break;
    default:
      serializeWrapped(vector, ranges, stream);
  }
}
  • flush方法

serializeColumn是将数据写入StreamArena的内存中,flush方法可以真正的将数据严格按照SerializedPage的格式写入OutputStream。其中ROW、ARRAY、MAP类型调用了递归调用了children的flush方法。

void flush(OutputStream* out) {
    out->write(reinterpret_cast<char*>(header_.buffer), header_.size);
    switch (type_->kind()) {
      case TypeKind::ROW:
        //...
      case TypeKind::ARRAY:
        //...
      case TypeKind::MAP:
        //...
      case TypeKind::VARCHAR:
      case TypeKind::VARBINARY:
        //...
      default:
        //...
    }
  }

在PrestoVectorSerializer的flush方法中,会按照列的个数,逐个调用对应的VectorStream的flush方法。

反序列化实现

反序列化是指从SerializePage二进制格式转化成RowVector;

  • 从函数签名可以看出,需要传入输出类型RowType,说明在反序列化之前已经知道了RowVector有哪些列。
  • 反序列化的源头是ByteStream,有一组ByteRange组成。
void deserialize(
      ByteStream* source,
      velox::memory::MemoryPool* pool,
      std::shared_ptr<const RowType> type,
      std::shared_ptr<RowVector>* result) override;

反序列化的核心方法是readColumns,读取每一列的数据到result的children中。

auto children = &(*result)->children();
  auto childTypes = type->as<TypeKind::ROW>().children();
  readColumns(source, pool, childTypes, children);

接下来看下readColumns的核心实现:

  • 在readColumns中首先定义了针对不同数据类型的读取函数:对于简单类型,都使用了read函数,对于复杂类型使用了单独的读取方法。
  • 然后按照每一列的类型,逐个调用其对应类型的读取方法。
void readColumns(
    ByteStream* source,
    velox::memory::MemoryPool* pool,
    const std::vector<TypePtr>& types,
    std::vector<VectorPtr>* result) {
  static std::unordered_map<
      TypeKind,
      std::function<void(
          ByteStream * source,
          std::shared_ptr<const Type> type,
          velox::memory::MemoryPool * pool,
          VectorPtr * result)>>
      readers = {
          {TypeKind::BOOLEAN, &read<bool>},
          {TypeKind::TINYINT, &read<int8_t>},
          {TypeKind::SMALLINT, &read<int16_t>},
          {TypeKind::INTEGER, &read<int32_t>},
          {TypeKind::BIGINT, &read<int64_t>},
          {TypeKind::REAL, &read<float>},
          {TypeKind::DOUBLE, &read<double>},
          {TypeKind::TIMESTAMP, &read<Timestamp>},
          {TypeKind::DATE, &read<Date>},
          {TypeKind::VARCHAR, &read<StringView>},
          {TypeKind::VARBINARY, &read<StringView>},
          {TypeKind::ARRAY, &readArrayVector},
          {TypeKind::MAP, &readMapVector},
          {TypeKind::ROW, &readRowVector},
          {TypeKind::UNKNOWN, &read<UnknownValue>}};
  for (int32_t i = 0; i < types.size(); ++i) {
    auto it = readers.find(types[i]->kind());
//...
    it->second(source, types[i], pool, &(*result)[i]);
  }
}

接下来分别以简单类型和复杂类型来举例说明:

  • 简单类型:首先读取size,然后根据size大小创建特定大小的FlatVector,最后对flatVector进行赋值。
template <typename T>
void read(
    ByteStream* source,
    std::shared_ptr<const Type> type,
    velox::memory::MemoryPool* pool,
    VectorPtr* result) {
  int32_t size = source->read<int32_t>();
  if (*result && result->unique()) {
    (*result)->resize(size);
  } else {
    *result = BaseVector::create(type, size, pool);
  }
  auto flatResult = (*result)->asFlatVector<T>();
  auto nullCount = readNulls(source, size, flatResult);
  BufferPtr values = flatResult->mutableValues(size);
  readValues<T>(source, size, flatResult->nulls(), nullCount, values);
}
  • 复杂类型:以MapVector为例,首先会将mapVector的key和value提取出来作为children,通过递归调用readColumns将值写入到children中,然后将key和value的Vector设置到MapVector中。其他复杂类型同理,也是将children先进行序列化,然后children赋值到复杂类型中。
void readMapVector(
    ByteStream* source,
    std::shared_ptr<const Type> type,
    velox::memory::MemoryPool* pool,
    VectorPtr* result) {
  MapVector* mapVector =
      (*result && result->unique()) ? (*result)->as<MapVector>() : nullptr;
  std::vector<TypePtr> childTypes = {type->childAt(0), type->childAt(1)};
  std::vector<VectorPtr> children(2);
  if (mapVector) {
    children[0] = mapVector->mapKeys();
    children[1] = mapVector->mapValues();
  }
  readColumns(source, pool, childTypes, &children);
  //...
  mapVector->setKeysAndValues(children[0], children[1]);
  //...
}

同样,在反序列化的过程中,也会将数据从ByteStream中数据复制到Vector中,同样发生了内存复制。

UnsafeRowVectorSerde

UnsafeRow结构

多行

UnsafeRow用在Spark中,是二进制的行存格式,数据是逐行排放,多行的数据二进制表示如下:

  • 开头是行的占用空间大小,接着是UnsafeRow的二进制内容;后面依次存储下一行的数据。

UnsafeRow

  • UnsafeRow代表一行数据,每行数据会有多个列,对于不同类型,UnsafeRow定义了null flags、定长数据、变长数据三部分;
  • 如果多列的简单类型字段比如bool、int、short,因为简单类型可以被8个字节覆盖,所以直接按列存入定长数据区;
  • 如果8个字节不能覆盖,比如string类型,定长数据区会存储每列string的指针和长度,在变长区存储实际的字符串的值。
  • 对于列是Array、Map、Struct类型,主要的区别都在变长区,可以看下图示意,比如Map类型,会在变长区先存储keys的长度,然后是key的字段数量、null flag、key的实际值,接着是value的字段数量、null flag、value的实际值;

序列化接口

UnsafeRowVectorSerializer

同样,UnsafeRowVectorSerde继承自VectorSerde,对于序列化主要关注UnsafeRowVectorSerializer的实现。主要流程如下:

  • 根据输入的RowVector构造UnsafeRowFast;
  • 计算出RowVector的总大小,然后申请相应的内存大小,申请后的内存保存在buffers_中:
  • 注意:申请到的内存的生命周期、buffers_生命周期、UnsafeRowVectorSerializer的生命周期是同步的。
  • 然后通过逐行的方式调用UnsafeRow.serialize方法,将Vector内容序列化到指定的内存位置。这个过程中使用了内存拷贝的方式。
  • flush方法比较简单,直接将buffers_中的内存写出去。buffers_中的格式直接对应UnsafeRow的二进制格式。
class UnsafeRowVectorSerializer : public VectorSerializer {
 public:
  using TRowSize = uint32_t;
  explicit UnsafeRowVectorSerializer(StreamArena* streamArena)
      : pool_{streamArena->pool()} {}
  void append(
      const RowVectorPtr& vector,
      const folly::Range<const IndexRange*>& ranges) override {
    size_t totalSize = 0;
    row::UnsafeRowFast unsafeRow(vector);
    totalSize = //... calculate total size
    BufferPtr buffer = AlignedBuffer::allocate<char>(totalSize, pool_, 0);
    auto rawBuffer = buffer->asMutable<char>();
    buffers_.push_back(std::move(buffer));
    size_t offset = 0;
    for (auto& range : ranges) {
      for (auto i = range.begin; i < range.begin + range.size; ++i) {
        // Write row data.
        TRowSize size =
            unsafeRow.serialize(i, rawBuffer + offset + sizeof(TRowSize));
        // Write raw size. Needs to be in big endian order.
        *(TRowSize*)(rawBuffer + offset) = folly::Endian::big(size);
        offset += sizeof(TRowSize) + size;
      }
    }
  }
  void flush(OutputStream* stream) override {
    for (const auto& buffer : buffers_) {
      stream->write(buffer->as<char>(), buffer->size());
    }
    buffers_.clear();
  }
 private:
  memory::MemoryPool* const FOLLY_NONNULL pool_;
  std::vector<BufferPtr> buffers_;
};

UnsafeRowFast

UnsafeRowFast接受RowVector,会将RowVector转换为DecodedVector。

  • DecodedVector decoded_用来存储RowVector decode后的结果,避免在运算过程中一层层的剥离vector。
  • std::vector<UnsafeRowFast> children_;用于ARRAY, MAP 和 ROW三种类型,用来存储三种类型对应的子UnsafeRowFast,这种嵌套用法比较常用,在前面的VectorStream中也多次用到。
  • initialize方法会用来初始化decoded_和children_。
  • serialize是真正将特定行的数据,写入buffer中。
class UnsafeRowFast {
 public:
  explicit UnsafeRowFast(const RowVectorPtr& vector);
...
  /// Serializes row at specified index into 'buffer'.
  /// 'buffer' must have sufficient capacity and set to all zeros.
  int32_t serialize(vector_size_t index, char* buffer);
 protected:
  explicit UnsafeRowFast(const VectorPtr& vector);
  void initialize(const TypePtr& type);
 private:
  const TypeKind typeKind_;
  DecodedVector decoded_;
  /// ARRAY, MAP and ROW types only.
  std::vector<UnsafeRowFast> children_;
  std::vector<bool> childIsFixedWidth_;
};

children_和decoded_实现是在initialize函数中实现的,具体如下:

  • 对于ArrayVector类型,直接将elements设置为children_;
  • 对于MapVector类型,将mapKeys和mapValues设置为children_;
  • 对于RowVector类型,将每一列设置为children_;
  • 对于其他简单类型,主要设置valueBytes_等字段。
void UnsafeRowFast::initialize(const TypePtr& type) {
  auto base = decoded_.base();
  switch (typeKind_) {
    case TypeKind::ARRAY: {
      auto arrayBase = base->as<ArrayVector>();
      children_.push_back(UnsafeRowFast(arrayBase->elements()));
      //...
      break;
    }
    case TypeKind::MAP: {
      auto mapBase = base->as<MapVector>();
      children_.push_back(UnsafeRowFast(mapBase->mapKeys()));
      children_.push_back(UnsafeRowFast(mapBase->mapValues()));
      //...
      break;
    }
    case TypeKind::ROW: {
      auto rowBase = base->as<RowVector>();
      for (const auto& child : rowBase->children()) {
        children_.push_back(UnsafeRowFast(child));
      }
      //...
      break;
    }
    case TypeKind::BOOLEAN:
      valueBytes_ = 1;
      fixedWidthTypeKind_ = true;
      break;
    case TypeKind::TINYINT:
      FOLLY_FALLTHROUGH;
    case TypeKind::SMALLINT:
      FOLLY_FALLTHROUGH;
    case TypeKind::INTEGER:
      FOLLY_FALLTHROUGH;
    case TypeKind::BIGINT:
      FOLLY_FALLTHROUGH;
    case TypeKind::REAL:
      FOLLY_FALLTHROUGH;
    case TypeKind::DOUBLE:
      FOLLY_FALLTHROUGH;
    case TypeKind::DATE:
    case TypeKind::UNKNOWN:
      valueBytes_ = type->cppSizeInBytes();
      fixedWidthTypeKind_ = true;
      supportsBulkCopy_ = decoded_.isIdentityMapping();
      break;
    case TypeKind::TIMESTAMP:
      valueBytes_ = sizeof(int64_t);
      fixedWidthTypeKind_ = true;
      break;
    case TypeKind::VARCHAR:
      FOLLY_FALLTHROUGH;
    case TypeKind::VARBINARY:
      // Nothing to do.
      break;
    default:
      VELOX_UNSUPPORTED("Unsupported type: {}", type->toString());
  }
}

序列化的过程,主要调用了serializeRow方法,通过遍历children,也用来区分定长列和非定长列:

  • 如果是定长列,调用serializeFixedWidth方法序列化。
  • 如果是非定长列,调用serializeVariableWidth方法,同时会将size和offset存入定长区
int32_t UnsafeRowFast::serializeRow(vector_size_t index, char* buffer) {
  auto childIndex = decoded_.index(index);
  int64_t variableWidthOffset = rowNullBytes_ + kFieldWidth * children_.size();
  for (auto i = 0; i < children_.size(); ++i) {
    auto& child = children_[i];
    // Write null bit.
    if (child.isNullAt(childIndex)) {
      bits::setBit(buffer, i, true);
      continue;
    }
    // Write value.
    if (childIsFixedWidth_[i]) {
      child.serializeFixedWidth(
          childIndex, buffer + rowNullBytes_ + i * kFieldWidth);
    } else {
      auto size = child.serializeVariableWidth(
          childIndex, buffer + variableWidthOffset);
      // Write size and offset.
      uint64_t sizeAndOffset = variableWidthOffset << 32 | size;
      reinterpret_cast<uint64_t*>(buffer + rowNullBytes_)[i] = sizeAndOffset;
      variableWidthOffset += alignBytes(size);
    }
  }
  return variableWidthOffset;
}

接下来看下定长列和非定长列的具体实现:

  • 定长列:直接通过memcpy将decoded数据写入到buffer中。
  • 非定长列:
  • 对于varchar/varbinary类型,根据字符串长度memcpy。
  • 对于ARRAY、ROW、MAP分别调用子方法进行序列化,实际上这些方法最后都会调用到serializeFixedWidth和serializeVariableWidth方法,具体实现不再赘述。
void UnsafeRowFast::serializeFixedWidth(
    vector_size_t offset,
    vector_size_t size,
    char* buffer) {
  VELOX_DCHECK(supportsBulkCopy_);
  // decoded_.data<char>() can be null if all values are null.
  if (decoded_.data<char>()) {
    memcpy(
        buffer,
        decoded_.data<char>() + decoded_.index(offset) * valueBytes_,
        valueBytes_ * size);
  }
}
int32_t UnsafeRowFast::serializeVariableWidth(
    vector_size_t index,
    char* buffer) {
  switch (typeKind_) {
    case TypeKind::VARCHAR:
      FOLLY_FALLTHROUGH;
    case TypeKind::VARBINARY: {
      auto value = decoded_.valueAt<StringView>(index);
      memcpy(buffer, value.data(), value.size());
      return value.size();
    }
    case TypeKind::ARRAY:
      return serializeArray(index, buffer);
    case TypeKind::MAP:
      return serializeMap(index, buffer);
    case TypeKind::ROW:
      return serializeRow(index, buffer);
    default:
      VELOX_UNREACHABLE(
          "Unexpected type kind: {}", mapTypeKindToName(typeKind_));
  };
}

反序列化接口

UnsafeRowDeserializer

反序列化用到了UnsafeRowDeserializer::deserialize方法,接受string_view的vector,vector的每个元素代表一行数据,反序列化过程中主要调用了convertToVectors方法

static VectorPtr deserialize(
      const std::vector<std::optional<std::string_view>>& data,
      const TypePtr& type,
      memory::MemoryPool* pool) {
    return convertToVectors(getBatchIteratorPtr(data, type), pool);
}
  • convertToVectors方法接受一个DataBatchIterator,然后根据数据类型,调用不同的方法进行反序列化。
static VectorPtr convertToVectors(
      const DataBatchIteratorPtr& dataIterator,
      memory::MemoryPool* pool) {
    const TypePtr& type = dataIterator->type();
    if (type->isPrimitiveType()) {
      return convertPrimitiveIteratorsToVectors(dataIterator, pool);
    } else if (type->isRow()) {
      return convertStructIteratorsToVectors(dataIterator, pool);
    } else if (type->isArray()) {
      return convertArrayIteratorsToVectors(dataIterator, pool);
    } else if (type->isMap()) {
      return convertMapIteratorsToVectors(dataIterator, pool);
    } else {
      VELOX_NYI("Unsupported data iterators type");
    }
  }

getBatchIteratorPtr 根据不同的数据类型,会生成不同的DataBatchIterator

inline DataBatchIteratorPtr getBatchIteratorPtr(
    const std::vector<std::optional<std::string_view>>& data,
    const TypePtr& type) {
  if (type->isPrimitiveType()) {
    return std::make_shared<PrimitiveBatchIterator>(data, type);
  } else if (type->isRow()) {
    return std::make_shared<StructBatchIterator>(data, type);
  } else if (type->isArray()) {
    return std::make_shared<ArrayBatchIterator>(data, type);
  } else if (type->isMap()) {
    return std::make_shared<MapBatchIterator>(data, type);
  }
  VELOX_NYI("Unknown data type " + type->toString());
  return nullptr;
}

convertToVectors中以convertPrimitiveIteratorsToVectors为例,最终会调用createFlatVector

  • 首先将iterator转换为PrimitiveBatchIterator,获取行数,构建对应的FlatVector
  • 然后逐行调用iterator->next()方法对数据进行迭代,在迭代的过程根据类型区分
  • StringView类型,需要读取变长区的数据
  • 其他Primitive类型,读取定长区数据
template <TypeKind Kind>
  static VectorPtr createFlatVector(
      const DataBatchIteratorPtr& dataIterator,
      const TypePtr& type,
      memory::MemoryPool* pool) {
    auto iterator =
        std::dynamic_pointer_cast<PrimitiveBatchIterator>(dataIterator);
    size_t size = iterator->numRows();
    auto vector = BaseVector::create(type, size, pool);
    using T = typename TypeTraits<Kind>::NativeType;
    using TypeTraits = ScalarTraits<Kind>;
    auto* flatResult = vector->asFlatVector<T>();
    for (int32_t i = 0; i < size; ++i) {
      if (iterator->isNull(i)) {
        vector->setNull(i, true);
        iterator->next();
      } else {
        vector->setNull(i, false);
        if constexpr (std::is_same_v<T, StringView>) {
          StringView val =
              UnsafeRowPrimitiveBatchDeserializer::deserializeStringView(
                  iterator->next().value());
          TypeTraits::set(flatResult, i, val);
        } else {
          typename TypeTraits::SerializedType val =
              UnsafeRowPrimitiveBatchDeserializer::deserializeFixedWidth<
                  typename TypeTraits::SerializedType>(
                  iterator->next().value());
          TypeTraits::set(flatResult, i, val);
        }
      }
    }
    return vector;
  }

对于ROW、ARRAY、MAP类型分别会调用convertStructIteratorsToVectors、convertArrayIteratorsToVectors、convertMapIteratorsToVectors原理类似,按照逐行的方式进行解析,并构建对应的Vector。

总结

对于实现了VectorSerde的类,可以通过registerVectorSerde方法注册到系统中,系统就可以根据getNamedVectorSerde来找到对应的VectorSerda实现类来进行外部数据格式的序列化和反序列化;这里做些简单总结:

  • VectorSerde是Velox引擎对外沟通的接口,通过接口的形式来保持系统的扩展性,在不侵入velox引擎的情况下,外部可以根据VectorSerde进行实现并注册,来完成外部数据的写入和读出。
  • 在使用Velox的Vector时,需要清楚Vector的各种编码类型,尤其是列式场景下,针对不同的编码可以有对应的“Fast Path”执行路径,以提高执行效率。
  • 在序列化和反序列化的过程中,可以看出实现中多次出现内存拷贝的情况,个人认为是因为在Serde执行的过程中,原数据的生命周期已经结束,如果使用内存引用的方式,可能并不安全。这部分逻辑待继续调研。
  • 在StreamArena实现的过程中,也考虑了小内存的申请,对于小于一页4K的大小的需求,直接使用vector<string>来申请内存。
  • 同时在上述Velox的两种序列化方式中,都是Schema固定的场景:列的数量和格式都是固定,velox本身、Presto、Spark都是列确定的模式;对于SchemaFree(每一行的列不固定)的场景下,文中没有涉及。

参考

https://prestodb.io/docs/current/develop/serialized-page.html

https://facebookincubator.github.io/velox/develop/serde/unsaferow.html

https://github.com/facebookincubator/velox

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
6月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
485 4
|
存储 Dubbo Java
dubbo 源码 v2.7 分析:通信过程及序列化协议
前面我们介绍了dubbo的核心机制,今天将开始分析远程调用流程。毕竟,作为一个rpc框架,远程调用是理论的核心内容。通过对dubbo相关实现的探究,深入了解rpc原理及可能的问题。
188 0
|
存储 XML Apache
43-微服务技术栈(高级):分布式协调服务zookeeper源码篇(序列化)
在完成了前面的理论学习后,现在可以从源码角度来解析Zookeeper的细节,首先笔者想从序列化入手,因为在网络通信、数据存储中都用到了序列化,下面开始分析。
141 0
|
分布式计算 资源调度 Java
Hadoop中的MapReduce概述、优缺点、核心思想、编程规范、进程、官方WordCount源码、提交到集群测试、常用数据序列化类型、WordCount案例实操
Hadoop中的MapReduce概述、优缺点、核心思想、编程规范、进程、官方WordCount源码、提交到集群测试、常用数据序列化类型、WordCount案例实操
Hadoop中的MapReduce概述、优缺点、核心思想、编程规范、进程、官方WordCount源码、提交到集群测试、常用数据序列化类型、WordCount案例实操
|
机器学习/深度学习 Java
序列化单例模式的实现————readResolve 源码解读 | Java Debug 笔记
序列化单例模式的实现————readResolve 源码解读 在可序列化类中加上readResolve方法,就可以实现单例模式了!这是为什么呢?让我们一起看看源码中的奥秘吧! 只有实现了序列化接口 Serializable ,才可以进行 序列化操作, 测试代码 class SingletonTest { */*** ** 序列化测试公共方法* *** *@param* *className* **/* private void testSerializable(String className) { if (className =
234 0
|
算法 Java 容器
Java对象的序列化/反序列化原理及源码解析(中)
Java对象的序列化/反序列化原理及源码解析(中)
203 0
Java对象的序列化/反序列化原理及源码解析(中)
|
Java API 容器
Java对象的序列化/反序列化原理及源码解析(上)
Java对象的序列化/反序列化原理及源码解析(上)
368 0
Java对象的序列化/反序列化原理及源码解析(上)
|
存储 安全 Java
Java对象的序列化/反序列化原理及源码解析(下)
Java对象的序列化/反序列化原理及源码解析(下)
142 0
|
存储 JavaScript 前端开发
Java对象序列化底层原理源码解析
Java序列化是指把Java对象保存为二进制字节码的过程,Java反序列化是指把二进制码重新转换成Java对象的过程。那么为什么需要序列化呢?
7603 0
|
11天前
|
JSON 数据格式 索引
Python中序列化/反序列化JSON格式的数据
【11月更文挑战第4天】本文介绍了 Python 中使用 `json` 模块进行序列化和反序列化的操作。序列化是指将 Python 对象(如字典、列表)转换为 JSON 字符串,主要使用 `json.dumps` 方法。示例包括基本的字典和列表序列化,以及自定义类的序列化。反序列化则是将 JSON 字符串转换回 Python 对象,使用 `json.loads` 方法。文中还提供了具体的代码示例,展示了如何处理不同类型的 Python 对象。