Apache Kafka + 向量数据库 + LLM = 实时 GenAI

本文涉及的产品
模型训练 PAI-DLC,5000CU*H 3个月
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
交互式建模 PAI-DSW,每月250计算时 3个月
简介: 生成式AI(GenAI)革新了企业架构,催生新数据集成模式与最佳实践。借助Apache Kafka与Apache Flink,企业能高效处理大规模实时数据,连接各类数据库与分析平台。Kafka作为核心组件,支持GenAI应用如服务台自动化、聊天机器人及内容审核。结合大型语言模型(LLM)、检索增强生成(RAG)与向量数据库,Kafka与Flink共同打造强大数据流处理能力,克服GenAI挑战,如昂贵训练成本、数据时效性与准确性。通过语义搜索与RAG设计模式,确保LLM生成内容可靠无误。

生成式 AI (GenAI) 支持高级 AI 用例和创新,但也改变了企业架构的外观。大型语言模型 (LLM)、向量数据库和检索增强生成 (RAG) 需要新的数据集成模式和数据工程最佳实践。使用 Apache Kafka 和 Apache Flink 进行数据流处理是大规模实时摄取和管理传入数据集、连接各种数据库和分析平台以及分离独立业务部门和数据产品的关键。这篇博客文章探讨了事件流与传统请求-响应 API 和数据库之间可能的架构、示例和权衡。

Apache Kafka 和 GenAI 的用例
生成式 AI (GenAI) 是用于自然语言处理 (NLP)、图像生成、代码优化和其他任务的下一代 AI 引擎。它可以帮助现实世界中的许多项目实现服务台自动化、与聊天机器人的客户对话、社交网络中的内容审核以及许多其他用例。

Apache Kafka 成为这些机器学习平台中的主要编排层,用于集成各种数据源、大规模处理和实时模型推理。

使用 Kafka 进行数据流传输已经为许多 GenAI 基础设施和软件产品提供支持。可能会出现非常不同的情况:

作为整个机器学习基础设施的数据结构进行数据流式处理
使用流处理进行模型评分,用于实时预测和内容生成
使用输入文本、语音或图像生成流数据管道
大型语言模型的实时在线训练
我在博客文章“Apache Kafka as Mission Critical Data Fabric for GenAI”中探讨了这些用例,包括 Expedia、BMW 和 Tinder 等真实示例。

下面介绍了大型语言模型 (LLM)、检索增强生成 (RAG) 与向量数据库和语义搜索以及 Apache Kafka 和 Flink 的数据流相结合的具体架构。

为什么生成式 AI 与传统机器学习架构不同
机器学习 (ML) 使计算机能够找到隐藏的见解,而无需编程查找位置。这称为模型训练,即分析大数据集的批处理过程。输出是一个二进制文件,即分析模型。

应用程序将这些模型应用于新的传入事件以进行预测。这称为模型评分,可以通过将模型嵌入到应用程序中或对模型服务器(部署模型)执行请求-响应 API 调用来实时或批量进行。

然而,与传统的 ML 流程相比,LLM 和 GenAI 具有不同的需求和模式,正如我的前同事 Michael Drogalis 在两个简单明了的图表中解释的那样。

具有复杂数据工程的传统预测机器学习
预测性人工智能进行预测。专用模型。线下培训。这就是我们在过去十年左右的时间里进行机器学习的方式。

在传统的 ML 中,大多数数据工程工作都在模型创建时进行。特征工程和模型训练需要大量的专业知识和努力:

新的用例需要由数据工程师和数据科学家构建的新模型。

使用大型语言模型的生成式 AI 的 AI 民主化 (LLM)
生成式人工智能 (GenAI) 创建内容。可重复使用的模型。情境学习。

但是对于大型语言模型,数据工程会发生在每个查询中。不同的应用程序重用相同的模型:

大型语言模型在GenAI用例中的挑战
大型语言模型 (LLM) 是可重用的。这使得人工智能的民主化成为可能,因为并非每个团队都需要人工智能专业知识。取而代之的是,低 AI 专业知识足以使用现有的 LLM。

但是,LLM 存在一些巨大的权衡:

昂贵的培训:像 ChatGPT 这样的 LLM 花费了数百万美元的计算资源(这不包括构建模型所需的专业知识)
静态数据:LLM 是“时间冻结”的,这意味着模型没有最新的信息。
缺乏领域知识:LLM 通常从公共数据集中学习。因此,数据工程师抓取万维网并将其输入到模型训练中。但是,企业需要在自己的环境中使用 LMM 来提供业务价值。
愚蠢:LLM 不像人类那样聪明。例如,ChatGPT 甚至无法计算您提示它的句子中的单词数。
这些挑战造成了所谓的幻觉......

避免幻觉以产生可靠的答案
幻觉,即最佳猜测答案,是结果,LLM 不会告诉你它是在编造事情。幻觉是一种现象,其中 AI 模型生成的内容不是基于真实数据或信息,而是创建完全虚构或不切实际的输出。当生成模型(如文本或图像生成器)生成的内容不连贯、不真实或与输入数据或上下文不相关时,就会发生幻觉。这些幻觉可以表现为文本、图像或其他类型的内容,这些内容看似合理,但完全是由模型捏造的。

幻觉在生成式人工智能中可能是个问题,因为它们可能导致产生误导性或虚假信息。

由于这些原因,生成式 AI 出现了一种新的设计模式:检索增强生成 (RAG)。让我们首先看一下这个新的最佳实践,然后探讨为什么使用 Apache Kafka 和 Flink 等技术进行数据流是 GenAI 企业架构的基本要求。

语义搜索和检索增强生成 (RAG)
许多支持 GenAI 的应用程序都遵循检索增强生成 (RAG) 的设计模式,将 LLM 与准确和最新的上下文相结合。Pinecone(一个完全托管的矢量数据库)背后的团队使用这张图有一个很好的解释:

在较高级别上,RAG 通常是两个独立的步骤。第一个是数据增强步骤,其中将不同的(通常是非结构化的)操作数据分块,然后使用嵌入模型创建嵌入。嵌入被索引到向量数据库中。向量数据库是一种工具,用于启用语义搜索,以查找不需要精确关键字匹配的提示的相关上下文。

其次是推理步骤,GenAI 模型接收问题和上下文以生成可靠的答案(没有幻觉)。RAG 不会更新嵌入,但会检索相关信息以与提示一起发送到 LLM。

用于带嵌入的语义搜索的向量数据库
向量数据库,也称为向量存储或向量索引,是一种专门设计用于有效存储和检索向量数据的数据库。在此上下文中,向量数据是指数值向量的集合,这些向量可以表示各种数据类型,例如文本、图像、音频或任何其他结构化或非结构化数据的嵌入。向量数据库在与机器学习、数据检索、推荐系统、相似性搜索等相关的应用程序中非常有用。

向量数据库擅长执行相似性搜索,通常称为语义搜索。他们可以根据各种相似性指标(例如余弦相似性或欧几里得距离)快速找到与给定查询向量相似或接近的向量。

向量数据库(不一定)是一个单独的数据库类别。Gradient Flow 解释了其检索增强生成的最佳实践:

“向量搜索不再局限于向量数据库。许多数据管理系统(包括 PostgreSQL)现在都支持向量搜索。根据您的具体应用,您可能会找到满足您特定需求的系统。近实时还是流媒体是优先事项?查看 Rockset 的产品。您已经在使用知识图谱了吗?Neo4j对向量搜索的支持意味着您的RAG结果将更容易解释和可视化。

对于另一个具体示例,请查看 MongoDB 的教程“使用 MongoDB 构建生成式 AI 应用程序:利用 Atlas 向量搜索和开源模型的力量”。将用于 GenAI 用例的向量数据库与 Apache Kafka 相结合有多种选择。以下是事件驱动世界中可能的体系结构。

事件驱动架构:数据流 + Vector DB + LLM
事件驱动的应用程序可以更有效地实现检索增强生成 (RAG)、数据增强和模型推理这两个步骤。使用 Apache Kafka 和 Apache Flink 进行数据流式处理可实现任何规模的数据(如果应用程序或数据库可以处理,则实时同步)和数据管理(= 流式 ETL)的一致同步。

下图显示了一个企业架构,该架构利用事件驱动的数据流在整个 GenAI 管道中引入和处理数据:

此示例使用数据流将航班预订和更改实时摄取到 Kafka 的事件存储中,以便以后使用 GenAI 技术进行处理。Flink 在调用嵌入模型为向量数据库生成嵌入之前,会对数据进行预处理。同时,使用 Python 构建的实时客户服务应用程序使用所有相关的上下文数据(例如,航班数据、客户数据、嵌入等)来提示大型语言模型。LLM 创建可靠的预测,例如建议将乘客重新预订到另一个航班。

在大多数企业方案中,出于安全和数据隐私原因,所有处理都在企业防火墙后面运行。LLM甚至可以与交易系统(如预订引擎)集成,以执行重新预订并将结果输入相关应用程序和数据库。

使用 API 的请求-响应与事件驱动的数据流
在一个理想的世界里,一切都是基于事件和流数据的。现实世界是不同的。因此,在企业架构的某些部分,使用 HTTP/REST 或 SQL 的请求-响应进行 API 调用是完全可以的。由于 Kafka 真正解耦了系统,因此每个应用程序都会选择自己的通信范式和处理速度。因此,了解 HTTP/REST API 和 Apache Kafka 之间的权衡非常重要。

何时在 Apache Kafka 中使用 Request-Response?— 此决策通常是根据延迟、解耦或安全性等权衡做出的。然而,对于大型 LLM,情况发生了变化。由于 LLM 的训练成本非常高,因此现有 LLM 的可重用性至关重要。将 LLM 嵌入到 Kafka Streams 或 Flink 应用程序中几乎没有意义,这与使用决策树、聚类甚至小型神经网络等算法构建的其他模型相反。

同样,增强模型通常通过 RPC/API 调用进行集成。通过将其嵌入到 Kafka Streams 微服务或 Flink 作业中,增强模型变得紧密耦合。如今,专家们主持了其中许多项目,因为运营和优化它们并非易事。

托管 LLM 和增强模型的解决方案通常只提供 RPC 接口,如 HTTP。这种情况将来可能会改变,因为请求-响应是流数据的反模式。模型服务器演变的一个很好的例子是 Seldon;同时提供 Kafka 原生接口。在文章 Streaming Machine Learning with Kafka-native Model Deployment 中阅读有关请求-响应与流式处理模型服务的更多信息。

LLM与企业其他部分之间的直接集成
在撰写本文时,OpenAI 宣布使用 GPT 创建 ChatGPT 的自定义版本,该版本结合了指令、额外知识和任何技能组合。对于企业使用,最有趣的功能是开发人员可以将 OpenAI 的 GPT 连接到现实世界,即其他软件应用程序、数据库和云服务:

“除了使用我们的内置功能外,您还可以通过使一个或多个 API 可供 GPT 使用来定义自定义操作。与插件一样,动作允许 GPT 集成外部数据或与现实世界交互。将 GPT 连接到数据库,将它们插入电子邮件,或将它们设为您的购物助手。例如,您可以集成旅行列表数据库,连接用户的电子邮件收件箱,或促进电子商务订单。

使用直接集成的权衡是紧密耦合和点对点通信。如果您已经使用过 Kafka,那么您就会理解具有真正解耦的领域驱动设计的价值。

最后但并非最不重要的一点是,公共 GenAI API 和 LLM 的安全性和治理策略薄弱。随着 AI 数据的出现和点对点集成数量的增加,数据访问、沿袭和安全挑战不断升级。

在实践中使用 Kafka、Flink 和 GenAI 进行数据流传输
在进行了大量的理论研究之后,让我们看一个具体的例子、一个演示和一个将数据流与 GenAI 相结合的真实案例研究:

示例:Flink SQL + OpenAI API
演示:ChatGPT 4 + Confluent Cloud + MongoDB Atlas 用于 RAG 和矢量搜索
成功案例:Elemental Cognition - 由 Confluent Cloud 提供支持的实时 AI 平台
示例:Flink SQL + OpenAI API
使用 Kafka 和 Flink 进行流处理,实现实时和历史数据的数据关联。一个很好的例子,特别是对于生成式人工智能,是特定于上下文的客户服务。我们在这里停留航空公司示例和航班取消。

有状态流处理器从 CRM、忠诚度平台和其他应用程序中获取现有客户信息,将其与客户的查询关联到聊天机器人中,并对 LLM 进行 RPC 调用。

下图使用带有 Flink SQL 用户定义函数 (UDF) 的 Apache Flink。SQL 查询将预处理的数据馈送到 OpenAI API 中,以获得可靠的答案。答案被发送到另一个 Kafka 主题,下游应用程序从那里使用它,例如,用于机票重新预订、更新忠诚度平台,以及将数据存储在数据湖中以供以后的批处理和分析。

演示:ChatGPT 4 + Confluent Cloud + MongoDB Atlas 用于 RAG 和矢量搜索
我的同事布里顿·拉罗什(Britton LaRoche)构建了一个精彩的零售演示,展示了用于数据集成和处理的Kafka与用于存储和语义向量搜索的MongoDB的组合。D-ID 是一个 AI 视频创作平台,通过用视觉 AI 头像替换命令行界面,使演示更加美观。

Confluent Cloud 和 MongoDB Atlas 的完全托管和深度集成服务使我们能够专注于构建业务逻辑。

该体系结构与我上面的基于事件的流式处理示例不同。核心仍然是 Kafka,以真正解耦应用程序。大多数服务都是通过请求-响应 API 集成的。这很简单,很容易理解,而且通常足够好。您以后可以使用 Python Kafka API、从 Kafka 更改数据捕获 (CDC)、将 LangChain Python UDF 嵌入到 Apache Flink 或使用异步接口(如 AsyncAPI)轻松迁移到基于事件的模式。

下面是一个简短的 5 分钟演示,引导您完成使用 RAG 和语义搜索的演示,使用 MongoDB Atlas、Confluent 作为集成中心和 D-ID 作为与最终用户的通信接口:

成功案例:Elemental Cognition — 由 Kafka 和 Confluent Cloud 提供支持的实时 GenAI 平台
David Ferrucci 博士是著名的 AI 研究员,也是 IBM 突破性 Watson 技术的发明者,他于 2015 年创立了 Elemental Cognition。该公司利用 GenAI 来加速和改进关键决策,在这些决策中,信任、准确性和透明度至关重要。

Elemental Cognition 技术可以跨行业和用例使用。主要目标是医疗保健/生命科学、投资管理、智能、物流和调度以及联络中心。

AI 平台开发负责任和透明的 AI,帮助解决问题并提供可以理解和信任的专业知识。

Elemental Cognition 的方法将不同的 AI 策略结合在一个新颖的架构中,该架构获取和推理人类可读的知识,以协作和动态的方式解决问题。其结果是将专家解决问题的智能以更透明、更具成本效益的方式交付到对话和发现应用程序中。

Confluent Cloud 为 AI 平台提供支持,以实现可扩展的实时数据和数据集成用例。我建议查看他们的网站,从各种令人印象深刻的用例中学习。

Apache Kafka 作为 GenAI 企业架构的中枢神经系统
生成式 AI (GenAI) 需要对 AI/ML 企业架构进行更改。增强模型、LLM、带有向量数据库的 RAG 和语义搜索需要数据集成、关联和解耦。使用 Kafka 和 Flink 进行数据流处理可以提供帮助。

许多应用程序和数据库使用与 REST/HTTP、SQL 或其他接口的请求-响应通信。这完全没问题。为您的数据产品和应用程序选择正确的技术和集成层。但要确保数据的一致性。

使用 Apache Kafka 和 Apache Flink 进行数据流处理使开发人员和数据工程师能够专注于其数据产品或集成项目中的业务问题,因为它真正解耦了不同的领域。可以使用 HTTP、Kafka API、AsyncAPI、来自数据库的 CDC、SaaS 接口和许多其他选项与 Kafka 集成。

Kafka 支持将系统与任何通信范式连接。其事件存储以毫秒为单位共享数据(即使在极端规模下),但也会为较慢的下游应用程序保留数据并重播历史数据。数据网格的核心必须实时跳动。对于任何好的企业架构来说都是如此。GenAI也不例外。

如何利用 Apache Kafka 构建对话式 AI、聊天机器人和其他 GenAI 应用程序?你是否使用 Flink 和向量数据库实时构建了 RAG,以便为 LLM 提供正确的上下文?让我们在 LinkedIn 上联系并讨论一下!通过订阅我的时事通讯,随时了解新的博客文章。

相关实践学习
使用PAI-EAS一键部署ChatGLM及LangChain应用
本场景中主要介绍如何使用模型在线服务(PAI-EAS)部署ChatGLM的AI-Web应用以及启动WebUI进行模型推理,并通过LangChain集成自己的业务数据。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
目录
相关文章
|
26天前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
54 7
|
26天前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
69 5
|
26天前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
64 4
|
26天前
|
消息中间件 监控 大数据
优化Apache Kafka性能:最佳实践与调优策略
【10月更文挑战第24天】作为一名已经对Apache Kafka有所了解并有实际使用经验的开发者,我深知在大数据处理和实时数据流传输中,Kafka的重要性不言而喻。然而,在面对日益增长的数据量和业务需求时,如何保证系统的高性能和稳定性成为了摆在我们面前的一个挑战。本文将从我的个人视角出发,分享一些关于如何通过合理的配置和调优来提高Kafka性能的经验和建议。
63 4
|
21天前
|
存储 SQL Apache
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
Apache Doris 是一个基于 MPP 架构的高性能实时分析数据库,以其极高的速度和易用性著称。它支持高并发点查询和复杂分析场景,适用于报表分析、即席查询、数据仓库和数据湖查询加速等。最新发布的 2.0.2 版本在性能、稳定性和多租户支持方面有显著提升。社区活跃,已广泛应用于电商、广告、用户行为分析等领域。
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
|
1天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
9 1
|
26天前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
46 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
26天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
56 2
|
1月前
|
消息中间件 监控 Kafka
Apache Kafka 成为实时数据流处理的关键组件
【10月更文挑战第8天】随着大数据技术的发展,Apache Kafka 成为实时数据流处理的关键组件。Kafka Manager 提供了一个简洁易用的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件修改、启动服务、创建和管理 Topic 等操作,帮助你快速上手。
50 3
|
24天前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka

热门文章

最新文章

推荐镜像

更多
下一篇
无影云桌面