问题一:createTemporaryView接口注册table时,fieldname支持中横线(-)怎么处理?
Hi,all:
我使用flink1.11.2进行作业开发,由于要兼容内部历史代码,需要把source手动注册成一张表,调用为【如图】:
其中,tableEnv为 StreamTableEnvironment类型,source是 DataStream[Row] 类型,代表source
connector生成的算子,fields是 由处理过的source table的 filed name 转换成的 Expression,将filed
name转换成expression 使用 ExpressionParser.parseExpression 这个方法
正常情况下,都能注册成功。
但是,当field name带中横线,如 source中一个字段名称为
“X-Oem”时,经过 ExpressionParser.parseExpression 会被解析为 “minus(X, Model)”
而非预期的“X-Oem”,导致注册成的表与DML语句中操作的字段名不一致报错。
有什么方法能够处理这种情况么?*来自志愿者整理的flink邮件归档
参考回答:
Hi Jun Zou.
tEnv.createTemporaryView("Word-Count
", input, ("word"),("word"),("word"), ("frequency"));
加上 ` 试一下。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359842
问题二:请问flink 什么时候支持读写ACID的hive表?
我们使用 flink.1.12 读取 ACID hive table 时报错(Reading or writing ACID table %s is not supported),我们尝试修改源码放开这个限制也会出现后续的一些错误如(cast转换 BytesColumnVector 为 LongColumnVector 出错)。 背景:目前我们生产想采用 flink 做 ETL 等数据迁移工作,对应的hive都是hive 3.0左右的版本或者hive 2.3.6的版本,默认都是ACID的表,而且数据量很大,现在使用flink做数据迁移,如果flink只支持读取非ACID标的话,我们需要全部重建hive的表是很费力的。 请问一下flink什么版本有计划支持读取 ACID的hive table?或者,目前有无办法解决我这样的问题?*来自志愿者整理的flink邮件归档
参考回答:
你好, Flink暂时没有计划支持hive的ACID表。目前hive connector的代码无法保证ACID语义,所以即使你去掉“Reading or writing ACID table %s is not supported”这个检查也达不到预期的效果。 是否考虑将ACID表迁移到数据湖中呢,比如iceberg有相应的迁移工具[1]。 [1] https://iceberg.apache.org/spark-procedures/#table-migration
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359868
问题三:使用Table&SQL API怎么构造多个sink
Hi, 个人在分析RelNodeBlock逻辑,多个SINK才会拆分并重用公共子树,怎么构造多个sink呢, 文件RelNodeBlock.scala源码里的writeToSink()已经找不到了
// 源码里的多sink例子 val sourceTable = tEnv.scan("test_table").select('a, 'b, 'c) val leftTable = sourceTable.filter('a > 0).select('a as 'a1, 'b as 'b1) val rightTable = sourceTable.filter('c.isNotNull).select('b as 'b2, 'c as 'c2) val joinTable = leftTable.join(rightTable, 'a1 === 'b2) joinTable.where('a1 >= 70).select('a1, 'b1).writeToSink(sink1) joinTable.where('a1 < 70 ).select('a1, 'c2).writeToSink(sink2)
谢谢*来自志愿者整理的flink邮件归档
参考回答:
Hi.
可以通过StatementSet
指定多个insert,这样子就可以构造出多个sink了。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370021
问题四:请教下为什么 无法通过t.* 将 自定义函数返回的嵌套字段查出来。
tEnv.executeSql( """ | SELECT t.* FROM ( | SELECT EvtParser(request) as t FROM parsed_nginx_log | ) |""".stripMargin)
自定义函数 EvtParser
@DataTypeHint("ROW") def eval(line: String) = {...}
详细代码
class EvtEtlJobTest {
@Test def testMain(): Unit = { val settings = EnvironmentSettings.newInstance.useBlinkPlanner().build val tEnv = TableEnvironment.create(settings)
tEnv.executeSql( """ | CREATE TABLE parsed_nginx_log ( | remote_addr STRING, | remote_user STRING, | time_local BIGINT, | request STRING, | status STRING, | body_bytes_sent STRING, | http_referer STRING, | http_user_agent STRING, | http_x_forwarded_for STRING | ) WITH ( | 'connector' = 'filesystem', | 'path' = 'src/test/resources/nginx-parsed.log', | 'format' = 'json' | ) |""".stripMargin).getTableSchema
tEnv.executeSql( """ | CREATE TABLE evt_log_index ( | evt STRING, | app STRING, | uid STRING, | ts STRING, | url STRING, | action STRING | ) WITH ( | 'connector' = 'filesystem', | 'path' = '/tmp/evt_log_index', | 'format' = 'json' | ) |""".stripMargin)
// 注册函数 tEnv.createTemporarySystemFunction("EvtParser", classOf[EvtParser])
val schema = tEnv.executeSql( """ | SELECT t.* FROM ( | SELECT EvtParser(request) as t FROM parsed_nginx_log | ) |""".stripMargin).getTableSchema
println("++++++++", schema) }
}
详细报错:
java.lang.RuntimeException: Error while applying rule PushProjectIntoTableSourceScanRule, args [rel#63:LogicalProject.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#62,exprs=[EvtParser(3).evt,EvtParser(3).evt,EvtParser(3).evt, EvtParser(3).app, EvtParser(3).uid,EvtParser(3).uid,EvtParser(3).uid, EvtParser(3).ts, EvtParser(3).url,EvtParser(3).url,EvtParser(3).url, EvtParser(3).action]), rel#1:LogicalTableScan.NONE.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, parsed_nginx_log])]
at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) at org.apache.calcite.tools.ProgramsRuleSetProgram.run(Programs.java:312)atorg.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)atorg.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgramRuleSetProgram.run(Programs.java:312)atorg.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)atorg.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgramRuleSetProgram.run(Programs.java:312) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgramanonfunanonfunanonfunoptimize1.apply(FlinkChainedProgram.scala:62)atorg.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram1.apply(FlinkChainedProgram.scala:62)atorg.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgramanonfunanonfunanonfunoptimize1.apply(FlinkChainedProgram.scala:58)atscala.collection.TraversableOnce1.apply(FlinkChainedProgram.scala:58)atscala.collection.TraversableOnce1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnceanonfunanonfunanonfunfoldLeft1.apply(TraversableOnce.scala:157)atscala.collection.TraversableOnce1.apply(TraversableOnce.scala:157)atscala.collection.TraversableOnce1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnceanonfunanonfunanonfunfoldLeft1.apply(TraversableOnce.scala:157)atscala.collection.Iterator1.apply(TraversableOnce.scala:157)atscala.collection.Iterator1.apply(TraversableOnce.scala:157) at scala.collection.Iteratorclass.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLikeclass.foreach(IterableLike.scala:72)atscala.collection.AbstractIterable.foreach(Iterable.scala:54)atscala.collection.TraversableOnceclass.foreach(IterableLike.scala:72)atscala.collection.AbstractIterable.foreach(Iterable.scala:54)atscala.collection.TraversableOnceclass.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnceclass.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1107) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666) at cn.bestlang.starry.evt.EvtEtlJobTest.testMain(EvtEtlJobTest.scala:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at org.junit.jupiter.engine.execution.InvocationInterceptorChainValidatingInvocation.proceed(InvocationInterceptorChain.java:131)atorg.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)atorg.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)atorg.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)atorg.junit.jupiter.engine.execution.ExecutableInvokerValidatingInvocation.proceed(InvocationInterceptorChain.java:131)atorg.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)atorg.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)atorg.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)atorg.junit.jupiter.engine.execution.ExecutableInvokerValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) at org.junit.jupiter.engine.execution.ExecutableInvokerReflectiveInterceptorCall.lambdaofVoidMethodofVoidMethodofVoidMethod0(ExecutableInvoker.java:115) at org.junit.jupiter.engine.execution.ExecutableInvoker.lambdainvokeinvokeinvoke0(ExecutableInvoker.java:105) at org.junit.jupiter.engine.execution.InvocationInterceptorChainInterceptedInvocation.proceed(InvocationInterceptorChain.java:106)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)atorg.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)atorg.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambdaInterceptedInvocation.proceed(InvocationInterceptorChain.java:106)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)atorg.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)atorg.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambdaInterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambdainvokeTestMethod6(TestMethodTestDescriptor.java:210)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(TestMethodTestDescriptor.java:210)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(TestMethodTestDescriptor.java:210) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively6(NodeTestTask.java:151)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:151)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:151) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atjava.util.ArrayList.forEach(ArrayList.java:1257)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atjava.util.ArrayList.forEach(ArrayList.java:1257)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda9(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) at java.util.ArrayList.forEach(ArrayList.java:1257) at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively6(NodeTestTask.java:155)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:155)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:155) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atjava.util.ArrayList.forEach(ArrayList.java:1257)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atjava.util.ArrayList.forEach(ArrayList.java:1257)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda9(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) at java.util.ArrayList.forEach(ArrayList.java:1257) at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively6(NodeTestTask.java:155)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:155)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:155) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)atorg.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)atorg.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)atorg.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)atorg.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda9(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambdaexecute0(EngineExecutionOrchestrator.java:54)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)atorg.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)atorg.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)atorg.junit.platform.launcher.core.DefaultLauncherSession0(EngineExecutionOrchestrator.java:54)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)atorg.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)atorg.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)atorg.junit.platform.launcher.core.DefaultLauncherSession0(EngineExecutionOrchestrator.java:54) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52) at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114) at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86) at org.junit.platform.launcher.core.DefaultLauncherSessionDelegatingLauncher.execute(DefaultLauncherSession.java:86) at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53) at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71) at com.intellij.rt.junit.IdeaTestRunnerRepeater.startRunnerWithArgs(IdeaTestRunner.java:33)atcom.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)atcom.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)Causedby:scala.MatchError:EvtParser(Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)atcom.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)atcom.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)Causedby:scala.MatchError:EvtParser(Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) Caused by: scala.MatchError: EvtParser(3) (of class org.apache.calcite.rex.RexCall)*来自志愿者整理的flink邮件归档
参考回答:
请问你使用的是哪个版本? 这个似乎是一个已知的修复的bug[1]
[1] https://github.com/apache/flink/pull/15548 我在1.13分支上验证了下你的case,发现能够跑通。建议cp下那个patch到自己的分支,再验证下。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370024
问题五:flink cdc 查询mysql binlog时,在 streaming模式下,如果无法获得reload权限应该如何配置参数
最近在使用flink cdc的时候,遇到需要用streaming mode来对收取的binlog日志进行后续相对复杂处理的需求。因为生产环境不允许mysql的RELOAD权限,之前在利用sql模式使用cdc的时候应用了 debezium.snap.shot.locking.mode = none的参数。
但在使用streaming mode的时候遇到了困难,不知道应该如何配置可以支持在没有reload权限的时候使用flink cdc。目前使用的配置方法是 Properties properties = new Properties(); properties.setProperty("debezium.snapshot.locking.mode", "none"); SourceFunction sourceFunction = MySQLSource.builder() .hostname("localhost") .port(port) .databaseList("database") .tableList("database.test") .username(“user) .password("password") .debeziumProperties(properties) .deserializer(new StringDebeziumDeserializationSchema()) .build();
但.debeziumProperties(properties)好像并没有和sql模式一样生效。请问我应该怎么配置sourceFunction,或者Streaming模式有没有提供什么方法解决这个问题呢?*来自志愿者整理的flink邮件归档
参考回答:
试试这种写法,在datastreaming API中的,debezium的相关参数应该是不需要加debezium前缀的 public static Properties debeziumProperties(){ Properties properties = new Properties(); properties.setProperty(“xxxx”,”xxxx"); return properties; }
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370027?spm=a2c6h.13148508.0.0.2f984f0eqWEHKL