很长时间以来一直写hive,嵌套脚本、偶尔写UDF. 最近用Hive的dynamic partition和多路插入做一些事情,很遗憾的结果是非常不稳定,有时能成功,有时失败。(可能是因为hive版本的问题,查了一些资料也没查的太清楚,因为服务器不能随便动,就想用mapreduce的多路输出吧)。
1.首先这个多路插入也是用的hive的表,表的输出是SequenceFile格式。
按说sequencefile格式输入,取决于内部的Key/value格式。
在驱动类里需要添加
Job job=new Job(getConf(),"dsp_data");
job.setInputFormatClass(SequenceFileInputFormat.class);
SequenceFileInputFormat.addInputPath(job, input1);
SequenceFileInputFormat.addInputPath(job, input2);
Mapper函数的输入:
public class * extends Mapper<BytesWritable , Text, TextPair,TextPair>{}
2.MultipleOutPuts使用:
private static Text value = new Text();
private MultipleOutputs<Text, Text> mos;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
mos = new MultipleOutputs<Text,Text>(context);
}
Iterator<TextPair> iter = values.iterator();
TextPair middle=iter.next();
if (! middle.getSecond().equals("0")) return;
// String[] middle_fields=middle.getFirst().toString().split("\t",-1);
while(iter.hasNext()){
TextPair xx=iter.next();
if (xx.getSecond().toString().equals("0")) continue;
String[] xx_fields=xx.getFirst().toString().split("\t");
if(xx_fields.length<3) continue;
String custom_id=xx_fields[xx_fields.length-1];
value.set(xx_fields[0]+"\t"+xx_fields[1]+"\t"+middle.getFirst().toString());
mos.write(key.getFirst(), value, custom_id+"/");
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
super.cleanup(context);
mos.close();
}
3.上面的语句有点问题。
在于middle的使用,因为reduce中iterable values使用的对象都是反序列化出来的,而指定的具体的类都是由一个初始化的对象,不断更新里面的字段实现的。
上面的例子,就造成了middle指向的对象没变,但是实际对象中的内容已经被更新成了新序列化的结果,得不到middle最初赋值地方的值。
解决办法有两个:将middle中,需要的数据部分事先取出来。 另外一个实现TextPair的clone或者实现一个get方法,获得一个新对象来解决。