Flink Doris Connector设计方案

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 我们选择了Flink作为我们架构的一部分,用于数据的ETL及实时计算框架,社区目前支持Spark doris connector,因此我们参照Spark doris connector 设计开发了Flink doris Connector。

概述


该方案首先感谢社区Spark Doris Connector的作者


从Doris角度看,将其数据引入Flink,可以使用Flink一系列丰富的生态产品,拓宽了产品的想象力,也使得Doris和其他数据源的联合查询成为可能


从我们业务架构出发和业务需求,我们选择了Flink作为我们架构的一部分,用于数据的ETL及实时计算框架,社区目前支持Spark doris connector,因此我们参照Spark doris connector 设计开发了Flink doris Connector。


技术选型


一开始我们选型的时候,也是和Spark Doris Connector 一样,开始考虑的是JDBC的方式,但是这种方式就像Spark doris connector那篇文章中说的,有优点,但是缺点更明显。后来我们阅读及测试了Spark的代码,决定站在巨人的肩上来实现(备注:直接拷贝代码修改)。


以下内容来自Spark Doris Connector博客的,直接拷贝了

于是我们开发了针对Doris的新的Datasource,Spark-Doris-Connector。这种方案下,Doris可以暴露Doris数据分布给Spark。Spark的Driver访问Doris的FE获取Doris表的Schema和底层数据分布。之后,依据此数据分布,合理分配数据查询任务给Executors。最后,Spark的Executors分别访问不同的BE进行查询。大大提升了查询的效率

使用方法


在Doris的代码库的 extension/flink-doris-connector/ 目录下编译生成doris-flink-1.0.0-SNAPSHOT.jar,将这个jar包加入flink的ClassPath中,即可使用Flink-on-Doris功能了


SQL方式


CREATE TABLE flink_doris_source (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',
      'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
      'username' = '$YOUR_DORIS_USERNAME',
      'password' = '$YOUR_DORIS_PASSWORD'
);
CREATE TABLE flink_doris_sink (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',
      'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
      'username' = '$YOUR_DORIS_USERNAME',
      'password' = '$YOUR_DORIS_PASSWORD'
);
INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source

DataStream方式


DorisOptions.Builder options = DorisOptions.builder()
                .setFenodes("$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
                .setUsername("$YOUR_DORIS_USERNAME")
                .setPassword("$YOUR_DORIS_PASSWORD")
                .setTableIdentifier("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME");
env.addSource(new DorisSourceFunction<>(options.build(),new SimpleListDeserializationSchema())).print();

适用场景

image.png


1.使用Flink对Doris中的数据和其他数据源进行联合分析


很多业务部门会将自己的数据放在不同的存储系统上,比如一些在线分析、报表的数据放在Doris中,一些结构化检索数据放在Elasticsearch中、一些需要事物的数据放在MySQL中,等等。业务往往需要跨多个存储源进行分析,通过Flink Doris Connector打通Flink和Doris后,业务可以直接使用Flink,将Doris中的数据与多个外部数据源做联合查询计算。


2.实时数据接入


Flink Doris Connector之前:针对业务不规则数据,经常需要针对消息做规范处理,空值过滤等写入新的topic,然后再启动Routine load写入Doris。

image.png


image.png

Flink Doris Connector之后:flink读取kafka,直接写入doris。


image.png

image.png

技术实现

架构图

image.png

image

Doris对外提供更多能力


Doris FE


对外开放了获取内部表的元数据信息、单表查询规划和部分统计信息的接口。


所有的Rest API接口都需要进行HttpBasic认证,用户名和密码是登录数据库的用户名和密码,需要注意权限的正确分配。

// 获取表schema元信息
GET api/{database}/{table}/_schema
// 获取对单表的查询规划模版
POST api/{database}/{table}/_query_plan
{
"sql": "select k1, k2 from {database}.{table}"
}
// 获取表大小
GET api/{database}/{table}/_count

Doris BE


通过Thrift协议,直接对外提供数据的过滤、扫描和裁剪能力。


service TDorisExternalService {
    // 初始化查询执行器
TScanOpenResult open_scanner(1: TScanOpenParams params);
// 流式batch获取数据,Apache Arrow数据格式
    TScanBatchResult get_next(1: TScanNextBatchParams params);
// 结束扫描
    TScanCloseResult close_scanner(1: TScanCloseParams params);
}


实现DataStream


继承 org.apache.flink.streaming.api.functions.source.RichSourceFunction ,自定义DorisSourceFunction,初始化时,获取相关表的执行计划,获取对应的分区。

重写run方法,循环从分区中读取数据。


public void run(SourceContext sourceContext){
       //循环读取各分区
        for(PartitionDefinition partitions : dorisPartitions){
            scalaValueReader = new ScalaValueReader(partitions, settings);
            while (scalaValueReader.hasNext()){
                Object next = scalaValueReader.next();
                sourceContext.collect(next);
            }
        }
}

实现Flink SQL on Doris


参考了Flink自定义Source&Sink 和 Flink-jdbc-connector,实现了下面的效果,可以实现用Flink SQL直接操作Doris的表,包括读和写。


实现细节


1.实现DynamicTableSourceFactory , DynamicTableSinkFactory 注册 doris connector


2.自定义DynamicTableSource和DynamicTableSink 生成逻辑计划


3.DorisRowDataInputFormat和DorisDynamicOutputFormat获取到逻辑计划后开始执行。

image.png


image

实现中最主要的是基于RichInputFormat和RichOutputFormat 定制的

DorisRowDataInputFormat和DorisDynamicOutputFormat。


在DorisRowDataInputFormat中,将获取到的dorisPartitions 在createInputSplits中 切分成多个分片,用于并行计算。


public DorisTableInputSplit[] createInputSplits(int minNumSplits) {
        List<DorisTableInputSplit> dorisSplits = new ArrayList<>();
        int splitNum = 0;
        for (PartitionDefinition partition : dorisPartitions) {
            dorisSplits.add(new DorisTableInputSplit(splitNum++,partition));
        }
        return dorisSplits.toArray(new DorisTableInputSplit[0]);
}
public RowData nextRecord(RowData reuse)  {
        if (!hasNext) {
            //已经读完数据,返回null
            return null;
        }
        List next = (List)scalaValueReader.next();
        GenericRowData genericRowData = new GenericRowData(next.size());
        for(int i =0;i<next.size();i++){
            genericRowData.setField(i, next.get(i));
        }
        //判断是否还有数据
        hasNext = scalaValueReader.hasNext();
        return genericRowData;
}

在DorisRowDataOutputFormat中,通过streamload的方式向doris中写数据。


streamload程序参考org.apache.doris.plugin.audit.DorisStreamLoader

public  void writeRecord(RowData row) throws IOException {
       //streamload 默认分隔符 \t
        StringJoiner value = new StringJoiner("\t");
        GenericRowData rowData = (GenericRowData) row;
        for(int i = 0; i < row.getArity(); ++i) {
            value.add(rowData.getField(i).toString());
        }
        //streamload 写数据
        DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value.toString());
        System.out.println(loadResponse);




相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
6月前
|
存储 缓存 数据挖掘
Flink + Doris 实时湖仓解决方案
本文整理自SelectDB技术副总裁陈明雨在Flink Forward Asia 2024的分享,聚焦Apache Doris与湖仓一体解决方案。内容涵盖三部分:一是介绍Apache Doris,一款高性能实时分析数据库,支持多场景应用;二是基于Doris、Flink和Paimon的湖仓解决方案,解决批流融合与数据一致性挑战;三是Doris社区生态及云原生发展,包括存算分离架构与600多位贡献者的活跃社区。文章深入探讨了Doris在性能、易用性及场景支持上的优势,并展示了其在多维分析、日志分析和湖仓分析中的实际应用案例。
503 17
Flink + Doris 实时湖仓解决方案
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
364 21
|
10月前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
737 61
|
SQL 关系型数据库 MySQL
如何在Dataphin中构建Flink+Paimon流式湖仓方案
当前大数据处理工业界非常重要的一个大趋势是一体化,尤其是湖仓一体架构。与过去分散的数据仓库和数据湖不同,湖仓一体架构通过将数据存储和处理融为一体,不仅提升了数据访问速度和处理效率,还简化了数据管理流程,降低了资源成本。企业可以更轻松地实现数据治理和分析,从而快速决策。paimon是国内开源的,也是最年轻的成员。 本文主要演示如何在 Dataphin 产品中构建 Flink+Paimon 的流式湖仓方案。
8330 11
如何在Dataphin中构建Flink+Paimon流式湖仓方案
|
11月前
|
SQL 分布式计算 大数据
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
152 0
|
11月前
|
大数据 流计算
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
125 0
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何配置Connector来保持与MySOL一致
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之运行mysql to doris pipeline时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
SQL 设计模式 数据处理
Flink SQL 在快手实践问题之状态兼容的终极方案特点内容如何解决
Flink SQL 在快手实践问题之状态兼容的终极方案特点内容如何解决
97 0

热门文章

最新文章