1、Flink CDC
1.1、什么是 Flink CDC?
CDC(Change Data Capture)也就是变更数据采集的意思,我们之前学的 Maxwell 就是一种 CDC 工具。今天要学的 Flink CDC 则是 Flink 自己开发的一款与 Flink 自身无缝集成的 CDC 工具,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如过滤、关联、分组、统计等。
目前专业做数据库事件接受和解析的中间件是Debezium,如果是捕获Mysql,还有Canal。而Flink 正是集成了 Debezium 作为捕获数据变更的引擎,所以它可以充分发挥 Debezium 的能力。Flink 和 Debezium 最大的不同之处在于,Flink 并不强制依赖于 Kafka。
1.2、为什么要使用 Flink CDC?
之前我们监听 MySQL 的业务数据变更是通过 Maxwell 或者可以通过 Cannal ,它们都是通过监听 binlog 日志来实现数据同步的。
在实时数仓中,如果我们要使用上面的 Maxwell 或 Cannal 做业务数据同步,那必须通过下面这几步:
- mysql 开启 binlog
- Maxwell/Canal 同步 binlog 数据写入到 Kafka
- Flink 读取 Kakfa 中的 binlog 数据进行相关的业务处理
整体来看步骤很长,而且用到的组件也比较多,那能不能直接 Flink 一步到位,直接自己通过 binlog 来实现日志同步呢?所以 Flink CDC 这就出现了。
Flink CDC 数据格式
{ "before": { "id": "PF1784570096901248", "pay_order_no": null, "out_no": "J1784570080435328", "title": "充值办卡", "from_user_id": "PG11111", "from_account_id": "1286009802396288", "user_id": "BO1707796995184000", "account_id": "1707895210106496", "amount": 13400, "profit_state": 1, "profit_time": 1686758315000, "refund_state": 0, "refund_time": null, "add_time": 1686758315000, "remark": "充值办卡", "acct_circle": "PG11111", "user_type": 92, "from_user_type": 90, "company_id": "PG11111", "profit_mode": 1, "type": 2, "parent_id": null, "oc_profit_id": "1784570096901248", "keep_account_from_user_id": null, "keep_account_from_bm_user_id": null, "keep_account_user_id": null, "keep_account_bm_user_id": null, "biz_company_id": "PG11111" }, "after": { "id": "PF1784570096901248", "pay_order_no": null, "out_no": "J1784570080435328", "title": "充值办卡", "from_user_id": "PG11111", "from_account_id": "1286009802396288", "user_id": "BO1707796995184000", "account_id": "1707895210106496", "amount": 13400, "profit_state": 1, "profit_time": 1686758315000, "refund_state": 0, "refund_time": null, "add_time": 1686758315000, "remark": "充值办卡1", "acct_circle": "PG11111", "user_type": 92, "from_user_type": 90, "company_id": "PG11111", "profit_mode": 1, "type": 2, "parent_id": null, "oc_profit_id": "1784570096901248", "keep_account_from_user_id": null, "keep_account_from_bm_user_id": null, "keep_account_user_id": null, "keep_account_bm_user_id": null, "biz_company_id": "PG11111" }, "source": { "version": "1.6.4.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1686734882000, "snapshot": "false", "db": "cloud_test", "sequence": null, "table": "acct_profit", "server_id": 1, "gtid": null, "file": "mysql-bin.000514", "pos": 650576218, "row": 0, "thread": null, "query": null }, "op": "u", "ts_ms": 1686734882689, "transaction": null }
1.3、使用 Flink CDC
MySQL 首先得开启 binlog 功能,这里需要特别注意 MySQL 时区的设置(我们需要保证 Flink 的时间和数据库时间保持一致):
show variables like '%time_zone%'; # 如果失去是UTC,则需要时间校正 set time_zone='+8:00';
1.3.1、变更数据处理逻辑
import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; public class CustomSink extends RichSinkFunction<String> { // 自定义向下游写入 @Override public void invoke(String json, Context context) throws Exception { // 每次产生变更操作后对数据进行什么操作,比如写入到ES、Redis、MongoDB等 System.out.println(">>>" + json); } // 写入其它数据源需要创建连接 @Override public void open(Configuration parameters) throws Exception { } // 关闭外部数据库连接 @Override public void close() throws Exception { } }
1.3.2、构建 Flink CDC 监听程序
import com.ververica.cdc.connectors.mysql.MySqlSource; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class MySqlSourceExample { public static void main(String[] args) throws Exception { MySqlSource<String> source = MySqlSource.builder() .hostname("hadoop102") .port(3306) .databaseList("数据库名") .tableList("表名") // 多个表之间用逗号切割 .username("root") .password("123456") .deserializer(new JsonDebeziumDeserializationSchema()) // 反序列化器 .includeSchemaChanges(true) // 是否监听表结构的变化 .build(); // 启动本地 WEB-UI Configuration conf = new Configuration(); conf.setInteger(RestOptions.PORT,8081); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); // 检查点时间间隔 env.enableCheckpointing(5000); DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source") .addSink(new CustomSink()); env.execute(); } }