Question 1: Flink 1.11 CDC issue
I'm a newcomer: after syncing PostgreSQL CDC data to Kafka via Debezium, I submitted a test job through Flink's SQL client. As soon as the CDC JSON data started arriving from Kafka, the job failed. The error log below is what I found in the web UI. Could someone help analyze it? Thanks!
CREATE TABLE pgsql_person_cdc(
id BIGINT,
name STRING,
age STRING,
sex STRING,
phone STRING
) WITH (
'connector' = 'kafka',
'topic' = 'postgres.public.person',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'pgsql_cdc',
'format' = 'debezium-json',
'debezium-json.schema-include' = 'true'
)
CREATE TABLE sql_out (
id BIGINT,
name STRING,
age STRING,
sex STRING,
phone STRING
) WITH (
'connector' = 'print'
)
INSERT INTO sql_out SELECT * FROM pgsql_person_cdc;
==================================== separator ======================================
Error log:
java.io.IOException: Corrupt Debezium JSON message '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"155555555"},"source":{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}'.
at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[flink-connector-kafka-base_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[flink-connector-kafka_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[flink-connector-kafka_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[flink-connector-kafka-base_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.lang.NullPointerException
at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:120) ~[flink-json-1.11.0.jar:1.11.0]
... 7 more
2020-07-22 17:22:34,415 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, pgsql_person_cdc]], fields=[id, name, age, sex, phone]) -> Sink: Sink(table=[default_catalog.default_database.sql_out], fields=[id, name, age, sex, phone]) (1/1) (b553cb66df6e47a27e7dae8466b684ab).
2020-07-22 17:22:34,418 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, pgsql_person_cdc]], fields=[id, name, age, sex, phone]) -> Sink: Sink(table=[default_catalog.default_database.sql_out], fields=[id, name, age, sex, phone]) (1/1) b553cb66df6e47a27e7dae8466b684ab.
2020-07-22 17:22:34,461 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: 495bb5a0cd877808674b29890b6b8bc0, jobId: 3feda3a191fcb8e0da891b9fda1ee532).
2020-07-22 17:22:34,462 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 3feda3a191fcb8e0da891b9fda1ee532 from job leader monitoring.
2020-07-22 17:22:34,462 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job 3feda3a191fcb8e0da891b9fda1ee532.
==================================== separator ======================================
*From the volunteer-compiled Flink mailing list archive
Reference answer:
The code throws an NPE when setting the RowKind for the `before` record; normally `before` should not be null. This looks like a problem with your data: an update changelog whose `before` is null is not valid, because without the `before` data the `after` data cannot be processed. If you confirm it is dirty data, you can enable ignore-parse-errors to skip it [1].
Best regards, Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
{ "payload": { "before": null, "after": { "id": 2, "name": "liushimin", "age": "24", "sex": "man", "phone": "155555555" }, "source": { "version": "1.2.0.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1595409754151, "snapshot": "false", "db": "postgres", "schema": "public", "table": "person", "txId": 569, "lsn": 23632344, "xmin": null }, "op": "u", "ts_ms": 1595409754270, "transaction": null } }
*From the volunteer-compiled Flink mailing list archive
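If the record is confirmed to be dirty data, the suggested workaround amounts to one extra option in the source table's WITH clause. A minimal sketch based on the DDL from the question (the option name is taken from the documentation linked in [1]; verify it against your exact 1.11 minor version):

CREATE TABLE pgsql_person_cdc(
id BIGINT,
name STRING,
age STRING,
sex STRING,
phone STRING
) WITH (
'connector' = 'kafka',
'topic' = 'postgres.public.person',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'pgsql_cdc',
'format' = 'debezium-json',
'debezium-json.schema-include' = 'true',
-- skip records the debezium-json format fails to parse instead of failing the job
'debezium-json.ignore-parse-errors' = 'true'
)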
For more answers to this question, see:
https://developer.aliyun.com/ask/371502?spm=a2c6h.12873639.article-detail.27.29d04378ApxdqJ
Question 2: Flink 1.11 DDL writing to MySQL
I'm using Flink 1.11.1 for CDC and found that only about 100 records per second are written to MySQL. Is there an optimization approach, or any suggestion for other batch-write options? The code is as follows:

String sourceDdl = " CREATE TABLE debezium_source " +
        "( " +
        "id STRING NOT NULL, name STRING, description STRING, weight Double" +
        ") " +
        "WITH (" +
        " 'connector' = 'kafka-0.11'," +
        " 'topic' = 'test0717'," +
        " 'properties.bootstrap.servers' = ' 172.22.20.206:9092', " +
        "'scan.startup.mode' = 'group-offsets','properties.group.id'='test'," +
        "'format' = 'debezium-json'," +
        "'debezium-json.schema-include'='false'," +
        "'debezium-json.ignore-parse-errors'='true')";
tEnv.executeSql(sourceDdl);
System.out.println("init source ddl successful ==>" + sourceDdl);

String sinkDdl = " CREATE TABLE sink " +
        "( " +
        "id STRING NOT NULL," +
        " name STRING, " +
        "description STRING," +
        " weight Double," +
        " PRIMARY KEY (id) NOT ENFORCED " +
        ")" +
        " WITH " +
        "( " +
        "'connector' = 'jdbc', " +
        "'url' = 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true', " +
        "'table-name' = 'table-out', " +
        "'driver'= 'com.mysql.cj.jdbc.Driver'," +
        "'sink.buffer-flush.interval'='1s'," +
        "'sink.buffer-flush.max-rows'='1000'," +
        "'username'='DataPip', " +
        "'password'='DataPip')";
tEnv.executeSql(sinkDdl);
System.out.println("init sink ddl successful ==>" + sinkDdl);

String dml = "INSERT INTO sink SELECT id,name ,description, weight FROM debezium_source";
System.out.println("execute dml ==>" + dml);
tEnv.executeSql(dml);

tEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print')" +
        "LIKE debezium_source (EXCLUDING ALL)");
tEnv.executeSql("INSERT INTO print_table SELECT id,name ,description, weight FROM debezium_source");
*From the volunteer-compiled Flink mailing list archive
Reference answer:
Have you observed backpressure caused by the sink not keeping up? You could also try increasing the flush interval so that each buffer accumulates more data. *From the volunteer-compiled Flink mailing list archive
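A minimal sketch of the "accumulate more per buffer" suggestion, reusing the sink DDL from the question with illustrative (not tuned) flush settings; the actual values need to be sized against your throughput and latency requirements:

CREATE TABLE sink (
id STRING NOT NULL,
name STRING,
description STRING,
weight DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true',
'table-name' = 'table-out',
'driver' = 'com.mysql.cj.jdbc.Driver',
-- illustrative values: flush less often and allow more rows per JDBC batch
'sink.buffer-flush.interval' = '5s',
'sink.buffer-flush.max-rows' = '5000',
'username' = 'DataPip',
'password' = 'DataPip'
)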
For more answers to this question, see:
https://developer.aliyun.com/ask/371500?spm=a2c6h.12873639.article-detail.28.29d04378ApxdqJ
Question 3: Flink Row type
I define row data of type ROW<rule_key STRING>. I can get the corresponding value via row.getField(i), but how can I get the corresponding field name? There doesn't seem to be an interface exposed for retrieving the name.
(Goal: convert rule_key into rule_key1, rulekey2.)
*From the volunteer-compiled Flink mailing list archive
Reference answer:
This should not be possible. The field name only exists during SQL planning; at runtime it no longer carries any practical meaning.
You want to get the name of each field of the Row inside a UDF, right? If so, my understanding is that this is currently not possible.
*From the volunteer-compiled Flink mailing list archive
For more answers to this question, see:
https://developer.aliyun.com/ask/370331?spm=a2c6h.12873639.article-detail.29.29d04378ApxdqJ
Question 4: Flink SQL 1.11 Kafka source with the new WITH-clause options: downstream cannot consume data
Following the documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position, when kafka_table is created with the new options the downstream cannot consume any data, while with the old options it can. Is there a pitfall in the new-options approach?
Old options:
streamTableEnv.executeSql(
  """
    |CREATE TABLE kafka_table (
    |  uid BIGINT,
    |  sex VARCHAR,
    |  age INT,
    |  created_time TIMESTAMP(3),
    |  WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
    |) WITH (
    |  'connector.type' = 'kafka',
    |  'connector.version' = 'universal',
    |  'connector.topic' = 'user',
    |  'connector.startup-mode' = 'latest-offset',
    |  'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |  'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |  'connector.properties.group.id' = 'user_flink',
    |  'format.type' = 'json',
    |  'format.derive-schema' = 'true'
    |)
    |""".stripMargin)
New options:
streamTableEnv.executeSql(
  """
    |CREATE TABLE kafka_table (
    |  uid BIGINT,
    |  sex VARCHAR,
    |  age INT,
    |  created_time TIMESTAMP(3),
    |  WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'user',
    |  'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |  'properties.group.id' = 'user_flink',
    |  'scan.startup.mode' = 'latest-offset',
    |  'format' = 'json',
    |  'json.fail-on-missing-field' = 'false',
    |  'json.ignore-parse-errors' = 'true'
    |)
    |""".stripMargin)
*From the volunteer-compiled Flink mailing list archive
Reference answer:
When you say the downstream cannot consume data, do you mean the current job itself cannot consume data?
Normally this should not happen. Could you provide code that reproduces the issue?
*From the volunteer-compiled Flink mailing list archive
For more answers to this question, see:
https://developer.aliyun.com/ask/370330?spm=a2c6h.12873639.article-detail.30.29d04378ApxdqJ
Question 5: Flink 1.11 TableFunction
I'm defining a table function by extending TableFunction, but the parameter types of the eval method are all basic Java types (at least in every demo I've seen). Can Flink internal types be passed into eval? For example, if I want to pass a Row, i.e. eval(Row row), how should I do that?
*From the volunteer-compiled Flink mailing list archive
Reference answer:
Yes, you can. Just define getResultType and getParameterTypes clearly; see [1] for reference.
[1]
*From the volunteer-compiled Flink mailing list archive
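As an illustration of the answer (a minimal sketch, not code from the original thread): a hypothetical TableFunction, here called SplitRuleKey, that accepts a ROW with a single STRING field in eval(Row row) and declares its parameter and result types through the legacy getParameterTypes/getResultType hooks mentioned in the answer. The field layout and the comma-splitting logic are assumptions made up for the example; in Flink 1.11 these hooks apply to functions registered via the older registerFunction API rather than the new type-inference path.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Hypothetical example: fan out ROW<rule_key STRING> into one row per comma-separated key.
public class SplitRuleKey extends TableFunction<Row> {

    public void eval(Row row) {
        // Row access is positional; field names are not available at runtime (see Question 3).
        String ruleKey = (String) row.getField(0);
        if (ruleKey == null) {
            return;
        }
        for (String key : ruleKey.split(",")) {
            collect(Row.of(key));
        }
    }

    // Legacy type hook referenced in the answer: declare the eval parameter type ...
    @Override
    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
        return new TypeInformation<?>[]{Types.ROW(Types.STRING)};
    }

    // ... and the produced row type.
    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING);
    }
}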
For more answers to this question, see:
https://developer.aliyun.com/ask/370327?spm=a2c6h.12873639.article-detail.31.29d04378ApxdqJ