开发者社区> 问答> 正文

使用一个pyflink的HelloWorld的方法是什么?

使用一个pyflink的HelloWorld的方法是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 15:26:19 330 0
1 条回答
写回答
取消 提交回答
  • from pyflink.common import WatermarkStrategy
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import FileSource, StreamFormat
    
    if __name__ == '__main__':
        # 1. 创建流式处理环境
        env = StreamExecutionEnvironment.get_execution_environment()
    
        # 2. 创建source
        file_source = FileSource \
            .for_record_stream_format(StreamFormat.text_line_format(), "./test.log") \
            .build()
    
        # 3. 将source添加到环境中,环境会生成一个datastream,也就是我们进行操作的数据类
        ds = env.from_source(file_source, WatermarkStrategy.for_monotonous_timestamps(), "test")
    
        # 4. transform
        ds = ds.map(lambda x: x)
    
        # 5. sink
        ds.print()
    
        # 6. 真正执行代码
        env.execute("ANY_NAME")
    
    
    
    2021-12-07 15:26:32
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载