Hadoop
一、什么是hadoop
1.什么是hadoop
- hadoop是apache基金会下的一套开源软件 hadoop提供利用服务器集群,根据用户自定义逻辑去对海量的数据进行分布式处理。
核心组件:HDFS,Mapreduce,Yarn
广义上来说,HADOOP通常是指一个更广泛的概念——HADOOP生态圈
2.hadoop 产生背景
产生背景:2003年和2004年谷歌两篇论文:GFS\MAPREDUCE
3.生态圈
HDFS:分布式文件系统
MAPREDUCE:分布式运算程序开发框架
HIVE:基于大数据技术(文件系统+运算框架)的SQL数据仓库工具
HBASE:基于HADOOP的分布式海量数据库
ZOOKEEPER:分布式协调服务基础组件
Mahout:基于mapreduce/spark/flink等分布式运算框架的机器学习算法库
Oozie:工作流调度框架
Sqoop:数据导入导出工具
Flume:日志数据采集框架
4.集群搭建:
1.登录root用户
2.修改ip
3.修改主机名
4.配置ssh免密登录
5.关闭防火墙
6.按照jdk
7.解压hadoop安装包
8.配置核心文件: hadoop-env.sh,core-site.xml , mapred-site.xml , hdfs-site.xml
9.配置hadoop环境变量
10.启动节点start-all.sh
二、常见命令
hdfs dfsadmin -report (查看集群的健康状态,等同于浏览器上50070端口。可以查看是否安全模式)
hdfs dfs -help rm ------------------------输出某个命令参数手册
hdfs dfs -ls ------------------------显示目录信息
hdfs dfs -mkdir -p /test/test1/test2 ----创建目录
hdfs dfs -moveFromLocal /local/test.txt /testhdfs/test ----从本地剪切粘贴到hdfs
hdfs dfs -moveToLocal /testhdfs/test/test.txt /local/ ----从hdfs剪切粘贴到local
hdfs dfs -appendToFile /hello.text /nihao.text ----追加数据到已经存在的一个文件末尾
hdfs dfs -cat /hello.txt ----查看文件全部内容
hdfs dfs -tail /hello.txt ----查看文件末尾
hdfs dfs -text /test/log.log ----以字符串形式打印文件内容
hdfs dfs -chmod 666 /hello.txt --修改权限
hdfs dfs -chow hadoop:hadoop001 /hello.txt --修改权限组
hdfs dfs -copyFromLocal ==hdfs dfs -put --上传文件
hdfs dfs -copyToLocal ==hdfs dfs -get --下载文件
hdfs dfs -cp /hello.txt /testcp/hello2.txt -------hdfs之间复制文件
hdfs dfs -mv /hello.txt /testcp/ ---移动文件或者修改文件名
hdfs dfs -getmerge /testhdfs/* /local/sum.txt 合并下载到本地的一个文件
hdfs dfs -rm -r /test 删除文件活文件夹
hdfs dfs -rmdir /testhdfs/test 删除空目录
hdfs dfs -df -h / 统计当前目录可用空间信息
hdfs dfs -du -h /test/ 统计文件夹的大小信息
hdfs dfs -count /testhdfs/ 统计目录下的所有文件节点数量
hdfs dfs -setrep 3 /testhdfs/test/txt 设置hdfs中文件的副本数量
三、HDFS
3.1HDFS构成
NameNode、DataNode、Secondary NameNode
3.2概述
- NameNode:负责管理整个文件系统的元数据(稍后详细)
- DataNode:负责存储用户的文件数据块:文件会按固定的大小(blocksize,hadoop2.x上为128M,hadoop1.x为64M)被切分为若干块存储在若干台datanode上。
每一个文件数据块可以拥有多个副本,并存放于不同的datanode上
datanode会定期向NameNode汇报自身保存的文件Block信息,而NameNode则负责保持控制文件的副本数。
HDFS的内部工作机制对客户端保持透明,客户端访问hdfs都是通过向NameNode来申请进行的。 - Secondary NameNode:负责定期checkpoint,下载NameNode上的最新fsimage和所有的edits日志到Secondary NameNode目录并做一个合并merge,供给下一次NameNode启动加载使用,同时做一个自带的备份作用
3.3 NameNode工作机制
3.3.1 职责:
- 1.负责响应客户端的需求
- 2.负责元数据的管理(查询和修改)(写文件和查文件)
3.3.2 NameNode中元数据的存储形式:
- 1.NameNode内存有一份完整的元数据(meta data)
- 2.NameNode工作目录中中有一份准完整的元数据镜像(fsimage)存储在磁盘中
- 3.用于衔接内存中metadata和持久化元数据镜像fsimage之间的操作日志edits。
ps:当客户端对hdfs的文件进行修改或者新增时候,操作记录先被记载在edits日志文件中,当客户端操作成功后,响应的元数据才会更新到meta data中。
3.3.4 NameNode写数据
- 1.client跟namenode通信请求上传文件,namenode确认目标文件是否已经存在,父目录是否存在。
- 2.namenode返回信息允许上传
- 3.client通信请求第一个block应该传输到那个datanode服务器
- 4.namenode返回三个datanode服务器abc地址给client
- 5.client请求三台服务器中的a上传数据,进行RPC调用,建立pipeline,a收到请求后调用b,b调用c。pipeline建立完成,又c逐级返回信息给client。
- 6.client开始往a上传第一个block(先从磁盘中读取数据放到本地内存缓存),然后以packet为单位上传,a收到第一个packet后就上传给b,b上传给c。(a每传一个packet就会将每个packet放进一个应答队列等待应答)
- 7.当一个block传输完成后,client再次请namenode上传第二个block的服务器地址。
3.3.5 NameNode读数据
- 1.client 跟namenode通信查询文件,namenode确认该目标文件是否存在,返回datanode地址
- 2.client根据就近原则和随机原则挑选服务器,请求建立scoket流
- 3.datanode从磁盘读取数据放入scoket流中,以packet为单位做校验,发送数据
- 4.client以packet位单位接收数据,然后本地缓存,然后写入目标文件
3.4 Secondary NameNode 工作机制
3.4.1 Secondary NameNode
工作职责:定期合并 NameNode 上的fsimage元数据镜像和所有操作日志edits。
每隔一段时间,会由 Secondary NameNode 去复制 NameNode 上最新的一个fsimage文件和累积所有的edits文件下载到本地,然后加载到内存中merge合并。(这个过程交checkpoint)
3.4.2 Secondary NameNode
- 附带作用: Secondary NameNode 和 NameNode 工作目录存储结构是相同的,当 NameNode
单点故障的时候退出需要恢复数据时,可以从 Secondary NameNode 拷贝fsimage 到 NameNode 工作目录,以恢复
NameNode 的元数据。(只能恢复大部分,不能是全部,因为部分数据可能还没来得及做checkpoint)
ps:
checkpoint 的配置参数:
dfs.namenode.checkpoint.check.period=60 #检查是否满足条件触发checkpoint 为60秒
dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary #checkpoint目录 Secondary NameNode
dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir} #edits 目录
dfs.namenode.checkpoint.max-retries=3 #最大重试次数
dfs.namenode.checkpoint.period=3600 #两次checkpoint 间隔
dfs.namenode.checkpoint.txns=1000000 #两次checkpoint之间的操作记录最大限制
3.4.3 checkpoint流程
- 1.Secondary NameNode请求是否需要checkpoint
- 2.得到NameNode响应后,Secondary NameNode请求checkpoint
- 3.NameNode滚动(edits有一个seen_txid滚动序号,每次重启namenode,namenode就知道将那些edits加载到edits中)当前正在写的edits文件,该文件为待合并状态,并生成新的edits.inprogress文件,后续的hdfs修改操作将写入该日志中。
- 4.Secondary NameNode将待合并状态的edits文件和最新的fsimage文件下载到secondary namenode本地。
- 5.Secondary NameNode将fsimage文件和edits文件加载到内存中进行merge合并,dump转存储成新的的fsimage文件fismage.checkpoint.
- 6.Secondary NameNode 将fsimage 上传到NameNode并重命名为fsimage。
3.5 DataNode工作机制
3.5.1 工作职责:
- 1.存储用户的文件块数据
- 2.定期向namenode汇报自身持有的block信息(当集群中某些block副本失效的时候,集群可namenode可以根据判断是否已经失效下命令重新复制副本)
3.5.3 DataNode掉线判断时间设置
- datanode进程死亡或者网络故障造成无法与namenode通信的时候,namenode不会立即将该节点判断为死亡,
要经过一段时间后才会判断死亡,这段时间称为超时时长。 - HDFS的超时时长默认是10分组+30秒,计算公式如下 timeout = 2*
heartbeat.recheck.interval(心跳检查时间) +10*dfs.heartbeat.interval(心跳时间)
一般心跳检查时间为5分钟,心跳时间为3秒钟
3.6 Hadoop HA 高可用模式
Hight Availability 高可用主要用于解决NameNode单点故障问他,以热备方式为NameNode提供一个备用Namenode。一旦active的主namenode出故障了,standby状态的Namenode马上切换为active接替工作。
HA机制:
HA主要是为了解决单点故障问题,通过jounalnode共享集群状态,通过zkfc选举active的namenode,时刻监控着状态,自动备援。
jounalnode存放管理edits文件,active namenode会将修改或者增加数据的操作日志edits写入共享文件管理系统jounalnode,standby namenode监控这这个系统,一旦发现有新的数据写入,则读取这些数据,加载入自己的内存中。
以保证stadnby namenode 跟active namenode的内存状态是一致的。在紧致状态下standby namenode便可快速转换为active namenode。
转换步骤如下:
- 1.active namenode 遭遇故障宕机,进入假死状态。
- 2.active namenode节点的zkfc 检测到假死,
- 3.active namenode 上的zkfc就会通知standby namenode下的zkfc。
- 4.standby namenode上的zkfc收到通知后就会强心接触active上的锁,强心杀死active namenode,防止脑裂
- 5.standby namenode 上的zkfc激活本节点的namenode进入到active 状态。
四、MapReduce
4.1 什么是MapReduce
mapreduce是一个分布式运算程序框架
mapreduce核心功能是将用户编写的业务逻辑代码和自带的默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。
4.2为什么要mapreduce
- 1.海量的数据在单机上无法被处理
- 2.一旦将单机版本程序扩展到集群上分布式运行,将极大增加程序复杂度和开发难度。
- 3,引入mapreduce框架后,开发人员可以将绝大部份工作放在业务逻辑上的开发,而将分布式计算上的复杂性交由框架处理。
4.3 mapreduce 运行框架
一个完整的mapreduce程序在分布式运行上有三个实例进程和1个重要机制
- MRappMaster 负责整个程序的过程调度和状态协调
- mapTask 负责map阶段的整个数据处理流程
- ReduceTask 负责整个reduce阶段的整个数据处理流程
4.shuffle机制,负责将maptask阶段的处理结果数据,分发给reducetask,并在分发中对数据按key进行了分区和排序。shuffle机制分别在- 1.在collect阶段
- 2.spill阶段
- 3.Combiner
- 4.copy阶段
- 5.Merge阶段
- 6.sort阶段
4.4 MR过程分析
- 1.一个MR程序启动时,最先启动MRappMaster ,mrappmaster根据本次job的描述信息计算出需要的maptask实例数量,然后跟集群申请相对于的maptask进程。
(问题1:读取数据的时候产生多少个mapper好? mapper数量太多会产生大量的小文件,过多的mapper创建和初始化及关闭虚拟机会产生大量的硬件资源。mapper太少则并发度小,job执行时间长,无法充分利用分布式硬件资源)
(问题2:mapper由什么决定? 1.输入文件的数量 2,输入文件大小 3.配置参数,可以设置minsize,maxsize,block.size,这三个配置影响了splitSize切分文件size的大小,SplitSize=Math.max(minsize,Math.min(MaxSize,blockSize)),一般情况下切分文件是按blocksize切分,hadoop2.x是128M,1.x是60M)
(问题3:解决小文件过多 减少mapper数增大合并值 ?例如5M,10M的数据很快处理完了,但是128M的处理很长,需要控制参数来调整mapper,这里需要减少mapper,需要合并小的文件,可以设置参数合并器的大小))
- 2.maptask split切分input进来的数据,收集kv对,然后进入分区方法partition,进行标记分区,这里的partition值是通过计算key的hash值后对reduce
task的数量取模的(后面reduce取mapper结果集是按照分区标记去下载)。
(问题:自定义partition。 每一个reducer 的输出都是有序的,但是合并所有的reducer不是全局有序的,所以为了全局有序,可以自定义partition,用输入的数值除以reducer的数量的商为值分割界限,即边界为商的一倍、二倍…numpartition-1倍。这样就能保持全局有序。)
(问题:自定义partition ,重写partition 返回值, return key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;)
(问题:自定义partition解决数据倾斜 :对于某些数据值,由于不同的key的hash值是一样的,导致大部分键值对分配给了同一个reducer处理,其他reducer处理很少,从而拖延了job运行,这时候可以自定义相同partition保证相同key值到同一个reducer处理)
-
3.标记好分区后,maptask会不断将键值对输入到内存的一个环形结构中,这个环形结构是一个字节数组,叫KVbuffer,也叫环形缓冲区。环形缓冲区默认大小是100M可以通过调整mapreduce.task.io.sort.mb参数调整大小。
-
4.当环形缓冲区达到了80%后,会进行溢写数据,将数据从内存中溢写spill到磁盘中去。但是在溢写之前会进行一个sort排序,按照key索引进行字典排序。
(问题:通过调整参数增大spill内存大小和增大spill阈值去尽量减少溢写的次数去优化提高性能。)
-
5.溢出的数据文件会被用归并算法合并成更大的溢出文件,如果设定了combiner的话也可以对溢写的文件进行combiner操作,根据自定义的combiner函数对mapper的结果进行合并(默认一般是spill出的文件每三个就合并一个)
-
6.在spill溢写和merge合并的过程中,都要调用partition进行分组和针对key进行排序。
(问题:通过压缩减少磁盘IO和网络IO。如果中间文件非常大,可以通过设置mapreduce.map.output.compres为true进行压缩,reducer读数据后也需要解压文件。压缩一般能10倍的减少IO操作,但是这个过程会消耗CPU)
- 7.reductask根据自己的partition分区号去下载copymaptask的结果集。因为maptask的结果集有可能包含着每一个reducetask需要的数据,所以等第一个maptask结束后,所有的reducetask都会进行尝试去下载,所以map和reduce是交叉的
(数据被下载后,map不会马上删除,这是为了防止reduce错误。)
(reducetask下载数据的时候,有可能会因为map机器发生错误或者中间文件丢失或者网络瞬断,导致下载失败,当一定时间后下载还是失败,reducetask就会放弃此次下载,到另外的地方下载(这段时间可能map会重跑),可以设定增加mapreduce.reduce.shuffle.read.timeout来优化)
- 8.下载的数据也是先放入内存缓冲区,当内存缓冲区到了一定量后才会spill溢写数据,大概是reducetask的max heapsize的70%。再溢写过程中,会有一个后台线程将溢写的文件合并merge成一个大文件,并做一个全局的排序。
(reducer溢写后也可以设定combiner,减少写入磁盘的数据量)
- 9.当reducetask全部下载号数据后,就会进入到真正的reduce计算阶段,调用用户自定义的reduce方法。
4.5 MapReduce 的序列化
因为java中的序列化会附带很多额外的信息,像校验信息、hearder和继承体系,不便再网络中传输,所以hadoop有一套自己的序列化机制。
ByteWritable、ShortWritable、IntWritable、LongWritable、FloatWritable、DoubleWritable、Text、NullWritable
五.yarn调度过程
- 1.用户提交程序,包括applicationMaster程序,applicationmaster启动命令,用户程序等。
- 2.resourcemanager为该程序分配第一个container,并与相对应的nodemanager通信,要它在这个container里面启动应用程序的applicationmaster
- 3.applicationmaster 向resourcemanager注册,这样用户可以通过resourcemanager监控程序的运行状态,然后为它申请资源并监控允许状态。
- 4.applicationmaster采用轮询的方式通过rpc协议向resourcemanager申请和领取资源
- 5.applicationmaster领取资源后便与相对应的nodemanager通信,请求启动任务。
- 6.nodemanager为任务设置好运行环境后,将任务启动命令写在一个脚本中,通过运行该脚本启动任务。
- 7.各个任务通过rpc协议向applicationmaster汇报自己的状态和进度,方便application 掌握各个任务的运行状态,方便失败时重启任务。
- 8.应用程序完成后,applicationmaster向resourcemanager注销并关闭自己。