1. 背景
在某些业务场景中,经常会遇到需要按照A字段分组,再按照B字段排序,并合并聚合C字段的场景,例如:
- 业务服务在线对话业务,数据库端针对每一条发送的对话依次进行落库。在分析场景中,针对每一个session,需要按照时间顺序升序,对对话内容进行concat并进行问题分类与情感分析等应用
在传统关系型数据库中,例如mysql的group_concat或者db2的listagg函数均可以实现上述的功能。对应到maxcompute(原odps)则为wm_concat函数,本文通过如下两种方案对该功能在odps上进行实现:
- odps侧需多个内建函数配合
- odps-udaf以python为例
2. odps内建函数实现代码
参考:https://help.aliyun.com/document_detail/400533.html
【wm_concat不保证有序】
目前不能通过该参考链接上的set odps.sql.validate.orderby.limit=false; 结合子查询排序的方式来进行实现,该方案在通过logview或者explain查看执行计划可以发现(如下图),整个任务仅一个Map阶段一个mapper,子查询做的sort动作应该是在执行计划优化的时候直接进行了下推,并未产生已排序的中间表,导致数据结果的排序不符合预期。
较小数据量的情况下,可以通过手动落地中间表的形式排序,并通过wm_concat得到符合预期排序的聚合结果,但是该方案存在如下限制:
- 在一个Mapper的前提下符合预期,这就间接的限制了中间表不能有分区,不能数据量过大超过block_size的额定大小(128M or default?)
不符合上述规则的情况下,同一个key的数据会在shuffle到reducer的过程中存在排序不一定全都有序的问题,如下图1所示,当数据量足够大(当前排序表数据量2600万),大量数据会是有序的(猜测因为他们在一个mapper中顺序读取落地block),但是因为mapper足够多(指定odps.stage.mapper.split.size=1,当前任务分片数245,如下图2所示),不能保证所有id对应的所有值均分配到同一个mapper,导致排序不符合预期。
实现步骤样例代码参考如下:
-- 创建样例表createtable if not EXISTS price_total (name STRING,price int,saleid int);-- 创建根据groupbykey和待聚合字段排序好的中间表createtable price_total_mid asselect name, price from price_total orderby name, price desc;-- 使用wm_concat进行聚合select name, wm_concat(',', price)as price_new from price_total_mid groupby name;
【实现结果】
3. udaf实现代码-python
通过自建udaf实现
思路:
- mapper阶段new复杂数组的buffer,存储聚合字段和排序字段
- reducer阶段的terminate方法中再进行某个key下的全局排序
- 该任务执行过程对reducer的压力较大,可以通过调整odps.stage.reducer.mem来增大内存以便存储排序字段和待聚合字段,并进行排序。
代码参考如下:
fromodps.udfimportannotatefromodps.udfimportBaseUDAFfromdatetimeimportdatetime'STRING,DATETIME->STRING') (classListagg(BaseUDAF): defnew_buffer(self): return [] defiterate(self, buffer, context, datime): buffer.append([datetime.timestamp(datime), context]) defmerge(self, buffer, pbuffer): buffer.extend(pbuffer) defterminate(self, buffer): return'|'.join(item[1] foriteminsorted(buffer, key=lambdax: x[0], reverse=True))
4. 问题
【报错】
FAILED: ODPS-0123055:Script exception - ValueError: unmarshallable object
【报错原因】
buffer 里的元素不是 marshal对象,python环境下,复杂类型的buffer的时候高频遇到
- marshal官网参考https://docs.python.org/3.3/library/marshal.html?spm=a2c6h.12873639.article-detail.164.67fe1869Y1j9hU
- buffer里的元素是list、dict或者工厂函数defaultdict等时,里面的元素、key或者value都必须是marshal对象
【解决方案】
可以将复杂类型对象转换为基础类型字段,本例中主要是datetime.datetime对象marshal导致报错,可以将时间戳转换为unix_timestamp,转换为int类型来解决这一问题
【参考链接】
https://blog.csdn.net/taiyangdao/article/details/78409331
https://blog.csdn.net/www_rsqdz_net/article/details/79798798