maxCompute的UDAF demo,实现累加功能-阿里云开发者社区

开发者社区> 阴转多云转晴> 正文

maxCompute的UDAF demo,实现累加功能

简介: maxCompute的UDAF demo,实现累加功能
+关注继续查看

需求:将表中数据按照name聚合,并且count进行累加

name count
Jan 1
Jan 2
Feb 3
Feb 1
Mar 1
Mar 5

预期结果:

name count
Jan 3
Feb 7
Mar 13

使用idea的maxCompute studio新增UDAF

_
_
然后自动生成未实现的方法,我的字段name是string,count是bigint
所以@Resolve("string,bigint->bigint")

新增一个class用来存储字段

        private String  name;
        private Long count;
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            name =  in.readUTF();
            count = in.readLong();
        }
    }

这样newBuffer就可以这样写了

    public Writable newBuffer() {
        return new MyBuffer();
    }

还需要一个Map来存储key(name)和value(count),一个long类型的参数存储累加的值

    Long old_count = 0L;//存储累加值
    private LongWritable ret = new LongWritable();//存储输出值

完整代码参考:

public class UDAFTest extends Aggregator {
    private static class MyBuffer implements Writable {
        private String  name;
        private Long count;
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            name =  in.readUTF();
            count = in.readLong();
        }
    }



    @Override
    public Writable newBuffer() {
        return new MyBuffer();
    }
    Map<String,Long> map = new LinkedHashMap<>();
    Long old_count = 0L;
    @Override
    public void iterate(Writable buffer, Writable[] args) throws UDFException {
        String arg = String.valueOf(args[0]);
        Long cnt = Long.parseLong(String.valueOf(args[1]));
        MyBuffer buf = (MyBuffer) buffer;

        if (arg != null) {
            if(map.containsKey(arg)){
                Long newcnt = map.get(arg);
                old_count = cnt+newcnt;
                map.put(arg,old_count);
            }else {
                map.put(arg,old_count+cnt);
            }
        }
        buf.name = arg;
        buf.count = map.get(arg);

    }
    private LongWritable ret = new LongWritable();
    @Override
    public Writable terminate(Writable arg0) throws UDFException {
        MyBuffer buffer = (MyBuffer) arg0;
        ret.set(buffer.count);
        return ret;
    }

    @Override
    public void merge(Writable buffer, Writable partial) throws UDFException {
        MyBuffer buf = (MyBuffer) buffer;
        MyBuffer p = (MyBuffer) partial;
        buf.name = p.name;
        buf.count = p.count;
    }


}

然后通过maxCompute studio发布下
_

发布名为test20191119,这样就可以在Dataworks中调用了。

其中原表数据:
_

_

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
警惕!国内某广告SDK内置“后门”功能,Google Play商店已强制下架
本文讲的是警惕!国内某广告SDK内置“后门”功能,Google Play商店已强制下架,Lookout安全团队近日发现了个信的广告软件开发工具包(SDK),可以通过下载恶意插件,借助其他合法app对用户实施监控。
2197 0
项目owner看这里,MaxCompute全表扫描新功能,给你“失误”的机会
MaxCompute发布了“ALIAS 命令”,提供了在不修改代码的前提下,在MapReduce或自定义函数(UDF) 代码中,通过某个固定的资源名读取不同资源(数据)的需求。
1253 0
asp.net RadGrid分页功能扩展Demo
1、在权限判断代码后面增加如下代码     VSDataTable1 = new DataSet();          public void BindRadGrid()     {              //.
1066 0
请问:hive中avg聚合函数会使用到combiner功能吗?
hive avg函数是否可以使用combiner功能
2899 0
+关注
阴转多云转晴
小白一枚
11
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载