flinkcdc该如何只读取前一天的增量数据呀,已定时任务的形式,让他读完就停止怎么办?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
要实现 Flink CDC 只读取前一天的增量数据,并以定时任务的形式运行,读完后自动停止,可以按照以下步骤进行配置和操作:
Flink CDC 默认会从 MySQL 的 Binlog 中读取所有增量数据。如果需要只读取前一天的数据,可以通过以下方式实现: - 时间范围过滤:在 Flink SQL 或任务配置中,通过时间字段(如 created_at
)对数据进行过滤。 - 任务控制:通过定时任务调度工具(如 Airflow、Cron 等)启动 Flink 作业,并在作业完成后自动停止。
在 Flink CDC 的配置中,可以通过 SQL 或 YAML 文件指定数据源和过滤条件。
假设你的表中有时间字段 created_at
,可以通过以下 SQL 过滤前一天的数据:
CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_src` (
`order_id` INT NOT NULL,
`order_revenue` FLOAT NOT NULL,
`order_region` STRING NOT NULL,
`customer_id` INT NOT NULL,
`created_at` TIMESTAMP(3) NOT NULL, -- 假设这是时间字段
PRIMARY KEY(`order_id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'emr_test',
'password' = 'Yz12****',
'database-name' = 'flink_cdc',
'table-name' = 'orders',
'scan.startup.mode' = 'timestamp', -- 指定从某个时间点开始读取
'scan.startup.timestamp-millis' = '${start_time}', -- 替换为前一天的起始时间戳
'scan.incremental.snapshot.enabled' = 'true'
);
-- 过滤前一天的数据
SELECT *
FROM `default_catalog`.`flink_cdc`.`orders_src`
WHERE `created_at` >= TO_TIMESTAMP('${start_time}')
AND `created_at` < TO_TIMESTAMP('${end_time}');
scan.startup.mode
:设置为 timestamp
,表示从指定的时间戳开始读取。scan.startup.timestamp-millis
:指定起始时间戳(毫秒),可以通过脚本动态生成前一天的起始时间。TO_TIMESTAMP
:用于将字符串时间转换为时间戳格式。如果使用 YAML 文件配置 Flink CDC,可以类似地指定时间范围:
source:
type: mysql
hostname: rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com
port: 3306
username: workshop
password: workshop#2017
tables: flink_cdc.orders
server-id: 5400-5404
scan.startup.mode: timestamp
scan.startup.timestamp-millis: ${start_time} # 动态替换为前一天的起始时间戳
sink:
type: maxcompute
name: MaxComputeSink
accessId: xxxxxxxxxxxxxxxxxxxxxx
accessKey: xxxxxxxxxxxxxxxxxxxxxxx
endpoint: http://xxx.xxx.xxx.xxx:8008
tunnelEndpoint: http://xxx.xxx.xxx.xxx:8009
project: delta_compute_yunqi
bucketSize: 8
pipeline:
parallelism: 4
为了动态生成前一天的时间范围,可以使用 Shell 脚本或 Python 脚本计算时间戳,并将其传递给 Flink 作业。
#!/bin/bash
# 获取前一天的起始时间和结束时间
START_TIME=$(date -d "yesterday 00:00:00" +%s%3N)
END_TIME=$(date -d "today 00:00:00" +%s%3N)
# 启动 Flink 作业
flink run -c com.example.FlinkCDCJob \
-Dscan.startup.timestamp-millis=$START_TIME \
-Dend_time=$END_TIME \
flink-cdc-job.jar
date -d "yesterday 00:00:00"
:获取前一天的零点时间。%s%3N
:生成毫秒级时间戳。为了实现定时任务并让 Flink 作业在读取完数据后自动停止,可以结合以下方法:
Flink CDC 支持有限模式(Bounded Mode),可以在读取完指定范围的数据后自动停止任务。在 SQL 或 YAML 配置中启用该模式:
SET 'execution.runtime-mode' = 'batch'; -- 设置为批处理模式
使用外部调度工具(如 Airflow 或 Cron)启动 Flink 作业,并在作业完成后自动退出。例如:
# 使用 Cron 定时执行脚本
0 0 * * * /path/to/run_flink_cdc.sh
created_at
),并且该字段能够准确反映数据的生成时间。parallelism
)以提高性能。通过以上步骤,您可以实现 Flink CDC 只读取前一天的增量数据,并以定时任务的形式运行,读取完成后自动停止。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。