优秀是一种习惯
知识点01:回顾
-
为什么要设计Shuffle?
- 全局分组和排序
-
Map端的Shuffle阶段如何实现?
- Spill:将每个MapTask每个小的部分实现局部排序,生成多个有序的小文件
- MapTask将数据写入一个环形缓冲区【内存:100M】
- 阈值:80%
- 先排序:快排
- 后溢写:排序好的内容生成小文件
- Merge:将自己所有小文件合并为一个大文件,并且实现排序
- 实现每个MapTask整体有序
- 排序:插入排序
- Spill:将每个MapTask每个小的部分实现局部排序,生成多个有序的小文件
-
Reduce端的Shuffle阶段如何实现?
- Merge:将所有MapTask中属于自己的数据进行合并并排序
- 排序:插入排序
- Merge:将所有MapTask中属于自己的数据进行合并并排序
-
Combiner的功能是什么?如何实现Combiner?
- 功能:Map端聚合,利用MapTask个数远大于ReduceTask个数,MapTask提前做一次聚合
- 实现
- job.setCombinerClass(Reducer)
- 效果:减少Reduce输出的数据量,降低了Reduce负载
-
压缩的好处是什么?常见的压缩类型有哪些?MapReduce如何配置压缩?
- 优点:降低了数据存储大小,提高了IO传输速度,提高性能
- 类型:Snappy、Lzo、Lz4
- 配置
- Input:不用配置
- Map Out:配置开启和指定压缩类型
- Reduce Out:配置开启和指定压缩类型
-
Shuffle分组的规则是什么?如何自定义分组比较器?
-
规则:先调用分组比较器,如果有,直接调用比较方法,如果没有,调用K2的比较方法
-
定义
-
继承WritableComparator,重写compare
-
排序:大于、等于、小于
job.setSortCOmparatorClass
-
分组:等于、不等于
job.setGroupingComparatorClass
-
-
知识点02:目标
- MapReduce补充知识点
- 分片的规则:读取文件时,如何决定划分多少个分片,决定了MapTask个数
- MapReduce如何实现Join?
- Reduce Join
- Map Join【重点】
- YARN的介绍
- 功能与应用场景
- 架构以及MR程序的运行流程
- 资源管理和任务调度
- 数据仓库与Hive的入门
- 数据仓库
- Hive的介绍:功能与应用【重要】
- Hive的安装部署
- 案例:Wordcount、二手房统计
知识点03:MapReduce补充:分片规则
-
引入:Reduce的个数可以自己指定,Map的个数如何决定?
-
目标:了解TextInputFormat中的分片规则
-
路径
- step1:InputFormat的功能
- step2:读取数据的实现
- step3:分片的规则
-
实施
-
InputFormat的功能
- 功能一:将读取到的所有输入数据划分为多个分片Split
- 功能二:将每个分片的数据转换为KV
-
TextInputFormat读取数据的实现
- createRecordReader:真正调用读取器读取数据的方法
- LineRecordReader:真正读取器的对象【JavaBean】
- nextKeyValue:将每一条数据转换为KV结构的方法
-
TextInputFormat分片的规则
-
getSplits:用于将输入的所有数据划分为多个分片
-
规则
-
step1:判断是否分片的条件
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) 文件大小 / splitSize > 1.1
-
step2:计算splitSize大小
long splitSize = computeSplitSize(blockSize, minSize, maxSize); 128M 1 Long.MAX_VALUE minsize = 最小分片数 max(1,mapreduce.input.fileinputformat.split.minsize=1) maxsize = 最大分片数 mapreduce.input.fileinputformat.split.maxsize
-
step3:computeSplitSize计算逻辑
Math.max(minSize, Math.min(maxSize, blockSize)) max(1,min(Long.MAX_VALUE,128M))
-
规则
- 判断当前文件的大小是否大于128M的1.1倍
- 如果大于:将文件的128M作为一个分片,再次判断,直到所有分片构建
- 如果不大于:剩下的整体作为一个分片
- 130M:1个分片
- 145M:2个
- split1:128M
- split2:17M
- 判断当前文件的大小是否大于128M的1.1倍
-
-
-
-
小结
- TextInputFormat中分片的规则是什么?
- 按照文件大小的1.1倍判断
- 大于:每128M作为一个分片
- 不大于:整体作为一个分片
- 注意:如果要干预MapTask个数,怎么干预?
- 调整minSplitSize和maxSplitSize大小
- TextInputFormat中分片的规则是什么?
知识点04:MapReduce补充:Reduce Join
-
引入:MapReduce用于实现数据统计分析,类似于SQL,如何实现Join过程?
-
目标:了解MapReduce中Reduce Join的实现
- 什么是Reduce Join,特点和应用场景是什么?
-
路径
- step1:Reduce Join的原理
- step2:Reduce Join的实现
- step3:Reduce Join的特点
-
实施
-
Reduce Join的原理
- 将Join的关联字段作为K2,将订单信息和商品的信息作为V2
- Shuffle过程中分组时候,会将相同商品id对应的商品信息和订单信息放入同一个迭代器中
- Reduce只要拼接即可
-
Reduce Join的实现
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; /** * @ClassName MRDriver * @Description TODO 实现ReduceJoin * @Create By Frank */ public class ReduceJoinMr extends Configured implements Tool { /** * 用于将Job的代码封装 * @param args * @return * @throws Exception */ @Override public int run(String[] args) throws Exception { //todo:1-构建一个Job Job job = Job.getInstance(this.getConf(),"model");//构建Job对象,调用父类的getconf获取属性的配置 job.setJarByClass(ReduceJoinMr.class);//指定可以运行的类型 //todo:2-配置这个Job //input // job.setInputFormatClass(TextInputFormat.class);//设置输入的类的类型,默认就是TextInputFormat Path inputPath1 = new Path("datas/join/orders.txt");//读取订单数据 Path inputPath2 = new Path("datas/join/product.txt");//读取商品的数据 //设置的路径可以给目录,也可以给定文件,如果给定目录,会将目录中所有文件作为输入,但是目录中不能包含子目录 TextInputFormat.setInputPaths(job,inputPath1,inputPath2);//为当前job设置输入的路径 //map job.setMapperClass(MRMapper.class);//设置Mapper的类,需要调用对应的map方法 job.setMapOutputKeyClass(Text.class);//设置Mapper输出的key类型 job.setMapOutputValueClass(Text.class);//设置Mapper输出的value类型 //shuffle // job.setPartitionerClass(HashPartitioner.class);//自定义分区 // job.setGroupingComparatorClass(null);//自定义分组的方式 // job.setSortComparatorClass(null);//自定义排序的方式 //reduce job.setReducerClass(MRReducer.class);//设置Reduce的类,需要调用对应的reduce方法 job.setOutputKeyClass(Text.class);//设置Reduce输出的Key类型 job.setOutputValueClass(Text.class);//设置Reduce输出的Value类型 job.setNumReduceTasks(1);//设置ReduceTask的个数,默认为1 //output:输出目录默认不能提前存在 // job.setOutputFormatClass(TextOutputFormat.class);//设置输出的类,默认我诶TextOutputFormat Path outputPath = new Path("datas/output/join/reduceJoin");//用程序的第三个参数作为输出 //解决输出目录提前存在,不能运行的问题,提前将目前删掉 //构建一个HDFS的文件系统 FileSystem hdfs = FileSystem.get(this.getConf()); //判断输出目录是否存在,如果存在就删除 if(hdfs.exists(outputPath)){ hdfs.delete(outputPath,true); } TextOutputFormat.setOutputPath(job,outputPath);//为当前Job设置输出的路径 //todo:3-提交运行Job return job.waitForCompletion(true) ? 0:-1; } /** * 程序的入口,调用run方法 * @param args */ public static void main(String[] args) throws Exception { //构建一个Configuration对象,用于管理这个程序所有配置,工作会定义很多自己的配置 Configuration conf = new Configuration(); //t通过Toolruner的run方法调用当前类的run方法 int status = ToolRunner.run(conf, new ReduceJoinMr(), args); //退出程序 System.exit(status); } /** * @ClassName MRMapper * @Description TODO 这是MapReduce模板的Map类 * 输入的KV类型:由inputformat决定,默认是TextInputFormat * 输出的KV类型:由map方法中谁作为key,谁作为Value决定 */ public static class MRMapper extends Mapper<LongWritable, Text, Text,Text> { Text outputKey= new Text(); Text outputValue = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //两个文件的每一行数据,就是Value // 要判断这条数据来自哪个文件 FileSplit fileSplit = (FileSplit) context.getInputSplit();//获取这条数据对应的分片 String name = fileSplit.getPath().getName();//获取这条数据对应的文件的名称 if("orders.txt".equals(name)){ //如果这是订单的数据:1001,20150710,p0001,2 String[] split1 = value.toString().split(","); //用商品id作为Key this.outputKey.set(split1[2]); //其他信息作为Value this.outputValue.set(split1[0]+"\t"+split1[1]+"\t"+split1[3]); //输出 context.write(this.outputKey,this.outputValue); }else{ //这是商品数据:p0001,直升机,1000,2000 String[] split2 = value.toString().split(","); //用商品id作为key this.outputKey.set(split2[0]); //用商品名称作为Value this.outputValue.set(split2[1]); //输出 context.write(this.outputKey,this.outputValue); } } } /** * @ClassName MRReducer * @Description TODO MapReduce模板的Reducer的类 * 输入的KV类型:由Map的输出决定,保持一致 * 输出的KV类型:由reduce方法中谁作为key,谁作为Value决定 */ public static class MRReducer extends Reducer<Text,Text,Text,Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //传进来的是每个商品id对应的订单信息和商品名称 StringBuilder stringBuilder = new StringBuilder(); for (Text value : values) { //将订单信息和商品的名称进行拼接 stringBuilder.append(value.toString()+"\t"); } //输出 context.write(key,new Text(stringBuilder.toString())); } } }
-
Reduce Join的特点
- 利用了Shuffle中全局分组实现两份数据的Join
- Join发生在Reduce端
- 必须经过Shuffle
- 应用:适合于大数据join大数据
-
-
小结
- 什么是Reduce Join,特点和应用场景是什么?
- 利用SHuffle中的分组,将Join字段作为K2,所有join字段相关的数据放在同一个迭代器中
- 特点:必须经过shuffle
- 应用:大数据join大数据
- 什么是Reduce Join,特点和应用场景是什么?
知识点05:MapReduce补充:Map Join
-
引入:Reduce Join必须经过Shuffle,有没有更好的方案?
-
目标:了解MapReduce中Map Join的实现
-
路径
- step1:Map Join的原理
- step2:Map Join的实现
- step3:Map Join的特点
-
实施
- Map Join的原理
-
在每一个MapTask 的内存中放入一份完整的商品表,前提:商品表的数据比较小
-
每个MapTask处理一部分订单表,直接在MapTask所在的内存中订单表的每一个小部分与商品表的完整的数据进行join
-
Map Join的实现
- Mapper类或者Reduce类总共有3个方法
- setup:初始化方法,在Mapper构建实例的时候会被调用一次
- map/reduce
- cleanUp:关闭方法,用于释放资源,最后释放之前会被调用一次
import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.HashMap; import java.util.Map; /** * @ClassName MRDriver * @Description TODO 这是MapReduce程序的Driver类的模板 * @Date 2020/5/30 10:34 * @Create By Frank */ public class MapJoinMr extends Configured implements Tool { /** * 用于将Job的代码封装 * @param args * @return * @throws Exception */ @Override public int run(String[] args) throws Exception { //todo:1-构建一个Job Job job = Job.getInstance(this.getConf(),"model");//构建Job对象,调用父类的getconf获取属性的配置 job.setJarByClass(MapJoinMr.class);//指定可以运行的类型 //todo:2-配置这个Job //input // job.setInputFormatClass(TextInputFormat.class);//设置输入的类的类型,默认就是TextInputFormat Path inputPath = new Path("datas/join/orders.txt");//将大的数据进行读取 //将订单作为输入 TextInputFormat.setInputPaths(job,inputPath);//为当前job设置输入的路径 //将商品表放入分布式缓存中 job.addCacheFile(new Path("datas/join/product.txt").toUri()); //map job.setMapperClass(MRMapper.class);//设置Mapper的类,需要调用对应的map方法 job.setMapOutputKeyClass(Text.class);//设置Mapper输出的key类型 job.setMapOutputValueClass(Text.class);//设置Mapper输出的value类型 //shuffle // job.setPartitionerClass(HashPartitioner.class);//自定义分区 // job.setGroupingComparatorClass(null);//自定义分组的方式 // job.setSortComparatorClass(null);//自定义排序的方式 //reduce // job.setReducerClass(MRReducer.class);//设置Reduce的类,需要调用对应的reduce方法 // job.setOutputKeyClass(NullWritable.class);//设置Reduce输出的Key类型 // job.setOutputValueClass(NullWritable.class);//设置Reduce输出的Value类型 job.setNumReduceTasks(0);//设置ReduceTask的个数,默认为1 //output:输出目录默认不能提前存在 // job.setOutputFormatClass(TextOutputFormat.class);//设置输出的类,默认我诶TextOutputFormat Path outputPath = new Path("datas/output/join/mapJoin");//用程序的第三个参数作为输出 //解决输出目录提前存在,不能运行的问题,提前将目前删掉 //构建一个HDFS的文件系统 FileSystem hdfs = FileSystem.get(this.getConf()); //判断输出目录是否存在,如果存在就删除 if(hdfs.exists(outputPath)){ hdfs.delete(outputPath,true); } TextOutputFormat.setOutputPath(job,outputPath);//为当前Job设置输出的路径 //todo:3-提交运行Job return job.waitForCompletion(true) ? 0:-1; } /** * 程序的入口,调用run方法 * @param args */ public static void main(String[] args) throws Exception { //构建一个Configuration对象,用于管理这个程序所有配置,工作会定义很多自己的配置 Configuration conf = new Configuration(); //t通过Toolruner的run方法调用当前类的run方法 int status = ToolRunner.run(conf, new MapJoinMr(), args); //退出程序 System.exit(status); } /** * @ClassName MRMapper * @Description TODO 这是MapReduce模板的Map类 * 输入的KV类型:由inputformat决定,默认是TextInputFormat * 输出的KV类型:由map方法中谁作为key,谁作为Value决定 */ public static class MRMapper extends Mapper<LongWritable, Text, Text,Text> { //获取分布式缓存中的数据,存入Map集合,Key是商品id,Value是商品名称 Map<String,String> map = new HashMap<String, String>(); /** * Map类中有三个方法 * setup:在调用map之前会调用一次,类似于初始化的方法 * map:实现Map处理逻辑的方法 * cleanup:Map结束以后会调用的方法,相当于close方法 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void setup(Context context) throws IOException, InterruptedException { //将缓存中的数据读取出来,封装到Map集合中 URI[] cacheFiles = context.getCacheFiles(); //打开这个缓存的文件 BufferedReader bufferedReader = new BufferedReader(new FileReader(cacheFiles[0].getPath())); //将每一行的内容,封装到Map集合中 String line = null; while(StringUtils.isNotBlank(line = bufferedReader.readLine())){ //分割将商品id和商品名称放入Map集合 String pid = line.split(",")[0]; String pname = line.split(",")[1]; map.put(pid,pname); } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //只有订单的数据,取出订单中的商品id String pid = value.toString().split(",")[2]; //在Map集合中找到这个商品id对应的商品名称 String name = map.get(pid); //输出订单信息和商品名称 context.write(value,new Text(name)); } } }
- Mapper类或者Reduce类总共有3个方法
-
Map Join的特点
- 思想:将小文件放入每个Task的内存中,每个Task用大文件的每个部分与小文件的整体做Join,实现Join的古城
- 特点:Map端实现Join,不需要经过shuffle或者reduce
- 应用:小数据join大数据 或者 小数据join小叔叔
-
小结
- 什么是Map Join,特点和应用场景是什么?
- 将小数据放入分布式缓存,每个Task从缓存中构建完整的小数据加载到Task内存中,完整的小数据与大数据的一个部分进行join
- 特点:Map端实现,不需要经过shuffle
- 应用:小join大,小join小
- 什么是Map Join,特点和应用场景是什么?
知识点06:YARN:功能与应用场景
- 引入:YARN是什么,什么场景下需要使用YARN?
- 目标:掌握YARN的功能与应用场景
-
路径
- step1:YARN的定义与本质
- step2:YARN的功能
- step3:YARN的应用场景
-
实施
-
YARN的定义与本质
- 定义:YARN是一个分布式的资源管理和任务调度的框架
- 本质:将多台的机器CPU和内存从逻辑上合并为一个整体,对外提供统一的计算服务
- 分布式计算 = 分布式程序 + 分布式运行环境
- 分布式程序:MapReduce
- 分布式运行环境:YARN
-
YARN的功能
- 功能:提供分布式运行资源环境
- 理解:资源容器,用于提供分布式资源的
-
YARN的应用场景
- 实现分布式计算,将分布式程序提交到分布式资源容器中运行
- MapReduce
- Tez
- Spark
- ……
-
YARN的定义与本质
-
小结
- YARN是什么,什么场景下需要使用YARN?
- 分布式资源管理和任务调度框架
- 本质:将多台机器的CPU和内存从逻辑上合并,构建成一个整体的资源服务框架
- 只要实现分布式计算的运行
- YARN是什么,什么场景下需要使用YARN?
知识点07:YARN:集群架构
-
引入:YARN如何实现分布式的资源管理的?
-
目标:掌握YARN的分布式集群架构
-
实施
-
架构
- 分布式主从架构
-
角色
- ResourceManager:主:管理节点
- 管理从节点的状态:心跳机制
- 管理资源管理和任务调度
- 资源管理:每个Task能用多少资源
- 任务调度:多个程序谁先运行谁后运行
- 接客
- NodeManager:从:计算节点
- 接受主节点的任务分配
- 使用自己所在节点的CPU和内存实现Task的运行
- ResourceManager:主:管理节点
-
-
小结
- YARN的架构是什么?
- 分布式主从架构
- ResourceManager
- 管理从节点
- 资源管理和任务调度
- 接客
- NodeManager
- 执行主节点的任务
- YARN的架构是什么?
知识点08:YARN:MR提交运行流程
-
引入:一个MapReduce程序提交给YARN,YARN是如何实现运行的?
-
目标:掌握MapReduce程序在YARN中的运行流程
-
实施
- step1:客户端提交运行一个MapReduce的程序给YARN,YARN会检查请求是否合法
- step2:如果合法,RM会随机选择一台NodeManager启动这个程序的管理者:AppMaster
- AppMaster每一个程序都有一个
- 负责这个程序的资源申请、Task的运行、Task的监控,运行的汇报
- step3:APPMaster会向ResourceManager申请指令和资源信息,获取如何运行这个程序的信息
- step4:ResourceManager根据Task的情况分配运行的节点以及资源的信息,以Container的形式返回
- Contiainer:信息包,记录每个Task运行的节点、处理的数据、运行的逻辑、资源
- 也是MapTask和ReduceTask的父进程
- Contiainer:信息包,记录每个Task运行的节点、处理的数据、运行的逻辑、资源
- step5:APPMaster将指令和container信息分发给对应的NodeManager,每个NodeManger根据指令和资源来启动对应的Task进程
- step6:MapTask运行结束后,会通知AppMaster,APPMaster会通知ReduceTask到MapTask中拉取数据
- step7:ReduceTask到每个MapTask中拉取数据,并执行处理,最后运行结束,将结果返回给APPMaster
- step8:最终结果返回给客户端
-
小结
- 记住大体的过程即可
知识点09:YARN:资源管理配置
-
目标:了解YARN中的资源管理配置
-
实施
- 基本资源属性
| 配置属性 | 值 | 含义 |
| ---------------------------------------- | ---- | --------------------------------------- |
| yarn.nodemanager.resource.memory-mb | 8192 | 每台NodeManger能够使用的最大物理内存数 |
| yarn.nodemanager.resource.cpu-vcores | 8 | 每台NodeManger能够使用的最大物理CPU核数 |
| yarn.scheduler.minimum-allocation-mb | 1024 | 每个Task容器最少申请的内存 |
| yarn.scheduler.maximum-allocation-mb | 8192 | 每个Task容器最多申请的内存 |
| yarn.scheduler.minimum-allocation-vcores | 1 | 每个Task容器最少申请的CPU核数 |
| yarn.scheduler.maximum-allocation-vcores | 8 | 每个Task容器最多申请的CPU核数 |
| yarn.nodemanager.vmem-pmem-ratio | 2.1 | 虚拟内存率【磁盘空间】 |
-
常见问题
- OOM:out of memory
- 物理内存溢出:增加物理内存配置
- 报错:physic memory 6GB of 5GB
- 虚拟内存溢出:增加虚拟内存倍数
- 报错:vistual memory 17 of 16.8
- 堆内存溢出:配置允许每个Task使用的内存不足
- 修改MapReduce配置
- 物理内存溢出:增加物理内存配置
- OOM:out of memory
-
小结
- 了解常见资源属性即可
知识点10:YARN:任务调度机制
-
引入:提交给YARN的程序,按照什么顺序运行,怎么分配资源?
-
目标:了解YARN中常见的任务的调度机制
-
路径
- step1:FIFO
- step2:Capacity
- step3:Fair
-
实施
-
FIFO:先进先出调度
-
应用:Hadoop1.x版本系列
-
原理
-
特点
- 单队列的,单个队列享有整个YARN的所有资源
- 队列内部是先进先出的,每个程序按照提交的顺序挨个执行
- 缺点:资源浪费,无法实现并发和并行
-
-
Capacity:容量调度
-
应用:Apache版本的Hadoop默认的调度器
-
原理
-
-
特点
- 多队列的,多个队列自定义分配所有资源
- 队列内部是先进先出的,每个程序按照提交的顺序挨个执行
- 优点:能实现多队列的并行执行、用户使用队列机制的隔离,互不干扰,允许资源动态抢占
- 缺点:队列内部只能运行一个程序,不能实现并发
-
Fair:公平调度
-
应用:CDH版本的Hadoop默认的调度器
-
原理
-
-
- 特点
- 多队列
- 队列内部公平的分配资源,允许指定权重和优先级来指定资源的优先分配
- 动态资源抢占、权重分配
- 配置
| 属性名称 | 值 | 含义 |
| :----------------------------------------------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- |
| yarn.resourcemanager.scheduler.class | org.apache.hadoop.<br/>yarn.server.resourcemanager<br/>.scheduler.fair.FairScheduler | 指定调度器类型 |
| yarn.scheduler.fair.allocation.file | etc/hadoop/fair-scheduler.xml | 指定公平调度的配置文件,配置文件中定义每个队列以及每个队列的资源 |
| yarn.scheduler.fair.user-as-default-queue | false | 没有指定队列名时,是否使用用户名作为队列名称 |
| yarn.scheduler.fair.preemption | false | 是否允许资源抢占 |
| yarn.scheduler.fair.preemption.cluster-utilization-threshold | 0.8 | 最大抢占比例 |
| yarn.scheduler.fair.sizebasedweight | true | 是否启用权重 |
| yarn.scheduler.fair.assignmultiple | true | 是否允许分配多个container |
| yarn.scheduler.fair.max.assign | 20 | 如果允许,最多分配多少个 |
| yarn.scheduler.fair.locality.threshold.node | -1 | 放弃等待优先本地计算的节点比例,-1表示都不放弃 |
| yarn.scheduler.fair.locality.threshold.rack | -1 | 放弃等待优先机架计算的节点比例,-1表示都不放弃 |
| yarn.scheduler.fair.allow-undeclared-pools | true | 是否允许创建未定义的队列 |
| yarn.scheduler.fair.update-interval-ms | 500 | 表示重新计算公平调度的间隔,单位毫秒 |
<?xml version="1.0"?>
<allocations>
<queue name="sample_queue">
<minResources>10000 mb,0vcores</minResources>
<maxResources>90000 mb,0vcores</maxResources>
<maxRunningApps>50</maxRunningApps>
<maxAMShare>0.1</maxAMShare>
<weight>2.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<queue name="sample_sub_queue">
<aclSubmitApps>charlie</aclSubmitApps>
<minResources>5000 mb,0vcores</minResources>
</queue>
</queue>
<queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>
<!-- Queue 'secondary_group_queue' is a parent queue and may have
user queues under it -->
<queue name="secondary_group_queue" type="parent">
<weight>3.0</weight>
</queue>
<user name="sample_user">
<maxRunningApps>30</maxRunningApps>
</user>
<userMaxAppsDefault>5</userMaxAppsDefault>
<queuePlacementPolicy>
<rule name="specified" />
<rule name="primaryGroup" create="false" />
<rule name="nestedUserQueue">
<rule name="secondaryGroupExistingQueue" create="false" />
</rule>
<rule name="default" queue="sample_queue"/>
</queuePlacementPolicy>
</allocations>
-
小结
- 常见的调度机制有哪些,各自的特点和区别是什么?
- FIFO:单队列,每个队列内部是FIFO,不能并发和并行
- Capacity:多队列,每个队列内部还是FIFO,能并行不能并发,资源动态抢占
- Fair:多队列,每个内部是共享资源,能并发也能并行,资源动态抢占,支持权重
- 常见的调度机制有哪些,各自的特点和区别是什么?
知识点11:数据仓库的介绍
-
引入:工作中处理的数据都放在哪里?
-
目标:了解数据仓库的基本功能和场景
- 什么是数据仓库?
-
路径
- step1:OLTP与OLAP
- step2:数据仓库的设计
- step3:数据仓库的流程
-
实施
-
OLTP与OLAP
-
OLTP:联机事务处理
-
应用:满足业务上的数据存储
-
例如
-
-
特点
- 数据量小
- 支持事务
- 效率高,延迟性比较低
-
模型:数据库
-
实现:数据库工具
- MySQL、Oracle
-
OLAP:联机分析处理
-
应用:专门用于解决数据处理的数据存储的平台
-
例如
-
-
特点
- 数据量大
- 不需要支持事务
- 时间的要求不高,离线的OLAP:T+1
-
存储:数据仓库模型
-
实现:数据仓库工具
- Hive、Greenplum
-
-
数据仓库的设计
- 数据仓库:更加规范化和统一化的数据管理的设计模型,用于实现数据加工和处理的平台,实现数据的应用
- 举个例子:做一道红烧肉
- 数据库:为了数据存储而设计,不同的数据存储在不同的数据库中
- step1:买肉
- step2:买配料
- step3:切、腌制、焯水
- step4:炖
- step5:吃
- 数据仓库
- step1:去超市买现成的
- step2:加热以下吃
- 数据库:为了数据存储而设计,不同的数据存储在不同的数据库中
- 数据仓库是一个数据的超市:所有数据按照分类进行存放,在数据仓库中对数据进行加工
- 数据库
- 人事管理系统:MySQL
- 财务管理系统:Oracle
- |
- 公司的年度报表:人事和财务数据
- 数据仓库
- 所有数据进行集中存储
- 数据库
-
数据仓库的流程
- ETL:数据清洗,将各种各样的数据按照规则进行过滤,补全、转换
- 分层:数据从进入数据仓库到最后被应用,数据经过处理的步骤
- 建模:表应该怎么建,表的设计
-
-
小结
- 什么是数据仓库?
- 定义:数据仓库是一种规范化和统一化的数据存储的设计模型,用于解决数据处理应用的问题
- 本质:数据仓库本质就是一个数据超市,数据在数据仓库中分类存放并且可以对数据进行加工,直接使用加工后的数据
- 什么是数据仓库?
知识点12:Hive的功能及应用场景
-
引入:数据仓库中的分布式计算如果用MR来实现,业务分析人员不会写Java怎么办?
-
目标:掌握Hive的功能与应用场景
- 什么是Hive,有什么功能?
- Hive在工作中的应用场景是什么?
-
路径
- step1:Hive的诞生
- step2:Hive的功能
- step3:Hive的应用场景
-
实施
-
Hive的诞生
- 缘起FackBook,用Hadoop来实现大量数据的分析处理时候遇到了问题
- 分析的需求只有业务人员才知道怎么分析:不会写Java,会SQL
- 分析的程序只有开发人员才知道怎么开发:不会写分析
- 解决:让开发人员写一个工具,将SQL语句转换为MapReduce程序
- 缘起FackBook,用Hadoop来实现大量数据的分析处理时候遇到了问题
-
Hive的功能
-
MapReduce处理的数据来自于HDFS文件
-
SQL处理的来自于表
-
功能一:Hive实现了将HDFS文件映射成表的数据
- 使用功能一来构建数据仓库,实现分布式的数据存储
-
功能二:Hive实现了将SQL转换为MapReduce/Tez/Spark的程序,提交给YARN执行
- 功能二一般不使用Hive来做,Hive默认转换为MapReduce,性能很差
- 工作中使用其他的一些分布式计算的SQL引擎来代替Hive
- Presto
- Impala
- SparkSQL
-
功能:实现分布式存储【HDFS】和分布式计算【MapReduce/Tez/Spark】
-
本质:Hive是一种支持SQL的特殊的Hadoop的客户端
-
不使用Hive:用户必须会Java或者Python,才能操作Hadoop
-
-
有了Hive:用户可以通过SQL直接操作Hadoop
-
Hive的应用场景
- 通过功能一构建数据仓库
-
Hive官方:hive.apache.org
-
-
小结
- 什么是Hive,有什么功能?
- 本质:一种支持SQL的Hadoop的客户端
- 功能
- 功能一:将HDFS文件映射成Hive中表【离线数据仓库】
- 功能二:将SQL转换为MR程序,提交给YARN执行
- Hive在工作中的应用场景是什么?
- 用Hive作为数据仓库工具,实现数据仓库的构建
- 分布式SQL计算查询:一般不用Hive自带的计算来实现
- Impala
- SparkSQL
- 什么是Hive,有什么功能?
知识点13:Hive的架构
-
目标:掌握Hive的架构组成
- Hive由哪些部分组成?
-
实施
-
Hive客户端
- 连接用户与服务端
- 提供SQL开发界面,允许用户开发SQL,将用户开发的SQL提交给Hive的服务端
- 将服务端执行的结果返回给用户
-
Hive服务端
- 接受客户端的请求
- 解析用户提交的SQL语句
- 将SQL转换为分布式计算或者分布式存储的程序提交给Hadoop运行
- SQL解析器:解析SQL语句的
- 编译器:将SQL生成执行计划
- 优化器:选择最优的方案来实现
- 执行器:执行对应的处理操作
-
元数据
-
Hive写一条SQL语句
select count(*) from table;
-
问题:SQL语句会变成MapReduce,读的文件是谁呢?
-
元数据:存储整个Hive中所有核心的数据
- Hive中所有数据库、表的信息
- 记录了HDFS文件与Hive表的映射关系
-
-
小结
- Hive由哪些部分组成?
- 客户端:提交SQL
- 服务端:处理SQL
- 元数据:记录HDFS文件与Hive表的映射关系
- Hive由哪些部分组成?
知识点14:Hive的部署
-
目标:实现Hive的安装部署
-
路径
- step1:版本下载
- step2:解压安装
- step3:修改配置
- step4:启动
-
实施
-
版本下载
- 目前在工作中:Hive1.x和Hive2.x
-
区别
- 越来越像MySQL:语法支持的越全面,功能特点都支持的越来越多
- 底层计算引擎的优化越来越好,性能越来越高【不用MapReduce】
-
-
解压安装
-
以第三台机器为例
-
将hive安装包和MySQL连接驱动包上传到/export/software目录下
-
上传
-
-
cd /export/software/
rz
-
解压安装Hive
tar -zxvf apache-hive-2.1.0-bin.tar.gz -C /export/server/ cd /export/server/ mv apache-hive-2.1.0-bin hive-2.1.0-bin
-
bin:客户端和服务端的管理命令存储目录
- conf:配置文件目录
-
lib:依赖包
-
- 安装MySQL
- 为什么要用到MySQL?
- 将Hive的元数据存储在MySQL中
- inux阶段已安装过,不用安装,检查配置即可
- 检查MySQL登录与授权是否正常
- 检查登录
```shell
mysql -uroot -p
```
- 检查授权
```shell
select user,host from mysql.user;
```
- 如果不一样,没有授权,请执行授权语句
```sql
grant all privileges on *.* to 'root'@'%' identified by '123456';
delete from mysql.user where host != '%';
flush privileges;
```
-
修改配置
-
切换到配置文件目录
cd /export/server/hive-2.1.0-bin/conf/
-
修改hive-env.sh
mv hive-env.sh.template hive-env.sh
#修改48行 HADOOP_HOME=/export/server/hadoop-2.7.5 #修改51行 export HIVE_CONF_DIR=/export/server/hive-2.1.0-bin/conf
-
- 将提供的hive-site.xml放入conf目录中
- 修改hive-site.xml
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://node3:3306/hivemetadata?createDatabaseIfNotExist=true&useSSL=false</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>datanucleus.schema.autoCreateAll</name>
<value>true</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>node3</value>
</property>
-
将MySQL连接驱动放入Hive的lib目录中
cp /export/software/mysql-connector-java-5.1.38.jar /export/server/hive-2.1.0-bin/lib/ cd /export/server/hive-2.1.0-bin/ ll lib/
-
配置Hive的环境变量
vim /etc/profile #HIVE_HOME export HIVE_HOME=/export/server/hive-2.1.0-bin export PATH=:$PATH:$HIVE_HOME/bin source /etc/profile
-
启动
-
启动HDFS和YARN
start-dfs.sh
start-yarn.sh
-
启动Hive
-
第一次启动Hive之前需要初始化元数据【只有第一次启动需要,以后每次启动都不需要了,类似于格式化】
-
创建HDFS相关目录
hdfs dfs -mkdir -p /user/hive/warehouse hdfs dfs -chmod g+w /tmp hdfs dfs -chmod g+w /user/hive/warehouse
- /user/hive/warehouse:Hive中所有表的数据在HDFS中的存储目录
-
初始化Hive元数据
cd /export/server/hive-2.1.0-bin/ bin/schematool -dbType mysql -initSchema
-
-
- 启动Hive
hive
-
小结
- 参考笔记一步步实现即可
知识点15:Hive实现WordCount
-
引入:Hive如何实现WordCount的问题?
-
目标:实现HiveSQL开发WordCount程序
-
路径
- step1:创建表
- step2:加载HDFS数据
- step3:SQL分析处理
-
实施
-
创建表
create table tb_word( words string );
-
加载HDFS数据
load data inpath '/wordcount.txt' into table tb_word;
-
-
SQL分析处理
create table tb_word2 as select explode(split(words," ")) as word from tb_word; select word,count(*) as numb from tb_word2 group by word order by numb desc;
-
小结
- 如何使用HiveSQL实现WordCount开发?
- 创建表,实现HDFS文件与表的映射
- SQL + 函数实现计算处理需求
- 如何使用HiveSQL实现WordCount开发?
知识点16:Hive实现二手房统计分析
-
引入:使用MR实现二手房分析时,需要自定义数据类型解决,使用HiveSQL如何解决?
-
目标:实现HiveSQL开发二手房统计分析程序
-
路径
- step1:创建表
- step2:加载HDFS数据
- step3:SQL分析处理
-
实施
-
数据
梅园六街坊,2室0厅,47.72,浦东,低区/6层,朝南,500,104777,1992年建
-
创建表
create table tb_house( xiaoqu string, huxing string, area double, region string, floor string, fangxiang string, t_price int, s_price int, buildinfo string ) row format delimited fields terminated by ',';
-
加载本地文件数据
load data local inpath '/export/data/secondhouse.csv' into table tb_house;
-
-
SQL分析处理
select region, count(*) as numb, round(avg(s_price),2) as avgprice, max(s_price) as maxprice, min(s_price) as minprice from tb_house group by region;
-
小结
- 如何实现HiveSQL开发二手房统计分析程序?
- step1:创建表:根据数据的格式
- step2:关联数据文件
- step3:SQL + 函数
- 如何实现HiveSQL开发二手房统计分析程序?