hadoop2.2.0 MapReduce求和并排序

javabean必须实现WritableComparable接口,并实现该接口的序列化,反序列话和比较方法

package com.my.hadoop.mapreduce.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class InfoBean implements WritableComparable<InfoBean> {
    
    private String account;
    private double income;
    private double expences;
    private double surplus;

public void set(String account, double income, double expences){
        this.account = account;
        this.income = income;
        this.expences = expences;
        this.surplus = income - expences;
    }
    
    @Override
    public String toString() {
        return income+"\t"+expences+"\t"+surplus;
    }

@Override
    public void readFields(DataInput in) throws IOException {
        this.account = in.readUTF();
        this.income = in.readDouble();
        this.expences = in.readDouble();
        this.surplus = in.readDouble();
    }

@Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.account);
        out.writeDouble(this.income);
        out.writeDouble(this.expences);
        out.writeDouble(this.surplus);
    }

@Override
    public int compareTo(InfoBean o) {
        if (this.income == o.getIncome()) {
            return this.expences > o.getExpences() ? 1 : -1;
        } else {
            return this.income > o.getIncome() ? -1 : 1;
        }
    }

public String getAccount() {
        return account;
    }

public void setAccount(String account) {
        this.account = account;
    }

public double getIncome() {
        return income;
    }

public void setIncome(double income) {
        this.income = income;
    }

public double getExpences() {
        return expences;
    }

public void setExpences(double expences) {
        this.expences = expences;
    }

public double getSurplus() {
        return surplus;
    }

public void setSurplus(double surplus) {
        this.surplus = surplus;
    }

}

先求和

package com.my.hadoop.mapreduce.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SumStep {

public static class SumMap extends Mapper<LongWritable, Text, Text, InfoBean>{
        private Text k = new Text();
        private InfoBean v = new InfoBean();
        
        @Override
        public void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
            String[] fields = value.toString().split("\t");
            String account = fields[0];
            double in = Double.parseDouble(fields[1]);
            double out = Double.parseDouble(fields[2]);
            k.set(account);
            v.set(account, in, out);
            context.write(k, v);
        }
    }
    
    public static class SumReduce extends Reducer<Text, InfoBean, Text, InfoBean>{
        private InfoBean v = new InfoBean();
        
        @Override
        public void reduce(Text key, Iterable<InfoBean> value, Context context) throws java.io.IOException ,InterruptedException {
            double in_sum = 0;
            double out_sum = 0;
            for (InfoBean bean : value) {
                in_sum += bean.getIncome();
                out_sum += bean.getExpences();
            }
            v.set("", in_sum, out_sum);
            context.write(key, v);
        }
    }
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, SumStep.class.getSimpleName());
        job.setJarByClass(SumStep.class);
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setMapperClass(SumMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);
        
        job.setReducerClass(SumReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(InfoBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        System.exit(job.waitForCompletion(true) ? 0 :1);
        
    }

}

后排序

package com.my.hadoop.mapreduce.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortStep {

public static class SortMap extends Mapper<LongWritable, Text, InfoBean, NullWritable>{
        private InfoBean k = new InfoBean();
        
        @Override
        public void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
            System.out.println("===="+value.toString()+"====");
            String[] fields = value.toString().split("\t");
            String account = fields[0];
            double in = Double.parseDouble(fields[1]);
            double out = Double.parseDouble(fields[2]);
            k.set(account, in, out);
            context.write(k, NullWritable.get());
        }
    }
    
    public static class SortReduce extends Reducer<InfoBean, NullWritable, Text, InfoBean>{
        private Text k = new Text();
        
        @Override
        public void reduce(InfoBean bean, Iterable<NullWritable> value, Context context) throws java.io.IOException ,InterruptedException {
            k.set(bean.getAccount());
            context.write(k, bean);
        }
    }
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, SortStep.class.getSimpleName());
        job.setJarByClass(SortStep.class);
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setMapperClass(SortMap.class);
        job.setMapOutputKeyClass(InfoBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        
        job.setReducerClass(SortReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(InfoBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        System.exit(job.waitForCompletion(true) ? 0 :1);        
        
    }

}

上一篇:尝试在Linux上编译KestrelHttpServer


下一篇:ArcGIS Engine 连接SQL Server并建立关联