开发者社区> 问答> 正文

flink 1.11 upsert结果出错

各位大佬好,请教一个问题flink从Kafka读数,写入mysql,对mysql结果根据主键进行数据更新,看官网是支持“on DUPLICATE”的,但是在执行中报错是这个导致的语法问题。完整代码如下,是在linux下,直接python *.py执行的。请问下这个是不支持吗,还是怎么写呢!

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode

from pyflink.table import StreamTableEnvironment, EnvironmentSettings

source="""

CREATE TABLE kafka_source_tab (     

 trck_id VARCHAR,

 score  INT

) WITH (

 'connector' = 'kafka',

 'topic' = 'alarm_test_g',   

 'scan.startup.mode' = 'earliest-offset',

 'properties.bootstrap.servers' = '10.2.2.73:2181',

 'properties.bootstrap.servers' = '10.2.2.73:9092',

 'format' = 'json' 

)

"""

sink="""

CREATE TABLE g_source_tab (

 trck_id VARCHAR,

 score  INT,

PRIMARY KEY (trck_id) NOT ENFORCED

) WITH (

 'connector' = 'jdbc',

 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 

 'table-name' = 'g',   

 'username' = 'root',

 'password' = '123456t',

 'sink.buffer-flush.interval' = '1s'

)

"""

env = StreamExecutionEnvironment.get_execution_environment()

env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

env.set_parallelism(1)

env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()

t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

t_env.execute_sql(source)

t_env.execute_sql(sink)

table_result1=t_env.execute_sql('''Insert into g_source_tab (trck_id,score) VALUES (select

                       trck_id,score from kafka_source_tab ) ON DUPLICATE KEY UPDATE score=score+1''')

table_result1.get_job_client().get_job_execution_result().result()

*来自志愿者整理的flink邮件归档

展开
收起
小阿矿 2021-12-06 15:50:16 793 0
1 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
基于 Flink SQL + Paimon 构建流式湖仓新方 立即下载
Flink SQL in 2020 立即下载
Flink Streaming SQL 2018 立即下载