开发者社区> 问答> 正文

请问如何在PYFLINK1.15下将connect steam进行sink?

请问如何在PYFLINK1.15下将connect steam进行sink? 我尝试用sink_to和add_sink都不行。 请提供具体方法和代码!

展开
收起
侠客张 2023-04-12 11:23:57 150 0
1 条回答
写回答
取消 提交回答
  • 发表文章、提出问题、分享经验、结交志同道合的朋友

    可以使用 DataStream.add_sink() 方法将数据流连接到 Sink 上。

    参考这个示例代码:

    from pyflink.common.serialization import SimpleStringEncoder
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import StreamingFileSink
    
    # 创建 StreamExecutionEnvironment 对象
    env = StreamExecutionEnvironment.get_execution_environment()
    
    # 创建数据流
    data_stream = env.from_collection([(1, "hello"), (2, "world")], Types.TUPLE([Types.INT(), Types.STRING()]))
    
    # 创建 StreamingFileSink 对象
    sink = StreamingFileSink \
        .for_row_format('./output', SimpleStringEncoder('utf-8')) \
        .with_rolling_policy(
            file_size=1024 * 1024 * 128,  # 文件大小
            rollover_interval=1000 * 60,  # 滚动间隔
            inactivity_interval=1000)  # 空闲间隔
    
    # 连接数据流和 Sink
    data_stream.add_sink(sink)
    
    # 执行作业
    env.execute("Write data stream to local file system")
    
    2023-04-12 21:51:50
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载