如何基于Flink将流式数据实时写入AnalyticDB for PostgreSQL

简介: 本文主要介绍如何通过 Flink 将流式数据实时写入 ADB PG中,并给出代码demo。本文的Flink 为社区1.7.2版本,ADB PG为阿里云AnalyticDB for PostgreSQL 6.0版。

本文主要介绍如何通过 Flink 将流式数据实时写入 ADB PG中,并提供一个项目代码demo。

版本说明:
Flink 为社区1.7.2版本。
ADB PG为阿里云AnalyticDB for PostgreSQL 6.0版。

使用方法

使用 Flink 作为流处理计算引擎时,可以通过sink connector,将Flink中的数据写入到目标端 。
本文demo中以socketStream作为源端(data source),ADB PG作为目标端(data sink),并提供ADB PG sink connector代码样例 ,完成数据流式写入。
实际应用中,可能需要更改为对应的source connector ,并修改字段映射关系。

步骤一:在ADBPG目标库中建表

create table test(id int,name text);

步骤二:基于nc工具启动socket stream,并向9000端口写入数据

nc -l 9000
flink.png

步骤三:启动 flink demo

bin/flink run -c Adb4PgSinkDemo /root/flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar
flink2.png

步骤四:目标端ADB PG观察数据同步情况

flink3.png

使用参数和设置说明

source connector 设置

demo的源端采用socket stream,可以参照flink官网换为kafka/ES/Hadoop 等source connector。
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/
对于不同的source stream一个通用的要求是每条数据的格式为:各列以英文逗号为分隔符拼接成单行文本。
例如我们想写入一条id=1,name=hello的数据,格式应转换为:

1,hello

sink connector 设置

sink端需要传入primaryKeys(主键字段), fieldNames(全部字段名), types(全部字段类型)三个参数来向ADB PG sink connector传入数据元信息,并传入自己的host address(格式为jdbc:postgresql://host:port/dbName),tableName,userName,password来初始化一个Adb4PgTableSink。

ArrayList<String> primaryKeys = new ArrayList<String>(Arrays.asList(new String[]{"id"}));
ArrayList<String> fieldNames = new ArrayList<String>(Arrays.asList(new String[]{"id", "name"}));
ArrayList<Class<?>> types = new ArrayList<Class<?>>(Arrays.asList(new Class<?>[]{Integer.class, String.class}));
MySchema schema = new MySchema(primaryKeys, fieldNames, types);

DataStream<Row> messageStream = sourceStream.map(new InputMap(schema));

messageStream.addSink(new Adb4PgTableSink("jdbc:postgresql://gp-****.gpdb.rds.aliyuncs.com:3432/your-dbname", "your-tablename", "your-username", "your-password", schema, schema.getPrimaryKeys()));

附:demo代码

完整项目代码请参考

demo 主流程代码如下

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.*;

public class Adb4PgSinkDemo extends RichSinkFunction<Row>{

    private transient static final Logger LOG = LoggerFactory.getLogger(Adb4PgSinkDemo.class);

    public static void main(String[] args) throws Exception {
        //设置环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Source: 本地监听端口9000获取数据
        DataStream<String> sourceStream = env.socketTextStream("127.0.0.1", 9000);

        ArrayList<String> primaryKeys = new ArrayList<String>(Arrays.asList(new String[]{"id"}));
        ArrayList<String> fieldNames = new ArrayList<String>(Arrays.asList(new String[]{"id", "name"}));
        ArrayList<Class<?>> types = new ArrayList<Class<?>>(Arrays.asList(new Class<?>[]{Integer.class, String.class}));
        MySchema schema = new MySchema(primaryKeys, fieldNames, types);

        DataStream<Row> messageStream = sourceStream.map(new InputMap(schema));

        messageStream.addSink(new Adb4PgTableSink("jdbc:postgresql://gp-****.gpdb.rds.aliyuncs.com:3432/your-dbname", "your-tablename", "your-username", "password", schema, schema.getPrimaryKeys()));
        env.execute("Example");
    }

    // 对输入数据做map操作。
    public static class InputMap implements MapFunction<String, Row> {
        private static final long serialVersionUID = 1L;

        MySchema schema;

        public InputMap(MySchema schema) {
            this.schema = schema;
        }

        //@Override
        public Row map(String line) throws Exception {
            // normalize and split the line
            String[] arr = line.split(",");
            int columnLen = this.schema.getLength();
            if (arr.length == columnLen) {
                Row row = new Row(columnLen);
                for(int i = 0; i < columnLen; i++){
                    row.setField(i, arr[i]);
                }
                return row;
            }
            return null;
        }
    }

    public static class MySchema implements Serializable {
        private static final long serialVersionUID = 10L;
        private ArrayList<String> primaryKeys;
        private ArrayList<String> fieldNames;
        private ArrayList<Class<?>> types;
        private int length;
        public MySchema(ArrayList<String> primaryKeys, ArrayList<String> fieldNames, ArrayList<Class<?>> types){
            this.primaryKeys = primaryKeys;
            this.fieldNames = fieldNames;
            this.types = types;
            length = fieldNames.size();
        }
        public ArrayList<String> getPrimaryKeys() {
            return primaryKeys;
        }
        public int getLength(){
            return length;
        }
        public ArrayList<String> getFieldNames() {
            return fieldNames;
        }
        public ArrayList<Class<?>> getTypes() {
            return types;
        }
        public int getFieldIndex(String key) {
            for(int i=0 ; i < fieldNames.size();i++) {
                if (key.equalsIgnoreCase(fieldNames.get(i))) {
                    return i;
                }
            }
            return -1;
        }
    }

}
相关实践学习
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
目录
相关文章
|
10月前
|
存储 消息中间件 OLAP
基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
本文整理自淘天集团高级数据开发工程师朱奥在Flink Forward Asia 2024的分享,围绕实时数仓优化展开。内容涵盖项目背景、核心策略、解决方案、项目价值及未来计划五部分。通过引入Paimon和Hologres技术,解决当前流批存储不统一、实时数据可见性差等痛点,实现流批一体存储与高效近实时数据加工。项目显著提升了数据时效性和开发运维效率,降低了使用门槛与成本,并规划未来在集团内推广湖仓一体架构,探索更多技术创新场景。
1759 3
基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
|
11月前
|
消息中间件 存储 监控
Lalamove基于Flink实时湖仓演进之路
本文由货拉拉国际化技术部资深数据仓库工程师林海亮撰写,围绕Flink在实时数仓中的应用展开。文章首先介绍了Lalamove业务背景,随后分析了Flink在实时看板、数据服务API、数据监控及数据分析中的应用与挑战,如多数据中心、时区差异、上游改造频繁及高成本问题。接着阐述了实时数仓架构从无分层到引入Paimon湖仓的演进过程,解决了数据延迟、兼容性及资源消耗等问题。最后展望未来,提出基于Fluss+Paimon优化架构的方向,进一步提升性能与降低成本。
409 11
Lalamove基于Flink实时湖仓演进之路
|
11月前
|
存储 监控 数据挖掘
京东物流基于Flink & StarRocks的湖仓建设实践
本文整理自京东物流高级数据开发工程师梁宝彬在Flink Forward Asia 2024的分享,聚焦实时湖仓的探索与建设、应用实践、问题思考及未来展望。内容涵盖京东物流通过Flink和Paimon等技术构建实时湖仓体系的过程,解决复杂业务场景下的数据分析挑战,如多维OLAP分析、大屏监控等。同时,文章详细介绍了基于StarRocks的湖仓一体方案,优化存储成本并提升查询效率,以及存算分离的应用实践。最后,对未来数据服务的发展方向进行了展望,计划推广长周期数据存储服务和原生数据湖建设,进一步提升数据分析能力。
1016 1
京东物流基于Flink & StarRocks的湖仓建设实践
|
6月前
|
存储 JSON 数据处理
Flink基于Paimon的实时湖仓解决方案的演进
本文源自Apache CommunityOverCode Asia 2025,阿里云专家苏轩楠分享Flink与Paimon构建实时湖仓的演进实践。深度解析Variant数据类型、Lookup Join优化等关键技术,提升半结构化数据处理效率与系统可扩展性,推动实时湖仓在生产环境的高效落地。
768 1
Flink基于Paimon的实时湖仓解决方案的演进
|
6月前
|
存储 人工智能 监控
淘宝闪购基于Flink&Paimon的Lakehouse生产实践:从实时数仓到湖仓一体化的演进之路
本文整理自淘宝闪购(饿了么)大数据架构师王沛斌在 Flink Forward Asia 2025 上海站的分享,深度解析其基于 Apache Flink 与 Paimon 的 Lakehouse 架构演进与落地实践,涵盖实时数仓发展、技术选型、平台建设及未来展望。
1232 0
淘宝闪购基于Flink&Paimon的Lakehouse生产实践:从实时数仓到湖仓一体化的演进之路
|
6月前
|
存储 人工智能 关系型数据库
阿里云AnalyticDB for PostgreSQL 入选VLDB 2025:统一架构破局HTAP,Beam+Laser引擎赋能Data+AI融合新范式
在数据驱动与人工智能深度融合的时代,企业对数据仓库的需求早已超越“查得快”这一基础能力。面对传统数仓挑战,阿里云瑶池数据库AnalyticDB for PostgreSQL(简称ADB-PG)创新性地构建了统一架构下的Shared-Nothing与Shared-Storage双模融合体系,并自主研发Beam混合存储引擎与Laser向量化执行引擎,全面解决HTAP场景下性能、弹性、成本与实时性的矛盾。 近日,相关研究成果发表于在英国伦敦召开的数据库领域顶级会议 VLDB 2025,标志着中国自研云数仓技术再次登上国际舞台。
686 0
|
12月前
|
存储 缓存 数据挖掘
Flink + Doris 实时湖仓解决方案
本文整理自SelectDB技术副总裁陈明雨在Flink Forward Asia 2024的分享,聚焦Apache Doris与湖仓一体解决方案。内容涵盖三部分:一是介绍Apache Doris,一款高性能实时分析数据库,支持多场景应用;二是基于Doris、Flink和Paimon的湖仓解决方案,解决批流融合与数据一致性挑战;三是Doris社区生态及云原生发展,包括存算分离架构与600多位贡献者的活跃社区。文章深入探讨了Doris在性能、易用性及场景支持上的优势,并展示了其在多维分析、日志分析和湖仓分析中的实际应用案例。
1040 17
Flink + Doris 实时湖仓解决方案
|
SQL 消息中间件 Kafka
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
本文介绍了阿里云实时数仓Hologres负责人姜伟华在Flink Forward Asia 2024上的分享,涵盖实时数仓的发展历程、从实时数仓到实时湖仓的演进,以及总结。文章通过三代实时数仓架构的演变,详细解析了Lambda架构、Kafka实时数仓分层+OLAP、Hologres实时数仓分层复用等方案,并探讨了未来从实时数仓到实时湖仓的演进方向。最后,结合实际案例和Demo展示了Hologres + Flink + Paimon在实时湖仓中的应用,帮助用户根据业务需求选择合适的方案。
1665 20
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
|
SQL 监控 关系型数据库
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
本文整理自用友畅捷通数据架构师王龙强在FFA2024上的分享,介绍了公司在Flink上构建实时数仓的经验。内容涵盖业务背景、数仓建设、当前挑战、最佳实践和未来展望。随着数据量增长,公司面临数据库性能瓶颈及实时数据处理需求,通过引入Flink技术逐步解决了数据同步、链路稳定性和表结构差异等问题,并计划在未来进一步优化链路稳定性、探索湖仓一体架构以及结合AI技术推进数据资源高效利用。
885 25
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
|
存储 关系型数据库 MySQL
Flink基于Paimon的实时湖仓解决方案的演进
本文整理自阿里云智能集团苏轩楠老师在Flink Forward Asia 2024论坛的分享,涵盖流式湖仓架构的背景介绍、技术演进和未来发展规划。背景部分介绍了ODS、DWD、DWS三层数据架构及关键组件Flink与Paimon的作用;技术演进讨论了全量与增量数据处理优化、宽表构建及Compaction操作的改进;发展规划则展望了Range Partition、Materialized Table等新功能的应用前景。通过这些优化,系统不仅简化了复杂度,还提升了实时与离线处理的灵活性和效率。
1038 3
Flink基于Paimon的实时湖仓解决方案的演进

相关产品

  • 云数据库 RDS PostgreSQL 版
  • 推荐镜像

    更多