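Creating the project follows the same steps as for the UDF written earlier. If anything is unclear, see this article:
Hive: Writing a UDF Function
In the udf package, create another class, MyUDTF, that extends GenericUDTF, and implement its abstract methods: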
package com.atguigu.udf;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;

public class MyUDTF extends GenericUDTF {
    public void process(Object[] args) throws HiveException {
    }

    public void close() throws HiveException {
    }
}
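The IDE only generates stubs for the two abstract methods, process and close. That is not enough: the initialize method has to be overridden as well.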
package com.atguigu.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

/**
 * Input:  hello,atguigu,hive
 * Output:
 *   hello
 *   atguigu
 *   hive
 */
public class MyUDTF extends GenericUDTF {
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        return super.initialize(argOIs);
    }

    // Handles each input row
    public void process(Object[] args) throws HiveException {
    }

    // Cleanup hook, called after the last row
    public void close() throws HiveException {
    }
}
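The three methods in the skeleton are called in a fixed order: initialize once, process once per input row, and close once at the end. The following is a minimal plain-Java sketch of that call order. The `MiniUdtf` interface and the `LifecycleSketch` class are hypothetical stand-ins invented for illustration; they are not Hive types, and the real `GenericUDTF` works with ObjectInspectors and a collector rather than plain lists.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the UDTF contract; none of these types come from Hive.
class LifecycleSketch {

    interface MiniUdtf {
        List<String> initialize();                     // once: declare the output column names
        void process(String input, List<String> out);  // per input row: emit zero or more rows
        void close();                                  // once: after the last input row
    }

    // Drives a MiniUdtf over the input rows and records the order of the calls.
    static List<String> run(List<String> inputRows) {
        final List<String> trace = new ArrayList<>();
        MiniUdtf udtf = new MiniUdtf() {
            public List<String> initialize() {
                trace.add("initialize");
                List<String> columns = new ArrayList<>();
                columns.add("word");
                return columns;
            }

            public void process(String input, List<String> out) {
                trace.add("process");
                for (String word : input.split(",")) {
                    out.add(word); // stands in for forward(...)
                }
            }

            public void close() {
                trace.add("close");
            }
        };

        List<String> out = new ArrayList<>();
        udtf.initialize();
        for (String row : inputRows) {
            udtf.process(row, out);
        }
        udtf.close();
        trace.add("rows=" + out.size());
        return trace;
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        rows.add("hello,spark");
        rows.add("hello,hive");
        System.out.println(run(rows)); // [initialize, process, process, close, rows=4]
    }
}
```

Two input rows lead to two process calls but four output rows, which is the one-in, many-out behaviour a UDTF is meant to provide.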
The complete implementation:
package com.atguigu.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * Input:  hello,atguigu,hive
 * Output:
 *   hello
 *   atguigu
 *   hive
 */
public class MyUDTF extends GenericUDTF {
    // Reusable container holding one output row
    private ArrayList<String> outPutList = new ArrayList<String>();

    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // Default name of the output column; an alias in the query overrides it
        List<String> fieldNames = new ArrayList<String>();
        fieldNames.add("word");
        // Type of the output column: one inspector per output column
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        // Struct inspector describing the output rows
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    // Handles one input row, e.g. hello,atguigu,hive
    public void process(Object[] args) throws HiveException {
        // 1. Read the input value
        String input = args[0].toString();
        // 2. Split the string on ","
        String[] words = input.split(",");
        // 3. Emit one output row per word
        for (String word : words) {
            // Clear the previous row
            outPutList.clear();
            // Put the current word into the row
            outPutList.add(word);
            // Emit the row
            forward(outPutList);
        }
    }

    // Cleanup hook, nothing to release here
    public void close() throws HiveException {
    }
}
Package the project and copy the jar into Hive's lib directory again:
Add it to the classpath:
hive (default)> add jar /opt/module/hive/lib/hive-demo-1.0-SNAPSHOT.jar
> ;
Added [/opt/module/hive/lib/hive-demo-1.0-SNAPSHOT.jar] to class path
Added resources: [/opt/module/hive/lib/hive-demo-1.0-SNAPSHOT.jar]
Create the function:
hive (default)> create temporary function myudtf as "com.atguigu.udf.MyUDTF";
OK
Time taken: 0.733 seconds
Note: if something goes wrong and nothing you try fixes it, restart Hive and try again.
Data for the input table:
hello,spark
hello,hive
hello,zhoujielun,linjunjie,dengziqi
hello,hadoop,mapreduce,yarn,common
Create the table:
hive (default)> create table input(words string) ;
Load the data into it:
load data local inpath '/opt/module/datas/input.txt'
into table input;
Check the data:
hive (default)> select * from input;
OK
input.words
hello,spark
hello,hive
hello,zhoujielun,linjunjie,dengziqi
hello,hadoop,mapreduce,yarn,common
Time taken: 1.386 seconds, Fetched: 4 row(s)
Use the UDTF:
hive (default)> select myudtf(words) from input;
OK
word
hello
spark
hello
hive
hello
zhoujielun
linjunjie
dengziqi
hello
hadoop
mapreduce
yarn
common
Time taken: 3.788 seconds, Fetched: 13 row(s)
The function splits each value on commas, turning one input row into multiple output rows.
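The row count in the query output can be cross-checked without a Hive cluster. The sketch below applies the same splitting rule as process() to the four sample lines in plain Java. The class name `SplitCheck` and the method `explode` are hypothetical names chosen for this illustration, and the code only mimics the splitting step, not Hive's execution.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that mimics the splitting done in MyUDTF.process(); not part of Hive.
class SplitCheck {

    // Produces one output row per comma-separated token of every input line.
    static List<String> explode(List<String> lines) {
        List<String> rows = new ArrayList<>();
        for (String line : lines) {
            rows.addAll(Arrays.asList(line.split(",")));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "hello,spark",
                "hello,hive",
                "hello,zhoujielun,linjunjie,dengziqi",
                "hello,hadoop,mapreduce,yarn,common");
        List<String> rows = explode(input);
        System.out.println(rows.size()); // 13, matching "Fetched: 13 row(s)"
    }
}
```

One detail worth knowing: String.split(",") drops trailing empty strings, so an input such as "a,b," yields two rows rather than three, while an empty token in the middle ("a,,b") is kept as an empty row.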