有flink cdc 取mysql数据直接到holo的方法么?
Flink Sql支持实施的数据同步CDC,MySql CDC、Postgress CDC、Hologress CDC。
flink cdc 取mysql数据直接到holo的方法直接可以通过Flink CDC特性同步到Hholo,即flink-cdc-connectors 组件可以实现直接读取各种数据。
Flink CDC 提供了整库同步特性, 通过 CDAS (Create Database AS) 语法配合 Catalog 实现。 https://developer.aliyun.com/article/1001096
MySQL的数据可以通过Flink CDC特性同步到Hologres,这个过程只是一个简单的同步,没有做任何数据的处理。
mysql配置 开启binglog vim /usr/local/etc/my.cnf 添加以下配置
[mysqld]
bind-address = 127.0.0.1 log-bin = mysql-bin binlog-format = ROW server_id = 1 重启mysql :service mysql.server restart
创建账号密码,可以修改密码复杂度 set global validate_password_policy=0 set validate_password_length=6 CREATE USER 'flinkuser'@'localhost' IDENTIFIED BY 'flinkpw';
授权 GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT > ON . TO 'flinkuser'@'localhost' IDENTIFIED BY 'flinkpw'
查看授权 show grants for 'flinkuser'@'localhost'
把配置刷新到文件 FLUSH PRIVILEGES
hadoop配置 配置环境变量 export HADOOP_CLASSPATH=hadoop classpath
启动集群 sbin/start-all.sh
进程列表
flink-1.13.5 % jps 57988 SecondaryNameNode 64612 YarnSessionClusterEntrypoint 57749 NameNode 58183 ResourceManager 58279 NodeManager 60969 SqlClient 64733 Jps 57855 DataNode flink 配置 添加依赖 flink-sql-connector-mysql-cdc-2.2.1.jar hudi-flink-bundle_2.11-0.10.0.jar
注:为了保证版本的兼容性,最好自己手动编译jar包
启动yarn-session集群(注:依赖于hadoop存储,所以只能以yarn模式启动集群) bin/yarn-session.sh -nm flink-session-cluster -d
启动flink client bin/sql-client.sh embedded -s yarn-session
创建mysql cdc源表 CREATE TABLE mysql_user ( id INT, name STRING, age INT, dt STRING, score DOUBLE, create_at STRING, update_at STRING, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'wlapp', 'table-name' = 'user' ); 创建hudi目标表 CREATE TABLE hudi_user( id INT, name STRING, age INT, dt STRING, score DOUBLE, create_at STRING, update_at STRING, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://localhost:9000/user/warehouse/wlapp.db/user', 'table.type' = 'COPY_ON_WRITE', 'write.insert.drop.duplicates' = 'true' ); 执行etl INSERT INTO hudi_user SELECT * FROM mysql_user; 程序运行之后在mysql客户端查看下binlog状态 show master status
CDC是Change Data Capture(变更数据获取)的简称。核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。常见的CDC组件有基于查询的Sqoop、Kafka JDBC Source,基于Binlog的Canal、Maxwell、Debezium等。
Flink CDC是Flink社区开发的flink-cdc-connectors 组件,可以直接从 MySQL、Oracle、PostgreSQL等数据库直接读取全量数据和增量变更数据。
Flink在1.11之后就已经支持从MySQL增量读取Binlog日志的方式。
基于查询的,客户端会通过SQL方式查询源库表变更数据,然后对外发送。
基于日志的,这也是业界广泛使用的一种方式,一般是通过binlog方式,变更的记录会写入binlog,解析binlog后会写入消息系统,或直接基于Flink CDC进行处理。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。