Hi all, a question: in Flink I define a table g_unit (initially empty) that receives writes from a Kafka source, and g_unit in turn serves as the input source for the downstream table g_summary. In testing, the downstream g_summary table never receives any data. The code is below; any pointers would be appreciated.
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
kafka_source_ddl = """
CREATE TABLE kafka_source_tab (
    id VARCHAR,
    alarm_id VARCHAR,
    trck_id VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'gg',
    'scan.startup.mode' = 'specific-offsets',
    'scan.startup.specific-offsets' = 'partition:1,offset:0',
    'properties.bootstrap.servers' = '****',
    'format' = 'json'
)
"""
g_unit_sink_ddl = """
CREATE TABLE g_sink_unit (
    alarm_id VARCHAR,
    trck_id VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
    'table-name' = 'g_unit',
    'username' = 'root',
    'password' = 'root',
    'sink.buffer-flush.interval' = '1s'
)
"""
g_summary_ddl = """
CREATE TABLE g_summary_base (
    alarm_id VARCHAR,
    trck_id VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
    'table-name' = 'g_summary',
    'username' = 'root',
    'password' = 'root',
    'sink.buffer-flush.interval' = '1s'
)
"""
t_env.execute_sql(kafka_source_ddl)
t_env.execute_sql(g_unit_sink_ddl)
t_env.execute_sql(g_summary_ddl)
sql1 = '''INSERT INTO g_sink_unit SELECT alarm_id, trck_id FROM kafka_source_tab'''
sql2 = '''INSERT INTO g_summary_base SELECT alarm_id, trck_id FROM g_sink_unit'''
stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql(sql1)
stmt_set.add_insert_sql(sql2)
stmt_set.execute().get_job_client().get_job_execution_result().result()
*From the volunteer-compiled Flink mailing list archive
You could start by running only the first INSERT and then check whether g_sink_unit actually receives data.
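A minimal sketch of that check (assuming the t_env and the DDL from the question are already registered):

# Submit only the first INSERT as its own job, then inspect the MySQL
# table g_unit for rows while it runs.
result = t_env.execute_sql(
    "INSERT INTO g_sink_unit SELECT alarm_id, trck_id FROM kafka_source_tab")
# Block so the script does not exit while the streaming job is running
# (same pattern as the last line of the question's code).
result.get_job_client().get_job_execution_result().result()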
Also, you could change both queries to read from kafka_source_tab and write to the two different sinks separately:

sql1 = '''INSERT INTO g_sink_unit SELECT alarm_id, trck_id FROM kafka_source_tab'''
sql2 = '''INSERT INTO g_summary_base SELECT alarm_id, trck_id FROM kafka_source_tab'''

*From the volunteer-compiled Flink mailing list archive
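For context, a likely explanation (my assumption, not stated in the original thread): Flink's JDBC connector acts as a bounded scan source when read in a query, so the second INSERT scans the MySQL table g_unit exactly once at job start, while it is still empty, and then finishes; it never sees the rows the first INSERT writes afterwards. Reading kafka_source_tab in both queries sidesteps this. A runnable sketch of the suggested version, reusing the t_env and DDL from the question:

stmt_set = t_env.create_statement_set()
# Both INSERTs read the Kafka source directly, so each sink receives the
# full stream and neither depends on a one-shot JDBC scan.
stmt_set.add_insert_sql(
    "INSERT INTO g_sink_unit SELECT alarm_id, trck_id FROM kafka_source_tab")
stmt_set.add_insert_sql(
    "INSERT INTO g_summary_base SELECT alarm_id, trck_id FROM kafka_source_tab")
stmt_set.execute().get_job_client().get_job_execution_result().result()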