开发者社区> 问答> 正文

PyFlink Table API 中定义的 connector写出结果数据的方法是什么?

PyFlink Table API 中定义的 connector写出结果数据的方法是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 15:08:08 342 0
1 条回答
写回答
取消 提交回答
  • 以下示例展示了如何将 Table & SQL 中支持的 connector,用作 PyFlink DataStream API 作业的 sink。
    
    # 写法一:ds类型为Types.ROW
    def split(s):
        splits = s[1].split("|")
        for sp in splits:
            yield Row(s[0], sp)
    
    ds = ds.map(lambda i: (i[0] + 1, i[1])) \
           .flat_map(split, Types.ROW([Types.INT(), Types.STRING()])) \
           .key_by(lambda i: i[1]) \
           .reduce(lambda i, j: Row(i[0] + j[0], i[1]))
    
    # 写法二:ds类型为Types.TUPLE
    def split(s):
        splits = s[1].split("|")
        for sp in splits:
            yield s[0], sp
    
    ds = ds.map(lambda i: (i[0] + 1, i[1])) \
           .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
           .key_by(lambda i: i[1]) \
           .reduce(lambda i, j: (i[0] + j[0], i[1]))
    
    # 将ds写出到sink
    t_env.execute_sql("""
            CREATE TABLE my_sink (
              a INT,
              b VARCHAR
            ) WITH (
              'connector' = 'print'
            )
        """)
    
    table = t_env.from_data_stream(ds)
    table_result = table.execute_insert("my_sink")
    
    2021-12-07 15:08:24
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Spring Boot2.0实战Redis分布式缓存 立即下载
CUDA MATH API 立即下载
API PLAYBOOK 立即下载