开发者社区> 问答> 正文

PyFlink DataStream API 作业(适合调试)完整的作业示例是什么?

PyFlink DataStream API 作业(适合调试)完整的作业示例是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 15:11:24 349 0
1 条回答
写回答
取消 提交回答
  • 方式一(适合调试):
    
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    
    
    def data_stream_api_demo():
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(4)
    
        ds = env.from_collection(
            collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
            type_info=Types.ROW([Types.INT(), Types.STRING()]))
    
        def split(s):
            splits = s[1].split("|")
            for sp in splits:
                yield s[0], sp
    
        ds = ds.map(lambda i: (i[0] + 1, i[1])) \
               .flat_map(split) \
               .key_by(lambda i: i[1]) \
               .reduce(lambda i, j: (i[0] + j[0], i[1]))
    
        ds.print()
    
        env.execute()
    
    
    if __name__ == '__main__':
        data_stream_api_demo()
    
    2021-12-07 15:11:36
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Spring Boot2.0实战Redis分布式缓存 立即下载
CUDA MATH API 立即下载
API PLAYBOOK 立即下载