1. UDTF Introduction and Implementation
1.1. Introduction
In Hive, a UDTF can turn a single input row into one row with multiple columns, or into multiple rows with multiple columns, and it is used frequently. This article works through a concrete example to explain how to write a UDTF, how to use it, and how it works. Before reading this article, please read the earlier article on writing UDFs.
Test data:
```sql
drop table if exists test;

create table test (
  ind  int,
  col  string,
  col1 string
);

insert into test values (1, 'a,b,c', '1,2');
insert into test values (2, 'j,k', null);
insert into test values (3, null, null);
```
For the first row, the expected output is:
| Ind | Key | Value |
| --- | --- | ----- |
| 1   | a   | 1     |
| 1   | b   | 2     |
| 1   | c   | NULL  |
The other rows should produce output of the same form; when an input value is null, no row is emitted for it.
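For reference, applying the same rules to the remaining test rows should give output along the following lines (row 3 produces no rows at all, because its col is null):

| Ind | Key | Value |
| --- | --- | ----- |
| 2   | j   | NULL  |
| 2   | k   | NULL  |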
1.2. Writing the UDTF
To write a UDTF (User-Defined Table-Generating Function), extend the GenericUDTF class. Part of that base class is shown below:
```java
/**
 * A Generic User-defined Table Generating Function (UDTF)
 *
 * Generates a variable number of output rows for a single input row. Useful for
 * explode(array)...
 */
public abstract class GenericUDTF {

  public StructObjectInspector initialize(StructObjectInspector argOIs)
      throws UDFArgumentException {
    List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
    ObjectInspector[] udtfInputOIs = new ObjectInspector[inputFields.size()];
    for (int i = 0; i < inputFields.size(); i++) {
      udtfInputOIs[i] = inputFields.get(i).getFieldObjectInspector();
    }
    return initialize(udtfInputOIs);
  }

  /**
   * Give a set of arguments for the UDTF to process.
   *
   * @param args
   *          object array of arguments
   */
  public abstract void process(Object[] args) throws HiveException;

  /**
   * Called to notify the UDTF that there are no more rows to process.
   * Clean up code or additional forward() calls can be made here.
   */
  public abstract void close() throws HiveException;
}
```
A subclass of GenericUDTF must implement the methods above. The initialize method plays the same role as in a UDF: it checks the input types and declares the names and types of the output columns. The process method is invoked once for every input row passed to the UDTF and emits one or more output rows by calling forward. The close method runs once, after all calls to process have finished, and can be used for any remaining cleanup or additional forward() calls. The complete implementation for our example follows:
```java
package com.practice.hive.udtf;

import java.util.List;

import com.google.common.collect.Lists;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * @author liufeifei
 * @date 2018/06/20
 */
public class ArrToMapUDTF extends GenericUDTF {

    private String[] obj = new String[2];

    /**
     * Output schema: two string columns, key and value.
     *
     * @param argOIs
     * @return
     * @throws UDFArgumentException
     */
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        List<String> colName = Lists.newLinkedList();
        colName.add("key");
        colName.add("value");

        List<ObjectInspector> resType = Lists.newLinkedList();
        resType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        resType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        // Return the column names and the corresponding column types
        return ObjectInspectorFactory.getStandardStructObjectInspector(colName, resType);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // If the first argument is null, emit nothing for this row
        if (args[0] == null) {
            return;
        }
        String arg1 = args[0].toString();
        String[] arr1 = arg1.split(",");
        String[] arr2 = null;
        if (args[1] != null) {
            arr2 = args[1].toString().split(",");
        }

        // Emit one output row per element of the first array, paired with the
        // matching element of the second array, or null if there is none
        for (int i = 0; i < arr1.length; i++) {
            obj[0] = arr1[i];
            if (arr2 != null && arr2.length > i) {
                obj[1] = arr2[i];
            } else {
                obj[1] = null;
            }
            forward(obj);
        }
    }

    @Override
    public void close() throws HiveException {
    }
}
```
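To round out the example, here is a minimal sketch of how the function might be registered and queried. The jar path and the function name arr_to_map are placeholders of my own choosing, not taken from the original code:

```sql
-- Register the compiled class (the jar path below is a placeholder)
ADD JAR /path/to/your-udtf.jar;
CREATE TEMPORARY FUNCTION arr_to_map AS 'com.practice.hive.udtf.ArrToMapUDTF';

-- A UDTF can be called on its own in the SELECT list...
SELECT arr_to_map(col, col1) FROM test;

-- ...or, more usefully, through LATERAL VIEW so the original columns are kept
SELECT t.ind, v.key, v.value
FROM test t
LATERAL VIEW arr_to_map(t.col, t.col1) v AS key, value;
```

Note that with a plain LATERAL VIEW, rows for which the UDTF emits nothing (here, the row whose col is null) disappear from the result; LATERAL VIEW OUTER keeps them with NULL values in the generated columns.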