开发者社区> 问答> 正文

请问如何在PYFLINK1.17下使用KafkaSink

在使用flink1.17、pyflink1.17,如何使用KafkaSink? 问题1. 我在导入包 from pyflink.datastream.connectors import KafkaSource, KafkaSink

报错“ImportError: cannot import name 'KafkaSink' from 'pyflink.datastream.connectors' (/home/bigdata/flink-1.17.1/opt/python/pyflink.zip/pyflink/datastream/connectors/init.py) ”

问题2. 官方文档代码如下: sink = KafkaSink.builder()
.set_bootstrap_servers(brokers)
.set_record_serializer( KafkaRecordSerializationSchema.builder() .set_topic("topic-name") .set_value_serialization_schema(SimpleStringSchema()) .build() )
.set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build()

如何导入序列号包和实例化?

请提供一个完整的示例代码,包括相关包的导入

展开
收起
侠客张 2023-06-04 21:02:28 264 0
2 条回答
写回答
取消 提交回答
  • 问题1的报错信息显示pyflink.datastream.connectors中没有KafkaSink模块,因此导入失败。可能是在pyflink1.17版本中,KafkaSink模块被移除了。可以尝试使用新的Kafka相关模块,比如FlinkKafkaProducer等。

    问题2中需要导入KafkaRecordSerializationSchema、SimpleStringSchema、DeliveryGuarantee这三个模块。完整示例代码如下:

    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import FlinkKafkaProducer
    from pyflink.datastream import KafkaRecordSerializationSchema
    from pyflink.datastream import DeliveryGuarantee
    
    env = StreamExecutionEnvironment.get_execution_environment()
    
    # 创建数据流
    stream = env.from_collection([(1, 'hello'), (2, 'world')])
    
    # 设置Kafka相关配置
    kafka_props = {"bootstrap.servers": "localhost:9092",
                   "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
                   "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"}
    
    # 创建KafkaProducer
    producer = FlinkKafkaProducer("test-topic", 
                                  KafkaRecordSerializationSchema.builder()
                                    .set_topic("test-topic")
                                    .set_value_serializer(SimpleStringSchema())
                                    .build(), 
                                  kafka_props, 
                                  FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 
                                  delivery_guarantee=DeliveryGuarantee.AT_LEAST_ONCE)
    
    # 输出到Kafka
    stream.add_sink(producer)
    
    # 执行任务
    env.execute("write to kafka")
    

    在这个示例中,我们使用FlinkKafkaProducer模块代替了KafkaSink模块,使用KafkaRecordSerializationSchema模块来序列化数据,使用SimpleStringSchema模块来设置数据类型。在创建KafkaProducer时,我们使用了AT_LEAST_ONCE模式来保证数据至少被处理一次。最后,我们将数据流输出到Kafka主题中。

    2023-06-05 07:33:51
    赞同 展开评论 打赏
  • 值得去的地方都没有捷径

    对于问题1,您需要从pyflink.datastream.connectors.kafka导入KafkaSink,而不是从pyflink.datastream.connectors导入。可以使用以下代码:

    python from pyflink.datastream.connectors.kafka import KafkaSink 对于问题2,您需要导入KafkaRecordSerializationSchema和SimpleStringSchema。可以使用以下代码:

    python from pyflink.common.serialization import SimpleStringSchema from pyflink.datastream.connectors.kafka import KafkaRecordSerializationSchema 然后,您可以使用以下代码实例化KafkaSink:

    python sink = KafkaSink.builder() .set_bootstrap_servers(brokers) .set_record_serializer( KafkaRecordSerializationSchema.builder() .set_topic("topic-name") .set_value_serialization_schema(SimpleStringSchema()) .build() ) .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build() 以下是一个完整的示例代码:

    python from pyflink.common.serialization import SimpleStringSchema from pyflink.datastream.connectors.kafka import KafkaRecordSerializationSchema, KafkaSink from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()

    configure Kafka producer

    brokers = "localhost:9092" topic = "test-topic"

    create a stream

    stream = env.from_collection([(1, "Hello"), (2, "World")])

    create Kafka sink

    sink = KafkaSink.builder() .set_bootstrap_servers(brokers) .set_record_serializer( KafkaRecordSerializationSchema.builder() .set_topic(topic) .set_value_serialization_schema(SimpleStringSchema()) .build() ) .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build()

    add Kafka sink to stream

    stream.add_sink(sink)

    execute the program

    env.execute("Kafka Sink Example")

    2023-06-04 22:55:11
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载