Flink CDC全库订阅binlog你们怎么配置的,我们这好像有漏数据的情况?
对表结构有要求, 我这边试的结果是要满足这些条件
● 不使用 blob 字段
● 不用 char 类型字段做主键
● 建表须有主键
● 不要用 double 类型做主键 ,此回答整理自钉群“Flink CDC 社区”
Flink CDC 全库订阅binlog的配置可以参考以下步骤:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1
BinlogSourceFunction<String> binlogSourceFunction = new BinlogSourceFunction<>(
"localhost", 3306, "root", "password", "test_db", "test_table");
binlogSourceFunction.setStartPosition(BinlogEntry.Position.START_EVENT_V4);
DataStream<String> binlogStream = env.addSource(binlogSourceFunction)
.map(new MyMapFunction());
如果您发现有漏数据的情况,可能是由于某些原因导致Flink无法正常连接到MySQL或无法读取到完整的binlog信息。您可以检查以下几点:
[mysqld]
gtid_mode=ON
enforce_gtid_consistency=ON
Flink CDC全库订阅binlog的配置可以参考以下步骤:
在项目的pom.xml文件中添加如下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.1.0</version>
</dependency>
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkCDCDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
}
}
```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.table.sources.tsextractors.RowtimeAttributeDescriptor;
import org.apache.flink.types.Row;
public class FlinkCDCDemo {
public static void main(String[] args) throws Exception {
// ... 创建Flink SQL执行环境和表环境的代码 ...
// 注册MySQL源表
tableEnv.executeSql("""
CREATE TABLE my_mysql (
id INT,
name STRING,
age INT,
create_time TIMESTAMP(3),
update_time TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc', -- 使用MySQL CDC连接器
'hostname' = 'localhost', -- MySQL服务器地址
'port' = '3306', -- MySQL服务器端口
'username' = 'root', -- MySQL用户名
'password' = 'password', -- MySQL密码
'database-name' = 'test', -- 需要订阅的数据库名
'table-name' = 'my_table', -- 需要订阅的表名
'scan.startup.mode' = 'latest-offset', -- 从最新的位置开始消费数据,避免重复消费和丢失数据
'debezium-internal.offset-storage' = 'org.apache.kafka.connect.storage.FileOffsetBackingStore', -- 使用Kafka作为Offset存储后端,以便在发生故障时恢复消费位置
'debezium-internal.history' = 'io.debezium.relational.history.FileDatabaseHistory', -- 使用文件系统作为历史记录存储后端,以便在发生故障时恢复消费位置
'debezium-sqlserver-heartbeat-interval' = '60000', -- 设置心跳间隔,默认为60秒,可以根据实际需求调整
'debezium-sqlserver-paging-size' = '5000', -- 设置分页大小,默认为5000条记录,可以根据实际需求调整
'debezium-sqlserver-max-queued-messages' = '1000', -- 设置最大队列消息数,默认为1000条,可以根据实际需求调整
'debezium-sqlserver-snapshot-mode' = 'initial', -- 设置快照模式,默认为initial,可以根据实际需求调整
'format.type' = 'json', -- 设置数据格式类型,默认为json,可以根据实际需求调整
'debezium-sqlserver-include-schema-changes' = 'false', -- 是否包含模式更改事件,默认为false,可以根据实际需求调整
'debezium-sqlserver-exclude-tables' = 'dbo.__RefactorLog, dbo.__RefactorTable, dbo.__SysAgentEvents, dbo.__SysSchemas, dbo.__SysObjects, dbo.__Triggers, dbo.__Views, dbo.__XmlSchemaCollections, dbo.__XmlTableCollections, sysdiagrams, sysschobjs, syscomments, sysdepends, sysfiles, syscolpars, sysconstraints, sysstatgroups, sysstats, sysusers, sysallocunits, syslanguages, sysremotelogins, sysfts_configurations, sysfts_templates, sysfts_contents, syspartitionstats, sysfilegroups, sysindexes, syscolumns, sysdenypermissions, sysrolemembership, syslogins, sysprocedures, syssequenceobjects, sysremoteprocedures, systypes, sysuserdefinedtabletypes, sysuserdefinedaggregates, sysrulesets, sysforeignkeys, syssqlmodules, sysparamsets, sysxlogins, sysxmlextensions', -- 排除不需要订阅的表和视图,可以根据实际需求调整
) WITH (
'connector' = 'jdbc', -- 使用JDBC连接器,以便将数据转换为Flink支持的数据类型和格式
'url' = 'jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC', -- JDBC连接URL,根据实际情况修改数据库地址、端口和时区等参数
'table-name' = 'my_table', -- 需要订阅的表名,与上面的MySQL源表保持一致,以便正确映射到对应的源表和目标表
'username' = 'root', -- JDBC用户名,根据实际情况修改数据库用户名和密码等参数
'password' = 'password', -- JDBC密码,根据实际情况修改数据库用户名和密码等参数
'driver-name' = 'com.mysql.cj.jdbc.Driver', -- JDBC驱动名称,根据实际情况修改数据库驱动名称和版本等参数,
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。