Flink - The object probably contains or references non serializable fields 无法序列化问题

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
云原生内存数据库 Tair,内存型 2GB
简介: 使用 Flink 自定义 Source 生成数据时,集群提交任务时显示 org.apache.log4j.Logger@72c927f1 is not serializable. The object probably contains or references non serializable fields.

一.引言

使用 Flink 自定义 Source 生成数据时,集群提交任务时显示 org.apache.log4j.Logger@72c927f1 is not serializable. The object probably contains or references non serializable fields. 报错序列化相关错误 :

image.gif编辑

二.问题解决

1.Scala Class 初始化不需要对应变量

错误代码:

val logger = Logger.getLogger(classOf[T])

image.gif

正确代码:

通过 scala 延迟加载功能与 @transient 关键字忽略对该变量的序列化,前提是该变量在对应 class[T] 初始化时不需要,如果某个变量在 class[T] 初始化时调用,加了 @transient 关键字会导致该变量为 null 并报错空指针。

@transient lazy val logger = Logger.getLogger(classOf[T])

image.gif

2.Scala Class 初始化需要对应变量

上述 logger 在 Class 初始化阶段不使用,所以可以使用 @transient 延迟初始化解决问题,还有一些变量的生成无法延迟初始化,例如使用 redis 初始化一些变量,如果使用 @transient 会报如下错误:

image.gif编辑

此时需要将对应无法初始化的类放到 open 初始化函数中,然后变量通过 var 修饰符在 class 内定义,并在 open 函数内执行实际初始化方法,Flink RichSourceFunction open 函数使用方法如下:

var redis: Jedis = _
  var initValue: T = _
  override def open(parameters: Configuration): Unit = {
    redis = getRedisClient(host, port)
    initValue = ... (包含redis读取的初始化方法)
  }

image.gif

3.Java

错误代码:

private Logger logger =   Logger.getLogger(T.class)

image.gif

正确代码:

log4j 不能序列化,为了防止 logger 被序列化,需要保持其处于 @transient 或者 static 状态,前者会导致上述相同的问题即 NullPointException,所以这里通过 static + final 修饰。

private static final Logger logger = Logger.getLogger(T.class)

image.gif

三.扩展

上述错误发生在 class[T] 内的变量 logger,变量无法序列化通过上述方法即可解决,如果是 class[T] 内某个 class 无法序列化,则需要实现 java.io.Serializable 接口,保证该类可以被序列化。上面的序列化问题出现在 BroadcastStream 场景下,由于 broadcastStream 中的类T 中有变量无法序列化导致广播流失效,通过 scala 方法已完美解决。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用合集之如何重写序列化器
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
关系型数据库 MySQL Java
实时计算 Flink版操作报错之遇到java.lang.IllegalStateException: The elasticsearch emitter must be serializable.的错误,如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
17天前
|
JSON 数据格式 Python
【python】解决json.dump(字典)时报错Object of type ‘float32‘ is not JSON serializable
在使用json.dump时遇到的“Object of type ‘float32’ is not JSON serializable”错误的方法,通过自定义一个JSON编码器类来处理NumPy类型的数据。
19 1
|
1月前
|
JSON 前端开发 数据格式
【Python】已解决:TypeError: Object of type JpegImageFile is not JSON serializable
【Python】已解决:TypeError: Object of type JpegImageFile is not JSON serializable
37 0
|
3月前
|
Prometheus 监控 Cloud Native
实时计算 Flink版操作报错之在使用ES时遇到“java.lang.IllegalStateException: The elasticsearch emitter must be serializable”,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3月前
|
存储 关系型数据库 对象存储
实时计算 Flink版操作报错合集之变更数据流转换为Insert-Only记录时,报错"datastream api record contains: Delete"如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
63 1
|
3月前
|
Java API 数据安全/隐私保护
实时计算 Flink版操作报错合集之变更数据流转换为Insert-Only记录时,报错"datastream api record contains: Delete"如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
46 0
|
3月前
|
Java 流计算
在Flink实时任务中,POJO(Plain Old Java Object)对象的模式演进可能会引起不兼容的问题
【2月更文挑战第6天】在Flink实时任务中,POJO(Plain Old Java Object)对象的模式演进可能会引起不兼容的问题
48 3
|
3月前
|
数据库 流计算
Flink CDC对于这种DEFAULT VALUE 序列化不支持?
Flink CDC对于这种DEFAULT VALUE 序列化不支持?
40 0
|
JSON 数据格式
TypeError: Object of type ‘float32‘ is not JSON serializable
TypeError: Object of type ‘float32‘ is not JSON serializable
177 0