
How do I implement file -> JDBC stream processing in PyFlink?


Guest qzzytmszf3zhq 2021-12-07 15:30:39
1 answer
  • from pyflink.common import WatermarkStrategy, Row
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import FileSource, StreamFormat, JdbcSink, JdbcConnectionOptions

    if __name__ == '__main__':
        # Create the streaming execution environment
        env = StreamExecutionEnvironment.get_execution_environment()
        # Register the JARs the job needs: the Flink JDBC connector and the MySQL driver
        env.add_jars("file:///Users/xiangyang/PycharmProjects/jyyc_dp_stream/flink-connector-jdbc_2.11-1.13.0.jar")
        env.add_jars("file:///Users/xiangyang/PycharmProjects/jyyc_dp_stream/mysql-connector-java-8.0.21.jar")

        # Create the source: read ./test.log line by line as text
        file_source = FileSource \
            .for_record_stream_format(StreamFormat.text_line_format(), "./test.log") \
            .build()

        # Add the source to the environment; this returns the DataStream we operate on
        ds = env.from_source(file_source, WatermarkStrategy.for_monotonous_timestamps(), "test")

        # Transform: wrap each line in a one-field Row so it matches the sink's type_info
        ds = ds.map(lambda x: Row(x), output_type=Types.ROW([Types.STRING()]))

        # Sink 1: print each record to stdout for debugging
        ds.print()

        # Connection settings for the target MySQL database
        jdbc_options = JdbcConnectionOptions.JdbcConnectionOptionsBuilder() \
            .with_user_name("xxxxxx") \
            .with_password("xxxxxx") \
            .with_driver_name("com.mysql.cj.jdbc.Driver") \
            .with_url("jdbc:mysql://localhost:3306/test_db") \
            .build()

        # Sink 2: write each Row to MySQL; the single "?" is bound to the Row's single field
        ds.add_sink(JdbcSink.sink("INSERT INTO test_table (id, message) VALUES (null, ?)",
                                  type_info=Types.ROW([Types.STRING()]),
                                  jdbc_connection_options=jdbc_options))
        # Nothing runs until execute() is called; this submits the job
        env.execute("ANY_NAME")
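
In the job above, the number of `?` placeholders in the INSERT statement has to line up with the number of fields in the `Types.ROW(...)` passed as `type_info`. If the table gets more columns later, keeping the two in sync by hand is easy to get wrong. The following is only a rough sketch in plain Python (no Flink needed) of one way to derive both from a single column list. `build_insert_sql` and `split_line` are hypothetical helpers, not part of PyFlink, and the comma delimiter and the extra `level` column are assumptions made for illustration.

```python
from typing import List, Sequence


def build_insert_sql(table: str, columns: Sequence[str]) -> str:
    """Build a parameterized INSERT with exactly one `?` per column.

    Hypothetical helper: table and column names are trusted input here,
    they are not escaped.
    """
    if not columns:
        raise ValueError("at least one column is required")
    column_list = ", ".join(columns)
    placeholders = ", ".join("?" for _ in columns)
    return f"INSERT INTO {table} ({column_list}) VALUES ({placeholders})"


def split_line(line: str, field_count: int, sep: str = ",") -> List[str]:
    """Split one text line into exactly `field_count` string fields.

    Extra separators stay in the last field; missing fields are padded
    with empty strings, so the result always matches the column count.
    """
    parts = line.rstrip("\n").split(sep, field_count - 1)
    parts += [""] * (field_count - len(parts))
    return parts


# Single-column case, matching test_table(message) from the answer
sql = build_insert_sql("test_table", ["message"])

# Two-column case; "level" is an assumed column, not in the original table
columns = ["level", "message"]
sql_two = build_insert_sql("test_table", columns)
fields = split_line("INFO,job started, all good\n", len(columns))
```

In the Flink job, `sql_two` would be passed to `JdbcSink.sink(...)`, the map function would return `Row(*split_line(x, len(columns)))`, and `type_info` would be `Types.ROW([Types.STRING()] * len(columns))`, so all three follow the same column list.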
    
    
    
    2021-12-07 15:30:51