Kafka数据入湖OSS实践

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,内容安全 1000次 1年
日志服务 SLS,月写入数据量 50GB 1个月
简介: 本质上,Kafka提供的是消息队列的能力,为消息从生产者流向消费中提供稳定、高效、可靠的渠道。但Kafka本身并不提供海量数据存储的能力,这意味着重读kafka中历史数据将不可能。同时,Kafka没有提供开箱即用的数据处理工具(尽管你可以采用kafka streams或者flink等,但这需要你自己写代码逻辑),使得对原始数据进行加工处理成本较高。

本质上,Kafka提供的是消息队列的能力,为消息从生产者流向消费中提供稳定、高效、可靠的渠道。但Kafka本身并不提供海量数据存储的能力,这意味着重读kafka中历史数据将不可能。同时,Kafka没有提供开箱即用的数据处理工具(尽管你可以采用kafka streams或者flink等,但这需要你自己写代码逻辑),使得对原始数据进行加工处理成本较高。

我们知道,阿里云OSS提供了灵活、海量、高性价比的存储方式,用户可以根据自己的需要设置合适的存储策略。同时,日志服务SLS则提供了一站式的数据采集、加工、分析、告警可视化与投递功能。本文将会介绍一种基于SLS将Kafka数据入湖OSS的整体方案,为Kafka中的数据增加查询分析和海量存储的能力。

数据接入

SLS提供了主动接入Kafka数据的服务,用户只需配置几项必要的信息,即可将Kafka数据接入到SLS中。通常情况下,只需提供Kafka集群的bootstrap servers、对应的topic以及消费位置信息。但针对阿里云VPC下部署的Kafka集群,为了避免公网访问,则还需提供相关的VPC和ECS实例信息。下面将会具体介绍Kafka集群常见的几种部署形式以及如何通过SLS控制台快速配置Kafka接入的过程。

集群部署方式

不同的Kafka集群部署方式,接入SLS之前,需要进行不同的准备。Kafka集群的部署形式可以分为以下3种情况:

  1. 基于阿里云ECS实例自建集群
  2. 阿里云Kafka产品
  3. 其他场景(非阿里云上部署)

下面将分别介绍以上不同部署形式接入SLS的准备工作以及特点。

基于阿里云ECS实例自建集群

针对这种场景,SLS服务能够打通VPC以内网连接的形式拉取Kafka中的数据,从而提供更安全可靠的数据读取方式。当然,这要求Kafka中的数据必须接入到与VPC region相同的SLS project中,否则只能按照第三种部署形式进行接入。

为了能够打通对用户VPC的访问,用户需要额外提供Kafka集群所在的每个ECS实例的VPC ID和IP信息。如果Kafka broker之间是基于内网域名进行通信,还需提供ECS实例对应的内部域名。总结下来,需要额外提供如下类似的VPC相关信息,其中v1和v2用来标记不同broker对应的ECS实例信息(这里假设只部署了2个Kafka broker)。

{
    "config.vpc.vpc_id.v1":"vpc-bp1949587myedyj8s1bqw",
    "config.vpc.instance_ip.v1":"192.168.26.34",
    "config.vpc.instance_port.v1":"9092",
    "config.vpc.instance_host.v1":"kafka-host1",
    "config.vpc.vpc_id.v2":"vpc-bp1949587myedyj8s1bqw",
    "config.vpc.instance_ip.v2":"192.168.26.35",
    "config.vpc.instance_port.v2":"9092",
    "config.vpc.instance_host.v2":"kafka-host2"
}

阿里云Kafka产品

阿里云提供了消息队列kafka版,方便用户可以快速地使用Kafka服务。本质上,Kafka云产品也是基于VPC进行集群部署的,例如下图中显示的是Kafka云产品某个实例的具体接入点和VPC ID信息。只不过Kafka云产品提供了相应的控制台,可以协助用户以可视化的形式对Kafka集群进行管理(如果自建集群的话,则一般通过Kafka自带的命令行工具进行管理)。
kafka产品接入信息.png

图 1 阿里云Kafka产品接入信息

需要说明的是,默认情况下Kafka云产品没有开启消费组自动创建,因此在将Kafka云产品中的数据接入SLS之前,必须手动创建好消费组并填写到SLS导入配置中,否则客户端会得到错误信息,如librdkafka返回"JoinGroup failed: Broker: Group authorization failed"。通过对应Kafka云产品实例下的Group管理,可以手动创建新的consumer group,具体如下图所示。
创建消费组.png

图 2 阿里云Kafka产品手动创建消费组

因此,针对Kafka云产品的场景,除了配置VPC相关的信息外,还需要配置使用到的consumer group。结合图 1和图 2中实例,需要的额外信息如下所示:

{
    "group.id":"will-test",
    "config.vpc.vpc_id.v1":"vpc-xxxxxxxxxxj8s1bqw",
    "config.vpc.instance_ip.v1":"192.168.26.34",
    "config.vpc.instance_port.v1":"9092",
    "config.vpc.vpc_id.v2":"vpc-xxxxxxxxxxj8s1bqw",
    "config.vpc.instance_ip.v2":"192.168.26.35",
    "config.vpc.instance_port.v2":"9092",
    "config.vpc.vpc_id.v3":"vpc-xxxxxxxxxxj8s1bqw",
    "config.vpc.instance_ip.v3":"192.168.26.36",
    "config.vpc.instance_port.v3":"9092"
}

其他场景(其他非阿里云上部署)

在这种场景下,只有Kafka集群提供公网访问能力后,SLS服务才能从Kafka broker读取到数据。在SLS控制台,除了提供Kafka集群的公网bootstrap servers、需要接入的topic以及消费位置外,并不需要额外的其他配置信息。然后大多情况下,Kafka集群只提供了内网访问能力,如果需要同时提供公网访问的能力,则需要对Kafka集群进行特定的配置,具体配置可以参考我之前写的文章《Kafka集群如何同时支持内网和外网访问》。

数据接入SLS的步骤
1)登陆SLS控制台,在数据接入/抓取数据下,选择Kafka
选择kafka数据接入.png

图 3 控制台选择Kafka数据接入

2)进入选择日志空间步骤,根据Kafka集群所在的region,选择/创建相应region下的SLS Project和Logstore
选择project和logstore.png

图 4 选择Kafka数据接入的Project和Logstore

3)配置Kafka集群相关的数据源信息(这里以Kafka云产品数据接入为例),并点击预览以确保配置无误
preview kafka data.png

图 5 预览Kafka数据

4)根据需要配置日志时间字段,然后在调度间隔选择立即执行
配置时间字段.png

图 6 配置数据的时间字段

5)去对应的Logstore查看Kafka数据是否正确接入到SLS (需要等待几分钟)
查看数据接入.png

图 7 查看数据接入情况

数据加工

Kafka中的原始数据可能存在冗余、不完整、不规整等情况,如果直接用于业务消费,则需要做额外的加工处理,从而增加了业务逻辑实现的复杂性和不必要的成本。而SLS提供了丰富的DSL算子来协助用户对数据进行加工处理,可以方便地实现数据的规整、富化、流转、脱敏和过滤,更多细节可以参考SLS数据加工有关的文档。

下图展示了SLS数据加工的一种使用场景,即将非结构化的syslog日志过滤、规整为用户登陆相关的结构化数据。规整化后的数据,更加便于后续的数据分析,比如查询某个用户在特定时间段内登陆的次数。SLS内置的200多个开箱即用的算子,大大方便了对原始Kafka数据的处理,进而让用户更加专注于业务相关的逻辑。
数据规整.png

图 8 数据规整 (来源SLS数据加工)

数据分析

SLS提供了强大的SQL查询能力,能够让用户快速地对海量信息进行过滤和分析,具体可以参考SLS查询和分析相关的官方文档。同时,结合SLS提供的丰富仪表盘,能将分析后的数据以多样化的形式呈现出来,从而提供更为直观的数据洞察能力。比如,下图是基于SLS的SQL查询和仪表盘得到的结果,可以非常直观地看到Top 10用户每天通过主动接入服务导入的数据总量。
SQL查询.png

图 9 SQL查询

数据入湖

尽管日志服务SLS提供了强大的数据分析、加工等能力,但它会引入额外的存储成本。除了保存原始数据外,它还需要构建倒排索引、列式存储等元数据。因此,开启查询分析后,存入SLS的数据会有额外的开销。对于不常用的数据(如较早的历史数据),SLS支持直接将这些数据以压缩的形式入湖OSS进行存储。另外,OSS不仅支持海量的数据存储,而且还提供了丰富的冷热存储方式(如标准、低频、归档、冷归档等),用户可以根据自己的需要进行选择。

数据投递,是SLS数据入湖到OSS的优选方式,更多细节可以参考文章《SLS投递OSS功能升级:打造更顺畅的日志入湖体验》。通过SLS投递功能,可以将Kafka中不常用的数据以冷存储的形式保存到OSS中。与此同时,SLS还提供了将OSS中的数据导入回SLS的能力。结合SLS数据入湖OSS和OSS数据导入SLS这两个功能,用户可以非常灵活地控制哪些数据需入湖以低成本的形式存储,哪些数据进行查询分析以提供业务价值,具体流程如下图所示。
整体流程.png

图 10 Kafka数据入湖整体流程

总结

本文主要介绍了Kafka数据基于SLS入湖OSS的整体方案,其中着重介绍了Kafka的几种常见的部署方式,以及Kafka数据接入SLS的具体过程。虽然Kafka提供了强大的消息队列能力,但由于其缺乏对原始数据的加工/分析的能力以及对海量历史数据的存储能力,使得用户对Kafka原始数据进行分析处理变得困难。而基于SLS提供的加工/分析、SLS数据入湖OSS以及OSS数据导入SLS等特性,则可以弥补Kafka缺失的这两种能力,使得用户可以随时随地对任意时间段内的Kafka原始数据进行分析。

目录
相关文章
|
2月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
112 4
|
21天前
|
存储 人工智能 开发工具
AI场景下的对象存储OSS数据管理实践
本文介绍了对象存储(OSS)在AI业务中的应用与实践。内容涵盖四个方面:1) 对象存储作为AI数据基石,因其低成本和高弹性成为云上数据存储首选;2) AI场景下的对象存储实践方案,包括数据获取、预处理、训练及推理阶段的具体使用方法;3) 国内主要区域的默认吞吐量提升至100Gbps,优化了大数据量下的带宽需求;4) 常用工具介绍,如OSSutil、ossfs、Python SDK等,帮助用户高效管理数据。重点讲解了OSS在AI训练和推理中的性能优化措施,以及不同工具的特点和应用场景。
78 10
|
21天前
|
弹性计算 人工智能 数据管理
AI场景下的对象存储OSS数据管理实践
本文介绍了ECS和OSS的操作流程,分为两大部分。第一部分详细讲解了ECS的登录、密码重置、安全组设置及OSSUTIL工具的安装与配置,通过实验创建并管理存储桶,上传下载文件,确保资源及时释放。第二部分则聚焦于OSSFS工具的应用,演示如何将对象存储挂载为磁盘,进行大文件加载与模型训练,强调环境搭建(如Conda环境)及依赖安装步骤,确保实验结束后正确清理AccessKey和相关资源。整个过程注重操作细节与安全性,帮助用户高效利用云资源完成实验任务。
77 10
|
2月前
|
弹性计算 数据管理 应用服务中间件
活动实践 | 借助OSS搭建在线教育视频课程分享网站
本教程指导用户在阿里云ECS实例上搭建在线教育网站,包括重置ECS密码、配置安全组、安装Nginx、创建网站页面、上传数据至OSS、开通OSS传输加速、配置生命周期策略及清理资源等步骤,实现高效、低成本的数据管理和网站运营。
活动实践 | 借助OSS搭建在线教育视频课程分享网站
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
127 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
66 1
|
5月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
385 9
|
5月前
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
|
5月前
|
消息中间件 负载均衡 Kafka
Kafka 实现负载均衡与故障转移:深入分析 Kafka 的架构特点与实践
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理和流传输设计的高性能消息系统。其核心设计注重高吞吐量、低延迟与可扩展性,并具备出色的容错能力。Kafka采用分布式日志概念,通过数据分区及副本机制确保数据可靠性和持久性。系统包含Producer(消息生产者)、Consumer(消息消费者)和Broker(消息服务器)三大组件。Kafka利用独特的分区机制实现负载均衡,每个Topic可以被划分为多个分区,每个分区可以被复制到多个Broker上,确保数据的高可用性和可靠性。
144 2
|
5月前
|
消息中间件 存储 算法
时间轮在Kafka的实践:技术深度剖析
【8月更文挑战第13天】在分布式消息系统Kafka中,时间轮(Timing Wheel)作为一种高效的时间调度机制,被广泛应用于处理各种延时操作,如延时生产、延时拉取和延时删除等。本文将深入探讨时间轮在Kafka中的实践应用,解析其技术原理、优势及具体实现方式。
178 2