通过之前的操作,
http://www.cnblogs.com/wenbronk/p/6636926.html
http://www.cnblogs.com/wenbronk/p/6659481.html
hadoop-HA的集群已经搭建完成了, 需要写个小程序来认识下hadoop了
统计文本文件中, 每个单词出现的次数
1, Eclipse下新建Java-project
2, 新建lib文件, 导入jar包, 并buildpath
hadoop-2.5.\share\hadoop\common 所有jar,
hadoop-2.5.\share\hadoop\common\lib 所有jar, hadoop-2.5.\share\hadoop\hdfs 所有jar
hadoop-2.5.\share\hadoop\mapreduce 所有jar
hadoop-2.5.\share\hadoop\yarn 所有jar
3, Mapper类: WordCountMapper.java
package com.wenbronk.mapreduce; import java.io.IOException; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper; /**
* 测试mapreduce, 计算单词出现的次数
* @author wenbronk
* KEYIN: split的键, 行坐在的下标
* VALUEIN: split的值, 行值
* KEYOUT: 需求, 输出给reduce
* VALUEOUT: 需求, 输出给reduce
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /**
* 重写map方法, 循环调用
* 从split中读取一行调用一次, 以行所在下标为key, 行内容为value
*/
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException { // text 转string, toString(), 使用空格分隔为单词数组
String[] words = StringUtils.split(value.toString(), ' ');
for (String word : words) {
// 键值对输出, 输出给reduce
context.write(new Text(word), new IntWritable());
} } }
4, Reduce类, WordCountReduce.java
package com.wenbronk.mapreduce; import java.io.IOException; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; /**
* shuffling 后传给 reduce
* @author wenbronk
* KEYIN: mapper的输出
* VALUEIN: mapper的输出
*/
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ /**
* 循环调用
* 每组调用一次, key相同, value可能多个, 使用迭代器
*/
@Override
protected void reduce(Text arg0, Iterable<IntWritable> arg1,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 对值进行累加
int sum = ;
// 使用迭代器
for (IntWritable value : arg1) {
sum += value.get();
}
// 使用context输出
context.write(arg0 , new IntWritable(sum));
} }
5, 然后是具体的执行类: RunMapReduce.java
package com.wenbronk.mapreduce; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /**
* 执行mapreduce
* 统计单词出新的次数
* @author wenbr
*
*/
public class RunMapReduce { public static void main(String[] args) throws Exception {
// 初始化时加载src或classpath下所有的hadoop配置文件
Configuration configuration = new Configuration(); // 得到执行的任务
Job job = Job.getInstance(config); // 入口类
job.setJarByClass(RunMapReduce.class); // job名字
job.setJobName("wordCount"); // job执行是map执行的类
job.setMapperClass(WordCountMapper.class); // job执行的reduce类
job.setReducerClass(WordCountReduce.class); // job输出的键类型
job.setMapOutputKeyClass(Text.class); // job输出的value类型
job.setMapOutputValueClass(IntWritable.class); //**** 使用插件上传data.txt到hdfs/root/usr/data.txt // 使用文件
FileInputFormat.addInputPath(job, new Path("/root/usr/")); // 使用一个不存在的目录进行
Path path = new Path("/root/usr/output");
// 如果存在删除
FileSystem fs = FileSystem.get(configuration);
if (fs.exists(path)) {
fs.delete(path, true);
} // 输出
FileOutputFormat.setOutputPath(job, path); boolean forCompletion = job.waitForCompletion(true); if (forCompletion) {
System.out.println("success");
}
} }
所有的类编写好了, 接下来是上传文件
6, 使用eclipse插件上传data.txt到hadoop目录 /usr/data.txt
我是用的插件为:
7, 运行
这儿使用直接发布到服务器运行的方式
eclipse打包项目成jar包(只需要源码即可), 然后上传到服务器目录下, 使用hadoop命令执行
格式: hadoop jar jar路径 类全限定名
hadoop jar wc.jar com.wenbronk.mapreduce.RunMapReduce
之后在hadoop的目录下就可以看到统计后输出的文件了