Kafka数据入湖OSS实践

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,内容安全 1000次 1年
对象存储 OSS,恶意文件检测 1000次 1年
简介: 本质上,Kafka提供的是消息队列的能力,为消息从生产者流向消费中提供稳定、高效、可靠的渠道。但Kafka本身并不提供海量数据存储的能力,这意味着重读kafka中历史数据将不可能。同时,Kafka没有提供开箱即用的数据处理工具(尽管你可以采用kafka streams或者flink等,但这需要你自己写代码逻辑),使得对原始数据进行加工处理成本较高。我们知道,阿里云OSS提供了灵活、海量、高性价比的

本质上,Kafka提供的是消息队列的能力,为消息从生产者流向消费中提供稳定、高效、可靠的渠道。但Kafka本身并不提供海量数据存储的能力,这意味着重读kafka中历史数据将不可能。同时,Kafka没有提供开箱即用的数据处理工具(尽管你可以采用kafka streams或者flink等,但这需要你自己写代码逻辑),使得对原始数据进行加工处理成本较高。

我们知道,阿里云OSS提供了灵活、海量、高性价比的存储方式,用户可以根据自己的需要设置合适的存储策略。同时,日志服务SLS则提供了一站式的数据采集、加工、分析、告警可视化与投递功能。本文将会介绍一种基于SLS将Kafka数据入湖OSS的整体方案,为Kafka中的数据增加查询分析和海量存储的能力。

数据接入

SLS提供了主动接入Kafka数据的服务,用户只需配置几项必要的信息,即可将Kafka数据接入到SLS中。通常情况下,只需提供Kafka集群的bootstrap servers、对应的topic以及消费位置信息。但针对阿里云VPC下部署的Kafka集群,为了避免公网访问,则还需提供相关的VPC和ECS实例信息。下面将会具体介绍Kafka集群常见的几种部署形式以及如何通过SLS控制台快速配置Kafka接入的过程。

集群部署方式

不同的Kafka集群部署方式,接入SLS之前,需要进行不同的准备。Kafka集群的部署形式可以分为以下3种情况:

  1. 基于阿里云ECS实例自建集群
  2. 阿里云Kafka产品
  3. 其他场景(非阿里云上部署)

下面将分别介绍以上不同部署形式接入SLS的准备工作以及特点。

基于阿里云ECS实例自建集群

针对这种场景,SLS服务能够打通VPC以内网连接的形式拉取Kafka中的数据,从而提供更安全可靠的数据读取方式。当然,这要求Kafka中的数据必须接入到与VPC region相同的SLS project中,否则只能按照第三种部署形式进行接入。

为了能够打通对用户VPC的访问,用户需要额外提供Kafka集群所在的每个ECS实例的VPC ID和IP信息。如果Kafka broker之间是基于内网域名进行通信,还需提供ECS实例对应的内部域名。总结下来,需要额外提供如下类似的VPC相关信息,其中v1和v2用来标记不同broker对应的ECS实例信息(这里假设只部署了2个Kafka broker)。

{
    "config.vpc.vpc_id.v1":"vpc-bp1949587myedyj8s1bqw",
    "config.vpc.instance_ip.v1":"192.168.26.34",
    "config.vpc.instance_port.v1":"9092",
    "config.vpc.instance_host.v1":"kafka-host1",
    "config.vpc.vpc_id.v2":"vpc-bp1949587myedyj8s1bqw",
    "config.vpc.instance_ip.v2":"192.168.26.35",
    "config.vpc.instance_port.v2":"9092",
    "config.vpc.instance_host.v2":"kafka-host2"
}

阿里云Kafka产品

阿里云提供了消息队列kafka版,方便用户可以快速地使用Kafka服务。本质上,Kafka云产品也是基于VPC进行集群部署的,例如下图中显示的是Kafka云产品某个实例的具体接入点和VPC ID信息。只不过Kafka云产品提供了相应的控制台,可以协助用户以可视化的形式对Kafka集群进行管理(如果自建集群的话,则一般通过Kafka自带的命令行工具进行管理)。

图 1 阿里云Kafka产品接入信息

需要说明的是,默认情况下Kafka云产品没有开启消费组自动创建,因此在将Kafka云产品中的数据接入SLS之前,必须手动创建好消费组并填写到SLS导入配置中,否则客户端会得到错误信息,如librdkafka返回"JoinGroup failed: Broker: Group authorization failed"。通过对应Kafka云产品实例下的Group管理,可以手动创建新的consumer group,具体如下图所示。

图 2 阿里云Kafka产品手动创建消费组

因此,针对Kafka云产品的场景,除了配置VPC相关的信息外,还需要配置使用到的consumer group。结合图 1和图 2中实例,需要的额外信息如下所示:

{
    "group.id":"will-test",
    "config.vpc.vpc_id.v1":"vpc-xxxxxxxxxxj8s1bqw",
    "config.vpc.instance_ip.v1":"192.168.26.34",
    "config.vpc.instance_port.v1":"9092",
    "config.vpc.vpc_id.v2":"vpc-xxxxxxxxxxj8s1bqw",
    "config.vpc.instance_ip.v2":"192.168.26.35",
    "config.vpc.instance_port.v2":"9092",
    "config.vpc.vpc_id.v3":"vpc-xxxxxxxxxxj8s1bqw",
    "config.vpc.instance_ip.v3":"192.168.26.36",
    "config.vpc.instance_port.v3":"9092"
}

其他场景(其他非阿里云上部署)

在这种场景下,只有Kafka集群提供公网访问能力后,SLS服务才能从Kafka broker读取到数据。在SLS控制台,除了提供Kafka集群的公网bootstrap servers、需要接入的topic以及消费位置外,并不需要额外的其他配置信息。然后大多情况下,Kafka集群只提供了内网访问能力,如果需要同时提供公网访问的能力,则需要对Kafka集群进行特定的配置,具体配置可以参考我之前写的文章Kafka集群如何同时支持内网和外网访问

数据接入SLS的步骤

  1. 登陆SLS控制台 ,在 数据接入/抓取数据 下,选择Kafka

图 3 控制台选择Kafka数据接入

  1. 进入 选择日志空间 步骤,根据Kafka集群所在的region,选择/创建相应region下的SLS Project和Logstore

图 4 选择Kafka数据接入的Project和Logstore

  1. 配置Kafka集群相关的数据源信息(这里以Kafka云产品数据接入为例),并点击预览以确保配置无误

图 5 预览Kafka数据

  1. 根据需要配置日志时间字段,然后在调度间隔选择立即执行

图 6 配置数据的时间字段

  1. 去对应的Logstore查看Kafka数据是否正确接入到SLS (需要等待几分钟)

图 7 查看数据接入情况

数据加工

Kafka中的原始数据可能存在冗余、不完整、不规整等情况,如果直接用于业务消费,则需要做额外的加工处理,从而增加了业务逻辑实现的复杂性和不必要的成本。SLS提供了丰富的DSL算子来协助用户对数据进行加工处理,可以方便地实现数据的规整、富化、流转、脱敏和过滤,更多细节可以参考SLS数据加工有关的文档。

下图展示了SLS数据加工的一种使用场景,即将非结构化的syslog日志过滤、规整为用户登陆相关的结构化数据。规整化后的数据,更加便于后续的数据分析,比如查询某个用户在特定时间段内登陆的次数。SLS内置的200多个开箱即用的算子,大大方便了对原始Kafka数据的处理,进而让用户更加专注于业务相关的逻辑。

图 8 数据规整 (来源SLS数据加工

数据分析

SLS提供了强大的SQL查询能力,能够让用户快速地对海量信息进行过滤和分析,具体可以参考SLS查询和分析相关的官方文档。同时,结合SLS提供的丰富仪表盘,能将分析后的数据以多样化的形式呈现出来,从而提供更为直观的数据洞察能力。比如,下图是基于SLS的SQL查询和仪表盘得到的结果,可以非常直观地看到Top 10用户每天通过主动接入服务导入的数据总量。

 

图 9 SQL查询

数据入湖

尽管日志服务SLS提供了强大的数据分析、加工等能力,但它会引入额外的存储成本。除了保存原始数据外,它还需要构建倒排索引、列式存储等元数据。因此,开启查询分析后,存入SLS的数据会有额外的开销。对于不常用的数据(如较早的历史数据),SLS支持直接将这些数据以压缩的形式入湖OSS进行存储。另外,OSS不仅支持海量的数据存储,而且还提供了丰富的冷热存储方式(如标准、低频、归档、冷归档等),用户可以根据自己的需要进行选择。

数据投递,是SLS数据入湖到OSS的优选方式,更多细节可以参考文章SLS投递OSS功能升级:打造更顺畅的日志入湖体验》。通过SLS投递功能,可以将Kafka中不常用的数据以冷存储的形式保存到OSS中。与此同时,SLS还提供了将OSS中的数据导入回SLS的能力。结合SLS数据入湖OSSOSS数据导入SLS这两个功能,用户可以非常灵活地控制哪些数据需入湖以低成本的形式存储,哪些数据进行查询分析以提供业务价值,具体流程如下图所示。

  

图 10 Kafka数据入湖整体流程

总结

本文主要介绍了Kafka数据基于SLS入湖OSS的整体方案,其中着重介绍了Kafka的几种常见的部署方式,以及Kafka数据接入SLS的具体过程。虽然Kafka提供了强大的消息队列能力,但由于其缺乏对原始数据的加工/分析的能力以及对海量历史数据的存储能力,使得用户对Kafka原始数据进行分析处理变得困难。而基于SLS提供的加工/分析、SLS数据入湖OSS以及OSS数据导入SLS等特性,则可以弥补Kafka缺失的这两种能力,使得用户可以随时随地对任意时间段内的Kafka原始数据进行分析。

相关文章
|
3月前
|
机器学习/深度学习 人工智能 专有云
人工智能平台PAI使用问题之怎么将DLC的数据写入到另一个阿里云主账号的OSS中
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
6天前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
15天前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
18 1
|
2月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
159 9
|
2月前
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
|
2月前
|
消息中间件 负载均衡 Kafka
Kafka 实现负载均衡与故障转移:深入分析 Kafka 的架构特点与实践
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理和流传输设计的高性能消息系统。其核心设计注重高吞吐量、低延迟与可扩展性,并具备出色的容错能力。Kafka采用分布式日志概念,通过数据分区及副本机制确保数据可靠性和持久性。系统包含Producer(消息生产者)、Consumer(消息消费者)和Broker(消息服务器)三大组件。Kafka利用独特的分区机制实现负载均衡,每个Topic可以被划分为多个分区,每个分区可以被复制到多个Broker上,确保数据的高可用性和可靠性。
55 2
|
2月前
|
消息中间件 存储 算法
时间轮在Kafka的实践:技术深度剖析
【8月更文挑战第13天】在分布式消息系统Kafka中,时间轮(Timing Wheel)作为一种高效的时间调度机制,被广泛应用于处理各种延时操作,如延时生产、延时拉取和延时删除等。本文将深入探讨时间轮在Kafka中的实践应用,解析其技术原理、优势及具体实现方式。
96 2
|
2月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
130 4
|
2月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
63 3
|
2月前
|
存储 安全 大数据
对象存储的意义:探索数据新纪元的关键基石
在信息爆炸时代,数据成为核心资产,而高效安全的数据存储至关重要。对象存储作为一种新兴技术,起源于20世纪90年代,旨在解决传统文件系统的局限性。随着云计算和大数据技术的发展,它已成为关键技术之一。对象存储具备高可扩展性、高可靠性、低成本、易于管理和多协议支持等优点。它支撑大数据发展、推动云计算繁荣、助力企业数字化转型并保障数据安全。未来,对象存储将进一步提升性能,实现智能化管理,并与边缘计算融合,获得政策支持,成为数据新时代的关键基石。
143 3

热门文章

最新文章