Flink 数据源是kafka , 一条流数据是设备的作业指令进度数据(多次更新),一条流是设备的状态(多次更新),有没有比较好的方法将两个流进行关联(以设备号关联),获取实时的指令进度信息和对应设备最新状态?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
要实现将 Kafka 数据源中的两条流(设备作业指令进度数据和设备状态数据)进行关联,并以设备号为关联键获取实时的指令进度信息和对应设备的最新状态,可以使用 Flink 的 双流 Join 功能。以下是详细的解决方案和步骤:
首先,需要在 Flink 中定义两个 Kafka 数据源表,分别表示设备作业指令进度数据和设备状态数据。
CREATE TABLE instruction_progress (
device_id STRING, -- 设备号
progress INT, -- 指令进度
event_time TIMESTAMP(3), -- 事件时间
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND -- 定义水印
) WITH (
'connector' = 'kafka',
'topic' = 'instruction_topic', -- Kafka Topic 名称
'properties.bootstrap.servers' = 'your_kafka_broker', -- Kafka Broker 地址
'format' = 'json', -- 数据格式
'scan.startup.mode' = 'earliest-offset' -- 从最早的数据开始消费
);
CREATE TABLE device_status (
device_id STRING, -- 设备号
status STRING, -- 设备状态
event_time TIMESTAMP(3), -- 事件时间
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND -- 定义水印
) WITH (
'connector' = 'kafka',
'topic' = 'status_topic', -- Kafka Topic 名称
'properties.bootstrap.servers' = 'your_kafka_broker', -- Kafka Broker 地址
'format' = 'json', -- 数据格式
'scan.startup.mode' = 'earliest-offset' -- 从最早的数据开始消费
);
Flink 支持基于时间窗口的双流 Join,可以通过 device_id
关联两条流,并结合时间窗口来确保数据的时效性。
以下是一个基于时间窗口的双流 Join 示例:
SELECT
ip.device_id, -- 设备号
ip.progress, -- 指令进度
ds.status -- 设备状态
FROM
instruction_progress AS ip
JOIN
device_status AS ds
ON
ip.device_id = ds.device_id -- 以设备号为关联键
AND
ip.event_time BETWEEN ds.event_time - INTERVAL '10' SECOND AND ds.event_time + INTERVAL '10' SECOND
-- 时间窗口范围:允许两条流的时间戳相差不超过 10 秒
ip.device_id = ds.device_id
:以设备号作为关联条件。BETWEEN ... AND ...
:定义时间窗口范围,确保两条流的数据在一定时间范围内匹配。WATERMARK
:通过水印机制处理乱序数据,确保延迟数据不会影响结果。如果需要获取设备的最新状态,可以使用 Flink 的 ROW_NUMBER()
窗口函数 对设备状态数据进行排序,保留每个设备的最新状态。
WITH latest_status AS (
SELECT
device_id,
status,
event_time,
ROW_NUMBER() OVER (PARTITION BY device_id ORDER BY event_time DESC) AS row_num
FROM
device_status
)
SELECT
ip.device_id,
ip.progress,
ls.status
FROM
instruction_progress AS ip
JOIN
latest_status AS ls
ON
ip.device_id = ls.device_id
WHERE
ls.row_num = 1; -- 仅保留每个设备的最新状态
ROW_NUMBER()
:对每个设备的状态按时间戳降序排序,生成行号。row_num = 1
:筛选出每个设备的最新状态记录。将关联后的结果写入目标端(如 Kafka 或其他存储系统)。以下是一个示例,将结果写入 Kafka:
CREATE TABLE result_sink (
device_id STRING,
progress INT,
status STRING,
PRIMARY KEY(device_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'result_topic', -- 输出的 Kafka Topic 名称
'properties.bootstrap.servers' = 'your_kafka_broker', -- Kafka Broker 地址
'format' = 'json' -- 数据格式
);
INSERT INTO result_sink
SELECT
ip.device_id,
ip.progress,
ls.status
FROM
instruction_progress AS ip
JOIN
latest_status AS ls
ON
ip.device_id = ls.device_id
WHERE
ls.row_num = 1;
通过上述方法,您可以高效地将设备作业指令进度数据和设备状态数据进行关联,并实时获取每台设备的最新状态和指令进度信息。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。