- 目录结构
首先开发者需要通过 git 下载最新代码到本地,并导入到 IDE 中。同时创建自己的工作分支,使用该分支开发自己的 Connector。
项目地址:github.com/bytedance/b…
项目结构如下:
- 开发流程
BitSail 是一款基于分布式架构的数据集成引擎,Connector 会并发执行。并由 BitSail 框架来负责任务的调度、并发执行、脏数据处理等,开发者只需要实现对应接口即可,具体开发流程如下:
工程配置,开发者需要在 bitsail/bitsail-connectors/pom.xml 模块中注册自己的 Connector,同时在 bitsail/bitsail-dist/pom.xml 增加自己的 Connector 模块,同时为你的连接器注册配置文件,来使得框架可以在运行时动态发现它。
Connector 开发,实现 Source、Sink 提供的抽象方法,具体细节参考后续介绍。
数据输出类型,目前支持的数据类型为 BitSail Row 类型,无论是 Source 在 Reader 中传递给下游的数据类型,还是 Sink 从上游消费的数据类型,都应该是 BitSail Row 类型。
- Architecture
当前 Source API 的设计同时兼容了流批一批的场景,换言之就是同时支持 pull & push 的场景。在此之前,我们需要首先再过一遍传统流批场景中各组件的交互模型。
3.1 Batch Model
传统批式场景中,数据的读取一般分为如下几步:
createSplits:一般在 client 端或者中心节点执行,目的是将完整的数据按照指定的规则尽可能拆分为较多的 rangeSplits,createSplits 在作业生命周期内有且执行一次。
runWithSplit: 一般在执行节点节点执行,执行节点启动后会向中心节点请求存在的 rangeSplit,然后再本地进行执行;执行完成后会再次向中心节点请求直到所有 splits 执行完成。
commit:全部的 split 的执行完成后,一般会在中心节点执行 commit 的操作,用于将数据对外可见。
3.2 Stream Model
传统流式场景中,数据的读取一般分为如下几步:
createSplits:一般在 client 端或者中心节点执行,目的是根据滑动窗口或者滚动窗口的策略将数据流划分为 rangeSplits,createSplits 在流式作业的生命周期中按照划分窗口的会一直执行。
runWithSplit: 一般在执行节点节点执行,中心节点会向可执行节点发送 rangeSplit,然后在可执行节点本地进行执行;执行完成后会将处理完的 splits 数据向下游发送。
commit:全部的 split 的执行完成后,一般会向目标数据源发送 retract message,实时动态展现结果。
3.3 BitSail Model
createSplits:BitSail 通过 SplitCoordinator 模块划分 rangeSplits,在流式作业中的生命周期中 createSplits 会周期性执行,而在批式作业中仅仅会执行一次。
runWithSplit: 在执行节点节点执行,BitSail 中执行节点包括 Reader 和 Writer 模块,中心节点会向可执行节点发送 rangeSplit,然后在可执行节点本地进行执行;执行完成后会将处理完的 splits 数据向下游发送。
commit:writer 在完成数据写入后,committer 来完成提交。在不开启 checkpoint 时,commit 会在所有 writer 都结束后执行一次;在开启 checkpoint 时,commit 会在每次 checkpoint 的时候都会执行一次。
- Source Connector
Source: 数据读取组件的生命周期管理,主要负责和框架的交互,构架作业,不参与作业真正的执行。
SourceSplit: 数据读取分片;大数据处理框架的核心目的就是将大规模的数据拆分成为多个合理的 Split。
State:作业状态快照,当开启 checkpoint 之后,会保存当前执行状态。
SplitCoordinator: 既然提到了 Split,就需要有相应的组件去创建、管理 Split;SplitCoordinator 承担了这样的角色。
SourceReader: 真正负责数据读取的组件,在接收到 Split 后会对其进行数据读取,然后将数据传输给下一个算子。
Source Connector 开发流程如下
首先需要创建 Source 类,需要实现 Source 和 ParallelismComputable 接口,主要负责和框架的交互,构架作业,它不参与作业真正的执行。
BitSail 的 Source 采用流批一体的设计思想,通过 getSourceBoundedness 方法设置作业的处理方式,通过 configure 方法定义 readerConfiguration 的配置,通过 createTypeInfoConverter 方法来进行数据类型转换,可以通过 FileMappingTypeInfoConverter 得到用户在 yaml 文件中自定义的数据源类型和 BitSail 类型的转换,实现自定义化的类型转换。
最后,定义数据源的数据分片格式 SourceSplit 类和闯将管理 Split 的角色 SourceSplitCoordinator 类。
最后完成 SourceReader 实现从 Split 中进行数据的读取。
每个 SourceReader 都在独立的线程中执行,并保证 SourceSplitCoordinator 分配给不同 SourceReader 的切片没有交集。
在 SourceReader 的执行周期中,开发者只需要关注如何从构造好的切片中去读取数据,之后完成数据类型对转换,将外部数据类型转换成 BitSail 的 Row 类型传递给下游即可。
4.1 Reader 示例
public class FakeSourceReader extends SimpleSourceReaderBase {
private final BitSailConfiguration readerConfiguration;
private final TypeInfo<?>[] typeInfos;
private final transient int totalCount;
private final transient RateLimiter fakeGenerateRate;
private final transient AtomicLong counter;
private final FakeRowGenerator fakeRowGenerator;
public FakeSourceReader(BitSailConfiguration readerConfiguration, Context context) {
this.readerConfiguration = readerConfiguration;
this.typeInfos = context.getTypeInfos();
this.totalCount = readerConfiguration.get(FakeReaderOptions.TOTAL_COUNT);
this.fakeGenerateRate = RateLimiter.create(readerConfiguration.get(FakeReaderOptions.RATE));
this.counter = new AtomicLong();
this.fakeRowGenerator = new FakeRowGenerator(readerConfiguration, context.getIndexOfSubtask());
}
@Override
public void pollNext(SourcePipeline pipeline) throws Exception {
fakeGenerateRate.acquire();
pipeline.output(fakeRowGenerator.fakeOneRecord(typeInfos));
}
@Override
public boolean hasMoreElements() {
return counter.incrementAndGet() <= totalCount;
}
}