多模态流式 AI 编排平台,大规模减少新应用开发成本

简介: 多模态流式 AI 编排平台,大规模减少新应用开发成本

/作者/

胡子昊


近年来,随着 AI 技术快速发展,已经有越来越多的视觉、语音、自然语言处理等不同模态的智能服务出现在我们的日常生活中,为我们提供丰富的 AI 能力。在每种模态内部,又可根据不同领域被细分为多种不同类别。

以语音服务为例,目前已经拥有一句话识别、实时语音识别、录音文件识别、长文本语音合成等多个成熟的智能语音服务。

语音识别公共云服务

这些语音 AI 服务的整体处理流程类似,但是又有各自的结果处理及业务流转逻辑。在过去的几年中,各服务均独立开发,在整个服务演进过程中面临着以下几点困难:

  • 解决方案的设计复杂性增高:在很多复杂场景下,单一服务涉及十几种算法,三十多步处理步骤,要求分布式执行,毫秒级延迟,这在工程实现上有着极大的挑战。
  • 解决方案的多样性变多:目前在单语音领域,所涉及的服务包括一句话识别、实时语音识别、录音文件识别、长文本语音合成等多种能力,他们在流处理过程上大致相似,但又互有差异。各服务独立开发,重复造轮子,导致代码质量差,最终无法维护。
  • 解决方案的部署运维成本过高:除了已经非常成熟的产品化的算法服务,各模态内部也有很多处于“孵化”阶段的新型算法。这些新型算法需要组合起来才能对外提供更加标准的产品化接口,每次开发独立服务不仅需要算法同学具备较高的工程能力,同时部署运维成本很高。


阿里云PAI-Mediaflow

为了解决上述痛点,我们结合了由阿里云计算平台事业部所研发的流式编排框架 Medaiflow,实现能够对流处理类型的服务进行有效抽象,并提供简洁的逻辑编排能力来将业务函数快速组装为适配不同场景的处理流程图。与此同时,还能够提供强力的图处理引擎来高效的处理业务请求,并对外提供标准的服务化接口

在流式编排框架 Medaiflow 中,对数据流提供了通用的抽象 MediaData,以及元素抽象 DataFrame。其定义:

表示由数据 DataFrame 组成的连续数据流,它的数量可以是 0N 个。

对于流处理,Mediaflow 也提供了许多算子抽象

  • Map: 单输入单输出,对数据流做 map 操作。
  • Filter: 返回 bool 类型,表示输入的数据是否要被过滤掉。
  • FlatMap: 单输入多输出,输入一条数据输出 N 条结果,可用来实现数据流复制的功能。
  • Window:窗口函数,根据配置做窗口操作输入窗口中的 N 条数据,输出单条或多条结果,支持 count window, slide window,select all 等常用 window 类型。
  • Copy: 图结构为 1 对 N 时自动将结果复制到 N 条流。
  • SortUnion: 图结构为 N 对 1 时自动将结果进行归并到 1 条流,并按时间戳排序。
  • Route: 对输入进行自定义分流操作,如可以将图像帧向 A 分支转发,音频帧向 B 分支转发。
  • StreamRead: 流失数据读入,例如视频解码。
  • StreamWrite: 流失数据写出,例如视频编码。

部分算子能力在用户试用该框架实现了不同能力的算子之后,Mediaflow 提供非常便捷的图编排能力,用户可根据具体需求把算子编排为不同的处理流程图。


阿里云计算平台事业部所研发的流式编排 Mediaflow 能够非常好的解决流处理场景的大部分问题,但是我们使用该框架在语音类场景进行落地应用时仍然遇到了以下几方面的问题:

  • 算子处理为同步阻塞:在 Mediaflow 中,对数据流上的元素进行转换操作的 map 算子在处理过程中为同步阻塞的,在处理完当前元素后才能接受后续元素。但是在语音场景中,由于 AI 模型处理模式的特殊性,算子接收数据和产生数据并不是一一对应的,因此传统的同步阻塞算子无法满足需求。
  • 语音子流场景延迟高:在语音场景中请求多为长链接,用户的输入为一个持续长达数小时的音频数据流,但是由于语音模型的特殊性,数据流需要被切分为不同的子流进行处理。在现有框架中仅有 window 算子能支持此类操作,该算子需要收集所有子流元素后才能进行计算,因此会带来几十秒的延迟,无法满足语音场景毫秒级延迟的需求。
  • 缺乏容错机制:目前的语音数据流处理链路已经十分复杂,单一场景往往就包含几十种处理步骤,任意一处节点故障即可影响服务整体的稳定性。但是现有的 Mediaflow 框架内部缺乏容错处理能力,无法满足语音服务的稳定性需求。


解决方案

为了解决上述 Medialfow 在语音场景应用时所遇到的困难,达摩院语音实验室与阿里云计算平台事业部一起,通过以下几个方面对 Mediaflow 进行改造升级:

  • 语音流式算子:提供异步非阻塞的流式算子适配语音流处理场景,同时提供预置 AI 算子,减少新应用接入成本。
  • 子图:通过子图的方式来解决语音场景对子流处理的毫秒级延迟需求。
  • 统一容错机制:框架提供多种预置的容错能力来应对语音长连接场景下稳定性的挑战。

下面将分别对上述方案进行详细的介绍。 语音流式算子 ①异步非阻塞流式算子

在大多数的传统流处理框架中,map 操作都是同步阻塞的,其处理流程如下图所示:


Map 处理流程

这种处理模型无法解决语音场景中某些算子节点接收数据和产生数据并不一一对应的问题。因此我们新增了异步非阻塞流式算子 StreamMap,其很好的支持了异步非阻塞处理场景。StreamMap 的处理流程大致如下图所示:


StreamMap 处理流程

②算子能力扩展

除了 StreamMap 之外,我们还新增了包括但不限于 concat、scan、last 等不同功能的算子,用以解决语音场景中复杂的流处理逻辑,同时也丰富了 Mediaflow 的算子能力矩阵。

部分新增基础算子

③预置AI算子

在 AI 流处理场景,有许多功能相似的 operator,我们构建了一个 AI 能力算子库,提供了几十种预置 AI 能力,能够应对上百种不同的业务场景,用户在使用时无需任何开发,只需要在数据流处理过程中配置特定的算子,即可获得丰富的 AI 能力。以下列举部分预置算子能力。AI能力算子库 子图 在传统的流处理场景中,无论是对流进行转换、复制还是合并,每一个处理操作都是持续在流的整个生命周期里 (从流上的第一个元素开始,到整个流的结束)。在语音的流式处理场景里,由于算法的需求,每一个操作只会持续在有限的边界内,因此原始的从开始到结束的数据流可能会在特定的时候被切分成多段相连的子流。子流内部的元素间具有上下文关系,不同子间的元素相互隔离。在这里需要说明的是 子流 与传统流处理中的 的概念有本质的区别。比如在当今十分流行的流处理系统 Flink 中,其中的 window 操作可以将数据流切分为不同的部分,用户只能够在每部分的数据积攒完成时进行计算,因此对各部分的处理是非异步且阻塞的。这在对实时性要求极高的语音场景下是明显不满足要求的。因此框架需要支持从子流的第一个元素开始进行处理,而不是等到子流的最后一个元素集中进行计算。针对这种场景我们对 Mediaflow 框架进行升级,首次提出了 子图 的概念:


MediaData().splitby(conditon1, sub_graph)

其处理流程如下图所示,框架会根据用户定义的条件函数 conditon 1 将原数据切分为不同的子流,然后使用用户独立编排的子图对其进行处理,整个过程为异步非阻塞,支持子流的并行分布式处理,同时保证整体结果有序。子图处理流程 统一容错机制 容错处理是流处理框架非常重要的需求,而在部分 AI 类流处理场景通常是长连接,同时对延迟又有很高的要求,简单的重试无法满足需求。为此,框架层提供了重试、重连以及降级等多种容错能力,用户只需要简单的配置就能极大的提高服务可用性。

  • 重连:框架根据配置自动缓存部分数据,在发生错误时自动重试并修正后续数据结果。


  • 重试:框架自动忽略错误,重试当前请求并继续处理后续数据。


  • 降级:框架自动忽略错误,并自动执行用户配置的特定降级处理逻辑替换原有结果。


分布式执行引擎

除了上述改进升级之外,针对像语音识别中声学模型等重计算的算子,我们自研了云原生的分布式计算框架 Atom,对识别任务进行统一调度,同时结合智能伸缩技术在提升集群资源利用率的同时又能应对各种突发流量场景。

针对该框架的介绍会在后续公众号发出,敬请期待。





语音流式编排实战

性别识别是语音实验室研发的用于识别说话人性别的智能 AI 服务。接下来我们就以该服务为示例介绍流式编排的具体方法。

性别识别的大致处理流程如下图所示:


性别识别处理流程

首先我们需要实现空音频过滤 filter 算子:




@decorator.filterdef empty_binary_filter(dataframe, ctx):    return len(dataframe.get_binary()) > 0

此外,由于该服务对用户输入音频时常有限制,因此我们实现一个 map 算子来处理该逻辑:
















class AudioLengthCount(MapFunction):    def init(self, args):        """        初始化逻辑,设置相关参数初值        """
    def map(self, dataframe, context):        """        根据采样率计算当前已经处理的数据时长,超出限制是返回错误        """
    def clean(self):        """        重置相关变量        """

最后我们还需要实现一个 map 算子用于选取最优识别结果:
















class ChooseBestResult(MapFunction):    def init(self, args):        """        初始化逻辑,设置相关参数初值        """
    def map(self, dataframe, context):        """        根据分布式计算框架ATOM的返回结果选择出最优值        """
    def clean(self):        """        重置相关变量        """

在实现了各个业务逻辑算子后,只需要简单的编排代码即可组装为具体的业务流程:



def build_graph():    MediaData('input').filter(empty_binary_filter).stream_map(config('decoder')).filter(empty_binary_filter).map(AudioLengthCount).stream_map(config('sid')).map(ChooseBestResult).last()


多模态流式编排平台

目前基于新的流式编排框架,我们已经拥有了一句话识别实时语音识别录音文件识别长文本语音合成等多种不同场景的语音服务。








随着不断的迭代,如何高效管理众多的图资源和服务版本成为了非常迫切的需求。为此,我们构建了多模态流式编排平台,平台提供了包含图资源管理、服务发布及服务运维等在内的一站式能力



下面我们将从整体架构、图资源管理、图服务发布这三个方面来对其进行详细的介绍。

整体架构

整体架构图多模态流式编排平台的整体架构如上图所示,其中:

  • 接口层:针对实时流处理和离线批处理任务分别进行统一的协议定义。因此仅需一套 SDK 即可对外部所有用户提供服务。
  • 应用层:基于 Medaflow 框架进行业务逻辑处理和图编排。
  • 框架层:提供丰富的 AI 能力算子,通过批流一体化流处理引擎 Mediaflow 进行图解释与执行。同时基于自研引擎 ATOM 提供强大的分布式 AI 模型计算能力。
  • 一站式管理平台:负责用户的图编排资源以及服务发布管理,通过统一架构进行公共云及专有云的一键部署。

图资源管理

用户可利用平台的图资源管理功能,上传不同版本的资源,平台会自动进行分类存储,同时自动分发至多地域的不同存储介质 ( NAS、OSS 等) 中。

图资源管理

服务发布

用户使用一站式平台上传图资源后,平台同时提供了全流程、可视化的服务部署能力。用户只需简单的几步操作就能将图资源部署为多地域的在线服务。

服务列表

服务发布流程


总结

截止目前,语音服务内部所有应用层服务已全部完成 FLOW 流式升级新应用的开发周期从一个月缩短至一周,极大的减少了新应用的开发成本。语音各类服务已广泛地应用于阿里集团客服、蚂蚁95188、手机淘宝、支付宝、高德地图等阿里生态业务。同时依托新一代云原生的多模态流编排平台,包含几十种算法、几百种应用场景的 AI 模型能够零时差上云,并保证服务的毫秒级延迟,各类服务单日调用量超 30亿次。解决了语音 AI 在银行、证券、法院、运营商、能源、教育等各行各业场景落地难的问题。后续我们将对平台进行持续不断的优化打磨,为语音内外部业务提供更好的技术支撑。


相关文章
|
1天前
|
机器学习/深度学习 人工智能 算法
介绍一下AI在药物研发中的应用。
【10月更文挑战第16天】介绍一下AI在药物研发中的应用。
7 0
|
2天前
|
机器学习/深度学习 数据采集 人工智能
智能化运维:AI在IT运维中的应用探索###
随着信息技术的飞速发展,传统的IT运维模式正面临着前所未有的挑战。本文旨在探讨人工智能(AI)技术如何赋能IT运维,通过智能化手段提升运维效率、降低故障率,并为企业带来更加稳定高效的服务体验。我们将从AI运维的概念入手,深入分析其在故障预测、异常检测、自动化处理等方面的应用实践,以及面临的挑战与未来发展趋势。 ###
|
3天前
|
机器学习/深度学习 人工智能 搜索推荐
AI在医疗领域的革命性应用
【10月更文挑战第14天】 本文探讨了人工智能(AI)在医疗行业中的多种应用,包括疾病诊断、个性化治疗、药物研发等。通过具体案例分析,展示了AI技术如何提高医疗服务效率和准确性,同时指出了当前面临的挑战与未来发展趋势。
15 2
|
3天前
|
机器学习/深度学习 人工智能 自然语言处理
探索AI在软件测试中的创新应用与实践###
本文旨在探讨人工智能(AI)技术如何革新软件测试领域,提升测试效率、质量与覆盖范围。通过深入分析AI驱动的自动化测试工具、智能化缺陷预测模型及持续集成/持续部署(CI/CD)流程优化等关键方面,本研究揭示了AI技术在解决传统软件测试痛点中的潜力与价值。文章首先概述了软件测试的重要性和当前面临的挑战,随后详细介绍了AI技术在测试用例生成、执行、结果分析及维护中的应用实例,并展望了未来AI与软件测试深度融合的趋势,强调了技术伦理与质量控制的重要性。本文为软件开发与测试团队提供了关于如何有效利用AI技术提升测试效能的实践指南。 ###
|
4天前
|
机器学习/深度学习 人工智能 监控
探索AI技术在医疗健康领域的应用与挑战
【10月更文挑战第13天】 本文探讨了人工智能(AI)在医疗健康领域的多种创新应用,包括疾病诊断、个性化治疗、患者监护和药物研发等方面。同时,文章也分析了当前AI技术在实际应用中面临的挑战,如数据隐私、算法透明度、监管问题等,并提出了一些可能的解决思路。通过综合分析,本文旨在为读者提供一个关于AI在医疗领域应用现状及未来的全面视角。
26 3
|
4天前
|
人工智能
添加一个Stable Difussion图像生成应用,通过向AI助手简单的提问,即可快速搭建Stable Diffusion应用至自己的网站中,大幅提升开发效率。
添加一个Stable Difussion图像生成应用,通过向AI助手简单的提问,即可快速搭建Stable Diffusion应用至自己的网站中,大幅提升开发效率。
|
4天前
|
人工智能 开发框架 Java
总计 30 万奖金,Spring AI Alibaba 应用框架挑战赛开赛
Spring AI Alibaba 应用框架挑战赛邀请广大开发者参与开源项目的共建,助力项目快速发展,掌握 AI 应用开发模式。大赛分为《支持 Spring AI Alibaba 应用可视化调试与追踪本地工具》和《基于 Flow 的 AI 编排机制设计与实现》两个赛道,总计 30 万奖金。
|
5天前
|
人工智能 Java API
阿里云开源 AI 应用开发框架:Spring AI Alibaba
近期,阿里云重磅发布了首款面向 Java 开发者的开源 AI 应用开发框架:Spring AI Alibaba(项目 Github 仓库地址:alibaba/spring-ai-alibaba),Spring AI Alibaba 项目基于 Spring AI 构建,是阿里云通义系列模型及服务在 Java AI 应用开发领域的最佳实践,提供高层次的 AI API 抽象与云原生基础设施集成方案,帮助开发者快速构建 AI 应用。本文将详细介绍 Spring AI Alibaba 的核心特性,并通过「智能机票助手」的示例直观的展示 Spring AI Alibaba 开发 AI 应用的便利性。示例源
|
5天前
|
存储 消息中间件 人工智能
ApsaraMQ Serverless 能力再升级,事件驱动架构赋能 AI 应用
本文整理自2024年云栖大会阿里云智能集团高级技术专家金吉祥的演讲《ApsaraMQ Serverless 能力再升级,事件驱动架构赋能 AI 应用》。
|
4天前
|
人工智能 运维 Serverless
Serverless + AI 让应用开发更简单
随着云计算和人工智能(AI)技术的飞速发展,企业对于高效、灵活且成本效益高的解决方案的需求日益增长。本文旨在探讨 Serverless 架构与 AI 技术的结合,如何通过 Serverless 函数计算和 AI 开发平台,助力企业简化应用开发流程,减少企业 AI 业务试错成本,加速业务创新,为企业业务发展提供无限可能。