1. Create the file ceshi.txt in the root of the D: drive.
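The wordcountMapper written in step 5 splits each line on commas, so the test file should contain comma-separated words. A hypothetical example of what ceshi.txt might contain (any comma-separated word list works):

    hadoop,hive,spark
    hadoop,spark,hadoop
    hive,hadoop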
2. Create a Maven project.
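The classes below need the Hadoop client libraries on the classpath. A minimal sketch of the dependency to add to the project's pom.xml, with a placeholder version number (use the version that matches your Hadoop installation):

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <!-- 2.7.5 is a placeholder; match your cluster's Hadoop version -->
        <version>2.7.5</version>
    </dependency>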
3. Under src/main/java, create a package named mapreduce.
4. Create the wordcountMapper class in the mapreduce package.
5. Write the wordcountMapper class:
package mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
 * The four generic parameters:
 * KEYIN:    type of K1
 * VALUEIN:  type of V1
 * KEYOUT:   type of K2
 * VALUEOUT: type of V2
 */
public class wordcountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    /*
     * The map method turns K1/V1 into K2/V2.
     * Parameters:
     *   key:     K1, the byte offset of the current line
     *   value:   V1, the text of the current line
     *   context: the task context object
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        // 1: split the line of text into words
        String[] split = value.toString().split(",");
        // 2: iterate over the words, building K2 and V2
        for (String word : split) {
            // 3: write K2 and V2 to the context
            text.set(word);
            longWritable.set(1);
            context.write(text, longWritable);
        }
    }
}
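As a worked example of the map phase: with the comma split above, a hypothetical input line "hadoop,hive,hadoop" is emitted as the K2/V2 pairs (hadoop,1), (hive,1), (hadoop,1). No counting happens here; summing the 1s is left to the reduce phase.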
6. Create the wordcountReduce class in the mapreduce package.
7. Write the wordcountReduce class:
package mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
 * The four generic parameters:
 * KEYIN:    type of K2
 * VALUEIN:  type of V2
 * KEYOUT:   type of K3
 * VALUEOUT: type of V3
 */
public class wordcountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
    // The reduce method turns the grouped K2/V2 pairs into K3/V3 and writes K3/V3 to the context.
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        // 1: iterate over the values and sum them to get V3
        for (LongWritable value : values) {
            count += value.get();
        }
        // 2: write K3 and V3 to the context
        context.write(key, new LongWritable(count));
    }
}
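Because this reduce logic is a plain sum, which is associative and commutative, the same class can also serve as a combiner to pre-aggregate map output before the shuffle. This is optional and not part of the original steps; the sketch below shows the single extra line that would go into the job configuration in the jobMain class of step 9:

    job.setCombinerClass(wordcountReduce.class); // optional: reuse the reducer as a combiner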
8. Create the jobMain driver class in the mapreduce package.
9. Write the jobMain driver class:
package mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class jobMain extends Configured implements Tool {
    // This method defines and submits one job.
    public int run(String[] args) throws Exception {
        // 1: create the job object
        Job job = Job.getInstance(super.getConf(), "wordcount");

        // 2: configure the job
        // Step 1: specify how and where the input is read
        job.setInputFormatClass(TextInputFormat.class);               // input format class
        TextInputFormat.addInputPath(job, new Path("D:\\ceshi.txt")); // input path

        // Step 2: specify the map phase class and its output types
        job.setMapperClass(wordcountMapper.class);
        job.setMapOutputKeyClass(Text.class);           // type of K2
        job.setMapOutputValueClass(LongWritable.class); // type of V2

        // Steps 3, 4, 5, and 6 (the shuffle phases) use the defaults

        // Step 7: specify the reduce phase class and its output types
        job.setReducerClass(wordcountReduce.class);
        job.setOutputKeyClass(Text.class);           // type of K3
        job.setOutputValueClass(LongWritable.class); // type of V3

        // Step 8: specify the output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("D:\\mapreduc\\ou1"));

        boolean bl = job.waitForCompletion(true);
        return bl ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // launch the job
        int run = ToolRunner.run(configuration, new jobMain(), args);
        System.exit(run);
    }
}
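One caveat when re-running the job: MapReduce refuses to start if the output directory (D:\mapreduc\ou1 above) already exists and fails with a FileAlreadyExistsException, so delete that directory or change the path between runs. A minimal sketch of doing this programmatically inside run(), before setting the output path, assuming the file system is resolved from the job configuration (requires an extra import of org.apache.hadoop.fs.FileSystem):

    Path outputPath = new Path("D:\\mapreduc\\ou1");
    FileSystem fs = FileSystem.get(super.getConf());
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true); // recursively remove the previous output
    }
    TextOutputFormat.setOutputPath(job, outputPath);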