TuGraph Analytics动态插件:快速集成大数据生态系统

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 插件机制为GeaFlow任务提供了外部数据源的集成能力扩展,GeaFlow支持从各类Connector中读写数据,GeaFlow将它们都识别为外部表,并将元数据存储在Catalog中。GeaFlow已有一些内置的插件,例如FileConnector,KafkaConnector,JDBCConnector,HiveConnector等。

介绍

插件机制介绍

插件机制为GeaFlow任务提供了外部数据源的集成能力扩展,GeaFlow支持从各类Connector中读写数据,GeaFlow将它们都识别为外部表,并将元数据存储在Catalog中。GeaFlow已有一些内置的插件,例如FileConnector,KafkaConnector,JDBCConnector,HiveConnector等。

GeaFlow也提供了动态插件的功能,用户可以通过Java SPI的方式自定义Connector,连接外部数据源,例如Kafka,Hive等,也可自定义实现不同的sink、source连接方式和逻辑,更多关于自定义插件的介绍,可参考开发手册中自定义Connector章节。同时,GeaFlow Conosole平台为用户提供了插件管理的功能。在Console中,插件属于一种资源类型,用户可以通过白屏化的方式在Console上注册自定义的Connector插件,并在DSL任务或创建表时使用自定义的插件。

插件模型设计

  • GeaflowPlugin: 插件模型。
  • GeaflowPluginType: 插件(数据源)类型(KAFKA、HIVE、JDBC、FILE等)。
  • GeaflowPluginCategory: 插件种类(图、表、文件等)。
  • GealfowPluginConfig: 插件配置。
  • GealfowJarPackage: jar包。

上文所述中,目前支持用户自定义Connector插件种类为TABLE,即可在表配置中使用,作为表的输入或输出源,其插件类型为用户自定义。

除此之外,在GeaFlow Console中,插件的概念更为广泛,还包含了一些系统级的插件,是GeaFlow作业运行所依赖的外部系统,例如运行时元信息插件(RUNTIME_META)、指标系统插件(METRIC)、外部文件系统插件(REMOTE_FILE)、外部图存储系统插件(DATA),如下列表所示。由插件类型和插件种类可唯一确定一个插件,而插件类型和插件种类是多对多的关系,一个种类可能有多种类型,例如REMOTE_FILE种类的插件,其类型可以是LOCAL、DFS、OSS,对应了不同的外部存储系统。

插件引用解析

解析dsl任务中使用的插件是使用代理的方式调用引擎的解析接口,通过Calcite解析得到dsl文本中的信息,其主要分为4步:

  1. 解析DSL中表with参数中定义的插件。
  2. 解析DSL中使用的表绑定的插件。
  3. 获取引擎自带的插件列表。
  4. 将1和2中的结果进行合并,过滤引擎自带的插件,得到最终dsl任务中用户使用的插件列表。

Demo演示

插件开发

自定义Collector

自定义Collector需要实现TableReadableConnectorTableWritableConnector接口,分别是获取数据输入和输出源。
本例子中,在原来的FileTableConnector基础上,扩展了为每条数据增加前缀或后缀的功能。其中MyFileSource可在读取数据时,在每条数据前加一个自定义前缀,而MyFileSink可在写入每条数据时,在其之后加一个自定义后缀。

public class MyFileConnector implements TableWritableConnector, TableReadableConnector {
   
   

    @Override
    public TableSource createSource(Configuration configuration) {
   
   
        return new MyFileSource();
    }

    @Override
    public TableSink createSink(Configuration configuration) {
   
   
        return new MyFileSink();
    }

    @Override
    public String getType() {
   
   
        return "myFileType";
    }

}

public class MyFileSource extends FileTableSource {
   
   

    private static final Logger LOGGER = LoggerFactory.getLogger(MyFileSource.class);

    private String suffix;

    @Override
    public void init(Configuration tableConf, TableSchema tableSchema) {
   
   
        super.init(tableConf, tableSchema);
        this.suffix = tableConf.getString("geaflow.dsl.mysource.suffix");
        if (suffix == null) {
   
   
            suffix = "mySourceSuffix";
        }
        LOGGER.info("init table source with tableConf: {}", tableConf);
    }


    @SuppressWarnings("unchecked")
    @Override
    public <T> FetchData<T> fetch(Partition partition, Optional<Offset> startOffset,
                                  long windowSize) throws IOException {
   
   
        FileTableSource.FileOffset offset = startOffset.map(value -> (FileTableSource.FileOffset) value)
            .orElseGet(() -> new FileTableSource.FileOffset(0L));
        FetchData<T> tFetchData = fileReadHandler.readPartition((FileSplit) partition, offset, (int) windowSize);
        Iterator<T> dataIterator = tFetchData.getDataIterator();

        Iterator<T> newIterator = (Iterator<T>) Iterators.transform(dataIterator, e -> suffix + "_" + e);
        return FetchData.createBatchFetch(newIterator, tFetchData.getNextOffset());
    }

}

public class MyFileSink extends FileTableSink {
   
   

    private String suffix;

    private static final Logger LOGGER = LoggerFactory.getLogger(MyFileSink.class);

    private String separator;

    private StructType schema;

    @Override
    public void init(Configuration tableConf, StructType structType) {
   
   
        super.init(tableConf, structType);
        this.separator = tableConf.getString(ConnectorConfigKeys.GEAFLOW_DSL_COLUMN_SEPARATOR);
        this.schema = Objects.requireNonNull(structType);
        this.suffix = tableConf.getString("geaflow.dsl.mysink.suffix");
        if (suffix == null) {
   
   
            suffix = "mySinkSuffix";
        }
    }

    @Override
    public void write(Row row) throws IOException {
   
   
        Object[] values = new Object[schema.size()];
        for (int i = 0; i < schema.size(); i++) {
   
   
            values[i] = row.getField(i, schema.getType(i));
        }

        StringBuilder line = new StringBuilder();
        for (Object value : values) {
   
   
            if (line.length() > 0) {
   
   
                line.append(separator);
            }
            line.append(value);
        }
        line.append("_").append(suffix);
        LOGGER.info("sinkLine {}", line);
        writer.write(line + "\n");
    }
}

注册插件

GeaFlow使用ServiceLoader的方式读取所有的Connectors,需要在项目/resources/META-INF/services目录下,增加配置文件,文件名为com.antgroup.geaflow.dsl.connector.api.TableConnector。

文件内容为定义的Connector的全类名,如:

com.connector.myconnector.MyFileConnector

准备测试数据

在项目 /resources/data 目录中创建数据文件data1,便于后续测试

1,"tom",15
2,"cat",20
3,"anny",23
4,"alice",21

打包项目

最后,将maven项目进行打包,得到插件的jar包。

插件使用与管理

新增插件

在GeaFlow Console页面,“插件管理”模块中新增插件,填写插件名称方便管理,上传JAR包。其中“插件类型”字段需要和JAR包中自定义Connector#getType方法返回的值一致,并不能和已有插件重名。

创建表

创建source表,在参数配置中,选择类型为自定义的插件类型,并填写相应的参数(如输入表数据路径,自定义的suffix)

创建sink表:

提交任务

创建dsl任务,直接在dsl中使用之前创建的source表和sink表。

insert into sinkTable select * from sourceTable;

发布,提交作业后,在容器的/tmp/geaflow/result目录下,找到结果输出文件, 可看到输出数据中有插件中添加的suffix,表示自定义插件运行成功。

test-source_1,"tom",15_test-sink
test-source_2,"cat",20_test-sink
test-source_3,"anny",23_test-sink
test-source_4,"alice",21_test-sink

至此,我们就成功使用GeaFlow实现了自定义Connector插件!是不是超简单!快来试一试吧!

GeaFlow(品牌名TuGraph-Analytics) 已正式开源,欢迎大家关注!!!
欢迎给我们 Star 哦! GitHub👉 https://github.com/TuGraph-family/tugraph-analytics
更多精彩内容,关注我们的博客 https://geaflow.github.io/

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
11天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
42 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
分布式计算 DataWorks 关系型数据库
MaxCompute 生态系统中的数据集成工具
【8月更文第31天】在大数据时代,数据集成对于构建高效的数据处理流水线至关重要。阿里云的 MaxCompute 是一个用于处理大规模数据集的服务平台,它提供了强大的计算能力和丰富的生态系统工具来帮助用户管理和处理数据。本文将详细介绍如何使用 DataWorks 这样的工具将 MaxCompute 整合到整个数据处理流程中,以便更有效地管理数据生命周期。
117 0
|
4天前
|
XML Java 数据库连接
SpringBoot集成Flowable:打造强大的工作流管理系统
在企业级应用开发中,工作流管理是一个核心组件,它能够帮助我们定义、执行和管理业务流程。Flowable是一个开源的工作流和业务流程管理(BPM)平台,它提供了强大的工作流引擎和建模工具。结合SpringBoot,我们可以快速构建一个高效、灵活的工作流管理系统。本文将探讨如何将Flowable集成到SpringBoot应用中,并展示其强大的功能。
20 1
|
12天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
45 1
|
1月前
|
SQL 数据库连接 数据库
管理系统中的Visual Studio与SQL集成技巧与方法
在现代软件开发和管理系统中,Visual Studio(VS)作为强大的集成开发环境(IDE),与SQL数据库的紧密集成是构建高效、可靠应用程序的关键
|
1月前
|
SQL 监控 数据库
管理系统VS SQL:高效集成的关键技巧与方法
在现代企业信息化建设中,管理系统(如ERP、CRM等)与SQL数据库之间的紧密集成是确保数据流动顺畅、业务逻辑高效执行的关键
|
2月前
|
并行计算 关系型数据库 分布式数据库
朗坤智慧科技「LiEMS企业管理信息系统」通过PolarDB产品生态集成认证!
近日,朗坤智慧科技股份有限公司「LiEMS企业管理信息系统软件」通过PolarDB产品生态集成认证!
|
3月前
|
存储 Prometheus 监控
Grafana 与 Prometheus 集成:打造高效监控系统
【8月更文第29天】在现代软件开发和运维领域,监控系统已成为不可或缺的一部分。Prometheus 和 Grafana 作为两个非常流行且互补的开源工具,可以协同工作来构建强大的实时监控解决方案。Prometheus 负责收集和存储时间序列数据,而 Grafana 则提供直观的数据可视化功能。本文将详细介绍如何集成这两个工具,构建一个高效、灵活的监控系统。
373 1
|
3月前
|
消息中间件 分布式计算 大数据
RabbitMQ与大数据平台的集成
【8月更文第28天】在现代的大数据处理架构中,消息队列作为数据传输的关键组件扮演着重要的角色。RabbitMQ 是一个开源的消息代理软件,它支持多种消息协议,能够为分布式系统提供可靠的消息传递服务。本篇文章将探讨如何使用 RabbitMQ 与 Hadoop 和 Spark 进行集成,以实现高效的数据处理和分析。
35 1
|
3月前
|
机器学习/深度学习 开发工具 git
Jupyter 与版本控制系统的集成
【8月更文第29天】在数据科学和机器学习项目中,Jupyter Notebook 提供了一个强大的环境来编写代码、执行实验和记录结果。然而,随着项目的复杂度增加以及团队规模的扩大,版本控制变得至关重要。Git 是最常用的版本控制系统之一,它可以帮助团队协作、追踪变更历史、管理分支等。本文将探讨如何将 Git 与 Jupyter Notebook 集成起来,从而更好地管理代码和文档。
62 0
下一篇
无影云桌面