点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(已更完)
Spark(已更完)
Flink(已更完)
ClickHouse(正在更新···)
章节内容
上节我们完成了如下的内容:
MergeTree 实测案例
ReplacingMergeTree
SummingMergeTree
CollapsingMergeTree
简介
以增代删。
Yandex官方给出的介绍是CollapsingMergeTree会异步的删除(折叠)除了特定列的 Sign 有 1 和 -1 的值以外,其余所有字段的值都相等的成对的行。没有成对的行会被保留,该引擎可以显著的降低存储量并提高SELECT查询效率。
CollapsingMergeTree引擎有个状态列Sign,这个值为1为“状态”行,-1为“取消”行,对于数据只关心状态列为状态的数据,不关心状态列为取消的数据。
案例
创建新表
CREATE TABLE cmt_tab ( id UInt32, sign Int8, date Date, name String, point String ) ENGINE = CollapsingMergeTree(sign) PARTITION BY toYYYYMM(date) ORDER BY (name, id) SAMPLE BY id;
执行结果如下图:
插入数据
INSERT INTO cmt_tab (id, sign, date, name, point) VALUES (1, 1, '2024-01-01', 'Alice', '10'), (2, 1, '2024-01-01', 'Bob', '15'), (3, 1, '2024-01-02', 'Charlie', '20'), (4, 1, '2024-01-02', 'David', '25'), (5, 1, '2024-01-03', 'Eve', '30'); -- Mark Alice's row as deleted -- Mark Bob's row as deleted INSERT INTO cmt_tab (id, sign, date, name, point) VALUES (1, -1, '2024-01-01', 'Alice', '10'), (2, -1, '2024-01-01', 'Bob', '15'); -- Insert Alice's updated row -- Insert Bob's updated row INSERT INTO cmt_tab (id, sign, date, name, point) VALUES (1, 1, '2024-01-01', 'Alice', '12'), (2, 1, '2024-01-01', 'Bob', '18');
运行结果如下所示:
optimize
OPTIMIZE TABLE cmt_tab; SELECT * FROM cmt_tab;
执行结果如下图所示:
使用场景
大数据中对于数据更新很难做到,比如统计一个网站或TV的用户数,更多场景都是选择用记录每个点的数据,再对数据进行聚合查询。而ClickHouse通过CollapsingMergeTree就可以实现,使得CollapsingMergeTreeTree大部分用于OLAP场景。
VersionedCollapsingMergeTree
这个引擎和CollapsingMergeTree差不多,只是对CollapsingMergeTree引擎加了一个版本,比如可以适用于非实时的在线统计,统计每个节点用户在线的业务。
其他数据源
端口冲突
我们的ClickHouse和Hadoop的9000端口冲突了,看大家是更改ClickHouse的端口,还是Hadoop的端口。
我这里选择修改ClickHouse的端口,从9000到9001。
不过如果你不做HDFS的相关实验,这块冲突不管直接跳过就好。
我这里选择修改 ClickHouse,我已经集群都修改完毕了,所以我连接方式修改为:
clickhouse-client -m --host h121.wzk.icu --port 9001 --user default --password clickhouse@wzk.icu
HDFS
该引擎提供了集成了Apache Hadoop生态系统通过允许管理数据HDFS通过ClickHouse,这个引擎是相似的到文件和URL引擎,但提供Hadoop特定的功能。
用途介绍
ENGINE = HDFS(URI, format)
该URI参数是HDFS中整个文件的URI,该format参数指定一种可用的文件格式。执行SELECT查询时,格式必须支持输入。
示例1
添加新表
设置 HDFS_ENGINE_TABLE 表:
CREATE TABLE hdfs_engine_table( name String, value UInt32 ) ENGINE = HDFS('hdfs://h121.wzk.icu:9000/clickhouse', 'TSV');
运行之后的截图为:
插入数据
INSERT INTO hdfs_engine_table VALUES('one', 1), ('two', 2), ('three', 3);
运行之后截图为:
查询数据
SELECT * FROM hdfs_engine_table;
运行之后的截图为:
HDFS 数据查看
实施细节
读取和写入可以并行
不支持:ALTER、SELECT SAMPLE、索引、复制
MySQL
介绍
MySQL 引擎可以对存储在远程MySQL服务器上的数据执行SELECT查询。
调用参数
host:port MySQL服务器地址
database 数据库名称
table 表名称
user 数据库用户
password 用户密码
replace_query 将INSERT INTO查询是否替换为REPLACE_INFO的标志,如果REPLACE_QUERY=1则替换查询
on_duplicate_clause 将ON DUPLCATE KEY UPDATE 表达式添加到INSERT查询语句中。
示例
创建新表
CREATE TABLE mysql_table2 ( `id` UInt32, `name` String, `age` UInt32 ) ENGINE = MySQL('h122.wzk.icu:3306', 'clickhouse', 'mysql_table2', 'hive', 'hive@wzk.icu')
执行结果如下图所示:
数据库配置
在数据库中,我们要建立好对应的数据库和表:
插入数据
INSERT INTO mysql_table2 VALUES(1, 'wzk', 18); INSERT INTO mysql_table2 VALUES(2, 'icu', 18);
查询数据
SELECT * FROM mysql_table2;
运行之后截图:
Kafka
Apache Kafka 是一个分布式流处理平台,广泛用于构建实时数据管道和流应用。它能够高效地处理大量的实时数据流,常用于日志收集、事件监控、实时分析等场景。ClickHouse 提供了专门的 Kafka 引擎,使其能够直接从 Kafka 中读取数据,实现实时数据流的处理与分析。
创建新表
CREATE TABLE kafka_events ( `timestamp` DateTime, `event_type` String, `user_id` UInt64, `event_data` String ) ENGINE = Kafka SETTINGS kafka_broker_list = 'broker1:9092,broker2:9092', kafka_topic_list = 'events_topic', kafka_group_name = 'clickhouse_group', kafka_format = 'JSONEachRow', kafka_num_consumers = 1;
创建目标表并设置 Materialized View
为了将 Kafka 中的数据持久化到 ClickHouse 的表中,通常会创建一个目标表,并通过 Materialized View 实现自动插入。
CREATE TABLE events ( `timestamp` DateTime, `event_type` String, `user_id` UInt64, `event_data` String ) ENGINE = MergeTree() ORDER BY timestamp; CREATE MATERIALIZED VIEW kafka_to_events TO events AS SELECT * FROM kafka_events;
插入数据
INSERT INTO events SELECT * FROM kafka_events;
应用场景
实时日志分析:通过 Kafka 收集应用日志,ClickHouse 实时消费并分析日志数据,支持快速故障排查和性能监控。
事件驱动的业务分析:实时跟踪用户行为事件,进行实时的用户行为分析和推荐系统。
实时监控与报警:将监控数据流入 Kafka,ClickHouse 处理并生成实时报警指标。