PyFlink DataStream API 作业(适合调试)完整的作业示例是什么?
方式一(适合调试):
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
def data_stream_api_demo():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
ds = env.from_collection(
collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
type_info=Types.ROW([Types.INT(), Types.STRING()]))
def split(s):
splits = s[1].split("|")
for sp in splits:
yield s[0], sp
ds = ds.map(lambda i: (i[0] + 1, i[1])) \
.flat_map(split) \
.key_by(lambda i: i[1]) \
.reduce(lambda i, j: (i[0] + j[0], i[1]))
ds.print()
env.execute()
if __name__ == '__main__':
data_stream_api_demo()
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。