Flink问题之无法通过如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:createTemporaryView接口注册table时,fieldname支持中横线(-)怎么处理?


Hi,all:

我使用flink1.11.2进行作业开发,由于要兼容内部历史代码,需要把source手动注册成一张表,调用为【如图】:

其中,tableEnv为 StreamTableEnvironment类型,source是 DataStream[Row] 类型,代表source

connector生成的算子,fields是 由处理过的source table的 filed name 转换成的 Expression,将filed

name转换成expression 使用 ExpressionParser.parseExpression 这个方法

正常情况下,都能注册成功。

但是,当field name带中横线,如 source中一个字段名称为

“X-Oem”时,经过 ExpressionParser.parseExpression 会被解析为 “minus(X, Model)”

而非预期的“X-Oem”,导致注册成的表与DML语句中操作的字段名不一致报错。

有什么方法能够处理这种情况么?*来自志愿者整理的flink邮件归档


参考回答:

Hi Jun   Zou.

tEnv.createTemporaryView("Word-Count", input,   ("word"),("word"),("word"), ("frequency"));

加上 ` 试一下。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359842


问题二:请问flink 什么时候支持读写ACID的hive表?


我们使用 flink.1.12 读取 ACID hive table 时报错(Reading or writing ACID table %s is not supported),我们尝试修改源码放开这个限制也会出现后续的一些错误如(cast转换 BytesColumnVector 为 LongColumnVector 出错)。 背景:目前我们生产想采用 flink 做 ETL 等数据迁移工作,对应的hive都是hive 3.0左右的版本或者hive 2.3.6的版本,默认都是ACID的表,而且数据量很大,现在使用flink做数据迁移,如果flink只支持读取非ACID标的话,我们需要全部重建hive的表是很费力的。 请问一下flink什么版本有计划支持读取 ACID的hive table?或者,目前有无办法解决我这样的问题?*来自志愿者整理的flink邮件归档


参考回答:

你好, Flink暂时没有计划支持hive的ACID表。目前hive connector的代码无法保证ACID语义,所以即使你去掉“Reading or writing ACID table %s is not supported”这个检查也达不到预期的效果。 是否考虑将ACID表迁移到数据湖中呢,比如iceberg有相应的迁移工具[1]。 [1] https://iceberg.apache.org/spark-procedures/#table-migration


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359868


问题三:使用Table&SQL API怎么构造多个sink


Hi, 个人在分析RelNodeBlock逻辑,多个SINK才会拆分并重用公共子树,怎么构造多个sink呢, 文件RelNodeBlock.scala源码里的writeToSink()已经找不到了

// 源码里的多sink例子 val sourceTable = tEnv.scan("test_table").select('a, 'b, 'c) val leftTable = sourceTable.filter('a > 0).select('a as 'a1, 'b as 'b1) val rightTable = sourceTable.filter('c.isNotNull).select('b as 'b2, 'c as 'c2) val joinTable = leftTable.join(rightTable, 'a1 === 'b2) joinTable.where('a1 >= 70).select('a1, 'b1).writeToSink(sink1) joinTable.where('a1 < 70 ).select('a1, 'c2).writeToSink(sink2)

谢谢*来自志愿者整理的flink邮件归档


参考回答:

Hi.

可以通过StatementSet 指定多个insert,这样子就可以构造出多个sink了。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370021


问题四:请教下为什么 无法通过t.* 将 自定义函数返回的嵌套字段查出来。


tEnv.executeSql( """ | SELECT t.* FROM ( | SELECT EvtParser(request) as t FROM parsed_nginx_log | ) |""".stripMargin)

自定义函数 EvtParser

@DataTypeHint("ROW") def eval(line: String) = {...}

详细代码

class EvtEtlJobTest {

@Test def testMain(): Unit = { val settings = EnvironmentSettings.newInstance.useBlinkPlanner().build val tEnv = TableEnvironment.create(settings)

tEnv.executeSql( """ | CREATE TABLE parsed_nginx_log ( | remote_addr STRING, | remote_user STRING, | time_local BIGINT, | request STRING, | status STRING, | body_bytes_sent STRING, | http_referer STRING, | http_user_agent STRING, | http_x_forwarded_for STRING | ) WITH ( | 'connector' = 'filesystem', | 'path' = 'src/test/resources/nginx-parsed.log', | 'format' = 'json' | ) |""".stripMargin).getTableSchema

tEnv.executeSql( """ | CREATE TABLE evt_log_index ( | evt STRING, | app STRING, | uid STRING, | ts STRING, | url STRING, | action STRING | ) WITH ( | 'connector' = 'filesystem', | 'path' = '/tmp/evt_log_index', | 'format' = 'json' | ) |""".stripMargin)

// 注册函数 tEnv.createTemporarySystemFunction("EvtParser", classOf[EvtParser])

val schema = tEnv.executeSql( """ | SELECT t.* FROM ( | SELECT EvtParser(request) as t FROM parsed_nginx_log | ) |""".stripMargin).getTableSchema

println("++++++++", schema) }

}

详细报错:

java.lang.RuntimeException: Error while applying rule PushProjectIntoTableSourceScanRule, args [rel#63:LogicalProject.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#62,exprs=[EvtParser(3).evt,EvtParser(3).evt,EvtParser(3).evt, EvtParser(3).app, EvtParser(3).uid,EvtParser(3).uid,EvtParser(3).uid, EvtParser(3).ts, EvtParser(3).url,EvtParser(3).url,EvtParser(3).url, EvtParser(3).action]), rel#1:LogicalTableScan.NONE.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, parsed_nginx_log])]

at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) at org.apache.calcite.tools.ProgramsRuleSetProgram.run(Programs.java:312)atorg.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)atorg.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgramRuleSetProgram.run(Programs.java:312)atorg.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)atorg.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgramRuleSetProgram.run(Programs.java:312) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgramanonfunanonfunanonfunoptimize1.apply(FlinkChainedProgram.scala:62)atorg.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram1.apply(FlinkChainedProgram.scala:62)atorg.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgramanonfunanonfunanonfunoptimize1.apply(FlinkChainedProgram.scala:58)atscala.collection.TraversableOnce1.apply(FlinkChainedProgram.scala:58)atscala.collection.TraversableOnce1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnceanonfunanonfunanonfunfoldLeft1.apply(TraversableOnce.scala:157)atscala.collection.TraversableOnce1.apply(TraversableOnce.scala:157)atscala.collection.TraversableOnce1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnceanonfunanonfunanonfunfoldLeft1.apply(TraversableOnce.scala:157)atscala.collection.Iterator1.apply(TraversableOnce.scala:157)atscala.collection.Iterator1.apply(TraversableOnce.scala:157) at scala.collection.Iteratorclass.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLikeclass.foreach(IterableLike.scala:72)atscala.collection.AbstractIterable.foreach(Iterable.scala:54)atscala.collection.TraversableOnceclass.foreach(IterableLike.scala:72)atscala.collection.AbstractIterable.foreach(Iterable.scala:54)atscala.collection.TraversableOnceclass.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnceclass.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1107) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666) at cn.bestlang.starry.evt.EvtEtlJobTest.testMain(EvtEtlJobTest.scala:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at org.junit.jupiter.engine.execution.InvocationInterceptorChainValidatingInvocation.proceed(InvocationInterceptorChain.java:131)atorg.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)atorg.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)atorg.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)atorg.junit.jupiter.engine.execution.ExecutableInvokerValidatingInvocation.proceed(InvocationInterceptorChain.java:131)atorg.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)atorg.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)atorg.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)atorg.junit.jupiter.engine.execution.ExecutableInvokerValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) at org.junit.jupiter.engine.execution.ExecutableInvokerReflectiveInterceptorCall.lambdaofVoidMethodofVoidMethodofVoidMethod0(ExecutableInvoker.java:115) at org.junit.jupiter.engine.execution.ExecutableInvoker.lambdainvokeinvokeinvoke0(ExecutableInvoker.java:105) at org.junit.jupiter.engine.execution.InvocationInterceptorChainInterceptedInvocation.proceed(InvocationInterceptorChain.java:106)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)atorg.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)atorg.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambdaInterceptedInvocation.proceed(InvocationInterceptorChain.java:106)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)atorg.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)atorg.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambdaInterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambdainvokeTestMethod6(TestMethodTestDescriptor.java:210)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(TestMethodTestDescriptor.java:210)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(TestMethodTestDescriptor.java:210) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively6(NodeTestTask.java:151)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:151)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:151) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atjava.util.ArrayList.forEach(ArrayList.java:1257)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atjava.util.ArrayList.forEach(ArrayList.java:1257)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda9(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) at java.util.ArrayList.forEach(ArrayList.java:1257) at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively6(NodeTestTask.java:155)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:155)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:155) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atjava.util.ArrayList.forEach(ArrayList.java:1257)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atjava.util.ArrayList.forEach(ArrayList.java:1257)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda9(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) at java.util.ArrayList.forEach(ArrayList.java:1257) at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively6(NodeTestTask.java:155)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:155)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:155) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)atorg.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)atorg.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)atorg.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)atorg.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda9(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambdaexecute0(EngineExecutionOrchestrator.java:54)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)atorg.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)atorg.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)atorg.junit.platform.launcher.core.DefaultLauncherSession0(EngineExecutionOrchestrator.java:54)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)atorg.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)atorg.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)atorg.junit.platform.launcher.core.DefaultLauncherSession0(EngineExecutionOrchestrator.java:54) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52) at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114) at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86) at org.junit.platform.launcher.core.DefaultLauncherSessionDelegatingLauncher.execute(DefaultLauncherSession.java:86) at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53) at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71) at com.intellij.rt.junit.IdeaTestRunnerRepeater.startRunnerWithArgs(IdeaTestRunner.java:33)atcom.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)atcom.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)Causedby:scala.MatchError:EvtParser(Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)atcom.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)atcom.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)Causedby:scala.MatchError:EvtParser(Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) Caused by: scala.MatchError: EvtParser(3) (of class org.apache.calcite.rex.RexCall)*来自志愿者整理的flink邮件归档


参考回答:

请问你使用的是哪个版本? 这个似乎是一个已知的修复的bug[1]

[1] https://github.com/apache/flink/pull/15548 我在1.13分支上验证了下你的case,发现能够跑通。建议cp下那个patch到自己的分支,再验证下。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370024


问题五:flink cdc 查询mysql binlog时,在 streaming模式下,如果无法获得reload权限应该如何配置参数


最近在使用flink cdc的时候,遇到需要用streaming mode来对收取的binlog日志进行后续相对复杂处理的需求。因为生产环境不允许mysql的RELOAD权限,之前在利用sql模式使用cdc的时候应用了 debezium.snap.shot.locking.mode = none的参数。

但在使用streaming mode的时候遇到了困难,不知道应该如何配置可以支持在没有reload权限的时候使用flink cdc。目前使用的配置方法是 Properties properties = new Properties(); properties.setProperty("debezium.snapshot.locking.mode", "none"); SourceFunction sourceFunction = MySQLSource.builder()         .hostname("localhost")         .port(port)         .databaseList("database")          .tableList("database.test")         .username(“user)         .password("password")         .debeziumProperties(properties)         .deserializer(new StringDebeziumDeserializationSchema())         .build();

但.debeziumProperties(properties)好像并没有和sql模式一样生效。请问我应该怎么配置sourceFunction,或者Streaming模式有没有提供什么方法解决这个问题呢?*来自志愿者整理的flink邮件归档


参考回答:

试试这种写法,在datastreaming API中的,debezium的相关参数应该是不需要加debezium前缀的 public static Properties debeziumProperties(){ Properties properties = new Properties(); properties.setProperty(“xxxx”,”xxxx"); return properties; }


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370027?spm=a2c6h.13148508.0.0.2f984f0eqWEHKL

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
22天前
|
分布式计算 Java Apache
Flink问题之本地集群报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
22天前
|
Oracle 关系型数据库 MySQL
flink cdc 插件问题之报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
22天前
|
SQL 关系型数据库 分布式数据库
Flink问题之程序直接结束如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
114 1
|
14天前
|
SQL 关系型数据库 分布式数据库
实时计算 Flink版产品使用合集之sink到HBase如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
22天前
|
存储 NoSQL 分布式数据库
【Flink】Flink分布式快照的原理是什么?
【4月更文挑战第21天】【Flink】Flink分布式快照的原理是什么?
|
22天前
|
数据处理 Apache 流计算
【Flink】Exactly-Once的保证
【4月更文挑战第21天】【Flink】Exactly-Once的保证
|
22天前
|
数据处理 Apache 流计算
【Flink】Flink的CEP机制
【4月更文挑战第21天】【Flink】Flink的CEP机制
|
22天前
|
存储 关系型数据库 MySQL
Flink CDC产品常见问题之写hudi的时候报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
22天前
|
Oracle 关系型数据库 数据处理
Flink CDC产品常见问题之flink postgresqlcdc 报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
22天前
|
流计算
Flink这个问题怎么解决?
【2月更文挑战第17天】Flink这个问题怎么解决?
19 1