Flink 维表异步查询的实现以及问题排查

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink 维表异步查询的实现以及问题排查

背景

本文基于Flink 1.13.3

Flink计算引擎VVR版本的hbase Connector

具体maven依赖如下:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>ververica-connector-cloudhbase</artifactId>
  <version>1.13-vvr-4.0.7</version>
</dependency>

在基于VVR版本的cloudHbase维表查询的时候,发现同步查询的速度很慢,所以我们打算做基于异步的维表查询。

在运行的过程中发现了NPE问题,具体的报错堆栈如下:

2022-06-08 15:01:05
java.lang.Exception: Could not complete the stream element: Record @ (undef) : org.apache.flink.table.data.binary.BinaryRowData@d8014011.
  at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.completeExceptionally(AsyncWaitOperator.java:382)
  at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture.completeExceptionally(AsyncLookupJoinRunner.java:253)
  at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture$DelegateResultFuture.completeExceptionally(AsyncLookupJoinRunner.java:275)
  at org.apache.flink.table.runtime.collector.TableFunctionResultFuture.completeExceptionally(TableFunctionResultFuture.java:61)
  at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:121)
  at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture.complete(AsyncLookupJoinRunner.java:219)
  at org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture.accept(DelegatingResultFuture.java:48)
  at org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture.accept(DelegatingResultFuture.java:32)
  at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
  at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
  at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
  at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
  at org.apache.flink.connector.xx.cloudhbase.source.AsyncHBaseLRURowFetcher.lambda$fetchResult$6(AsyncHBaseLRURowFetcher.java:223)
  at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
  at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
  at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
  at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
  at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
  at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
  at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.NullPointerException
  at TableCalcMapFunction$8.flatMap(Unknown Source)
  at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
  ... 15 more


先说结论

  • Flink计算引擎VVR版本的hbase Connector把hbase的数据转化为RowData的时候存在多线程问题,这种会导致NPE问题
  • 相比Asynchronous I/O for External Data Access 的实现,我们不需要实现RichAsyncFunction类的asyncInvoke方法,只需要实现*eval(CompletableFuture<Collection> future, RowData rowData)*方法即可,因为flink做在codegen的时候做封装


分析

  • 初始定位
    定位到 AsyncLookupJoinWithCalcRunner 119行如下:
@Override
        public void complete(Collection<RowData> result) {
            if (result == null || result.size() == 0) {
                joinConditionResultFuture.complete(result);
            } else {
                for (RowData row : result) {
                    try {
                        calc.flatMap(row, calcCollector);
                    } catch (Exception e) {
                        joinConditionResultFuture.completeExceptionally(e);
                    }
                }
                joinConditionResultFuture.complete(calcCollector.collection);
            }
        }


起初的时候是怀疑 calc是null,后来经过排查不是此问题.

  • 再定位
    重新定位到自己实现的类 AsyncHBaseLRURowFetcher 223行,如下:
       RowData rowData = readHelper.convertToRow(result);
           if (cache != null) {
               resultFuture.complete(Collections.singletonList(rowData));
               cache.put(rowKey, rowData);
           } else {
               resultFuture.complete(Collections.singletonList(rowData));
           }

经过测试发现rowData数据居然为null,也就是说 传给calc.flatMap(row, calcCollector) 中的row是null,难道calc没有对null做处理么?(calc 是flink codegen出来的对象)。我们把codegen的代码打印出来如下:


public class TableCalcMapFunction$8
          extends org.apache.flink.api.common.functions.RichFlatMapFunction {
        private transient org.apache.flink.table.runtime.typeutils.StringDataSerializer typeSerializer$6;
        org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(1);
        public TableCalcMapFunction$8(Object[] references) throws Exception {
          typeSerializer$6 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
        }
        @Override
        public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        }
        @Override
        public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws Exception {
          org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) _in1;
          org.apache.flink.table.data.binary.BinaryStringData field$5;
          boolean isNull$5;
          org.apache.flink.table.data.binary.BinaryStringData field$7;
          isNull$5 = in1.isNullAt(0);
          field$5 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
          if (!isNull$5) {
            field$5 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));
          }
          field$7 = field$5;
          if (!isNull$5) {
            field$7 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$6.copy(field$7));
          }
          if (isNull$5) {
            out.setField(0, null);
          } else {
            out.setField(0, field$7);
          }
          c.collect(out);
        }
        @Override
        public void close() throws Exception {
        }
      }

还真是没有对null进行处理,难道是Flink的codegen的实现有问题?不是的。

再次找到 RowData rowData = readHelper.convertToRow(result) 这段代码,发现该方法存在多线程问题,如下:

this.rowDataGenerator.start();
...
return this.rowDataGenerator.end();


this.rowDataGenerator.start()的实现如下:

this.rowData = genericRowData()

其中this.rowDataGenerator.end()内部实现如下:

 GenericRowData localRowData = this.rowData;
 this.rowData = null;
 return localRowData;

对同一个rowData对象进行操作,这显然在多线程环境下是有问题的。

所以说这块代码进行修改,每次都重新创建对象,即可规避这个问题,也解决了NPE问题。


额外话题

对于Flink SQL在内部是怎么实现异步操作呢?如果按照Asynchronous I/O for External Data Access,我们是应该继承RichAsyncFunction类从而实现asyncInvoke方法,然而我们的实现仅仅是*eval(CompletableFuture<Collection> future, RowData rowData)*方法。


这还是得从AsyncLookupJoinRunner 这个类说起,

public class AsyncLookupJoinRunner extends RichAsyncFunction<RowData, RowData> {
...
private final GeneratedFunction<AsyncFunction<RowData, Object>> generatedFetcher;
...
 public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.fetcher = generatedFetcher.newInstance(getRuntimeContext().getUserCodeClassLoader());
        FunctionUtils.setFunctionRuntimeContext(fetcher, getRuntimeContext());
        FunctionUtils.openFunction(fetcher, parameters);
'''
 @Override
    public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) throws Exception {
        JoinedRowResultFuture outResultFuture = resultFutureBuffer.take();
        // the input row is copied when object reuse in AsyncWaitOperator
        outResultFuture.reset(input, resultFuture);
        // fetcher has copied the input field when object reuse is enabled
        fetcher.asyncInvoke(input, outResultFuture);
    }


其中generatedFetcher这个就是codegen生成的代码,我们打印generatedFetcher中的code,如下:


public class LookupFunction$3
          extends org.apache.flink.streaming.api.functions.async.RichAsyncFunction {
        private transient org.apache.flink.table.runtime.typeutils.StringDataSerializer typeSerializer$1;
        private transient org.apache.flink.connector.xx.cloudhbase.source.AsyncLookupFunctionWrapper function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3;
        public LookupFunction$3(Object[] references) throws Exception {
          typeSerializer$1 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
          function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3 = (((org.apache.flink.connector.xx.cloudhbase.source.AsyncLookupFunctionWrapper) references[1]));
        }
        @Override
        public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
          function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
        }
        @Override
        public void asyncInvoke(Object _in1, org.apache.flink.streaming.api.functions.async.ResultFuture c) throws Exception {
          org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) _in1;
          org.apache.flink.table.data.binary.BinaryStringData field$0;
          boolean isNull$0;
          org.apache.flink.table.data.binary.BinaryStringData field$2;
          isNull$0 = in1.isNullAt(2);
          field$0 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
          if (!isNull$0) {
            field$0 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(2));
          }
          field$2 = field$0;
          if (!isNull$0) {
            field$2 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$1.copy(field$2));
          }
          if (isNull$0) {
            c.complete(java.util.Collections.emptyList());
          } else {
            org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture delegates = new org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture(c);
            function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.eval(
              delegates.getCompletableFuture(),
              isNull$0 ? null : ((org.apache.flink.table.data.binary.BinaryStringData) field$2));
          }
        }
        @Override
        public void close() throws Exception {
          function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.close();
        }
      }

可以看到:

org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture delegates = new org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture(c);
            function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.eval(
              delegates.getCompletableFuture(),
              isNull$0 ? null : ((org.apache.flink.table.data.binary.BinaryStringData) field$2));

用到了DelegatingResultFuture类作为CompletableFuture到ResultFuture类的转换,所以我们的自己实现的方法签名是CompletableFuture


function_orga p a c h e apacheapacheflinkc o n n e c t o r connectorconnectorxxc l o u d h b a s e cloudhbasecloudhbase

cloudhbasecloudhbasesource$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3 是我们自己写类的对象(要继承自AsyncTableFunction类),这样就只需要实现一个*eval(CompletableFuture<Collection> future, RowData rowData)*方法。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
207 61
|
6月前
|
SQL Java Serverless
实时计算 Flink版操作报错合集之在写入SLS(Serverless Log Service)时出现报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
消息中间件 监控 Kafka
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
|
5月前
|
Java 流计算
美团 Flink 大作业部署问题之files-to-delete 的执行为什么能够异步进行呢
美团 Flink 大作业部署问题之files-to-delete 的执行为什么能够异步进行呢
|
5月前
|
Kubernetes Java 数据库连接
实时计算 Flink版产品使用问题之部署到 Kubernetes 集群时,任务过一会儿自动被取消,该如何排查
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 消息中间件 资源调度
实时计算 Flink版产品使用问题之如何实现血缘查询功能
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
存储 JSON Kubernetes
实时计算 Flink版操作报错合集之 写入hudi时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之运行mysql to doris pipeline时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
SQL 分布式计算 Hadoop
实时计算 Flink版产品使用问题之建了一张upsert-kafka的flink表,但是数据为空,该如何排查
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在Flink算子内部使用异步IO可以通过什么办法实现
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。