hadoop2.6.5 Mapper类源码解析

Mapper类

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Mapper() {
    }

    protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }

    protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKeyValue()) {
                this.map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            this.cleanup(context);
        }

    }

    public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}

Mapper类主要有五种方法:

  • protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
  • protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
  • protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
  • public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
  • public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

1. setup()

protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}

初始时调用,用来加载一些初始化的工作,像全局文件、建立数据库的链接等等,执行一次。

2. cleanup()

protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}

结束时调用,收尾工作,如关闭文件、关闭数据库连接、执行map()后的键值分发等等,执行一次。

3. map()

protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
	context.write(key, value);
}

它继承Mapper类,是一个需要重点重写的方法,将map工作的处理逻辑都要放在这个方法中。

map方法有三个参数,Mapper有四个泛型变量KEYIN, VALUEIN, KEIOUT, VALUEOUT,它们分别代表输入的键值类型和输出的键值类型,context是上下文,用于保存Job的配置信息、状态和map处理结果,用于最后的write方法。

map中有重要的四个方法

  • context.nextKeyValue();
    负责读取数据,但是方法的返回值却不是读取到的key-value,而是返回了一个标识有没有读取到数据的布尔值
  • context.getCurrentKey();
    负责获取context.nextKeyValue() 读取到的key
  • context.getCurrentValue();
    负责获取context.nextKeyValue() 读取到的value
  • context.write(key,value);
    负责输出mapper阶段输出的数据

4. run()

public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
	this.setup(context);

	try {
		while(context.nextKeyValue()) {
			this.map(context.getCurrentKey(), context.getCurrentValue(), context);
		}
	} finally {
		this.cleanup(context);
    }

}

run方法中先执行setup函数,然后是用map处理数据,当处理完数据后用cleanup的收尾工作。值得注意的是,setup函数和cleanup函数由系统作为回调函数只做一次,并不像map函数那样执行多次。

实现的是设计模式中的模板方法模式。Mapper类中定义了一些方法,用户可以继承这个类重写方法以应对不同的需求,但是这些方法内部的执行顺序是确定好的。它封装了程序的算法,让用户能集中精力处理每一部分的具体逻辑。

run方法在程序执行中会默认调用,从他的执行流程来看也给常符合我们的预期,先进行初始化,如果还有输入的数据,那么调用map方法处理每一个键值对,最终执行结束方法。

模板方法模式:
模板模式中分基本方法和模板方法,上述的run可看做基本方法,需要调用其它方法实现整体逻辑,不会被修改,其它几个方法可看做模板方法。基本方法一般会用final修饰,保证其不会被子类修改,而模板方法使用protect修饰,表示需要在子类中实现。

此外,模板模式中还有一个叫钩子方法的概念,即给子类一个授权,允许子类通过重写钩子方法来颠覆基本逻辑的执行,这有时候是非常有用的,可以用来完善具体的逻辑。

5. Context

public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
	public Context() {
	}
}

Context类,它实现了MapContext接口,而MapContext继承于TaskInputOutputContext

参考资料:
hadoop之mapper类妙用

Hadoop学习笔记(六)实战wordcount

Java设计模式中的模板模式

上一篇:JAVA Schedule的Cron表达式


下一篇:Android手机管理工具类,【Android面试题】