A member of our group chat recently asked this question: his Scala code uses WatermarkStrategy, and it fails at compile time with the following error:
Error:Error:line (31)Static methods in interface require -target:jvm-1.8
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5)))
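At first I assumed his JDK version was misconfigured, but he said every version setting in the IDE was 1.8. I haven't written Flink applications in Scala recently, and calling the new WatermarkStrategy interface from Java code had worked fine for me, so I tested the following Scala code myself. Sure enough, it fails as soon as it is compiled.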
env.addSource(new FlinkKafkaConsumer[String](topic_, new SimpleStringSchema(), properties).setStartFromLatest())
  .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5)))
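The compiler message is explicit: the code calls a static method on an interface. Static methods in interfaces only exist since Java 1.8, so calling one requires the compiler to target JVM 1.8. Because this is Scala code calling a static method on a Java interface, the error is a Scala / Java 8 compatibility problem: Scala 2.11 emits JVM 1.6 bytecode by default, and at that bytecode level a static interface method cannot be invoked.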
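The same situation can be reproduced without Flink, because the JDK itself has static methods on Java 8 interfaces, for example `java.util.Comparator.naturalOrder` and `java.util.function.Function.identity`. The sketch below is my own illustration (the object and method names are made up); it assumes Scala 2.12 or later, where it compiles as-is. Compiled with Scala 2.11 and its default target, both calls should be rejected with the same "Static methods in interface require -target:jvm-1.8" message.

```scala
import java.util.{Arrays, Comparator}
import java.util.function.{Function => JFunction}

// Hypothetical demo object: both helpers call static methods declared on
// Java 8 interfaces, just like WatermarkStrategy.forBoundedOutOfOrderness.
object StaticInterfaceCall {
  // Comparator.naturalOrder is a static method on the Comparator interface.
  def sortedNatural(xs: Array[String]): List[String] = {
    val copy = xs.clone()
    Arrays.sort(copy, Comparator.naturalOrder[String]())
    copy.toList
  }

  // Function.identity is a static method on the java.util.function.Function interface.
  def identityOf(s: String): String = JFunction.identity[String]().apply(s)

  def main(args: Array[String]): Unit = {
    println(sortedNatural(Array("scala", "flink", "kafka")))
    println(identityOf("watermark"))
  }
}
```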
What this code actually calls is a method on the Java interface WatermarkStrategy; its source is shown below:
/**
 * Creates a watermark strategy for situations where records are out of order, but you can place
 * an upper bound on how far the events are out of order. An out-of-order bound B means that
 * once the an event with timestamp T was encountered, no events older than {@code T - B} will
 * follow any more.
 *
 * <p>The watermarks are generated periodically. The delay introduced by this watermark
 * strategy is the periodic interval length, plus the out of orderness bound.
 *
 * @see BoundedOutOfOrdernessWatermarks
 */
static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
    return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
}
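The code above may leave you puzzled: since when can an interface define a static method? Indeed, before JDK 1.8 interfaces could not contain static methods, but JDK 1.8 added two features that strengthen interfaces: an interface can now define default methods (default) and static methods (static). Scala offers something comparable, though not identical: a trait can contain concrete methods, which play the role of default methods, but a trait cannot declare static methods, so Scala puts static-style factory methods on a companion object instead.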
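★ The main advantage of default methods is that they provide a way to extend an interface without breaking existing code. Suppose an interface that is already in use needs a new method. Before JDK 8, adding a method to such an interface forced us to implement it in every implementing class, or compilation would fail. That is manageable when there are few implementing classes and we are allowed to modify them, but painful when there are many, or when we have no access to their source code. Default methods solve this: the interface supplies an implementation that is used whenever no other implementation is provided explicitly, so the newly added method does not break existing code. Another advantage is that the method is optional: an implementing class can override the default implementation to suit its needs, or simply not implement it.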
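In Scala the closest counterpart to a Java 8 default method is a concrete method on a trait. The sketch below replays the scenario from the quote; `EventParser`, `parseOrElse` and both implementing classes are hypothetical names invented for illustration, not part of Flink. `parseOrElse` is the "later addition": because it carries a body, the older implementation keeps compiling, while another implementation chooses to override it.

```scala
import scala.util.Try

// Hypothetical interface that originally only declared `parse`.
trait EventParser {
  def parse(line: String): Option[Long]

  // Added later with a concrete body, playing the role of a Java 8 default method:
  // existing implementations do not have to change.
  def parseOrElse(line: String, fallback: Long): Long = parse(line).getOrElse(fallback)
}

// Written before parseOrElse existed; compiles unchanged and inherits it.
class CsvTimestampParser extends EventParser {
  def parse(line: String): Option[Long] =
    line.split(",").headOption.flatMap(s => Try(s.trim.toLong).toOption)
}

// Chooses to override the added method instead of inheriting it.
class StrictParser extends EventParser {
  def parse(line: String): Option[Long] = Try(line.trim.toLong).toOption

  override def parseOrElse(line: String, fallback: Long): Long =
    parse(line).getOrElse(throw new IllegalArgumentException(s"bad line: $line"))
}
```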
So why does Flink use static methods in the WatermarkStrategy interface? Read on.
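To simplify and unify watermark generation, Flink 1.11 reworked its watermark interfaces into a single WatermarkStrategy interface (previously there were two separate interfaces, AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks). Its source is: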
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {

    // ------------------------------------------------------------------------
    //  Methods that implementors need to implement.
    // ------------------------------------------------------------------------

    /**
     * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);

    /**
     * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
     * strategy.
     */
    @Override
    default TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        // By default, this is {@link RecordTimestampAssigner},
        // for cases where records come out of a source with valid timestamps, for example from Kafka.
        return new RecordTimestampAssigner<>();
    }

    // ------------------------------------------------------------------------
    //  Builder methods for enriching a base WatermarkStrategy
    // ------------------------------------------------------------------------

    /**
     * Creates a new {@code WatermarkStrategy} that wraps this strategy but instead uses the given
     * {@link TimestampAssigner} (via a {@link TimestampAssignerSupplier}).
     *
     * <p>You can use this when a {@link TimestampAssigner} needs additional context, for example
     * access to the metrics system.
     *
     * <pre>
     * {@code WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
     *   .forMonotonousTimestamps()
     *   .withTimestampAssigner((ctx) -> new MetricsReportingAssigner(ctx));
     * }</pre>
     */
    default WatermarkStrategy<T> withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner) {
        checkNotNull(timestampAssigner, "timestampAssigner");
        return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner);
    }

    /**
     * Creates a new {@code WatermarkStrategy} that wraps this strategy but instead uses the given
     * {@link SerializableTimestampAssigner}.
     *
     * <p>You can use this in case you want to specify a {@link TimestampAssigner} via a lambda
     * function.
     *
     * <pre>
     * {@code WatermarkStrategy<CustomObject> wmStrategy = WatermarkStrategy
     *   .<CustomObject>forMonotonousTimestamps()
     *   .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
     * }</pre>
     */
    default WatermarkStrategy<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) {
        checkNotNull(timestampAssigner, "timestampAssigner");
        return new WatermarkStrategyWithTimestampAssigner<>(this,
            TimestampAssignerSupplier.of(timestampAssigner));
    }

    /**
     * Creates a new enriched {@link WatermarkStrategy} that also does idleness detection in the
     * created {@link WatermarkGenerator}.
     *
     * <p>Add an idle timeout to the watermark strategy. If no records flow in a partition of a
     * stream for that amount of time, then that partition is considered "idle" and will not hold
     * back the progress of watermarks in downstream operators.
     *
     * <p>Idleness can be important if some partitions have little data and might not have events
     * during some periods. Without idleness, these streams can stall the overall event time
     * progress of the application.
     */
    default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
        checkNotNull(idleTimeout, "idleTimeout");
        checkArgument(!(idleTimeout.isZero() || idleTimeout.isNegative()),
            "idleTimeout must be greater than zero");
        return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
    }

    // ------------------------------------------------------------------------
    //  Convenience methods for common watermark strategies
    // ------------------------------------------------------------------------

    /**
     * Creates a watermark strategy for situations with monotonously ascending timestamps.
     *
     * <p>The watermarks are generated periodically and tightly follow the latest
     * timestamp in the data. The delay introduced by this strategy is mainly the periodic interval
     * in which the watermarks are generated.
     *
     * @see AscendingTimestampsWatermarks
     */
    static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
        return (ctx) -> new AscendingTimestampsWatermarks<>();
    }

    /**
     * Creates a watermark strategy for situations where records are out of order, but you can place
     * an upper bound on how far the events are out of order. An out-of-order bound B means that
     * once the an event with timestamp T was encountered, no events older than {@code T - B} will
     * follow any more.
     *
     * <p>The watermarks are generated periodically. The delay introduced by this watermark
     * strategy is the periodic interval length, plus the out of orderness bound.
     *
     * @see BoundedOutOfOrdernessWatermarks
     */
    static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
        return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
    }

    /**
     * Creates a watermark strategy based on an existing {@link WatermarkGeneratorSupplier}.
     */
    static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
        return generatorSupplier::createWatermarkGenerator;
    }

    /**
     * Creates a watermark strategy that generates no watermarks at all. This may be useful in
     * scenarios that do pure processing-time based stream processing.
     */
    static <T> WatermarkStrategy<T> noWatermarks() {
        return (ctx) -> new NoWatermarksGenerator<>();
    }
}
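As you can see, the interface provides several default methods, and to make development easier Flink also ships a number of static factory methods for common watermark strategies. It is precisely because forBoundedOutOfOrderness is static that we can call it directly as WatermarkStrategy.forBoundedOutOfOrderness, without first writing our own implementation of the interface.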
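To see the shape of this design from the Scala side, here is a heavily simplified mimic. `WmGenerator` and `WmStrategy` are hypothetical stand-ins, not Flink's API. The strategy trait has a single abstract method, so a Scala 2.12+ lambda can implement it, much like Flink's `(ctx) -> new BoundedOutOfOrdernessWatermarks<>(...)`. Since a Scala trait cannot declare static methods, the factories live on the companion object. The watermark arithmetic follows Flink's BoundedOutOfOrdernessWatermarks (max timestamp seen, minus the bound, minus 1), and the ascending case is the zero-bound special case, as in Flink's AscendingTimestampsWatermarks.

```scala
import java.time.Duration

object MiniWatermarks {
  // Hypothetical, simplified stand-in for Flink's WatermarkGenerator.
  trait WmGenerator {
    def onEvent(timestamp: Long): Unit
    def currentWatermark: Long
  }

  // Hypothetical stand-in for WatermarkStrategy: one abstract method,
  // so a lambda can implement it via SAM conversion (Scala 2.12+).
  trait WmStrategy {
    def createGenerator(): WmGenerator
  }

  // Static-style factories go on the companion object in Scala.
  object WmStrategy {
    def forBoundedOutOfOrderness(maxOutOfOrderness: Duration): WmStrategy = () =>
      new WmGenerator {
        private val boundMillis = maxOutOfOrderness.toMillis
        // Start so that the lowest possible watermark is Long.MinValue (no underflow).
        private var maxTimestamp = Long.MinValue + boundMillis + 1

        def onEvent(timestamp: Long): Unit = {
          maxTimestamp = math.max(maxTimestamp, timestamp)
        }

        def currentWatermark: Long = maxTimestamp - boundMillis - 1
      }

    // Ascending timestamps are simply the zero-bound case.
    def forMonotonousTimestamps(): WmStrategy = forBoundedOutOfOrderness(Duration.ZERO)
  }
}
```

Callers use it the same way the article calls Flink: `MiniWatermarks.WmStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))`, with no implementation class written by hand.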
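Scala 2.11.x, however, does not support calling static methods defined on Java 8 interfaces under its default settings: scalac 2.11 targets jvm-1.6 unless told otherwise, and it only allows such calls when the target is jvm-1.8. That is what causes this error.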
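So how do we fix it? The first option is to add -target:jvm-1.8 to the Scala compiler's additional compiler options in IDEA; after that the code runs normally. The configuration is shown in the figure below: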
[Figure: IDEA Scala compiler settings with -target:jvm-1.8 added to the additional compiler options]
However, you will still hit the same error when packaging, because the packaging build also has to be given -target:jvm-1.8.
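Don't worry. The official scala-maven-plugin documentation, https://davidb.github.io/scala-maven-plugin/compile-mojo.html#addScalacArgs, lists a relevant setting, addScalacArgs, under Optional Parameters. If you are interested, follow the link above; it describes how to fix this.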
[Figure: the addScalacArgs entry under Optional Parameters in the scala-maven-plugin documentation]
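When packaging, the compiler option can usually be supplied in one of two ways:
Pass it on the command line when running mvn clean package
Configure it on the scala-maven-plugin in the pom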
Solutions
Packaging with the mvn command
If you package with the mvn command, add a -D parameter as follows:
mvn clean package -DaddScalacArgs=-target:jvm-1.8
Configuring scala-maven-plugin in the pom
If you package through the scala-maven-plugin configuration, add addScalacArgs to the plugin's configuration in the pom:
<plugin>
    <groupId>net.alchim31.maven</groupId>
    <artifactId>scala-maven-plugin</artifactId>
    <version>3.4.6</version>
    <configuration>
        <recompileMode>incremental</recompileMode>
        <addScalacArgs>-target:jvm-1.8</addScalacArgs>
    </configuration>
    <executions>
        <execution>
            <goals>
                <goal>compile</goal>
                <goal>testCompile</goal>
            </goals>
        </execution>
    </executions>
</plugin>
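Either approach solves the problem. I recommend adding the addScalacArgs setting directly to the pom, so that the code both runs and packages correctly. Note that the plugin documentation itself suggests reserving addScalacArgs for the command line and using the args parameter for compiler arguments declared in the pom, i.e. <args><arg>-target:jvm-1.8</arg></args> inside <configuration>.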
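There is also a third, even simpler option: upgrade Scala to 2.12.0 or later (and switch the Flink dependencies to their _2.12 artifacts accordingly). Scala 2.12 targets Java 8 exclusively: it requires Java 8 and emits Java 8 bytecode, which greatly improves compatibility and interoperability with Java 8, so this problem no longer occurs.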
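Whichever fix you choose, it can be useful to confirm which bytecode level the classes in the packaged jar actually target. A class file begins with the magic number 0xCAFEBABE followed by a minor and a major version; major 50 is Java 6 and major 52 is Java 8. As I understand the JVM specification, only class files of version 52 or later may call an interface method with invokestatic, which is why scalac insists on jvm-1.8 here. The `ClassFileVersion` object below is a hypothetical helper, a small sketch using only the JDK's `JarFile` and `DataInputStream`.

```scala
import java.io.{DataInputStream, InputStream}
import java.util.jar.JarFile

// Hypothetical helper for inspecting the bytecode level of compiled classes.
object ClassFileVersion {
  // Class file header: u4 magic (0xCAFEBABE), u2 minor_version, u2 major_version.
  def majorVersion(in: InputStream): Int = {
    val data = new DataInputStream(in)
    val magic = data.readInt().toLong & 0xFFFFFFFFL
    require(magic == 0xCAFEBABEL, "not a class file")
    data.readUnsignedShort() // minor_version, not needed here
    data.readUnsignedShort() // major_version
  }

  // Major version 50 = Java 6, 51 = Java 7, 52 = Java 8, and so on.
  def javaRelease(major: Int): Int = major - 44

  // Returns (entry name, major version) for every .class entry in the jar.
  def scan(jarPath: String): List[(String, Int)] = {
    val jar = new JarFile(jarPath)
    try {
      val builder = List.newBuilder[(String, Int)]
      val entries = jar.entries()
      while (entries.hasMoreElements) {
        val entry = entries.nextElement()
        if (entry.getName.endsWith(".class")) {
          val in = jar.getInputStream(entry)
          try { builder += (entry.getName -> majorVersion(in)) } finally { in.close() }
        }
      }
      builder.result()
    } finally {
      jar.close()
    }
  }

  def main(args: Array[String]): Unit =
    scan(args(0)).foreach { case (name, major) =>
      println(s"$name -> Java ${javaRelease(major)} (major version $major)")
    }
}
```

Pass the path of the packaged jar as the first argument; each application class should report the Java release you expect.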