Using Flink 1.17 / PyFlink 1.17, how do I use KafkaSink?

Question 1: When I import the package with `from pyflink.datastream.connectors import KafkaSource, KafkaSink`, I get the error "ImportError: cannot import name 'KafkaSink' from 'pyflink.datastream.connectors' (/home/bigdata/flink-1.17.1/opt/python/pyflink.zip/pyflink/datastream/connectors/__init__.py)".
Question 2: The official documentation shows the following code:

```python
sink = KafkaSink.builder() \
    .set_bootstrap_servers(brokers) \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
            .set_topic("topic-name")
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
    ) \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .build()
```
How do I import the serialization classes and instantiate the sink? Please provide a complete example, including the relevant imports.
The error in question 1 shows that `pyflink.datastream.connectors` does not expose a KafkaSink class, so the import fails. It may be that KafkaSink is not exported from that package in PyFlink 1.17; you can try the other Kafka classes instead, such as the legacy FlinkKafkaProducer.

For question 2, with FlinkKafkaProducer you only need SimpleStringSchema to serialize the records. A complete example:
```python
from pyflink.common import Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaProducer

env = StreamExecutionEnvironment.get_execution_environment()

# Create a data stream of strings (SimpleStringSchema serializes string records)
stream = env.from_collection(['hello', 'world'], type_info=Types.STRING())

# Kafka producer configuration; serialization is handled by SimpleStringSchema,
# so only the broker address is needed here
kafka_props = {"bootstrap.servers": "localhost:9092"}

# Create the producer; its default semantic is AT_LEAST_ONCE
producer = FlinkKafkaProducer("test-topic",
                              SimpleStringSchema(),
                              kafka_props)

# Write the stream to Kafka
stream.add_sink(producer)

# Execute the job
env.execute("write to kafka")
```
In this example, FlinkKafkaProducer is used in place of KafkaSink, and SimpleStringSchema serializes each record as a string. The producer's default AT_LEAST_ONCE semantic guarantees that every record is written at least once. Finally, the stream is attached to the producer with add_sink and written to the Kafka topic.
For question 1, you need to import KafkaSink from pyflink.datastream.connectors.kafka, not from pyflink.datastream.connectors:

```python
from pyflink.datastream.connectors.kafka import KafkaSink
```

For question 2, you also need KafkaRecordSerializationSchema, SimpleStringSchema, and DeliveryGuarantee:

```python
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaRecordSerializationSchema
```

Then you can instantiate the KafkaSink like this:

```python
sink = KafkaSink.builder() \
    .set_bootstrap_servers(brokers) \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
            .set_topic("topic-name")
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
    ) \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .build()
```

Here is a complete example:
```python
from pyflink.common import Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaRecordSerializationSchema, KafkaSink

env = StreamExecutionEnvironment.get_execution_environment()

brokers = "localhost:9092"
topic = "test-topic"

# SimpleStringSchema serializes string records, so build a stream of strings
stream = env.from_collection(["Hello", "World"], type_info=Types.STRING())

sink = KafkaSink.builder() \
    .set_bootstrap_servers(brokers) \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
            .set_topic(topic)
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
    ) \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .build()

# KafkaSink is a unified sink, so attach it with sink_to() rather than add_sink()
stream.sink_to(sink)

env.execute("Kafka Sink Example")
```
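One note for running either example: the Flink Kafka connector JAR must be on the job's classpath. A minimal sketch of registering it on the environment, assuming a locally downloaded connector JAR (the path and file name below are placeholders, not part of the original answers):

```python
# Sketch: make the Kafka connector available to the job before building the pipeline.
# The JAR path and name are assumptions; use the connector JAR matching your Flink version.
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")
```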