03_MapReduce框架原理_3.9 合并 Combiner(Map端合并)

1. 说明

03_MapReduce框架原理_3.9 合并 Combiner(Map端合并)

 

 2. 指定 合并器

  // 指定 合并器
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    // 检测 指定的Combiner类 必须是Reducer 的子类
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

3. 案例

package CombinerPk {

  import java.lang
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
  import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}

  // Mapper 类
  class WCComMapper extends Mapper[LongWritable, Text, Text, IntWritable] {
    var text = new Text
    var intWritable = new IntWritable(1)

    // 每行记录调用一次map方法
    override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, Text, IntWritable]#Context) = {
      println("map enter .....")
      //1. 获取一行记录
      val line = value.toString

      //2. 切割
      val words = line.split(" ")

      //3. 输出到缓冲区
      words.foreach(
        key1 => {
          text.set(key1);
          context.write(text, intWritable)
        }
      )

    }
  }

  // Reducer 类
  class WCComReducer extends Reducer[Text, IntWritable, Text, IntWritable] {

    private val intWritable = new IntWritable

    // 每个key调用一次
    override def reduce(key: Text, values: lang.Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context) = {
      var sum: Int = 0
      // 1. 对词频数 求sum
      values.forEach(sum += _.get)

      // 2. 输出结果
      intWritable.set(sum)
      context.write(key, intWritable)

    }
  }

  // 自定义Combiner
  class WCCombiner extends Reducer[Text, IntWritable, Text, IntWritable] {

    private val intWritable = new IntWritable

    // 每个key调用一次
    override def reduce(key: Text, values: lang.Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context) = {
      var sum: Int = 0
      // 1. 对词频数 求sum
      values.forEach(sum += _.get)

      // 2. 输出结果
      intWritable.set(sum)
      context.write(key, intWritable)

    }
  }




  // Driver
  object Driver {
    def main(args: Array[String]): Unit = {
      //1. 获取配置信息以及 获取job对象
      //读取配置文件  Configuration: core-default.xml, core-site.xml
      var configuration = new Configuration
      var job: Job = Job.getInstance(configuration)

      //2. 注册本Driver程序的jar
      job.setJarByClass(this.getClass)

      job.setJobName("scala mr")

      //3. 注册 Mapper 和 Reducer的jar
      job.setMapperClass(classOf[WCComMapper])
      job.setReducerClass(classOf[WCComReducer])

      //4. 设置Mapper 类输出key-value 数据类型
      job.setMapOutputKeyClass(classOf[Text])
      job.setMapOutputValueClass(classOf[IntWritable])

      //5. 设置最终输出key-value 数据类型
      job.setOutputKeyClass(classOf[Text])
      job.setOutputValueClass(classOf[IntWritable])

      //6. 设置输入输出路径
      FileInputFormat.setInputPaths(job, "src/main/data/input/1.txt")
      FileOutputFormat.setOutputPath(job, new Path("src/main/data/output"))

      //7. 指定分区器
      job.setCombinerClass(classOf[WCCombiner])

      //8. 提交job
      val bool: Boolean = job.waitForCompletion(true)
      System.exit(bool match {
        case true => "0".toInt
        case false => "1".toInt
      })

    }


  }


}

 

上一篇:105_实例


下一篇:MapReduce编程笔记(3)-计算部门工资