本文将带您实现在 MaxCompute 上轻松访问 OSS 的数据,关于处理非结构化数据的原理性介绍请参见
前言。
操作步骤
系统内置方式读取 OSS 数据
访问外部数据源时,需要用户自定义不同的 Extractor,同时您也可以使用 MaxCompute 内置的 Extractor,来读取按照约定格式存储的
OSS 数据。只需要创建一个外部表,便可把这张表作为源表进行查询。
假设有一份 CSV 数据存在
OSS 上,endpoint 为oss-cn-shanghai-internal.aliyuncs.com,bucket 为oss-odps-test,数据文件的存放路径为/demo/vehicle.csv。
授予权限
您可通过以下两种方式授予权限:
直接登录阿里云账号后,点击此处完成一键授权。
自定义授权。
首先需要在
RAM 中授予 MaxCompute 访问 OSS 的权限。登录
RAM控制台,通过控制台中的
角色管理 创建角色 AliyunODPSDefaultRole。如下图所示:
修改策略内容设置,如下所示:{- "Statement": [
- {
- "Action": "sts:AssumeRole",
- "Effect": "Allow",
- "Principal": {
- "Service": [
- "odps.aliyuncs.com"
- ]
- }
- }
- ],
- "Version": "1"
- }
编辑该角色的授权策略 AliyunODPSRolePolicy。如下所示:
-
{
- "Version": "1",
- "Statement": [
- {
- "Action": [
- "oss:ListBuckets",
- "oss:GetObject",
- "oss:ListObjects",
- "oss:PutObject",
- "oss:DeleteObject",
- "oss:AbortMultipartUpload",
- "oss:ListParts"
- ],
- "Resource": "*",
- "Effect": "Allow"
- }
- ]
- }
- --可自定义其他权限
将权限AliyunODPSRolePolicy授权给该角色。
创建外部表
创建外部表,语句如下:
- CREATE EXTERNAL TABLE IF NOT EXISTS ambulance_data_csv_external
- (
- vehicleId int,
- recordId int,
- patientId int,
- calls int,
- locationLatitute double,
- locationLongtitue double,
- recordTime string,
- direction string
- )
- STORED BY 'com.aliyun.odps.CsvStorageHandler' -- (1)
- WITH SERDEPROPERTIES (
- 'odps.properties.rolearn'='acs:ram::1811270634786818:role/aliyunodpsdefaultrole'
- ) -- (2)
- LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/'; -- (3)(4)
上述语句,说明如下:
com.aliyun.odps.CsvStorageHandler是内置的处理 CSV 格式文件的StorageHandler,它定义了如何读写 CSV 文件。您只需指明这个名字,相关逻辑已经由系统实现。
odps.properties.rolearn中的信息是 RAM 中AliyunODPSDefaultRole的Arn信息。您可以通过 RAM 控制台中的 角色详情 获取。
LOCATION 必须指定一个 OSS 目录,默认系统会读取这个目录下所有的文件。
建议您使用 OSS 提供的内网域名,否则将产生 OSS 流量费用。
建议您存放 OSS 数据的区域对应您开通 MaxCompute 的区域。由于 MaxCompute 只有在部分区域部署,我们不承诺跨区域的数据连通性。
OSS 的连接格式为oss://oss-cn-shanghai-internal.aliyuncs.com/Bucket名称/目录名称/。目录后不要加文件名称,如下的集中用法都是错误的:
-
http://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/ -- 不支持http连接
- https://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/ -- 不支持https连接
- oss://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo -- 连接地址错误
- oss://oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/vehicle.csv -- 不必指定文件名
外部表只是在系统中记录了与 OSS 目录的关联,当 Drop 这张表时,对应的LOCATION数据不会被删除。
更多有关外部表的说明请参见
DDL 语句。
查询外部表
外部表创建成功后,便可如同普通表一样使用这个外部表。假设/demo/vehicle.csv数据如下:
- 1,1,51,1,46.81006,-92.08174,9/14/2014 0:00,S
- 1,2,13,1,46.81006,-92.08174,9/14/2014 0:00,NE
- 1,3,48,1,46.81006,-92.08174,9/14/2014 0:00,NE
- 1,4,30,1,46.81006,-92.08174,9/14/2014 0:00,W
- 1,5,47,1,46.81006,-92.08174,9/14/2014 0:00,S
- 1,6,9,1,46.81006,-92.08174,9/14/2014 0:00,S
- 1,7,53,1,46.81006,-92.08174,9/14/2014 0:00,N
- 1,8,63,1,46.81006,-92.08174,9/14/2014 0:00,SW
- 1,9,4,1,46.81006,-92.08174,9/14/2014 0:00,NE
- 1,10,31,1,46.81006,-92.08174,9/14/2014 0:00,N
执行如下 SQL 语句:
- select recordId, patientId, direction from ambulance_data_csv_external where patientId > 25;
这条语句会提交一个作业,调用内置 csv extractor,从 OSS 读取数据进行处理。输出结果如下:
- +------------+------------+-----------+
- | recordId | patientId | direction |
- +------------+------------+-----------+
- | 1 | 51 | S |
- | 3 | 48 | NE |
- | 4 | 30 | W |
- | 5 | 47 | S |
- | 7 | 53 | N |
- | 8 | 63 | SW |
- | 10 | 31 | N |
- +------------+------------+-----------+
自定义 Extractor 访问 OSS
当 OSS 中的数据格式比较复杂,内置的 Extractor 无法满足需求时,需要自定义 Extractor 来读取 OSS 文件中的数据。
例如有一个 txt 数据文件,并不是 CSV 格式,记录之间的列通过|分隔。比如/demo/SampleData/CustomTxt/AmbulanceData/vehicle.csv数据如下:
- 1|1|51|1|46.81006|-92.08174|9/14/2014 0:00|S
- 1|2|13|1|46.81006|-92.08174|9/14/2014 0:00|NE
- 1|3|48|1|46.81006|-92.08174|9/14/2014 0:00|NE
- 1|4|30|1|46.81006|-92.08174|9/14/2014 0:00|W
- 1|5|47|1|46.81006|-92.08174|9/14/2014 0:00|S
- 1|6|9|1|46.81006|-92.08174|9/14/2014 0:00|S
- 1|7|53|1|46.81006|-92.08174|9/14/2014 0:00|N
- 1|8|63|1|46.81006|-92.08174|9/14/2014 0:00|SW
- 1|9|4|1|46.81006|-92.08174|9/14/2014 0:00|NE
- 1|10|31|1|46.81006|-92.08174|9/14/2014 0:00|N
定义 Extractor
写一个通用的 Extractor,将分隔符作为参数传进来,可以处理所有类似格式的 text 文件。如下所示:
- /**
- * Text extractor that extract schematized records from formatted plain-text(csv, tsv etc.)
- **/
- public class TextExtractor extends Extractor {
- private InputStreamSet inputs;
- private String columnDelimiter;
- private DataAttributes attributes;
- private BufferedReader currentReader;
- private boolean firstRead = true;
- public TextExtractor() {
- // default to ",", this can be overwritten if a specific delimiter is provided (via DataAttributes)
- this.columnDelimiter = ",";
- }
- // no particular usage for execution context in this example
- @Override
- public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes) {
- this.inputs = inputs; // inputs 是一个 InputStreamSet,每次调用 next() 返回一个 InputStream,这个 InputStream 可以读取一个 OSS 文件的所有内容。
- this.attributes = attributes;
- // check if "delimiter" attribute is supplied via SQL query
- String columnDelimiter = this.attributes.getValueByKey("delimiter"); //delimiter 通过 DDL 语句传参。
- if ( columnDelimiter != null)
- {
- this.columnDelimiter = columnDelimiter;
- }
- // note: more properties can be inited from attributes if needed
- }
- @Override
- public Record extract() throws IOException {//extactor() 调用返回一条 Record,代表外部表中的一条记录。
- String line = readNextLine();
- if (line == null) {
- return null; // 返回 NULL 来表示这个表中已经没有记录可读。
- }
- return textLineToRecord(line); // textLineToRecord 将一行数据按照 delimiter 分割为多个列。
- }
- @Override
- public void close(){
- // no-op
- }
- }
textLineToRecord 将数据分割的完整实现请参见
此处。
定义 StorageHandler
StorageHandler 作为 External Table 自定义逻辑的统一入口。
- package com.aliyun.odps.udf.example.text;
- public class TextStorageHandler extends OdpsStorageHandler {
- @Override
- public Class<? extends Extractor> getExtractorClass() {
- return TextExtractor.class;
- }
- @Override
- public Class<? extends Outputer> getOutputerClass() {
- return TextOutputer.class;
- }
- }
编译打包
将自定义代码编译打包,并上传到 MaxCompute。
- add jar odps-udf-example.jar;
创建 External 表
与使用内置 Extractor 相似,首先需要创建一张外部表,不同的是在指定外部表访问数据的时候,需要使用自定义的 StorageHandler。
创建外部表语句如下:
- CREATE EXTERNAL TABLE IF NOT EXISTS ambulance_data_txt_external
- (
- vehicleId int,
- recordId int,
- patientId int,
- calls int,
- locationLatitute double,
- locationLongtitue double,
- recordTime string,
- direction string
- )
- STORED BY 'com.aliyun.odps.udf.example.text.TextStorageHandler' --STORED BY 指定自定义 StorageHandler 的类名。
- with SERDEPROPERTIES (
- 'delimiter'='\\|', --SERDEPROPERITES 可以指定参数,这些参数会通过 DataAttributes 传递到 Extractor 代码中。
- 'odps.properties.rolearn'='acs:ram::xxxxxxxxxxxxx:role/aliyunodpsdefaultrole'
- )
- LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/SampleData/CustomTxt/AmbulanceData/'
- USING 'odps-udf-example.jar'; --同时需要指定类定义所在的 jar 包。
查询外部表
执行如下 SQL 语句:
- select recordId, patientId, direction from ambulance_data_txt_external where patientId > 25;
通过自定义 Extractor 读取非结构化数据
在前面我们看到了通过内置与自定义的 Extractor 可以轻松处理存储在 OSS 上的 CSV 等文本数据。接下来以语音数据(wav 格式文件)为例,为您介绍如何通过自定义的 Extractor 访问并处理 OSS 上的非文本文件。
这里从最终执行的 SQL 开始,介绍以 MaxCompute SQL 为入口,处理存放在 OSS 上的语音文件的使用方法:
- CREATE EXTERNAL TABLE IF NOT EXISTS speech_sentence_snr_external
- (
- sentence_snr double,
- id string
- )
- STORED BY 'com.aliyun.odps.udf.example.speech.SpeechStorageHandler'
- WITH SERDEPROPERTIES (
- 'mlfFileName'='sm_random_5_utterance.text.label' ,
- 'speechSampleRateInKHz' = '16'
- )
- LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/'
- USING 'odps-udf-example.jar,sm_random_5_utterance.text.label';
如上所示,同样需要创建外部表,然后通过外部表的 Schema 定义了希望通过外部表从语音文件中抽取出来的信息:
一个语音文件中的语句信噪比(SNR):sentence_snr。
对应语音文件的名字:id。
创建外部表后,通过标准的 Select 语句进行查询,则会触发 Extractor 运行计算。此处便可感受到,在读取处理 OSS数据时,除了可以对文本文件做简单的反序列化处理,还可以通过自定义 Extractor实现更复杂的数据处理抽取逻辑。比如:在这个例子中,我们通过自定义的com.aliyun.odps.udf.example.speech.SpeechStorageHandler 中封装的 Extractor, 实现了对语音文件计算平均有效语句信噪比的功能,并将抽取出来的结构化数据直接进行 SQL 运算(WHERE sentence_snr > 10),最终返回所有信噪比大于 10 的语音文件以及对应的信噪比值。
在 OSS 地址oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/上,存储了原始的多个WAV 格式的语音文件,MaxCompute框架将读取该地址上的所有文件,并在必要的时候进行文件级别的分片,自动将文件分配给多个计算节点处理。每个计算节点上的 Extractor则负责处理通过 InputStreamSet分配给该节点的文件集。具体的处理逻辑则与用户单机程序相仿,您不需关心分布计算中的种种细节,按照类单机方式实现其用户算法即可。
定制化的SpeechSentenceSnrExtractor主体逻辑,说明如下:
首先在setup接口中读取参数,进行初始化,并且导入语音处理模型(通过 resource 引入):
- public SpeechSentenceSnrExtractor(){
- this.utteranceLabels = new HashMap<String, UtteranceLabel>();
- }
- @Override
- public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes){
- this.inputs = inputs;
- this.attributes = attributes;
- this.mlfFileName = this.attributes.getValueByKey(MLF_FILE_ATTRIBUTE_KEY);
- String sampleRateInKHzStr = this.attributes.getValueByKey(SPEECH_SAMPLE_RATE_KEY);
- this.sampleRateInKHz = Double.parseDouble(sampleRateInKHzStr);
- try {
- // read the speech model file from resource and load the model into memory
- BufferedInputStream inputStream = ctx.readResourceFileAsStream(mlfFileName);
- loadMlfLabelsFromResource(inputStream);
- inputStream.close();
- } catch (IOException e) {
- throw new RuntimeException("reading model from mlf failed with exception " + e.getMessage());
- }
- }
Extractor() 接口中,实现了对语音文件的具体读取和处理逻辑,对读取的数据根据语音模型进行信噪比的计算,并且将结果填充成 [snr, id] 格式的 Record。
上述示例对实现进行了简化,同时也没有包括涉及语音处理的算法逻辑,具体实现请参见 MaxCompute SDK 在开源社区中提供的
样例代码。
- @Override
- public Record extract() throws IOException {
- SourceInputStream inputStream = inputs.next();
- if (inputStream == null){
- return null;
- }
- // process one wav file to extract one output record [snr, id]
- String fileName = inputStream.getFileName();
- fileName = fileName.substring(fileName.lastIndexOf('/') + 1);
- logger.info("Processing wav file " + fileName);
- String id = fileName.substring(0, fileName.lastIndexOf('.'));
- // read speech file into memory buffer
- long fileSize = inputStream.getFileSize();
- byte[] buffer = new byte[(int)fileSize];
- int readSize = inputStream.readToEnd(buffer);
- inputStream.close();
- // compute the avg sentence snr
- double snr = computeSnr(id, buffer, readSize);
- // construct output record [snr, id]
- Column[] outputColumns = this.attributes.getRecordColumns();
- ArrayRecord record = new ArrayRecord(outputColumns);
- record.setDouble(0, snr);
- record.setString(1, id);
- return record;
- }
- private void loadMlfLabelsFromResource(BufferedInputStream fileInputStream)
- throws IOException {
- // skipped here
- }
- // compute the snr of the speech sentence, assuming the input buffer contains the entire content of a wav file
- private double computeSnr(String id, byte[] buffer, int validBufferLen){
- // computing the snr value for the wav file (supplied as byte buffer array), skipped here
- }
执行查询,如下所示:
- set odps.task.major.version=2dot0_demo_flighting;
- set odps.sql.planner.mode=lot;
- set odps.sql.ddl.odps2=true;
- set odps.sql.preparse.odps2=lot;
- select sentence_snr, id
- from speech_sentence_snr_external
- where sentence_snr > 10.0;
获得计算结果,如下所示:
- --------------------------------------------------------------
- | sentence_snr | id |
- --------------------------------------------------------------
- | 34.4703 | J310209090013_H02_K03_042 |
- --------------------------------------------------------------
- | 31.3905 | tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0 |
- --------------------------------------------------------------
- | 35.4774 | tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0 |
- --------------------------------------------------------------
- | 16.0462 | tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0 |
- --------------------------------------------------------------
- | 14.5568 | tsh_148_3013_5_13_47_3d5008d792408f81_0 |
- --------------------------------------------------------------
综上所述,通过自定义 Extractor,便可在 SQL 语句上分布式地处理多个 OSS 上的语音数据文件。同样的方法,也可以方便的利用 MaxCompute 的大规模计算能力,完成对图像,视频等各种类型非结构化数据的处理。