Recently I have seen several users whose code worked fine when debugged in a local IDE, but produced unexpected results once run online. Since the IDE cannot simulate the distributed, multi-worker execution of a UDAF, some bugs can only be tracked down online with a small set of test data. In this post I will use the simplest method there is, manually printing logs, to debug a UDAF, the trickiest kind of UDF to debug. By walking through how the problem is located and fixed, I hope to offer some ideas for debugging UDFs online.
Setup
First, real online data can be enormous. Never debug directly against hundreds of millions of rows, or you will have a very hard time locating the cause. When facing an online problem, it is best to simplify the scenario based on the data first. In my case, I reduce the test data to:
drop table if exists testUDAF;
create table testUDAF(
str string
) partitioned by (ds string);
-- dual is a single-row table I created earlier
insert overwrite table testUDAF partition (ds)
select str,ds from (
select 'a' as str,1 as ds from dual union all
select 'a' as str,1 as ds from dual union all
select 'b' as str,1 as ds from dual union all
select 'a' as str,2 as ds from dual union all
select 'c' as str,2 as ds from dual union all
select 'c' as str,2 as ds from dual
) a;
select * from testUDAF;
The simulated data looks like this:
+-----+----+
| str | ds |
+-----+----+
| a   | 1  |
| a   | 1  |
| b   | 1  |
| a   | 2  |
| c   | 2  |
| c   | 2  |
+-----+----+
Six records in total, spread across two different partitions.
We want the UDAF to compute a result equivalent to:
SELECT wm_concat(',', concat(str, ':', cnt)) AS ret
FROM (
SELECT str, COUNT(*) AS cnt
FROM testUDAF
GROUP BY str
) a;
Writing the code
The Java code, already debugged locally, is as follows:
package com.aliyun.odps.udaf;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.annotation.Resolve;
@Resolve({"string->string"})
public class MySum extends Aggregator {
private static final String rd = ":";
private static final String fd = ",";
private static class SumBuffer implements Writable {
private HashMap<String, Long> dict = new HashMap<>();
@Override
public void write(DataOutput out) throws IOException {
String dictStr = dict.toString();
out.writeUTF(dictStr);
}
/*
* A simple hand-rolled deserialization
* */
@Override
public void readFields(DataInput in) throws IOException {
String dictStr = in.readUTF();
String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");
for(int i=0;i<tokens.length;i++) {
String[] strings = tokens[i].split("=");
if(strings.length==2) {
dict.put(strings[0], Long.parseLong(strings[1]));
}
}
}
}
@Override
public Writable newBuffer() {
return new SumBuffer();
}
@Override
public void iterate(Writable buffer, Writable[] args) throws UDFException {
SumBuffer iterateDictBuffer = (SumBuffer) buffer;
String content;
if (args[0] instanceof NullWritable) {
content = "Null";
} else {
content = args[0].toString();
}
Long count = iterateDictBuffer.dict.containsKey(content) ? iterateDictBuffer.dict.get(content) : 0L;
iterateDictBuffer.dict.put(content, count + 1);
}
@Override
public void merge(Writable buffer, Writable partial) throws UDFException {
SumBuffer buf = (SumBuffer) buffer;
SumBuffer p = (SumBuffer) partial;
for (Entry<String, Long> entry : p.dict.entrySet()) {
Long count = buf.dict.containsKey(entry.getKey()) ? buf.dict.get(entry.getKey()) + entry.getValue() : entry.getValue();
buf.dict.put(entry.getKey(), count);
}
}
@Override
public Writable terminate(Writable buffer) throws UDFException {
SumBuffer buf = (SumBuffer) buffer;
StringBuilder sb = new StringBuilder();
for (Entry<String, Long> entry : buf.dict.entrySet()) {
sb.append(entry.getKey()).append(rd).append(entry.getValue()).append(fd);
}
Text result = new Text();
result.set(sb.substring(0, sb.length() - 1));
return result;
}
}
Since the logic is simple, I did not add many comments. As you can see, a Map holds the intermediate data, toString() serves as serialization, and a short piece of code does the deserialization. In terminate, the map is assembled into the desired result string and returned.
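To see this toString-based round trip on its own, here is a standalone sketch (the class name and main method are mine, not part of the UDAF; the parsing logic mirrors readFields above):

```java
import java.util.HashMap;

public class ToStringRoundTrip {

    // Mirrors the parsing in SumBuffer.readFields(): strip the braces and
    // spaces from HashMap.toString() output, then split on ',' and '='.
    public static HashMap<String, Long> parse(String dictStr) {
        HashMap<String, Long> dict = new HashMap<>();
        String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ", "").split(",");
        for (String token : tokens) {
            String[] parts = token.split("=");
            if (parts.length == 2) {
                dict.put(parts[0], Long.parseLong(parts[1]));
            }
        }
        return dict;
    }

    public static void main(String[] args) {
        HashMap<String, Long> original = new HashMap<>();
        original.put("a", 2L);
        original.put("b", 1L);
        // Serialize via toString(), then parse it back into a fresh map.
        HashMap<String, Long> restored = parse(original.toString());
        System.out.println(original.equals(restored)); // prints true
    }
}
```

Note that this scheme silently breaks if a key contains ',' or '=', since those characters double as delimiters; that is one of the things a hardened version would need to address.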
After packaging, register the function and test the result:
odps@ >add jar D:\\mysum.jar -f;
OK: Resource 'mysum.jar' have been updated.
odps@ >create function mysum as com.aliyun.odps.udaf.MySum using mysum.jar;
Success: Function 'mysum' have been created.
odps@ >select mysum(str) from testUDAF;
+-----+
| _c0 |
+-----+
| a:3,b:1,c:4 |
+-----+
Troubleshooting
As you can see, the count for c has somehow become 4, a problem that never showed up locally. Fortunately our data volume is small, which makes the cause easier to locate. The approach: we already know exactly what the input data is and what result we expect. So the first step is to find out at which point, as the intermediate data flows from stage to stage, it starts to deviate from expectations. Once we have located where the data first disagrees with what we expect, we can combine that with the surrounding code logic to pin down the cause.
First, let's add some debug printing to the code to see what the data looks like at each step. With System.err.println we write the information we want into stderr.
@Override
public void iterate(Writable buffer, Writable[] args) throws UDFException {
SumBuffer iterateDictBuffer = (SumBuffer) buffer;
String content;
if (args[0] instanceof NullWritable) {
content = "Null";
} else {
content = args[0].toString();
}
Long count = iterateDictBuffer.dict.containsKey(content) ? iterateDictBuffer.dict.get(content) : 0L;
System.err.println("input in iterate:" + content+"\tdict:"+iterateDictBuffer.dict); // print the raw input and the current buffer state
iterateDictBuffer.dict.put(content, count + 1);
System.err.println("output in iterate:" + iterateDictBuffer.dict); // print iterate's output
}
@Override
public void merge(Writable buffer, Writable partial) throws UDFException {
SumBuffer buf = (SumBuffer) buffer;
SumBuffer p = (SumBuffer) partial;
System.err.println("buffer in merge:" + buf.dict); // print merge's buffer
System.err.println("partial in merge:" + p.dict); // print merge's partial
for (Entry<String, Long> entry : p.dict.entrySet()) {
Long count = buf.dict.containsKey(entry.getKey()) ? buf.dict.get(entry.getKey()) + entry.getValue() : entry.getValue();
buf.dict.put(entry.getKey(), count);
}
System.err.println("output in merge:" + buf.dict); // print merge's output
}
@Override
public Writable terminate(Writable buffer) throws UDFException {
SumBuffer buf = (SumBuffer) buffer;
System.err.println("output in terminate:" + buf.dict); // print terminate's input
StringBuilder sb = new StringBuilder();
for (Entry<String, Long> entry : buf.dict.entrySet()) {
sb.append(entry.getKey()).append(rd).append(entry.getValue()).append(fd);
}
System.err.println(sb.substring(0,sb.length()-1)); // print terminate's output
Text result = new Text();
result.set(sb.substring(0, sb.length() - 1));
return result;
}
That covers these methods for now. The idea behind the printing is to see the input and output data of every call, and from that to locate where the problem first appears.
Repackage, replace the jar, and call the function again:
odps@ >add jar D:\\mysum.jar -f;
OK: Resource 'mysum.jar' have been updated.
odps@ >select mysum(str) from testUDAF;
+-----+
| _c0 |
+-----+
| a:3,b:1,c:4 |
+-----+
The result is unchanged, but now we can check the logs. Opening the logview, we can see that the logs from the two map workers are:
Heap Size: 1024M
input in iterate:a dict:{}
output in iterate:{a=1}
input in iterate:c dict:{a=1}
output in iterate:{a=1, c=1}
input in iterate:c dict:{a=1, c=1}
output in iterate:{a=1, c=2}
Heap Size: 1024M
input in iterate:a dict:{}
output in iterate:{a=1}
input in iterate:a dict:{a=1}
output in iterate:{a=2}
input in iterate:b dict:{a=2}
output in iterate:{a=2, b=1}
Both look correct. Now let's check the result on the reduce side:
Heap Size: 1024M
buffer in merge:{}
partial in merge:{a=1, c=2}
output in merge:{a=1, c=2}
buffer in merge:{a=1, c=2}
partial in merge:{a=2, b=1, c=2}
output in merge:{a=3, b=1, c=4}
output in terminate:{a=3, b=1, c=4}
a:3,b:1,c:4
Look at partial in merge:{a=2, b=1, c=2}: this record is not what we expect. By rights, what we printed earlier was output in iterate:{a=2, b=1}, so how did it become {a=2, b=1, c=2} here?
This change happens while the data is passed between workers, which is where we serialize and deserialize it, so let's add some logging there too:
@Override
public void write(DataOutput out) throws IOException {
String dictStr = dict.toString();
out.writeUTF(dictStr);
System.err.println("dict in write:" + dictStr); // print what write() serializes
}
/*
* 做了个简单的反序列化
* */
@Override
public void readFields(DataInput in) throws IOException {
String dictStr = in.readUTF();
System.err.println("dictStr in readFields:" + dictStr); // print the raw string read back in
String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");
for(int i=0;i<tokens.length;i++) {
String[] strings = tokens[i].split("=");
if(strings.length==2) {
dict.put(strings[0], Long.parseLong(strings[1]));
}
}
System.err.println("dict in readFields:" + dict); // print the parsed dict
}
Repackage and run once more; this time the logs look like this:
-- map stage:
dict in write:{a=1, c=2}
dict in write:{a=2, b=1}
-- reduce stage:
dictStr in readFields:{a=1, c=2}
dict in readFields:{a=1, c=2}
dictStr in readFields:{a=2, b=1}
dict in readFields:{a=2, b=1, c=2}
Sure enough, the output is already wrong at deserialization time. But this still gives no direct evidence of which line of code is at fault. Since the resulting dict is wrong, let's look at what it contains on entry. So add one more line of logging:
@Override
public void readFields(DataInput in) throws IOException {
String dictStr = in.readUTF();
System.err.println("dictStr in readFields:" + dictStr); // print the raw string read back in
System.err.println("dict in readFields before put:" + dict); // print dict before parsing into it
String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");
for(int i=0;i<tokens.length;i++) {
String[] strings = tokens[i].split("=");
if(strings.length==2) {
dict.put(strings[0], Long.parseLong(strings[1]));
}
}
System.err.println("dict in readFields:" + dict); // print the parsed dict
}
Now the reduce-stage logs read:
dictStr in readFields:{a=1, c=2}
dict in readFields before put:{}
dict in readFields:{a=1, c=2}
dictStr in readFields:{a=2, b=1}
dict in readFields before put:{a=1, c=2}
dict in readFields:{a=2, b=1, c=2}
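The pattern visible in these logs, leftover entries surviving into the next call, can be reproduced in isolation with a small sketch (the class and method names are mine, not the UDAF's): the parse target is an instance field that is never cleared, so a second call merges new entries into the leftovers of the first.

```java
import java.util.HashMap;

public class ReuseDemo {
    HashMap<String, Long> dict = new HashMap<>();

    // Same parsing as SumBuffer.readFields(), but without any reset of dict.
    public void readFieldsLike(String dictStr) {
        String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ", "").split(",");
        for (String token : tokens) {
            String[] parts = token.split("=");
            if (parts.length == 2) {
                dict.put(parts[0], Long.parseLong(parts[1]));
            }
        }
    }

    public static void main(String[] args) {
        ReuseDemo buf = new ReuseDemo();
        buf.readFieldsLike("{a=1, c=2}");
        buf.readFieldsLike("{a=2, b=1}");
        // The stale c=2 from the first call survives into the second result.
        System.out.println(buf.dict); // three entries: a=2, b=1, plus the leftover c=2
    }
}
```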
Now the truth is out. On the second call to readFields, when we deserialize the string {a=2, b=1}, the dict that should have been empty still holds the result of the previous computation. In fact, inside readFields, the same SumBuffer object is reused within a worker. To keep the computation correct in this situation, we can reset dict ourselves:
@Override
public void readFields(DataInput in) throws IOException {
String dictStr = in.readUTF();
dict = new HashMap<>();
String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");
for(int i=0;i<tokens.length;i++) {
String[] strings = tokens[i].split("=");
if(strings.length==2) {
dict.put(strings[0], Long.parseLong(strings[1]));
}
}
}
}
And now it is finally correct:
odps@ >add jar D:\\mysum.jar -f;
OK: Resource 'mysum.jar' have been updated.
odps@ >select mysum(str) from testUDAF;
+-----+
| _c0 |
+-----+
| a:3,b:1,c:2 |
+-----+
Summary
There are other places in this code that could be optimized further. But to keep the debugging process easy to explain, I simplified the logic and did not pursue them here. Real business code also needs to consider performance, exception handling, and so on.
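One such optimization, sketched here with plain java.io streams (this is illustrative, not the article's production code): write the map as a length-prefixed list of entries instead of parsing toString() output. This sidesteps the delimiter problem entirely and makes the reset in readFields explicit.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class BinaryBuffer {
    HashMap<String, Long> dict = new HashMap<>();

    public void write(DataOutput out) throws IOException {
        out.writeInt(dict.size());              // entry count first
        for (Map.Entry<String, Long> e : dict.entrySet()) {
            out.writeUTF(e.getKey());           // then each key/value pair
            out.writeLong(e.getValue());
        }
    }

    public void readFields(DataInput in) throws IOException {
        dict = new HashMap<>();                 // reset: the buffer object may be reused
        int n = in.readInt();
        for (int i = 0; i < n; i++) {
            String key = in.readUTF();
            dict.put(key, in.readLong());
        }
    }

    public static void main(String[] args) throws IOException {
        BinaryBuffer src = new BinaryBuffer();
        src.dict.put("x,=y", 7L);               // a key that would break the toString() scheme
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        src.write(new DataOutputStream(bos));
        BinaryBuffer dst = new BinaryBuffer();
        dst.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(src.dict.equals(dst.dict)); // prints true
    }
}
```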
System.err.println is a clumsy method, but a very effective one, isn't it?