Background
This article is based on Flink 1.13.3 and the HBase connector from the VVR (Ververica Runtime) build of the Flink engine.
The Maven dependency is:
```xml
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-cloudhbase</artifactId>
    <version>1.13-vvr-4.0.7</version>
</dependency>
```
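When querying a cloudHBase dimension table through the VVR connector, we found synchronous lookups to be very slow, so we decided to implement asynchronous dimension-table lookups.
While running it, we hit a NullPointerException (NPE) with the following stack trace: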
```
2022-06-08 15:01:05
java.lang.Exception: Could not complete the stream element: Record @ (undef) : org.apache.flink.table.data.binary.BinaryRowData@d8014011.
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.completeExceptionally(AsyncWaitOperator.java:382)
    at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture.completeExceptionally(AsyncLookupJoinRunner.java:253)
    at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture$DelegateResultFuture.completeExceptionally(AsyncLookupJoinRunner.java:275)
    at org.apache.flink.table.runtime.collector.TableFunctionResultFuture.completeExceptionally(TableFunctionResultFuture.java:61)
    at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:121)
    at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture.complete(AsyncLookupJoinRunner.java:219)
    at org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture.accept(DelegatingResultFuture.java:48)
    at org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture.accept(DelegatingResultFuture.java:32)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.connector.xx.cloudhbase.source.AsyncHBaseLRURowFetcher.lambda$fetchResult$6(AsyncHBaseLRURowFetcher.java:223)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.NullPointerException
    at TableCalcMapFunction$8.flatMap(Unknown Source)
    at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
    ... 15 more
```
Conclusions first
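- The VVR HBase connector has a thread-safety bug when converting HBase data into RowData, and this bug causes the NPE.
- Unlike the approach in the Flink documentation "Asynchronous I/O for External Data Access", we do not need to implement the asyncInvoke method of RichAsyncFunction; implementing *eval(CompletableFuture<Collection> future, RowData rowData)* is enough, because Flink wraps it in code generated at runtime (codegen).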
Analysis
- Initial investigation
Line 119 of AsyncLookupJoinWithCalcRunner is in the following method:
```java
@Override
public void complete(Collection<RowData> result) {
    if (result == null || result.size() == 0) {
        joinConditionResultFuture.complete(result);
    } else {
        for (RowData row : result) {
            try {
                calc.flatMap(row, calcCollector); // line 119 in the stack trace
            } catch (Exception e) {
                joinConditionResultFuture.completeExceptionally(e);
            }
        }
        joinConditionResultFuture.complete(calcCollector.collection);
    }
}
```
At first we suspected that calc itself was null, but further investigation ruled that out.
- Second investigation
We then looked at line 223 of our own class AsyncHBaseLRURowFetcher:
```java
RowData rowData = readHelper.convertToRow(result);
if (cache != null) {
    resultFuture.complete(Collections.singletonList(rowData));
    cache.put(rowKey, rowData);
} else {
    resultFuture.complete(Collections.singletonList(rowData));
}
```
Testing showed that rowData was in fact null, which means the row passed to calc.flatMap(row, calcCollector) was null. Doesn't calc handle null? (calc is an object generated by Flink's codegen.) We printed the generated code:
```java
public class TableCalcMapFunction$8 extends org.apache.flink.api.common.functions.RichFlatMapFunction {

    private transient org.apache.flink.table.runtime.typeutils.StringDataSerializer typeSerializer$6;
    org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(1);

    public TableCalcMapFunction$8(Object[] references) throws Exception {
        typeSerializer$6 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
    }

    @Override
    public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws Exception {
        org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) _in1;
        org.apache.flink.table.data.binary.BinaryStringData field$5;
        boolean isNull$5;
        org.apache.flink.table.data.binary.BinaryStringData field$7;
        isNull$5 = in1.isNullAt(0);
        field$5 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
        if (!isNull$5) {
            field$5 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));
        }
        field$7 = field$5;
        if (!isNull$5) {
            field$7 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$6.copy(field$7));
        }
        if (isNull$5) {
            out.setField(0, null);
        } else {
            out.setField(0, field$7);
        }
        c.collect(out);
    }

    @Override
    public void close() throws Exception {
    }
}
```
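Indeed, there is no null check: flatMap calls in1.isNullAt(0) directly on its input, so a null row throws the NPE. Is Flink's code generation at fault, then? No. The real question is why the connector produced a null row in the first place.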
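The failure path can be reproduced without Flink. The sketch below is illustrative only: `Row` is a hypothetical stand-in for RowData, `calc` mimics the generated flatMap (no null check on its input), and `complete` mimics the loop in TemporalTableCalcResultFuture.complete shown earlier. One null element in the result collection is enough to fail the whole lookup future.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class NullRowNpeDemo {
    /** Hypothetical minimal stand-in for Flink's RowData. */
    interface Row {
        boolean isNullAt(int pos);
        String getString(int pos);
    }

    /** Mimics the generated flatMap: the input is dereferenced without a null check. */
    static String calc(Row in1) {
        boolean isNull = in1.isNullAt(0); // throws NullPointerException when in1 == null
        return isNull ? null : in1.getString(0);
    }

    /** Mimics TemporalTableCalcResultFuture.complete: a failing calc fails the whole future. */
    static CompletableFuture<List<String>> complete(Collection<Row> result) {
        CompletableFuture<List<String>> joinConditionResultFuture = new CompletableFuture<>();
        List<String> collected = new ArrayList<>();
        for (Row row : result) {
            try {
                collected.add(calc(row));
            } catch (Exception e) {
                joinConditionResultFuture.completeExceptionally(e);
            }
        }
        // Has no effect if the future was already completed exceptionally.
        joinConditionResultFuture.complete(collected);
        return joinConditionResultFuture;
    }

    public static void main(String[] args) {
        Row ok = new Row() {
            @Override public boolean isNullAt(int pos) { return false; }
            @Override public String getString(int pos) { return "hbase-value"; }
        };
        System.out.println(complete(Arrays.asList(ok)).join()); // [hbase-value]
        // One null element, like the row returned by the racy conversion, fails the lookup.
        System.out.println(complete(Arrays.asList(ok, null)).isCompletedExceptionally()); // true
    }
}
```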
Going back to RowData rowData = readHelper.convertToRow(result), we found that this method is not thread-safe. Its structure is:
```java
this.rowDataGenerator.start();
...
return this.rowDataGenerator.end();
```
this.rowDataGenerator.start() is implemented as:
```java
this.rowData = genericRowData();
```
and this.rowDataGenerator.end() is implemented as:
```java
GenericRowData localRowData = this.rowData;
this.rowData = null;
return localRowData;
```
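Every call reads and writes the same rowData field of one shared generator, which is clearly unsafe when several threads convert results at once; the stack trace shows the completion callbacks running on ForkJoinPool worker threads. For example, if lookup A calls start(), lookup B calls start(), and then A calls end(), A receives B's row and resets the field to null, so B's subsequent end() returns null: exactly the null row that reached calc.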
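To make the race concrete, here is a single-threaded replay of that bad interleaving. `SharedRowGenerator` is a hypothetical stand-in that reproduces only the start()/end() logic described above, with a `List<Object>` in place of GenericRowData; it is not the connector's actual code. Replaying the schedule step by step makes the outcome deterministic: A silently gets B's data and B gets null.

```java
import java.util.ArrayList;
import java.util.List;

public class RaceReplayDemo {
    /** Replays, step by step, one unlucky interleaving of two lookups (A and B). */
    static List<List<Object>> replayInterleaving() {
        SharedRowGenerator gen = new SharedRowGenerator();
        gen.start();                          // A: start()
        gen.start();                          // B: start() replaces A's row
        gen.setField("value-of-B");           // B fills the shared row
        List<Object> rowForA = gen.end();     // A: end() takes B's row and clears the field
        List<Object> rowForB = gen.end();     // B: end() now returns null
        List<List<Object>> rows = new ArrayList<>();
        rows.add(rowForA);
        rows.add(rowForB);
        return rows;
    }

    public static void main(String[] args) {
        List<List<Object>> rows = replayInterleaving();
        System.out.println("A got " + rows.get(0)); // A got [value-of-B]
        System.out.println("B got " + rows.get(1)); // B got null
    }
}

/** Hypothetical stand-in for the connector's generator: one mutable field shared by all callers. */
class SharedRowGenerator {
    private List<Object> rowData; // shared mutable state: the root cause

    void start() {
        this.rowData = new ArrayList<>(); // plays the role of genericRowData()
    }

    void setField(Object value) {
        this.rowData.add(value);
    }

    List<Object> end() {
        List<Object> localRowData = this.rowData;
        this.rowData = null;
        return localRowData;
    }
}
```

With real threads the same interleaving only occurs when the timing lines up, so how often the NPE appears depends on load.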
So we changed this code to create a new row object on every call. That removes the shared state, avoids the race, and fixes the NPE.
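A minimal sketch of the fix, assuming the row is built in a local variable inside the conversion method instead of a shared field. `convertToRow` here is hypothetical and uses `List<Object>` in place of GenericRowData. The concurrent check only illustrates that no call can observe another call's row; a passing run does not prove thread safety by itself, which comes from the absence of shared mutable state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class PerCallRowDemo {
    /** Hypothetical thread-safe converter: every call builds and returns its own row. */
    static List<Object> convertToRow(String rowKey, String value) {
        List<Object> rowData = new ArrayList<>(2); // fresh object per call, never shared
        rowData.add(rowKey);
        rowData.add(value);
        return rowData;
    }

    /** Runs many conversions concurrently and counts rows that are null or belong to another call. */
    static int countBadRows(int threads, int callsPerThread) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger bad = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            futures.add(pool.submit(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    String key = id + "-" + i;
                    List<Object> row = convertToRow(key, "v" + key);
                    if (row == null || !key.equals(row.get(0))) {
                        bad.incrementAndGet();
                    }
                }
            }));
        }
        for (Future<?> f : futures) {
            f.get(); // rethrows any exception raised in a worker
        }
        pool.shutdown();
        return bad.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("bad rows: " + countBadRows(8, 10_000)); // bad rows: 0
    }
}
```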
Side topic
How does Flink SQL implement asynchronous operations internally? Following "Asynchronous I/O for External Data Access", we would extend RichAsyncFunction and implement asyncInvoke, yet our implementation provides only an *eval(CompletableFuture<Collection> future, RowData rowData)* method.
The answer starts with the AsyncLookupJoinRunner class:
```java
public class AsyncLookupJoinRunner extends RichAsyncFunction<RowData, RowData> {
    ...
    private final GeneratedFunction<AsyncFunction<RowData, Object>> generatedFetcher;
    ...
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.fetcher = generatedFetcher.newInstance(getRuntimeContext().getUserCodeClassLoader());
        FunctionUtils.setFunctionRuntimeContext(fetcher, getRuntimeContext());
        FunctionUtils.openFunction(fetcher, parameters);
        ...
    }

    @Override
    public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) throws Exception {
        JoinedRowResultFuture outResultFuture = resultFutureBuffer.take();
        // the input row is copied when object reuse in AsyncWaitOperator
        outResultFuture.reset(input, resultFuture);
        // fetcher has copied the input field when object reuse is enabled
        fetcher.asyncInvoke(input, outResultFuture);
    }
    ...
}
```
generatedFetcher holds the code-generated function. Printing the code inside generatedFetcher gives:
```java
public class LookupFunction$3 extends org.apache.flink.streaming.api.functions.async.RichAsyncFunction {

    private transient org.apache.flink.table.runtime.typeutils.StringDataSerializer typeSerializer$1;
    private transient org.apache.flink.connector.xx.cloudhbase.source.AsyncLookupFunctionWrapper function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3;

    public LookupFunction$3(Object[] references) throws Exception {
        typeSerializer$1 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
        function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3 = (((org.apache.flink.connector.xx.cloudhbase.source.AsyncLookupFunctionWrapper) references[1]));
    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
    }

    @Override
    public void asyncInvoke(Object _in1, org.apache.flink.streaming.api.functions.async.ResultFuture c) throws Exception {
        org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) _in1;
        org.apache.flink.table.data.binary.BinaryStringData field$0;
        boolean isNull$0;
        org.apache.flink.table.data.binary.BinaryStringData field$2;
        isNull$0 = in1.isNullAt(2);
        field$0 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
        if (!isNull$0) {
            field$0 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(2));
        }
        field$2 = field$0;
        if (!isNull$0) {
            field$2 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$1.copy(field$2));
        }
        if (isNull$0) {
            c.complete(java.util.Collections.emptyList());
        } else {
            org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture delegates = new org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture(c);
            function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.eval(
                delegates.getCompletableFuture(),
                isNull$0 ? null : ((org.apache.flink.table.data.binary.BinaryStringData) field$2));
        }
    }

    @Override
    public void close() throws Exception {
        function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.close();
    }
}
```
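The key lines are:
```java
org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture delegates =
        new org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture(c);
function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.eval(
        delegates.getCompletableFuture(),
        isNull$0 ? null : ((org.apache.flink.table.data.binary.BinaryStringData) field$2));
```
DelegatingResultFuture converts a CompletableFuture into the ResultFuture that the async operator expects, which is why the method we implement takes a CompletableFuture as its first parameter. The generated code passes the lookup key extracted from the input row (field 2 here) as the remaining argument, and completes with an empty list when that key is null.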
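The bridging can be illustrated with plain JDK types. `ResultFuture` and `DelegatingResultFutureSketch` below are hypothetical stand-ins modeled on the roles described here, not Flink's source. The bridge registers itself with `whenComplete`, so whatever completes the CompletableFuture inside `eval` is forwarded to the callback-style ResultFuture; this matches the DelegatingResultFuture.accept → CompletableFuture.uniWhenComplete frames in the stack trace. `asyncInvoke` mirrors the generated null-key branch, and `eval` is a hypothetical user method.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class DelegatingBridgeDemo {
    /** Hypothetical stand-in for Flink's callback-style ResultFuture. */
    interface ResultFuture<T> {
        void complete(Collection<T> result);
        void completeExceptionally(Throwable error);
    }

    /** Exposes a CompletableFuture and forwards its outcome to a ResultFuture. */
    static class DelegatingResultFutureSketch<T> implements BiConsumer<Collection<T>, Throwable> {
        private final ResultFuture<T> delegate;
        private final CompletableFuture<Collection<T>> completableFuture = new CompletableFuture<>();

        DelegatingResultFutureSketch(ResultFuture<T> delegate) {
            this.delegate = delegate;
            completableFuture.whenComplete(this); // accept() runs once the future is completed
        }

        CompletableFuture<Collection<T>> getCompletableFuture() {
            return completableFuture;
        }

        @Override
        public void accept(Collection<T> result, Throwable error) {
            if (error != null) {
                delegate.completeExceptionally(error);
            } else {
                delegate.complete(result);
            }
        }
    }

    /** Mirrors the generated asyncInvoke: a null key short-circuits, otherwise eval() is called. */
    static void asyncInvoke(String key, ResultFuture<String> c) {
        if (key == null) {
            c.complete(Collections.emptyList());
        } else {
            DelegatingResultFutureSketch<String> delegates = new DelegatingResultFutureSketch<>(c);
            eval(delegates.getCompletableFuture(), key);
        }
    }

    /** Hypothetical user eval(): it only ever sees the CompletableFuture. */
    static void eval(CompletableFuture<Collection<String>> future, String key) {
        future.complete(Collections.singletonList("value-for-" + key));
    }

    /** Records what the bridge forwarded, for demonstration. */
    static class RecordingResultFuture implements ResultFuture<String> {
        Collection<String> result;
        Throwable error;
        @Override public void complete(Collection<String> r) { this.result = r; }
        @Override public void completeExceptionally(Throwable t) { this.error = t; }
    }

    public static void main(String[] args) {
        RecordingResultFuture hit = new RecordingResultFuture();
        asyncInvoke("rk1", hit);
        System.out.println(hit.result); // [value-for-rk1]
        RecordingResultFuture miss = new RecordingResultFuture();
        asyncInvoke(null, miss);
        System.out.println(miss.result); // []
    }
}
```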
`function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3` is an instance of our own class (which must extend AsyncTableFunction), so we only have to implement a single *eval(CompletableFuture<Collection> future, RowData rowData)* method.
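Putting it together, the user side reduces to one `eval` method. The sketch below is a hypothetical illustration meant to run without Flink, so it is a plain class rather than a subclass of AsyncTableFunction<RowData>, uses `List<Object>` in place of RowData, and looks keys up in a hypothetical in-memory map (`fakeTable`) instead of HBase; the pool size of 4 is arbitrary. It completes the future from another thread, builds a new row per call, and reports a miss as an empty collection rather than a null row.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncLookupSketch {
    // Hypothetical stand-in for the HBase table: rowKey -> column value.
    final Map<String, String> fakeTable = new ConcurrentHashMap<>();
    private ExecutorService executor;

    public void open() {
        executor = Executors.newFixedThreadPool(4);
    }

    /** The single method we implement: complete the future asynchronously, never with a null element. */
    public void eval(CompletableFuture<Collection<List<Object>>> future, String rowKey) {
        CompletableFuture
            .supplyAsync(() -> fakeTable.get(rowKey), executor)
            .whenComplete((value, error) -> {
                if (error != null) {
                    future.completeExceptionally(error);
                } else if (value == null) {
                    future.complete(Collections.emptyList()); // miss: no row, not a null row
                } else {
                    List<Object> row = Arrays.asList(rowKey, value); // new row object per call
                    future.complete(Collections.singletonList(row));
                }
            });
    }

    public void close() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        AsyncLookupSketch fn = new AsyncLookupSketch();
        fn.fakeTable.put("rk1", "v1");
        fn.open();
        CompletableFuture<Collection<List<Object>>> hit = new CompletableFuture<>();
        fn.eval(hit, "rk1");
        System.out.println(hit.join()); // [[rk1, v1]]
        CompletableFuture<Collection<List<Object>>> miss = new CompletableFuture<>();
        fn.eval(miss, "absent");
        System.out.println(miss.join()); // []
        fn.close();
    }
}
```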