问题一:flinkCDC程序初始化mysql没有完成就报错,有没有可能是版本的问题呢?
flinkCDC程序初始化mysql没有完成就报错,有没有可能是版本的问题呢,因为我第一次但并行度确实初始化完成了,后来多并行度不成功,再尝试单并行度也不再能成功?
参考回答:
建议排查下集群网络等问题,你这个mysql掉线
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/573673
问题二:Flink CDC中sqlserver mysql都用的胖包mysql报这个错?
Flink CDC中sqlserver mysql都用的胖包
sqlserver的成功了,mysql报这个错?Caused by: java.io.StreamCorruptedException: unexpected block data
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
参考回答:
这个问题可能是由于Flink CDC在处理MySQL数据时,遇到了与SQL Server不同的数据格式。为了解决这个问题,你可以尝试以下方法:
- 确保你的Flink CDC版本与MySQL和SQL Server的版本兼容。你可以查看官方文档或GitHub仓库中的已知问题列表,看看是否有关于这个问题的讨论。
- 检查你的MySQL和SQL Server的数据格式是否一致。如果它们的数据格式不同,可能会导致Flink CDC在处理数据时出现错误。
- 如果可能的话,尝试将MySQL和SQL Server的数据格式统一。这可能需要你修改数据库表结构或者使用其他工具来转换数据格式。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/573671
问题三:Flink CDC中mongocdc从timestamp启动报错Caused by: 遇到过吗?
Flink CDC中mongocdc从timestamp启动报错Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records。有大佬遇到过吗?群里之前有类似的问题,我试过了没有解决?环境为flink 1.15.2+mongocdc 3.0 snapshot
参考回答:
3.0还没发0.0.,你现在最多用2.4.2了,3.0改动比较大的,还没测能,
代码里这里抛出来的异常
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/573663
问题四:Flink CDC有时间帮忙看一下flink sql 多表left join 写入失败的问题嘛?
Flink CDC有时间帮忙看一下flink sql 多表left join 写入失败的问题嘛,总是报检查点写入失败?
参考回答:
初步怀疑是执行超时了,所需要的数据量太大,每打一个checkpoint一个就需要的时间很长,初步怀疑是执行超时了,所需要的数据量太大,每打一个checkpoint一个就需要的时间很长
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/573660
问题五:Flink CDC知道这个错误是什么原因吗?
Flink CDC知道这个错误是什么原因吗?我用flink 1.17.1 sql,flink mysql cdc 2.4.2读mysql数据到starrocks?
CREATE DATABASE IF NOT EXISTS default_catalog
.ride
;
CREATE TABLE IF NOT EXISTS default_catalog
.ride
.ride_src
(
id
BIGINT NOT NULL,
stat
BIGINT NULL,
...
PRIMARY KEY(id
)
NOT ENFORCED
) with (
'port' = '3306',
'username' = 'xxxxx',
'password' = 'xxxxx',
'database-name' = 'ride',
'table-name' = 'ride',
'connector' = 'mysql-cdc',
'hostname' = 'xxxxxxx',
'server-time-zone' = 'UTC'
);
CREATE TABLE IF NOT EXISTS default_catalog
.ride
.ride_sink
(
id
BIGINT NOT NULL,
stat
BIGINT NULL,
...
PRIMARY KEY(id
)
NOT ENFORCED
) with (
'sink.properties.column_separator' = '\x01',
'sink.buffer-flush.interval-ms' = '15000',
'sink.properties.strip_outer_array' = 'true',
'jdbc-url' = 'jdbc:mysql://flink002:9030',
'connector' = 'starrocks',
'database-name' = 'ride',
'table-name' = 'ride',
'load-url' = 'flink002:8030',
'sink.properties.format' = 'csv',
'username' = 'xxxxx',
'sink.max-retries' = '10',
'sink.properties.row_delimiter' = '\x02',
'password' = 'xxxxxx'
);
INSERT INTO default_catalog
.ride
.ride_sink
SELECT * FROM default_catalog
.ride
.ride_src
;
参考回答:
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/573655