from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view("simple_source", table)
table_env.execute_sql("""
CREATE TABLE first_sink_table (
id BIGINT,
data VARCHAR
) WITH (
'connector' = 'print'
)
""")
table_env.execute_sql("""
CREATE TABLE second_sink_table (
id BIGINT,
data VARCHAR
) WITH (
'connector' = 'print'
)
""")
‘# 创建一个statement对象
statement_set = table_env.create_statement_set()
‘# 使用TABLE API 将table表插进first_sink_table表里面
statement_set.add_insert("first_sink_table", table)
‘# 使用SQL将table表插进second_sink_table表里面
statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")
’# 执行查询
statement_set.execute().wait()我执行上面脚本之后,输出以下的结果。在我理解输出内容的应该是1,2,1,2;而不是1,1,2,2。4> +I(1,Hi)
4> +I(1,Hi)
4> +I(2,Hello)
4> +I(2,Hello)*来自志愿者整理的flink邮件归档
奥,那你理解错了。这里面其实细分成2种情况: - sink1和sink2,通过operator chain之后,和source节点位于同一个物理节点:由于执行的时候,在一个JVM里,每处理一条数据,先发送给其中一个sink,再发送给另外一个sink。然后处理下一条数据 - sink1 和sink2,和source节点处于不同一个物理节点:这个时候是纯异步的,每处理一条数据,会分别通过网络发送给sink1和sink2,同时网络有buffer,所以sink1和sink2收到数据的顺序是完全不确定的。
但是不管是上面哪种情况,都不是先把数据全部发送给其中一个sink;再全部发送给第二个sink。*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。