MaxCompute-udaf实现group_concat或listagg

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: odps-udaf实现group_concat或listagg

1. 背景

在某些业务场景中,经常会遇到需要按照A字段分组,再按照B字段排序,并合并聚合C字段的场景,例如:

  1. 业务服务在线对话业务,数据库端针对每一条发送的对话依次进行落库。在分析场景中,针对每一个session,需要按照时间顺序升序,对对话内容进行concat并进行问题分类与情感分析等应用

在传统关系型数据库中,例如mysql的group_concat或者db2的listagg函数均可以实现上述的功能。对应到maxcompute(原odps)则为wm_concat函数,本文通过如下两种方案对该功能在odps上进行实现:

  1. odps侧需多个内建函数配合
  2. odps-udaf以python为例

2. odps内建函数实现代码

参考:https://help.aliyun.com/document_detail/400533.html

【wm_concat不保证有序】

目前不能通过该参考链接上的set odps.sql.validate.orderby.limit=false; 结合子查询排序的方式来进行实现,该方案在通过logview或者explain查看执行计划可以发现(如下图),整个任务仅一个Map阶段一个mapper,子查询做的sort动作应该是在执行计划优化的时候直接进行了下推,并未产生已排序的中间表,导致数据结果的排序不符合预期。

较小数据量的情况下,可以通过手动落地中间表的形式排序,并通过wm_concat得到符合预期排序的聚合结果,但是该方案存在如下限制:

  1. 在一个Mapper的前提下符合预期,这就间接的限制了中间表不能有分区,不能数据量过大超过block_size的额定大小(128M or default?)

不符合上述规则的情况下,同一个key的数据会在shuffle到reducer的过程中存在排序不一定全都有序的问题,如下图1所示,当数据量足够大(当前排序表数据量2600万),大量数据会是有序的(猜测因为他们在一个mapper中顺序读取落地block),但是因为mapper足够多(指定odps.stage.mapper.split.size=1,当前任务分片数245,如下图2所示),不能保证所有id对应的所有值均分配到同一个mapper,导致排序不符合预期。

实现步骤样例代码参考如下:

-- 创建样例表createtable if not EXISTS price_total
(name STRING,price int,saleid int);-- 创建根据groupbykey和待聚合字段排序好的中间表createtable price_total_mid asselect name, price from price_total orderby name, price desc;-- 使用wm_concat进行聚合select name, wm_concat(',', price)as price_new from price_total_mid groupby name;

【实现结果】

3. udaf实现代码-python

通过自建udaf实现

思路:

  1. mapper阶段new复杂数组的buffer,存储聚合字段和排序字段
  2. reducer阶段的terminate方法中再进行某个key下的全局排序
  3. 该任务执行过程对reducer的压力较大,可以通过调整odps.stage.reducer.mem来增大内存以便存储排序字段和待聚合字段,并进行排序。

代码参考如下:

fromodps.udfimportannotatefromodps.udfimportBaseUDAFfromdatetimeimportdatetime@annotate('STRING,DATETIME->STRING')
classListagg(BaseUDAF):
defnew_buffer(self):
return []
defiterate(self, buffer, context, datime):
buffer.append([datetime.timestamp(datime), context])
defmerge(self, buffer, pbuffer):
buffer.extend(pbuffer)
defterminate(self, buffer):
return'|'.join(item[1] foriteminsorted(buffer, key=lambdax: x[0], reverse=True))

4. 问题

【报错】

FAILED: ODPS-0123055:Script exception - ValueError: unmarshallable object

【报错原因】

buffer 里的元素不是 marshal对象,python环境下,复杂类型的buffer的时候高频遇到

  1. marshal官网参考https://docs.python.org/3.3/library/marshal.html?spm=a2c6h.12873639.article-detail.164.67fe1869Y1j9hU
  2. buffer里的元素是list、dict或者工厂函数defaultdict等时,里面的元素、key或者value都必须是marshal对象

【解决方案】

可以将复杂类型对象转换为基础类型字段,本例中主要是datetime.datetime对象marshal导致报错,可以将时间戳转换为unix_timestamp,转换为int类型来解决这一问题

【参考链接】

https://blog.csdn.net/taiyangdao/article/details/78409331

https://blog.csdn.net/www_rsqdz_net/article/details/79798798

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
2月前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
16天前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
128 7
|
16天前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
32 2
|
29天前
|
数据采集 监控 数据管理
数据治理之道:大数据平台的搭建与数据质量管理
【10月更文挑战第26天】随着信息技术的发展,数据成为企业核心资源。本文探讨大数据平台的搭建与数据质量管理,包括选择合适架构、数据处理与分析能力、数据质量标准与监控机制、数据清洗与校验及元数据管理,为企业数据治理提供参考。
74 1
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
51 3
|
13天前
|
存储 大数据 数据管理
大数据分区简化数据维护
大数据分区简化数据维护
22 4
|
23天前
|
存储 大数据 定位技术
大数据 数据索引技术
【10月更文挑战第26天】
48 3
|
23天前
|
存储 大数据 OLAP
大数据数据分区技术
【10月更文挑战第26天】
58 2
|
26天前
|
消息中间件 分布式计算 大数据
数据为王:大数据处理与分析技术在企业决策中的力量
【10月更文挑战第29天】在信息爆炸的时代,大数据处理与分析技术为企业提供了前所未有的洞察力和决策支持。本文探讨了大数据技术在企业决策中的重要性和实际应用,包括数据的力量、实时分析、数据驱动的决策以及数据安全与隐私保护。通过这些技术,企业能够从海量数据中提取有价值的信息,预测市场趋势,优化业务流程,从而在竞争中占据优势。
74 2
|
28天前
|
数据采集 分布式计算 大数据
数据治理之道:大数据平台的搭建与数据质量管理
【10月更文挑战第27天】在数字化时代,数据治理对于确保数据资产的保值增值至关重要。本文探讨了大数据平台的搭建和数据质量管理的重要性及实践方法。大数据平台应包括数据存储、处理、分析和展示等功能,常用工具如Hadoop、Apache Spark和Flink。数据质量管理则涉及数据的准确性、一致性和完整性,通过建立数据质量评估和监控体系,确保数据分析结果的可靠性。企业应设立数据治理委员会,投资相关工具和技术,提升数据治理的效率和效果。
59 2