Kafka数据入湖OSS实践

本文涉及的产品
对象存储 OSS,20GB 3个月
云服务器ECS,u1 2核4GB 1个月
云服务器 ECS,每月免费额度200元 3个月
简介: 本质上,Kafka提供的是消息队列的能力,为消息从生产者流向消费中提供稳定、高效、可靠的渠道。但Kafka本身并不提供海量数据存储的能力,这意味着重读kafka中历史数据将不可能。同时,Kafka没有提供开箱即用的数据处理工具(尽管你可以采用kafka streams或者flink等,但这需要你自己写代码逻辑),使得对原始数据进行加工处理成本较高。我们知道,阿里云OSS提供了灵活、海量、高性价比的

本质上,Kafka提供的是消息队列的能力,为消息从生产者流向消费中提供稳定、高效、可靠的渠道。但Kafka本身并不提供海量数据存储的能力,这意味着重读kafka中历史数据将不可能。同时,Kafka没有提供开箱即用的数据处理工具(尽管你可以采用kafka streams或者flink等,但这需要你自己写代码逻辑),使得对原始数据进行加工处理成本较高。

我们知道,阿里云OSS提供了灵活、海量、高性价比的存储方式,用户可以根据自己的需要设置合适的存储策略。同时,日志服务SLS则提供了一站式的数据采集、加工、分析、告警可视化与投递功能。本文将会介绍一种基于SLS将Kafka数据入湖OSS的整体方案,为Kafka中的数据增加查询分析和海量存储的能力。

数据接入

SLS提供了主动接入Kafka数据的服务,用户只需配置几项必要的信息,即可将Kafka数据接入到SLS中。通常情况下,只需提供Kafka集群的bootstrap servers、对应的topic以及消费位置信息。但针对阿里云VPC下部署的Kafka集群,为了避免公网访问,则还需提供相关的VPC和ECS实例信息。下面将会具体介绍Kafka集群常见的几种部署形式以及如何通过SLS控制台快速配置Kafka接入的过程。

集群部署方式

不同的Kafka集群部署方式,接入SLS之前,需要进行不同的准备。Kafka集群的部署形式可以分为以下3种情况:

  1. 基于阿里云ECS实例自建集群
  2. 阿里云Kafka产品
  3. 其他场景(非阿里云上部署)

下面将分别介绍以上不同部署形式接入SLS的准备工作以及特点。

基于阿里云ECS实例自建集群

针对这种场景,SLS服务能够打通VPC以内网连接的形式拉取Kafka中的数据,从而提供更安全可靠的数据读取方式。当然,这要求Kafka中的数据必须接入到与VPC region相同的SLS project中,否则只能按照第三种部署形式进行接入。

为了能够打通对用户VPC的访问,用户需要额外提供Kafka集群所在的每个ECS实例的VPC ID和IP信息。如果Kafka broker之间是基于内网域名进行通信,还需提供ECS实例对应的内部域名。总结下来,需要额外提供如下类似的VPC相关信息,其中v1和v2用来标记不同broker对应的ECS实例信息(这里假设只部署了2个Kafka broker)。

{
    "config.vpc.vpc_id.v1":"vpc-bp1949587myedyj8s1bqw",
    "config.vpc.instance_ip.v1":"192.168.26.34",
    "config.vpc.instance_port.v1":"9092",
    "config.vpc.instance_host.v1":"kafka-host1",
    "config.vpc.vpc_id.v2":"vpc-bp1949587myedyj8s1bqw",
    "config.vpc.instance_ip.v2":"192.168.26.35",
    "config.vpc.instance_port.v2":"9092",
    "config.vpc.instance_host.v2":"kafka-host2"
}

阿里云Kafka产品

阿里云提供了消息队列kafka版,方便用户可以快速地使用Kafka服务。本质上,Kafka云产品也是基于VPC进行集群部署的,例如下图中显示的是Kafka云产品某个实例的具体接入点和VPC ID信息。只不过Kafka云产品提供了相应的控制台,可以协助用户以可视化的形式对Kafka集群进行管理(如果自建集群的话,则一般通过Kafka自带的命令行工具进行管理)。

图 1 阿里云Kafka产品接入信息

需要说明的是,默认情况下Kafka云产品没有开启消费组自动创建,因此在将Kafka云产品中的数据接入SLS之前,必须手动创建好消费组并填写到SLS导入配置中,否则客户端会得到错误信息,如librdkafka返回"JoinGroup failed: Broker: Group authorization failed"。通过对应Kafka云产品实例下的Group管理,可以手动创建新的consumer group,具体如下图所示。

图 2 阿里云Kafka产品手动创建消费组

因此,针对Kafka云产品的场景,除了配置VPC相关的信息外,还需要配置使用到的consumer group。结合图 1和图 2中实例,需要的额外信息如下所示:

{
    "group.id":"will-test",
    "config.vpc.vpc_id.v1":"vpc-xxxxxxxxxxj8s1bqw",
    "config.vpc.instance_ip.v1":"192.168.26.34",
    "config.vpc.instance_port.v1":"9092",
    "config.vpc.vpc_id.v2":"vpc-xxxxxxxxxxj8s1bqw",
    "config.vpc.instance_ip.v2":"192.168.26.35",
    "config.vpc.instance_port.v2":"9092",
    "config.vpc.vpc_id.v3":"vpc-xxxxxxxxxxj8s1bqw",
    "config.vpc.instance_ip.v3":"192.168.26.36",
    "config.vpc.instance_port.v3":"9092"
}

其他场景(其他非阿里云上部署)

在这种场景下,只有Kafka集群提供公网访问能力后,SLS服务才能从Kafka broker读取到数据。在SLS控制台,除了提供Kafka集群的公网bootstrap servers、需要接入的topic以及消费位置外,并不需要额外的其他配置信息。然后大多情况下,Kafka集群只提供了内网访问能力,如果需要同时提供公网访问的能力,则需要对Kafka集群进行特定的配置,具体配置可以参考我之前写的文章Kafka集群如何同时支持内网和外网访问

数据接入SLS的步骤

  1. 登陆SLS控制台 ,在 数据接入/抓取数据 下,选择Kafka

图 3 控制台选择Kafka数据接入

  1. 进入 选择日志空间 步骤,根据Kafka集群所在的region,选择/创建相应region下的SLS Project和Logstore

图 4 选择Kafka数据接入的Project和Logstore

  1. 配置Kafka集群相关的数据源信息(这里以Kafka云产品数据接入为例),并点击预览以确保配置无误

图 5 预览Kafka数据

  1. 根据需要配置日志时间字段,然后在调度间隔选择立即执行

图 6 配置数据的时间字段

  1. 去对应的Logstore查看Kafka数据是否正确接入到SLS (需要等待几分钟)

图 7 查看数据接入情况

数据加工

Kafka中的原始数据可能存在冗余、不完整、不规整等情况,如果直接用于业务消费,则需要做额外的加工处理,从而增加了业务逻辑实现的复杂性和不必要的成本。SLS提供了丰富的DSL算子来协助用户对数据进行加工处理,可以方便地实现数据的规整、富化、流转、脱敏和过滤,更多细节可以参考SLS数据加工有关的文档。

下图展示了SLS数据加工的一种使用场景,即将非结构化的syslog日志过滤、规整为用户登陆相关的结构化数据。规整化后的数据,更加便于后续的数据分析,比如查询某个用户在特定时间段内登陆的次数。SLS内置的200多个开箱即用的算子,大大方便了对原始Kafka数据的处理,进而让用户更加专注于业务相关的逻辑。

图 8 数据规整 (来源SLS数据加工

数据分析

SLS提供了强大的SQL查询能力,能够让用户快速地对海量信息进行过滤和分析,具体可以参考SLS查询和分析相关的官方文档。同时,结合SLS提供的丰富仪表盘,能将分析后的数据以多样化的形式呈现出来,从而提供更为直观的数据洞察能力。比如,下图是基于SLS的SQL查询和仪表盘得到的结果,可以非常直观地看到Top 10用户每天通过主动接入服务导入的数据总量。

 

图 9 SQL查询

数据入湖

尽管日志服务SLS提供了强大的数据分析、加工等能力,但它会引入额外的存储成本。除了保存原始数据外,它还需要构建倒排索引、列式存储等元数据。因此,开启查询分析后,存入SLS的数据会有额外的开销。对于不常用的数据(如较早的历史数据),SLS支持直接将这些数据以压缩的形式入湖OSS进行存储。另外,OSS不仅支持海量的数据存储,而且还提供了丰富的冷热存储方式(如标准、低频、归档、冷归档等),用户可以根据自己的需要进行选择。

数据投递,是SLS数据入湖到OSS的优选方式,更多细节可以参考文章SLS投递OSS功能升级:打造更顺畅的日志入湖体验》。通过SLS投递功能,可以将Kafka中不常用的数据以冷存储的形式保存到OSS中。与此同时,SLS还提供了将OSS中的数据导入回SLS的能力。结合SLS数据入湖OSSOSS数据导入SLS这两个功能,用户可以非常灵活地控制哪些数据需入湖以低成本的形式存储,哪些数据进行查询分析以提供业务价值,具体流程如下图所示。

  

图 10 Kafka数据入湖整体流程

总结

本文主要介绍了Kafka数据基于SLS入湖OSS的整体方案,其中着重介绍了Kafka的几种常见的部署方式,以及Kafka数据接入SLS的具体过程。虽然Kafka提供了强大的消息队列能力,但由于其缺乏对原始数据的加工/分析的能力以及对海量历史数据的存储能力,使得用户对Kafka原始数据进行分析处理变得困难。而基于SLS提供的加工/分析、SLS数据入湖OSS以及OSS数据导入SLS等特性,则可以弥补Kafka缺失的这两种能力,使得用户可以随时随地对任意时间段内的Kafka原始数据进行分析。

相关文章
|
19天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
19天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
24天前
|
DataWorks 安全 关系型数据库
DataWorks产品使用合集之同步数据到OSS时,文件的切分单位如何设置
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
18天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
777 0
|
9天前
|
Java
使用kafka-clients操作数据(java)
使用kafka-clients操作数据(java)
14 6
|
4天前
|
数据采集 DataWorks 安全
DataWorks产品使用合集之将按日分区的表同步数据到OSS数据源,该如何配置
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
23 1
|
4天前
|
消息中间件 Java Kafka
kafka 磁盘扩容与数据均衡操作代码
Kafka 的磁盘扩容和数据均衡是与保证Kafka集群可用性和性能相关的两个重要方面。在 Kafka 中,分区数据的存储和平衡对集群的运行至关重要。以下是有关Kafka磁盘扩容和数据均衡的一些建议
12 1
|
11天前
|
机器学习/深度学习 分布式计算 大数据
MaxCompute产品使用问题之如何直接加载oss中的parque数据,无需指定列和分区
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
19天前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之如何实现OSS数据到Kafka的实时同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
24天前
|
存储 分布式计算 大数据
MaxCompute产品使用合集之是否支持创建OSS外部表为分区表,并访问OSS上以分区方式存储的数据
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。