/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.tdxx.hadoop.example;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 从input中提取与表达式相符的单词并计算词频
*
* 继承自配置基类Configured,并扩展接口Tool
* Configured类中有一个变量conf用于存储配置文件
* Tool中只有一个方法需要实现
* int run(String [] args)用于运行输入参数
* hadoop jar Grep.jar /user/hadoop/20130704/grep.txt /user/hadoop/output/ 'aaa.*'
* hadoop jar Grep.jar /user/hadoop/20130704/grep.txt /user/hadoop/output/ '[a-z.]+'
*/
public class Grep extends Configured implements Tool {
// singleton
private Grep() {
}
public int run(String[] args) throws Exception {
if (args.length < 3) {
System.out.println("Grep <inDir> <outDir> <regex> [<group>]");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
Path tempDir = new Path("grep-temp-"
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
//创建job
JobConf grepJob = new JobConf(getConf(), Grep.class);
try {
//job命名
grepJob.setJobName("grep-search");
//设置job的输入路径
FileInputFormat.setInputPaths(grepJob, args[0]);
//设置Mapper类
grepJob.setMapperClass(RegexMapper.class);
grepJob.set("mapred.mapper.regex", args[2]);
if (args.length == 4)
grepJob.set("mapred.mapper.regex.group", args[3]);
//设置Combiner类
grepJob.setCombinerClass(LongSumReducer.class);
//设置Reducer类
grepJob.setReducerClass(LongSumReducer.class);
//设置输出路径
FileOutputFormat.setOutputPath(grepJob, tempDir);
//设置输出格式
grepJob.setOutputFormat(SequenceFileOutputFormat.class);
//设置输出键的类
grepJob.setOutputKeyClass(Text.class);
//设置输出值的类
grepJob.setOutputValueClass(LongWritable.class);
//运行
JobClient.runJob(grepJob);
JobConf sortJob = new JobConf(getConf(), Grep.class);
sortJob.setJobName("grep-sort");
FileInputFormat.setInputPaths(sortJob, tempDir);
sortJob.setInputFormat(SequenceFileInputFormat.class);
sortJob.setMapperClass(InverseMapper.class);
sortJob.setNumReduceTasks(1); // write a single file
FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
sortJob.setOutputKeyComparatorClass // sort by decreasing freq
(LongWritable.DecreasingComparator.class);
JobClient.runJob(sortJob);
} finally {
FileSystem.get(grepJob).delete(tempDir, true);
}
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Grep(), args);
System.exit(res);
}
}