开发者社区> 问答> 正文

关于statement输出结果疑问?

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()

table_env = StreamTableEnvironment.create(environment_settings=env_settings)

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

table_env.create_temporary_view("simple_source", table)

table_env.execute_sql("""

CREATE TABLE first_sink_table (

id BIGINT,

data VARCHAR

) WITH (

'connector' = 'print'

)

""")

table_env.execute_sql("""

CREATE TABLE second_sink_table (

id BIGINT,

data VARCHAR

) WITH (

'connector' = 'print'

)

""")

‘# 创建一个statement对象

statement_set = table_env.create_statement_set()

‘# 使用TABLE API 将table表插进first_sink_table表里面

statement_set.add_insert("first_sink_table", table)

‘# 使用SQL将table表插进second_sink_table表里面

statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")

’# 执行查询

statement_set.execute().wait()我执行上面脚本之后,输出以下的结果。在我理解输出内容的应该是1,2,1,2;而不是1,1,2,2。4> +I(1,Hi)

4> +I(1,Hi)

4> +I(2,Hello)

4> +I(2,Hello)*来自志愿者整理的flink邮件归档

展开
收起
moonlightdisco 2021-12-01 14:01:46 803 0
1 条回答
写回答
取消 提交回答
  • 奥,那你理解错了。这里面其实细分成2种情况: - sink1和sink2,通过operator chain之后,和source节点位于同一个物理节点:由于执行的时候,在一个JVM里,每处理一条数据,先发送给其中一个sink,再发送给另外一个sink。然后处理下一条数据 - sink1 和sink2,和source节点处于不同一个物理节点:这个时候是纯异步的,每处理一条数据,会分别通过网络发送给sink1和sink2,同时网络有buffer,所以sink1和sink2收到数据的顺序是完全不确定的。

    但是不管是上面哪种情况,都不是先把数据全部发送给其中一个sink;再全部发送给第二个sink。*来自志愿者整理的flink邮件归档

    2021-12-01 14:18:36
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载