在当今的数据技术生态系统中,实时数据处理已成为许多企业不可或缺的一部分。为了满足这一需求,Apache Flink、Apache Kafka、CnosDB等开源工具的组合应运而生,使得实时数据流的采集、处理和存储更加高效可靠。本文将介绍如何使用 Flink、Kafka 和 CnosDB 构建健壮的实时数据处理管道。
Flink、Kafka 和 CnosDB
- Flink:强大的流处理引擎,支持事件驱动、分布式、容错处理。Flink 可以处理高吞吐量和低延迟的实时数据流,使其适用于数据分析、实时报告和推荐系统等各种用例。
- Kafka:一个高吞吐量的分布式流数据平台,用于收集、存储和传输实时数据流。Kafka 具有强大的耐用性、可扩展性和容错能力,适合为实时数据流构建可靠的管道。
- CnosDB:一个专为时间序列数据设计的开源时间序列数据库。它具有高性能、高可用性和易用性,使其成为存储实时生成的时间序列数据(例如传感器数据、日志和监控数据)的绝佳选择。
使用案例场景
在此用例中,我们假设有一个物联网 (IoT) 设备网络,其中每个设备定期生成传感器数据,包括温度、湿度和压力等。我们希望实时收集、处理和存储这些数据,以进行实时监控和分析。
数据流架构图如下:
- 首先,我们需要设置一个数据收集器来检索传感器数据并将其发送到 Kafka 主题。这可以通过编写一个生产者应用程序来实现,该应用程序将生成的传感器数据发送到 Kafka。
- 使用Flink实时处理传感器数据。首先,您需要编写一个 Flink 应用程序来订阅 Kafka 主题中的数据流并对数据进行实时处理和转换。例如,您可以计算平均温度、最大湿度等。
- 将处理后的数据存储到CnosDB中以供后续查询。为了完成此步骤,需要配置一个 CnosDB Sink,以便 Flink 应用程序可以将处理后的数据写入 CnosDB。
建设管道
1.数据采集与传输
编写一个生产者应用程序来读取传感器数据并将其发送到 Kafka 主题。
public class SensorDataProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); while (true) { SensorData data = generateSensorData(); // Generate Monitor data producer.send(new ProducerRecord<>("sensor-data-topic", data)); Thread.sleep(1000); // Send data every second } } }
2.时间处理与转换
编写一个Flink App,订阅Kafka主题中的数据流,进行实时处理,并对数据进行转换。
// Flink App Example public class SensorDataProcessingJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.setProperty("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092"); props.setProperty("group.id", "sensor-data-consumer-group"); DataStream<String> sensorData = env.addSource(new FlinkKafkaConsumer<>("sensor-data-topic", new SimpleStringSchema(), props)); DataStream<ProcessedData> processedData = sensorData .map(json -> parseJson(json)) // Parse JSON data .keyBy(ProcessedData::getDeviceId) .window(TumblingEventTimeWindows.of(Time.seconds(10))) // sliding window of 10-second .apply(new SensorDataProcessor()); // Self define the processing logic processedData.print(); // print processed data, which can be replaced by writing into CnosDB env.execute("SensorDataProcessingJob"); } }
3.写入和存储数据
配置 CnosDB Sink 以将“processedData.print()”替换为写入 CnosDB 的程序,在 CnosDB 中创建一个数据保留期为 30 天的数据库:
CnosDB创建数据库的语法请参考:创建数据库
https://docs.cnosdb.com/zh/latest/reference/sql.html
CREATE DATABASE IF NOT EXISTS "db_flink_test" WITH TTL '30d' SHARD 2 VNODE_DURATION '1d' REPLICA 2;
在Maven [https://maven.apache.org/] 中使用CnosBD Sink
[https://docs.cnosdb.com/zh/latest/reference/connector/flink-connector-cnosdb.html] 包:
<dependency> <groupId>com.cnosdb</groupId> <artifactId>flink-connector-cnosdb</artifactId> <version>1.0</version> </dependency>
编译应用:
public class WriteToCnosDBJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.setProperty("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092"); props.setProperty("group.id", "sensor-data-consumer-group"); DataStream<String> sensorData = env.addSource(new FlinkKafkaConsumer<>("sensor-data-topic", new SimpleStringSchema(), props)); DataStream<ProcessedData> processedData = sensorData .map((MapFunction<String, ProcessedData>) json -> parseJson(json)) // Parse JSON data .keyBy(ProcessedData::getDeviceId) .window(TumblingEventTimeWindows.of(Time.seconds(10))) // sliding window of 10-second .apply(new SensorDataProcessor()); // Self define the processing logic DataStream<CnosDBPoint> cnosDBDataStream = processedData.map( new RichMapFunction<ProcessedData, CnosDBPoint>() { @Override public CnosDBPoint map(String s) throws Exception { return new CnosDBPoint("sensor_metric") .time(value.getTimestamp().toEpochMilli(), TimeUnit.MILLISECONDS) .tag("device_id", value.getDeviceId()) .field("average_temperature", value.getAverageTemperature()) .field("max_humidity", value.getMaxHumidity()); } } ); CnosDBConfig cnosDBConfig = CnosDBConfig.builder() .url("http://localhost:8902") .database("db_flink_test") .username("root") .password("") .build(); cnosDBDataStream.addSink(new CnosDBSink(cnosDBConfig)); env.execute("WriteToCnosDBJob"); } }
运行看看结果:
概括
通过结合 Flink、Kafka 和 CnosDB,您可以构建强大的实时数据处理管道,涵盖从数据收集到实时处理,最终到数据存储和可视化。每个步骤都涉及具体的配置和代码实现,确保您熟悉每个工具的功能和操作。该架构适用于物联网监控、实时报表、仪表板等各种实时数据应用。根据您的需求和环境,调整配置和代码以创建适合您业务的实时数据处理解决方案。