开发者社区> 传学> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

基于自定义日志打印的UDAF调试

简介: 看到最近有一些用户,代码在本地IDE环境里调试成功了后,到线上调试出现结果不符合预期的情况。因为IDE里无法模拟多个worker进行分布式调试UDAF的场景,所以有一些BUG可能需要到线上用一些简单的测试数据进行调试。这里用最简单的手工打印日志的方法,针对代码调试中最麻烦的UDAF的例子做一次调试。
+关注继续查看

看到最近有一些用户,代码在本地IDE环境里调试成功了后,到线上调试出现结果不符合预期的情况。因为IDE里无法模拟多个worker进行分布式调试UDAF的场景,所以有一些BUG可能需要到线上用一些简单的测试数据进行调试。这里用最简单的手工打印日志的方法,针对代码调试中最麻烦的UDAF的例子做一次调试。通过问题的定位和解决,希望能给大家在面对UDF的线上调试的时候提供一些思路。

初始化

首先,线上的真实数据可能非常多,千万不要直接对着上亿条数据直接调试,否则很难定位到原因。面对线上的问题,最好先根据数据情况,简化计算场景。比如我这里,就先把测试数据简化成:

drop table if exists testUDAF;
create table testUDAF(
    str string
) partitioned by (ds string);

--dual表是我早前已经创建好的就一条数据的表
insert overwrite table testUDAF partition (ds)
select str,ds from (
    select 'a' as str,1 as ds from dual union all
    select 'a' as str,1 as ds from dual union all
    select 'b' as str,1 as ds from dual union all
    select 'a' as str,2 as ds from dual union all
    select 'c' as str,2 as ds from dual union all
    select 'c' as str,2 as ds from dual
) a;

select * from testUDAF;

可以看到模拟数据是
screenshot
这样一共6一条记录,分布在2个不同的分区里。
我们希望UDAF的计算结果能类似:

SELECT wm_concat(',', concat(str, ':', cnt)) AS ret
FROM (
    SELECT str, COUNT(*) AS cnt
    FROM testUDAF
    GROUP BY str
) a;

screenshot

代码编写

在本地已经调试好的JAVA代码如下:

package com.aliyun.odps.udaf;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;

import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.annotation.Resolve;

@Resolve({"string->string"})
public class MySum extends Aggregator {
    private static final String rd = ":";
    private static final String fd = ",";

    private static class SumBuffer implements Writable {
        private HashMap<String, Long> dict = new HashMap<>();

        @Override
        public void write(DataOutput out) throws IOException {
            String dictStr = dict.toString();
            out.writeUTF(dictStr);
        }

        /*
         * 做了个简单的反序列化
         * */
        @Override
        public void readFields(DataInput in) throws IOException {
            String dictStr = in.readUTF();
            String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");
            for(int i=0;i<tokens.length;i++) {
                String[] strings = tokens[i].split("=");
                if(strings.length==2) {
                    dict.put(strings[0], Long.parseLong(strings[1]));
                }
            }
        }
    }


    @Override
    public Writable newBuffer() {
        return new SumBuffer();
    }

    @Override
    public void iterate(Writable buffer, Writable[] args) throws UDFException {
        SumBuffer iterateDictBuffer = (SumBuffer) buffer;
        String content;

        if (args[0] instanceof NullWritable) {
            content = "Null";
        } else {
            content = args[0].toString();
        }
        Long count = iterateDictBuffer.dict.containsKey(content) ? iterateDictBuffer.dict.get(content) : 0L;

        iterateDictBuffer.dict.put(content, count + 1);
    }

    @Override
    public void merge(Writable buffer, Writable partial) throws UDFException {
        SumBuffer buf = (SumBuffer) buffer;
        SumBuffer p = (SumBuffer) partial;
        
        for (Entry<String, Long> entry : p.dict.entrySet()) {
            Long count = buf.dict.containsKey(entry.getKey()) ? buf.dict.get(entry.getKey()) + entry.getValue() : entry.getValue();
            buf.dict.put(entry.getKey(), count);
        }
    }


    @Override
    public Writable terminate(Writable buffer) throws UDFException {
        SumBuffer buf = (SumBuffer) buffer;

        StringBuilder sb = new StringBuilder();
        for (Entry<String, Long> entry : buf.dict.entrySet()) {
            sb.append(entry.getKey()).append(rd).append(entry.getValue()).append(fd);
        }
        
        Text resault = new Text();
        resault.set(sb.substring(0,sb.length()-1));
        return resault;
    }
}

因为逻辑不复杂,所以也没有添加更多的注释。可以看到用一个Map来存放中间数据,并用toString来做序列化,然后写了段简单的代码进行反序列化。到了terminate后,拼成需要的结果再返回。

打包后,注册一下函数并测试一下结果:

odps@ >add jar D:\\mysum.jar -f;
OK: Resource 'mysum.jar' have been updated.
odps@ >create function mysum as com.aliyun.odps.udaf.MySum using mysum.jar;
Success: Function 'mysum' have been created.
odps@ >select mysum(str) from testUDAF;
+-----+
| _c0 |
+-----+
| a:3,b:1,c:4 |
+-----+

排查思路

可以看到,这里c的值不知道为什么变成了4,这个是在本地没有发现的问题。还好我们的数据量比较小,所以定位起来比较方便。目前的思路是,我们已经明确输入的数据是什么,也知道我们期望的结果是什么。那么我们首先需要知道,在中间数据的一步步流转的过程中,从哪里开始和我们预期的不一样。定位到是哪里开始数据和预期不符合后,再结合上下文的代码逻辑,定位到问题的原因。

首先我们给代码加上一些异常打印,看看流转过程中的数据分别是什么。通过System.err.println,我们把我们想要的信息打印到stderr里。

    @Override
    public void iterate(Writable buffer, Writable[] args) throws UDFException {
        SumBuffer iterateDictBuffer = (SumBuffer) buffer;
        String content;

        if (args[0] instanceof NullWritable) {
            content = "Null";
        } else {
            content = args[0].toString();
        }
        Long count = iterateDictBuffer.dict.containsKey(content) ? iterateDictBuffer.dict.get(content) : 0L;
        System.err.println("input in iterate:" + content+"\tdict:"+iterateDictBuffer.dict);    //拿到原始的输入和当前的状态
        iterateDictBuffer.dict.put(content, count + 1);
        System.err.println("output in iterate:" + iterateDictBuffer.dict);                    //打印iterate输出的内容
    }

    @Override
    public void merge(Writable buffer, Writable partial) throws UDFException {
        SumBuffer buf = (SumBuffer) buffer;
        SumBuffer p = (SumBuffer) partial;
        System.err.println("buffer in merge:" + buf.dict);                    //打印merge里的buffer的内容
        System.err.println("partial in merge:" + p.dict);                    //打印merge里的partial的内容
        for (Entry<String, Long> entry : p.dict.entrySet()) {
            Long count = buf.dict.containsKey(entry.getKey()) ? buf.dict.get(entry.getKey()) + entry.getValue() : entry.getValue();
            buf.dict.put(entry.getKey(), count);
        }
        System.err.println("output in merge:" + buf.dict);                    //打印merge里的输出的内容
    }


    @Override
    public Writable terminate(Writable buffer) throws UDFException {
        SumBuffer buf = (SumBuffer) buffer;
        System.err.println("output in terminate:" + buf.dict);                    //打印terminate里的输入的内容
        StringBuilder sb = new StringBuilder();
        for (Entry<String, Long> entry : buf.dict.entrySet()) {
            sb.append(entry.getKey()).append(rd).append(entry.getValue()).append(fd);
        }
        System.err.println(sb.substring(0,sb.length()-1));                    //打印terminate里的输出的内容
        Text resault = new Text();
        resault.set(sb.substring(0,sb.length()-1));
        return resault;
    }

先打印了这么几个方法里。这样打印的思路主要是,看看每次调用的时候的数据输入输出是什么。从而定位到是从哪里开始出现的问题。

打包,替换掉jar包,然后重新调用一下函数,可以看到

odps@ >add jar D:\\mysum.jar -f;
OK: Resource 'mysum.jar' have been updated.
odps@ >select mysum(str) from testUDAF;
+-----+
| _c0 |
+-----+
| a:3,b:1,c:4 |
+-----+

结果数据是不变的,但是我们可以看下日志。打开里面的logview,可以看到:
screenshot
里面的日志,2个Map里的日志分别是:

Heap Size: 1024M
input in iterate:a    dict:{}
output in iterate:{a=1}
input in iterate:c    dict:{a=1}
output in iterate:{a=1, c=1}
input in iterate:c    dict:{a=1, c=1}
output in iterate:{a=1, c=2}

Heap Size: 1024M
input in iterate:a    dict:{}
output in iterate:{a=1}
input in iterate:a    dict:{a=1}
output in iterate:{a=2}
input in iterate:b    dict:{a=2}
output in iterate:{a=2, b=1}

看到都是对的,然后看下Reduce里的结果:

Heap Size: 1024M
buffer in merge:{}
partial in merge:{a=1, c=2}
output in merge:{a=1, c=2}
buffer in merge:{a=1, c=2}
partial in merge:{a=2, b=1, c=2}
output in merge:{a=3, b=1, c=4}
output in terminate:{a=3, b=1, c=4}
a:3,b:1,c:4

看一下,partial in merge:{a=2, b=1, c=2} 这条数据不符合预期。照道理说,我们前面输出的是output in iterate:{a=2, b=1},怎么到这里就变成了{a=2, b=1, c=2}了呢。

这种的变化,是在多个worker之间进行传递的时候,我们做了序列号和反序列化,于是我们在这里又打了一些日志:

        @Override
        public void write(DataOutput out) throws IOException {
            String dictStr = dict.toString();
            out.writeUTF(dictStr);
            System.err.println("dict in write:" + dictStr);                    //打印序列化输出
        }

        /*
         * 做了个简单的反序列化
         * */
        @Override
        public void readFields(DataInput in) throws IOException {
            String dictStr = in.readUTF();
            System.err.println("dictStr in readFields:" + dictStr);                    //打印反序列化输出
            String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");
            for(int i=0;i<tokens.length;i++) {
                String[] strings = tokens[i].split("=");
                if(strings.length==2) {
                    dict.put(strings[0], Long.parseLong(strings[1]));
                }
            }
            System.err.println("dict in readFields:" + dict);                    //打印反序列化输出
        }

重新打包跑一次,这次看到的日志是这样:

--map阶段:
dict in write:{a=1, c=2}
dict in write:{a=2, b=1}

--reduce阶段:
dictStr in readFields:{a=1, c=2}
dict in readFields:{a=1, c=2}
dictStr in readFields:{a=2, b=1}
dict in readFields:{a=2, b=1, c=2}

果然反序列化的时候输出的结果就有问题了。但是从这里还没有明确的证据说明是哪行代码出的问题。看到dict输出的结果不符合预期,我们先看看输入的时候是什么。于是再加一行日志:

        @Override
        public void readFields(DataInput in) throws IOException {
            String dictStr = in.readUTF();
            System.err.println("dictStr in readFields:" + dictStr);                    //打印反序列化输出
            System.err.println("dict in readFields before put:" + dict);                    //打印反序列化输出
            String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");
            for(int i=0;i<tokens.length;i++) {
                String[] strings = tokens[i].split("=");
                if(strings.length==2) {
                    dict.put(strings[0], Long.parseLong(strings[1]));
                }
            }
            System.err.println("dict in readFields:" + dict);                    //打印反序列化输出
        }

看到这会的reduce阶段日志

dictStr in readFields:{a=1, c=2}
dict in readFields before put:{}
dict in readFields:{a=1, c=2}

dictStr in readFields:{a=2, b=1}
dict in readFields before put:{a=1, c=2}
dict in readFields:{a=2, b=1, c=2}

这下真相大白了。我们第二次调用readFields序列化{a=2, b=1}这个字符串的时候,发现本来应该为空的dict的内容竟然是上次计算后的结果。实际上,在readFields里,相同worker里的SumBuffer被复用了。这种情况下,为了保证计算的准确性,我们可以自己清空一下dict的内容

        @Override
        public void readFields(DataInput in) throws IOException {
            String dictStr = in.readUTF();
            dict = new HashMap<>();
            String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");
            for(int i=0;i<tokens.length;i++) {
                String[] strings = tokens[i].split("=");
                if(strings.length==2) {
                    dict.put(strings[0], Long.parseLong(strings[1]));
                }
            }
        }
    }

这下终于对了

odps@ >add jar D:\\mysum.jar -f;
OK: Resource 'mysum.jar' have been updated.
odps@ >select mysum(str) from testUDAF;
+-----+
| _c0 |
+-----+
| a:3,b:1,c:2 |
+-----+

总结

代码还有其他更多可以优化的地方。不过这次为了能简单说明调试的过程,简化代码逻辑,就没在这方面再多下功夫。实际的业务代码里还需要考虑到性能和异常捕捉等问题。

System.err.println这个办法虽然很笨,但是很有效,不是吗?

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
MeeGo开发“.NET研究”进程通信核心 DBus调试工具
  我们讲过用QML语言开发MeeGo应用程序,在MeeGo开发中,DBus是MeeGo进程通信的核心。我们简要的看一下MeeGo系统的哪些地方都使用了DBus.   1.从一个应用中启动另一个应用时。
891 0
CI调试
[gerrit]host=gerrit.ruiy-ci.comport=29418project=review-chk.git
465 0
模块已加载,但对dllregisterServer的调用失败
在注册dll或者ocx的时候, 经常会遇到这么一个问题: 模块  已加载,但对dllregisterServer的调用失败,错误代码为0x8004***** 网上有网友回复说需要在管理员的模式下进行注册:本人测试过, 会出现如下问题: 模块  加载失败。
2732 0
RedisManager使用手册(五)-- 自定义Redis安装包
RedisManager物理机安装包的构建和Docker镜像的构建
1142 0
WPF开发-Label自定义背景-Decorator
首先在App.xaml文件当中添加样式和模板
947 0
基于Beego框架的导入导出Excel
基于Beego框架的导入导出Excel,以及解决中文文件名乱码问题
0 0
+关注
传学
传学,专注于大数据领域的解决方案
文章
问答
来源圈子
更多
MaxCompute(原ODPS)是一项面向分析的大数据计算服务,它以Serverless架构提供快速、全托管的在线数据仓库服务,消除传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您经济并高效的分析处理海量数据。
+ 订阅
相关文档: MaxCompute
文章排行榜
最热
最新
相关电子书
更多
MaxCompute Logview参数详解和问题排查(废弃)
立即下载
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载