Hadoop03---MapReduce基础

MapReduce基本用法

一 MapReduce基本定义

1.简介

​ MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)“和"Reduce (归约)”,是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。

2.核心流程

  • MapReduce是一个编程模型
  • 将大量数据分解为多个小数据,分由多台设备同时处理
  • Map(映射):
    • 将数据处理为键值对映射(以统计字母数量为例,键为字母,值为+1)
    • 通常将处理结果以键的哈希值进行分组(HashCode%n)
    • 将分组的数据发送到n台reduce设备上
  • Reduce(规约):
    • reduce一次处理一组数据(某个键的所有键值对)
    • reduce传入数据的格式为该组同一的键为reduce的键,组内所有的值组成一个集合存储在迭代器中
    • 在reduce中通过迭代器处理数据,并将结果输出
  • main方法中需要声明:
    • HDFS配置对象
    • 创建工作对象,并起名(不知道什么用)
    • mapper类和reducer类
    • map类和reduce类的输出类型(如果两者输出类型相同,map的可以省略)
    • 参与Reduce的设备数量(可不写,默认1)
    • 要处理的文件路径
    • 结果输出路径
    • job.waitForCompletion("true")

3.基本模型演示

例题演示:求每部电影最高的三个评分

  • Map:
    static class MovieGradeMap extends Mapper<LongWritable, Text,Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //切割数据
            String s = value.toString();
            String[] split = s.split("\\s+");
            //获取电影名和评分
            String movie = split[1];
            int rate = Integer.parseInt(split[3]);
            context.write(new Text(movie),new IntWritable(rate));
        }
    }
  • Reduce:
static class MovieGradeReduce extends Reducer<Text,IntWritable,Text,Text> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //创建list集合接收评分
            ArrayList<Integer> list = new ArrayList<>();
            for (IntWritable value : values) {
                list.add(value.get());
            }
            //进行排序
            list.sort((Integer i1,Integer i2)->i2-i1);
            //归纳输出结果
            String s ="";
            if (list.size()>=3)
            {
                s = "该电影最高的三个评分是:"+list.get(0)+"/"+list.get(1)+"/"+list.get(2);
            }else {
                for (Integer in : list) {
                    s+=in+"/";
                }
                s = "该电影最高的三个评分是:"+s;
            }
            context.write(key,new Text(s));
        }
    }
  • main方法:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;

public static void main(String[] args) throws Exception {
        //获取工作对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, MovieGrade.class.getSimpleName());

        job.setMapperClass(MovieGradeMap.class);
        job.setReducerClass(MovieGradeReduce.class);

  		//设置map的输出格式(如相同,可省略)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
  		//设置总的输出格式
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job,new Path("D://duoyi/xxx/movieInfo.txt"));
        FileOutputFormat.setOutputPath(job,new Path("D://duoyi/xxx/moviesresult"));

        boolean b = job.waitForCompletion(true);
    }

二 MapReduce及其组件

1.独特的Iterable

以下内容为现阶段个人理解,可能存在错误

  • MapReduce中reduce里面传入的Iterable(reduce-Iterable)和java中的迭代器原理存在细微不同
  • 当迭代对象类型数据时,reduce-Iterable中的next方法,获取的对象不是新建的
  • 新建该Object对象是在Iterable类中完成的
  • next方法中只是改变了object对象的参数,实际指向的一直是堆内存中的同一空间
  • java中的迭代器与之不同
  • 其next方法中可以看做每次都新建了一个Object对象供其使用

2.setup方法

  • Mapper和Reducer中,均包含一个setup方法

  • 该方法都是在map和reduce方法执行前,执行一次

  • 通常用来获取切块文件的文件名

    //使用setup获取文件名
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      FileSplit fs = (FileSplit)context.getInputSplit();
      fileName = fs.getPath().getName();
    }
    

3.run方法

  • 在Mapper和Reducer中,都有一个run方法用于主体运行
//以Reducer中的run方法为例
public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    this.setup(context);

    try {
        while(context.nextKey()) {
            this.reduce(context.getCurrentKey(), context.getValues(), context);
            Iterator<VALUEIN> iter = context.getValues().iterator();
            if (iter instanceof ValueIterator) {
                ((ValueIterator)iter).resetBackupStore();
            }
        }
    } finally {
        this.cleanup(context);
    }
}
  • 循环运行,判断是否有下一个key,有则执行一遍reduce(map)
  • cleanup 方法:
    • maptaskreducetask 都有,在map任务reduce任务 执行完毕后,运行一次
    • 可重写,可用来进行结果统计

4.POJO对象

  • POJO是Plain Ordinary Java Objects 的缩写,是简单的Java对象。POJO实际就是普通JavaBeans;

  • BeanUtils.copyProperties(新,旧); 用于对象的复制的方法

  • MapReduce 中使用自己编写的类时,要保证可以序列化

  • 实现Writable 接口,重写其中的两个方法:

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(uid);
        dataOutput.writeUTF(name);
        dataOutput.writeInt(age);
        dataOutput.writeUTF(gender);
        dataOutput.writeUTF(friend);
        dataOutput.writeUTF(oid);
        dataOutput.writeUTF(dataType);
    }
    
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.uid = dataInput.readUTF();
        this.name = dataInput.readUTF();
        this.age = dataInput.readInt();
        this.gender = dataInput.readUTF();
        this.friend = dataInput.readUTF();
        this.oid = dataInput.readUTF();
        this.dataType = dataInput.readUTF();
    }
    

5.MapReduce原理加强

Hadoop03---MapReduce基础

5.1 文件读取

  • 关联类:
    • FileInputFormat 主要工作类,其中包含了setInputPaths(设置路径)getSplits(获取切片) 等方法
    • 继承自FileInputFormatTextInputFormat 等方法,则包含了RecordReader 等实际用于读写的方法
Hadoop03---MapReduce基础
  • 运行流程:
    • 用户通过FileInputFormatsetinputpaths 方法,设置源文件路径
    • TextInputFormat 等工作类获取文件切片,并进行循环读取(RecordReader 方法)
    • 读取的数据以键值对的格式传递给Mapper

5.2 Mapper

  • 关联类:

    • Mapper类:主要为run方法和map方法,用来处理数据
    • MapOutputBuffer 类是MapperTask 的成员内部类,包含:
      • 环形缓冲数组:kvBuffer
      • collect 方法
      • 溢出组件(Spiller),其中包含多个方法
      • 合并组件(Merger),对写出数据进行合并和重排
  • 运行流程:

    • Mapper运行run方法,以hasNextkey 为判断依据,循环执行重写的map方法
    • 通过context方法将处理结果传出,到环形缓冲数组kvBuffer
      • kvBuffer 是类MapOutputBuffer 中的变量
      • MapOutputBuffer 会调用其中的collect 方法,收集map输出的K、V
      • 并在collect 方法中,会调用分区器HashPartitionergetPartition 方法,通过key,返回一个int 类型的p,用以实现分区的效果
    • 环形缓冲数组一侧存储collect 收集的K、V、P ,一侧存储每组kvp的源数据
    • 缓冲数组容量为100M,当存到80%的时候,本组数据停止接收,通过快排方法进行区内排序
    • 此时kv已经完成了及排序,本组数据分区溢出(Spiller)到磁盘中
    • 在本组数据开始排序时,下一组数据会重新获取缓冲数组的赤道,利用剩余的20%空间,开始写入数据(若在本组数据溢出前,将20%的空间用完,开始阻塞,直到完全溢出)
    • 当有下一组数据溢出到磁盘时,会调用合并组件(Merger),将两次或多次溢出的数据进行合并,并重新排序(快排)
    • 最后,Mapper会提供一个HTTP下载服务,供Reducer拉取属于自己的数据

5.3 Reducer及输出

  • 运行流程:
    • 通过抓取器(fetcher),从磁盘抓取属于自己的数据
    • 通过合并组件(merger),对数据进行合并排序
    • 调用分组器GroupingComparetor 对数据进行分组
      • 使用compareTo 方法进行冒泡比较的方式进行分组
    • 分组后将每组的键作为key,所有的值放到Iterator(迭代器)中,传到Reducer中
    • 通过Reducer的run方法,循环处理数据
    • 处理完的数据通过context传递,通过FileOutputFormat 类写出到磁盘中

三 实践应用

1.求共同好友

package cn.doit.ab.day1118.demo02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;

public class Friends {

  
    public static void main(String[] args) throws Exception{
        runJob1();
        runJob2();
    }


  
  
    static class Friends1Mapper extends Mapper<LongWritable, Text,Text, Text>{

        Text k = new Text();
        Text v = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String s = value.toString();
            String[] split = s.split(":");
            String own = split[0];
            v.set(own);

            String[] fs = split[1].split(",");
            for (String f : fs) {
                k.set(f);
                context.write(k,v);
                System.out.println(k+"---"+v);
            }
        }
    }

  

    static class Friends1Reducer extends Reducer<Text, Text,Text,Text>{
        Text k = new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            ArrayList<String> list = new ArrayList<>();
            for (Text value : values) {
                String s = value.toString();
                list.add(s);
            }
            if (list.size()>1) {
                for (int i = 0; i < list.size()-1; i++) {
                    for (int i1 = i+1 ; i1 < list.size() ; i1++) {
                        String s = list.get(i)+"和"+list.get(i1)+"的共同好友是:";
                        k.set(s);
                        context.write(k,key);
                    }
                }
            }
        }
    }



    private static void runJob1() throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf,"Friends1");

        job.setMapperClass(Friends1Mapper.class);
        job.setReducerClass(Friends1Reducer.class);

//        job.setMapOutputKeyClass(Text.class);
//        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //设置输出文件的编码格式为Sequence
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job,new Path("D:\\duoyi\\09_Hadoop\\MR案例\\friends\\input"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\duoyi\\09_Hadoop\\MR案例\\friends\\out1"));

        boolean b = job.waitForCompletion(true);
    }


  
  
  
  
    static class Friends2Mapper extends Mapper<Text, Text,Text, Text>{

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            context.write(key,value);
        }
    }

  

    static class Friends2Reducer extends Reducer<Text, Text,Text, Text>{
        Text v= new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text value : values) {
                sb.append(value+"  ");
            }
            v.set(sb.toString());
            context.write(key,v);
        }
    }



    private static void runJob2() throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf,"Friends2");

        job.setMapperClass(Friends2Mapper.class);
        job.setReducerClass(Friends2Reducer.class);

//        job.setMapOutputKeyClass(Text.class);
//        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //设置输入文件的编码格式为Sequence
        job.setInputFormatClass(SequenceFileInputFormat.class);

        job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job,new Path("D:\\duoyi\\09_Hadoop\\MR案例\\friends\\out1"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\duoyi\\09_Hadoop\\MR案例\\friends\\outover"));

        boolean b = job.waitForCompletion(true);
    }
}

2.自定义分区器和分组器

  • POJO

    • 由于要对其进行序列化和自定义比较器,直接实现WritableComparable<>
    package cn.doit.ab.day1121.demo01;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class MovieBean implements WritableComparable<MovieBean> {
    
        private String movie;
        private double rate;
        private String timeStamp;
        private String uid;
    
        public MovieBean() {
        }
    
        public MovieBean(String movie, double rate, String timeStamp, String uid) {
            this.movie = movie;
            this.rate = rate;
            this.timeStamp = timeStamp;
            this.uid = uid;
        }
    
        public String getMovie() {
            return movie;
        }
    
        public void setMovie(String movie) {
            this.movie = movie;
        }
    
        public double getRate() {
            return rate;
        }
    
        public void setRate(double rate) {
            this.rate = rate;
        }
    
        public String getTimeStamp() {
            return timeStamp;
        }
    
        public void setTimeStamp(String timeStamp) {
            this.timeStamp = timeStamp;
        }
    
        public String getUid() {
            return uid;
        }
    
        public void setUid(String uid) {
            this.uid = uid;
        }
    
        @Override
        public String toString() {
            return "movie='" + movie + '\'' +
                    ", rate=" + rate +
                    ", timeStamp='" + timeStamp + '\'' +
                    ", uid='" + uid + '\'' ;
        }
    
        @Override
        public int compareTo(MovieBean o) {
            if (o.getMovie().equals(this.movie))
            {
                return Double.compare(o.rate,this.rate);
            }else {
                return o.getMovie().compareTo(this.movie);
            }
    
        }
    
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeUTF(movie);
            dataOutput.writeDouble(rate);
            dataOutput.writeUTF(timeStamp);
            dataOutput.writeUTF(uid);
        }
    
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            this.movie = dataInput.readUTF();
            this.rate = dataInput.readDouble();
            this.timeStamp = dataInput.readUTF();
            this.uid = dataInput.readUTF();
    
        }
    }
    
上一篇:在linux中安装hadoop(详细)


下一篇:centos7访问不到外网