开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

rabbitmq为source的情况下 如果反序列化失败,Flink会一直消费数据,一直错误下去,?

rabbitmq为source的情况下 如果反序列化失败,Flink会一直消费数据,一直错误下去,这个有参数可以配置吗 ?image.png image.png

展开
收起
真的很搞笑 2023-06-05 18:50:45 138 0
1 条回答
写回答
取消 提交回答
  • 存在即是合理

    Flink有一个参数可以配置来控制反序列化失败时的行为。可以使用failOnDeserialization参数来控制是否在反序列化失败时停止消费数据。如果将此参数设置为true,则当发生反序列化错误时,Flink将停止消费数据并抛出异常。如果将此参数设置为false,则Flink将继续消费数据,即使发生反序列化错误。

    以下是一个示例代码片段,演示如何使用failOnDeserialization参数:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    
    DataStream<String> stream = env.socketTextStream("localhost", 9000);
    stream.map(new MyMapFunction())
        .setParallelism(1)
        .addSink(new MySinkFunction());
    
    // 设置failOnDeserialization参数为true
    env.enableCheckpointing(60000);
    env.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("path/to/savepoint"));
    env.setExecutionConfig("failOnDeserialization", "true");
    env.execute("Fail on Deserialization Example");
    

    在这个例子中,创建了一个Flink流处理环境,并从本地主机上的端口9000读取数据。然后,使用MyMapFunction对数据进行映射,并使用MySinkFunction将结果写入目标位置。接下来,启用了检查点功能,并将failOnDeserialization参数设置为true。这意味着如果发生反序列化错误,Flink将停止消费数据并抛出异常。最后,执行了Flink作业。

    2023-06-08 11:09:46
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    RocketMQ Client-GO 介绍 立即下载
    RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
    基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载