如何基于Flink将流式数据实时写入AnalyticDB for PostgreSQL

本文涉及的产品
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
简介: 本文主要介绍如何通过 Flink 将流式数据实时写入 ADB PG中,并给出代码demo。本文的Flink 为社区1.7.2版本,ADB PG为阿里云AnalyticDB for PostgreSQL 6.0版。

本文主要介绍如何通过 Flink 将流式数据实时写入 ADB PG中,并提供一个项目代码demo。

版本说明:
Flink 为社区1.7.2版本。
ADB PG为阿里云AnalyticDB for PostgreSQL 6.0版。

使用方法

使用 Flink 作为流处理计算引擎时,可以通过sink connector,将Flink中的数据写入到目标端 。
本文demo中以socketStream作为源端(data source),ADB PG作为目标端(data sink),并提供ADB PG sink connector代码样例 ,完成数据流式写入。
实际应用中,可能需要更改为对应的source connector ,并修改字段映射关系。

步骤一:在ADBPG目标库中建表

create table test(id int,name text);

步骤二:基于nc工具启动socket stream,并向9000端口写入数据

nc -l 9000
flink.png

步骤三:启动 flink demo

bin/flink run -c Adb4PgSinkDemo /root/flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar
flink2.png

步骤四:目标端ADB PG观察数据同步情况

flink3.png

使用参数和设置说明

source connector 设置

demo的源端采用socket stream,可以参照flink官网换为kafka/ES/Hadoop 等source connector。
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/
对于不同的source stream一个通用的要求是每条数据的格式为:各列以英文逗号为分隔符拼接成单行文本。
例如我们想写入一条id=1,name=hello的数据,格式应转换为:

1,hello

sink connector 设置

sink端需要传入primaryKeys(主键字段), fieldNames(全部字段名), types(全部字段类型)三个参数来向ADB PG sink connector传入数据元信息,并传入自己的host address(格式为jdbc:postgresql://host:port/dbName),tableName,userName,password来初始化一个Adb4PgTableSink。

ArrayList<String> primaryKeys = new ArrayList<String>(Arrays.asList(new String[]{"id"}));
ArrayList<String> fieldNames = new ArrayList<String>(Arrays.asList(new String[]{"id", "name"}));
ArrayList<Class<?>> types = new ArrayList<Class<?>>(Arrays.asList(new Class<?>[]{Integer.class, String.class}));
MySchema schema = new MySchema(primaryKeys, fieldNames, types);

DataStream<Row> messageStream = sourceStream.map(new InputMap(schema));

messageStream.addSink(new Adb4PgTableSink("jdbc:postgresql://gp-****.gpdb.rds.aliyuncs.com:3432/your-dbname", "your-tablename", "your-username", "your-password", schema, schema.getPrimaryKeys()));

附:demo代码

完整项目代码请参考

demo 主流程代码如下

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.*;

public class Adb4PgSinkDemo extends RichSinkFunction<Row>{

    private transient static final Logger LOG = LoggerFactory.getLogger(Adb4PgSinkDemo.class);

    public static void main(String[] args) throws Exception {
        //设置环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Source: 本地监听端口9000获取数据
        DataStream<String> sourceStream = env.socketTextStream("127.0.0.1", 9000);

        ArrayList<String> primaryKeys = new ArrayList<String>(Arrays.asList(new String[]{"id"}));
        ArrayList<String> fieldNames = new ArrayList<String>(Arrays.asList(new String[]{"id", "name"}));
        ArrayList<Class<?>> types = new ArrayList<Class<?>>(Arrays.asList(new Class<?>[]{Integer.class, String.class}));
        MySchema schema = new MySchema(primaryKeys, fieldNames, types);

        DataStream<Row> messageStream = sourceStream.map(new InputMap(schema));

        messageStream.addSink(new Adb4PgTableSink("jdbc:postgresql://gp-****.gpdb.rds.aliyuncs.com:3432/your-dbname", "your-tablename", "your-username", "password", schema, schema.getPrimaryKeys()));
        env.execute("Example");
    }

    // 对输入数据做map操作。
    public static class InputMap implements MapFunction<String, Row> {
        private static final long serialVersionUID = 1L;

        MySchema schema;

        public InputMap(MySchema schema) {
            this.schema = schema;
        }

        //@Override
        public Row map(String line) throws Exception {
            // normalize and split the line
            String[] arr = line.split(",");
            int columnLen = this.schema.getLength();
            if (arr.length == columnLen) {
                Row row = new Row(columnLen);
                for(int i = 0; i < columnLen; i++){
                    row.setField(i, arr[i]);
                }
                return row;
            }
            return null;
        }
    }

    public static class MySchema implements Serializable {
        private static final long serialVersionUID = 10L;
        private ArrayList<String> primaryKeys;
        private ArrayList<String> fieldNames;
        private ArrayList<Class<?>> types;
        private int length;
        public MySchema(ArrayList<String> primaryKeys, ArrayList<String> fieldNames, ArrayList<Class<?>> types){
            this.primaryKeys = primaryKeys;
            this.fieldNames = fieldNames;
            this.types = types;
            length = fieldNames.size();
        }
        public ArrayList<String> getPrimaryKeys() {
            return primaryKeys;
        }
        public int getLength(){
            return length;
        }
        public ArrayList<String> getFieldNames() {
            return fieldNames;
        }
        public ArrayList<Class<?>> getTypes() {
            return types;
        }
        public int getFieldIndex(String key) {
            for(int i=0 ; i < fieldNames.size();i++) {
                if (key.equalsIgnoreCase(fieldNames.get(i))) {
                    return i;
                }
            }
            return -1;
        }
    }

}
相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
目录
相关文章
|
4月前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
999 0
|
4月前
|
DataWorks 安全 关系型数据库
DataWorks产品使用合集之使用Flink CDC读取PostgreSQL数据时如何指定编码格式
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
SQL 关系型数据库 MySQL
如何在Dataphin中构建Flink+Paimon流式湖仓方案
当前大数据处理工业界非常重要的一个大趋势是一体化,尤其是湖仓一体架构。与过去分散的数据仓库和数据湖不同,湖仓一体架构通过将数据存储和处理融为一体,不仅提升了数据访问速度和处理效率,还简化了数据管理流程,降低了资源成本。企业可以更轻松地实现数据治理和分析,从而快速决策。paimon是国内开源的,也是最年轻的成员。 本文主要演示如何在 Dataphin 产品中构建 Flink+Paimon 的流式湖仓方案。
7659 10
如何在Dataphin中构建Flink+Paimon流式湖仓方案
|
3月前
|
关系型数据库 API Apache
Flink CDC:基于 Apache Flink 的流式数据集成框架
本文整理自阿里云 Flink SQL 团队研发工程师于喜千(yux)在 SECon 全球软件工程技术大会中数据集成专场沙龙的分享。
17990 11
Flink CDC:基于 Apache Flink 的流式数据集成框架
|
2月前
|
SQL 关系型数据库 MySQL
SQL Server、MySQL、PostgreSQL:主流数据库SQL语法异同比较——深入探讨数据类型、分页查询、表创建与数据插入、函数和索引等关键语法差异,为跨数据库开发提供实用指导
【8月更文挑战第31天】SQL Server、MySQL和PostgreSQL是当今最流行的关系型数据库管理系统,均使用SQL作为查询语言,但在语法和功能实现上存在差异。本文将比较它们在数据类型、分页查询、创建和插入数据以及函数和索引等方面的异同,帮助开发者更好地理解和使用这些数据库。尽管它们共用SQL语言,但每个系统都有独特的语法规则,了解这些差异有助于提升开发效率和项目成功率。
228 0
|
2月前
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 监控 大数据
Serverless 应用的监控与调试问题之Flink流式数仓对于工商银行的数据链路要如何简化
Serverless 应用的监控与调试问题之Flink流式数仓对于工商银行的数据链路要如何简化
|
2月前
|
开发框架 关系型数据库 数据库
在 PostgreSQL 中,解决图片二进制数据,由于bytea_output参数问题导致显示不正常的问题。
在 PostgreSQL 中,解决图片二进制数据,由于bytea_output参数问题导致显示不正常的问题。
|
4月前
|
关系型数据库 5G PostgreSQL
postgreSQL 导出数据、导入
postgreSQL 导出数据、导入
49 1
|
4月前
|
运维 Cloud Native 关系型数据库
云原生数据仓库AnalyticDB产品使用合集之PostgreSQL版是否直接支持实时物化视图
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。
126 3

相关产品

  • 云数据库 RDS PostgreSQL 版