Day10:YARN与Hive入门

                                                            优秀是一种习惯

知识点01:回顾

  1. 为什么要设计Shuffle?

    • 全局分组和排序
  2. Map端的Shuffle阶段如何实现?

    • Spill:将每个MapTask每个小的部分实现局部排序,生成多个有序的小文件
      • MapTask将数据写入一个环形缓冲区【内存:100M】
      • 阈值:80%
        • 先排序:快排
        • 后溢写:排序好的内容生成小文件
    • Merge:将自己所有小文件合并为一个大文件,并且实现排序
      • 实现每个MapTask整体有序
      • 排序:插入排序
  3. Reduce端的Shuffle阶段如何实现?

    • Merge:将所有MapTask中属于自己的数据进行合并并排序
      • 排序:插入排序
  4. Combiner的功能是什么?如何实现Combiner?

    • 功能:Map端聚合,利用MapTask个数远大于ReduceTask个数,MapTask提前做一次聚合
    • 实现
      • job.setCombinerClass(Reducer)
    • 效果:减少Reduce输出的数据量,降低了Reduce负载
  5. 压缩的好处是什么?常见的压缩类型有哪些?MapReduce如何配置压缩?

    • 优点:降低了数据存储大小,提高了IO传输速度,提高性能
    • 类型:Snappy、Lzo、Lz4
    • 配置
      • Input:不用配置
      • Map Out:配置开启和指定压缩类型
      • Reduce Out:配置开启和指定压缩类型
  6. Shuffle分组的规则是什么?如何自定义分组比较器?

    • 规则:先调用分组比较器,如果有,直接调用比较方法,如果没有,调用K2的比较方法

    • 定义

      • 继承WritableComparator,重写compare

      • 排序:大于、等于、小于

        job.setSortCOmparatorClass
        
      • 分组:等于、不等于

        job.setGroupingComparatorClass
        

知识点02:目标

  1. MapReduce补充知识点
    • 分片的规则:读取文件时,如何决定划分多少个分片,决定了MapTask个数
    • MapReduce如何实现Join?
      • Reduce Join
      • Map Join【重点】
  2. YARN的介绍
    • 功能与应用场景
    • 架构以及MR程序的运行流程
    • 资源管理和任务调度
  3. 数据仓库与Hive的入门
    • 数据仓库
    • Hive的介绍:功能与应用【重要】
    • Hive的安装部署
    • 案例:Wordcount、二手房统计

知识点03:MapReduce补充:分片规则

  • 引入:Reduce的个数可以自己指定,Map的个数如何决定?

  • 目标了解TextInputFormat中的分片规则

  • 路径

    • step1:InputFormat的功能
    • step2:读取数据的实现
    • step3:分片的规则
  • 实施

    • InputFormat的功能

      • 功能一:将读取到的所有输入数据划分为多个分片Split
      • 功能二:将每个分片的数据转换为KV
    • TextInputFormat读取数据的实现

      • createRecordReader:真正调用读取器读取数据的方法
      • LineRecordReader:真正读取器的对象【JavaBean】
        • nextKeyValue:将每一条数据转换为KV结构的方法
    • TextInputFormat分片的规则

      • getSplits:用于将输入的所有数据划分为多个分片

      • 规则

        • step1:判断是否分片的条件

          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP)
              	文件大小 /  splitSize >  1.1
          
        • step2:计算splitSize大小

          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
          									128M		1		Long.MAX_VALUE
                                                  
          minsize = 最小分片数 max(1,mapreduce.input.fileinputformat.split.minsize=1)
          maxsize = 最大分片数 mapreduce.input.fileinputformat.split.maxsize                      	
          
        • step3:computeSplitSize计算逻辑

          Math.max(minSize, Math.min(maxSize, blockSize))
          max(1,min(Long.MAX_VALUE,128M))
          
        • 规则

          • 判断当前文件的大小是否大于128M的1.1倍
            • 如果大于:将文件的128M作为一个分片,再次判断,直到所有分片构建
            • 如果不大于:剩下的整体作为一个分片
          • 130M:1个分片
          • 145M:2个
            • split1:128M
            • split2:17M
  • 小结

    • TextInputFormat中分片的规则是什么?
      • 按照文件大小的1.1倍判断
      • 大于:每128M作为一个分片
      • 不大于:整体作为一个分片
    • 注意:如果要干预MapTask个数,怎么干预?
      • 调整minSplitSize和maxSplitSize大小

知识点04:MapReduce补充:Reduce Join

  • 引入:MapReduce用于实现数据统计分析,类似于SQL,如何实现Join过程?

  • 目标了解MapReduce中Reduce Join的实现

    • 什么是Reduce Join,特点和应用场景是什么?
  • 路径

    • step1:Reduce Join的原理
    • step2:Reduce Join的实现
    • step3:Reduce Join的特点
  • 实施

    • Reduce Join的原理
      Day10:YARN与Hive入门

      • 将Join的关联字段作为K2,将订单信息和商品的信息作为V2
      • Shuffle过程中分组时候,会将相同商品id对应的商品信息和订单信息放入同一个迭代器中
      • Reduce只要拼接即可
    • Reduce Join的实现

    Day10:YARN与Hive入门

    
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    
    /**
     * @ClassName MRDriver
     * @Description TODO 实现ReduceJoin
     * @Create By     Frank
     */
    public class ReduceJoinMr extends Configured implements Tool {
    
        /**
         * 用于将Job的代码封装
         * @param args
         * @return
         * @throws Exception
         */
        @Override
        public int run(String[] args) throws Exception {
            //todo:1-构建一个Job
            Job job = Job.getInstance(this.getConf(),"model");//构建Job对象,调用父类的getconf获取属性的配置
            job.setJarByClass(ReduceJoinMr.class);//指定可以运行的类型
            //todo:2-配置这个Job
            //input
    //        job.setInputFormatClass(TextInputFormat.class);//设置输入的类的类型,默认就是TextInputFormat
            Path inputPath1 = new Path("datas/join/orders.txt");//读取订单数据
            Path inputPath2 = new Path("datas/join/product.txt");//读取商品的数据
            //设置的路径可以给目录,也可以给定文件,如果给定目录,会将目录中所有文件作为输入,但是目录中不能包含子目录
            TextInputFormat.setInputPaths(job,inputPath1,inputPath2);//为当前job设置输入的路径
    
            //map
            job.setMapperClass(MRMapper.class);//设置Mapper的类,需要调用对应的map方法
            job.setMapOutputKeyClass(Text.class);//设置Mapper输出的key类型
            job.setMapOutputValueClass(Text.class);//设置Mapper输出的value类型
    
            //shuffle
    //        job.setPartitionerClass(HashPartitioner.class);//自定义分区
    //        job.setGroupingComparatorClass(null);//自定义分组的方式
    //        job.setSortComparatorClass(null);//自定义排序的方式
    
            //reduce
            job.setReducerClass(MRReducer.class);//设置Reduce的类,需要调用对应的reduce方法
            job.setOutputKeyClass(Text.class);//设置Reduce输出的Key类型
            job.setOutputValueClass(Text.class);//设置Reduce输出的Value类型
            job.setNumReduceTasks(1);//设置ReduceTask的个数,默认为1
    
            //output:输出目录默认不能提前存在
    //        job.setOutputFormatClass(TextOutputFormat.class);//设置输出的类,默认我诶TextOutputFormat
            Path outputPath = new Path("datas/output/join/reduceJoin");//用程序的第三个参数作为输出
            //解决输出目录提前存在,不能运行的问题,提前将目前删掉
            //构建一个HDFS的文件系统
            FileSystem hdfs = FileSystem.get(this.getConf());
            //判断输出目录是否存在,如果存在就删除
            if(hdfs.exists(outputPath)){
                hdfs.delete(outputPath,true);
            }
            TextOutputFormat.setOutputPath(job,outputPath);//为当前Job设置输出的路径
    
            //todo:3-提交运行Job
            return job.waitForCompletion(true) ? 0:-1;
        }
    
        /**
         * 程序的入口,调用run方法
         * @param args
         */
        public static void main(String[] args) throws Exception {
            //构建一个Configuration对象,用于管理这个程序所有配置,工作会定义很多自己的配置
            Configuration conf = new Configuration();
            //t通过Toolruner的run方法调用当前类的run方法
            int status = ToolRunner.run(conf, new ReduceJoinMr(), args);
            //退出程序
            System.exit(status);
        }
    
    
        /**
         * @ClassName MRMapper
         * @Description TODO 这是MapReduce模板的Map类
         *      输入的KV类型:由inputformat决定,默认是TextInputFormat
         *      输出的KV类型:由map方法中谁作为key,谁作为Value决定
         */
        public static class MRMapper extends Mapper<LongWritable, Text, Text,Text> {
    
            Text outputKey= new Text();
            Text outputValue = new Text();
    
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                //两个文件的每一行数据,就是Value
                // 要判断这条数据来自哪个文件
                FileSplit fileSplit = (FileSplit) context.getInputSplit();//获取这条数据对应的分片
                String name = fileSplit.getPath().getName();//获取这条数据对应的文件的名称
                if("orders.txt".equals(name)){
                    //如果这是订单的数据:1001,20150710,p0001,2
                    String[] split1 = value.toString().split(",");
                    //用商品id作为Key
                    this.outputKey.set(split1[2]);
                    //其他信息作为Value
                    this.outputValue.set(split1[0]+"\t"+split1[1]+"\t"+split1[3]);
                    //输出
                    context.write(this.outputKey,this.outputValue);
                }else{
                    //这是商品数据:p0001,直升机,1000,2000
                    String[] split2 = value.toString().split(",");
                    //用商品id作为key
                    this.outputKey.set(split2[0]);
                    //用商品名称作为Value
                    this.outputValue.set(split2[1]);
                    //输出
                    context.write(this.outputKey,this.outputValue);
                }
            }
        }
    
    
    
        /**
         * @ClassName MRReducer
         * @Description TODO MapReduce模板的Reducer的类
         *      输入的KV类型:由Map的输出决定,保持一致
         *      输出的KV类型:由reduce方法中谁作为key,谁作为Value决定
         */
        public static class MRReducer extends Reducer<Text,Text,Text,Text> {
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                //传进来的是每个商品id对应的订单信息和商品名称
                StringBuilder stringBuilder = new StringBuilder();
                for (Text value : values) {
                    //将订单信息和商品的名称进行拼接
                    stringBuilder.append(value.toString()+"\t");
                }
                //输出
                context.write(key,new Text(stringBuilder.toString()));
    
            }
        }
    
    
    }
    
    
    • Reduce Join的特点

      • 利用了Shuffle中全局分组实现两份数据的Join
      • Join发生在Reduce端
      • 必须经过Shuffle
      • 应用:适合于大数据join大数据
  • 小结

    • 什么是Reduce Join,特点和应用场景是什么?
      • 利用SHuffle中的分组,将Join字段作为K2,所有join字段相关的数据放在同一个迭代器中
      • 特点:必须经过shuffle
      • 应用:大数据join大数据

知识点05:MapReduce补充:Map Join

  • 引入:Reduce Join必须经过Shuffle,有没有更好的方案?

  • 目标了解MapReduce中Map Join的实现

  • 路径

    • step1:Map Join的原理
    • step2:Map Join的实现
    • step3:Map Join的特点
  • 实施

    • Map Join的原理

    Day10:YARN与Hive入门

    • 在每一个MapTask 的内存中放入一份完整的商品表,前提:商品表的数据比较小

    • 每个MapTask处理一部分订单表,直接在MapTask所在的内存中订单表的每一个小部分与商品表的完整的数据进行join

    • Map Join的实现

      • Mapper类或者Reduce类总共有3个方法
        • setup:初始化方法,在Mapper构建实例的时候会被调用一次
        • map/reduce
        • cleanUp:关闭方法,用于释放资源,最后释放之前会被调用一次

      Day10:YARN与Hive入门

      
      
      import org.apache.commons.lang.StringUtils;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.conf.Configured;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;
      
      import java.io.BufferedReader;
      import java.io.FileReader;
      import java.io.IOException;
      import java.net.URI;
      import java.util.HashMap;
      import java.util.Map;
      
      /**
       * @ClassName MRDriver
       * @Description TODO 这是MapReduce程序的Driver类的模板
       * @Date 2020/5/30 10:34
       * @Create By     Frank
       */
      public class MapJoinMr extends Configured implements Tool {
      
          /**
           * 用于将Job的代码封装
           * @param args
           * @return
           * @throws Exception
           */
          @Override
          public int run(String[] args) throws Exception {
              //todo:1-构建一个Job
              Job job = Job.getInstance(this.getConf(),"model");//构建Job对象,调用父类的getconf获取属性的配置
              job.setJarByClass(MapJoinMr.class);//指定可以运行的类型
              //todo:2-配置这个Job
              //input
      //        job.setInputFormatClass(TextInputFormat.class);//设置输入的类的类型,默认就是TextInputFormat
              Path inputPath = new Path("datas/join/orders.txt");//将大的数据进行读取
              //将订单作为输入
              TextInputFormat.setInputPaths(job,inputPath);//为当前job设置输入的路径
              //将商品表放入分布式缓存中
              job.addCacheFile(new Path("datas/join/product.txt").toUri());
      
              //map
              job.setMapperClass(MRMapper.class);//设置Mapper的类,需要调用对应的map方法
              job.setMapOutputKeyClass(Text.class);//设置Mapper输出的key类型
              job.setMapOutputValueClass(Text.class);//设置Mapper输出的value类型
      
              //shuffle
      //        job.setPartitionerClass(HashPartitioner.class);//自定义分区
      //        job.setGroupingComparatorClass(null);//自定义分组的方式
      //        job.setSortComparatorClass(null);//自定义排序的方式
      
              //reduce
      //        job.setReducerClass(MRReducer.class);//设置Reduce的类,需要调用对应的reduce方法
      //        job.setOutputKeyClass(NullWritable.class);//设置Reduce输出的Key类型
      //        job.setOutputValueClass(NullWritable.class);//设置Reduce输出的Value类型
              job.setNumReduceTasks(0);//设置ReduceTask的个数,默认为1
      
              //output:输出目录默认不能提前存在
      //        job.setOutputFormatClass(TextOutputFormat.class);//设置输出的类,默认我诶TextOutputFormat
              Path outputPath = new Path("datas/output/join/mapJoin");//用程序的第三个参数作为输出
              //解决输出目录提前存在,不能运行的问题,提前将目前删掉
              //构建一个HDFS的文件系统
              FileSystem hdfs = FileSystem.get(this.getConf());
              //判断输出目录是否存在,如果存在就删除
              if(hdfs.exists(outputPath)){
                  hdfs.delete(outputPath,true);
              }
              TextOutputFormat.setOutputPath(job,outputPath);//为当前Job设置输出的路径
      
              //todo:3-提交运行Job
              return job.waitForCompletion(true) ? 0:-1;
          }
      
          /**
           * 程序的入口,调用run方法
           * @param args
           */
          public static void main(String[] args) throws Exception {
              //构建一个Configuration对象,用于管理这个程序所有配置,工作会定义很多自己的配置
              Configuration conf = new Configuration();
              //t通过Toolruner的run方法调用当前类的run方法
              int status = ToolRunner.run(conf, new MapJoinMr(), args);
              //退出程序
              System.exit(status);
          }
      
      
          /**
           * @ClassName MRMapper
           * @Description TODO 这是MapReduce模板的Map类
           *      输入的KV类型:由inputformat决定,默认是TextInputFormat
           *      输出的KV类型:由map方法中谁作为key,谁作为Value决定
           */
          public static class MRMapper extends Mapper<LongWritable, Text, Text,Text> {
              //获取分布式缓存中的数据,存入Map集合,Key是商品id,Value是商品名称
              Map<String,String> map = new HashMap<String, String>();
      
      
              /**
               * Map类中有三个方法
               *      setup:在调用map之前会调用一次,类似于初始化的方法
               *      map:实现Map处理逻辑的方法
               *      cleanup:Map结束以后会调用的方法,相当于close方法
               * @param context
               * @throws IOException
               * @throws InterruptedException
               */
              @Override
              protected void setup(Context context) throws IOException, InterruptedException {
                  //将缓存中的数据读取出来,封装到Map集合中
                  URI[] cacheFiles = context.getCacheFiles();
                  //打开这个缓存的文件
                  BufferedReader bufferedReader = new BufferedReader(new FileReader(cacheFiles[0].getPath()));
                  //将每一行的内容,封装到Map集合中
                  String line = null;
                  while(StringUtils.isNotBlank(line = bufferedReader.readLine())){
                      //分割将商品id和商品名称放入Map集合
                      String pid = line.split(",")[0];
                      String pname = line.split(",")[1];
                      map.put(pid,pname);
                  }
              }
      
              @Override
              protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                 //只有订单的数据,取出订单中的商品id
                  String pid = value.toString().split(",")[2];
                  //在Map集合中找到这个商品id对应的商品名称
                  String name = map.get(pid);
                  //输出订单信息和商品名称
                  context.write(value,new Text(name));
              }
          }
      
      }
      
      
    • Map Join的特点

      • 思想:将小文件放入每个Task的内存中,每个Task用大文件的每个部分与小文件的整体做Join,实现Join的古城
      • 特点:Map端实现Join,不需要经过shuffle或者reduce
      • 应用:小数据join大数据 或者 小数据join小叔叔
  • 小结

    • 什么是Map Join,特点和应用场景是什么?
      • 将小数据放入分布式缓存,每个Task从缓存中构建完整的小数据加载到Task内存中,完整的小数据与大数据的一个部分进行join
      • 特点:Map端实现,不需要经过shuffle
      • 应用:小join大,小join小

知识点06:YARN:功能与应用场景

  • 引入:YARN是什么,什么场景下需要使用YARN?
  • 目标掌握YARN的功能与应用场景
  • 路径
    • step1:YARN的定义与本质
    • step2:YARN的功能
    • step3:YARN的应用场景
  • 实施
    • YARN的定义与本质
      • 定义:YARN是一个分布式的资源管理和任务调度的框架
      • 本质:将多台的机器CPU和内存从逻辑上合并为一个整体,对外提供统一的计算服务
        • 分布式计算 = 分布式程序 + 分布式运行环境
        • 分布式程序:MapReduce
        • 分布式运行环境:YARN
    • YARN的功能
      • 功能:提供分布式运行资源环境
      • 理解:资源容器,用于提供分布式资源的
    • YARN的应用场景
      • 实现分布式计算,将分布式程序提交到分布式资源容器中运行
      • MapReduce
      • Tez
      • Spark
      • ……
  • 小结
    • YARN是什么,什么场景下需要使用YARN?
      • 分布式资源管理和任务调度框架
      • 本质:将多台机器的CPU和内存从逻辑上合并,构建成一个整体的资源服务框架
      • 只要实现分布式计算的运行

知识点07:YARN:集群架构

  • 引入:YARN如何实现分布式的资源管理的?

  • 目标掌握YARN的分布式集群架构

  • 实施

    • 架构

      • 分布式主从架构

      Day10:YARN与Hive入门

    • 角色

      • ResourceManager:主:管理节点
        • 管理从节点的状态:心跳机制
        • 管理资源管理和任务调度
          • 资源管理:每个Task能用多少资源
          • 任务调度:多个程序谁先运行谁后运行
        • 接客
      • NodeManager:从:计算节点
        • 接受主节点的任务分配
        • 使用自己所在节点的CPU和内存实现Task的运行
  • 小结

    • YARN的架构是什么?
      • 分布式主从架构
      • ResourceManager
        • 管理从节点
        • 资源管理和任务调度
        • 接客
      • NodeManager
        • 执行主节点的任务

知识点08:YARN:MR提交运行流程

  • 引入:一个MapReduce程序提交给YARN,YARN是如何实现运行的?

  • 目标掌握MapReduce程序在YARN中的运行流程

  • 实施

    • step1:客户端提交运行一个MapReduce的程序给YARN,YARN会检查请求是否合法
    • step2:如果合法,RM会随机选择一台NodeManager启动这个程序的管理者:AppMaster
      • AppMaster每一个程序都有一个
      • 负责这个程序的资源申请、Task的运行、Task的监控,运行的汇报
    • step3:APPMaster会向ResourceManager申请指令和资源信息,获取如何运行这个程序的信息
    • step4:ResourceManager根据Task的情况分配运行的节点以及资源的信息,以Container的形式返回
      • Contiainer:信息包,记录每个Task运行的节点、处理的数据、运行的逻辑、资源
        • 也是MapTask和ReduceTask的父进程
    • step5:APPMaster将指令和container信息分发给对应的NodeManager,每个NodeManger根据指令和资源来启动对应的Task进程
    • step6:MapTask运行结束后,会通知AppMaster,APPMaster会通知ReduceTask到MapTask中拉取数据
    • step7:ReduceTask到每个MapTask中拉取数据,并执行处理,最后运行结束,将结果返回给APPMaster
    • step8:最终结果返回给客户端

Day10:YARN与Hive入门

  • 小结

    • 记住大体的过程即可

知识点09:YARN:资源管理配置

  • 目标了解YARN中的资源管理配置

  • 实施

    • 基本资源属性

Day10:YARN与Hive入门

| 配置属性                                 | 值   | 含义                                    |
| ---------------------------------------- | ---- | --------------------------------------- |
| yarn.nodemanager.resource.memory-mb      | 8192 | 每台NodeManger能够使用的最大物理内存数  |
| yarn.nodemanager.resource.cpu-vcores     | 8    | 每台NodeManger能够使用的最大物理CPU核数 |
| yarn.scheduler.minimum-allocation-mb     | 1024 | 每个Task容器最少申请的内存              |
| yarn.scheduler.maximum-allocation-mb     | 8192 | 每个Task容器最多申请的内存              |
| yarn.scheduler.minimum-allocation-vcores | 1    | 每个Task容器最少申请的CPU核数           |
| yarn.scheduler.maximum-allocation-vcores | 8    | 每个Task容器最多申请的CPU核数           |
| yarn.nodemanager.vmem-pmem-ratio         | 2.1  | 虚拟内存率【磁盘空间】                  |
  • 常见问题

    • OOM:out of memory
      • 物理内存溢出:增加物理内存配置
        • 报错:physic memory 6GB of 5GB
      • 虚拟内存溢出:增加虚拟内存倍数
        • 报错:vistual memory 17 of 16.8
      • 堆内存溢出:配置允许每个Task使用的内存不足
        • 修改MapReduce配置
  • 小结

    • 了解常见资源属性即可

知识点10:YARN:任务调度机制

  • 引入:提交给YARN的程序,按照什么顺序运行,怎么分配资源?

  • 目标了解YARN中常见的任务的调度机制

  • 路径

    • step1:FIFO
    • step2:Capacity
    • step3:Fair
  • 实施

    • FIFO:先进先出调度

      • 应用:Hadoop1.x版本系列

      • 原理

      Day10:YARN与Hive入门

      • 特点

        • 单队列的,单个队列享有整个YARN的所有资源
        • 队列内部是先进先出的,每个程序按照提交的顺序挨个执行
        • 缺点:资源浪费,无法实现并发和并行
    • Capacity:容量调度

      • 应用:Apache版本的Hadoop默认的调度器

      • 原理

    Day10:YARN与Hive入门

    • 特点

      • 多队列的,多个队列自定义分配所有资源
      • 队列内部是先进先出的,每个程序按照提交的顺序挨个执行
      • 优点:能实现多队列的并行执行、用户使用队列机制的隔离,互不干扰,允许资源动态抢占
      • 缺点:队列内部只能运行一个程序,不能实现并发
    • Fair:公平调度

      • 应用:CDH版本的Hadoop默认的调度器

      • 原理

Day10:YARN与Hive入门

- 特点

  - 多队列
  - 队列内部公平的分配资源,允许指定权重和优先级来指定资源的优先分配
  - 动态资源抢占、权重分配

- 配置

  | 属性名称                                                     | 值                                                           | 含义                                                         |
  | :----------------------------------------------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- |
  | yarn.resourcemanager.scheduler.class                         | org.apache.hadoop.<br/>yarn.server.resourcemanager<br/>.scheduler.fair.FairScheduler | 指定调度器类型                                               |
  | yarn.scheduler.fair.allocation.file                          | etc/hadoop/fair-scheduler.xml                                | 指定公平调度的配置文件,配置文件中定义每个队列以及每个队列的资源 |
  | yarn.scheduler.fair.user-as-default-queue                    | false                                                        | 没有指定队列名时,是否使用用户名作为队列名称                 |
  | yarn.scheduler.fair.preemption                               | false                                                        | 是否允许资源抢占                                             |
  | yarn.scheduler.fair.preemption.cluster-utilization-threshold | 0.8                                                          | 最大抢占比例                                                 |
  | yarn.scheduler.fair.sizebasedweight                          | true                                                         | 是否启用权重                                                 |
  | yarn.scheduler.fair.assignmultiple                           | true                                                         | 是否允许分配多个container                                    |
  | yarn.scheduler.fair.max.assign                               | 20                                                           | 如果允许,最多分配多少个                                     |
  | yarn.scheduler.fair.locality.threshold.node                  | -1                                                           | 放弃等待优先本地计算的节点比例,-1表示都不放弃               |
  | yarn.scheduler.fair.locality.threshold.rack                  | -1                                                           | 放弃等待优先机架计算的节点比例,-1表示都不放弃               |
  | yarn.scheduler.fair.allow-undeclared-pools                   | true                                                         | 是否允许创建未定义的队列                                     |
  | yarn.scheduler.fair.update-interval-ms                       | 500                                                          | 表示重新计算公平调度的间隔,单位毫秒                         |
<?xml version="1.0"?>
<allocations>
  <queue name="sample_queue">
    <minResources>10000 mb,0vcores</minResources>
    <maxResources>90000 mb,0vcores</maxResources>
    <maxRunningApps>50</maxRunningApps>
    <maxAMShare>0.1</maxAMShare>
    <weight>2.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
    <queue name="sample_sub_queue">
      <aclSubmitApps>charlie</aclSubmitApps>
      <minResources>5000 mb,0vcores</minResources>
    </queue>
  </queue>

  <queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>

  <!-- Queue 'secondary_group_queue' is a parent queue and may have
       user queues under it -->
  <queue name="secondary_group_queue" type="parent">
  <weight>3.0</weight>
  </queue>
  
  <user name="sample_user">
    <maxRunningApps>30</maxRunningApps>
  </user>
  <userMaxAppsDefault>5</userMaxAppsDefault>
  
  <queuePlacementPolicy>
    <rule name="specified" />
    <rule name="primaryGroup" create="false" />
    <rule name="nestedUserQueue">
        <rule name="secondaryGroupExistingQueue" create="false" />
    </rule>
    <rule name="default" queue="sample_queue"/>
  </queuePlacementPolicy>
</allocations>
  • 小结

    • 常见的调度机制有哪些,各自的特点和区别是什么?
      • FIFO:单队列,每个队列内部是FIFO,不能并发和并行
      • Capacity:多队列,每个队列内部还是FIFO,能并行不能并发,资源动态抢占
      • Fair:多队列,每个内部是共享资源,能并发也能并行,资源动态抢占,支持权重

知识点11:数据仓库的介绍

  • 引入:工作中处理的数据都放在哪里?

  • 目标了解数据仓库的基本功能和场景

    • 什么是数据仓库?
  • 路径

    • step1:OLTP与OLAP
    • step2:数据仓库的设计
    • step3:数据仓库的流程
  • 实施

    • OLTP与OLAP

      • OLTP:联机事务处理

        • 应用:满足业务上的数据存储

        • 例如

      Day10:YARN与Hive入门

      • 特点

        • 数据量小
        • 支持事务
        • 效率高,延迟性比较低
      • 模型:数据库

      • 实现:数据库工具

        • MySQL、Oracle
      • OLAP:联机分析处理

        • 应用:专门用于解决数据处理的数据存储的平台

        • 例如

      Day10:YARN与Hive入门

      • 特点

        • 数据量大
        • 不需要支持事务
        • 时间的要求不高,离线的OLAP:T+1
      • 存储:数据仓库模型

      • 实现:数据仓库工具

        • Hive、Greenplum
    • 数据仓库的设计

      • 数据仓库:更加规范化和统一化的数据管理的设计模型,用于实现数据加工和处理的平台,实现数据的应用
      • 举个例子:做一道红烧肉
        • 数据库:为了数据存储而设计,不同的数据存储在不同的数据库中
          • step1:买肉
          • step2:买配料
          • step3:切、腌制、焯水
          • step4:炖
          • step5:吃
        • 数据仓库
          • step1:去超市买现成的
          • step2:加热以下吃
      • 数据仓库是一个数据的超市:所有数据按照分类进行存放,在数据仓库中对数据进行加工
        • 数据库
          • 人事管理系统:MySQL
          • 财务管理系统:Oracle
          • |
          • 公司的年度报表:人事和财务数据
        • 数据仓库
          • 所有数据进行集中存储
    • 数据仓库的流程

      • ETL:数据清洗,将各种各样的数据按照规则进行过滤,补全、转换
      • 分层:数据从进入数据仓库到最后被应用,数据经过处理的步骤
      • 建模:表应该怎么建,表的设计
  • 小结

    • 什么是数据仓库?
      • 定义:数据仓库是一种规范化和统一化的数据存储的设计模型,用于解决数据处理应用的问题
      • 本质:数据仓库本质就是一个数据超市,数据在数据仓库中分类存放并且可以对数据进行加工,直接使用加工后的数据

知识点12:Hive的功能及应用场景

  • 引入:数据仓库中的分布式计算如果用MR来实现,业务分析人员不会写Java怎么办?

  • 目标掌握Hive的功能与应用场景

    • 什么是Hive,有什么功能?
    • Hive在工作中的应用场景是什么?
  • 路径

    • step1:Hive的诞生
    • step2:Hive的功能
    • step3:Hive的应用场景
  • 实施

    • Hive的诞生

      • 缘起FackBook,用Hadoop来实现大量数据的分析处理时候遇到了问题
        • 分析的需求只有业务人员才知道怎么分析:不会写Java,会SQL
        • 分析的程序只有开发人员才知道怎么开发:不会写分析
      • 解决:让开发人员写一个工具,将SQL语句转换为MapReduce程序
    • Hive的功能

      • MapReduce处理的数据来自于HDFS文件

      • SQL处理的来自于表

      • 功能一:Hive实现了将HDFS文件映射成表的数据

        • 使用功能一来构建数据仓库,实现分布式的数据存储
      • 功能二:Hive实现了将SQL转换为MapReduce/Tez/Spark的程序,提交给YARN执行

        • 功能二一般不使用Hive来做,Hive默认转换为MapReduce,性能很差
        • 工作中使用其他的一些分布式计算的SQL引擎来代替Hive
          • Presto
          • Impala
          • SparkSQL
      • 功能:实现分布式存储【HDFS】和分布式计算【MapReduce/Tez/Spark】

      • 本质:Hive是一种支持SQL的特殊的Hadoop的客户端

      • 不使用Hive:用户必须会Java或者Python,才能操作Hadoop

    Day10:YARN与Hive入门

    • 有了Hive:用户可以通过SQL直接操作Hadoop

      Day10:YARN与Hive入门

    • Hive的应用场景

      • 通过功能一构建数据仓库
    • Hive官方:hive.apache.org
      Day10:YARN与Hive入门

  • 小结

    • 什么是Hive,有什么功能?
      • 本质:一种支持SQL的Hadoop的客户端
      • 功能
        • 功能一:将HDFS文件映射成Hive中表【离线数据仓库】
        • 功能二:将SQL转换为MR程序,提交给YARN执行
    • Hive在工作中的应用场景是什么?
      • 用Hive作为数据仓库工具,实现数据仓库的构建
      • 分布式SQL计算查询:一般不用Hive自带的计算来实现
        • Impala
        • SparkSQL

知识点13:Hive的架构

  • 目标掌握Hive的架构组成

    • Hive由哪些部分组成?
  • 实施

Day10:YARN与Hive入门

  • Hive客户端

    • 连接用户与服务端
    • 提供SQL开发界面,允许用户开发SQL,将用户开发的SQL提交给Hive的服务端
    • 将服务端执行的结果返回给用户
  • Hive服务端

    • 接受客户端的请求
    • 解析用户提交的SQL语句
    • 将SQL转换为分布式计算或者分布式存储的程序提交给Hadoop运行
    • SQL解析器:解析SQL语句的
    • 编译器:将SQL生成执行计划
    • 优化器:选择最优的方案来实现
    • 执行器:执行对应的处理操作
  • 元数据

    • Hive写一条SQL语句

      select count(*) from table;
      
    • 问题:SQL语句会变成MapReduce,读的文件是谁呢?

    • 元数据:存储整个Hive中所有核心的数据

      • Hive中所有数据库、表的信息
      • 记录了HDFS文件与Hive表的映射关系
  • 小结

    • Hive由哪些部分组成?
      • 客户端:提交SQL
      • 服务端:处理SQL
      • 元数据:记录HDFS文件与Hive表的映射关系

知识点14:Hive的部署

  • 目标实现Hive的安装部署

  • 路径

    • step1:版本下载
    • step2:解压安装
    • step3:修改配置
    • step4:启动
  • 实施

    • 版本下载

      • 目前在工作中:Hive1.x和Hive2.x
    • 区别

      • 越来越像MySQL:语法支持的越全面,功能特点都支持的越来越多
      • 底层计算引擎的优化越来越好,性能越来越高【不用MapReduce】
  • 解压安装

    • 以第三台机器为例

      • 将hive安装包和MySQL连接驱动包上传到/export/software目录下

      • 上传

    cd /export/software/
      rz

Day10:YARN与Hive入门

  • 解压安装Hive

      tar -zxvf apache-hive-2.1.0-bin.tar.gz -C /export/server/
    cd /export/server/
      mv apache-hive-2.1.0-bin hive-2.1.0-bin
    
    • bin:客户端和服务端的管理命令存储目录

      • conf:配置文件目录
    • lib:依赖包

Day10:YARN与Hive入门

- 安装MySQL

- 为什么要用到MySQL?

  - 将Hive的元数据存储在MySQL中

  - inux阶段已安装过,不用安装,检查配置即可

  - 检查MySQL登录与授权是否正常
  
- 检查登录

  ```shell
  mysql -uroot -p
  ```
  
- 检查授权

    ```shell
  select user,host from mysql.user;
  ```

Day10:YARN与Hive入门

- 如果不一样,没有授权,请执行授权语句
  
    ```sql
    grant all privileges on *.* to 'root'@'%' identified by '123456'; 
    delete from mysql.user where host != '%';
    flush privileges; 
    ```
  • 修改配置

    • 切换到配置文件目录

      cd /export/server/hive-2.1.0-bin/conf/
      
    • 修改hive-env.sh

      mv hive-env.sh.template hive-env.sh
      
      #修改48行
      HADOOP_HOME=/export/server/hadoop-2.7.5
      #修改51行
      export HIVE_CONF_DIR=/export/server/hive-2.1.0-bin/conf
      

Day10:YARN与Hive入门

- 将提供的hive-site.xml放入conf目录中

- 修改hive-site.xml
      <property>
      	<name>javax.jdo.option.ConnectionUserName</name>
    	<value>root</value>
      </property>
    <property>
      	<name>javax.jdo.option.ConnectionPassword</name>
      	<value>123456</value>
      </property>
      <property>
      	<name>javax.jdo.option.ConnectionURL</name>
    	<value>jdbc:mysql://node3:3306/hivemetadata?createDatabaseIfNotExist=true&amp;useSSL=false</value>
      </property>
    <property>
      	<name>javax.jdo.option.ConnectionDriverName</name>
    	<value>com.mysql.jdbc.Driver</value>
      </property>
      <property>
      	<name>hive.metastore.schema.verification</name>
      	<value>false</value>
      </property>
      <property>
      	<name>datanucleus.schema.autoCreateAll</name>
      	<value>true</value>
      </property>
    <property>
      	<name>hive.server2.thrift.bind.host</name>
    	<value>node3</value>
      </property>
  • 将MySQL连接驱动放入Hive的lib目录中

    cp /export/software/mysql-connector-java-5.1.38.jar /export/server/hive-2.1.0-bin/lib/ 
    cd /export/server/hive-2.1.0-bin/
    ll lib/
    

Day10:YARN与Hive入门

  • 配置Hive的环境变量

      vim /etc/profile
      
      #HIVE_HOME
      export HIVE_HOME=/export/server/hive-2.1.0-bin
      export PATH=:$PATH:$HIVE_HOME/bin
    
      source /etc/profile
    
  • 启动

    • 启动HDFS和YARN

      start-dfs.sh
      
      start-yarn.sh
      
    • 启动Hive

      • 第一次启动Hive之前需要初始化元数据【只有第一次启动需要,以后每次启动都不需要了,类似于格式化】

      • 创建HDFS相关目录

        hdfs dfs -mkdir  -p    /user/hive/warehouse
        hdfs dfs -chmod g+w   /tmp
        hdfs dfs -chmod g+w   /user/hive/warehouse
        
        • /user/hive/warehouse:Hive中所有表的数据在HDFS中的存储目录
      • 初始化Hive元数据

        cd /export/server/hive-2.1.0-bin/
        bin/schematool -dbType mysql -initSchema
        

    Day10:YARN与Hive入门

Day10:YARN与Hive入门

  • 启动Hive
        hive

Day10:YARN与Hive入门

Day10:YARN与Hive入门

  • 小结
    • 参考笔记一步步实现即可

知识点15:Hive实现WordCount

  • 引入:Hive如何实现WordCount的问题?

  • 目标实现HiveSQL开发WordCount程序

  • 路径

    • step1:创建表
    • step2:加载HDFS数据
    • step3:SQL分析处理
  • 实施

    • 创建表

      create table tb_word(
      words string
      );
      
    • 加载HDFS数据

      load data inpath '/wordcount.txt' into table tb_word;
      

Day10:YARN与Hive入门
Day10:YARN与Hive入门

  • SQL分析处理

    create table tb_word2 as select explode(split(words," ")) as word from tb_word;
    select word,count(*) as numb from tb_word2 group by word order by numb desc;
    

Day10:YARN与Hive入门

  • 小结

    • 如何使用HiveSQL实现WordCount开发?
      • 创建表,实现HDFS文件与表的映射
      • SQL + 函数实现计算处理需求

知识点16:Hive实现二手房统计分析

  • 引入:使用MR实现二手房分析时,需要自定义数据类型解决,使用HiveSQL如何解决?

  • 目标实现HiveSQL开发二手房统计分析程序

  • 路径

    • step1:创建表
    • step2:加载HDFS数据
    • step3:SQL分析处理
  • 实施

    • 数据

      梅园六街坊,2室0厅,47.72,浦东,低区/6层,朝南,500,104777,1992年建
      
    • 创建表

      create table tb_house(
      xiaoqu string,
      huxing string,
      area double,
      region string,
      floor string,
      fangxiang string,
      t_price int,
      s_price int,
      buildinfo string
      ) row format delimited fields terminated by ',';
      
    • 加载本地文件数据

      load data local inpath '/export/data/secondhouse.csv' into table tb_house;
      

Day10:YARN与Hive入门

  • SQL分析处理

    select 
      region,
      count(*) as numb,
      round(avg(s_price),2) as avgprice,
      max(s_price) as maxprice,
      min(s_price) as minprice
    from tb_house
    group by region;
    

Day10:YARN与Hive入门

  • 小结

    • 如何实现HiveSQL开发二手房统计分析程序?
      • step1:创建表:根据数据的格式
      • step2:关联数据文件
      • step3:SQL + 函数
上一篇:逐梦校友圈——冲刺day10


下一篇:day10:第10讲-函数三