基于自定义日志打印的UDAF调试

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 看到最近有一些用户,代码在本地IDE环境里调试成功了后,到线上调试出现结果不符合预期的情况。因为IDE里无法模拟多个worker进行分布式调试UDAF的场景,所以有一些BUG可能需要到线上用一些简单的测试数据进行调试。这里用最简单的手工打印日志的方法,针对代码调试中最麻烦的UDAF的例子做一次调试。

看到最近有一些用户,代码在本地IDE环境里调试成功了后,到线上调试出现结果不符合预期的情况。因为IDE里无法模拟多个worker进行分布式调试UDAF的场景,所以有一些BUG可能需要到线上用一些简单的测试数据进行调试。这里用最简单的手工打印日志的方法,针对代码调试中最麻烦的UDAF的例子做一次调试。通过问题的定位和解决,希望能给大家在面对UDF的线上调试的时候提供一些思路。

初始化

首先,线上的真实数据可能非常多,千万不要直接对着上亿条数据直接调试,否则很难定位到原因。面对线上的问题,最好先根据数据情况,简化计算场景。比如我这里,就先把测试数据简化成:

drop table if exists testUDAF;
create table testUDAF(
    str string
) partitioned by (ds string);

--dual表是我早前已经创建好的就一条数据的表
insert overwrite table testUDAF partition (ds)
select str,ds from (
    select 'a' as str,1 as ds from dual union all
    select 'a' as str,1 as ds from dual union all
    select 'b' as str,1 as ds from dual union all
    select 'a' as str,2 as ds from dual union all
    select 'c' as str,2 as ds from dual union all
    select 'c' as str,2 as ds from dual
) a;

select * from testUDAF;

可以看到模拟数据是
screenshot
这样一共6一条记录,分布在2个不同的分区里。
我们希望UDAF的计算结果能类似:

SELECT wm_concat(',', concat(str, ':', cnt)) AS ret
FROM (
    SELECT str, COUNT(*) AS cnt
    FROM testUDAF
    GROUP BY str
) a;

screenshot

代码编写

在本地已经调试好的JAVA代码如下:

package com.aliyun.odps.udaf;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;

import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.annotation.Resolve;

@Resolve({"string->string"})
public class MySum extends Aggregator {
    private static final String rd = ":";
    private static final String fd = ",";

    private static class SumBuffer implements Writable {
        private HashMap<String, Long> dict = new HashMap<>();

        @Override
        public void write(DataOutput out) throws IOException {
            String dictStr = dict.toString();
            out.writeUTF(dictStr);
        }

        /*
         * 做了个简单的反序列化
         * */
        @Override
        public void readFields(DataInput in) throws IOException {
            String dictStr = in.readUTF();
            String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");
            for(int i=0;i<tokens.length;i++) {
                String[] strings = tokens[i].split("=");
                if(strings.length==2) {
                    dict.put(strings[0], Long.parseLong(strings[1]));
                }
            }
        }
    }


    @Override
    public Writable newBuffer() {
        return new SumBuffer();
    }

    @Override
    public void iterate(Writable buffer, Writable[] args) throws UDFException {
        SumBuffer iterateDictBuffer = (SumBuffer) buffer;
        String content;

        if (args[0] instanceof NullWritable) {
            content = "Null";
        } else {
            content = args[0].toString();
        }
        Long count = iterateDictBuffer.dict.containsKey(content) ? iterateDictBuffer.dict.get(content) : 0L;

        iterateDictBuffer.dict.put(content, count + 1);
    }

    @Override
    public void merge(Writable buffer, Writable partial) throws UDFException {
        SumBuffer buf = (SumBuffer) buffer;
        SumBuffer p = (SumBuffer) partial;
        
        for (Entry<String, Long> entry : p.dict.entrySet()) {
            Long count = buf.dict.containsKey(entry.getKey()) ? buf.dict.get(entry.getKey()) + entry.getValue() : entry.getValue();
            buf.dict.put(entry.getKey(), count);
        }
    }


    @Override
    public Writable terminate(Writable buffer) throws UDFException {
        SumBuffer buf = (SumBuffer) buffer;

        StringBuilder sb = new StringBuilder();
        for (Entry<String, Long> entry : buf.dict.entrySet()) {
            sb.append(entry.getKey()).append(rd).append(entry.getValue()).append(fd);
        }
        
        Text resault = new Text();
        resault.set(sb.substring(0,sb.length()-1));
        return resault;
    }
}

因为逻辑不复杂,所以也没有添加更多的注释。可以看到用一个Map来存放中间数据,并用toString来做序列化,然后写了段简单的代码进行反序列化。到了terminate后,拼成需要的结果再返回。

打包后,注册一下函数并测试一下结果:

odps@ >add jar D:\\mysum.jar -f;
OK: Resource 'mysum.jar' have been updated.
odps@ >create function mysum as com.aliyun.odps.udaf.MySum using mysum.jar;
Success: Function 'mysum' have been created.
odps@ >select mysum(str) from testUDAF;
+-----+
| _c0 |
+-----+
| a:3,b:1,c:4 |
+-----+

排查思路

可以看到,这里c的值不知道为什么变成了4,这个是在本地没有发现的问题。还好我们的数据量比较小,所以定位起来比较方便。目前的思路是,我们已经明确输入的数据是什么,也知道我们期望的结果是什么。那么我们首先需要知道,在中间数据的一步步流转的过程中,从哪里开始和我们预期的不一样。定位到是哪里开始数据和预期不符合后,再结合上下文的代码逻辑,定位到问题的原因。

首先我们给代码加上一些异常打印,看看流转过程中的数据分别是什么。通过System.err.println,我们把我们想要的信息打印到stderr里。

    @Override
    public void iterate(Writable buffer, Writable[] args) throws UDFException {
        SumBuffer iterateDictBuffer = (SumBuffer) buffer;
        String content;

        if (args[0] instanceof NullWritable) {
            content = "Null";
        } else {
            content = args[0].toString();
        }
        Long count = iterateDictBuffer.dict.containsKey(content) ? iterateDictBuffer.dict.get(content) : 0L;
        System.err.println("input in iterate:" + content+"\tdict:"+iterateDictBuffer.dict);    //拿到原始的输入和当前的状态
        iterateDictBuffer.dict.put(content, count + 1);
        System.err.println("output in iterate:" + iterateDictBuffer.dict);                    //打印iterate输出的内容
    }

    @Override
    public void merge(Writable buffer, Writable partial) throws UDFException {
        SumBuffer buf = (SumBuffer) buffer;
        SumBuffer p = (SumBuffer) partial;
        System.err.println("buffer in merge:" + buf.dict);                    //打印merge里的buffer的内容
        System.err.println("partial in merge:" + p.dict);                    //打印merge里的partial的内容
        for (Entry<String, Long> entry : p.dict.entrySet()) {
            Long count = buf.dict.containsKey(entry.getKey()) ? buf.dict.get(entry.getKey()) + entry.getValue() : entry.getValue();
            buf.dict.put(entry.getKey(), count);
        }
        System.err.println("output in merge:" + buf.dict);                    //打印merge里的输出的内容
    }


    @Override
    public Writable terminate(Writable buffer) throws UDFException {
        SumBuffer buf = (SumBuffer) buffer;
        System.err.println("output in terminate:" + buf.dict);                    //打印terminate里的输入的内容
        StringBuilder sb = new StringBuilder();
        for (Entry<String, Long> entry : buf.dict.entrySet()) {
            sb.append(entry.getKey()).append(rd).append(entry.getValue()).append(fd);
        }
        System.err.println(sb.substring(0,sb.length()-1));                    //打印terminate里的输出的内容
        Text resault = new Text();
        resault.set(sb.substring(0,sb.length()-1));
        return resault;
    }

先打印了这么几个方法里。这样打印的思路主要是,看看每次调用的时候的数据输入输出是什么。从而定位到是从哪里开始出现的问题。

打包,替换掉jar包,然后重新调用一下函数,可以看到

odps@ >add jar D:\\mysum.jar -f;
OK: Resource 'mysum.jar' have been updated.
odps@ >select mysum(str) from testUDAF;
+-----+
| _c0 |
+-----+
| a:3,b:1,c:4 |
+-----+

结果数据是不变的,但是我们可以看下日志。打开里面的logview,可以看到:
screenshot
里面的日志,2个Map里的日志分别是:

Heap Size: 1024M
input in iterate:a    dict:{}
output in iterate:{a=1}
input in iterate:c    dict:{a=1}
output in iterate:{a=1, c=1}
input in iterate:c    dict:{a=1, c=1}
output in iterate:{a=1, c=2}

Heap Size: 1024M
input in iterate:a    dict:{}
output in iterate:{a=1}
input in iterate:a    dict:{a=1}
output in iterate:{a=2}
input in iterate:b    dict:{a=2}
output in iterate:{a=2, b=1}

看到都是对的,然后看下Reduce里的结果:

Heap Size: 1024M
buffer in merge:{}
partial in merge:{a=1, c=2}
output in merge:{a=1, c=2}
buffer in merge:{a=1, c=2}
partial in merge:{a=2, b=1, c=2}
output in merge:{a=3, b=1, c=4}
output in terminate:{a=3, b=1, c=4}
a:3,b:1,c:4

看一下,partial in merge:{a=2, b=1, c=2} 这条数据不符合预期。照道理说,我们前面输出的是output in iterate:{a=2, b=1},怎么到这里就变成了{a=2, b=1, c=2}了呢。

这种的变化,是在多个worker之间进行传递的时候,我们做了序列号和反序列化,于是我们在这里又打了一些日志:

        @Override
        public void write(DataOutput out) throws IOException {
            String dictStr = dict.toString();
            out.writeUTF(dictStr);
            System.err.println("dict in write:" + dictStr);                    //打印序列化输出
        }

        /*
         * 做了个简单的反序列化
         * */
        @Override
        public void readFields(DataInput in) throws IOException {
            String dictStr = in.readUTF();
            System.err.println("dictStr in readFields:" + dictStr);                    //打印反序列化输出
            String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");
            for(int i=0;i<tokens.length;i++) {
                String[] strings = tokens[i].split("=");
                if(strings.length==2) {
                    dict.put(strings[0], Long.parseLong(strings[1]));
                }
            }
            System.err.println("dict in readFields:" + dict);                    //打印反序列化输出
        }

重新打包跑一次,这次看到的日志是这样:

--map阶段:
dict in write:{a=1, c=2}
dict in write:{a=2, b=1}

--reduce阶段:
dictStr in readFields:{a=1, c=2}
dict in readFields:{a=1, c=2}
dictStr in readFields:{a=2, b=1}
dict in readFields:{a=2, b=1, c=2}

果然反序列化的时候输出的结果就有问题了。但是从这里还没有明确的证据说明是哪行代码出的问题。看到dict输出的结果不符合预期,我们先看看输入的时候是什么。于是再加一行日志:

        @Override
        public void readFields(DataInput in) throws IOException {
            String dictStr = in.readUTF();
            System.err.println("dictStr in readFields:" + dictStr);                    //打印反序列化输出
            System.err.println("dict in readFields before put:" + dict);                    //打印反序列化输出
            String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");
            for(int i=0;i<tokens.length;i++) {
                String[] strings = tokens[i].split("=");
                if(strings.length==2) {
                    dict.put(strings[0], Long.parseLong(strings[1]));
                }
            }
            System.err.println("dict in readFields:" + dict);                    //打印反序列化输出
        }

看到这会的reduce阶段日志

dictStr in readFields:{a=1, c=2}
dict in readFields before put:{}
dict in readFields:{a=1, c=2}

dictStr in readFields:{a=2, b=1}
dict in readFields before put:{a=1, c=2}
dict in readFields:{a=2, b=1, c=2}

这下真相大白了。我们第二次调用readFields序列化{a=2, b=1}这个字符串的时候,发现本来应该为空的dict的内容竟然是上次计算后的结果。实际上,在readFields里,相同worker里的SumBuffer被复用了。这种情况下,为了保证计算的准确性,我们可以自己清空一下dict的内容

        @Override
        public void readFields(DataInput in) throws IOException {
            String dictStr = in.readUTF();
            dict = new HashMap<>();
            String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");
            for(int i=0;i<tokens.length;i++) {
                String[] strings = tokens[i].split("=");
                if(strings.length==2) {
                    dict.put(strings[0], Long.parseLong(strings[1]));
                }
            }
        }
    }

这下终于对了

odps@ >add jar D:\\mysum.jar -f;
OK: Resource 'mysum.jar' have been updated.
odps@ >select mysum(str) from testUDAF;
+-----+
| _c0 |
+-----+
| a:3,b:1,c:2 |
+-----+

总结

代码还有其他更多可以优化的地方。不过这次为了能简单说明调试的过程,简化代码逻辑,就没在这方面再多下功夫。实际的业务代码里还需要考虑到性能和异常捕捉等问题。

System.err.println这个办法虽然很笨,但是很有效,不是吗?

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
17天前
|
JSON 安全 API
.net 自定义日志类
在.NET中,创建自定义日志类有助于更好地管理日志信息。示例展示了如何创建、配置和使用日志记录功能,包括写入日志文件、设置日志级别、格式化消息等。注意事项涵盖时间戳、日志级别、JSON序列化、线程安全、日志格式、文件处理及示例使用。请根据需求调整代码。
39 13
|
5月前
|
消息中间件 存储 Java
手动实现 Spring Boot 日志链路追踪:提升调试效率的利器
【8月更文挑战第8天】在复杂的分布式系统中,日志是诊断问题、追踪系统行为的重要工具。然而,随着微服务架构的普及,服务间的调用链路错综复杂,传统的日志记录方式往往难以快速定位问题源头。今天,我们将探讨如何在不依赖外部组件(如Zipkin、Sleuth等)的情况下,手动实现Spring Boot应用的日志链路追踪,让日志定位更加便捷高效。
222 1
|
1月前
|
存储 Prometheus 监控
Docker容器内进行应用调试与故障排除的方法与技巧,包括使用日志、进入容器检查、利用监控工具及检查配置等,旨在帮助用户有效应对应用部署中的挑战,确保应用稳定运行
本文深入探讨了在Docker容器内进行应用调试与故障排除的方法与技巧,包括使用日志、进入容器检查、利用监控工具及检查配置等,旨在帮助用户有效应对应用部署中的挑战,确保应用稳定运行。
61 5
|
2月前
|
前端开发 数据处理 Android开发
Flutter前端开发中的调试技巧与工具使用方法,涵盖调试的重要性、基本技巧如打印日志与断点调试、常用调试工具如Android Studio/VS Code调试器和Flutter Inspector的介绍
本文深入探讨了Flutter前端开发中的调试技巧与工具使用方法,涵盖调试的重要性、基本技巧如打印日志与断点调试、常用调试工具如Android Studio/VS Code调试器和Flutter Inspector的介绍,以及具体操作步骤、常见问题解决、高级调试技巧、团队协作中的调试应用和未来发展趋势,旨在帮助开发者提高调试效率,提升应用质量。
67 8
|
3月前
|
Java 程序员 应用服务中间件
「测试线排查的一些经验-中篇」&& 调试日志实战
「测试线排查的一些经验-中篇」&& 调试日志实战
36 1
「测试线排查的一些经验-中篇」&& 调试日志实战
|
2月前
|
JSON Java 数据库
SpringBoot项目使用AOP及自定义注解保存操作日志
SpringBoot项目使用AOP及自定义注解保存操作日志
61 1
|
4月前
|
Shell Python
salt自定义模块内使用日志例子
salt自定义模块内使用日志例子
|
5月前
|
开发框架 .NET Docker
【Azure 应用服务】App Service .NET Core项目在Program.cs中自定义添加的logger.LogInformation,部署到App Service上后日志不显示Log Stream中的问题
【Azure 应用服务】App Service .NET Core项目在Program.cs中自定义添加的logger.LogInformation,部署到App Service上后日志不显示Log Stream中的问题
|
5月前
|
XML Java 数据库
"揭秘!Spring Boot日志链路追踪大法,让你的调试之路畅通无阻,效率飙升,问题无所遁形!"
【8月更文挑战第11天】在微服务架构中,请求可能跨越多个服务与组件,传统日志记录难以全局追踪问题。本文以电商系统为例,介绍如何手动实现Spring Boot应用的日志链路追踪。通过为每个请求生成唯一追踪ID并贯穿全链路,在服务间传递该ID,并在日志中记录,即使日志分散也能通过ID串联。提供了实现这一机制所需的关键代码片段,包括使用过滤器设置追踪ID、业务代码中的日志记录及Logback配置。此方案显著提升了问题定位的效率,适用于基于Spring Boot构建的微服务环境。
132 4
|
5月前
|
SQL 数据库 Java
Hibernate 日志记录竟藏着这些秘密?快来一探究竟,解锁调试与监控最佳实践
【8月更文挑战第31天】在软件开发中,日志记录对调试和监控至关重要。使用持久化框架 Hibernate 时,合理配置日志可帮助理解其内部机制并优化性能。首先,需选择合适的日志框架,如 Log4j 或 Logback,并配置日志级别;理解 Hibernate 的多级日志,如 DEBUG 和 ERROR,以适应不同开发阶段需求;利用 Hibernate 统计功能监测数据库交互情况;记录自定义日志以跟踪业务逻辑;定期审查和清理日志避免占用过多磁盘空间。综上,有效日志记录能显著提升 Hibernate 应用的性能和稳定性。
59 0