ClickHouse在物联网(IoT)中的应用:实时监控与分析

简介: 【10月更文挑战第27天】随着物联网(IoT)技术的快速发展,越来越多的设备被连接到互联网上,产生了海量的数据。这些数据不仅包含了设备的状态信息,还包括用户的使用习惯、环境参数等。如何高效地处理和分析这些数据,成为了一个重要的挑战。作为一位数据工程师,我在一个物联网项目中深入使用了ClickHouse,以下是我的经验和思考。

随着物联网(IoT)技术的快速发展,越来越多的设备被连接到互联网上,产生了海量的数据。这些数据不仅包含了设备的状态信息,还包括用户的使用习惯、环境参数等。如何高效地处理和分析这些数据,成为了一个重要的挑战。作为一位数据工程师,我在一个物联网项目中深入使用了ClickHouse,以下是我的经验和思考。
1111.png

一、项目背景

我们的客户是一家智能家居公司,拥有数百万台连接设备,包括智能灯泡、智能插座、温湿度传感器等。这些设备每秒钟都会产生大量的数据,需要实时监控设备状态,同时还需要对历史数据进行分析,以便优化产品和服务。

二、ClickHouse简介

ClickHouse 是一个高性能的列式数据库管理系统(Column-Oriented DBMS),专为在线分析处理(OLAP)场景设计,支持实时查询,并且具有极高的查询性能。ClickHouse 使用SQL作为查询语言,这使得熟悉关系型数据库的用户可以快速上手。此外,ClickHouse 还支持分布式部署,可以在多个节点之间扩展以应对更大规模的数据集。

三、ClickHouse在物联网中的应用

1. 大规模数据处理

物联网设备产生的数据量非常大,传统的关系型数据库往往难以应对。ClickHouse 的列式存储和高效的压缩算法使其能够高效地处理大规模数据。

数据导入

假设我们有一个MQTT消息队列,用于接收设备上报的数据。我们可以使用Apache Flink消费这些消息,并将数据实时写入ClickHouse。

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcOutputFormat;

public class IoTDataIngestion {
   
    public static void main(String[] args) throws Exception {
   
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 消费MQTT消息
        DataStream<String> mqttDataStream = env.addSource(new FlinkMqttSource("tcp://mqtt-broker:1883", "iot/topic"));

        // 解析消息并转换为Tuple
        DataStream<Tuple2<String, Double>> parsedDataStream = mqttDataStream.map(new MapFunction<String, Tuple2<String, Double>>() {
   
            @Override
            public Tuple2<String, Double> map(String value) throws Exception {
   
                String[] parts = value.split(",");
                return new Tuple2<>(parts[0], Double.parseDouble(parts[1]));
            }
        });

        // 将数据写入ClickHouse
        parsedDataStream.addSink(JdbcOutputFormat.buildJdbcOutputFormat()
            .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
            .setDBUrl("jdbc:clickhouse://localhost:8123/default")
            .setQuery("INSERT INTO device_data (device_id, temperature) VALUES (?, ?)")
            .setParameterTypes(Types.VARCHAR, Types.DOUBLE)
            .finish());

        env.execute("IoT Data Ingestion to ClickHouse");
    }
}
2. 实时监控设备状态

实时监控设备状态是物联网应用中的一个重要需求。ClickHouse 支持高效的实时查询,可以快速获取设备的最新状态信息。

实时查询

假设我们需要实时监控某个设备的温度变化,可以使用以下SQL查询:

SELECT device_id, temperature, toDateTime(timestamp) AS time
FROM device_data
WHERE device_id = 'device_001'
ORDER BY time DESC
LIMIT 10;
实时报警

结合实时查询,我们可以在应用程序中设置阈值,当设备温度超过某个阈值时,触发报警。

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RealTimeAlerts {
   
    public static void main(String[] args) throws Exception {
   
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 假设我们已经从ClickHouse中读取了实时数据
        DataStream<Tuple3<String, Double, Long>> realTimeDataStream = env.fromElements(
            new Tuple3<>("device_001", 25.0, System.currentTimeMillis()),
            new Tuple3<>("device_001", 27.0, System.currentTimeMillis())
        );

        // 设置温度阈值
        double temperatureThreshold = 26.0;

        // 实时报警
        realTimeDataStream.filter(new FilterFunction<Tuple3<String, Double, Long>>() {
   
            @Override
            public boolean filter(Tuple3<String, Double, Long> value) throws Exception {
   
                return value.f1 > temperatureThreshold;
            }
        }).map(new MapFunction<Tuple3<String, Double, Long>, String>() {
   
            @Override
            public String map(Tuple3<String, Double, Long> value) throws Exception {
   
                return "Alert: Device " + value.f0 + " temperature exceeds threshold (" + value.f1 + ")";
            }
        }).print();

        env.execute("Real-Time Alerts");
    }
}
3. 历史数据分析

除了实时监控,对历史数据的分析也是物联网应用的重要部分。ClickHouse 提供了丰富的SQL功能,可以轻松进行历史数据的查询和分析。

历史数据查询

假设我们需要查询过去一周内某个设备的平均温度,可以使用以下SQL查询:

SELECT device_id, AVG(temperature) AS avg_temperature
FROM device_data
WHERE device_id = 'device_001' AND timestamp >= now() - INTERVAL 1 WEEK
GROUP BY device_id;
趋势分析

结合历史数据,我们可以进行趋势分析,例如绘制温度随时间变化的图表。

import pandas as pd
import matplotlib.pyplot as plt
from sqlalchemy import create_engine

# 创建数据库连接
engine = create_engine('clickhouse+native://default:@localhost:8123/default')

# 查询历史数据
query = """
SELECT device_id, temperature, toDateTime(timestamp) AS time
FROM device_data
WHERE device_id = 'device_001' AND timestamp >= now() - INTERVAL 1 WEEK
ORDER BY time
"""
df = pd.read_sql(query, engine)

# 绘制温度随时间变化的图表
plt.figure(figsize=(10, 6))
plt.plot(df['time'], df['temperature'])
plt.xlabel('Time')
plt.ylabel('Temperature')
plt.title('Temperature Trend for Device 001')
plt.grid(True)
plt.show()

四、总结

通过将ClickHouse应用于物联网场景,我们成功地实现了大规模设备数据的高效处理、实时监控和历史数据分析。ClickHouse 的高性能和易用性使其成为物联网数据处理的理想选择。希望我的经验分享能够帮助你在物联网项目中更好地利用ClickHouse。如果你有任何问题或建议,欢迎随时联系我。

目录
相关文章
|
5天前
|
弹性计算 双11 开发者
阿里云ECS“99套餐”再升级!双11一站式满足全年算力需求
11月1日,阿里云弹性计算ECS双11活动全面开启,在延续火爆的云服务器“99套餐”外,CPU、GPU及容器等算力产品均迎来了全年最低价。同时,阿里云全新推出简捷版控制台ECS Lite及专属宝塔面板,大幅降低企业和开发者使用ECS云服务器门槛。
|
22天前
|
存储 弹性计算 人工智能
阿里云弹性计算_通用计算专场精华概览 | 2024云栖大会回顾
阿里云弹性计算产品线、存储产品线产品负责人Alex Chen(陈起鲲)及团队内多位专家,和中国电子技术标准化研究院云计算标准负责人陈行、北京望石智慧科技有限公司首席架构师王晓满两位嘉宾,一同带来了题为《通用计算新品发布与行业实践》的专场Session。本次专场内容包括阿里云弹性计算全新发布的产品家族、阿里云第 9 代 ECS 企业级实例、CIPU 2.0技术解读、E-HPC+超算融合、倚天云原生算力解析等内容,并发布了国内首个云超算国家标准。
阿里云弹性计算_通用计算专场精华概览 | 2024云栖大会回顾
|
4天前
|
人工智能 弹性计算 文字识别
基于阿里云文档智能和RAG快速构建企业"第二大脑"
在数字化转型的背景下,企业面临海量文档管理的挑战。传统的文档管理方式效率低下,难以满足业务需求。阿里云推出的文档智能(Document Mind)与检索增强生成(RAG)技术,通过自动化解析和智能检索,极大地提升了文档管理的效率和信息利用的价值。本文介绍了如何利用阿里云的解决方案,快速构建企业专属的“第二大脑”,助力企业在竞争中占据优势。
|
2天前
|
人工智能 自然语言处理 安全
创新不设限,灵码赋新能:通义灵码新功能深度评测
自从2023年通义灵码发布以来,这款基于阿里云通义大模型的AI编码助手迅速成为开发者心中的“明星产品”。它不仅为个人开发者提供强大支持,还帮助企业团队提升研发效率,推动软件开发行业的创新发展。本文将深入探讨通义灵码最新版本的三大新功能:@workspace、@terminal 和 #team docs,分享这些功能如何在实际工作中提高效率的具体案例。
|
9天前
|
负载均衡 算法 网络安全
阿里云WoSign SSL证书申请指南_沃通SSL技术文档
阿里云平台WoSign品牌SSL证书是由阿里云合作伙伴沃通CA提供,上线阿里云平台以来,成为阿里云平台热销的国产品牌证书产品,用户在阿里云平台https://www.aliyun.com/product/cas 可直接下单购买WoSign SSL证书,快捷部署到阿里云产品中。
2170 6
阿里云WoSign SSL证书申请指南_沃通SSL技术文档
|
1天前
|
安全 数据建模 网络安全
2024阿里云双11,WoSign SSL证书优惠券使用攻略
2024阿里云“11.11金秋云创季”活动主会场,阿里云用户通过完成个人或企业实名认证,可以领取不同额度的满减优惠券,叠加折扣优惠。用户购买WoSign SSL证书,如何叠加才能更加优惠呢?
815 1
|
20天前
|
编解码 Java 程序员
写代码还有专业的编程显示器?
写代码已经十个年头了, 一直都是习惯直接用一台Mac电脑写代码 偶尔接一个显示器, 但是可能因为公司配的显示器不怎么样, 还要接转接头 搞得桌面杂乱无章,分辨率也低,感觉屏幕还是Mac自带的看着舒服
|
27天前
|
存储 人工智能 缓存
AI助理直击要害,从繁复中提炼精华——使用CDN加速访问OSS存储的图片
本案例介绍如何利用AI助理快速实现OSS存储的图片接入CDN,以加速图片访问。通过AI助理提炼关键操作步骤,避免在复杂文档中寻找解决方案。主要步骤包括开通CDN、添加加速域名、配置CNAME等。实测显示,接入CDN后图片加载时间显著缩短,验证了加速效果。此方法大幅提高了操作效率,降低了学习成本。
5396 15
|
14天前
|
人工智能 关系型数据库 Serverless
1024,致开发者们——希望和你一起用技术人独有的方式,庆祝你的主场
阿里云开发者社区推出“1024·云上见”程序员节专题活动,包括云上实操、开发者测评和征文三个分会场,提供14个实操活动、3个解决方案、3 个产品方案的测评及征文比赛,旨在帮助开发者提升技能、分享经验,共筑技术梦想。
1173 152
|
22天前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1587 14