开发者社区> 问答> 正文

Flink SQL upsert-kafka connector 生成的 Stage Changel

Hi 社区。

Flink 1.12.1

现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有

forword 的ETL没有作用。

insert into table_a select id,udf(a),b,c from table_b;

发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区

  1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置?

  2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗?


== Physical Execution Plan ==

Stage 1 : Data Source

content : Source: TableSourceScan(table=[[default_catalog,

default_database, temp_table]], fields=[id...])



Stage 3 : Operator

content : ChangelogNormalize(key=[id])

ship_strategy : HASH



Stage 4 : Operator

content : Calc(select=[...])

ship_strategy : FORWARD



Stage 5 : Data Sink

content : Sink: Sink(table=[default_catalog.default_database.table_a],

fields=[id...])

ship_strategy : FORWARD

```*来自志愿者整理的flink邮件归档

展开
收起
EXCEED 2021-12-01 15:42:39 1348 0
1 条回答
写回答
取消 提交回答
    1. 对于 upsert-kafka 会默认加上 ChangelogNormalize

    2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json

    也能用,但是要加上 table.exec.source.cdc-events-duplicate = true

    参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如

    forward。[1]:

    https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate*来自志愿者整理的flink邮件归档

    2021-12-01 16:05:02
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载