Most big-data journeys start the same way: the first thing you write by hand is an MR job.
MR is short for MapReduce, one of Hadoop's core components. It runs map and reduce tasks in parallel and is mainly used to process data stored on HDFS, the distributed file system.
In this post I'll walk through my hand-written WordCount; the underlying principles are covered in the next post.
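Before any code, here is a minimal sketch of what the job does to one line of input (the sample data is invented for illustration):

input line:    "java hadoop java"
map output:    (java, 1) (hadoop, 1) (java, 1)
after shuffle: (hadoop, [1]) (java, [1, 1])
reduce output: (hadoop, 1) (java, 2)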
1. Download Hadoop to your local Windows machine
Download: https://archive.apache.org/dist/hadoop/core/hadoop-2.7.2/hadoop-2.7.2.tar.gz
2. After unpacking, set the environment variables
Create HADOOP_HOME = D:\hadoop-2.7.2
Append %HADOOP_HOME%\bin and %HADOOP_HOME%\sbin to Path
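Once the JDK from step 3 is also installed (hadoop.cmd needs JAVA_HOME pointing at it), you can sanity-check the variables by opening a new command prompt and running the standard Hadoop CLI command:

hadoop version

It should print the 2.7.2 build information; if the command is not found, the Path entry did not take effect.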
3. Install the JDK and IntelliJ IDEA Community Edition
4. Create a Maven project
5. Add the Hadoop client dependency to pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
    </dependency>
</dependencies>
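Optionally, since Hadoop 2.7.2 logs through log4j 1.x, you can add a minimal src/main/resources/log4j.properties so the local run shows job progress instead of a "no appenders" warning. This is a common convention rather than anything the tutorial requires, and the pattern below is just one reasonable choice:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p [%c] %m%n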
6. With the mapper, reducer, and driver written, the first local run throws a NullPointerException
The cause is that hadoop.dll and winutils.exe are missing. To fix it:
1. Download the two files, download address: http://download.csdn.net/download/fly_leopard/9503059
2. After unpacking, copy hadoop.dll into C:\Windows\System32
3. With the HADOOP_HOME environment variable from step 2 in place, copy winutils.exe into ${HADOOP_HOME}/bin
Mirror (Baidu Netdisk): https://pan.baidu.com/s/1g75yEqOaZtljZrfdssDZ5w, extraction code: andy
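If you'd rather not touch system-wide settings, there is also a commonly used code-level alternative: Hadoop's org.apache.hadoop.util.Shell checks the hadoop.home.dir system property before falling back to the HADOOP_HOME environment variable, so you can point it at the unpacked directory at the very top of main() (the path below is the one from step 2; adjust to yours):

// Must run before any Hadoop class initializes;
// the directory needs bin\winutils.exe inside it
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.2");

Note that hadoop.dll still has to be somewhere on the library path (System32 works) for the native I/O calls.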
7. Run the driver; the word counts now come out correctly. The three classes follow.
Mapper:
package cn.andy.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author AndyShi
 * @version 1.0
 * @date 2021/4/2 22:04
 */
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // In IDEA, alt+insert generates the override of the parent method
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read one line of input
        String line = value.toString();
        // Split the line on spaces
        String[] fields = line.split(" ");
        // Emit each word tagged with a count of 1, e.g. (java, 1)
        for (String field : fields) {
            context.write(new Text(field), new IntWritable(1));
        }
    }
}
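One optional refinement over the mapper above: because context.write copies the key/value bytes, you can reuse a single Text and IntWritable instead of allocating two objects per word, which avoids needless garbage on large inputs. This is a common MapReduce idiom, not part of the tutorial code, and the class name WCMapperReuse is made up for the sketch:

package cn.andy.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WCMapperReuse extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reused across calls; safe because the framework copies data on write
    private final Text word = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String field : value.toString().split(" ")) {
            word.set(field);
            context.write(word, one);
        }
    }
}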
Reducer:
package cn.andy.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author AndyShi
 * @version 1.0
 * @date 2021/4/2 22:04
 */
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // In IDEA, ctrl+o overrides the parent method
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Counter for this word
        int count = 0;
        // Sum up all the 1s the mappers emitted for this key
        for (IntWritable intWritable : values) {
            count += intWritable.get();
        }
        // Emit (word, total)
        context.write(key, new IntWritable(count));
    }
}
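Because the sum here is associative and commutative, this same class can also act as a combiner, pre-aggregating counts on the map side so less data crosses the shuffle. Trying that is optional and not in the tutorial's driver; it takes a single extra line there:

// Pre-aggregate map output with the reducer logic before the shuffle
job.setCombinerClass(WCReducer.class);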
Driver:
package cn.andy.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author AndyShi
 * @version 1.0
 * @date 2021/4/2 22:04
 */
public class WCDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Load the configuration
        Configuration configuration = new Configuration();
        // Create a job from it
        Job job = Job.getInstance(configuration);
        // Tell Hadoop which jar/class to ship
        job.setJarByClass(WCDriver.class);

        // Wire in the custom mapper and its output key/value types
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Wire in the custom reducer and the job's final output types
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Delete the output directory if it exists; otherwise the job fails
        FileSystem fileSystem = FileSystem.get(configuration);
        Path outputPath = new Path("wordcount/output/20210402");
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // Configure the input and output paths
        FileInputFormat.setInputPaths(job, new Path("wordcount/input/wordcount.txt"));
        FileOutputFormat.setOutputPath(job, outputPath);

        // Submit the job and wait for it to finish, printing progress
        job.waitForCompletion(true);
    }
}
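For reference: with no cluster configuration on the classpath, the job runs in Hadoop's local mode and both relative paths resolve against the project's working directory on the local filesystem. As a made-up example, if wordcount/input/wordcount.txt contains

java hadoop
java spark
java

the run produces wordcount/output/20210402/part-r-00000 with tab-separated, key-sorted lines:

hadoop	1
java	3
spark	1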
That's the MR job, hand-written locally by following tutorials found online. Starting with the next post, I'll take the underlying principles apart step by step.