开发者社区> 问答> 正文

PyFlink中使用connector的方法创建数据源的方法是什么?

PyFlink中使用connector的方法创建数据源的方法是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 14:57:54 356 0
1 条回答
写回答
取消 提交回答
  • 此外,也可以使用 PyFlink DataStream API 中已经支持的 connector,需要注意的是,1.12 中仅提供了 Kafka connector 的支持。
    
    deserialization_schema = JsonRowDeserializationSchema.builder() 
        .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
    
    kafka_consumer = FlinkKafkaConsumer(
        topics='test_source_topic',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})
    
    ds = env.add_source(kafka_consumer)
    
    2021-12-07 14:58:09
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
《Apache Flink-重新定义计算》PDF下载 立即下载
Apache Phoenix and HBase: Past, Present and Future of SQL over HBase 立即下载
Ali-HBase的SQL实践与改进 立即下载

相关实验场景

更多