开发者社区> 问答> 正文

pyflink中指明deserialization_schema的方法是什么?

pyflink中指明deserialization_schema的方法是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 15:28:23 695 0
1 条回答
写回答
取消 提交回答
  • kafka_consumer = FlinkKafkaConsumer(
      topics='test',
      deserialization_schema=JsonRowDeserializationSchema.builder().type_info(
        type_info=Types.ROW_NAMED(["id", "username", "attribute1", "attribute2"],
                                  [Types.INT(), Types.STRING(), Types.STRING(), Types.FLOAT()])).build(),
      properties={
        'bootstrap.servers': 'hadoop1:9092,hadoop2:9092,hadoop3:9092',
        'auto.offset.reset': 'latest'})
    
    ds = env.add_source(kafka_consumer)
    
    
    2021-12-07 15:28:34
    赞同 1 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
Major advancements in Apache Hive towards full support of SQL compliance 立即下载
Dynamic DDL Adding Structure to Streaming Data on the Fly 立即下载
Apache Spark’s Performance Project Tungsten and Beyond 立即下载