/作者/
胡子昊
近年来,随着 AI 技术快速发展,已经有越来越多的视觉、语音、自然语言处理等不同模态的智能服务出现在我们的日常生活中,为我们提供丰富的 AI 能力。在每种模态内部,又可根据不同领域被细分为多种不同类别。
以语音服务为例,目前已经拥有一句话识别、实时语音识别、录音文件识别、长文本语音合成等多个成熟的智能语音服务。
语音识别公共云服务
这些语音 AI 服务的整体处理流程类似,但是又有各自的结果处理及业务流转逻辑。在过去的几年中,各服务均独立开发,在整个服务演进过程中面临着以下几点困难:
- 解决方案的设计复杂性增高:在很多复杂场景下,单一服务涉及十几种算法,三十多步处理步骤,要求分布式执行,毫秒级延迟,这在工程实现上有着极大的挑战。
- 解决方案的多样性变多:目前在单语音领域,所涉及的服务包括一句话识别、实时语音识别、录音文件识别、长文本语音合成等多种能力,他们在流处理过程上大致相似,但又互有差异。各服务独立开发,重复造轮子,导致代码质量差,最终无法维护。
- 解决方案的部署运维成本过高:除了已经非常成熟的产品化的算法服务,各模态内部也有很多处于“孵化”阶段的新型算法。这些新型算法需要组合起来才能对外提供更加标准的产品化接口,每次开发独立服务不仅需要算法同学具备较高的工程能力,同时部署运维成本很高。
▎阿里云PAI-Mediaflow
为了解决上述痛点,我们结合了由阿里云计算平台事业部所研发的流式编排框架 Medaiflow,实现能够对流处理类型的服务进行有效抽象,并提供简洁的逻辑编排能力来将业务函数快速组装为适配不同场景的处理流程图。与此同时,还能够提供强力的图处理引擎来高效的处理业务请求,并对外提供标准的服务化接口。
在流式编排框架 Medaiflow 中,对数据流提供了通用的抽象 MediaData,以及元素抽象 DataFrame。其定义:
表示由数据 DataFrame 组成的连续数据流,它的数量可以是 0 → N 个。
对于流处理,Mediaflow 也提供了许多算子抽象:
- Map: 单输入单输出,对数据流做 map 操作。
- Filter: 返回 bool 类型,表示输入的数据是否要被过滤掉。
- FlatMap: 单输入多输出,输入一条数据输出 N 条结果,可用来实现数据流复制的功能。
- Window:窗口函数,根据配置做窗口操作输入窗口中的 N 条数据,输出单条或多条结果,支持 count window, slide window,select all 等常用 window 类型。
- Copy: 图结构为 1 对 N 时自动将结果复制到 N 条流。
- SortUnion: 图结构为 N 对 1 时自动将结果进行归并到 1 条流,并按时间戳排序。
- Route: 对输入进行自定义分流操作,如可以将图像帧向 A 分支转发,音频帧向 B 分支转发。
- StreamRead: 流失数据读入,例如视频解码。
- StreamWrite: 流失数据写出,例如视频编码。
部分算子能力在用户试用该框架实现了不同能力的算子之后,Mediaflow 提供非常便捷的图编排能力,用户可根据具体需求把算子编排为不同的处理流程图。
阿里云计算平台事业部所研发的流式编排 Mediaflow 能够非常好的解决流处理场景的大部分问题,但是我们使用该框架在语音类场景进行落地应用时仍然遇到了以下几方面的问题:
- 算子处理为同步阻塞:在 Mediaflow 中,对数据流上的元素进行转换操作的 map 算子在处理过程中为同步阻塞的,在处理完当前元素后才能接受后续元素。但是在语音场景中,由于 AI 模型处理模式的特殊性,算子接收数据和产生数据并不是一一对应的,因此传统的同步阻塞算子无法满足需求。
- 语音子流场景延迟高:在语音场景中请求多为长链接,用户的输入为一个持续长达数小时的音频数据流,但是由于语音模型的特殊性,数据流需要被切分为不同的子流进行处理。在现有框架中仅有 window 算子能支持此类操作,该算子需要收集所有子流元素后才能进行计算,因此会带来几十秒的延迟,无法满足语音场景毫秒级延迟的需求。
- 缺乏容错机制:目前的语音数据流处理链路已经十分复杂,单一场景往往就包含几十种处理步骤,任意一处节点故障即可影响服务整体的稳定性。但是现有的 Mediaflow 框架内部缺乏容错处理能力,无法满足语音服务的稳定性需求。
▎解决方案
为了解决上述 Medialfow 在语音场景应用时所遇到的困难,达摩院语音实验室与阿里云计算平台事业部一起,通过以下几个方面对 Mediaflow 进行改造升级:
- 语音流式算子:提供异步非阻塞的流式算子适配语音流处理场景,同时提供预置 AI 算子,减少新应用接入成本。
- 子图:通过子图的方式来解决语音场景对子流处理的毫秒级延迟需求。
- 统一容错机制:框架提供多种预置的容错能力来应对语音长连接场景下稳定性的挑战。
下面将分别对上述方案进行详细的介绍。 语音流式算子 ①异步非阻塞流式算子
在大多数的传统流处理框架中,map 操作都是同步阻塞的,其处理流程如下图所示:
Map 处理流程
这种处理模型无法解决语音场景中某些算子节点接收数据和产生数据并不一一对应的问题。因此我们新增了异步非阻塞流式算子 StreamMap,其很好的支持了异步非阻塞处理场景。StreamMap 的处理流程大致如下图所示:
StreamMap 处理流程
②算子能力扩展
除了 StreamMap 之外,我们还新增了包括但不限于 concat、scan、last 等不同功能的算子,用以解决语音场景中复杂的流处理逻辑,同时也丰富了 Mediaflow 的算子能力矩阵。
部分新增基础算子
③预置AI算子
在 AI 流处理场景,有许多功能相似的 operator,我们构建了一个 AI 能力算子库,提供了几十种预置 AI 能力,能够应对上百种不同的业务场景,用户在使用时无需任何开发,只需要在数据流处理过程中配置特定的算子,即可获得丰富的 AI 能力。以下列举部分预置算子能力。AI能力算子库 子图 在传统的流处理场景中,无论是对流进行转换、复制还是合并,每一个处理操作都是持续在流的整个生命周期里 (从流上的第一个元素开始,到整个流的结束)。在语音的流式处理场景里,由于算法的需求,每一个操作只会持续在有限的边界内,因此原始的从开始到结束的数据流可能会在特定的时候被切分成多段相连的子流。子流内部的元素间具有上下文关系,不同子流间的元素相互隔离。在这里需要说明的是 子流 与传统流处理中的 批 的概念有本质的区别。比如在当今十分流行的流处理系统 Flink 中,其中的 window 操作可以将数据流切分为不同的部分,用户只能够在每部分的数据积攒完成时进行计算,因此对各部分的处理是非异步且阻塞的。这在对实时性要求极高的语音场景下是明显不满足要求的。因此框架需要支持从子流的第一个元素开始进行处理,而不是等到子流的最后一个元素集中进行计算。针对这种场景我们对 Mediaflow 框架进行升级,首次提出了 子图 的概念:
MediaData().splitby(conditon1, sub_graph)
其处理流程如下图所示,框架会根据用户定义的条件函数 conditon 1 将原数据切分为不同的子流,然后使用用户独立编排的子图对其进行处理,整个过程为异步非阻塞,支持子流的并行分布式处理,同时保证整体结果有序。子图处理流程 统一容错机制 容错处理是流处理框架非常重要的需求,而在部分 AI 类流处理场景通常是长连接,同时对延迟又有很高的要求,简单的重试无法满足需求。为此,框架层提供了重试、重连以及降级等多种容错能力,用户只需要简单的配置就能极大的提高服务可用性。
- 重连:框架根据配置自动缓存部分数据,在发生错误时自动重试并修正后续数据结果。
- 重试:框架自动忽略错误,重试当前请求并继续处理后续数据。
- 降级:框架自动忽略错误,并自动执行用户配置的特定降级处理逻辑替换原有结果。
分布式执行引擎
除了上述改进升级之外,针对像语音识别中声学模型等重计算的算子,我们自研了云原生的分布式计算框架 Atom,对识别任务进行统一调度,同时结合智能伸缩技术在提升集群资源利用率的同时又能应对各种突发流量场景。
针对该框架的介绍会在后续公众号发出,敬请期待。
▎语音流式编排实战
性别识别是语音实验室研发的用于识别说话人性别的智能 AI 服务。接下来我们就以该服务为示例介绍流式编排的具体方法。
性别识别的大致处理流程如下图所示:
性别识别处理流程
首先我们需要实现空音频过滤 filter 算子:
@decorator.filterdef empty_binary_filter(dataframe, ctx): return len(dataframe.get_binary()) > 0
此外,由于该服务对用户输入音频时常有限制,因此我们实现一个 map 算子来处理该逻辑:
class AudioLengthCount(MapFunction): def init(self, args): """ 初始化逻辑,设置相关参数初值 """ def map(self, dataframe, context): """ 根据采样率计算当前已经处理的数据时长,超出限制是返回错误 """ def clean(self): """ 重置相关变量 """
最后我们还需要实现一个 map 算子用于选取最优识别结果:
class ChooseBestResult(MapFunction): def init(self, args): """ 初始化逻辑,设置相关参数初值 """ def map(self, dataframe, context): """ 根据分布式计算框架ATOM的返回结果选择出最优值 """ def clean(self): """ 重置相关变量 """
在实现了各个业务逻辑算子后,只需要简单的编排代码即可组装为具体的业务流程:
def build_graph(): MediaData('input').filter(empty_binary_filter).stream_map(config('decoder')).filter(empty_binary_filter).map(AudioLengthCount).stream_map(config('sid')).map(ChooseBestResult).last()
▎多模态流式编排平台
目前基于新的流式编排框架,我们已经拥有了一句话识别、实时语音识别、录音文件识别、长文本语音合成等多种不同场景的语音服务。
随着不断的迭代,如何高效管理众多的图资源和服务版本成为了非常迫切的需求。为此,我们构建了多模态流式编排平台,平台提供了包含图资源管理、服务发布及服务运维等在内的一站式能力。
下面我们将从整体架构、图资源管理、图服务发布这三个方面来对其进行详细的介绍。
整体架构
整体架构图多模态流式编排平台的整体架构如上图所示,其中:
- 接口层:针对实时流处理和离线批处理任务分别进行统一的协议定义。因此仅需一套 SDK 即可对外部所有用户提供服务。
- 应用层:基于 Medaflow 框架进行业务逻辑处理和图编排。
- 框架层:提供丰富的 AI 能力算子,通过批流一体化流处理引擎 Mediaflow 进行图解释与执行。同时基于自研引擎 ATOM 提供强大的分布式 AI 模型计算能力。
- 一站式管理平台:负责用户的图编排资源以及服务发布管理,通过统一架构进行公共云及专有云的一键部署。
图资源管理
用户可利用平台的图资源管理功能,上传不同版本的资源,平台会自动进行分类存储,同时自动分发至多地域的不同存储介质 ( NAS、OSS 等) 中。
图资源管理
服务发布
用户使用一站式平台上传图资源后,平台同时提供了全流程、可视化的服务部署能力。用户只需简单的几步操作就能将图资源部署为多地域的在线服务。
服务列表
服务发布流程
▎总结
截止目前,语音服务内部所有应用层服务已全部完成 FLOW 流式升级,新应用的开发周期从一个月缩短至一周,极大的减少了新应用的开发成本。语音各类服务已广泛地应用于阿里集团客服、蚂蚁95188、手机淘宝、支付宝、高德地图等阿里生态业务。同时依托新一代云原生的多模态流编排平台,包含几十种算法、几百种应用场景的 AI 模型能够零时差上云,并保证服务的毫秒级延迟,各类服务单日调用量超 30亿次。解决了语音 AI 在银行、证券、法院、运营商、能源、教育等各行各业场景落地难的问题。后续我们将对平台进行持续不断的优化打磨,为语音内外部业务提供更好的技术支撑。