如何基于Flink将流式数据实时写入AnalyticDB for PostgreSQL

简介: 本文主要介绍如何通过 Flink 将流式数据实时写入 ADB PG中,并给出代码demo。本文的Flink 为社区1.7.2版本,ADB PG为阿里云AnalyticDB for PostgreSQL 6.0版。

本文主要介绍如何通过 Flink 将流式数据实时写入 ADB PG中,并提供一个项目代码demo。

版本说明:
Flink 为社区1.7.2版本。
ADB PG为阿里云AnalyticDB for PostgreSQL 6.0版。

使用方法

使用 Flink 作为流处理计算引擎时,可以通过sink connector,将Flink中的数据写入到目标端 。
本文demo中以socketStream作为源端(data source),ADB PG作为目标端(data sink),并提供ADB PG sink connector代码样例 ,完成数据流式写入。
实际应用中,可能需要更改为对应的source connector ,并修改字段映射关系。

步骤一:在ADBPG目标库中建表

create table test(id int,name text);

步骤二:基于nc工具启动socket stream,并向9000端口写入数据

nc -l 9000
flink.png

步骤三:启动 flink demo

bin/flink run -c Adb4PgSinkDemo /root/flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar
flink2.png

步骤四:目标端ADB PG观察数据同步情况

flink3.png

使用参数和设置说明

source connector 设置

demo的源端采用socket stream,可以参照flink官网换为kafka/ES/Hadoop 等source connector。
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/
对于不同的source stream一个通用的要求是每条数据的格式为:各列以英文逗号为分隔符拼接成单行文本。
例如我们想写入一条id=1,name=hello的数据,格式应转换为:

1,hello

sink connector 设置

sink端需要传入primaryKeys(主键字段), fieldNames(全部字段名), types(全部字段类型)三个参数来向ADB PG sink connector传入数据元信息,并传入自己的host address(格式为jdbc:postgresql://host:port/dbName),tableName,userName,password来初始化一个Adb4PgTableSink。

ArrayList<String> primaryKeys = new ArrayList<String>(Arrays.asList(new String[]{"id"}));
ArrayList<String> fieldNames = new ArrayList<String>(Arrays.asList(new String[]{"id", "name"}));
ArrayList<Class<?>> types = new ArrayList<Class<?>>(Arrays.asList(new Class<?>[]{Integer.class, String.class}));
MySchema schema = new MySchema(primaryKeys, fieldNames, types);

DataStream<Row> messageStream = sourceStream.map(new InputMap(schema));

messageStream.addSink(new Adb4PgTableSink("jdbc:postgresql://gp-****.gpdb.rds.aliyuncs.com:3432/your-dbname", "your-tablename", "your-username", "your-password", schema, schema.getPrimaryKeys()));

附:demo代码

完整项目代码请参考

demo 主流程代码如下

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.*;

public class Adb4PgSinkDemo extends RichSinkFunction<Row>{

    private transient static final Logger LOG = LoggerFactory.getLogger(Adb4PgSinkDemo.class);

    public static void main(String[] args) throws Exception {
        //设置环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Source: 本地监听端口9000获取数据
        DataStream<String> sourceStream = env.socketTextStream("127.0.0.1", 9000);

        ArrayList<String> primaryKeys = new ArrayList<String>(Arrays.asList(new String[]{"id"}));
        ArrayList<String> fieldNames = new ArrayList<String>(Arrays.asList(new String[]{"id", "name"}));
        ArrayList<Class<?>> types = new ArrayList<Class<?>>(Arrays.asList(new Class<?>[]{Integer.class, String.class}));
        MySchema schema = new MySchema(primaryKeys, fieldNames, types);

        DataStream<Row> messageStream = sourceStream.map(new InputMap(schema));

        messageStream.addSink(new Adb4PgTableSink("jdbc:postgresql://gp-****.gpdb.rds.aliyuncs.com:3432/your-dbname", "your-tablename", "your-username", "password", schema, schema.getPrimaryKeys()));
        env.execute("Example");
    }

    // 对输入数据做map操作。
    public static class InputMap implements MapFunction<String, Row> {
        private static final long serialVersionUID = 1L;

        MySchema schema;

        public InputMap(MySchema schema) {
            this.schema = schema;
        }

        //@Override
        public Row map(String line) throws Exception {
            // normalize and split the line
            String[] arr = line.split(",");
            int columnLen = this.schema.getLength();
            if (arr.length == columnLen) {
                Row row = new Row(columnLen);
                for(int i = 0; i < columnLen; i++){
                    row.setField(i, arr[i]);
                }
                return row;
            }
            return null;
        }
    }

    public static class MySchema implements Serializable {
        private static final long serialVersionUID = 10L;
        private ArrayList<String> primaryKeys;
        private ArrayList<String> fieldNames;
        private ArrayList<Class<?>> types;
        private int length;
        public MySchema(ArrayList<String> primaryKeys, ArrayList<String> fieldNames, ArrayList<Class<?>> types){
            this.primaryKeys = primaryKeys;
            this.fieldNames = fieldNames;
            this.types = types;
            length = fieldNames.size();
        }
        public ArrayList<String> getPrimaryKeys() {
            return primaryKeys;
        }
        public int getLength(){
            return length;
        }
        public ArrayList<String> getFieldNames() {
            return fieldNames;
        }
        public ArrayList<Class<?>> getTypes() {
            return types;
        }
        public int getFieldIndex(String key) {
            for(int i=0 ; i < fieldNames.size();i++) {
                if (key.equalsIgnoreCase(fieldNames.get(i))) {
                    return i;
                }
            }
            return -1;
        }
    }

}
相关实践学习
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
目录
相关文章
|
7月前
|
运维 算法 机器人
阿里云AnalyticDB具身智能方案:破解机器人仿真数据、算力与运维之困
本文将介绍阿里云瑶池旗下的云原生数据仓库AnalyticDB MySQL推出的全托管云上仿真解决方案,方案采用云原生架构,为开发者提供从开发环境、仿真计算到数据管理的全链路支持。
|
5月前
|
存储 人工智能 关系型数据库
阿里云AnalyticDB for PostgreSQL 入选VLDB 2025:统一架构破局HTAP,Beam+Laser引擎赋能Data+AI融合新范式
在数据驱动与人工智能深度融合的时代,企业对数据仓库的需求早已超越“查得快”这一基础能力。面对传统数仓挑战,阿里云瑶池数据库AnalyticDB for PostgreSQL(简称ADB-PG)创新性地构建了统一架构下的Shared-Nothing与Shared-Storage双模融合体系,并自主研发Beam混合存储引擎与Laser向量化执行引擎,全面解决HTAP场景下性能、弹性、成本与实时性的矛盾。 近日,相关研究成果发表于在英国伦敦召开的数据库领域顶级会议 VLDB 2025,标志着中国自研云数仓技术再次登上国际舞台。
517 0
|
12月前
|
存储 SQL 数据挖掘
数据无界、湖仓无界, Apache Doris 湖仓一体解决方案全面解读(上篇)
湖仓一体架构融合了数据湖的低成本、高扩展性,以及数据仓库的高性能、强数据治理能力,高效应对大数据时代的挑战。为助力企业实现湖仓一体的建设,Apache Doris 提出了数据无界和湖仓无界核心理念,并结合自身特性,助力企业加速从 0 到 1 构建湖仓体系,降低转型过程中的风险和成本。本文将对湖仓一体演进及 Apache Doris 湖仓一体方案进行介绍。
1153 1
数据无界、湖仓无界, Apache Doris 湖仓一体解决方案全面解读(上篇)
|
11月前
|
SQL 关系型数据库 PostgreSQL
【YashanDB 知识库】从 PostgreSQL 迁移到 YashanDB 如何进行数据行数比对
【YashanDB 知识库】从 PostgreSQL 迁移到 YashanDB 如何进行数据行数比对
|
11月前
|
SQL Oracle 关系型数据库
【YashanDB知识库】从PostgreSQL迁移到YashanDB如何进行数据行数比对
本文介绍了通过Oracle视图`v$sql`和`v$sql_plan`分析SQL性能的方法。首先,可通过`plan_hash_value`从`v$sql_plan`获取SQL执行计划,结合示例展示了具体查询方式。文章还创建了一个UDF函数`REPEAT`用于格式化输出,便于阅读复杂执行计划。最后,通过实例展示了如何根据`plan_hash_value`获取SQL文本及其内存中的执行计划,帮助优化性能问题。
|
存储 关系型数据库 数据库
【赵渝强老师】PostgreSQL的数据文件
PostgreSQL的物理存储结构主要包括数据文件、日志文件等。数据文件按oid命名,超过1G时自动拆分。通过查询数据库和表的oid,可定位到具体的数据文件。例如,查询数据库oid后,再查询特定表的oid及relfilenode,即可找到该表对应的数据文件位置。
336 1
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
411 1
|
SQL 关系型数据库 MySQL
SQL Server、MySQL、PostgreSQL:主流数据库SQL语法异同比较——深入探讨数据类型、分页查询、表创建与数据插入、函数和索引等关键语法差异,为跨数据库开发提供实用指导
【8月更文挑战第31天】SQL Server、MySQL和PostgreSQL是当今最流行的关系型数据库管理系统,均使用SQL作为查询语言,但在语法和功能实现上存在差异。本文将比较它们在数据类型、分页查询、创建和插入数据以及函数和索引等方面的异同,帮助开发者更好地理解和使用这些数据库。尽管它们共用SQL语言,但每个系统都有独特的语法规则,了解这些差异有助于提升开发效率和项目成功率。
1798 0
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

相关产品

  • 云数据库 RDS PostgreSQL 版
  • 推荐镜像

    更多