
Flink 1.11 PyFlink stream job exits

Running python flink_cep_example.py exits after a few seconds, although a streaming job should keep running and never exit. The code is below; it uses MATCH_RECOGNIZE:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

s_env = StreamExecutionEnvironment.get_execution_environment()
b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
st_env = StreamTableEnvironment.create(s_env, environment_settings=b_s_settings)
configuration = st_env.get_config().get_configuration()
configuration.set_string("taskmanager.memory.task.off-heap.size", "500m")

s_env.set_parallelism(1)

kafka_source = """
CREATE TABLE source (
    flow_name STRING,
    flow_id STRING,
    component STRING,
    filename STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'cep',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
)
"""

postgres_sink = """
CREATE TABLE cep_result (
    filename STRING,
    start_tstamp TIMESTAMP(3),
    end_tstamp TIMESTAMP(3)
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:postgresql://127.0.0.1:5432/postgres',
    'connector.table' = 'cep_result',
    'connector.driver' = 'org.postgresql.Driver',
    'connector.username' = 'postgres',
    'connector.password' = 'my_password',
    'connector.write.flush.max-rows' = '1'
)
"""

st_env.sql_update(kafka_source)
st_env.sql_update(postgres_sink)

postgres_sink_sql = '''
INSERT INTO cep_result
SELECT *
FROM source
MATCH_RECOGNIZE (
    PARTITION BY filename
    ORDER BY event_time
    MEASURES
        (A.event_time) AS start_tstamp,
        (D.event_time) AS end_tstamp
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (A B C D)
    DEFINE
        A AS component = 'XXX',
        B AS component = 'YYY',
        C AS component = 'ZZZ',
        D AS component = 'WWW'
) MR
'''

sql_result = st_env.execute_sql(postgres_sink_sql)

*From the Flink mailing list archive, compiled by volunteers

小阿矿 2021-12-06 16:17:50
1 answer
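  • execute_sql is an asynchronous, non-blocking method: it submits the job and returns immediately, so the script reaches its end and exits. To keep the script running, add sql_result.get_job_client().get_job_execution_result().result() at the end of your code. I have created a JIRA issue for this [1].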

    [1] https://issues.apache.org/jira/browse/FLINK-18598
    *From the Flink mailing list archive, compiled by volunteers

    2021-12-06 18:32:39
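    The blocking call from the answer can be wrapped in a small helper, sketched below. The name wait_for_job is hypothetical and not part of PyFlink; the None check is an assumption that get_job_client() may return None for statements that do not submit a job (such as DDL). Only the call chain quoted in the answer is used.

```python
def wait_for_job(table_result):
    """Block until the job behind `table_result` finishes.

    `table_result` is expected to be the object returned by
    st_env.execute_sql(...). `wait_for_job` is a hypothetical helper
    name used only for illustration.
    """
    job_client = table_result.get_job_client()
    if job_client is None:
        # Assumption: statements that do not start a job have no job client.
        return None
    # get_job_execution_result() returns a future; result() blocks the
    # calling thread until the job ends, which keeps the script alive.
    return job_client.get_job_execution_result().result()
```

    In the script from the question, this would be the last line: wait_for_job(sql_result). For an unbounded Kafka source the call is not expected to return until the job is cancelled or fails.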