
flink 1.11 cdc: how to convert a DataStream<RowData> into a Table

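I currently have two DataStream streams that are combined into a single DataStream through a map function. How can I convert that DataStream into a table and query it with Flink SQL? (Note: the map function imposes some ordering constraints on the streams, so at the moment I cannot define the table directly with Flink SQL CDC.)

My current approach fails with an error: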

StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

DataStreamSource json1 // canal-json format
DataStreamSource json2 // canal-json format
ConnectedStreams<String, String> connect = caliber_cdc_json.connect(caliber_snapshot_json); // connect
DataStream snapshot_cdc_stream = connect.flatMap(new SnapshotCdcCoRichFlatMapFunction()); // join the two streams

// 3, register the table and output the table data directly
Table snapshot_cdc_table = fsTableEnv.fromDataStream(snapshot_cdc_stream);
fsTableEnv.createTemporaryView("test", snapshot_cdc_table);

String output = "CREATE TABLE test_mirror (\n"
    + "id INT,\n"
    + "name VARCHAR(255),\n"
    + "time TIMESTAMP(3),\n"
    + "PRIMARY KEY(id) NOT ENFORCED\n"
    + ") WITH (\n"
    + " 'connector' = 'print'\n"
    + ")";

// 4, app logic
String op = "INSERT into test_mirror SELECT * from test";
fsTableEnv.executeSql(output);
fsTableEnv.executeSql(op);

However, submitting the job fails. The output is:

serializationSchema:root
 |-- id: INT NOT NULL
 |-- name: VARCHAR(255)
 |-- time: TIMESTAMP(3)
 |-- status: INT
 |-- CONSTRAINT PK_3386 PRIMARY KEY (id)

snapshot_cdc_table:UnnamedTable$0
+----------------+
| table name     |
+----------------+
| UnnamedTable$0 |
| test           |
| test_mirror    |
+----------------+
3 rows in set


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at org.apache.flink.client.cli.CliFrontend$$Lambda$58/1706292388.call(Unknown Source)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type.
    at org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
    at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
    at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:92)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
    at scala.Option.map(Option.scala:146)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
    at com.qqmusic.quku.demo_app.StreamTableSql.main(StreamTableSql.java:126)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    ... 9 more

What is the cause, and what do I need to do? *From the Flink mailing-list archive compiled by volunteers

小阿怪 2021-12-07 10:54:53
1 answer
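    1. Registering a DataStream of type RowData is not supported at present, because RowData is recognized as an unstructured type.
    2. Registering a cdc stream is not supported at present either. In other words, DataStream -> Table only supports insert-only streams and cannot recognize a cdc stream. This feature is planned for version 1.13.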

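    For your scenario, there are the following workarounds:
    1. If your stream contains only inserts and updates and no deletes, first register the DataStream as an insert-only Table, then use Flink SQL's deduplication syntax [1] to keep only the last record for each pk.
    2. If your stream contains deletes, then you have to develop your own sql connector and implement both the cdc capture and the "mapfunction imposes some ordering constraints on the streams" logic inside your source.
    *From the Flink mailing-list archive compiled by volunteers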
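As a rough illustration of the first workaround, the sketch below builds the kind of deduplication query meant here: a ROW_NUMBER() window partitioned by the pk and ordered by a time attribute in descending order, keeping only row number 1. It is plain Java so that it runs without Flink on the classpath. The class name DedupQuerySketch, the helper keepLastRowPerKey, and the proctime column are assumptions made for illustration and do not appear in the original thread. The sketch also assumes the stream was registered with a structured element type (not RowData) and with a processing-time attribute declared during conversion, for example via $("proctime").proctime() in fromDataStream.

```java
class DedupQuerySketch {

    /**
     * Builds a Flink SQL deduplication query that keeps the most recent row
     * per key. All names passed in are supplied by the caller; timeAttr is
     * expected to be a time attribute column of the source table.
     */
    public static String keepLastRowPerKey(String table, String key,
                                           String timeAttr, String... columns) {
        // Project only the columns the sink expects, dropping row_num.
        String projection = String.join(", ", columns);
        return "SELECT " + projection + "\n"
             + "FROM (\n"
             + "  SELECT *, ROW_NUMBER() OVER (PARTITION BY " + key
             + " ORDER BY " + timeAttr + " DESC) AS row_num\n"
             + "  FROM " + table + "\n"
             + ")\n"
             + "WHERE row_num = 1";
    }

    public static void main(String[] args) {
        // `time` is a reserved word in Flink SQL, so it is escaped with backticks.
        // "proctime" is a hypothetical processing-time column (assumption).
        String query = keepLastRowPerKey("test", "id", "proctime",
                                         "id", "name", "`time`");
        // In the job from the question this string would be submitted as:
        //   fsTableEnv.executeSql("INSERT INTO test_mirror " + query);
        System.out.println("INSERT INTO test_mirror " + query);
    }
}
```

Selecting explicit columns rather than SELECT * keeps the query result aligned with the three columns of test_mirror, since the stream schema printed in the question also carries a status field.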

    2021-12-07 11:27:02