多模态流式 AI 编排平台,大规模减少新应用开发成本

简介: 多模态流式 AI 编排平台,大规模减少新应用开发成本

/作者/

胡子昊


近年来,随着 AI 技术快速发展,已经有越来越多的视觉、语音、自然语言处理等不同模态的智能服务出现在我们的日常生活中,为我们提供丰富的 AI 能力。在每种模态内部,又可根据不同领域被细分为多种不同类别。

以语音服务为例,目前已经拥有一句话识别、实时语音识别、录音文件识别、长文本语音合成等多个成熟的智能语音服务。

语音识别公共云服务

这些语音 AI 服务的整体处理流程类似,但是又有各自的结果处理及业务流转逻辑。在过去的几年中,各服务均独立开发,在整个服务演进过程中面临着以下几点困难:

  • 解决方案的设计复杂性增高:在很多复杂场景下,单一服务涉及十几种算法,三十多步处理步骤,要求分布式执行,毫秒级延迟,这在工程实现上有着极大的挑战。
  • 解决方案的多样性变多:目前在单语音领域,所涉及的服务包括一句话识别、实时语音识别、录音文件识别、长文本语音合成等多种能力,他们在流处理过程上大致相似,但又互有差异。各服务独立开发,重复造轮子,导致代码质量差,最终无法维护。
  • 解决方案的部署运维成本过高:除了已经非常成熟的产品化的算法服务,各模态内部也有很多处于“孵化”阶段的新型算法。这些新型算法需要组合起来才能对外提供更加标准的产品化接口,每次开发独立服务不仅需要算法同学具备较高的工程能力,同时部署运维成本很高。


阿里云PAI-Mediaflow

为了解决上述痛点,我们结合了由阿里云计算平台事业部所研发的流式编排框架 Medaiflow,实现能够对流处理类型的服务进行有效抽象,并提供简洁的逻辑编排能力来将业务函数快速组装为适配不同场景的处理流程图。与此同时,还能够提供强力的图处理引擎来高效的处理业务请求,并对外提供标准的服务化接口

在流式编排框架 Medaiflow 中,对数据流提供了通用的抽象 MediaData,以及元素抽象 DataFrame。其定义:

表示由数据 DataFrame 组成的连续数据流,它的数量可以是 0N 个。

对于流处理,Mediaflow 也提供了许多算子抽象

  • Map: 单输入单输出,对数据流做 map 操作。
  • Filter: 返回 bool 类型,表示输入的数据是否要被过滤掉。
  • FlatMap: 单输入多输出,输入一条数据输出 N 条结果,可用来实现数据流复制的功能。
  • Window:窗口函数,根据配置做窗口操作输入窗口中的 N 条数据,输出单条或多条结果,支持 count window, slide window,select all 等常用 window 类型。
  • Copy: 图结构为 1 对 N 时自动将结果复制到 N 条流。
  • SortUnion: 图结构为 N 对 1 时自动将结果进行归并到 1 条流,并按时间戳排序。
  • Route: 对输入进行自定义分流操作,如可以将图像帧向 A 分支转发,音频帧向 B 分支转发。
  • StreamRead: 流失数据读入,例如视频解码。
  • StreamWrite: 流失数据写出,例如视频编码。

部分算子能力在用户试用该框架实现了不同能力的算子之后,Mediaflow 提供非常便捷的图编排能力,用户可根据具体需求把算子编排为不同的处理流程图。


阿里云计算平台事业部所研发的流式编排 Mediaflow 能够非常好的解决流处理场景的大部分问题,但是我们使用该框架在语音类场景进行落地应用时仍然遇到了以下几方面的问题:

  • 算子处理为同步阻塞:在 Mediaflow 中,对数据流上的元素进行转换操作的 map 算子在处理过程中为同步阻塞的,在处理完当前元素后才能接受后续元素。但是在语音场景中,由于 AI 模型处理模式的特殊性,算子接收数据和产生数据并不是一一对应的,因此传统的同步阻塞算子无法满足需求。
  • 语音子流场景延迟高:在语音场景中请求多为长链接,用户的输入为一个持续长达数小时的音频数据流,但是由于语音模型的特殊性,数据流需要被切分为不同的子流进行处理。在现有框架中仅有 window 算子能支持此类操作,该算子需要收集所有子流元素后才能进行计算,因此会带来几十秒的延迟,无法满足语音场景毫秒级延迟的需求。
  • 缺乏容错机制:目前的语音数据流处理链路已经十分复杂,单一场景往往就包含几十种处理步骤,任意一处节点故障即可影响服务整体的稳定性。但是现有的 Mediaflow 框架内部缺乏容错处理能力,无法满足语音服务的稳定性需求。


解决方案

为了解决上述 Medialfow 在语音场景应用时所遇到的困难,达摩院语音实验室与阿里云计算平台事业部一起,通过以下几个方面对 Mediaflow 进行改造升级:

  • 语音流式算子:提供异步非阻塞的流式算子适配语音流处理场景,同时提供预置 AI 算子,减少新应用接入成本。
  • 子图:通过子图的方式来解决语音场景对子流处理的毫秒级延迟需求。
  • 统一容错机制:框架提供多种预置的容错能力来应对语音长连接场景下稳定性的挑战。

下面将分别对上述方案进行详细的介绍。 语音流式算子 ①异步非阻塞流式算子

在大多数的传统流处理框架中,map 操作都是同步阻塞的,其处理流程如下图所示:


Map 处理流程

这种处理模型无法解决语音场景中某些算子节点接收数据和产生数据并不一一对应的问题。因此我们新增了异步非阻塞流式算子 StreamMap,其很好的支持了异步非阻塞处理场景。StreamMap 的处理流程大致如下图所示:


StreamMap 处理流程

②算子能力扩展

除了 StreamMap 之外,我们还新增了包括但不限于 concat、scan、last 等不同功能的算子,用以解决语音场景中复杂的流处理逻辑,同时也丰富了 Mediaflow 的算子能力矩阵。

部分新增基础算子

③预置AI算子

在 AI 流处理场景,有许多功能相似的 operator,我们构建了一个 AI 能力算子库,提供了几十种预置 AI 能力,能够应对上百种不同的业务场景,用户在使用时无需任何开发,只需要在数据流处理过程中配置特定的算子,即可获得丰富的 AI 能力。以下列举部分预置算子能力。AI能力算子库 子图 在传统的流处理场景中,无论是对流进行转换、复制还是合并,每一个处理操作都是持续在流的整个生命周期里 (从流上的第一个元素开始,到整个流的结束)。在语音的流式处理场景里,由于算法的需求,每一个操作只会持续在有限的边界内,因此原始的从开始到结束的数据流可能会在特定的时候被切分成多段相连的子流。子流内部的元素间具有上下文关系,不同子间的元素相互隔离。在这里需要说明的是 子流 与传统流处理中的 的概念有本质的区别。比如在当今十分流行的流处理系统 Flink 中,其中的 window 操作可以将数据流切分为不同的部分,用户只能够在每部分的数据积攒完成时进行计算,因此对各部分的处理是非异步且阻塞的。这在对实时性要求极高的语音场景下是明显不满足要求的。因此框架需要支持从子流的第一个元素开始进行处理,而不是等到子流的最后一个元素集中进行计算。针对这种场景我们对 Mediaflow 框架进行升级,首次提出了 子图 的概念:


MediaData().splitby(conditon1, sub_graph)

其处理流程如下图所示,框架会根据用户定义的条件函数 conditon 1 将原数据切分为不同的子流,然后使用用户独立编排的子图对其进行处理,整个过程为异步非阻塞,支持子流的并行分布式处理,同时保证整体结果有序。子图处理流程 统一容错机制 容错处理是流处理框架非常重要的需求,而在部分 AI 类流处理场景通常是长连接,同时对延迟又有很高的要求,简单的重试无法满足需求。为此,框架层提供了重试、重连以及降级等多种容错能力,用户只需要简单的配置就能极大的提高服务可用性。

  • 重连:框架根据配置自动缓存部分数据,在发生错误时自动重试并修正后续数据结果。


  • 重试:框架自动忽略错误,重试当前请求并继续处理后续数据。


  • 降级:框架自动忽略错误,并自动执行用户配置的特定降级处理逻辑替换原有结果。


分布式执行引擎

除了上述改进升级之外,针对像语音识别中声学模型等重计算的算子,我们自研了云原生的分布式计算框架 Atom,对识别任务进行统一调度,同时结合智能伸缩技术在提升集群资源利用率的同时又能应对各种突发流量场景。

针对该框架的介绍会在后续公众号发出,敬请期待。





语音流式编排实战

性别识别是语音实验室研发的用于识别说话人性别的智能 AI 服务。接下来我们就以该服务为示例介绍流式编排的具体方法。

性别识别的大致处理流程如下图所示:


性别识别处理流程

首先我们需要实现空音频过滤 filter 算子:




@decorator.filterdef empty_binary_filter(dataframe, ctx):    return len(dataframe.get_binary()) > 0

此外,由于该服务对用户输入音频时常有限制,因此我们实现一个 map 算子来处理该逻辑:
















class AudioLengthCount(MapFunction):    def init(self, args):        """        初始化逻辑,设置相关参数初值        """
    def map(self, dataframe, context):        """        根据采样率计算当前已经处理的数据时长,超出限制是返回错误        """
    def clean(self):        """        重置相关变量        """

最后我们还需要实现一个 map 算子用于选取最优识别结果:
















class ChooseBestResult(MapFunction):    def init(self, args):        """        初始化逻辑,设置相关参数初值        """
    def map(self, dataframe, context):        """        根据分布式计算框架ATOM的返回结果选择出最优值        """
    def clean(self):        """        重置相关变量        """

在实现了各个业务逻辑算子后,只需要简单的编排代码即可组装为具体的业务流程:



def build_graph():    MediaData('input').filter(empty_binary_filter).stream_map(config('decoder')).filter(empty_binary_filter).map(AudioLengthCount).stream_map(config('sid')).map(ChooseBestResult).last()


多模态流式编排平台

目前基于新的流式编排框架,我们已经拥有了一句话识别实时语音识别录音文件识别长文本语音合成等多种不同场景的语音服务。








随着不断的迭代,如何高效管理众多的图资源和服务版本成为了非常迫切的需求。为此,我们构建了多模态流式编排平台,平台提供了包含图资源管理、服务发布及服务运维等在内的一站式能力



下面我们将从整体架构、图资源管理、图服务发布这三个方面来对其进行详细的介绍。

整体架构

整体架构图多模态流式编排平台的整体架构如上图所示,其中:

  • 接口层:针对实时流处理和离线批处理任务分别进行统一的协议定义。因此仅需一套 SDK 即可对外部所有用户提供服务。
  • 应用层:基于 Medaflow 框架进行业务逻辑处理和图编排。
  • 框架层:提供丰富的 AI 能力算子,通过批流一体化流处理引擎 Mediaflow 进行图解释与执行。同时基于自研引擎 ATOM 提供强大的分布式 AI 模型计算能力。
  • 一站式管理平台:负责用户的图编排资源以及服务发布管理,通过统一架构进行公共云及专有云的一键部署。

图资源管理

用户可利用平台的图资源管理功能,上传不同版本的资源,平台会自动进行分类存储,同时自动分发至多地域的不同存储介质 ( NAS、OSS 等) 中。

图资源管理

服务发布

用户使用一站式平台上传图资源后,平台同时提供了全流程、可视化的服务部署能力。用户只需简单的几步操作就能将图资源部署为多地域的在线服务。

服务列表

服务发布流程


总结

截止目前,语音服务内部所有应用层服务已全部完成 FLOW 流式升级新应用的开发周期从一个月缩短至一周,极大的减少了新应用的开发成本。语音各类服务已广泛地应用于阿里集团客服、蚂蚁95188、手机淘宝、支付宝、高德地图等阿里生态业务。同时依托新一代云原生的多模态流编排平台,包含几十种算法、几百种应用场景的 AI 模型能够零时差上云,并保证服务的毫秒级延迟,各类服务单日调用量超 30亿次。解决了语音 AI 在银行、证券、法院、运营商、能源、教育等各行各业场景落地难的问题。后续我们将对平台进行持续不断的优化打磨,为语音内外部业务提供更好的技术支撑。


相关文章
|
8天前
|
机器学习/深度学习 人工智能 自然语言处理
AI技术深度解析:从基础到应用的全面介绍
人工智能(AI)技术的迅猛发展,正在深刻改变着我们的生活和工作方式。从自然语言处理(NLP)到机器学习,从神经网络到大型语言模型(LLM),AI技术的每一次进步都带来了前所未有的机遇和挑战。本文将从背景、历史、业务场景、Python代码示例、流程图以及如何上手等多个方面,对AI技术中的关键组件进行深度解析,为读者呈现一个全面而深入的AI技术世界。
63 10
|
2天前
|
机器学习/深度学习 人工智能 物联网
AI赋能大学计划·大模型技术与应用实战学生训练营——湖南大学站圆满结营
12月14日,由中国软件行业校园招聘与实习公共服务平台携手魔搭社区共同举办的AI赋能大学计划·大模型技术与产业趋势高校行AIGC项目实战营·湖南大学站圆满结营。
AI赋能大学计划·大模型技术与应用实战学生训练营——湖南大学站圆满结营
|
6天前
|
人工智能 数据可视化 JavaScript
NodeTool:AI 工作流可视化构建器,通过拖放节点设计复杂的工作流,集成 OpenAI 等多个平台
NodeTool 是一个开源的 AI 工作流可视化构建器,通过拖放节点的方式设计复杂的工作流,无需编码即可快速原型设计和测试。它支持本地 GPU 运行 AI 模型,并与 Hugging Face、OpenAI 等平台集成,提供模型访问能力。
58 14
NodeTool:AI 工作流可视化构建器,通过拖放节点设计复杂的工作流,集成 OpenAI 等多个平台
|
8天前
|
机器学习/深度学习 人工智能 算法
探索AI在医疗诊断中的应用与挑战
【10月更文挑战第21天】 本文深入探讨了人工智能(AI)技术在医疗诊断领域的应用现状与面临的挑战,旨在为读者提供一个全面的视角,了解AI如何改变传统医疗模式,以及这一变革过程中所伴随的技术、伦理和法律问题。通过分析AI技术的优势和局限性,本文旨在促进对AI在医疗领域应用的更深层次理解和讨论。
|
5天前
|
机器学习/深度学习 人工智能 自然语言处理
AI在自然语言处理中的突破:从理论到应用
AI在自然语言处理中的突破:从理论到应用
47 17
|
4天前
|
人工智能 Serverless API
尽享红利,Serverless构建企业AI应用方案与实践
本次课程由阿里云云原生架构师计缘分享,主题为“尽享红利,Serverless构建企业AI应用方案与实践”。课程分为四个部分:1) Serverless技术价值,介绍其发展趋势及优势;2) Serverless函数计算与AI的结合,探讨两者融合的应用场景;3) Serverless函数计算AIGC应用方案,展示具体的技术实现和客户案例;4) 业务初期如何降低使用门槛,提供新用户权益和免费资源。通过这些内容,帮助企业和开发者快速构建高效、低成本的AI应用。
41 12
|
2天前
|
人工智能 容灾 关系型数据库
【AI应用启航workshop】构建高可用数据库、拥抱AI智能问数
12月25日(周三)14:00-16:30参与线上闭门会,阿里云诚邀您一同开启AI应用实践之旅!
|
1天前
|
人工智能 前端开发 Java
Spring AI Alibaba + 通义千问,开发AI应用如此简单!!!
本文介绍了如何使用Spring AI Alibaba开发一个简单的AI对话应用。通过引入`spring-ai-alibaba-starter`依赖和配置API密钥,结合Spring Boot项目,只需几行代码即可实现与AI模型的交互。具体步骤包括创建Spring Boot项目、编写Controller处理对话请求以及前端页面展示对话内容。此外,文章还介绍了如何通过添加对话记忆功能,使AI能够理解上下文并进行连贯对话。最后,总结了Spring AI为Java开发者带来的便利,简化了AI应用的开发流程。
50 0
|
8天前
|
传感器 机器学习/深度学习 人工智能
AI在自动驾驶汽车中的应用与未来展望
AI在自动驾驶汽车中的应用与未来展望
53 9
|
23小时前
|
人工智能 运维 Devops
CAP:Serverless + AI 让应用开发更简单
对于众多开发者而言,Serverless 架构的核心优势在于其能够无缝集成多种云产品与组件,从而使得开发者可以更加专注于核心业务逻辑和创新。此外,Serverless 架构还提供了按量付费的灵活计费模式,进一步降低了资源成本。使用云应用开发平台 CAP,在 AI 领域,企业就可以专注于模型训练、算法优化等关键任务,让 AI 应用的开发、部署以及全生命周期的管理更加简单。可以预见 Serverless 技术将催生一系列创新且有趣的应用,而这些应用将不断拓展 AI 技术的边界。