Mapper类
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
@Public
@Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public Mapper() {
}
protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}
protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
context.write(key, value);
}
protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}
public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
this.setup(context);
try {
while(context.nextKeyValue()) {
this.map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
this.cleanup(context);
}
}
public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public Context() {
}
}
}
Mapper类主要有五种方法:
- protected void
setup
(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) - protected void
map
(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) - protected void
cleanup
(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) - public void
run
(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) - public abstract class
Context
implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
1. setup()
protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}
初始时调用,用来加载一些初始化的工作,像全局文件、建立数据库的链接等等,执行一次。
2. cleanup()
protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}
结束时调用,收尾工作,如关闭文件、关闭数据库连接、执行map()后的键值分发等等,执行一次。
3. map()
protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
context.write(key, value);
}
它继承Mapper类,是一个需要重点重写的方法,将map工作的处理逻辑都要放在这个方法中。
map方法有三个参数,Mapper有四个泛型变量KEYIN, VALUEIN, KEIOUT, VALUEOUT
,它们分别代表输入的键值类型和输出的键值类型,context是上下文,用于保存Job的配置信息、状态和map处理结果,用于最后的write方法。
map中有重要的四个方法
-
context.nextKeyValue();
负责读取数据,但是方法的返回值却不是读取到的key-value,而是返回了一个标识有没有读取到数据的布尔值 -
context.getCurrentKey();
负责获取context.nextKeyValue() 读取到的key -
context.getCurrentValue();
负责获取context.nextKeyValue() 读取到的value -
context.write(key,value);
负责输出mapper阶段输出的数据
4. run()
public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
this.setup(context);
try {
while(context.nextKeyValue()) {
this.map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
this.cleanup(context);
}
}
run方法中先执行setup
函数,然后是用map
处理数据,当处理完数据后用cleanup
的收尾工作。值得注意的是,setup函数和cleanup函数由系统作为回调函数只做一次,并不像map函数那样执行多次。
实现的是设计模式中的模板方法模式。Mapper类中定义了一些方法,用户可以继承这个类重写方法以应对不同的需求,但是这些方法内部的执行顺序是确定好的。它封装了程序的算法,让用户能集中精力处理每一部分的具体逻辑。
run方法在程序执行中会默认调用,从他的执行流程来看也给常符合我们的预期,先进行初始化,如果还有输入的数据,那么调用map方法处理每一个键值对,最终执行结束方法。
模板方法模式:
模板模式中分基本方法和模板方法,上述的run可看做基本方法,需要调用其它方法实现整体逻辑,不会被修改,其它几个方法可看做模板方法。基本方法一般会用final修饰,保证其不会被子类修改,而模板方法使用protect修饰,表示需要在子类中实现。
此外,模板模式中还有一个叫钩子方法的概念,即给子类一个授权,允许子类通过重写钩子方法来颠覆基本逻辑的执行,这有时候是非常有用的,可以用来完善具体的逻辑。
5. Context
public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public Context() {
}
}
Context类,它实现了MapContext接口,而MapContext继承于TaskInputOutputContext
参考资料:
hadoop之mapper类妙用