如何快速实现 BitSail Connector?

简介: 首先开发者需要通过 git 下载最新代码到本地,并导入到 IDE 中。同时创建自己的工作分支,使用该分支开发自己的 Connector。
  1. 目录结构

首先开发者需要通过 git 下载最新代码到本地,并导入到 IDE 中。同时创建自己的工作分支,使用该分支开发自己的 Connector。
项目地址:github.com/bytedance/b…
项目结构如下:
0.5.png

  1. 开发流程

BitSail 是一款基于分布式架构的数据集成引擎,Connector 会并发执行。并由 BitSail 框架来负责任务的调度、并发执行、脏数据处理等,开发者只需要实现对应接口即可,具体开发流程如下:
0.4.png

工程配置,开发者需要在 bitsail/bitsail-connectors/pom.xml 模块中注册自己的 Connector,同时在 bitsail/bitsail-dist/pom.xml 增加自己的 Connector 模块,同时为你的连接器注册配置文件,来使得框架可以在运行时动态发现它。
0.3.png

Connector 开发,实现 Source、Sink 提供的抽象方法,具体细节参考后续介绍。
数据输出类型,目前支持的数据类型为 BitSail Row 类型,无论是 Source 在 Reader 中传递给下游的数据类型,还是 Sink 从上游消费的数据类型,都应该是 BitSail Row 类型。

  1. Architecture

当前 Source API 的设计同时兼容了流批一批的场景,换言之就是同时支持 pull & push 的场景。在此之前,我们需要首先再过一遍传统流批场景中各组件的交互模型。
3.1 Batch Model
传统批式场景中,数据的读取一般分为如下几步:

createSplits:一般在 client 端或者中心节点执行,目的是将完整的数据按照指定的规则尽可能拆分为较多的 rangeSplits,createSplits 在作业生命周期内有且执行一次。
runWithSplit: 一般在执行节点节点执行,执行节点启动后会向中心节点请求存在的 rangeSplit,然后再本地进行执行;执行完成后会再次向中心节点请求直到所有 splits 执行完成。
commit:全部的 split 的执行完成后,一般会在中心节点执行 commit 的操作,用于将数据对外可见。

3.2 Stream Model
传统流式场景中,数据的读取一般分为如下几步:

createSplits:一般在 client 端或者中心节点执行,目的是根据滑动窗口或者滚动窗口的策略将数据流划分为 rangeSplits,createSplits 在流式作业的生命周期中按照划分窗口的会一直执行。
runWithSplit: 一般在执行节点节点执行,中心节点会向可执行节点发送 rangeSplit,然后在可执行节点本地进行执行;执行完成后会将处理完的 splits 数据向下游发送。
commit:全部的 split 的执行完成后,一般会向目标数据源发送 retract message,实时动态展现结果。

3.3 BitSail Model

createSplits:BitSail 通过 SplitCoordinator 模块划分 rangeSplits,在流式作业中的生命周期中 createSplits 会周期性执行,而在批式作业中仅仅会执行一次。
runWithSplit: 在执行节点节点执行,BitSail 中执行节点包括 Reader 和 Writer 模块,中心节点会向可执行节点发送 rangeSplit,然后在可执行节点本地进行执行;执行完成后会将处理完的 splits 数据向下游发送。
commit:writer 在完成数据写入后,committer 来完成提交。在不开启 checkpoint 时,commit 会在所有 writer 都结束后执行一次;在开启 checkpoint 时,commit 会在每次 checkpoint 的时候都会执行一次。

  1. Source Connector

0.2.png

Source: 数据读取组件的生命周期管理,主要负责和框架的交互,构架作业,不参与作业真正的执行。
SourceSplit: 数据读取分片;大数据处理框架的核心目的就是将大规模的数据拆分成为多个合理的 Split。
State:作业状态快照,当开启 checkpoint 之后,会保存当前执行状态。
SplitCoordinator: 既然提到了 Split,就需要有相应的组件去创建、管理 Split;SplitCoordinator 承担了这样的角色。
SourceReader: 真正负责数据读取的组件,在接收到 Split 后会对其进行数据读取,然后将数据传输给下一个算子。

Source Connector 开发流程如下

首先需要创建 Source 类,需要实现 Source 和 ParallelismComputable 接口,主要负责和框架的交互,构架作业,它不参与作业真正的执行。
BitSail 的 Source 采用流批一体的设计思想,通过 getSourceBoundedness 方法设置作业的处理方式,通过 configure 方法定义 readerConfiguration 的配置,通过 createTypeInfoConverter 方法来进行数据类型转换,可以通过 FileMappingTypeInfoConverter 得到用户在 yaml 文件中自定义的数据源类型和 BitSail 类型的转换,实现自定义化的类型转换。
最后,定义数据源的数据分片格式 SourceSplit 类和闯将管理 Split 的角色 SourceSplitCoordinator 类。
最后完成 SourceReader 实现从 Split 中进行数据的读取。

0.1.png

每个 SourceReader 都在独立的线程中执行,并保证 SourceSplitCoordinator 分配给不同 SourceReader 的切片没有交集。
在 SourceReader 的执行周期中,开发者只需要关注如何从构造好的切片中去读取数据,之后完成数据类型对转换,将外部数据类型转换成 BitSail 的 Row 类型传递给下游即可。

4.1 Reader 示例
public class FakeSourceReader extends SimpleSourceReaderBase {

  private final BitSailConfiguration readerConfiguration;
  private final TypeInfo<?>[] typeInfos;

  private final transient int totalCount;
  private final transient RateLimiter fakeGenerateRate;
  private final transient AtomicLong counter;

  private final FakeRowGenerator fakeRowGenerator;

  public FakeSourceReader(BitSailConfiguration readerConfiguration, Context context) {
    this.readerConfiguration = readerConfiguration;
    this.typeInfos = context.getTypeInfos();
    this.totalCount = readerConfiguration.get(FakeReaderOptions.TOTAL_COUNT);
    this.fakeGenerateRate = RateLimiter.create(readerConfiguration.get(FakeReaderOptions.RATE));
    this.counter = new AtomicLong();
    this.fakeRowGenerator = new FakeRowGenerator(readerConfiguration, context.getIndexOfSubtask());
  }

  @Override
  public void pollNext(SourcePipeline pipeline) throws Exception {
    fakeGenerateRate.acquire();
    pipeline.output(fakeRowGenerator.fakeOneRecord(typeInfos));
  }

  @Override
  public boolean hasMoreElements() {
    return counter.incrementAndGet() <= totalCount;
  }
}

相关文章
|
12月前
|
消息中间件 SQL Java
Flink自定义Connector
Flink自定义Connector
436 0
|
6月前
|
SQL Java 流计算
Flink CDC在代码里面集成cdc的时候,是不是也要用上面这个胖包flink-sql-connector-mysql-cdc,不要去用瘦包flink-connector-mysql-cdc? com.ververica flink-sql-connector-mysql-cdc 2.4.0
Flink CDC在代码里面集成cdc的时候,是不是也要用上面这个胖包flink-sql-connector-mysql-cdc,不要去用瘦包flink-connector-mysql-cdc? com.ververica flink-sql-connector-mysql-cdc 2.4.0
95 1
|
2月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
6月前
|
流计算
在Flink中,如果一个任务的输入依赖于前面两个任务的输出,可以使用`connector`来连接这三个任务
【1月更文挑战第19天】【1月更文挑战第93篇】在Flink中,如果一个任务的输入依赖于前面两个任务的输出,可以使用`connector`来连接这三个任务
45 2
|
6月前
|
关系型数据库 MySQL Java
Flink cdc报错问题之使用jdbc connector报错如何解决
Flink CDC报错指的是使用Apache Flink的Change Data Capture(CDC)组件时遇到的错误和异常;本合集将汇总Flink CDC常见的报错情况,并提供相应的诊断和解决方法,帮助用户快速恢复数据处理任务的正常运行。
|
6月前
|
消息中间件 Apache 流计算
Apache Flink的RabbitMQ connector使用的是`org.apache.flink:flink-sql-connector-rabbitmq`库
Apache Flink的RabbitMQ connector使用的是`org.apache.flink:flink-sql-connector-rabbitmq`库
123 2
|
SQL 关系型数据库 MySQL
Flink mysql-cdc connector 源码解析
在 Flink 1.11 引入了 CDC 机制,CDC 的全称是 Change Data Capture,用于捕捉数据库表的增删改查操作,是目前非常成熟的同步数据库变更方案。Flink CDC Connectors 是 Apache Flink 的一组源连接器,是可以从 MySQL、PostgreSQL 数据直接读取全量数据和增量数据的 Source Connectors.
Flink mysql-cdc connector 源码解析
|
数据采集 分布式计算 项目管理
通过 Connector 同步到 Odps|学习笔记
快速学习通过 Connector 同步到 Odps
170 0
通过 Connector 同步到 Odps|学习笔记
|
SQL 存储 NoSQL
Flink SQL 自定义 redis connector
一般情况下,我们不需要创建新的 connector,因为 Flink SQL 已经内置了丰富的 connector 供我们使用,但是在实际生产环境中我们的存储是多种多样的,所以原生的 connector 并不能满足所有用户的需求,这个时候就需要我们自定义 connector,这篇文章的重点就是介绍一下如何实现自定义 Flink SQL connector ? 先来看一下官网的一张 connector 架构图:
Flink SQL 自定义 redis connector
|
消息中间件 存储 Kafka
flink-pulsar-connector
flink-pulsar-connector