开发者社区> 问答> 正文

PyFlink中使用connector的方法创建数据源的方法是什么?

PyFlink中使用connector的方法创建数据源的方法是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 14:57:54 356 0
1 条回答
写回答
取消 提交回答
  • 此外,也可以使用 PyFlink DataStream API 中已经支持的 connector,需要注意的是,1.12 中仅提供了 Kafka connector 的支持。
    
    deserialization_schema = JsonRowDeserializationSchema.builder() 
        .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
    
    kafka_consumer = FlinkKafkaConsumer(
        topics='test_source_topic',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})
    
    ds = env.add_source(kafka_consumer)
    
    2021-12-07 14:58:09
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
Ali-HBase的SQL实践与改进 立即下载
《基于 Flink 的全新 Pulsar Connector 的设计、开发和使用》 立即下载
Apache Flink 流式应用中状态的数据结构定义升级 立即下载

相关实验场景

更多