构建高效的实时数据管道:Flink、Kafka、CnosDB 的完美结合

简介: 本文介绍了如何利用 Apache Flink、Kafka 和 CnosDB 构建高效的实时数据处理管道。Flink 提供低延迟、高吞吐的流处理能力,Kafka 作为可靠的分布式消息队列负责数据传输,CnosDB 则专为时间序列数据设计,适合存储传感器、日志等实时数据。通过三者结合,企业可实现从数据采集、实时处理到持久化存储的完整解决方案,广泛适用于物联网、实时监控和数据分析等场景。

在当今的数据技术生态系统中,实时数据处理已成为许多企业不可或缺的一部分。为了满足这一需求,Apache Flink、Apache Kafka、CnosDB等开源工具的组合应运而生,使得实时数据流的采集、处理和存储更加高效可靠。本文将介绍如何使用 Flink、Kafka 和 CnosDB 构建健壮的实时数据处理管道。

Flink、Kafka 和 CnosDB

  • Flink:强大的流处理引擎,支持事件驱动、分布式、容错处理。Flink 可以处理高吞吐量和低延迟的实时数据流,使其适用于数据分析、实时报告和推荐系统等各种用例。
  • Kafka:一个高吞吐量的分布式流数据平台,用于收集、存储和传输实时数据流。Kafka 具有强大的耐用性、可扩展性和容错能力,适合为实时数据流构建可靠的管道。
  • CnosDB:一个专为时间序列数据设计的开源时间序列数据库。它具有高性能、高可用性和易用性,使其成为存储实时生成的时间序列数据(例如传感器数据、日志和监控数据)的绝佳选择。

使用案例场景

在此用例中,我们假设有一个物联网 (IoT) 设备网络,其中每个设备定期生成传感器数据,包括温度、湿度和压力等。我们希望实时收集、处理和存储这些数据,以进行实时监控和分析。

数据流架构图如下:

  • 首先,我们需要设置一个数据收集器来检索传感器数据并将其发送到 Kafka 主题。这可以通过编写一个生产者应用程序来实现,该应用程序将生成的传感器数据发送到 Kafka。
  • 使用Flink实时处理传感器数据。首先,您需要编写一个 Flink 应用程序来订阅 Kafka 主题中的数据流并对数据进行实时处理和转换。例如,您可以计算平均温度、最大湿度等。
  • 将处理后的数据存储到CnosDB中以供后续查询。为了完成此步骤,需要配置一个 CnosDB Sink,以便 Flink 应用程序可以将处理后的数据写入 CnosDB。

建设管道

1.数据采集与传输

编写一个生产者应用程序来读取传感器数据并将其发送到 Kafka 主题。

public class SensorDataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        while (true) {
            SensorData data = generateSensorData(); // Generate Monitor data
            producer.send(new ProducerRecord<>("sensor-data-topic", data));
            Thread.sleep(1000); // Send data every second
        }
    }
}

2.时间处理与转换

编写一个Flink App,订阅Kafka主题中的数据流,进行实时处理,并对数据进行转换。

// Flink App Example
public class SensorDataProcessingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");
        props.setProperty("group.id", "sensor-data-consumer-group");
        DataStream<String> sensorData = env.addSource(new FlinkKafkaConsumer<>("sensor-data-topic", new SimpleStringSchema(), props));
        DataStream<ProcessedData> processedData = sensorData
            .map(json -> parseJson(json)) // Parse JSON data
            .keyBy(ProcessedData::getDeviceId)
            .window(TumblingEventTimeWindows.of(Time.seconds(10))) // sliding window of 10-second 
            .apply(new SensorDataProcessor()); // Self define the processing logic
        processedData.print(); // print processed data, which can be replaced by writing into CnosDB
        env.execute("SensorDataProcessingJob");
    }
}

3.写入和存储数据

配置 CnosDB Sink 以将“processedData.print()”替换为写入 CnosDB 的程序,在 CnosDB 中创建一个数据保留期为 30 天的数据库:

CnosDB创建数据库的语法请参考:创建数据库

https://docs.cnosdb.com/zh/latest/reference/sql.html

CREATE DATABASE IF NOT EXISTS "db_flink_test" WITH TTL '30d' SHARD 2 VNODE_DURATION '1d' REPLICA 2;

在Maven [https://maven.apache.org/] 中使用CnosBD Sink

[https://docs.cnosdb.com/zh/latest/reference/connector/flink-connector-cnosdb.html] 包:

<dependency>
    <groupId>com.cnosdb</groupId>
    <artifactId>flink-connector-cnosdb</artifactId>
    <version>1.0</version>
</dependency>

编译应用:

public class WriteToCnosDBJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");
        props.setProperty("group.id", "sensor-data-consumer-group");
        DataStream<String> sensorData = env.addSource(new FlinkKafkaConsumer<>("sensor-data-topic", new SimpleStringSchema(), props));
        DataStream<ProcessedData> processedData = sensorData
            .map((MapFunction<String, ProcessedData>) json -> parseJson(json)) // Parse JSON data
            .keyBy(ProcessedData::getDeviceId)
            .window(TumblingEventTimeWindows.of(Time.seconds(10))) // sliding window of 10-second 
            .apply(new SensorDataProcessor()); // Self define the processing logic
        DataStream<CnosDBPoint> cnosDBDataStream = processedData.map(
                new RichMapFunction<ProcessedData, CnosDBPoint>() {
                    @Override
                    public CnosDBPoint map(String s) throws Exception {
                        return new CnosDBPoint("sensor_metric")
                                .time(value.getTimestamp().toEpochMilli(), TimeUnit.MILLISECONDS)
                                .tag("device_id", value.getDeviceId())
                                .field("average_temperature", value.getAverageTemperature())
                                .field("max_humidity", value.getMaxHumidity());
                    }
                }
        );
        CnosDBConfig cnosDBConfig = CnosDBConfig.builder()
                .url("http://localhost:8902")
                .database("db_flink_test")
                .username("root")
                .password("")
                .build();
        cnosDBDataStream.addSink(new CnosDBSink(cnosDBConfig));
        env.execute("WriteToCnosDBJob");
    }
}

运行看看结果:

概括

通过结合 Flink、Kafka 和 CnosDB,您可以构建强大的实时数据处理管道,涵盖从数据收集到实时处理,最终到数据存储和可视化。每个步骤都涉及具体的配置和代码实现,确保您熟悉每个工具的功能和操作。该架构适用于物联网监控、实时报表、仪表板等各种实时数据应用。根据您的需求和环境,调整配置和代码以创建适合您业务的实时数据处理解决方案。

相关文章
|
4月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
793 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
9月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
634 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
9月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
284 11
|
11月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
1161 0
|
8月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
12月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
954 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
12月前
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
本文整理自鹰角网络大数据开发工程师朱正军在Flink Forward Asia 2024上的分享,主要涵盖四个方面:鹰角数据平台架构、数据湖选型、湖仓一体建设及未来展望。文章详细介绍了鹰角如何构建基于Paimon的数据湖,解决了Hudi入湖的痛点,并通过Trino引擎和Ranger权限管理实现高效的数据查询与管控。此外,还探讨了湖仓一体平台的落地效果及未来技术发展方向,包括Trino与Paimon的集成增强、StarRocks的应用以及Paimon全面替换Hive的计划。
1532 1
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
|
消息中间件 Kafka 流计算
docker环境安装kafka/Flink/clickhouse镜像
通过上述步骤和示例,您可以系统地了解如何使用Docker Compose安装和配置Kafka、Flink和ClickHouse,并进行基本的验证操作。希望这些内容对您的学习和工作有所帮助。
1286 28
|
11月前
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
Flink Materialized Table:构建流批一体 ETL
234 3
|
11月前
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
814 2

热门文章

最新文章