一.引言
使用 Flink 自定义 Source 生成数据时,集群提交任务时显示 org.apache.log4j.Logger@72c927f1 is not serializable. The object probably contains or references non serializable fields. 报错序列化相关错误 :
编辑
二.问题解决
1.Scala Class 初始化不需要对应变量
错误代码:
val logger = Logger.getLogger(classOf[T])
正确代码:
通过 scala 延迟加载功能与 @transient 关键字忽略对该变量的序列化,前提是该变量在对应 class[T] 初始化时不需要,如果某个变量在 class[T] 初始化时调用,加了 @transient 关键字会导致该变量为 null 并报错空指针。
@transient lazy val logger = Logger.getLogger(classOf[T])
2.Scala Class 初始化需要对应变量
上述 logger 在 Class 初始化阶段不使用,所以可以使用 @transient 延迟初始化解决问题,还有一些变量的生成无法延迟初始化,例如使用 redis 初始化一些变量,如果使用 @transient 会报如下错误:
编辑
此时需要将对应无法初始化的类放到 open 初始化函数中,然后变量通过 var 修饰符在 class 内定义,并在 open 函数内执行实际初始化方法,Flink RichSourceFunction open 函数使用方法如下:
var redis: Jedis = _ var initValue: T = _ override def open(parameters: Configuration): Unit = { redis = getRedisClient(host, port) initValue = ... (包含redis读取的初始化方法) }
3.Java
错误代码:
private Logger logger = Logger.getLogger(T.class)
正确代码:
log4j 不能序列化,为了防止 logger 被序列化,需要保持其处于 @transient 或者 static 状态,前者会导致上述相同的问题即 NullPointException,所以这里通过 static + final 修饰。
private static final Logger logger = Logger.getLogger(T.class)
三.扩展
上述错误发生在 class[T] 内的变量 logger,变量无法序列化通过上述方法即可解决,如果是 class[T] 内某个 class 无法序列化,则需要实现 java.io.Serializable 接口,保证该类可以被序列化。上面的序列化问题出现在 BroadcastStream 场景下,由于 broadcastStream 中的类T 中有变量无法序列化导致广播流失效,通过 scala 方法已完美解决。