python flink_cep_example.py exits after just a few seconds, but it should keep running indefinitely. The code, which uses MATCH_RECOGNIZE, is as follows:
# Imports were missing from the original post; the script needs these to run.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

s_env = StreamExecutionEnvironment.get_execution_environment()
b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
st_env = StreamTableEnvironment.create(s_env, environment_settings=b_s_settings)
configuration = st_env.get_config().get_configuration()
configuration.set_string("taskmanager.memory.task.off-heap.size", "500m")
s_env.set_parallelism(1)

# Kafka source with an event-time watermark, needed for ORDER BY event_time
# inside MATCH_RECOGNIZE.
kafka_source = """CREATE TABLE source (
    flow_name STRING,
    flow_id STRING,
    component STRING,
    filename STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'cep',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
)"""

# JDBC sink, declared with the legacy 'connector.*' option style.
postgres_sink = """CREATE TABLE cep_result (
    filename STRING,
    start_tstamp TIMESTAMP(3),
    end_tstamp TIMESTAMP(3)
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:postgresql://127.0.0.1:5432/postgres',
    'connector.table' = 'cep_result',
    'connector.driver' = 'org.postgresql.Driver',
    'connector.username' = 'postgres',
    'connector.password' = 'my_password',
    'connector.write.flush.max-rows' = '1'
)"""

st_env.sql_update(kafka_source)
st_env.sql_update(postgres_sink)

# For each filename, match the component sequence XXX -> YYY -> ZZZ -> WWW
# and emit one row per match with the first and last event timestamps.
postgres_sink_sql = '''INSERT INTO cep_result
SELECT * FROM source
MATCH_RECOGNIZE (
    PARTITION BY filename
    ORDER BY event_time
    MEASURES
        A.event_time AS start_tstamp,
        D.event_time AS end_tstamp
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (A B C D)
    DEFINE
        A AS component = 'XXX',
        B AS component = 'YYY',
        C AS component = 'ZZZ',
        D AS component = 'WWW'
) MR'''

sql_result = st_env.execute_sql(postgres_sink_sql)
(From the volunteer-curated Flink mailing list archive)
execute_sql is an asynchronous, non-blocking method, so you need to add sql_result.get_job_client().get_job_execution_result().result() at the end of your code. I have created a JIRA issue [1] for this.
[1] https://issues.apache.org/jira/browse/FLINK-18598
(From the volunteer-curated Flink mailing list archive)
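For completeness, here is what the tail of the script looks like with the blocking call added. This is a minimal sketch based on the answer above and the PyFlink 1.11 API; note that get_job_client() can return None if no job was actually submitted, and newer PyFlink releases also offer the shorter sql_result.wait() for the same purpose.

# execute_sql() only submits the INSERT job and returns a TableResult
# immediately; once the Python script reaches its end, the process exits
# and the job running in the local mini-cluster is torn down with it.
sql_result = st_env.execute_sql(postgres_sink_sql)

# Block the main thread until the job terminates. For an unbounded Kafka
# source this effectively means "run forever", which is the desired behavior.
sql_result.get_job_client().get_job_execution_result().result()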