MaxCompute-udaf实现group_concat或listagg

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: odps-udaf实现group_concat或listagg

1. 背景

在某些业务场景中,经常会遇到需要按照A字段分组,再按照B字段排序,并合并聚合C字段的场景,例如:

  1. 业务服务在线对话业务,数据库端针对每一条发送的对话依次进行落库。在分析场景中,针对每一个session,需要按照时间顺序升序,对对话内容进行concat并进行问题分类与情感分析等应用

在传统关系型数据库中,例如mysql的group_concat或者db2的listagg函数均可以实现上述的功能。对应到maxcompute(原odps)则为wm_concat函数,本文通过如下两种方案对该功能在odps上进行实现:

  1. odps侧需多个内建函数配合
  2. odps-udaf以python为例

2. odps内建函数实现代码

参考:https://help.aliyun.com/document_detail/400533.html

【wm_concat不保证有序】

目前不能通过该参考链接上的set odps.sql.validate.orderby.limit=false; 结合子查询排序的方式来进行实现,该方案在通过logview或者explain查看执行计划可以发现(如下图),整个任务仅一个Map阶段一个mapper,子查询做的sort动作应该是在执行计划优化的时候直接进行了下推,并未产生已排序的中间表,导致数据结果的排序不符合预期。

较小数据量的情况下,可以通过手动落地中间表的形式排序,并通过wm_concat得到符合预期排序的聚合结果,但是该方案存在如下限制:

  1. 在一个Mapper的前提下符合预期,这就间接的限制了中间表不能有分区,不能数据量过大超过block_size的额定大小(128M or default?)

不符合上述规则的情况下,同一个key的数据会在shuffle到reducer的过程中存在排序不一定全都有序的问题,如下图1所示,当数据量足够大(当前排序表数据量2600万),大量数据会是有序的(猜测因为他们在一个mapper中顺序读取落地block),但是因为mapper足够多(指定odps.stage.mapper.split.size=1,当前任务分片数245,如下图2所示),不能保证所有id对应的所有值均分配到同一个mapper,导致排序不符合预期。

实现步骤样例代码参考如下:

-- 创建样例表createtable if not EXISTS price_total
(name STRING,price int,saleid int);-- 创建根据groupbykey和待聚合字段排序好的中间表createtable price_total_mid asselect name, price from price_total orderby name, price desc;-- 使用wm_concat进行聚合select name, wm_concat(',', price)as price_new from price_total_mid groupby name;

【实现结果】

3. udaf实现代码-python

通过自建udaf实现

思路:

  1. mapper阶段new复杂数组的buffer,存储聚合字段和排序字段
  2. reducer阶段的terminate方法中再进行某个key下的全局排序
  3. 该任务执行过程对reducer的压力较大,可以通过调整odps.stage.reducer.mem来增大内存以便存储排序字段和待聚合字段,并进行排序。

代码参考如下:

fromodps.udfimportannotatefromodps.udfimportBaseUDAFfromdatetimeimportdatetime@annotate('STRING,DATETIME->STRING')
classListagg(BaseUDAF):
defnew_buffer(self):
return []
defiterate(self, buffer, context, datime):
buffer.append([datetime.timestamp(datime), context])
defmerge(self, buffer, pbuffer):
buffer.extend(pbuffer)
defterminate(self, buffer):
return'|'.join(item[1] foriteminsorted(buffer, key=lambdax: x[0], reverse=True))

4. 问题

【报错】

FAILED: ODPS-0123055:Script exception - ValueError: unmarshallable object

【报错原因】

buffer 里的元素不是 marshal对象,python环境下,复杂类型的buffer的时候高频遇到

  1. marshal官网参考https://docs.python.org/3.3/library/marshal.html?spm=a2c6h.12873639.article-detail.164.67fe1869Y1j9hU
  2. buffer里的元素是list、dict或者工厂函数defaultdict等时,里面的元素、key或者value都必须是marshal对象

【解决方案】

可以将复杂类型对象转换为基础类型字段,本例中主要是datetime.datetime对象marshal导致报错,可以将时间戳转换为unix_timestamp,转换为int类型来解决这一问题

【参考链接】

https://blog.csdn.net/taiyangdao/article/details/78409331

https://blog.csdn.net/www_rsqdz_net/article/details/79798798

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
相关文章
|
1月前
|
机器学习/深度学习 传感器 分布式计算
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
110 14
|
2月前
|
机器学习/深度学习 运维 监控
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
99 0
|
3月前
|
数据采集 分布式计算 DataWorks
ODPS在某公共数据项目上的实践
本项目基于公共数据定义及ODPS与DataWorks技术,构建一体化智能化数据平台,涵盖数据目录、归集、治理、共享与开放六大目标。通过十大子系统实现全流程管理,强化数据安全与流通,提升业务效率与决策能力,助力数字化改革。
111 4
|
3月前
|
分布式计算 DataWorks 数据处理
在数据浪潮中前行:记录一次我与ODPS的实践、思考与展望
本文详细介绍了在 AI 时代背景下,如何利用阿里云 ODPS 平台(尤其是 MaxCompute)进行分布式多模态数据处理的实践过程。内容涵盖技术架构解析、完整操作流程、实际部署步骤以及未来发展方向,同时结合 CSDN 博文深入探讨了多模态数据处理的技术挑战与创新路径,为企业提供高效、低成本的大规模数据处理方案。
240 3
|
3月前
|
SQL 人工智能 分布式计算
ODPS:数据浪潮中的成长与突围
本文讲述了作者在大数据浪潮中,通过引入阿里云ODPS体系(包括MaxCompute、DataWorks、Hologres)解决数据处理瓶颈、实现业务突破与个人成长的故事。从被海量数据困扰到构建“离线+实时”数据架构,ODPS不仅提升了数据处理效率,更推动了技术能力与业务影响力的双重跃迁。
|
1月前
|
传感器 人工智能 监控
数据下田,庄稼不“瞎种”——聊聊大数据如何帮农业提效
数据下田,庄稼不“瞎种”——聊聊大数据如何帮农业提效
108 14
|
11天前
|
传感器 人工智能 监控
拔俗多模态跨尺度大数据AI分析平台:让复杂数据“开口说话”的智能引擎
在数字化时代,多模态跨尺度大数据AI分析平台应运而生,打破数据孤岛,融合图像、文本、视频等多源信息,贯通微观与宏观尺度,实现智能诊断、预测与决策,广泛应用于医疗、制造、金融等领域,推动AI从“看懂”到“会思考”的跃迁。
|
1月前
|
机器学习/深度学习 传感器 监控
吃得安心靠数据?聊聊用大数据盯紧咱们的餐桌安全
吃得安心靠数据?聊聊用大数据盯紧咱们的餐桌安全
76 1
|
1月前
|
数据采集 自动驾驶 机器人
数据喂得好,机器人才能学得快:大数据对智能机器人训练的真正影响
数据喂得好,机器人才能学得快:大数据对智能机器人训练的真正影响
125 1
|
2月前
|
机器学习/深度学习 监控 大数据
数据当“安全带”:金融市场如何用大数据玩转风险控制?
数据当“安全带”:金融市场如何用大数据玩转风险控制?
110 10

热门文章

最新文章