开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

问一下Flink:我这边使用pyflink将json的流数据写入kafka后,请问应该如何解决?

问一下Flink:我这边使用pyflink将json的流数据写入kafka后,消费得到的数据却变成字段个数统计。如上面两图所示。请问应该如何解决,使得消费得到的数据和写入数据一致?使用的是canal-json格式

展开
收起
cuicuicuic 2023-09-17 20:06:08 67 0
4 条回答
写回答
取消 提交回答
  • 可以使用如下处理流程:

    1、初始化运行环境。

    # 初始化
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(2)
    # 加载 flink 读取 kafka 的 jar 包
    env.add_jars("file:////jars/flink-sql-connector-kafka-1.16.0.jar")
    

    2、定义 kafka source

    # 源 kafka配置
    kafka_servers = "node:9092"
    source_topic = "test"
    consume_group_id = "test_group"
    sink_topic = "test1"
    
    # 使用 SimpleStringSchema 反序列模式,因为测试数据源为非结构化数据
    source = KafkaSource.builder().set_bootstrap_servers(kafka_servers) \
        .set_topics(source_topic) \
        .set_group_id(consume_group_id) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .set_property("partition.discovery.interval.ms", "10000") \
        .build()
    
    # 这里不使用水印了,来一条处理一条无需上下关联
    data_source = env.from_source(source=source,
                            watermark_strategy=WatermarkStrategy.no_watermarks(),
                            source_name="kafka source")
    

    3、定义转换函数,将普通非结构化数据转换为 json

    def standard_log(line):
        """
        标准化字符串
        :param line:
        :return:
        """
        data = []
        try:
            if line:
                data = [json.dumps({"key": line.upper()}], ensure_ascii=False)]
        except Exception as ex:
            data = [json.dumps({"key": 1}, ensure_ascii=False)]
        finally:
            yield from data
    ds_standard = data_source.flat_map(standard_log, Types.STRING())
    # 标准化日志测试打印
    ds_standard.print()
    

    4、定义 sink

    # 订阅写入
    sink = KafkaSink.builder() \
        .set_bootstrap_servers(kafka_servers) \
        .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic(sink_topic)
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    ) \
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
        .build()
    ds_standard.sink_to(sink)
    
    # 执行
    env.execute()
    

    5、校验数据

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
    

    ——参考链接

    2024-01-24 17:20:22
    赞同 1 展开评论 打赏
  • 当您使用PyFlink将JSON格式的数据写入Kafka,而在消费时却发现数据变成了字段个数统计,这很可能是因为在消费端对数据的解码处理不正确导致的。为了确保写入和消费数据的一致性,尤其是当您提及使用的是Canal JSON格式时,需要注意以下几点:

    1. 生产端编码:

      • 在PyFlink中,确保数据是以符合Canal JSON格式的完整JSON对象形式写入Kafka的。这意味着每个JSON对象应包含databasetabletype等Canal协议所需的基本字段以及对应的实际数据字段。
    2. 序列化器设置:

      • 设置正确的序列化器来将Python对象转换为JSON字符串。使用SimpleStringSchema可能只会把对象转换为其字符串表示,而不是JSON格式。您需要确保使用类似JsonRowSerializationSchema或者其他能够正确处理JSON格式的序列化器。
    3. 消费端解码:

      • 在消费者一侧,确保使用能够正确解析JSON格式的Kafka消费者,并将接收到的字节串转换回原始的JSON对象。如果是Java消费者,可能需要使用JsonDeserializer或者自定义反序列化逻辑;如果是其他语言的消费者,也需要相应的JSON解析模块。

    举例说明如何在PyFlink中使用JSON序列化器:

    from pyflink.table import DataTypes, StreamTableEnvironment, TableConfig
    from pyflink.table.descriptors import Schema, Kafka, Json
    
    # 创建表环境
    table_env = StreamTableEnvironment.create(environment_settings=..., table_config=TableConfig())
    
    # 定义表结构,假设与Canal JSON格式匹配
    schema = Schema()
    # ... 添加列定义 ...
    
    # 设置Kafka生产者配置,使用JSON格式
    table_env.connect(Kafka()
                       .version('universal')  # 根据实际情况选择版本
                       .topic('your_topic')
                       .property('bootstrap.servers', 'kafka_broker:9092')
                       .property('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer')  # 或使用JSONSerializer
                       .with_schema(Json().json_schema(schema.json())))
    
    # 将DataStream转换为Table,并注册为一个临时表
    table = table_env.from_data_stream(stream)
    table_env.create_temporary_view('source_table', table)
    
    # 将表写入Kafka
    table_env.execute_sql("""
        INSERT INTO kafka_table
        SELECT * FROM source_table
    """)
    
    # 确保SELECT查询出来的每一行数据都被转换为合法的JSON字符串写入Kafka
    

    消费端则需要确保按照同样的逻辑解码JSON字符串为JSON对象,具体做法取决于您使用的消费者框架和语言。如果使用的是Flink Kafka Consumer,同样需要配置合适的JSON Deserializer。

    如果您的问题是发生在非Flink的Kafka消费者身上,请检查消费者配置以确保它正确处理JSON格式的消息。

    2024-01-15 14:24:57
    赞同 展开评论 打赏
  • 某政企事业单位运维工程师,主要从事系统运维工作,同时兼顾教育行业、企业等src安全漏洞挖掘,曾获全国行业网络安全大赛二等奖。

    使用PyFlink将JSON流数据写入Kafka,但在消费者端接收的数据变成了字段数量计数。这种情况的原因有很多可能性,请允许我对其中的一些进行简短探讨:

    • JSON序列化问题:请确保您的Python脚本正确地解码了Canal-Json消息。Canal-Json是一种特殊的JSON格式,包含元数据和原始数据。如果解码出现问题,则可能出现意外的结果。
    • Kafka分区策略:Kafka有一个称为“key-based partitioner”的分区策略,默认情况下,它是用来分发具有键的消息到特定的主题分区的。如果您的消息没有键,或者KeyGenerator未正确配置,那么消息可能会随机分配到分区,造成乱序。
    • 消费者端异常:有时候,在消费者的代码中会发生意外的事情,例如误删掉重要的代码片段,或者是无意间改变了预期的行为。请您仔细检查您的消费者代码,确保没有任何明显的bug。
    • 转义字符混淆:如果您的JSON数据中含有转义字符,而在转换成字节流发送到Kafka时发生了混乱,这也可能导致数据丢失或损坏。
    • 分布式事务问题:如果您的分布式环境中涉及多台计算机协同工作,那么必须注意协调事务的一致性和隔离性。如果出现了跨进程通信失败或是锁竞争等情况,就可能发生数据错位。
    2024-01-15 11:22:28
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    这个问题是由于在将 JSON 数据写入 Kafka 时,Flink 使用了默认的编码(canal-json),导致某些字段被截断或不正确。要解决这个问题,您可以在写入 Kafka 时指定自定义的编码。
    以下是一个使用 PyFlink 的示例,展示了如何将 JSON 数据写入 Kafka 并自定义编码:

    from pyflink.dataset import ExecutionEnvironment
    from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
    from pyflink.table.descriptors import Schema, OldCsv, FileSystem

    创建 ExecutionEnvironment 和 BatchTableEnvironment。

    env = ExecutionEnvironment.get_execution_environment()
    t_config = TableConfig()
    t_env = BatchTableEnvironment.create(env, t_config)

    设置文件系统连接器,这里使用文件系统连接器将数据写入 Kafka。

    t_env.connect(FileSystem().path('output.json')) \
    .with_format(OldCsv()
    .field_delimiter(',')
    .field("field1", DataTypes.STRING())
    .field("field2", DataTypes.STRING())
    .field("field3", DataTypes.STRING())) \
    .with_schema(Schema()
    .field("field1", DataTypes.STRING())
    .field("field2", DataTypes.STRING())
    .field("field3", DataTypes.STRING())) \
    .register_table_sink("Results")

    读取 JSON 数据并将其写入 Kafka。

    data = [{"field1": "value1", "field2": "value2", "field3": "value3"},
    {"field1": "value4", "field2": "value5", "field3": "value6"}]
    t_env.from_elements(data, DataTypes.JSON()) \
    .select("field1", "field2", "field3") \
    .insert_into("Results")

    执行任务。

    t_env.execute("write_kafka_json")
    CopyCopy

    在这个示例中,我们使用了 OldCsv 格式来指定自定义编码。您还可以根据需要修改 field_delimiter、field 和 with_schema 方法来调整编码。

    2024-01-12 22:23:33
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
    MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
    消息队列kafka介绍 立即下载