开发者社区> 阴转多云转晴> 正文

maxCompute的UDAF demo,实现累加功能

简介: maxCompute的UDAF demo,实现累加功能
+关注继续查看

需求:将表中数据按照name聚合,并且count进行累加

name count
Jan 1
Jan 2
Feb 3
Feb 1
Mar 1
Mar 5

预期结果:

name count
Jan 3
Feb 7
Mar 13

使用idea的maxCompute studio新增UDAF

_
_
然后自动生成未实现的方法,我的字段name是string,count是bigint
所以@Resolve("string,bigint->bigint")

新增一个class用来存储字段

        private String  name;
        private Long count;
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            name =  in.readUTF();
            count = in.readLong();
        }
    }

这样newBuffer就可以这样写了

    public Writable newBuffer() {
        return new MyBuffer();
    }

还需要一个Map来存储key(name)和value(count),一个long类型的参数存储累加的值

    Long old_count = 0L;//存储累加值
    private LongWritable ret = new LongWritable();//存储输出值

完整代码参考:

public class UDAFTest extends Aggregator {
    private static class MyBuffer implements Writable {
        private String  name;
        private Long count;
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            name =  in.readUTF();
            count = in.readLong();
        }
    }



    @Override
    public Writable newBuffer() {
        return new MyBuffer();
    }
    Map<String,Long> map = new LinkedHashMap<>();
    Long old_count = 0L;
    @Override
    public void iterate(Writable buffer, Writable[] args) throws UDFException {
        String arg = String.valueOf(args[0]);
        Long cnt = Long.parseLong(String.valueOf(args[1]));
        MyBuffer buf = (MyBuffer) buffer;

        if (arg != null) {
            if(map.containsKey(arg)){
                Long newcnt = map.get(arg);
                old_count = cnt+newcnt;
                map.put(arg,old_count);
            }else {
                map.put(arg,old_count+cnt);
            }
        }
        buf.name = arg;
        buf.count = map.get(arg);

    }
    private LongWritable ret = new LongWritable();
    @Override
    public Writable terminate(Writable arg0) throws UDFException {
        MyBuffer buffer = (MyBuffer) arg0;
        ret.set(buffer.count);
        return ret;
    }

    @Override
    public void merge(Writable buffer, Writable partial) throws UDFException {
        MyBuffer buf = (MyBuffer) buffer;
        MyBuffer p = (MyBuffer) partial;
        buf.name = p.name;
        buf.count = p.count;
    }


}

然后通过maxCompute studio发布下
_

发布名为test20191119,这样就可以在Dataworks中调用了。

其中原表数据:
_

_

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
《零基础实现Flume收集网站日志数据到MaxCompute》电子版地址
零基础实现Flume收集网站日志数据到MaxCompute
25 0
《零基础实现hadoop 迁移 MaxCompute 之 数据》电子版地址
零基础实现hadoop 迁移 MaxCompute 之 数据迁移
23 0
《零基础实现hadoop 迁移 MaxCompute 之 数据迁移》电子版地址
零基础实现hadoop 迁移 MaxCompute 之 数据迁移
28 0
MaxCompute 公共云多租户设计的技术要点详解及产品实现特色
公共云大数据平台在多租户的设计和实现方式上有所差异。本文主要介绍在公共云大数据平台的多租实现方案中需要考虑的问题和挑战,重点介绍了MaxCompute在计算和存储多租实现上的特点。期望通过这些介绍来让大家了解大数据云平台多租方案需要关注的技术点和MaxCompute在多租实现上的产品特色。
789 0
MaxCompute最佳实践:SQL实现一行变多行&多行变一行
本文对Dataworks里一行变多行&多行变一行进行实践,其中多行变一行是对现有实践的一个引用,方便大家查找
705 0
MaxCompute中实现IPv4和IPv6地址归属地转换
大数据平台的成熟使得更多种类的非结构化、半结构化的数据分析成为可能。其中把IP地址转换为归属地又是极为常见的一种场景。本文将介绍在MaxCompute如何根据IPv4和IPv6地址实现归属地转换。
3366 0
阿里云Dataworks数据集成工具实现:OTS -> Maxcompute数据同步
数据集成主要用于离线(批量)数据同步。离线(批量)的数据通道通过定义数据来源和去向的数据源和数据集,提供一套抽象化的数据抽取插件(Reader)、数据写入插件(Writer),并基于此框架设计一套简化版的中间数据传输格式,从而实现任意结构化、半结构化数据源之间数据传输。结合用户在使用OTS数据源同步的时候容易出现问题,这里演示:OTS数据源同步数据到Maxcompute的具体实现步骤。
923 0
优化(2)MaxCompute 实现增量数据推送(全量比对增量逻辑)
MaxCompute(ODPS2.0) - 试用新的集合操作命令 EXCEPT & 增量识别
2149 0
友盟+联合EB级云数据仓库 MaxCompute 实现友盟域和企业私域数据全面融合
国内领先的第三方全域数据智能服务商友盟+,联合阿里云EB级云数据仓库 MaxCompute 为企业提供面向分析的,实现友盟域数据与企业私域数据全面融合的自助分析服务“U-DOP数据开放”。
663 0
【云栖号案例 | 互联网】MaxCompute助力千寻位置实现3秒厘米级定位
定位的本质是数据计算,对精度和速度有高要求。混合云架构下,机密数据在专有云完成,大规模数据计算在大数据平台完成,定位数据播发在公共云进行。
1063 0
+关注
阴转多云转晴
小白一枚
文章
问答
文章排行榜
最热
最新
相关电子书
更多
阿里云MaxCompute百问百答
立即下载
大数据处理-原理和MaxCompute实践
立即下载
MaxCompute重磅发布
立即下载