
Flink table used as both sink and input: no data downstream

Hi all, a question: inside Flink I define a table g_unit (initially empty) that receives writes from a Kafka source, and g_unit is also meant to serve as the input source for the downstream table g_summary. In testing, the downstream g_summary table never receives any data. Code is below; any help would be appreciated.

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)

env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

kafka_source_ddl = """
CREATE TABLE kafka_source_tab (
    id VARCHAR,
    alarm_id VARCHAR,
    trck_id VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'gg',
    'scan.startup.mode' = 'specific-offsets',
    'scan.startup.specific-offsets' = 'partition:1,offset:0',
    'properties.bootstrap.servers' = '****',
    'format' = 'json'
)
"""

g_unit_sink_ddl = """
CREATE TABLE g_sink_unit (
    alarm_id VARCHAR,
    trck_id VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
    'table-name' = 'g_unit',
    'username' = 'root',
    'password' = 'root',
    'sink.buffer-flush.interval' = '1s'
)
"""

g_summary_ddl = """
CREATE TABLE g_summary_base (
    alarm_id VARCHAR,
    trck_id VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
    'table-name' = 'g_summary',
    'username' = 'root',
    'password' = 'root',
    'sink.buffer-flush.interval' = '1s'
)
"""

t_env.execute_sql(kafka_source_ddl)
t_env.execute_sql(g_unit_sink_ddl)
t_env.execute_sql(g_summary_ddl)

sql1 = '''INSERT INTO g_sink_unit SELECT alarm_id, trck_id FROM kafka_source_tab'''
sql2 = '''INSERT INTO g_summary_base SELECT alarm_id, trck_id FROM g_sink_unit'''

stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql(sql1)
stmt_set.add_insert_sql(sql2)
stmt_set.execute().get_job_client().get_job_execution_result().result()

*From the volunteer-compiled Flink mailing-list archive

小阿矿 2021-12-06 16:46:31
1 answer
  • You could first run only the first INSERT and check whether g_sink_unit actually receives data.

    Also, you could change the queries so that both read from kafka_source_tab and write separately to the two different sinks:

    sql1 = '''INSERT INTO g_sink_unit SELECT alarm_id, trck_id FROM kafka_source_tab'''
    sql2 = '''INSERT INTO g_summary_base SELECT alarm_id, trck_id FROM kafka_source_tab'''

    2021-12-06 18:29:13
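As a sketch of the suggested fix (assuming the table names from the DDL above, and that t_env is a PyFlink StreamTableEnvironment on which the three DDL statements have already been executed), both INSERTs read the Kafka source directly and are submitted together in one statement set:

```python
# Sketch of the answer's suggestion: fan out from the Kafka source to
# both JDBC sinks, instead of chaining g_sink_unit -> g_summary_base.
# Table names are taken from the DDL above.
sql1 = "INSERT INTO g_sink_unit SELECT alarm_id, trck_id FROM kafka_source_tab"
sql2 = "INSERT INTO g_summary_base SELECT alarm_id, trck_id FROM kafka_source_tab"

def submit_fan_out(t_env):
    # Both pipelines run in a single Flink job; neither depends on the
    # JDBC table g_unit as an input.
    stmt_set = t_env.create_statement_set()
    stmt_set.add_insert_sql(sql1)  # kafka_source_tab -> g_sink_unit
    stmt_set.add_insert_sql(sql2)  # kafka_source_tab -> g_summary_base
    return stmt_set.execute()
```

This avoids reading from the JDBC sink table: Flink's JDBC connector scans a table as a bounded snapshot rather than a continuous changelog, so a query chained off g_sink_unit would not see rows written after the scan.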