自定义结果表

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介:

自定义结果表

CustomSink接口


   
   
  1. public abstract class CustomSinkBase implements Serializable{
  2. protected Map<String,String> userParamsMap; // 您在sql with语句中定义的键值对,但所有的键均为小写
  3. protected Set<String> primaryKeys; // 您定义的主键字段名
  4. protected List<String> headerFields;// 标记为header的字段列表
  5. protected RowTypeInfo rowTypeInfo;// 字段类型和名称
  6. /**
  7. * 初始化方法
  8. * @param taskNumber 当前节点是第几个并发
  9. * @param numTasks sink节点的并发数
  10. * @throws IOException
  11. */
  12. public abstract void open(int taskNumber, int numTasks) throws IOException;
  13. /**
  14. * close方法,释放资源
  15. * @throws IOException
  16. */
  17. public abstract void close() throws IOException;
  18. /**
  19. * 处理插入单行数据, Row中按到ddl定义的顺序
  20. *
  21. * @param row
  22. * @throws IOException
  23. */
  24. public abstract void writeAddRecord(Row row) throws IOException;
  25. /**
  26. * 处理删除单行数据,delete产生的原因可以参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming.html
  27. * @param row
  28. * @throws IOException
  29. */
  30. public abstract void writeDeleteRecord(Row row) throws IOException;
  31. /**
  32. * 如果有攒批进行批量插入,需要在该方法中实现清空该节点操作
  33. * @throws IOException
  34. */
  35. public abstract void sync() throws IOException;
  36. /**
  37. * 返回用来标识sink的名字
  38. * @throws IOException
  39. */
  40. public abstract String getName();
  41. }
  1. 实现的自定义结果表需要继承CustomSinkBase。
  2. 自定义结果表需要有一个无参构造函数,结果表的初始化工作,可以在open里通过读取userParamsMap中的配置参数进行初始化。
  3. 自定义结果表的项目工程需要添加如下jar包依赖。

    ① blink-connector-custom-blink-2.0-SNAPSHOT.jar

    • 文件:mvn install:install-file -DgroupId=com.alibaba.blink -DartifactId=blink-connector-custom -Dversion=blink-2.0-SNAPSHOT -Dpackaging=jar -Dfile=blink-connector-custom-blink-2.0-SNAPSHOT.jar
    • maven依赖:
      
             
             
      1. <dependency>
      2. <groupId>com.alibaba.blink</groupId>
      3. <artifactId>blink-connector-custom</artifactId>
      4. <version>blink-2.0-SNAPSHOT</version>
      5. <scope>provided</scope>
      6. </dependency>

    ② flink-core-blink-2.0-SNAPSHOT.jar

    • 文件:mvn install:install-file -DgroupId=org.apache.flink -DartifactId=flink-core -Dversion=blink-2.0-SNAPSHOT -Dpackaging=jar -Dfile=flink-core-blink-2.0-SNAPSHOT.jar
    • maven依赖:
      
             
             
      1. <dependency>
      2. <groupId>org.apache.flink</groupId>
      3. <artifactId>flink-core</artifactId>
      4. <version>blink-2.0-SNAPSHOT</version>
      5. <scope>provided</scope>
      6. </dependency>

用法

代码如下。

   
   
  1. public class UDPrintSink extends CustomSinkBase {
  2. private static Logger LOG = LoggerFactory.getLogger(UDPrintSink.class);
  3. public void open(int taskNumber, int numTasks) throws IOException {
  4. LOG.info(String.format("Open Method Called: taskNumber %d numTasks %d", taskNumber, numTasks));
  5. String[] filedNames = rowTypeInfo.getFieldNames();
  6. TypeInformation[] typeInformations = rowTypeInfo.getFieldTypes();
  7. LOG.info(String.format("Open Method Called: filedNames %d typeInformations %d", filedNames.length, typeInformations.length));
  8. }
  9. public void close() throws IOException {
  10. LOG.info(String.format("Close Method Called"));
  11. }
  12. public void writeAddRecord(Row row) throws IOException {
  13. LOG.info("Write: " + row.toString());
  14. }
  15. public void writeDeleteRecord(Row row) throws IOException {
  16. LOG.info("Delete: " + row.toString());
  17. }
  18. public void sync() throws IOException {
  19. //没有做攒批写入,空置该方法
  20. }
  21. public String getName() {
  22. return "UDPrintSink";
  23. }
  24. }
将代码打成一个jar,上传到系统中并引用。

jar

使用自定义结果表的DDL。

   
   
  1. create table customPrint (
  2. a int,
  3. b BIGINT,
  4. c VARCHAR
  5. ) with (
  6. type = 'custom',
  7. class = 'com.alibaba.blink.connector.custom.demo.UDPrintSink'
  8. [,...]
  9. );

说明:with参数

参数名 意义
type 填写custom声明这是一个自定义结果表。
class 填写在jar中实现结果表的类名
自定义参数 自行设定,open 函数中可以通过 userParamsMap 获取。

示例

上下游存储DDL

   
   
  1. --源 DDL
  2. create table suorce_name (
  3. a int,
  4. b BIGINT
  5. ) with (
  6. type = 'XXXX'
  7. ......
  8. );
  9. --自定义sink ddl
  10. create table customPrint (
  11. a int,
  12. b BIGINT,
  13. PRIMARY KEY(a)
  14. ) with (
  15. type = 'custom',
  16. class = 'UDPrintSink' --类名
  17. );
写入结果表的数据SQL语句

   
   
  1. INSERT INTO customPrint
  2. SELECT
  3. *
  4. FROM suorce_name
测试数据
a b
1 1234
2 56789
结果数据
a b
1 1234
2 56789
本文转自实时计算—— 自定义结果表
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6月前
|
存储 数据库 Python
视图函数中创建模型, 并设置外键
视图函数中创建模型, 并设置外键。
34 1
|
存储 SQL 关系型数据库
第10章_创建和管理表
第10章_创建和管理表
77 0
|
关系型数据库 MySQL 数据库连接
SQLAlchemy关联表删除策略设置
SQLAlchemy关联表删除策略设置
|
存储 SQL 关系型数据库
创建管理表(上)
创建管理表(上)
51 0
JPA自定义查询
JPA自定义查询
|
存储 SQL 关系型数据库
|
存储 SQL 数据可视化
10_创建和管理表
10_创建和管理表
78 0
|
算法 前端开发
前端算法-Excel表的列名称
前端算法-Excel表的列名称
|
存储 SQL Java
Mysql数据库表字段设计优化(状态列)
初始状态码(java int 32 long 64),int 可以表示31种(除去0000),long可以表示63种(除去0000),当然不可能将0000赋值给初始状态,一般来讲,选择int还是long是根据具体业务需求来决定的。
581 0
Mysql数据库表字段设计优化(状态列)
|
SQL 数据库
动态批量新建SQL数据库中的表
动态批量新建SQL数据库中的表
175 0