MaxCompute-udaf实现group_concat或listagg

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: odps-udaf实现group_concat或listagg

1. 背景

在某些业务场景中,经常会遇到需要按照A字段分组,再按照B字段排序,并合并聚合C字段的场景,例如:

  1. 业务服务在线对话业务,数据库端针对每一条发送的对话依次进行落库。在分析场景中,针对每一个session,需要按照时间顺序升序,对对话内容进行concat并进行问题分类与情感分析等应用

在传统关系型数据库中,例如mysql的group_concat或者db2的listagg函数均可以实现上述的功能。对应到maxcompute(原odps)则为wm_concat函数,本文通过如下两种方案对该功能在odps上进行实现:

  1. odps侧需多个内建函数配合
  2. odps-udaf以python为例

2. odps内建函数实现代码

参考:https://help.aliyun.com/document_detail/400533.html

【wm_concat不保证有序】

目前不能通过该参考链接上的set odps.sql.validate.orderby.limit=false; 结合子查询排序的方式来进行实现,该方案在通过logview或者explain查看执行计划可以发现(如下图),整个任务仅一个Map阶段一个mapper,子查询做的sort动作应该是在执行计划优化的时候直接进行了下推,并未产生已排序的中间表,导致数据结果的排序不符合预期。

较小数据量的情况下,可以通过手动落地中间表的形式排序,并通过wm_concat得到符合预期排序的聚合结果,但是该方案存在如下限制:

  1. 在一个Mapper的前提下符合预期,这就间接的限制了中间表不能有分区,不能数据量过大超过block_size的额定大小(128M or default?)

不符合上述规则的情况下,同一个key的数据会在shuffle到reducer的过程中存在排序不一定全都有序的问题,如下图1所示,当数据量足够大(当前排序表数据量2600万),大量数据会是有序的(猜测因为他们在一个mapper中顺序读取落地block),但是因为mapper足够多(指定odps.stage.mapper.split.size=1,当前任务分片数245,如下图2所示),不能保证所有id对应的所有值均分配到同一个mapper,导致排序不符合预期。

实现步骤样例代码参考如下:

-- 创建样例表createtable if not EXISTS price_total
(name STRING,price int,saleid int);-- 创建根据groupbykey和待聚合字段排序好的中间表createtable price_total_mid asselect name, price from price_total orderby name, price desc;-- 使用wm_concat进行聚合select name, wm_concat(',', price)as price_new from price_total_mid groupby name;

【实现结果】

3. udaf实现代码-python

通过自建udaf实现

思路:

  1. mapper阶段new复杂数组的buffer,存储聚合字段和排序字段
  2. reducer阶段的terminate方法中再进行某个key下的全局排序
  3. 该任务执行过程对reducer的压力较大,可以通过调整odps.stage.reducer.mem来增大内存以便存储排序字段和待聚合字段,并进行排序。

代码参考如下:

fromodps.udfimportannotatefromodps.udfimportBaseUDAFfromdatetimeimportdatetime@annotate('STRING,DATETIME->STRING')
classListagg(BaseUDAF):
defnew_buffer(self):
return []
defiterate(self, buffer, context, datime):
buffer.append([datetime.timestamp(datime), context])
defmerge(self, buffer, pbuffer):
buffer.extend(pbuffer)
defterminate(self, buffer):
return'|'.join(item[1] foriteminsorted(buffer, key=lambdax: x[0], reverse=True))

4. 问题

【报错】

FAILED: ODPS-0123055:Script exception - ValueError: unmarshallable object

【报错原因】

buffer 里的元素不是 marshal对象,python环境下,复杂类型的buffer的时候高频遇到

  1. marshal官网参考https://docs.python.org/3.3/library/marshal.html?spm=a2c6h.12873639.article-detail.164.67fe1869Y1j9hU
  2. buffer里的元素是list、dict或者工厂函数defaultdict等时,里面的元素、key或者value都必须是marshal对象

【解决方案】

可以将复杂类型对象转换为基础类型字段,本例中主要是datetime.datetime对象marshal导致报错,可以将时间戳转换为unix_timestamp,转换为int类型来解决这一问题

【参考链接】

https://blog.csdn.net/taiyangdao/article/details/78409331

https://blog.csdn.net/www_rsqdz_net/article/details/79798798

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
28天前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
78 11
|
2月前
|
存储 分布式计算 大数据
MaxCompute 数据分区与生命周期管理
【8月更文第31天】随着大数据分析需求的增长,如何高效地管理和组织数据变得至关重要。阿里云的 MaxCompute(原名 ODPS)是一个专为海量数据设计的计算服务,它提供了丰富的功能来帮助用户管理和优化数据。本文将重点讨论 MaxCompute 中的数据分区策略和生命周期管理方法,并通过具体的代码示例来展示如何实施这些策略。
81 1
|
2月前
数据平台问题之在数据影响决策的过程中,如何实现“决策/行动”阶段
数据平台问题之在数据影响决策的过程中,如何实现“决策/行动”阶段
|
2月前
|
存储 监控 安全
大数据架构设计原则:构建高效、可扩展与安全的数据生态系统
【8月更文挑战第23天】大数据架构设计是一个复杂而系统的工程,需要综合考虑业务需求、技术选型、安全合规等多个方面。遵循上述设计原则,可以帮助企业构建出既高效又安全的大数据生态系统,为业务创新和决策支持提供强有力的支撑。随着技术的不断发展和业务需求的不断变化,持续优化和调整大数据架构也将成为一项持续的工作。
|
2月前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之ODPS数据怎么Merge到MySQL数据库
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
消息中间件 数据采集 JSON
大数据 - DWD&DIM 行为数据
大数据 - DWD&DIM 行为数据
43 1
|
2月前
|
机器学习/深度学习 人工智能 分布式计算
理解并利用大数据的力量:解锁数据背后的价值
【8月更文挑战第7天】大数据已成为推动社会进步和经济发展的重要力量。通过理解并利用大数据的力量,企业可以解锁数据背后的价值,优化业务流程、提升决策效率和创新能力。然而,大数据应用也面临着诸多挑战和风险,需要企业不断学习和实践以应对。相信在未来的发展中,大数据将为我们带来更多的惊喜和机遇。
|
2月前
|
分布式计算 安全 大数据
MaxCompute 的安全性和数据隐私保护
【8月更文第31天】在当今数字化转型的时代背景下,企业越来越依赖于大数据分析来推动业务增长。与此同时,数据安全和隐私保护成为了不容忽视的关键问题。作为阿里巴巴集团推出的大数据处理平台,MaxCompute(原名 ODPS)致力于为企业提供高效、安全的数据处理解决方案。本文将探讨 MaxCompute 在数据安全方面的实践,包括数据加密、访问控制及合规性考虑等方面。
66 0
|
2月前
|
数据采集 数据挖掘 Python
python爬虫去哪儿网上爬取旅游景点14万条,可以做大数据分析的数据基础
本文介绍了使用Python编写的爬虫程序,成功从去哪儿网上爬取了14万条旅游景点信息,为大数据分析提供了数据基础。
|
2月前
|
存储 分布式计算 大数据
大数据处理竟然这么简单?学会这几招,你也能在数据洪流中游刃有余,秒变数据大师!
【8月更文挑战第6天】面对海量数据,有效处理成为关键。本文介绍大规模数据处理的核心挑战及解决方案,涵盖分布式存储(如HDFS)和计算(如Spark)。通过示例代码展示HDFS文件读写及Spark数据处理流程。此外,还强调了数据质量、安全及合理资源配置的重要性,助您在数据海洋中洞察先机。
59 1

热门文章

最新文章

下一篇
无影云桌面