1. Running Programs with Hadoop Streaming
Hadoop Streaming makes it possible to run MapReduce programs written in languages other than Java. A sample launch script is shown below:
#!/bin/sh

# Validate arguments
if [ $# != 7 ]; then
    echo "./bin/avp_platform_startup.sh [USER_NAME] [INPUT_PAT] [OUTPUT_PAT] [MAP_TASKS] [REDUCE_TASKS] [CLASS_ID] [CODE_TYPE]"
    exit
fi

# GLOBAL VARS
USER_NAME=$1
INPUT_PAT=$2
OUTPUT_PAT=$3
MAP_TASKS=$4
REDUCE_TASKS=$5
CLASS_ID=$6
CODE_TYPE=$7

# Hadoop Start Area
/home/work/software/hadoop/bin/hadoop jar /home/work/software/hadoop/contrib/streaming/hadoop-streaming.jar \
    -input /home/$USER/$USER_NAME/output/webpage/$INPUT_PAT/ \
    -output /home/$USER/$USER_NAME/output/avp/avp-extract-$USER_NAME\_nlp_$OUTPUT_PAT-`date +%F-%H-%M-%S` \
    -mapper "avp_extract_mapper.sh . . $CODE_TYPE" \
    -reducer "avp_extract_reducer.sh" \
    -file ./script/avp_extract_mapper.sh \
    -file ./script/avp_extract_reducer.sh \
    -file ./script/extract_tools/tidy_page.py \
    -file ./script/decode.pl \
    -file ./script/extract_tools/format.py \
    -file ./script/extract_tools/extract_tool.py \
    -file ./class/$CLASS_ID/site.list \
    -file ./class/$CLASS_ID/*.conf \
    -jobconf mapred.job.name=$USER_NAME\_avp_platform_extract_tools_$OUTPUT_PAT \
    -jobconf mapred.map.tasks=$MAP_TASKS \
    -jobconf mapred.reduce.tasks=$REDUCE_TASKS
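The avp_extract_* mapper and reducer referenced above are site-specific extraction scripts and are not reproduced here. The only contract a streaming mapper or reducer must honor is: read lines from stdin and write tab-separated key/value pairs to stdout (the framework sorts the mapper output by key before feeding it to the reducer). A minimal sketch of that contract, using hypothetical word-count scripts rather than the scripts above:

#!/bin/sh
# wc_mapper.sh (hypothetical): emit "<word><TAB>1" for every whitespace-separated token on stdin
tr -s ' \t' '\n\n' | awk 'NF { print $1 "\t" 1 }'

#!/bin/sh
# wc_reducer.sh (hypothetical): input arrives sorted by key, so sum
# consecutive counts that share the same word
awk -F '\t' '
    $1 != prev { if (NR > 1) print prev "\t" sum; prev = $1; sum = 0 }
    { sum += $2 }
    END { if (NR > 0) print prev "\t" sum }
'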
2. Writing Hadoop Programs with the Native Java API
Step one: package the Java code into a JAR file.
MaxTemperatureMapper.java
package oldapi;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {

    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      output.collect(new Text(year), new IntWritable(airTemperature));
    }
  }
}
MaxTemperatureReducer.java
package oldapi;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {

    int maxValue = Integer.MIN_VALUE;
    while (values.hasNext()) {
      maxValue = Math.max(maxValue, values.next().get());
    }
    output.collect(key, new IntWritable(maxValue));
  }
}
MaxTemperature.java
package oldapi;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;

public class MaxTemperature extends Configured implements Tool {

  public int run(String[] args) throws IOException {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    JobConf conf = new JobConf(MaxTemperature.class);
    conf.setJobName("Max temperature");

    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    conf.setMapperClass(MaxTemperatureMapper.class);
    conf.setCombinerClass(MaxTemperatureReducer.class);
    conf.setReducerClass(MaxTemperatureReducer.class);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    JobClient.runJob(conf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new MaxTemperature(), args);
    System.exit(exitCode);
  }
}
Packaging command (this produces the MaxTemperature.jar file; the directory passed to -C is laid out as MaxTemperature/oldapi/*.java):
jar cvf MaxTemperature.jar -C MaxTemperature/ .
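Note that hadoop jar needs compiled .class files inside the jar (MaxTemperature/oldapi/*.class), so the sources must be compiled before packaging. A minimal sketch, assuming the sources sit in MaxTemperature/oldapi/ and a single Hadoop core jar lives under the installation directory used earlier (the exact jar name varies with the Hadoop version):

# compile against the Hadoop core jar, writing classes next to the sources
javac -classpath /home/work/software/hadoop/hadoop-core-*.jar -d MaxTemperature MaxTemperature/oldapi/*.java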
Step two: launch the job.
sudo -u sniper hadoop jar MaxTemperature.jar oldapi.MaxTemperature /home/sniper/zhuliang/sample.txt /home/sniper/zhuliang/max-temp-output
Appendix: sample input file
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
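For reference, with these five records (1949 readings of 111 and 78, 1950 readings of 0, 22 and -11, all in tenths of a degree Celsius) the job's output directory should contain one year/maximum pair per line, roughly:

1949	111
1950	22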