This article explains how to use Flink to write streaming data into ADB PG in real time, and provides a demo project.
Version notes:
Flink: community edition 1.7.2.
ADB PG: Alibaba Cloud AnalyticDB for PostgreSQL 6.0.
Usage
When Flink is used as the stream-processing engine, its data can be written to a target system through a sink connector.
The demo in this article uses a socket stream as the source (data source) and ADB PG as the target (data sink), and provides sample ADB PG sink connector code to complete the streaming write.
In a real application, you may need to switch to the appropriate source connector and adjust the field mappings.
Step 1: Create the target table in the ADB PG database
create table test(id int,name text);
Step 2: Use the nc tool to start a socket stream and write data to port 9000
nc -l 9000
Step 3: Launch the Flink demo
bin/flink run -c Adb4PgSinkDemo /root/flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar
Step 4: Verify data synchronization on the ADB PG target
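For example, after entering a line such as 1,hello in the nc session, the corresponding row should appear in the test table created in step 1. A simple way to check is to query the table on the ADB PG side:

```sql
select * from test;
```

If the row does not appear, check the Flink job status and the sink connection parameters first.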
Parameters and settings
Source connector settings
The demo uses a socket stream as the source; following the Flink documentation, it can be swapped for a Kafka, Elasticsearch, Hadoop, or other source connector:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/
A common requirement across source streams is the record format: each record must be a single line of text with its column values joined by commas.
For example, to write a record with id=1 and name=hello, the record should be formatted as:
1,hello
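The required conversion can be sketched as follows (a minimal illustration; the class RecordFormat and method toLine are hypothetical names, not part of the demo project):

```java
import java.util.Arrays;
import java.util.List;

// Illustrative helper: joins column values into the single-line,
// comma-separated format the source stream is expected to emit.
public class RecordFormat {

    static String toLine(List<String> values) {
        // Assumes the values themselves contain no commas
        return String.join(",", values);
    }

    public static void main(String[] args) {
        System.out.println(toLine(Arrays.asList("1", "hello"))); // prints 1,hello
    }
}
```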
Sink connector settings
The sink side requires three parameters describing the data's metadata: primaryKeys (the primary-key fields), fieldNames (all field names), and types (all field types). Together with your host address (in the form jdbc:postgresql://host:port/dbName), tableName, userName, and password, these initialize an Adb4PgTableSink.
ArrayList<String> primaryKeys = new ArrayList<String>(Arrays.asList(new String[]{"id"}));
ArrayList<String> fieldNames = new ArrayList<String>(Arrays.asList(new String[]{"id", "name"}));
ArrayList<Class<?>> types = new ArrayList<Class<?>>(Arrays.asList(new Class<?>[]{Integer.class, String.class}));
MySchema schema = new MySchema(primaryKeys, fieldNames, types);
DataStream<Row> messageStream = sourceStream.map(new InputMap(schema));
messageStream.addSink(new Adb4PgTableSink("jdbc:postgresql://gp-****.gpdb.rds.aliyuncs.com:3432/your-dbname", "your-tablename", "your-username", "your-password", schema, schema.getPrimaryKeys()));
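The Adb4PgTableSink implementation itself is part of the full project code and not shown here. As a rough idea of the JDBC logic such a sink performs, here is a minimal standalone sketch (the class JdbcSinkSketch and its methods are illustrative inventions; in the actual connector this logic would live in a Flink RichSinkFunction's open/invoke/close methods, and the real sink presumably also handles batching and primary-key upserts):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only; not the project's actual Adb4PgTableSink code.
public class JdbcSinkSketch {
    private Connection conn;
    private PreparedStatement stmt;

    // Build "insert into t (id,name) values (?,?)" from the field list
    static String buildInsertSql(String table, List<String> fieldNames) {
        String cols = String.join(",", fieldNames);
        StringBuilder marks = new StringBuilder();
        for (int i = 0; i < fieldNames.size(); i++) {
            marks.append(i == 0 ? "?" : ",?");
        }
        return "insert into " + table + " (" + cols + ") values (" + marks + ")";
    }

    // Called once per task (RichSinkFunction#open in a real sink)
    void open(String url, String user, String password,
              String table, List<String> fields) throws Exception {
        conn = DriverManager.getConnection(url, user, password);
        stmt = conn.prepareStatement(buildInsertSql(table, fields));
    }

    // Called per record (RichSinkFunction#invoke); values arrive as a Row there
    void writeRecord(Object[] values) throws Exception {
        for (int i = 0; i < values.length; i++) {
            stmt.setObject(i + 1, values[i]);
        }
        stmt.executeUpdate();
    }

    public static void main(String[] args) {
        // Show the generated statement for the demo table
        System.out.println(buildInsertSql("test", Arrays.asList("id", "name")));
    }
}
```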
Appendix: demo code
For the complete project code, please refer to
The main flow of the demo is as follows:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.*;

public class Adb4PgSinkDemo {

    private static final Logger LOG = LoggerFactory.getLogger(Adb4PgSinkDemo.class);

    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Source: read data from local port 9000
        DataStream<String> sourceStream = env.socketTextStream("127.0.0.1", 9000);
        ArrayList<String> primaryKeys = new ArrayList<String>(Arrays.asList(new String[]{"id"}));
        ArrayList<String> fieldNames = new ArrayList<String>(Arrays.asList(new String[]{"id", "name"}));
        ArrayList<Class<?>> types = new ArrayList<Class<?>>(Arrays.asList(new Class<?>[]{Integer.class, String.class}));
        MySchema schema = new MySchema(primaryKeys, fieldNames, types);
        DataStream<Row> messageStream = sourceStream.map(new InputMap(schema));
        messageStream.addSink(new Adb4PgTableSink("jdbc:postgresql://gp-****.gpdb.rds.aliyuncs.com:3432/your-dbname", "your-tablename", "your-username", "your-password", schema, schema.getPrimaryKeys()));
        env.execute("Example");
    }

    // Map each input line into a Row.
    public static class InputMap implements MapFunction<String, Row> {
        private static final long serialVersionUID = 1L;

        private final MySchema schema;

        public InputMap(MySchema schema) {
            this.schema = schema;
        }

        @Override
        public Row map(String line) throws Exception {
            // Split the line on commas; the -1 limit keeps trailing empty fields
            String[] arr = line.split(",", -1);
            int columnLen = this.schema.getLength();
            if (arr.length != columnLen) {
                // Skip malformed records
                return null;
            }
            Row row = new Row(columnLen);
            for (int i = 0; i < columnLen; i++) {
                // Convert each field to its declared type before handing it to the sink
                if (this.schema.getTypes().get(i) == Integer.class) {
                    row.setField(i, Integer.valueOf(arr[i]));
                } else {
                    row.setField(i, arr[i]);
                }
            }
            return row;
        }
    }

    public static class MySchema implements Serializable {
        private static final long serialVersionUID = 10L;

        private final ArrayList<String> primaryKeys;
        private final ArrayList<String> fieldNames;
        private final ArrayList<Class<?>> types;
        private final int length;

        public MySchema(ArrayList<String> primaryKeys, ArrayList<String> fieldNames, ArrayList<Class<?>> types) {
            this.primaryKeys = primaryKeys;
            this.fieldNames = fieldNames;
            this.types = types;
            this.length = fieldNames.size();
        }

        public ArrayList<String> getPrimaryKeys() {
            return primaryKeys;
        }

        public int getLength() {
            return length;
        }

        public ArrayList<String> getFieldNames() {
            return fieldNames;
        }

        public ArrayList<Class<?>> getTypes() {
            return types;
        }

        public int getFieldIndex(String key) {
            for (int i = 0; i < fieldNames.size(); i++) {
                if (key.equalsIgnoreCase(fieldNames.get(i))) {
                    return i;
                }
            }
            return -1;
        }
    }
}