开发者社区> 问答> 正文

Flink SQL FIRST_VALUE 是append 模式吗?

Hi,各位大佬,谁有空帮我看下这个问题

Source: Kafka SinkL Kafka

主要逻辑是 main_table left join merchatn_table 以后,使用 FIRST_VALUE 函数取第一条 transaction_id,我这个模式应该是 append 模式,但是结果好像不是

Error

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: AppendStreamTableSink requires that Table has only insert changes. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)

Code

val main_column = "database, table, transaction_type, transaction_id, reference_id, merchant_id, event_time, status" val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM Keystats_airpay_consumer WHERE table LIKE 'transaction_tab%' ") bsTableEnv.createTemporaryView("main_table", main_table)

val merchant_column = "transaction_sn, user_id" val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column FROM Keystats_airpay_consumer WHERE table LIKE 'wallet_id_merchant_db%' ") bsTableEnv.createTemporaryView("merchant_table", merchant_table)

bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer | SELECT database, table, transaction_type, | merchant_id, event_time, status, | FIRST_VALUE(transaction_id) OVER (PARTITION BY transaction_id ORDER BY PROCTIME() RANGE UNBOUNDED PRECEDING) | FROM ( | SELECT database, table, transaction_type, transaction_id, | merchant_id, event_time, status, reference_id | FROM main_table | LEFT JOIN merchant_table | ON main_table.reference_id = merchant_table.transaction_sn | ) |""".stripMargin)

来自志愿者整理的flink邮件归档来自志愿者整理的FLINK邮件归档

展开
收起
小阿怪 2021-12-04 19:08:21 656 0
1 条回答
写回答
取消 提交回答
  • 你这个去重写法不太对,可以参考下官方文档的写法[1]

    [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication

    来自志愿者整理的flink邮件归档来自志愿者整理的FLINK邮件归档

    2021-12-04 22:27:38
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载