开发者社区> 问答> 正文

PyFlink DataStream API 中定义的 connector写出结果数据的方法是什么?

PyFlink DataStream API 中定义的 connector写出结果数据的方法是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 15:07:04 333 0
1 条回答
写回答
取消 提交回答
  • 可以直接使用 PyFlink DataStream API 中已经支持的 connector,需要注意的是,1.12 中提供了对于 FileSystem、JDBC、Kafka connector 的支持,以 Kafka 为例:
    
    serialization_schema = JsonRowSerializationSchema.builder() \
        .with_type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
    
    kafka_producer = FlinkKafkaProducer(
        topic='test_sink_topic',
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})
    
    ds.add_sink(kafka_producer)
    
    
    2021-12-07 15:07:20
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Spring Boot2.0实战Redis分布式缓存 立即下载
CUDA MATH API 立即下载
API PLAYBOOK 立即下载