Partitactioner
Partitioner 组件可以对 MapTask后的数据按Key进行分区,从而将不同分区的Key交由不同的Reduce处理。这个也是我们经常会用到的功能。
1.使用场景
比如上个案例中我们统计出来了每个用户的流量数据,那么我们接下来想把统计的用户数据根据不同的手机号输出到不同的文件中,那么这时使用分区器就非常合适了。
2.HashPartitioner
在一般的 MapReduce 过程中,我们知道可以通过 job.setNumReduceTasks(N) 来创建多个 ReducerTask 进行处理任务。可是这种情况下,系统会调用默认的Partitioner也就是 HashPartitioner来对Map的 key 进行分区。进入 Hadoop 的源码,可以看到 HashPartitioner 的实现其实很简单。如下:
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> { public void configure(JobConf job) {} /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K2 key, V2 value, int numReduceTasks) { // key的hash值与integer的最大值取与然后对ReduceTask的个数取余 return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
hash的好处是可以很key的分布更加随机,但是这样会将一些不同的key放在同一个分区中,这并不是我们所期望的。
3.自定义Partitioner
面对HashPartitioner所具有的局限,我们可以通过自定义Partitioner来解决,如下:
3.1 实现自定义分区器
import java.util.HashMap; import java.util.Map; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; /** * 自定义分区器 * @author 波波烤鸭 * */ public class CustomPartitioner extends Partitioner< Text, Flow>{ private static Map<String, Integer> map = new HashMap<>(); // 此处我们将数据写死,实际开发中我们应该从对应的数据源中获取数据然后存储在缓存中(Redis) static{ map.put("138", 0); map.put("139", 1); map.put("158", 2); map.put("159", 3); } /** * 根据key获取对应的分区号 * @param key 就是用的手机号码 * @param value 统计的用户的信息 */ @Override public int getPartition(Text key, Flow value, int numPartitions) { // 获取手机号码的前3位 138 String prefix = key.toString().substring(0, 3); return map.containsKey(prefix)?map.get(prefix):4; } }
3.2 启动类设置
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(true); conf.set("mapreduce.framework.name", "local"); // 输出到HDFS文件系统中 // conf.set("fs.defaultFS", "hdfs://hadoop-node01:9000"); // 输出到本地文件系统 conf.set("fs.defaultFS", "file:///"); Job job = Job.getInstance(conf); job.setJarByClass(FlowTest.class); // 设置ReduceTask的个数 job.setNumReduceTasks(5); // 设置自定义的分区器 job.setPartitionerClass(CustomPartitioner.class); // 指定本job要使用的map/reduce的工具类 job.setMapperClass(MyMapTask.class); job.setReducerClass(MyReduceTask.class); // 指定mapper输出kv的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Flow.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Flow.class); // 指定job的原始文件输入目录 // 6.设置输出输出类 FileInputFormat.setInputPaths(job, new Path("c:/tools/bigdata/mr/flow/input/")); FileOutputFormat.setOutputPath(job, new Path("c:/tools/bigdata/mr/flow/output/")); //将job中配置的相关参数,以及job所用的jar包提交给yarn运行 //job.submit(); waitForCompletion等待执行完成 boolean flag = job.waitForCompletion(true); System.exit(flag?0:1); }
MapTask和ReduceTask的代码内容不需要改变,可以参考上篇内容。
Ok ~ partitioner的作用就是用来对Map之后的数据做分区处理操作!