PyFlink DataStream API 作业(适合调试)完整的作业示例是什么?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
方式一(适合调试):
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
def data_stream_api_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(4)
    ds = env.from_collection(
        collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
        type_info=Types.ROW([Types.INT(), Types.STRING()]))
    def split(s):
        splits = s[1].split("|")
        for sp in splits:
            yield s[0], sp
    ds = ds.map(lambda i: (i[0] + 1, i[1])) \
           .flat_map(split) \
           .key_by(lambda i: i[1]) \
           .reduce(lambda i, j: (i[0] + j[0], i[1]))
    ds.print()
    env.execute()
if __name__ == '__main__':
    data_stream_api_demo()