如何基于Flink将流式数据实时写入AnalyticDB for PostgreSQL

本文涉及的产品
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
简介: 本文主要介绍如何通过 Flink 将流式数据实时写入 ADB PG中,并给出代码demo。本文的Flink 为社区1.7.2版本,ADB PG为阿里云AnalyticDB for PostgreSQL 6.0版。

本文主要介绍如何通过 Flink 将流式数据实时写入 ADB PG中,并提供一个项目代码demo。

版本说明:
Flink 为社区1.7.2版本。
ADB PG为阿里云AnalyticDB for PostgreSQL 6.0版。

使用方法

使用 Flink 作为流处理计算引擎时,可以通过sink connector,将Flink中的数据写入到目标端 。
本文demo中以socketStream作为源端(data source),ADB PG作为目标端(data sink),并提供ADB PG sink connector代码样例 ,完成数据流式写入。
实际应用中,可能需要更改为对应的source connector ,并修改字段映射关系。

步骤一:在ADBPG目标库中建表

create table test(id int,name text);

步骤二:基于nc工具启动socket stream,并向9000端口写入数据

nc -l 9000
flink.png

步骤三:启动 flink demo

bin/flink run -c Adb4PgSinkDemo /root/flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar
flink2.png

步骤四:目标端ADB PG观察数据同步情况

flink3.png

使用参数和设置说明

source connector 设置

demo的源端采用socket stream,可以参照flink官网换为kafka/ES/Hadoop 等source connector。
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/
对于不同的source stream一个通用的要求是每条数据的格式为:各列以英文逗号为分隔符拼接成单行文本。
例如我们想写入一条id=1,name=hello的数据,格式应转换为:

1,hello

sink connector 设置

sink端需要传入primaryKeys(主键字段), fieldNames(全部字段名), types(全部字段类型)三个参数来向ADB PG sink connector传入数据元信息,并传入自己的host address(格式为jdbc:postgresql://host:port/dbName),tableName,userName,password来初始化一个Adb4PgTableSink。

ArrayList<String> primaryKeys = new ArrayList<String>(Arrays.asList(new String[]{"id"}));
ArrayList<String> fieldNames = new ArrayList<String>(Arrays.asList(new String[]{"id", "name"}));
ArrayList<Class<?>> types = new ArrayList<Class<?>>(Arrays.asList(new Class<?>[]{Integer.class, String.class}));
MySchema schema = new MySchema(primaryKeys, fieldNames, types);

DataStream<Row> messageStream = sourceStream.map(new InputMap(schema));

messageStream.addSink(new Adb4PgTableSink("jdbc:postgresql://gp-****.gpdb.rds.aliyuncs.com:3432/your-dbname", "your-tablename", "your-username", "your-password", schema, schema.getPrimaryKeys()));

附:demo代码

完整项目代码请参考

demo 主流程代码如下

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.*;

public class Adb4PgSinkDemo extends RichSinkFunction<Row>{

    private transient static final Logger LOG = LoggerFactory.getLogger(Adb4PgSinkDemo.class);

    public static void main(String[] args) throws Exception {
        //设置环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Source: 本地监听端口9000获取数据
        DataStream<String> sourceStream = env.socketTextStream("127.0.0.1", 9000);

        ArrayList<String> primaryKeys = new ArrayList<String>(Arrays.asList(new String[]{"id"}));
        ArrayList<String> fieldNames = new ArrayList<String>(Arrays.asList(new String[]{"id", "name"}));
        ArrayList<Class<?>> types = new ArrayList<Class<?>>(Arrays.asList(new Class<?>[]{Integer.class, String.class}));
        MySchema schema = new MySchema(primaryKeys, fieldNames, types);

        DataStream<Row> messageStream = sourceStream.map(new InputMap(schema));

        messageStream.addSink(new Adb4PgTableSink("jdbc:postgresql://gp-****.gpdb.rds.aliyuncs.com:3432/your-dbname", "your-tablename", "your-username", "password", schema, schema.getPrimaryKeys()));
        env.execute("Example");
    }

    // 对输入数据做map操作。
    public static class InputMap implements MapFunction<String, Row> {
        private static final long serialVersionUID = 1L;

        MySchema schema;

        public InputMap(MySchema schema) {
            this.schema = schema;
        }

        //@Override
        public Row map(String line) throws Exception {
            // normalize and split the line
            String[] arr = line.split(",");
            int columnLen = this.schema.getLength();
            if (arr.length == columnLen) {
                Row row = new Row(columnLen);
                for(int i = 0; i < columnLen; i++){
                    row.setField(i, arr[i]);
                }
                return row;
            }
            return null;
        }
    }

    public static class MySchema implements Serializable {
        private static final long serialVersionUID = 10L;
        private ArrayList<String> primaryKeys;
        private ArrayList<String> fieldNames;
        private ArrayList<Class<?>> types;
        private int length;
        public MySchema(ArrayList<String> primaryKeys, ArrayList<String> fieldNames, ArrayList<Class<?>> types){
            this.primaryKeys = primaryKeys;
            this.fieldNames = fieldNames;
            this.types = types;
            length = fieldNames.size();
        }
        public ArrayList<String> getPrimaryKeys() {
            return primaryKeys;
        }
        public int getLength(){
            return length;
        }
        public ArrayList<String> getFieldNames() {
            return fieldNames;
        }
        public ArrayList<Class<?>> getTypes() {
            return types;
        }
        public int getFieldIndex(String key) {
            for(int i=0 ; i < fieldNames.size();i++) {
                if (key.equalsIgnoreCase(fieldNames.get(i))) {
                    return i;
                }
            }
            return -1;
        }
    }

}
相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
目录
相关文章
|
3月前
|
DataWorks 负载均衡 Serverless
实时数仓 Hologres产品使用合集之如何导入大量数据
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
3月前
|
SQL 消息中间件 OLAP
OneSQL OLAP实践问题之实时数仓中数据的分层如何解决
OneSQL OLAP实践问题之实时数仓中数据的分层如何解决
53 1
|
3月前
|
SQL DataWorks 数据库连接
实时数仓 Hologres操作报错合集之如何将物理表数据写入临时表
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
3月前
|
SQL 分布式计算 关系型数据库
实时数仓 Hologres操作报错合集之指定主键更新模式报错主键数据重复,该如何处理
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
3月前
|
SQL 分布式计算 MaxCompute
实时数仓 Hologres产品使用合集之如何在插入数据后获取自增的id值
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
实时数仓 Hologres产品使用合集之如何在插入数据后获取自增的id值
|
3月前
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
存储 搜索推荐 关系型数据库
实时数仓 Hologres产品使用合集之如何在新增列的时候将历史数据也补上默认值
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
3月前
|
存储 关系型数据库 分布式数据库
实时数仓 Hologres产品使用合集之对于大量数据的写入,该如何优化
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
3月前
|
消息中间件 SQL Java
实时数仓 Hologres产品使用合集之如何用python将kafka数据写入
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
4月前
|
分布式计算 DataWorks 关系型数据库
阿里云数加-分析型数据库AnalyticDB数据导入的多样化策略
通过合理利用这些数据导入方法,用户可以充分发挥AnalyticDB的实时计算能力和高并发查询性能,为业务分析和决策提供强有力的数据支持。

相关产品

  • 云数据库 RDS PostgreSQL 版