自定义结果表

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介:

自定义结果表

CustomSink接口


   
   
  1. public abstract class CustomSinkBase implements Serializable{
  2. protected Map<String,String> userParamsMap; // 您在sql with语句中定义的键值对,但所有的键均为小写
  3. protected Set<String> primaryKeys; // 您定义的主键字段名
  4. protected List<String> headerFields;// 标记为header的字段列表
  5. protected RowTypeInfo rowTypeInfo;// 字段类型和名称
  6. /**
  7. * 初始化方法
  8. * @param taskNumber 当前节点是第几个并发
  9. * @param numTasks sink节点的并发数
  10. * @throws IOException
  11. */
  12. public abstract void open(int taskNumber, int numTasks) throws IOException;
  13. /**
  14. * close方法,释放资源
  15. * @throws IOException
  16. */
  17. public abstract void close() throws IOException;
  18. /**
  19. * 处理插入单行数据, Row中按到ddl定义的顺序
  20. *
  21. * @param row
  22. * @throws IOException
  23. */
  24. public abstract void writeAddRecord(Row row) throws IOException;
  25. /**
  26. * 处理删除单行数据,delete产生的原因可以参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming.html
  27. * @param row
  28. * @throws IOException
  29. */
  30. public abstract void writeDeleteRecord(Row row) throws IOException;
  31. /**
  32. * 如果有攒批进行批量插入,需要在该方法中实现清空该节点操作
  33. * @throws IOException
  34. */
  35. public abstract void sync() throws IOException;
  36. /**
  37. * 返回用来标识sink的名字
  38. * @throws IOException
  39. */
  40. public abstract String getName();
  41. }
  1. 实现的自定义结果表需要继承CustomSinkBase。
  2. 自定义结果表需要有一个无参构造函数,结果表的初始化工作,可以在open里通过读取userParamsMap中的配置参数进行初始化。
  3. 自定义结果表的项目工程需要添加如下jar包依赖。

    ① blink-connector-custom-blink-2.0-SNAPSHOT.jar

    • 文件:mvn install:install-file -DgroupId=com.alibaba.blink -DartifactId=blink-connector-custom -Dversion=blink-2.0-SNAPSHOT -Dpackaging=jar -Dfile=blink-connector-custom-blink-2.0-SNAPSHOT.jar
    • maven依赖:
      
             
             
      1. <dependency>
      2. <groupId>com.alibaba.blink</groupId>
      3. <artifactId>blink-connector-custom</artifactId>
      4. <version>blink-2.0-SNAPSHOT</version>
      5. <scope>provided</scope>
      6. </dependency>

    ② flink-core-blink-2.0-SNAPSHOT.jar

    • 文件:mvn install:install-file -DgroupId=org.apache.flink -DartifactId=flink-core -Dversion=blink-2.0-SNAPSHOT -Dpackaging=jar -Dfile=flink-core-blink-2.0-SNAPSHOT.jar
    • maven依赖:
      
             
             
      1. <dependency>
      2. <groupId>org.apache.flink</groupId>
      3. <artifactId>flink-core</artifactId>
      4. <version>blink-2.0-SNAPSHOT</version>
      5. <scope>provided</scope>
      6. </dependency>

用法

代码如下。

   
   
  1. public class UDPrintSink extends CustomSinkBase {
  2. private static Logger LOG = LoggerFactory.getLogger(UDPrintSink.class);
  3. public void open(int taskNumber, int numTasks) throws IOException {
  4. LOG.info(String.format("Open Method Called: taskNumber %d numTasks %d", taskNumber, numTasks));
  5. String[] filedNames = rowTypeInfo.getFieldNames();
  6. TypeInformation[] typeInformations = rowTypeInfo.getFieldTypes();
  7. LOG.info(String.format("Open Method Called: filedNames %d typeInformations %d", filedNames.length, typeInformations.length));
  8. }
  9. public void close() throws IOException {
  10. LOG.info(String.format("Close Method Called"));
  11. }
  12. public void writeAddRecord(Row row) throws IOException {
  13. LOG.info("Write: " + row.toString());
  14. }
  15. public void writeDeleteRecord(Row row) throws IOException {
  16. LOG.info("Delete: " + row.toString());
  17. }
  18. public void sync() throws IOException {
  19. //没有做攒批写入,空置该方法
  20. }
  21. public String getName() {
  22. return "UDPrintSink";
  23. }
  24. }
将代码打成一个jar,上传到系统中并引用。

jar

使用自定义结果表的DDL。

   
   
  1. create table customPrint (
  2. a int,
  3. b BIGINT,
  4. c VARCHAR
  5. ) with (
  6. type = 'custom',
  7. class = 'com.alibaba.blink.connector.custom.demo.UDPrintSink'
  8. [,...]
  9. );

说明:with参数

参数名 意义
type 填写custom声明这是一个自定义结果表。
class 填写在jar中实现结果表的类名
自定义参数 自行设定,open 函数中可以通过 userParamsMap 获取。

示例

上下游存储DDL

   
   
  1. --源 DDL
  2. create table suorce_name (
  3. a int,
  4. b BIGINT
  5. ) with (
  6. type = 'XXXX'
  7. ......
  8. );
  9. --自定义sink ddl
  10. create table customPrint (
  11. a int,
  12. b BIGINT,
  13. PRIMARY KEY(a)
  14. ) with (
  15. type = 'custom',
  16. class = 'UDPrintSink' --类名
  17. );
写入结果表的数据SQL语句

   
   
  1. INSERT INTO customPrint
  2. SELECT
  3. *
  4. FROM suorce_name
测试数据
a b
1 1234
2 56789
结果数据
a b
1 1234
2 56789
本文转自实时计算—— 自定义结果表
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
分布式计算 DataWorks 关系型数据库
DataWorks操作报错合集之新建MAXComputer数据源时,如何解决报错ODPS-0420095: Access Denied
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
4天前
|
弹性计算 关系型数据库 微服务
基于 Docker 与 Kubernetes(K3s)的微服务:阿里云生产环境扩容实践
在微服务架构中,如何实现“稳定扩容”与“成本可控”是企业面临的核心挑战。本文结合 Python FastAPI 微服务实战,详解如何基于阿里云基础设施,利用 Docker 封装服务、K3s 实现容器编排,构建生产级微服务架构。内容涵盖容器构建、集群部署、自动扩缩容、可观测性等关键环节,适配阿里云资源特性与服务生态,助力企业打造低成本、高可靠、易扩展的微服务解决方案。
1109 1
|
3天前
|
机器学习/深度学习 人工智能 前端开发
通义DeepResearch全面开源!同步分享可落地的高阶Agent构建方法论
通义研究团队开源发布通义 DeepResearch —— 首个在性能上可与 OpenAI DeepResearch 相媲美、并在多项权威基准测试中取得领先表现的全开源 Web Agent。
562 10
|
13天前
|
人工智能 运维 安全
|
4天前
|
弹性计算 Kubernetes jenkins
如何在 ECS/EKS 集群中有效使用 Jenkins
本文探讨了如何将 Jenkins 与 AWS ECS 和 EKS 集群集成,以构建高效、灵活且具备自动扩缩容能力的 CI/CD 流水线,提升软件交付效率并优化资源成本。
305 0
|
11天前
|
人工智能 异构计算
敬请锁定《C位面对面》,洞察通用计算如何在AI时代持续赋能企业创新,助力业务发展!
敬请锁定《C位面对面》,洞察通用计算如何在AI时代持续赋能企业创新,助力业务发展!
|
12天前
|
机器学习/深度学习 人工智能 自然语言处理
B站开源IndexTTS2,用极致表现力颠覆听觉体验
在语音合成技术不断演进的背景下,早期版本的IndexTTS虽然在多场景应用中展现出良好的表现,但在情感表达的细腻度与时长控制的精准性方面仍存在提升空间。为了解决这些问题,并进一步推动零样本语音合成在实际场景中的落地能力,B站语音团队对模型架构与训练策略进行了深度优化,推出了全新一代语音合成模型——IndexTTS2 。
815 23
|
4天前
|
缓存 供应链 监控
VVIC seller_search 排行榜搜索接口深度分析及 Python 实现
VVIC搜款网seller_search接口提供服装批发市场的商品及商家排行榜数据,涵盖热销榜、销量排名、类目趋势等,支持多维度筛选与数据分析,助力选品决策、竞品分析与市场预测,为服装供应链提供有力数据支撑。

热门文章

最新文章