Apache Hadoop: A Summary of Ways to Run Distributed Programs (Streaming and the Native Java API)

1. Running programs with Hadoop Streaming

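Hadoop Streaming runs MapReduce jobs whose mapper and reducer are arbitrary executables or scripts, so they can be written in languages other than Java (the job below uses shell, Python and Perl scripts). An example launch script: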

```sh
#!/bin/sh

# Check that exactly 7 arguments were given
if [ $# -ne 7 ]; then
    echo "./bin/avp_platform_startup.sh [USER_NAME] [INPUT_PAT] [OUTPUT_PAT] [MAP_TASKS] [REDUCE_TASKS] [CLASS_ID] [CODE_TYPE]"
    exit 1
fi

# GLOBAL VARS
USER_NAME=$1
INPUT_PAT=$2
OUTPUT_PAT=$3
MAP_TASKS=$4
REDUCE_TASKS=$5
CLASS_ID=$6
CODE_TYPE=$7

# Each -file option ships exactly one file, so give every *.conf file its own -file
CONF_FILES=""
for f in ./class/$CLASS_ID/*.conf; do
    CONF_FILES="$CONF_FILES -file $f"
done

# Hadoop Start Area
# $CONF_FILES is left unquoted on purpose so it splits into separate arguments.
# -file and -jobconf are the older Streaming options; newer releases deprecate them
# in favor of -files and -D. mapred.map.tasks is only a hint: the actual number of
# map tasks follows the input splits.
/home/work/software/hadoop/bin/hadoop jar /home/work/software/hadoop/contrib/streaming/hadoop-streaming.jar \
    -input /home/$USER/$USER_NAME/output/webpage/$INPUT_PAT/ \
    -output /home/$USER/$USER_NAME/output/avp/avp-extract-${USER_NAME}_nlp_${OUTPUT_PAT}-$(date +%F-%H-%M-%S) \
    -mapper "avp_extract_mapper.sh . . $CODE_TYPE" \
    -reducer "avp_extract_reducer.sh" \
    -file ./script/avp_extract_mapper.sh \
    -file ./script/avp_extract_reducer.sh \
    -file ./script/extract_tools/tidy_page.py \
    -file ./script/decode.pl \
    -file ./script/extract_tools/format.py \
    -file ./script/extract_tools/extract_tool.py \
    -file ./class/$CLASS_ID/site.list \
    $CONF_FILES \
    -jobconf mapred.job.name=${USER_NAME}_avp_platform_extract_tools_${OUTPUT_PAT} \
    -jobconf mapred.map.tasks=$MAP_TASKS \
    -jobconf mapred.reduce.tasks=$REDUCE_TASKS
```
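Whatever language they are written in, the mapper and reducer given to Streaming talk to the framework only through standard input and standard output: each input record arrives as one line, and by default everything before the first tab of an output line is the key and the rest is the value. The reducer receives the mapper output already sorted by key but, unlike a Java `Reducer`, it gets plain lines rather than one call per key, so it has to detect key changes itself. The `avp_extract_*` scripts above are not shown, so the sketch below illustrates the same contract with the max-temperature job from section 2 instead. The class `StreamingMaxTemp` and its `map`/`reduce` modes are hypothetical, and the code assumes Java 8 or later.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical Streaming-style program: reads lines on stdin, writes "key\tvalue" lines on stdout.
public class StreamingMaxTemp {

    private static final int MISSING = 9999;

    // Map step: NCDC record lines in, "year\ttemperature" lines out
    static void map(BufferedReader in, PrintWriter out) throws IOException {
        String line;
        while ((line = in.readLine()) != null) {
            if (line.length() < 93) {
                continue; // skip blank or truncated records
            }
            String year = line.substring(15, 19);
            int temp = Integer.parseInt(line.substring(87, 92)); // Java 7+ accepts a leading '+'
            String quality = line.substring(92, 93);
            if (temp != MISSING && quality.matches("[01459]")) {
                out.print(year + "\t" + temp + "\n");
            }
        }
    }

    // Reduce step: lines arrive sorted by key, so a change of key closes the previous group
    static void reduce(BufferedReader in, PrintWriter out) throws IOException {
        String currentKey = null;
        int max = Integer.MIN_VALUE;
        String line;
        while ((line = in.readLine()) != null) {
            int tab = line.indexOf('\t');
            if (tab < 0) {
                continue; // ignore lines without a key/value separator
            }
            String key = line.substring(0, tab);
            int value = Integer.parseInt(line.substring(tab + 1));
            if (currentKey != null && !currentKey.equals(key)) {
                out.print(currentKey + "\t" + max + "\n"); // emit the finished group
                max = Integer.MIN_VALUE;
            }
            currentKey = key;
            max = Math.max(max, value);
        }
        if (currentKey != null) {
            out.print(currentKey + "\t" + max + "\n"); // emit the last group
        }
    }

    // In-memory equivalent of `cat input | mapper | sort | reducer` for local testing
    static String runLocally(String input) {
        try {
            StringWriter mapped = new StringWriter();
            PrintWriter mapOut = new PrintWriter(mapped);
            map(new BufferedReader(new StringReader(input)), mapOut);
            mapOut.flush();
            String[] lines = mapped.toString().split("\n");
            Arrays.sort(lines); // stands in for the framework's sort by key
            StringWriter reduced = new StringWriter();
            PrintWriter reduceOut = new PrintWriter(reduced);
            reduce(new BufferedReader(new StringReader(String.join("\n", lines))), reduceOut);
            reduceOut.flush();
            return reduced.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Usage: java StreamingMaxTemp map   or   java StreamingMaxTemp reduce
    public static void main(String[] args) throws IOException {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        PrintWriter out = new PrintWriter(System.out);
        if (args.length == 1 && args[0].equals("reduce")) {
            reduce(in, out);
        } else {
            map(in, out);
        }
        out.flush();
    }
}
```

`runLocally` mimics the `cat input | mapper | sort | reducer` pipeline that is commonly used to check Streaming programs before submitting them. Running the class on a real cluster would additionally require shipping it to the nodes and starting it through a wrapper command, which depends on the cluster setup and is not covered here.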

2. Writing Hadoop programs with the native Java API

Step 1: package the Java code into a JAR file.

MaxTemperatureMapper.java

```java
package oldapi;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Old-API (org.apache.hadoop.mapred) mapper: input is (byte offset, line),
// output is (year, air temperature)
public class MaxTemperatureMapper extends MapReduceBase
  implements Mapper<LongWritable, Text, Text, IntWritable> {

  // Value the records use for a missing temperature reading
  private static final int MISSING = 9999;

  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {

    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt rejected a leading plus sign before Java 7
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    // Emit only present readings with an accepted quality code
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      output.collect(new Text(year), new IntWritable(airTemperature));
    }
  }
}
```
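The mapper reads fixed-width fields purely by character offset. To make those offsets concrete, the standalone sketch below (no Hadoop dependency; `NcdcRecord` is a hypothetical helper, not part of the job) applies the same substring positions to a record from the sample input in the appendix. The temperature field is stored in tenths of a degree Celsius, so a value of 22 means 2.2 °C.

```java
// Hypothetical helper naming the fixed-width fields that MaxTemperatureMapper reads
public class NcdcRecord {
    final String year;        // 1-based columns 16-19, i.e. substring(15, 19)
    final int airTemperature; // columns 88-92: sign plus four digits, tenths of a degree Celsius
    final String quality;     // column 93

    NcdcRecord(String line) {
        year = line.substring(15, 19);
        // Same '+' handling as the mapper, so this also runs on pre-Java 7 JVMs
        String temp = line.charAt(87) == '+' ? line.substring(88, 92) : line.substring(87, 92);
        airTemperature = Integer.parseInt(temp);
        quality = line.substring(92, 93);
    }

    // Same filter as the mapper: 9999 means missing; quality must be one of 0, 1, 4, 5, 9
    boolean isValid() {
        return airTemperature != 9999 && quality.matches("[01459]");
    }

    double celsius() {
        return airTemperature / 10.0;
    }

    public static void main(String[] args) {
        NcdcRecord r = new NcdcRecord(
            "0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999");
        System.out.println(r.year + " " + r.airTemperature + " " + r.celsius() + " " + r.isValid()); // 1950 22 2.2 true
    }
}
```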

MaxTemperatureReducer.java

```java
package oldapi;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Old-API reducer: receives (year, [temperatures]) and emits (year, max temperature)
public class MaxTemperatureReducer extends MapReduceBase
  implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {

    // Start below any real reading so the first value always replaces it
    int maxValue = Integer.MIN_VALUE;
    while (values.hasNext()) {
      maxValue = Math.max(maxValue, values.next().get());
    }
    output.collect(key, new IntWritable(maxValue));
  }
}
```
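Between the two phases the framework sorts the map output, groups it by key, and calls `reduce` once per key with an iterator over that key's values. The plain-Java sketch below imitates this locally with a `TreeMap`; `LocalReduce` is hypothetical and only mirrors the reducer's logic without using any Hadoop classes. The input pairs are the ones the mapper emits for the five sample records in the appendix.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local stand-in for shuffle/sort + reduce (no Hadoop dependency)
public class LocalReduce {

    // Same logic as MaxTemperatureReducer.reduce, over a plain Iterator
    static int reduce(Iterator<Integer> values) {
        int maxValue = Integer.MIN_VALUE;
        while (values.hasNext()) {
            maxValue = Math.max(maxValue, values.next());
        }
        return maxValue;
    }

    // Groups (key, value) pairs by key in sorted key order, then reduces each group
    static Map<String, Integer> run(String[][] pairs) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (String[] p : pairs) {
            groups.computeIfAbsent(p[0], k -> new ArrayList<>()).add(Integer.parseInt(p[1]));
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : groups.entrySet()) {
            result.put(e.getKey(), reduce(e.getValue().iterator()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Map output for the five sample records
        String[][] mapOutput = {
            {"1950", "0"}, {"1950", "22"}, {"1950", "-11"}, {"1949", "111"}, {"1949", "78"}
        };
        System.out.println(run(mapOutput)); // {1949=111, 1950=22}
    }
}
```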

MaxTemperature.java

```java
package oldapi;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Driver: implementing Tool lets ToolRunner parse generic options
// (e.g. -D name=value) before run() receives the remaining arguments
public class MaxTemperature extends Configured implements Tool {

  public int run(String[] args) throws IOException {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    // Build on getConf() so options parsed by ToolRunner are not lost
    JobConf conf = new JobConf(getConf(), MaxTemperature.class);
    conf.setJobName("Max temperature");

    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    conf.setMapperClass(MaxTemperatureMapper.class);
    conf.setCombinerClass(MaxTemperatureReducer.class); // reuse the reducer as a combiner
    conf.setReducerClass(MaxTemperatureReducer.class);

    // Map and reduce output types are the same, so these two calls cover both
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    JobClient.runJob(conf); // blocks until the job finishes; throws IOException if it fails
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new MaxTemperature(), args);
    System.exit(exitCode);
  }
}
```
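The driver registers `MaxTemperatureReducer` as the combiner too, so it can pre-aggregate each map task's output before the shuffle. Hadoop may apply a combiner zero, one, or several times, so this is only correct when the final answer does not depend on how the values were partially aggregated. Max has that property because it is commutative and associative; a mean does not. The sketch below (hypothetical `CombinerCheck`, plain Java) checks both on the 1950 readings split unevenly across two imaginary map tasks.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical check of why a max reducer can double as a combiner while a mean cannot
public class CombinerCheck {

    static int max(List<Integer> xs) {
        int m = Integer.MIN_VALUE;
        for (int x : xs) {
            m = Math.max(m, x);
        }
        return m;
    }

    static double mean(List<Integer> xs) {
        double sum = 0;
        for (int x : xs) {
            sum += x;
        }
        return sum / xs.size();
    }

    public static void main(String[] args) {
        // 1950 readings split unevenly across two hypothetical map tasks
        List<Integer> split1 = Arrays.asList(0, 22);
        List<Integer> split2 = Arrays.asList(-11);
        List<Integer> all = Arrays.asList(0, 22, -11);

        // Max of the per-split maxima equals the max over all values
        int combinedMax = max(Arrays.asList(max(split1), max(split2)));
        System.out.println(combinedMax + " == " + max(all)); // 22 == 22

        // Mean of the per-split means (0.0 here) differs from the true mean (11/3)
        double combinedMean = (mean(split1) + mean(split2)) / 2;
        System.out.println(combinedMean + " vs " + mean(all));
    }
}
```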

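Packaging command (it produces MaxTemperature.jar). The directory passed to -C must mirror the package layout, i.e. MaxTemperature/oldapi/. The JAR has to contain the compiled .class files, not only the .java sources, so compile against the Hadoop libraries first; the javac line assumes your Hadoop release provides the `hadoop classpath` command:

```sh
javac -classpath "$(hadoop classpath)" MaxTemperature/oldapi/*.java
jar cvf MaxTemperature.jar -C MaxTemperature/ .
```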
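Because `hadoop jar` looks the driver class up by its package path inside the JAR, a quick sanity check is to list the archive's `.class` entries (`jar tf MaxTemperature.jar` does the same from the shell). The sketch below uses the JDK's `java.util.jar.JarFile`; the class name `JarCheck` and its default JAR path are illustrative assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper: lists the .class entries of a JAR and checks for the driver class
public class JarCheck {

    static List<String> classEntries(String jarPath) {
        List<String> names = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.endsWith(".class")) {
                    names.add(name);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        String jarPath = args.length > 0 ? args[0] : "MaxTemperature.jar";
        List<String> classes = classEntries(jarPath);
        System.out.println(classes);
        // oldapi.MaxTemperature must be stored as oldapi/MaxTemperature.class
        if (classes.contains("oldapi/MaxTemperature.class")) {
            System.out.println("driver class found");
        } else {
            System.out.println("oldapi/MaxTemperature.class is missing; were the sources compiled?");
        }
    }
}
```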

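Step 2: run the job. The command below submits it as the user sniper; the two arguments are the input file and the output directory, resolved against the cluster's default file system (usually HDFS). The output directory must not exist yet, otherwise the job fails at submission.

```sh
sudo -u sniper hadoop jar MaxTemperature.jar oldapi.MaxTemperature /home/sniper/zhuliang/sample.txt /home/sniper/zhuliang/max-temp-output
```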

Appendix: sample input file (fixed-width NCDC weather records, one per line)

```
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
```
