Map class:
package com.lq.testt;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Map extends Mapper<LongWritable, Text, Text, Writable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Split the line into its individual fields
        String[] fields = StringUtils.split(line, "[,]");

        // Pick out the fields we need
        String ip = fields[0] + "." + fields[1] + "." + fields[2] + "." + fields[3];
        String article = fields[7] + "/" + fields[8];
        String time = fields[4];
        int traffic = Integer.parseInt(fields[6]);
        String video = fields[7] + "/" + fields[8];

        // Wrap the data as a key/value pair and emit it
        context.write(new Text(ip), new Writable(ip, time, traffic, article, video));
    }
}
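Note that commons-lang's StringUtils.split(String, String) treats its second argument as a set of separator characters, not as a regular expression, so "[,]" above splits on '[' and ']' as well as on ','. A minimal standalone sketch of that behavior, using an invented sample record (the real log layout is an assumption here):

import org.apache.commons.lang.StringUtils;

public class SplitDemo {
    public static void main(String[] args) {
        // Invented sample line; the actual log format may differ.
        String line = "10,0,0,1,2019-11-01,GET,1024,article,17";
        // Every character in "[,]" acts as a separator and empty
        // tokens are dropped, so this yields nine fields.
        String[] fields = StringUtils.split(line, "[,]");
        for (String field : fields) {
            System.out.println(field);
        }
    }
}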
Reduce class:
package com.lq.testt;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reduce extends Reducer<Text, Writable, Text, Writable> {

    @Override
    protected void reduce(Text key, Iterable<Writable> values, Context context)
            throws IOException, InterruptedException {
        // Start the String accumulators empty (not null) so the first
        // += does not prepend the literal "null".
        String time = "";
        int traffic = 0;
        String article = "";
        String video = "";
        // Aggregate every record that arrived for this IP
        for (Writable writable : values) {
            time += writable.getTime();       // concatenate the timestamps
            traffic += writable.getTraffic(); // sum the traffic
            article += writable.getArticle(); // concatenate the article ids
            video = writable.getVideo();      // keep the last video id
        }
        context.write(key, new Writable(key.toString(), time, traffic, article, video));
    }
}
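Inside the loop, += means two different things: on the int field it sums the traffic, while on the String fields it concatenates one value after another. A small standalone sketch with invented values for a single key, to make that visible:

public class ReduceLoopDemo {
    public static void main(String[] args) {
        // Invented values for one IP (one reduce key).
        String[] times = { "10:01", "10:05" };
        int[] traffics = { 300, 500 };

        String time = "";  // empty start avoids a leading "null"
        int traffic = 0;
        for (int i = 0; i < times.length; i++) {
            time += times[i];       // String +=: "10:0110:05"
            traffic += traffics[i]; // int +=: 800
        }
        System.out.println(time + " -> " + traffic);
    }
}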
Writable class:
package com.lq.testt;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Writable implements org.apache.hadoop.io.Writable {

    private String ip;
    private String time;
    private int traffic;
    private String article;
    private String video;

    // Hadoop instantiates this class reflectively, so the
    // no-argument constructor is required.
    public Writable() {
        super();
    }

    public Writable(String ip, String time, int traffic, String article, String video) {
        super();
        this.ip = ip;
        this.time = time;
        this.traffic = traffic;
        this.article = article;
        this.video = video;
    }

    // write() and readFields() must handle the fields in the same order.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.traffic);
        out.writeUTF(this.ip);
        out.writeUTF(this.time);
        out.writeUTF(this.article);
        out.writeUTF(this.video);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.traffic = in.readInt();
        this.ip = in.readUTF();
        this.time = in.readUTF();
        this.article = in.readUTF();
        this.video = in.readUTF();
    }

    public String getIp() { return ip; }
    public void setIp(String ip) { this.ip = ip; }
    public String getTime() { return time; }
    public void setTime(String time) { this.time = time; }
    public int getTraffic() { return traffic; }
    public void setTraffic(int traffic) { this.traffic = traffic; }
    public String getArticle() { return article; }
    public void setArticle(String article) { this.article = article; }
    public String getVideo() { return video; }
    public void setVideo(String video) { this.video = video; }
}
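Two contract points are easy to miss here: Hadoop creates instances of this class through the no-argument constructor, and readFields() must read the fields in exactly the order write() wrote them. A minimal round-trip sketch (plain java.io streams, no cluster needed) to check that pairing:

package com.lq.testt;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        Writable original = new Writable("10.0.0.1", "10:01", 800, "article/17", "video/17");

        // Serialize through write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize through readFields(), as the framework would
        Writable copy = new Writable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.getIp() + " " + copy.getTraffic()); // 10.0.0.1 800
    }
}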
Runner class:
package com.lq.testt;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Runner {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Wire up the job: mapper, reducer, input and output
        job.setJarByClass(Runner.class);   // main driver class; Hadoop runs it from a jar
        job.setMapperClass(Map.class);     // mapper class
        job.setReducerClass(Reduce.class); // reducer class

        job.setMapOutputKeyClass(Text.class);       // map output types
        job.setMapOutputValueClass(Writable.class);
        job.setOutputKeyClass(Text.class);          // reduce output types
        job.setOutputValueClass(Writable.class);

        // args[0]: input path with the logs to analyze (passed on the command line)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // args[1]: output path for the final result; it must not exist yet
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // job.submit() would submit without printing progress;
        // waitForCompletion(true) blocks and prints the execution log.
        job.waitForCompletion(true);
    }
}
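Because the output path must not already exist, rerunning the job fails unless the old directory is removed first. One common convenience (not part of the original code, shown here only as a sketch) is to delete it in main() before FileOutputFormat.setOutputPath(...), using the org.apache.hadoop.fs.FileSystem API:

// Requires: import org.apache.hadoop.fs.FileSystem;
Path output = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(output)) {
    fs.delete(output, true); // true = delete recursively; use with care
}
FileOutputFormat.setOutputPath(job, output);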