开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

有没有大佬使用pyflink读取kafka数据的, 有没有相应的解决办法呀?

有没有大佬使用pyflink读取kafka数据的,kafka流出来的数据类型是字节 但是pyflink提供的kafkasource 反序列化器只有simplestringschma。这个读取kafka字节类型的反序列化 有没有相应的解决办法呀?

展开
收起
cuicuicuic 2023-10-30 19:19:47 96 0
2 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    PyFlink 的 KafkaSource 是支持字节类型反序列化的,你可以使用 PyFlink 自带的 ByteArrayDeserializationSchema 来实现这个功能。以下是具体的代码示例:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
    from pyflink.table.descriptors import Schema, Kafka, FileSystem
    from pyflink.table.expressions import col
    from pyflink.table.sources import KafkaSourceFunction
    from pyflink.table.descriptors import Kafka, FileSystem, OldCsv, Json, Avro, Parquet, \
        JDBC, JdbcDynamicTableSink, JdbcBatchTableSink, JdbcAppendTableSink, \
        JdbcUpsertTableSink, JdbcInsertTableSink, Cassandra, Elasticsearch7, Elasticsearch6, \
        FileSystem sink, Catalog, Delta, Hive, HBase, MongoDB
    from pyflink.formats import DeserializationSchema
    from pyflink.table.descriptors import Schema
    env = StreamExecutionEnvironment.get_execution_environment()
    t_config = TableConfig()
    t_env = BatchTableEnvironment.create(env, t_config)
    t_env.connect(Kafka() \
                 .version("universal") \
                 .topic("your-topic-name") \
                 .start_from_earliest() \
                 .deserialization_schema(ByteArrayDeserializationSchema()))
    

    这段代码创建了一个新的 Kafka 表源,并指定了使用的反序列化方案为 ByteArrayDeserializationSchema 。这样,当从 Kafka 中读取数据时,将会自动使用这个反序列化方案将字节类型的数据反序列化为其他可读的数据类型。
    请注意,这只是一个基本的示例,实际使用时还需要根据你的具体需求进行适当的修改。

    2023-10-31 14:52:56
    赞同 展开评论 打赏
  • 可以尝试使用FlinkKafkaConsumer来读取Kafka数据,并指定反序列化器为SimpleStringSchema。然后,在处理数据时,将字节类型转换为所需的数据类型。

    以下是一个示例代码:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.connector.kafka import FlinkKafkaConsumer
    from pyflink.common.serialization import SimpleStringSchema
    
    # 创建Flink执行环境
    env = StreamExecutionEnvironment.get_execution_environment()
    
    # 配置Kafka消费者参数
    properties = {
        "bootstrap.servers": "localhost:9092",
        "group.id": "test"
    }
    
    # 创建Kafka消费者
    consumer = FlinkKafkaConsumer(topic="test_topic", properties=properties, schema=SimpleStringSchema())
    
    # 添加Kafka消费者到Flink执行环境
    env.add_source(consumer)
    
    # 处理数据,将字节类型转换为所需的数据类型
    def process_data(data):
        # 在这里将字节类型转换为所需的数据类型,例如字符串、整数等
        pass
    
    # 对数据流进行处理
    env.add_map(process_data)
    
    # 执行Flink作业
    env.execute("Read Kafka Data")
    

    在这个示例中,我们首先创建了一个FlinkKafkaConsumer,并指定了Kafka消费者的参数和反序列化器。然后,我们将Kafka消费者添加到Flink执行环境中。接下来,我们可以在process_data函数中将字节类型转换为所需的数据类型。最后,我们对数据流进行处理,并执行Flink作业。

    2023-10-31 14:43:27
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载