MaxCompute自定义extractor访问OSS文本文件DateTime类型数据

本文涉及的产品
对象存储 OSS,OSS 加速器 50 GB 1个月
简介: MaxCompute自定义extractor访问OSS文本文件,官方示例无法支持DateTime数据读入。该文档示范引入joda-time,解决自定义时间日期格式,读入外部非结构化数据。

根据产品文档《访问OSS非结构化数据》,自定义Extractor访问OSS。github项目详见:TextExtractor

一、问题

该extractor在读取非结构化数据时,如果字段存在DateTime类型(例如:2019-10-27 19:44:36),会出现如下报错:
20191104162246

FAILED: ODPS-0123131:User defined function exception - Traceback:
java.lang.IllegalArgumentException
    at java.sql.Date.valueOf(Date.java:143)
    at com.aliyun.odps.udf.example.text.TextExtractor.textLineToRecord(TextExtractor.java:194)
    at com.aliyun.odps.udf.example.text.TextExtractor.extract(TextExtractor.java:153)
    at com.aliyun.odps.udf.ExtractorHandler.extract(ExtractorHandler.java:120)

根据堆栈,查看指定位置的代码:Date.valueOf(parts[i]),其中java.sql.Date.valueOf(),查询该函数官方文档,发现只能支持形如:"yyyy-[m]m-[d]d"的String类型参数。不支持时间部分。
20191104162644
20191104163839

二、解决方法

引入joda-time依赖

<dependency>
  <groupId>joda-time</groupId>
  <artifactId>joda-time</artifactId>
  <version>2.10</version>
</dependency>
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;

引入DateTimeFormat.forPattern(),指定日期格式对文本数据进行读取。

record.setDate(index, new Date(DateTime.parse(parts[i], DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).getMillis()));

三、结果验证

1. extractor项目打包生成jar包,通过odpscmd上传resource

add jar /Users/gary/big_data/odps/text_extractor/target/text_extractor-1.0-SNAPSHOT.jar

/Users/gary/big_data/odps/text_extractor/target/text_extractor-1.0-SNAPSHOT.jar,替换为本地实际jar包路径

另外,extractor使用了Joda-Time,需要额外添加第三方包。
add jar /Users/gary/.m2/repository/joda-time/joda-time/2.10/joda-time-2.10.jar

/Users/gary/.m2/repository/joda-time/joda-time/2.10/joda-time-2.10.jar,替换为本地实际jar包路径

2. DDL建表,直接在odpscmd或者datastudio执行

CREATE EXTERNAL TABLE video_play_log
(
    UUID STRING
    ,action STRING
    ,ip STRING
    ,time datetime
)
STORED BY 'me.gary.test.odps.examples.TextStorageHandler'
WITH SERDEPROPERTIES ( 
 'odps.properties.rolearn'='acs:ram::<填写主账号uid>:role/aliyunodpsdefaultrole',
 'delimiter'='^'  --SERDEPROPERITES可以指定参数,这些参数会通过DataAttributes传递到Extractor代码中。
 )
 LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/<Bucket名称>/<目录名称>/'
 USING 'text_extractor-1.0-SNAPSHOT.jar,joda-time-2.10.jar';

odps.properties.rolearn中的信息是RAM中AliyunODPSDefaultRole的ARN信息。通过RAM控制台中的角色详情获取。
OSS的连接格式为oss://oss-cn-shanghai-internal.aliyuncs.com/Bucket名称/目录名称/。按实际信息替换

3. 上传测试数据至oss bucket的指定目录,命名video_play_log.txt。

5c661071dba64d5080c91da085ff1073^视频播放页-点击-快进^27.17.94.60^2019-10-27 19:44:36

4. select外部表

select * from <project_name>.video_play_log;
读取结果:
20191104170726

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
消息中间件 分布式计算 大数据
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
206 0
|
10月前
|
Web App开发 监控 安全
OSS客户端签名直传实践:Web端安全上传TB级文件方案(含STS临时授权)
本文深入解析了客户端直传技术,涵盖架构设计、安全机制、性能优化等方面。通过STS临时凭证与分片上传实现高效安全的文件传输,显著降低服务端负载与上传耗时,提升系统稳定性与用户体验。
862 2
|
消息中间件 监控 数据挖掘
【有奖实践】轻量消息队列(原 MNS)订阅 OSS 事件实时处理文件变动
当你需要对对象存储 OSS(Object Storage Service)中的文件变动进行实时处理、同步、监听、业务触发、日志记录等操作时,你可以通过设置 OSS 的事件通知规则,自定义关注的文件,并将 OSS 事件推送到轻量消息队列(原 MNS)的队列或主题中,开发者的服务即可及时收到相关通知,并通过消费消息进行后续的业务处理。
318 95
|
10月前
|
存储 缓存 分布式计算
OSS大数据分析集成:MaxCompute直读OSS外部表优化查询性能(减少数据迁移的ETL成本)
MaxCompute直读OSS外部表优化方案,解决传统ETL架构中数据同步延迟高、传输成本大、维护复杂等问题。通过存储格式优化(ORC/Parquet)、分区剪枝、谓词下推与元数据缓存等技术,显著提升查询性能并降低成本。结合冷热数据分层与并发控制策略,实现高效数据分析。
258 2
|
自然语言处理 大数据 应用服务中间件
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
346 5
|
分布式计算 DataWorks 调度
oss数据同步maxcompute报错
在使用阿里云DataWorks同步OSS数据至MaxCompute时,遇到“Input is not in the .gz format”的报错。问题源于目标目录中存在一个空文件,导致同步时识别错误。
|
分布式计算 监控 大数据
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
268 0
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
374 0
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
消息中间件 分布式计算 算法
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
275 3