MapReduce WordCount Combiner程序
注意使用Combiner之后的累加情况是不同的;
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.stono</groupId> <artifactId>mr01</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>mr01</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.7</java.version> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <maven.build.timestamp.format>yyyy-MM-dd HH:mm:ss</maven.build.timestamp.format> <hadoop-mapreduce-client.version>2.7.2</hadoop-mapreduce-client.version> <hbase-client.version>1.1.2</hbase-client.version> <slf4j.version>1.7.25</slf4j.version> <kafka-client.version>0.10.2.1</kafka-client.version> </properties> <dependencies> <dependency> <groupId>jdk.tools</groupId> <artifactId>jdk.tools</artifactId> <version>1.8</version> <scope>system</scope> <systemPath>D:/Java/jdk1.8.0_161/lib/tools.jar</systemPath> </dependency> <!-- 日志记录 Slf4j --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version> </dependency> <!-- mapreduce --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop-mapreduce-client.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${hadoop-mapreduce-client.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <configuration> <archive> <manifest> <addClasspath>false</addClasspath> <mainClass>com.bsr.combiner.JobRunner</mainClass> <!-- 你的主类名 --> </manifest> </archive> </configuration> </plugin> <!--<plugin>--> <!--<artifactId> maven-assembly-plugin </artifactId>--> <!--<configuration>--> <!--<descriptorRefs>--> <!--<descriptorRef>jar-with-dependencies</descriptorRef>--> <!--</descriptorRefs>--> <!--<archive>--> <!--<manifest>--> <!--<mainClass>com.bsr.basis.JobRunner</mainClass>--> <!--</manifest>--> <!--</archive>--> <!--</configuration>--> <!--<executions>--> <!--<execution>--> <!--<id>make-assembly</id>--> <!--<phase>package</phase>--> <!--<goals>--> <!--<goal>single</goal>--> <!--</goals>--> <!--</execution>--> <!--</executions>--> <!--</plugin>--> </plugins> </build> </project>
Mapper:
package com.bsr.combiner; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /* 四个参数的含义 第一个参数:map中key-value的key的类型,默认值是输入行的偏移量 第二个参数:map中key-value的value的类型 在此需求中是某一行的内容(数据) 第三个参数:reduce中key-value中的key类型 第四个参数:redece的输出参数int 但是在mapreduce中涉及到了网络间的传输,所以需要序列化,而hadoop提供了相关的序列化类型 long-LongWritable String-Text int-IntWritable */ public class MapperWordCount extends Mapper<LongWritable, Text, Text, IntWritable>{ /*重写mapper的map方法 编写自己的逻辑 * key是偏移量不用管 * value是一行的内容 例:hello zhangsan you you * context是返回结果 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] values=value.toString().split(" ");//对得到的一行数据进行切分 在此需求中是以空格进行切分 for(String word:values){ context.write(new Text(word), new IntWritable(1));//遍历数组 输出map的返回值 即<hello,1><zhangsan,1><you,1><you,1> } } }
Combiner:
package com.bsr.combiner; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class Combiner extends Reducer<Text, IntWritable,Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count=0;//初始一个计数器 for(IntWritable value:values){
count ++;//对values进行遍历 每次加1 } context.write(key,new IntWritable(count));//最后写返回值<hello,5> } }
reduce:
package com.bsr.combiner; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /* * 此方法是WordCount的reduce * 参数:1.map阶段返回的key类型String-Text * 2.map阶段返回值中value的类型Int-IntWritable * 3.reduce阶段返回值中key的类型String-Text * 4.reduce阶段返回值中value的类型Int-IntWritable */ public class ReducerWordCount extends Reducer<Text, IntWritable,Text, IntWritable>{ /* * 实现父类的reduce方法 *key是一组key-value的相同的哪个key *values是一组key-value的所有value *key value 的情况,比如<hello,{1,1,1,1,1}> * * context 返回值,<hello,5> */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException { int count=0;//初始一个计数器 for(IntWritable value:values){ count = count + i.get();//对values进行遍历 需要累加 } context.write(key,new IntWritable(count));//最后写返回值<hello,5> } }
Job:
package com.bsr.combiner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; public class JobRunner { /* * 提交写好的mapreduce程序 当做一个Job进行提交 * */ public static void main(String[] args) throws Exception { //读取classpath下的所有xxx-site.xml配置文件,并进行解析 Configuration conf=new Configuration(); FileSystem fs = FileSystem.get(configuration); String s = "/wc/output2"; Path path = new Path(s); fs.delete(path, true) Job wcjob=Job.getInstance(conf);//初始一个job //通过主类的类加载器机制获取到本job的所有代码所在的jar包 wcjob.setJarByClass(JobRunner.class); //指定本job使用的mapper类 wcjob.setMapperClass(MapperWordCount.class); //指定本job使用的reducer类 wcjob.setReducerClass(ReducerWordCount.class); //设置本job使用的从combiner类 wcjob.setCombinerClass(Combiner.class); //指定mapper输出的kv的数据类型 wcjob.setMapOutputKeyClass(Text.class); wcjob.setMapOutputValueClass(IntWritable.class); //指定reducer输出的kv数据类型 wcjob.setOutputKeyClass(Text.class); wcjob.setOutputValueClass(IntWritable.class); //指定本job要处理的文件所在的路径 FileInputFormat.setInputPaths(wcjob, new Path("/wc/data/")); //指定本job输出的结果文件放在哪个路径 FileOutputFormat.setOutputPath(wcjob, new Path("/wc/output2/")); //将本job向hadoop集群提交执行 boolean res=wcjob.waitForCompletion(true); System.exit(res?0:1);//执行成功的话正常退出系统执行有误则终止执行 } }
注意:https://www.cnblogs.com/esingchan/p/3917094.html 的讲解