自定义结果表
CustomSink接口
public abstract class CustomSinkBase implements Serializable{
protected Map<String,String> userParamsMap; // 您在sql with语句中定义的键值对,但所有的键均为小写
protected Set<String> primaryKeys; // 您定义的主键字段名
protected List<String> headerFields;// 标记为header的字段列表
protected RowTypeInfo rowTypeInfo;// 字段类型和名称
/**
* 初始化方法
* @param taskNumber 当前节点是第几个并发
* @param numTasks sink节点的并发数
* @throws IOException
*/
public abstract void open(int taskNumber, int numTasks) throws IOException;
/**
* close方法,释放资源
* @throws IOException
*/
public abstract void close() throws IOException;
/**
* 处理插入单行数据, Row中按到ddl定义的顺序
*
* @param row
* @throws IOException
*/
public abstract void writeAddRecord(Row row) throws IOException;
/**
* 处理删除单行数据,delete产生的原因可以参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming.html
* @param row
* @throws IOException
*/
public abstract void writeDeleteRecord(Row row) throws IOException;
/**
* 如果有攒批进行批量插入,需要在该方法中实现清空该节点操作
* @throws IOException
*/
public abstract void sync() throws IOException;
/**
* 返回用来标识sink的名字
* @throws IOException
*/
public abstract String getName();
}
- 实现的自定义结果表需要继承CustomSinkBase。
- 自定义结果表需要有一个无参构造函数,结果表的初始化工作,可以在open里通过读取userParamsMap中的配置参数进行初始化。
自定义结果表的项目工程需要添加如下jar包依赖。
① blink-connector-custom-blink-2.0-SNAPSHOT.jar
- 文件:
mvn install:install-file -DgroupId=com.alibaba.blink -DartifactId=blink-connector-custom -Dversion=blink-2.0-SNAPSHOT -Dpackaging=jar -Dfile=blink-connector-custom-blink-2.0-SNAPSHOT.jar
- maven依赖:
<dependency>
<groupId>com.alibaba.blink</groupId>
<artifactId>blink-connector-custom</artifactId>
<version>blink-2.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
② flink-core-blink-2.0-SNAPSHOT.jar
- 文件:
mvn install:install-file -DgroupId=org.apache.flink -DartifactId=flink-core -Dversion=blink-2.0-SNAPSHOT -Dpackaging=jar -Dfile=flink-core-blink-2.0-SNAPSHOT.jar
- maven依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>blink-2.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
- 文件:
用法
代码如下。
public class UDPrintSink extends CustomSinkBase {
private static Logger LOG = LoggerFactory.getLogger(UDPrintSink.class);
public void open(int taskNumber, int numTasks) throws IOException {
LOG.info(String.format("Open Method Called: taskNumber %d numTasks %d", taskNumber, numTasks));
String[] filedNames = rowTypeInfo.getFieldNames();
TypeInformation[] typeInformations = rowTypeInfo.getFieldTypes();
LOG.info(String.format("Open Method Called: filedNames %d typeInformations %d", filedNames.length, typeInformations.length));
}
public void close() throws IOException {
LOG.info(String.format("Close Method Called"));
}
public void writeAddRecord(Row row) throws IOException {
LOG.info("Write: " + row.toString());
}
public void writeDeleteRecord(Row row) throws IOException {
LOG.info("Delete: " + row.toString());
}
public void sync() throws IOException {
//没有做攒批写入,空置该方法
}
public String getName() {
return "UDPrintSink";
}
}
将代码打成一个jar,上传到系统中并引用。
使用自定义结果表的DDL。
create table customPrint (
a int,
b BIGINT,
c VARCHAR
) with (
type = 'custom',
class = 'com.alibaba.blink.connector.custom.demo.UDPrintSink'
[,...]
);
说明:with参数
参数名 意义 type 填写 custom
声明这是一个自定义结果表。class 填写在jar中实现结果表的类名 自定义参数 自行设定, open
函数中可以通过userParamsMap
获取。
示例
上下游存储DDL
--源 DDL
create table suorce_name (
a int,
b BIGINT
) with (
type = 'XXXX'
......
);
--自定义sink ddl
create table customPrint (
a int,
b BIGINT,
PRIMARY KEY(a)
) with (
type = 'custom',
class = 'UDPrintSink' --类名
);
写入结果表的数据SQL语句
INSERT INTO customPrint
SELECT
*
FROM suorce_name
测试数据
a | b |
---|---|
1 | 1234 |
2 | 56789 |
结果数据
a | b |
---|---|
1 | 1234 |
2 | 56789 |
本文转自实时计算——
自定义结果表