ClickHouse(21)ClickHouse集成Kafka表引擎详细解析

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: ClickHouse的Kafka表引擎允许直接从Apache Kafka流中消费数据,支持多种数据格式如JSONEachRow。创建Kafka表时需指定参数如brokers、topics、group和format。关键参数包括`kafka_broker_list`、`kafka_topic_list`、`kafka_group_name`和`kafka_format`。Kafka特性包括发布/订阅、容错存储和流处理。通过设置`kafka_num_consumers`可以调整并行消费者数量。Kafka引擎还支持Kerberos认证。虚拟列如`_topic`、`_offset`等提供元数据信息。

Kafka表集成引擎

此引擎与Apache Kafka结合使用。

Kafka 特性:

  • 发布或者订阅数据流。
  • 容错存储机制。
  • 处理流数据。

老版Kafka集成表引擎参数格式:

Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
      [, kafka_row_delimiter, kafka_schema, kafka_num_consumers])

新版Kafka集成表引擎参数格式:

Kafka SETTINGS
  kafka_broker_list = 'localhost:9092',
  kafka_topic_list = 'topic1,topic2',
  kafka_group_name = 'group1',
  kafka_format = 'JSONEachRow',
  kafka_row_delimiter = '\n',
  kafka_schema = '',
  kafka_num_consumers = 2

必要参数:

  • kafka_broker_list – 以逗号分隔的 brokers 列表 (localhost:9092)。
  • kafka_topic_list – topic 列表 (my_topic)。
  • kafka_group_name – Kafka 消费组名称 (group1)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。
  • kafka_format – 消息体格式。使用与 SQL 部分的 FORMAT 函数相同表示方法,例如 JSONEachRow

可选参数:

  • kafka_row_delimiter - 每个消息体(记录)之间的分隔符。
  • kafka_schema – 如果解析格式需要一个 schema 时,此参数必填。
  • kafka_num_consumers – 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。

ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT语句,受支持的输出格式可用于格式化SELECT语句的返回结果,或者通过INSERT写入到文件表。

以下kafka_format是支持的格式,ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT语句,受支持的输出格式可用于格式化SELECT语句的返回结果,或者通过INSERT写入到文件表。

格式 输入 输出
[TabSeparated]
[TabSeparatedRaw]
[TabSeparatedWithNames]
[TabSeparatedWithNamesAndTypes]
[Template]
[TemplateIgnoreSpaces]
[CSV]
[CSVWithNames]
[CustomSeparated]
[Values]
[Vertical]
[JSON]
[JSONAsString]
[JSONStrings]
[JSONCompact]
[JSONCompactStrings]
[JSONEachRow]
[JSONEachRowWithProgress]
[JSONStringsEachRow]
[JSONStringsEachRowWithProgress]
[JSONCompactEachRow]
[JSONCompactEachRowWithNamesAndTypes]
[JSONCompactStringsEachRow]
[JSONCompactStringsEachRowWithNamesAndTypes]
[TSKV]
[Pretty]
[PrettyCompact]
[PrettyCompactMonoBlock]
[PrettyNoEscapes]
[PrettySpace]
[Protobuf]
[ProtobufSingle]
[Avro]
[AvroConfluent]
[Parquet]
[Arrow]
[ArrowStream]
[ORC]
[RowBinary]
[RowBinaryWithNamesAndTypes]
[Native]
[Null]
[XML]
[CapnProto]
[LineAsString]
[Regexp]
[RawBLOB]

示例:

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  SELECT * FROM queue LIMIT 5;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'topic',
                            kafka_group_name = 'group1',
                            kafka_format = 'JSONEachRow',
                            kafka_num_consumers = 4;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
              SETTINGS kafka_format = 'JSONEachRow',
                       kafka_num_consumers = 4;

消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次。如果希望获得两次数据,则使用另一个组名创建副本。

消费组可以灵活配置并且在集群之间同步。例如,如果群集中有10个主题和5个表副本,则每个副本将获得2个主题。 如果副本数量发生变化,主题将自动在副本中重新分配。

SELECT 查询对于读取消息并不是很有用(调试除外),因为每条消息只能被读取一次。使用物化视图创建实时线程更实用。您可以这样做:

  1. 使用引擎创建一个 Kafka 消费者并作为一条数据流。
  2. 创建一个结构表。
  3. 创建物化视图,改视图会在后台转换引擎中的数据并将其放入之前创建的表中。

MATERIALIZED VIEW 添加至引擎,它将会在后台收集数据。可以持续不断地从 Kafka 收集数据并通过 SELECT 将数据转换为所需要的格式。

示例:

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
  ) ENGINE = SummingMergeTree(day, (day, level), 8192);

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
    FROM queue GROUP BY day, level;

  SELECT level, sum(total) FROM daily GROUP BY level;

为了提高性能,接受的消息被分组为max_insert_block_size大小的块。如果未在stream_flush_interval_ms毫秒内形成块,则不关心块的完整性,都会将数据刷新到表中。

停止接收主题数据或更改转换逻辑,请 detach 物化视图:

  DETACH TABLE consumer;
  ATTACH TABLE consumer;

如果使用 ALTER 更改目标表,为了避免目标表与视图中的数据之间存在差异,推荐停止物化视图。

配置

GraphiteMergeTree 类似,Kafka 引擎支持使用ClickHouse配置文件进行扩展配置。可以使用两个配置键:全局 (kafka) 和 主题级别 (kafka_*)。首先应用全局配置,然后应用主题级配置(如果存在)。

  <!-- Global configuration options for all tables of Kafka engine type -->
  <kafka>
    <debug>cgrp</debug>
    <auto_offset_reset>smallest</auto_offset_reset>
  </kafka>

  <!-- Configuration specific for topic "logs" -->
  <kafka_logs>
    <retry_backoff_ms>250</retry_backoff_ms>
    <fetch_min_bytes>100000</fetch_min_bytes>
  </kafka_logs>

ClickHouse配置中使用下划线 (_) ,并不是使用点 (.)。例如,check.crcs=true 将是 <check_crcs>true</check_crcs>

Kerberos 支持

对于使用了kerberos的kafka, 将security_protocol 设置为sasl_plaintext就够了,如果kerberos的ticket是由操作系统获取和缓存的。
clickhouse也支持自己使用keyfile的方式来维护kerbros的凭证。配置sasl_kerberos_service_name、sasl_kerberos_keytab、sasl_kerberos_principal三个子元素就可以。

示例:

  <!-- Kerberos-aware Kafka -->
  <kafka>
    <security_protocol>SASL_PLAINTEXT</security_protocol>
    <sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
    <sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
  </kafka>

虚拟列

  • _topic – Kafka 主题。
  • _key – 信息的键。
  • _offset – 消息的偏移量。
  • _timestamp – 消息的时间戳。
  • _timestamp_ms – 消息的时间戳(毫秒)。
  • _partition – Kafka 主题的分区。

资料分享

ClickHouse经典中文文档分享

clickhouse系列文章

相关文章
|
14天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
46 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
消息中间件 分布式计算 关系型数据库
大数据-140 - ClickHouse 集群 表引擎详解5 - MergeTree CollapsingMergeTree 与其他数据源 HDFS MySQL
大数据-140 - ClickHouse 集群 表引擎详解5 - MergeTree CollapsingMergeTree 与其他数据源 HDFS MySQL
44 0
|
14天前
|
存储 缓存 大数据
ClickHouse核心概念详解:表引擎与数据模型
【10月更文挑战第26天】在大数据时代,数据处理的速度和效率变得至关重要。ClickHouse,作为一个列式存储数据库系统,以其高效的查询性能和强大的数据处理能力而受到广泛欢迎。本文将从我个人的角度出发,详细介绍ClickHouse的核心概念,特别是其表引擎和数据模型,以及这些特性如何影响数据的存储和查询。
29 1
|
17天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
48 2
|
1月前
|
存储 SQL 分布式计算
大数据-139 - ClickHouse 集群 表引擎详解4 - MergeTree 实测案例 ReplacingMergeTree SummingMergeTree
大数据-139 - ClickHouse 集群 表引擎详解4 - MergeTree 实测案例 ReplacingMergeTree SummingMergeTree
30 0
|
1月前
|
存储 算法 NoSQL
大数据-138 - ClickHouse 集群 表引擎详解3 - MergeTree 存储结构 数据标记 分区 索引 标记 压缩协同
大数据-138 - ClickHouse 集群 表引擎详解3 - MergeTree 存储结构 数据标记 分区 索引 标记 压缩协同
33 0
|
1月前
|
存储 消息中间件 分布式计算
大数据-137 - ClickHouse 集群 表引擎详解2 - MergeTree 存储结构 一级索引 跳数索引
大数据-137 - ClickHouse 集群 表引擎详解2 - MergeTree 存储结构 一级索引 跳数索引
30 0
|
1月前
|
存储 分布式计算 NoSQL
大数据-136 - ClickHouse 集群 表引擎详解1 - 日志、Log、Memory、Merge
大数据-136 - ClickHouse 集群 表引擎详解1 - 日志、Log、Memory、Merge
39 0
|
2月前
|
消息中间件 安全 Kafka
Kafka支持SSL/TLS协议技术深度解析
SSL(Secure Socket Layer,安全套接层)及其继任者TLS(Transport Layer Security,传输层安全)是为网络通信提供安全及数据完整性的一种安全协议。这些协议在传输层对网络连接进行加密,确保数据在传输过程中不被窃取或篡改。
165 0
|
3月前
|
消息中间件 Kafka 数据处理
实时数据流处理:Dask Streams 与 Apache Kafka 集成
【8月更文第29天】在现代数据处理领域,实时数据流处理已经成为不可或缺的一部分。随着物联网设备、社交媒体和其他实时数据源的普及,处理这些高吞吐量的数据流成为了一项挑战。Apache Kafka 作为一种高吞吐量的消息队列服务,被广泛应用于实时数据流处理场景中。Dask Streams 是 Dask 库的一个子模块,它为 Python 开发者提供了一个易于使用的实时数据流处理框架。本文将介绍如何将 Dask Streams 与 Apache Kafka 结合使用,以实现高效的数据流处理。
76 0