开发者社区> 问答> 正文

pyflink中指明deserialization_schema的方法是什么?

pyflink中指明deserialization_schema的方法是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 15:28:23 730 0
1 条回答
写回答
取消 提交回答
  • kafka_consumer = FlinkKafkaConsumer(
      topics='test',
      deserialization_schema=JsonRowDeserializationSchema.builder().type_info(
        type_info=Types.ROW_NAMED(["id", "username", "attribute1", "attribute2"],
                                  [Types.INT(), Types.STRING(), Types.STRING(), Types.FLOAT()])).build(),
      properties={
        'bootstrap.servers': 'hadoop1:9092,hadoop2:9092,hadoop3:9092',
        'auto.offset.reset': 'latest'})
    
    ds = env.add_source(kafka_consumer)
    
    
    2021-12-07 15:28:34
    赞同 1 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
Dynamic DDL Adding Structure to Streaming Data on the Fly 立即下载
Apache Spark’s Performance Project Tungsten and Beyond 立即下载
低代码开发师(初级)实战教程 立即下载