Hi,各位大佬,谁有空帮我看下这个问题
Source: Kafka SinkL Kafka
主要逻辑是 main_table left join merchatn_table 以后,使用 FIRST_VALUE 函数取第一条 transaction_id,我这个模式应该是 append 模式,但是结果好像不是
Error
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: AppendStreamTableSink requires that Table has only insert changes. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
Code
val main_column = "database
, table
, transaction_type
, transaction_id
, reference_id
, merchant_id
, event_time
, status
" val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM Keystats_airpay_consumer WHERE table
LIKE 'transaction_tab%' ") bsTableEnv.createTemporaryView("main_table", main_table)
val merchant_column = "transaction_sn, user_id" val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column FROM Keystats_airpay_consumer WHERE table
LIKE 'wallet_id_merchant_db%' ") bsTableEnv.createTemporaryView("merchant_table", merchant_table)
bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer | SELECT database
, table
, transaction_type
, | merchant_id
, event_time
, status
, | FIRST_VALUE(transaction_id
) OVER (PARTITION BY transaction_id
ORDER BY PROCTIME() RANGE UNBOUNDED PRECEDING) | FROM ( | SELECT database
, table
, transaction_type
, transaction_id
, | merchant_id
, event_time
, status
, reference_id
| FROM main_table | LEFT JOIN merchant_table | ON main_table.reference_id = merchant_table.transaction_sn | ) |""".stripMargin)
来自志愿者整理的flink邮件归档来自志愿者整理的FLINK邮件归档
你这个去重写法不太对,可以参考下官方文档的写法[1]
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
来自志愿者整理的flink邮件归档来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。