开发者社区> 问答> 正文

实现 UpsertStreamTableSink, BatchTableSink 接口代码

大家好!

伙伴们,附件有实现 blink 中 flink-table 模块 UpsertStreamTableSink, BatchTableSink 接口代码 ,自实现类放在 flink-jdbc 模块 org.apache.flink.api.java.io.jdbc 包下。大神们帮忙看看呗! 江湖救急啊!

目前实现后,在代码中调用,报异常: Exception in thread "main" org.apache.flink.table.api.TableException: Arity [4] of result [[Lorg.apache.flink.api.common.typeinfo.TypeInformation;@377dfb8d] does not match the number[2] of requested type [Java Tuple2<Boolean, Row(ido: String, nameo: String, moneyo: String, timeo: String)>].

主要盲点: 1、要怎么匹配上这个类型 Tuple2<Boolean, Row> ?这里面决定 update 或 delete 的 Boolean型值 怎么赋? Row 映射进去的底层原理? 2、这两个改怎么重写?另外 keys 和 isAppendOnly 调用时该怎么赋值? @Override public void setKeyFields(String[] keys) {} @Override public void setIsAppendOnly(Boolean isAppendOnly){}*来自志愿者整理的flink邮件归档

展开
收起
毛毛虫雨 2021-12-07 12:43:32 576 0
1 条回答
写回答
取消 提交回答
  • sql: select EVENTTIME,ID,EVENT_ID,MSISDN,TS from (select a.*,ROW_NUMBER() over(partition by EVENT_ID,MSISDN order by TS desc) AS rw from table1 a ) where rw = 1

    tableEnv.toRetractStream(结果表, Row.class).print();

    输出结果,分析结果发现,第二条的  1553652720961584  比第一条的时间 1553652720927835 更大,同时输出一条 false 的,数据结果与第一条相同,説明第三条是用来作删除操作,删掉第一条数据。。。

    (true,2019-03-27 02:12:00.0,1243296274875303847,"1c3.2729.20190327021200",XXXXXX,1553652720927835) (true,2019-03-27 02:12:00.0,1243296274875303910,"1c3.2729.20190327021200",XXXXXX,1553652720961584) (false,2019-03-27 02:12:00.0,1243296274875303847,"1c3.2729.20190327021200",XXXXXX,1553652720927835)

    结论:true 是用来插入数据的,false 是用来删除数据的,出现false时一定会有一条之前插入的数据 。。。*来自志愿者整理的flink

    2021-12-07 15:31:18
    赞同 展开评论 打赏
问答地址:
问答排行榜
最热
最新

相关电子书

更多
《0代码搭应用》 立即下载
不止代码 立即下载
继承与功能组合 立即下载