Hadoop MapReduce: using a custom Partitioner to output phone numbers by carrier

Using MapReduce with a custom Partitioner to split a list of phone numbers into three output files, one per carrier (China Mobile, China Unicom, China Telecom).
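The input file F:\mobile.txt is assumed to hold tab-separated phone numbers, as suggested by the Mapper's split("\t") and its 11-digit filter; the lines below are made-up placeholders that only illustrate that layout:

13800000000	13100000001	18900000002
15500000003	10086	13400000004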

MobileDriver main class
package Partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class MobileDriver {
    public static void main(String[] args) {
        String[] paths = {"F:\\mobile.txt", "F:\\output"}; // {input file, output directory}

        JobUtils.commit(paths, true, 3, MobileDriver.class,
                MobileMapper.class, Text.class, NullWritable.class, MobilePartition.class,
                MobileReduce.class, Text.class, NullWritable.class);

    }
}
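Run in local mode with three reduce tasks and the custom partitioner, the job writes one file per partition under F:\output, following Hadoop's standard part-r-NNNNN naming:

F:\output\part-r-00000   (partition 0: China Mobile)
F:\output\part-r-00001   (partition 1: China Unicom)
F:\output\part-r-00002   (partition 2: China Telecom)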
JobUtils utility class
package Partition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;

public class JobUtils {
    private static Configuration conf;

    static {
        conf = new Configuration();
    }

    /**
     * Submit a job.
     *
     * @param paths        input and output paths: {inputPath, outputPath}
     * @param isPartition  whether a custom Partitioner class is supplied
     * @param reduceNumber number of reduce tasks (if isPartition is true, this must be >= the number of partitions)
     * @param params       classes in fixed order: [0] driver, [1] mapper, [2] map output key, [3] map output value,
     *                     [4] partitioner (read only when isPartition is true), [5] reducer, [6] output key, [7] output value
     */
    public static void commit(String[] paths, boolean isPartition, int reduceNumber, Class... params) {
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(params[0]);

            job.setMapperClass(params[1]);
            job.setMapOutputKeyClass(params[2]);
            job.setMapOutputValueClass(params[3]);

            if (isPartition) {
                job.setPartitionerClass(params[4]); // set the custom partitioner
            }

            if (reduceNumber > 0) {
                job.setNumReduceTasks(reduceNumber);
                job.setReducerClass(params[5]);
                job.setOutputKeyClass(params[6]);
                job.setOutputValueClass(params[7]);
            } else {
                // Map-only job: the mapper output is written directly.
                job.setNumReduceTasks(0);
                job.setOutputKeyClass(params[2]);
                job.setOutputValueClass(params[3]);
            }
            deleteDirectory(paths[1]);
            FileInputFormat.setInputPaths(job, new Path(paths[0]));
            FileOutputFormat.setOutputPath(job, new Path(paths[1]));
            job.waitForCompletion(true);
        } catch (InterruptedException | ClassNotFoundException | IOException e) {
            e.printStackTrace();
        }
    }

    // Delete the output directory if it already exists (the sample job runs against the local file system).
    public static void deleteDirectory(String path) {
        File pFile = new File(path);
        if (!pFile.exists()) {
            return;
        }
        File[] children = pFile.listFiles();
        if (children != null) {
            // Recursively empty the directory before deleting it.
            for (File file : children) {
                if (file.isDirectory()) {
                    deleteDirectory(file.getAbsolutePath());
                } else {
                    file.delete();
                }
            }
        }
        pFile.delete();
    }
}
Custom Mapper class
package Partition;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MobileMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] mobiles = line.split("\t");
        for (String mobile : mobiles) {
            // keep only 11-digit phone numbers; anything else is filtered out
            if (mobile.length() == 11) {
                context.write(new Text(mobile), NullWritable.get());
            }
        }
    }
}
Custom Reducer class
package Partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MobileReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Each distinct number arrives as one group, so duplicates collapse into a single output line.
        context.write(key, NullWritable.get());
    }
}
Custom Partitioner class
package Partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.Arrays;

public class MobilePartition extends Partitioner<Text, NullWritable> {
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int numPartitions) {
        String line = text.toString();
        String prefix = line.substring(0, 3); // the first three digits identify the carrier
        if (Arrays.asList(Mobile.CHINA_MOBILE).contains(prefix)) {
            return 0; // China Mobile
        } else if (Arrays.asList(Mobile.CHINA_UNICOM).contains(prefix)) {
            return 1; // China Unicom
        } else {
            return 2; // China Telecom
        }
    }
}
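MobilePartition looks up the prefix in a Mobile constants class that holds the carrier number segments. A minimal sketch is given below; the CHINA_MOBILE and CHINA_UNICOM names come from the partitioner, but the prefix values themselves are illustrative and incomplete, so replace them with the number segments you actually need.

package Partition;

// Carrier number-segment prefixes looked up by MobilePartition.
// The values below are illustrative examples, not an authoritative or complete list.
public class Mobile {
    public static final String[] CHINA_MOBILE = {
            "134", "135", "136", "137", "138", "139",
            "150", "151", "152", "157", "158", "159",
            "182", "183", "184", "187", "188"
    };
    public static final String[] CHINA_UNICOM = {
            "130", "131", "132", "155", "156", "185", "186"
    };
    // Prefixes in neither list fall through to partition 2 (China Telecom).
}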
