Apache Kafka + 向量数据库 + LLM = 实时 GenAI

本文涉及的产品
模型训练 PAI-DLC,5000CU*H 3个月
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
交互式建模 PAI-DSW,5000CU*H 3个月
简介: 生成式AI(GenAI)革新了企业架构,催生新数据集成模式与最佳实践。借助Apache Kafka与Apache Flink,企业能高效处理大规模实时数据,连接各类数据库与分析平台。Kafka作为核心组件,支持GenAI应用如服务台自动化、聊天机器人及内容审核。结合大型语言模型(LLM)、检索增强生成(RAG)与向量数据库,Kafka与Flink共同打造强大数据流处理能力,克服GenAI挑战,如昂贵训练成本、数据时效性与准确性。通过语义搜索与RAG设计模式,确保LLM生成内容可靠无误。

生成式 AI (GenAI) 支持高级 AI 用例和创新,但也改变了企业架构的外观。大型语言模型 (LLM)、向量数据库和检索增强生成 (RAG) 需要新的数据集成模式和数据工程最佳实践。使用 Apache Kafka 和 Apache Flink 进行数据流处理是大规模实时摄取和管理传入数据集、连接各种数据库和分析平台以及分离独立业务部门和数据产品的关键。这篇博客文章探讨了事件流与传统请求-响应 API 和数据库之间可能的架构、示例和权衡。

Apache Kafka 和 GenAI 的用例
生成式 AI (GenAI) 是用于自然语言处理 (NLP)、图像生成、代码优化和其他任务的下一代 AI 引擎。它可以帮助现实世界中的许多项目实现服务台自动化、与聊天机器人的客户对话、社交网络中的内容审核以及许多其他用例。

Apache Kafka 成为这些机器学习平台中的主要编排层,用于集成各种数据源、大规模处理和实时模型推理。

使用 Kafka 进行数据流传输已经为许多 GenAI 基础设施和软件产品提供支持。可能会出现非常不同的情况:

作为整个机器学习基础设施的数据结构进行数据流式处理
使用流处理进行模型评分,用于实时预测和内容生成
使用输入文本、语音或图像生成流数据管道
大型语言模型的实时在线训练
我在博客文章“Apache Kafka as Mission Critical Data Fabric for GenAI”中探讨了这些用例,包括 Expedia、BMW 和 Tinder 等真实示例。

下面介绍了大型语言模型 (LLM)、检索增强生成 (RAG) 与向量数据库和语义搜索以及 Apache Kafka 和 Flink 的数据流相结合的具体架构。

为什么生成式 AI 与传统机器学习架构不同
机器学习 (ML) 使计算机能够找到隐藏的见解,而无需编程查找位置。这称为模型训练,即分析大数据集的批处理过程。输出是一个二进制文件,即分析模型。

应用程序将这些模型应用于新的传入事件以进行预测。这称为模型评分,可以通过将模型嵌入到应用程序中或对模型服务器(部署模型)执行请求-响应 API 调用来实时或批量进行。

然而,与传统的 ML 流程相比,LLM 和 GenAI 具有不同的需求和模式,正如我的前同事 Michael Drogalis 在两个简单明了的图表中解释的那样。

具有复杂数据工程的传统预测机器学习
预测性人工智能进行预测。专用模型。线下培训。这就是我们在过去十年左右的时间里进行机器学习的方式。

在传统的 ML 中,大多数数据工程工作都在模型创建时进行。特征工程和模型训练需要大量的专业知识和努力:

新的用例需要由数据工程师和数据科学家构建的新模型。

使用大型语言模型的生成式 AI 的 AI 民主化 (LLM)
生成式人工智能 (GenAI) 创建内容。可重复使用的模型。情境学习。

但是对于大型语言模型,数据工程会发生在每个查询中。不同的应用程序重用相同的模型:

大型语言模型在GenAI用例中的挑战
大型语言模型 (LLM) 是可重用的。这使得人工智能的民主化成为可能,因为并非每个团队都需要人工智能专业知识。取而代之的是,低 AI 专业知识足以使用现有的 LLM。

但是,LLM 存在一些巨大的权衡:

昂贵的培训:像 ChatGPT 这样的 LLM 花费了数百万美元的计算资源(这不包括构建模型所需的专业知识)
静态数据:LLM 是“时间冻结”的,这意味着模型没有最新的信息。
缺乏领域知识:LLM 通常从公共数据集中学习。因此,数据工程师抓取万维网并将其输入到模型训练中。但是,企业需要在自己的环境中使用 LMM 来提供业务价值。
愚蠢:LLM 不像人类那样聪明。例如,ChatGPT 甚至无法计算您提示它的句子中的单词数。
这些挑战造成了所谓的幻觉......

避免幻觉以产生可靠的答案
幻觉,即最佳猜测答案,是结果,LLM 不会告诉你它是在编造事情。幻觉是一种现象,其中 AI 模型生成的内容不是基于真实数据或信息,而是创建完全虚构或不切实际的输出。当生成模型(如文本或图像生成器)生成的内容不连贯、不真实或与输入数据或上下文不相关时,就会发生幻觉。这些幻觉可以表现为文本、图像或其他类型的内容,这些内容看似合理,但完全是由模型捏造的。

幻觉在生成式人工智能中可能是个问题,因为它们可能导致产生误导性或虚假信息。

由于这些原因,生成式 AI 出现了一种新的设计模式:检索增强生成 (RAG)。让我们首先看一下这个新的最佳实践,然后探讨为什么使用 Apache Kafka 和 Flink 等技术进行数据流是 GenAI 企业架构的基本要求。

语义搜索和检索增强生成 (RAG)
许多支持 GenAI 的应用程序都遵循检索增强生成 (RAG) 的设计模式,将 LLM 与准确和最新的上下文相结合。Pinecone(一个完全托管的矢量数据库)背后的团队使用这张图有一个很好的解释:

在较高级别上,RAG 通常是两个独立的步骤。第一个是数据增强步骤,其中将不同的(通常是非结构化的)操作数据分块,然后使用嵌入模型创建嵌入。嵌入被索引到向量数据库中。向量数据库是一种工具,用于启用语义搜索,以查找不需要精确关键字匹配的提示的相关上下文。

其次是推理步骤,GenAI 模型接收问题和上下文以生成可靠的答案(没有幻觉)。RAG 不会更新嵌入,但会检索相关信息以与提示一起发送到 LLM。

用于带嵌入的语义搜索的向量数据库
向量数据库,也称为向量存储或向量索引,是一种专门设计用于有效存储和检索向量数据的数据库。在此上下文中,向量数据是指数值向量的集合,这些向量可以表示各种数据类型,例如文本、图像、音频或任何其他结构化或非结构化数据的嵌入。向量数据库在与机器学习、数据检索、推荐系统、相似性搜索等相关的应用程序中非常有用。

向量数据库擅长执行相似性搜索,通常称为语义搜索。他们可以根据各种相似性指标(例如余弦相似性或欧几里得距离)快速找到与给定查询向量相似或接近的向量。

向量数据库(不一定)是一个单独的数据库类别。Gradient Flow 解释了其检索增强生成的最佳实践:

“向量搜索不再局限于向量数据库。许多数据管理系统(包括 PostgreSQL)现在都支持向量搜索。根据您的具体应用,您可能会找到满足您特定需求的系统。近实时还是流媒体是优先事项?查看 Rockset 的产品。您已经在使用知识图谱了吗?Neo4j对向量搜索的支持意味着您的RAG结果将更容易解释和可视化。

对于另一个具体示例,请查看 MongoDB 的教程“使用 MongoDB 构建生成式 AI 应用程序:利用 Atlas 向量搜索和开源模型的力量”。将用于 GenAI 用例的向量数据库与 Apache Kafka 相结合有多种选择。以下是事件驱动世界中可能的体系结构。

事件驱动架构:数据流 + Vector DB + LLM
事件驱动的应用程序可以更有效地实现检索增强生成 (RAG)、数据增强和模型推理这两个步骤。使用 Apache Kafka 和 Apache Flink 进行数据流式处理可实现任何规模的数据(如果应用程序或数据库可以处理,则实时同步)和数据管理(= 流式 ETL)的一致同步。

下图显示了一个企业架构,该架构利用事件驱动的数据流在整个 GenAI 管道中引入和处理数据:

此示例使用数据流将航班预订和更改实时摄取到 Kafka 的事件存储中,以便以后使用 GenAI 技术进行处理。Flink 在调用嵌入模型为向量数据库生成嵌入之前,会对数据进行预处理。同时,使用 Python 构建的实时客户服务应用程序使用所有相关的上下文数据(例如,航班数据、客户数据、嵌入等)来提示大型语言模型。LLM 创建可靠的预测,例如建议将乘客重新预订到另一个航班。

在大多数企业方案中,出于安全和数据隐私原因,所有处理都在企业防火墙后面运行。LLM甚至可以与交易系统(如预订引擎)集成,以执行重新预订并将结果输入相关应用程序和数据库。

使用 API 的请求-响应与事件驱动的数据流
在一个理想的世界里,一切都是基于事件和流数据的。现实世界是不同的。因此,在企业架构的某些部分,使用 HTTP/REST 或 SQL 的请求-响应进行 API 调用是完全可以的。由于 Kafka 真正解耦了系统,因此每个应用程序都会选择自己的通信范式和处理速度。因此,了解 HTTP/REST API 和 Apache Kafka 之间的权衡非常重要。

何时在 Apache Kafka 中使用 Request-Response?— 此决策通常是根据延迟、解耦或安全性等权衡做出的。然而,对于大型 LLM,情况发生了变化。由于 LLM 的训练成本非常高,因此现有 LLM 的可重用性至关重要。将 LLM 嵌入到 Kafka Streams 或 Flink 应用程序中几乎没有意义,这与使用决策树、聚类甚至小型神经网络等算法构建的其他模型相反。

同样,增强模型通常通过 RPC/API 调用进行集成。通过将其嵌入到 Kafka Streams 微服务或 Flink 作业中,增强模型变得紧密耦合。如今,专家们主持了其中许多项目,因为运营和优化它们并非易事。

托管 LLM 和增强模型的解决方案通常只提供 RPC 接口,如 HTTP。这种情况将来可能会改变,因为请求-响应是流数据的反模式。模型服务器演变的一个很好的例子是 Seldon;同时提供 Kafka 原生接口。在文章 Streaming Machine Learning with Kafka-native Model Deployment 中阅读有关请求-响应与流式处理模型服务的更多信息。

LLM与企业其他部分之间的直接集成
在撰写本文时,OpenAI 宣布使用 GPT 创建 ChatGPT 的自定义版本,该版本结合了指令、额外知识和任何技能组合。对于企业使用,最有趣的功能是开发人员可以将 OpenAI 的 GPT 连接到现实世界,即其他软件应用程序、数据库和云服务:

“除了使用我们的内置功能外,您还可以通过使一个或多个 API 可供 GPT 使用来定义自定义操作。与插件一样,动作允许 GPT 集成外部数据或与现实世界交互。将 GPT 连接到数据库,将它们插入电子邮件,或将它们设为您的购物助手。例如,您可以集成旅行列表数据库,连接用户的电子邮件收件箱,或促进电子商务订单。

使用直接集成的权衡是紧密耦合和点对点通信。如果您已经使用过 Kafka,那么您就会理解具有真正解耦的领域驱动设计的价值。

最后但并非最不重要的一点是,公共 GenAI API 和 LLM 的安全性和治理策略薄弱。随着 AI 数据的出现和点对点集成数量的增加,数据访问、沿袭和安全挑战不断升级。

在实践中使用 Kafka、Flink 和 GenAI 进行数据流传输
在进行了大量的理论研究之后,让我们看一个具体的例子、一个演示和一个将数据流与 GenAI 相结合的真实案例研究:

示例:Flink SQL + OpenAI API
演示:ChatGPT 4 + Confluent Cloud + MongoDB Atlas 用于 RAG 和矢量搜索
成功案例:Elemental Cognition - 由 Confluent Cloud 提供支持的实时 AI 平台
示例:Flink SQL + OpenAI API
使用 Kafka 和 Flink 进行流处理,实现实时和历史数据的数据关联。一个很好的例子,特别是对于生成式人工智能,是特定于上下文的客户服务。我们在这里停留航空公司示例和航班取消。

有状态流处理器从 CRM、忠诚度平台和其他应用程序中获取现有客户信息,将其与客户的查询关联到聊天机器人中,并对 LLM 进行 RPC 调用。

下图使用带有 Flink SQL 用户定义函数 (UDF) 的 Apache Flink。SQL 查询将预处理的数据馈送到 OpenAI API 中,以获得可靠的答案。答案被发送到另一个 Kafka 主题,下游应用程序从那里使用它,例如,用于机票重新预订、更新忠诚度平台,以及将数据存储在数据湖中以供以后的批处理和分析。

演示:ChatGPT 4 + Confluent Cloud + MongoDB Atlas 用于 RAG 和矢量搜索
我的同事布里顿·拉罗什(Britton LaRoche)构建了一个精彩的零售演示,展示了用于数据集成和处理的Kafka与用于存储和语义向量搜索的MongoDB的组合。D-ID 是一个 AI 视频创作平台,通过用视觉 AI 头像替换命令行界面,使演示更加美观。

Confluent Cloud 和 MongoDB Atlas 的完全托管和深度集成服务使我们能够专注于构建业务逻辑。

该体系结构与我上面的基于事件的流式处理示例不同。核心仍然是 Kafka,以真正解耦应用程序。大多数服务都是通过请求-响应 API 集成的。这很简单,很容易理解,而且通常足够好。您以后可以使用 Python Kafka API、从 Kafka 更改数据捕获 (CDC)、将 LangChain Python UDF 嵌入到 Apache Flink 或使用异步接口(如 AsyncAPI)轻松迁移到基于事件的模式。

下面是一个简短的 5 分钟演示,引导您完成使用 RAG 和语义搜索的演示,使用 MongoDB Atlas、Confluent 作为集成中心和 D-ID 作为与最终用户的通信接口:

成功案例:Elemental Cognition — 由 Kafka 和 Confluent Cloud 提供支持的实时 GenAI 平台
David Ferrucci 博士是著名的 AI 研究员,也是 IBM 突破性 Watson 技术的发明者,他于 2015 年创立了 Elemental Cognition。该公司利用 GenAI 来加速和改进关键决策,在这些决策中,信任、准确性和透明度至关重要。

Elemental Cognition 技术可以跨行业和用例使用。主要目标是医疗保健/生命科学、投资管理、智能、物流和调度以及联络中心。

AI 平台开发负责任和透明的 AI,帮助解决问题并提供可以理解和信任的专业知识。

Elemental Cognition 的方法将不同的 AI 策略结合在一个新颖的架构中,该架构获取和推理人类可读的知识,以协作和动态的方式解决问题。其结果是将专家解决问题的智能以更透明、更具成本效益的方式交付到对话和发现应用程序中。

Confluent Cloud 为 AI 平台提供支持,以实现可扩展的实时数据和数据集成用例。我建议查看他们的网站,从各种令人印象深刻的用例中学习。

Apache Kafka 作为 GenAI 企业架构的中枢神经系统
生成式 AI (GenAI) 需要对 AI/ML 企业架构进行更改。增强模型、LLM、带有向量数据库的 RAG 和语义搜索需要数据集成、关联和解耦。使用 Kafka 和 Flink 进行数据流处理可以提供帮助。

许多应用程序和数据库使用与 REST/HTTP、SQL 或其他接口的请求-响应通信。这完全没问题。为您的数据产品和应用程序选择正确的技术和集成层。但要确保数据的一致性。

使用 Apache Kafka 和 Apache Flink 进行数据流处理使开发人员和数据工程师能够专注于其数据产品或集成项目中的业务问题,因为它真正解耦了不同的领域。可以使用 HTTP、Kafka API、AsyncAPI、来自数据库的 CDC、SaaS 接口和许多其他选项与 Kafka 集成。

Kafka 支持将系统与任何通信范式连接。其事件存储以毫秒为单位共享数据(即使在极端规模下),但也会为较慢的下游应用程序保留数据并重播历史数据。数据网格的核心必须实时跳动。对于任何好的企业架构来说都是如此。GenAI也不例外。

如何利用 Apache Kafka 构建对话式 AI、聊天机器人和其他 GenAI 应用程序?你是否使用 Flink 和向量数据库实时构建了 RAG,以便为 LLM 提供正确的上下文?让我们在 LinkedIn 上联系并讨论一下!通过订阅我的时事通讯,随时了解新的博客文章。

相关实践学习
使用PAI-EAS一键部署ChatGLM及LangChain应用
本场景中主要介绍如何使用模型在线服务(PAI-EAS)部署ChatGLM的AI-Web应用以及启动WebUI进行模型推理,并通过LangChain集成自己的业务数据。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
目录
相关文章
|
18天前
|
消息中间件 前端开发 Kafka
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
|
21天前
|
存储 消息中间件 人工智能
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
早期 MiniMax 基于 Grafana Loki 构建了日志系统,在资源消耗、写入性能及系统稳定性上都面临巨大的挑战。为此 MiniMax 开始寻找全新的日志系统方案,并基于阿里云数据库 SelectDB 版内核 Apache Doris 升级了日志系统,新系统已接入 MiniMax 内部所有业务线日志数据,数据规模为 PB 级, 整体可用性达到 99.9% 以上,10 亿级日志数据的检索速度可实现秒级响应。
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
|
19天前
|
消息中间件 Kafka 数据库
深入理解Kafka的数据一致性原理及其与传统数据库的对比
【8月更文挑战第24天】在分布式系统中,确保数据一致性至关重要。传统数据库利用ACID原则保障事务完整性;相比之下,Kafka作为高性能消息队列,采用副本机制与日志结构确保数据一致性。通过同步所有副本上的数据、维护消息顺序以及支持生产者的幂等性操作,Kafka在不牺牲性能的前提下实现了高可用性和数据可靠性。这些特性使Kafka成为处理大规模数据流的理想工具。
38 6
|
19天前
|
消息中间件 Java Kafka
【Azure 事件中心】在微软云中国区 (Mooncake) 上实验以Apache Kafka协议方式发送/接受Event Hubs消息 (Java版)
【Azure 事件中心】在微软云中国区 (Mooncake) 上实验以Apache Kafka协议方式发送/接受Event Hubs消息 (Java版)
|
22天前
|
消息中间件 Java Kafka
|
14天前
|
消息中间件 Kafka 数据处理
实时数据流处理:Dask Streams 与 Apache Kafka 集成
【8月更文第29天】在现代数据处理领域,实时数据流处理已经成为不可或缺的一部分。随着物联网设备、社交媒体和其他实时数据源的普及,处理这些高吞吐量的数据流成为了一项挑战。Apache Kafka 作为一种高吞吐量的消息队列服务,被广泛应用于实时数据流处理场景中。Dask Streams 是 Dask 库的一个子模块,它为 Python 开发者提供了一个易于使用的实时数据流处理框架。本文将介绍如何将 Dask Streams 与 Apache Kafka 结合使用,以实现高效的数据流处理。
20 0
|
18天前
|
消息中间件 Java Kafka
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
|
28天前
|
消息中间件 存储 Ubuntu
在Ubuntu 14.04上安装Apache Kafka的方法
在Ubuntu 14.04上安装Apache Kafka的方法
14 0
|
21天前
|
SQL 关系型数据库 MySQL
【揭秘】MySQL binlog日志与GTID:如何让数据库备份恢复变得轻松简单?
【8月更文挑战第22天】MySQL的binlog日志记录数据变更,用于恢复、复制和点恢复;GTID为每笔事务分配唯一ID,简化复制和恢复流程。开启binlog和GTID后,可通过`mysqldump`进行逻辑备份,包含binlog位置信息,或用`xtrabackup`做物理备份。恢复时,使用`mysql`命令执行备份文件,或通过`innobackupex`恢复物理备份。GTID模式下的主从复制配置更简便。
91 2
|
16天前
|
弹性计算 关系型数据库 数据库
手把手带你从自建 MySQL 迁移到云数据库,一步就能脱胎换骨
阿里云瑶池数据库来开课啦!自建数据库迁移至云数据库 RDS原来只要一步操作就能搞定!点击阅读原文完成实验就可获得一本日历哦~

推荐镜像

更多