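Requirement: group the rows by `name` and accumulate `count` as a running total, so that each name's result also includes the counts of all names before it.
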
name | count |
---|---|
Jan | 1 |
Jan | 2 |
Feb | 3 |
Feb | 1 |
Mar | 1 |
Mar | 5 |

Expected result (a running total: Jan = 1 + 2 = 3, Feb = 3 + 3 + 1 = 7, Mar = 7 + 1 + 5 = 13):

name | count |
---|---|
Jan | 3 |
Feb | 7 |
Mar | 13 |
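
To make the expected result concrete, here is a small plain-Java sketch with no MaxCompute dependency (the class `ExpectedResult` and method `runningTotals` are hypothetical names). It sums `count` per `name` in the order names first appear, then carries the total forward from one name to the next:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ExpectedResult {
    // Sum counts per name (keeping first-seen order), then turn those sums into a running total.
    static Map<String, Long> runningTotals(String[] names, long[] counts) {
        Map<String, Long> perName = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            perName.merge(names[i], counts[i], Long::sum);
        }
        Map<String, Long> result = new LinkedHashMap<>();
        long total = 0;
        for (Map.Entry<String, Long> e : perName.entrySet()) {
            total += e.getValue();
            result.put(e.getKey(), total);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] names = {"Jan", "Jan", "Feb", "Feb", "Mar", "Mar"};
        long[] counts = {1, 2, 3, 1, 1, 5};
        System.out.println(runningTotals(names, counts)); // {Jan=3, Feb=7, Mar=13}
    }
}
```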
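
In IntelliJ IDEA, use MaxCompute Studio to create a new UDAF and let it generate the unimplemented methods. My `name` field is a `string` and `count` is a `bigint`, so the signature is `@Resolve("string,bigint->bigint")`.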
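Add a class to hold the fields (the aggregation buffer):

```java
private static class MyBuffer implements Writable {
    // Defaults so write() never sees null: writeUTF(null) and unboxing a null Long both throw NullPointerException.
    private String name = "";
    private Long count = 0L;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeLong(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Read back in exactly the order write() used.
        name = in.readUTF();
        count = in.readLong();
    }
}
```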
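
The `write`/`readFields` pair has to read fields back in the same order they were written. Below is a minimal JDK-only sketch of that round trip. It assumes a stand-in class, the hypothetical `BufferDemo`, that mirrors `MyBuffer` but does not implement MaxCompute's `Writable` interface, so it compiles without the SDK:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class BufferDemo {
    String name = "";
    Long count = 0L;

    // Same field order as MyBuffer.write: name first, then count.
    void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeLong(count);
    }

    void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        count = in.readLong();
    }

    // Serialize a buffer to bytes and read it back into a fresh instance.
    static BufferDemo roundTrip(BufferDemo src) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            src.write(new DataOutputStream(bytes));
            BufferDemo copy = new BufferDemo();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        BufferDemo b = new BufferDemo();
        b.name = "Feb";
        b.count = 7L;
        BufferDemo copy = roundTrip(b);
        System.out.println(copy.name + " " + copy.count); // Feb 7
    }
}
```

If `name` were left as `null`, `writeUTF` would throw a `NullPointerException`, which is why the fields start out with non-null defaults.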
With that in place, `newBuffer` can be written as:

```java
@Override
public Writable newBuffer() {
    return new MyBuffer();
}
```
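You also need a `Map` to store each key (`name`) and its value (`count`), a `Long` field to hold the running total, and a `LongWritable` for the output:

```java
Map<String, Long> map = new LinkedHashMap<>(); // name -> running total up to and including that name
Long old_count = 0L;                           // running total accumulated so far
private LongWritable ret = new LongWritable(); // output value returned by terminate()
```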
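
To see how `map` and `old_count` work together, here is a plain-Java replay of the `iterate` update rule on the sample rows (the class `IterateReplay` is hypothetical and has no MaxCompute dependency). This sketch advances `old_count` in both branches. If it is only advanced when a name repeats, the sample data still comes out right, but a name that has a single row is left out of the running total for every later name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class IterateReplay {
    final Map<String, Long> map = new LinkedHashMap<>();
    private Long old_count = 0L; // running total across all rows seen so far

    // Mirrors the body of iterate(): returns the value that would be stored in buf.count.
    long iterate(String arg, long cnt) {
        if (map.containsKey(arg)) {
            // Name seen before: add this row to its stored total.
            old_count = cnt + map.get(arg);
        } else {
            // New name: continue from the running total of all previous names.
            old_count = old_count + cnt;
        }
        map.put(arg, old_count);
        return map.get(arg);
    }

    public static void main(String[] args) {
        IterateReplay agg = new IterateReplay();
        String[] names = {"Jan", "Jan", "Feb", "Feb", "Mar", "Mar"};
        long[] counts = {1, 2, 3, 1, 1, 5};
        for (int i = 0; i < names.length; i++) {
            agg.iterate(names[i], counts[i]);
        }
        System.out.println(agg.map); // {Jan=3, Feb=7, Mar=13}
    }
}
```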
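Full code for reference:

```java
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Input: name (string), count (bigint); output: the running total (bigint).
@Resolve("string,bigint->bigint")
public class UDAFTest extends Aggregator {

    // Aggregation buffer: the current name and its running total.
    private static class MyBuffer implements Writable {
        // Defaults so write() never sees null.
        private String name = "";
        private Long count = 0L;

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            name = in.readUTF();
            count = in.readLong();
        }
    }

    @Override
    public Writable newBuffer() {
        return new MyBuffer();
    }

    // Kept on the Aggregator instance (not in the buffer), so the total carries over from one group to the next.
    Map<String, Long> map = new LinkedHashMap<>();
    Long old_count = 0L;
    private LongWritable ret = new LongWritable();

    @Override
    public void iterate(Writable buffer, Writable[] args) throws UDFException {
        // Skip rows with a NULL name or count (String.valueOf(null) would have produced the string "null").
        if (args[0] == null || args[1] == null) {
            return;
        }
        String arg = ((Text) args[0]).toString();
        Long cnt = ((LongWritable) args[1]).get();
        MyBuffer buf = (MyBuffer) buffer;
        if (map.containsKey(arg)) {
            // Name seen before: add this row to its stored total.
            Long newcnt = map.get(arg);
            old_count = cnt + newcnt;
        } else {
            // New name: advance old_count too, so a name with a single row still adds to later totals.
            old_count = old_count + cnt;
        }
        map.put(arg, old_count);
        buf.name = arg;
        buf.count = map.get(arg);
    }

    @Override
    public Writable terminate(Writable arg0) throws UDFException {
        MyBuffer buffer = (MyBuffer) arg0;
        ret.set(buffer.count);
        return ret;
    }

    @Override
    public void merge(Writable buffer, Writable partial) throws UDFException {
        // Copies the partial buffer rather than adding it; only safe when one instance sees all rows of a group.
        MyBuffer buf = (MyBuffer) buffer;
        MyBuffer p = (MyBuffer) partial;
        buf.name = p.name;
        buf.count = p.count;
    }
}
```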
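
Note that `map` and `old_count` live on the `UDAFTest` instance rather than in `MyBuffer`. The running total therefore only works if the runtime feeds every group to the same instance, in the order you want the total to accumulate. The post does not say how MaxCompute schedules this, so treat it as an assumption to check against real output. The plain-Java model below (hypothetical `LifecycleDemo` and `Agg` classes, no SDK, using the `iterate` rule with `old_count` advanced in both branches) compares one shared instance with a fresh instance per group:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LifecycleDemo {
    // Minimal stand-in for UDAFTest's instance state and iterate() rule.
    static class Agg {
        private final Map<String, Long> map = new LinkedHashMap<>();
        private Long old_count = 0L;

        long iterate(String arg, long cnt) {
            if (map.containsKey(arg)) {
                old_count = cnt + map.get(arg);
            } else {
                old_count = old_count + cnt;
            }
            map.put(arg, old_count);
            return old_count;
        }
    }

    // Feeds each group's rows in order; freshPerGroup=true models a new instance for every group.
    static List<Long> run(Map<String, long[]> groups, boolean freshPerGroup) {
        List<Long> out = new ArrayList<>();
        Agg agg = new Agg();
        for (Map.Entry<String, long[]> g : groups.entrySet()) {
            if (freshPerGroup) {
                agg = new Agg();
            }
            long bufCount = 0; // what terminate() would return for this group
            for (long cnt : g.getValue()) {
                bufCount = agg.iterate(g.getKey(), cnt);
            }
            out.add(bufCount);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, long[]> groups = new LinkedHashMap<>();
        groups.put("Jan", new long[]{1, 2});
        groups.put("Feb", new long[]{3, 1});
        groups.put("Mar", new long[]{1, 5});
        System.out.println(run(groups, false)); // [3, 7, 13]
        System.out.println(run(groups, true));  // [3, 4, 6]
    }
}
```

With a fresh instance per group the function degrades to a plain per-name sum, and if the groups arrive in a different order the totals accumulate in that order instead.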
Then publish it with MaxCompute Studio. I published it as test20191119, after which it can be called from DataWorks.

The source table data: