Flink 1.11.x WatermarkStrategy 不兼容问题

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 最近群里的一个同学问了这么一个问题,在 Scala 代码中使用了 WatermarkStrategy ,代码在编译的时候会抛出异常,具体的报错信息如下:Error:Error:line (31)Static methods in interface require -target:jvm-1.8 .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5)))刚开始我以为是他的 JDK 版本设置的有问题,后来他说

最近群里的一个同学问了这么一个问题,在 Scala 代码中使用了 WatermarkStrategy ,代码在编译的时候会抛出异常,具体的报错信息如下:


Error:Error:line (31)Static methods in interface require -target:jvm-1.8
                .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5)))


刚开始我以为是他的 JDK 版本设置的有问题,后来他说 IDE 所有地方的版本都设置的是 1.8 的,因为最近我没有用 Scala 开发过 Flink 应用,在 Java 代码里面用过新版本的接口 WatermarkStrategy 是没有问题的,然后我就自己测试了下面的 Scala 代码.确实在编译的时候就会直接报错.


env.addSource(new FlinkKafkaConsumer[String](topic_, new SimpleStringSchema(), properties).setStartFromLatest())
              .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5)))


此编译错误明确指出,正在调用接口的静态方法,并且由于 Java 1.8 版本的接口中提供了静态方法,因此通常需要 Target JVM 1.8 版。因为是在 Scala 代码里面调用了 Java Interface 静态方法,所以导致了 Scala 和 Java 8 兼容性的问题.


这个代码实际调用的是 Java WatermarkStrategy 这个接口里面的方法,源码如下所示:


/**
  * Creates a watermark strategy for situations where records are out of order, but you can place
  * an upper bound on how far the events are out of order. An out-of-order bound B means that
  * once the an event with timestamp T was encountered, no events older than {@code T - B} will
  * follow any more.
  *
  * <p>The watermarks are generated periodically. The delay introduced by this watermark
  * strategy is the periodic interval length, plus the out of orderness bound.
  *
  * @see BoundedOutOfOrdernessWatermarks
  */
 static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
  return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
 }


看到上面的代码你可能会感觉到疑惑? interface 里面什么时候可以定义 static 方法了? 没错,在 JDK 1.8 之前,接口里面确实不允许定义 static 方法,但是在 JDK 1.8 之后新增了两个特性增强了接口,那就是接口中可以定义默认的方法 (default) 和静态方法 (static) .就像在 Scala 的 trait 里面可以定义默认方法和静态方法一样.


默认方法 (default) 的主要优势是提供一种拓展接口的方法,而不破坏现有代码。假如我们有一个已经投入使用的接口需要拓展一个新的方法,在 JDK8 以前,如果为一个使用的接口增加一个新方法,则我们必须在所有实现类中添加该方法的实现,否则编译会出现异常。如果实现类数量少并且我们有权限修改,可能会工作量相对较少。如果实现类比较多或者我们没有权限修改实现类源代码,这样可能就比较麻烦。而默认方法则解决了这个问题,它提供了一个实现,当没有显示提供其他实现时就采用这个实现。这样新添加的方法将不会破坏现有代码。默认方法的另一个优势是该方法是可选的,子类可以根据不同的需求 Override 默认实现, 也可以选择不实现.

那 Flink 在 WatermarkStrategy 接口里面为什么使用 static 方法呢? 接着往下面看.


在 Flink 1.11.1 为了简化和统一水印生成策略,重构了水印生成的接口即 WatermarkStrategy (之前是有两个不同的接口) 源码如下:


public interface WatermarkStrategy<T> extends
  TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
 // ------------------------------------------------------------------------
 //  Methods that implementors need to implement.
 // ------------------------------------------------------------------------
 /**
  * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
  */
 @Override
 WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
 /**
  * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
  * strategy.
  */
 @Override
 default TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
  // By default, this is {@link RecordTimestampAssigner},
  // for cases where records come out of a source with valid timestamps, for example from Kafka.
  return new RecordTimestampAssigner<>();
 }
 // ------------------------------------------------------------------------
 //  Builder methods for enriching a base WatermarkStrategy
 // ------------------------------------------------------------------------
 /**
  * Creates a new {@code WatermarkStrategy} that wraps this strategy but instead uses the given
  * {@link TimestampAssigner} (via a {@link TimestampAssignerSupplier}).
  *
  * <p>You can use this when a {@link TimestampAssigner} needs additional context, for example
  * access to the metrics system.
  *
  * <pre>
  * {@code WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
  *   .forMonotonousTimestamps()
  *   .withTimestampAssigner((ctx) -> new MetricsReportingAssigner(ctx));
  * }</pre>
  */
 default WatermarkStrategy<T> withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner) {
  checkNotNull(timestampAssigner, "timestampAssigner");
  return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner);
 }
 /**
  * Creates a new {@code WatermarkStrategy} that wraps this strategy but instead uses the given
  * {@link SerializableTimestampAssigner}.
  *
  * <p>You can use this in case you want to specify a {@link TimestampAssigner} via a lambda
  * function.
  *
  * <pre>
  * {@code WatermarkStrategy<CustomObject> wmStrategy = WatermarkStrategy
  *   .<CustomObject>forMonotonousTimestamps()
  *   .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
  * }</pre>
  */
 default WatermarkStrategy<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) {
  checkNotNull(timestampAssigner, "timestampAssigner");
  return new WatermarkStrategyWithTimestampAssigner<>(this,
    TimestampAssignerSupplier.of(timestampAssigner));
 }
 /**
  * Creates a new enriched {@link WatermarkStrategy} that also does idleness detection in the
  * created {@link WatermarkGenerator}.
  *
  * <p>Add an idle timeout to the watermark strategy. If no records flow in a partition of a
  * stream for that amount of time, then that partition is considered "idle" and will not hold
  * back the progress of watermarks in downstream operators.
  *
  * <p>Idleness can be important if some partitions have little data and might not have events
  * during some periods. Without idleness, these streams can stall the overall event time
  * progress of the application.
  */
 default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
  checkNotNull(idleTimeout, "idleTimeout");
  checkArgument(!(idleTimeout.isZero() || idleTimeout.isNegative()),
    "idleTimeout must be greater than zero");
  return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
 }
 // ------------------------------------------------------------------------
 //  Convenience methods for common watermark strategies
 // ------------------------------------------------------------------------
 /**
  * Creates a watermark strategy for situations with monotonously ascending timestamps.
  *
  * <p>The watermarks are generated periodically and tightly follow the latest
  * timestamp in the data. The delay introduced by this strategy is mainly the periodic interval
  * in which the watermarks are generated.
  *
  * @see AscendingTimestampsWatermarks
  */
 static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
  return (ctx) -> new AscendingTimestampsWatermarks<>();
 }
 /**
  * Creates a watermark strategy for situations where records are out of order, but you can place
  * an upper bound on how far the events are out of order. An out-of-order bound B means that
  * once the an event with timestamp T was encountered, no events older than {@code T - B} will
  * follow any more.
  *
  * <p>The watermarks are generated periodically. The delay introduced by this watermark
  * strategy is the periodic interval length, plus the out of orderness bound.
  *
  * @see BoundedOutOfOrdernessWatermarks
  */
 static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
  return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
 }
 /**
  * Creates a watermark strategy based on an existing {@link WatermarkGeneratorSupplier}.
  */
 static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
  return generatorSupplier::createWatermarkGenerator;
 }
 /**
  * Creates a watermark strategy that generates no watermarks at all. This may be useful in
  * scenarios that do pure processing-time based stream processing.
  */
 static <T> WatermarkStrategy<T> noWatermarks() {
  return (ctx) -> new NoWatermarksGenerator<>();
 }
}


可以看到这个接口里面提供了很多 default 方法,为了方便我们开发, Flink 还内置了很多 static 水印生成的方法供我们使用.正是因为 forBoundedOutOfOrderness 这个方法是静态的所以我们才可以直接 WatermarkStrategy.forBoundedOutOfOrderness 使用.


那在 Scala 2.11.x 版本和 Java 8 接口里面定义静态方法这种语法是不兼容的,所以就造成了这个报错.


说了半天该怎么解决这个问题呢? 第一种方法是在 IDEA 里面 Scala 的附加编译选项里添加 -target:jvm-1.8,然后就可以正常运行这段代码了.具体的配置参考下图:



image-20210324154605954


但是当你打包的时候还会遇到这个报错,因为打包的时候也需要指定 -target:jvm-1.8.


不要着急,通过查看官方文档: https://davidb.github.io/scala-maven-plugin/compile-mojo.html#addScalacArgs


发现 Optional Parameters 里面有相关的配置,感兴趣的同学可以点上面的链接看一下,里面有相关的解决办法.



image-20210324164741188


一般我们打包有两种方式

使用命令打包 mvn clean package


使用 scala-maven-plugin 插件打包


解决方案

maven 命令打包

如果你是用 maven 命令打包的话可以添加 -D 参数如下:


mvn clean package -DaddScalacArgs=-target:jvm-1.8


scala-maven-plugin 插件打包

如果你是用 scala-maven-plugin 插件打包的话可以在 pom 里面添加 addScalacArgs 配置:


<plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <configuration>
                    <recompileMode>incremental</recompileMode>
                    <addScalacArgs>-target:jvm-1.8</addScalacArgs>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>


上面两种方式都可以解决这个问题,建议直接在 pom 里面添加 addScalacArgs 配置,这样既可以正常运行代码也可以正常打包.


其实还有第三种更简单的方式,直接把 Scala 的版本升级到 2.12.0 以上就可以了.因为 Scala 2.12.0 版本里对编译器进行了升级,提高了和 Java 8 的兼容性和互操作性,就不存在这个问题了.

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
SQL 关系型数据库 MySQL
Flink CDC 2.0 正式发布,详解核心改进
Flink CDC 2.0.0 版本于 8 月 10 日正式发布,点击了解详情~
Flink CDC 2.0 正式发布,详解核心改进
|
7月前
|
SQL 关系型数据库 分布式数据库
Flink问题之程序直接结束如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
235 1
|
7月前
|
Oracle 关系型数据库 MySQL
flink cdc 插件问题之报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
7月前
|
消息中间件 Kafka 流计算
[flink] flink macm1pro 快速使用从零到一
[flink] flink macm1pro 快速使用从零到一
|
7月前
|
Java Maven 流计算
问题出在Flink CDC的依赖上
问题出在Flink CDC的依赖上
161 40
|
7月前
|
Oracle 关系型数据库 数据处理
Flink CDC产品常见问题之flink postgresqlcdc 报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
7月前
|
SQL 关系型数据库 Java
Flink部署问题之不支持SupportsFilterPushDown如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
7月前
|
Java atlas 网络安全
Flink CDC编译问题之编译atlas报错如何解决
Flink CDC编译指的是将Flink CDC源代码转换成可执行程序的过程,可能会涉及到依赖管理和环境配置等问题;本合集将介绍Flink CDC编译的步骤、常见错误及其解决办法,以确保顺利完成编译过程。
|
7月前
|
SQL JSON Apache
Flink问题之无法通过如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
7月前
|
SQL 关系型数据库 Apache
Flink CDC 是一个基于 Apache Flink 的开源库
Flink CDC 是一个基于 Apache Flink 的开源库
245 7