How is data inserted into Presto?


Data Insertion Journey

Overview

We know that there is an interesting question interviewers like to ask:

Tell me what happens after I click the web browser's Go button, until we see the response page?

The interesting part of the question is that you have to understand the whole process of how an HTTP request/response works before you can answer it. Today I will use a similar scheme to describe how a row of data is inserted into Presto.

The Presto Data Insertion Story

We know that Presto is a superb query engine that supports querying petabytes of data in seconds. It also supports the INSERT statement, as long as your connector implements the sink-related SPIs. Today we will walk through data insertion using the Hive connector as an example.
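
To set the scene, here is a trimmed-down sketch of that sink-related SPI surface. The method shapes are simplified from Presto's ConnectorMetadata and ConnectorPageSink interfaces (the real ones carry extra parameters such as the session and transaction handle), and the stub types below are placeholders, so read it as an outline rather than the exact API:

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

// Marker stubs standing in for the real presto-spi types.
interface ConnectorTableHandle {}
interface ConnectorInsertTableHandle {}
interface Page {}
interface Slice {}

// Trimmed outline of the sink-related SPIs a connector implements
// to support INSERT; parameter lists are simplified here.
interface ConnectorMetadata
{
    // called before the write starts: decide where and how to write
    ConnectorInsertTableHandle beginInsert(ConnectorTableHandle table);

    // called after the write finishes: commit the metadata changes
    void finishInsert(ConnectorInsertTableHandle handle, Collection<Slice> fragments);
}

interface ConnectorPageSink
{
    CompletableFuture<?> appendPage(Page page);        // write one batch of rows
    CompletableFuture<Collection<Slice>> finish();     // flush and report fragments
    void abort();                                      // discard everything on failure
}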

To make the storytelling simpler, let's first construct the following scenario: we have a table named person with the following schema:

CREATE EXTERNAL TABLE person (
    id int,
    name string,
    age int
) 
PARTITIONED BY (dt string);

Now we issue an INSERT statement to insert a row of data into the person table:

INSERT INTO person VALUES (1, 'james', 10, '20190301');

The following diagram depicts what happens behind the scenes:

Data Insertion Process

  1. Determine where to write the data.
  2. Write the data.
  3. Write the metadata.
  4. Commit and finalize all the changes.

In the following sections I will introduce each step in detail.

Determine where to write the data

The first thing Presto needs to figure out is where to write the data. In Hive, the data is of course written to HDFS, and the actual file path is determined by Metadata#beginInsert. Let's take a look at its implementation:

SchemaTableName tableName = schemaTableName(tableHandle);
// query the table metadata
Optional<Table> table = metastore.getTable(
    tableName.getSchemaName(), tableName.getTableName());
...
// determine the data format to write
HiveStorageFormat tableStorageFormat 
  = extractHiveStorageFormat(table.get());
LocationHandle locationHandle = locationService
  .forExistingTable(metastore, session, table.get());
// determine the HDFS file path to write the data
WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle);
...

All this information (table format, file path to write, etc.) is encapsulated into a HiveInsertTableHandle object, which will be used by the later steps.
In the WriteInfo object there are two paths: targetPath and writePath:

class WriteInfo
{
    private final Path targetPath;
    private final Path writePath;
    private final LocationHandle.WriteMode writeMode;
    ...
}

targetPath is where the data should finally land, while writePath tells us where to write the data for the moment: sometimes we need to write the data into a staging directory first to achieve reliable, reversible data insertion.
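
As a toy illustration of the difference between the two paths (the staging directory name below is made up for the example; Presto's real LocationService derives write locations from the query id, write mode, and table type):

import org.apache.hadoop.fs.Path;

// Hypothetical sketch only: shows how a per-query staging writePath can sit
// next to the final targetPath, so that concurrent queries never collide.
final class StagingLocations
{
    // where the data should finally land
    static Path targetPath(String tableLocation, String partitionName)
    {
        return new Path(tableLocation, partitionName);
    }

    // where the data is written for the moment; ".staging-<queryId>" is an
    // assumed naming scheme, not Presto's actual one
    static Path writePath(String tableLocation, String partitionName, String queryId)
    {
        return new Path(new Path(tableLocation, ".staging-" + queryId), partitionName);
    }
}

For our example insert, targetPath would be .../person/dt=20190301 while writePath would be .../person/.staging-<queryId>/dt=20190301.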

The following diagram depicts how the data is moved between several directories during an insert overwrite operation:

Data Files Movement During Insert Overwrite

As we can see from the diagram, the data is first written into a staging directory; then the old data in the target directory is moved to a temporary backup directory; then the data is moved from the staging directory to the target directory; and if everything goes well, the data in the temporary backup directory can finally be deleted.
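
The same dance, written as a minimal sketch against the plain Hadoop FileSystem API. Presto's real commit logic also handles permissions, retries, and asynchronous renames; this only shows the ordering that makes the operation reversible:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class InsertOverwriteSketch
{
    // Simplified: real code also checks existence, permissions, and retries.
    static void commitOverwrite(FileSystem fs, Path staging, Path target, Path backup)
            throws IOException
    {
        // step 1 happened already: the new data sits in the staging directory
        // step 2: move the old data aside so the change stays reversible
        if (!fs.rename(target, backup)) {
            throw new IOException("failed to back up " + target);
        }
        try {
            // step 3: move the new data into place
            if (!fs.rename(staging, target)) {
                throw new IOException("failed to publish " + staging);
            }
        }
        catch (IOException e) {
            fs.rename(backup, target); // roll back: restore the old data
            throw e;
        }
        // step 4: all good, the backup is no longer needed
        fs.delete(backup, true);
    }
}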

We know that Metadata#beginInsert determines where to write the data, but do you know when Metadata#beginInsert is called? You might think it is called in an early phase, just after the query is planned, but actually it is not: it is called during the planning phase itself, by an optimizer rule named BeginTableWrite. Sounds weird, right? No wonder the javadoc of BeginTableWrite states a major hack alert:

/*
 * Major HACK alert!!!
 *
 * This logic should be invoked on query start, not during planning. At that point, the token
 * returned by beginCreate/beginInsert should be handed down to tasks in a mapping separate
 * from the plan that links plan nodes to the corresponding token.
 */

Write the data

After the write location is determined and the HiveInsertTableHandle is built, the query starts to run, and HivePageSink does all the real work of writing the data. The whole job acts like an operator DAG: an upstream operator generates data, and the downstream operator pulls the data from the upstream operator, processes it, and generates new data for operators even further downstream.

Operators

In our simple demo, the data source is the ValuesOperator; it provides the single row of data:

[1, 'james', 10, '20190301']

TableWriterOperator pulls the data from ValuesOperator and delegates the write logic to HivePageSink. HivePageSink calls HDFS FileSystem-related logic to persist the data; in the meantime, it collects some stats that will later be output by TableWriterOperator: more specifically, the written row count and information about all the written partitions. This is the output of TableWriterOperator.
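
In code, the interaction between the operator and the sink boils down to a small loop. The sketch below is condensed (the imports reflect the presto-spi packages of that era; the real TableWriterOperator also honors the backpressure future, tracks memory, and emits the stats as an output page):

import java.util.Collection;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.Page;
import io.airlift.slice.Slice;

final class WriteLoopSketch
{
    // Condensed version of the write path: pull pages, hand them to the sink,
    // then collect the fragments that Metadata#finishInsert will need.
    static Collection<Slice> writeAll(Iterable<Page> pages, ConnectorPageSink sink)
    {
        long rowCount = 0;
        for (Page page : pages) {
            sink.appendPage(page).join(); // HivePageSink persists the rows to HDFS
            rowCount += page.getPositionCount(); // one of the stats the operator reports
        }
        // the returned fragments encode the written partition information
        return sink.finish().join();
    }
}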

Write the metadata

After TableWriterOperator finishes its work, not everything is done: it has only persisted the data, not the metadata. The data is there, but it is not yet in Presto's and Hive's metadata repository. In Metadata#finishInsert, Presto does some checks, e.g. whether the table was dropped during the insertion:

Optional<Table> table = metastore.getTable(
  handle.getSchemaName(), handle.getTableName());
if (!table.isPresent()) {
    throw new TableNotFoundException(
      new SchemaTableName(handle.getSchemaName(),
      handle.getTableName())
    );
}

Whether the table format was changed during the insertion:

if (!table.get().getStorage().getStorageFormat().getInputFormat()
  .equals(tableStorageFormat.getInputFormat()) && respectTableFormat) {
    throw new PrestoException(
      HIVE_CONCURRENT_MODIFICATION_DETECTED, 
      "Table format changed during insert"
  );
}

Whether the partition has been concurrently modified during this insertion:

if (partition.isNew() != firstPartition.isNew() ||
    !partition.getWritePath().equals(firstPartition.getWritePath()) ||
    !partition.getTargetPath().equals(firstPartition.getTargetPath())) {
    throw new PrestoException(
      HIVE_CONCURRENT_MODIFICATION_DETECTED,
      format(
        "Partition %s was added or modified during INSERT" +
                "([isNew=%s, writePath=%s, targetPath=%s] vs [isNew=%s, writePath=%s, targetPath=%s])",
        ...
      )
    );
}

If everything goes well and all checks pass, Presto calculates all the metadata changes that need to be made later. These operations are reversible.

Commit

In the previous step, Presto only calculated all the metadata change operations; they were not actually executed.

In this step, Presto actually runs all the metadata change operations, in a two-phase-commit style (see the sketch after this list):

  1. From the reversible operations, Presto calculates the irreversible operations to execute, and initiates the corresponding data file movements in the Hadoop FileSystem.
  2. Wait for all the data file movements to finish.
  3. Run all the irreversible operations to finalize the metadata change.
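
Put together as a sketch (the class and method names here are illustrative stand-ins, not Presto's actual commit code):

import java.util.List;
import java.util.concurrent.CompletableFuture;

final class TwoPhaseCommitSketch
{
    interface MetastoreOperation { void execute(); }

    static void commit(List<CompletableFuture<Void>> fileMoves,
            List<MetastoreOperation> irreversibleOps)
    {
        // steps 1 and 2: the file movements derived from the reversible
        // operations were already initiated; wait for every one to finish
        CompletableFuture.allOf(fileMoves.toArray(new CompletableFuture[0])).join();

        // step 3: only now run the irreversible metastore changes (e.g.
        // add/alter partitions); the ordering is the whole point, because
        // a failure before this line can still be rolled back
        irreversibleOps.forEach(MetastoreOperation::execute);
    }
}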

Reversible vs. Irreversible

Reversible means that for everything we do, we have some way to roll it back; for example, to make an operation like "move data from path a to path b" reversible, we can back up the data at b before moving. Irreversible means just the opposite.

Before step 3, all operations are reversible; after that, all operations are irreversible, i.e. the system might end up in an inconsistent state if something bad happens. Presto will try its best to clean things up, but consistency is no longer guaranteed.
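
A minimal sketch of the reversible-operation idea (the names are mine, not Presto's): every action carries its own undo, and completed actions are undone in reverse order if anything fails before the final commit:

import java.util.ArrayDeque;
import java.util.Deque;

final class ReversibleLog
{
    interface Reversible
    {
        void run();
        void rollback();
    }

    private final Deque<Reversible> done = new ArrayDeque<>();

    // execute an operation and remember it so it can be undone later
    void run(Reversible op)
    {
        op.run();
        done.push(op);
    }

    // undo everything in reverse order, e.g. restore backed-up directories
    void rollbackAll()
    {
        while (!done.isEmpty()) {
            done.pop().rollback();
        }
    }
}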

Summary

In this article, we walked through the process of how a row of data is inserted into Presto. We can see that Presto and the Hive connector do quite a lot of nice work to make things right, e.g. the two-phase commit of the metadata. It is really good code to read, and I have enjoyed it. I hope you enjoyed reading this article too.
