** Hadoop 框架基础(四)
上一节虽然大概了解了一下 mapreduce,徒手抓了海胆,不对,徒手写了 mapreduce 代码,也运行了出来。但是没有做更深入的理解和探讨。
那么……
本节目标:
* 深入了解 mapreduce 过程
* 成功部署 Hadoop 集群
** mapreduce 原理
想要了解 mapreduce 原理,我们必须搞清楚处理数据时的每一个重要阶段,首先,贴上一张官方的图:
我们依次讨论每一个过程以及该过程对应的作用:
我先在这里假设一个情景,我现在有一个 10G 大小的 words.txt,里面存放的是 N 多个英文单词。
这 10G 大小的文件分为若干个 128M 的文件块 block 分布存储于若干个服务器。
好,现在我要统计这 10G 文件中的单词出现频率。
** input split
一个 split 会对应一个 map 任务。
一般来讲,split 的个数和 block 的个数相同,当然也可以多个 block 属于同一个 split,但是后者会产生大量的网络和磁盘 IO,原因在于一个 split 对应一个 map 任务,一个 map 任务肯定跑在某一台机器上,如果某个 split 所包含的多个 block 分布于不同的机器,首先需要做的操作就是把其他机器的 block 拷贝到运行 map 任务的机器上,这会耗费一定时间,所以,默认情况下,一个 block 对应一个 split,源码中设定如下:
mapreduce.input.fileinputformat.split.minsize == 0
mapreduce.input.fileinputformat.split.maxsize == 10000
splitSize=max(minSize,min(maxSize, blockSize)),此为默认 split 大小
如果要修改,则如下方式:
recordSize 表示一个记录的大小,分块要保证数据的完整性,所以:
int blockSize = Integer.parseInt(x); //x 表示你希望的 split 大小
int splitSize = blockSize / recordSize * recordSize;
conf.setLong("mapred.max.split.size",splitSize);
conf.setLong("mapred.min.split.size",splitSize);
** map
此时输入的到 map 中的数据形式大致为:
<0, cat one hadoop element...> ---> 调用一次 map
<30, dog two one hadoop....> ---> 调用一次 map
……
省略号表示后边还有,其中 0,30 表示的是偏移量,每次从当前 split 中读取 1 行数据,比如第一次读取第一行,偏移量为 0~29,第二次是第二行数据,偏移量是 30~?,以此类推。每次读取都执行一次 map 任务,并调用一次 map 方法。map 阶段结束,输出结果形式大致为:
<cat , 1> <one, 1> <hadoop, 1> <element, 1> …… 等等
从此进入 shuffle 阶段
** buffer in memory
这是一个状态描述,表明此刻在内存中开始操作,buffer 在这里是内存中的一个环形数组。
之所以用环形数组来存放数据,是因为这样可以最大化的利用存储空间。
这个环形数组中存放数据分为两个类别:
1、元数据区(Kvmeta):
里面存放的每组数据都包含:
** value 的起始位置
** key 的起始位置
** partition 值
** value 的长度
2、数据区(Kvbuffer):
里面存放的每组数据都包含:
** key 值,例如 <cat ,1> 中的 cat
** value 值,例如 <cat, 1> 中的 1
注意:
* 以上两个区域的分界点为 0,即 0 以上存储数据区内容,0 以下存储元数据区的内容。
* 这个环形 buffer 虽然实际为一个字节数组,但抽象为一个 IntBuffer,无论哪个区域中的数据,每组数据中的每个元素都占用 4 个字节,也就是每组中的每个元素的存储,数组下标都将移动 4 位 (因为一个 int 为 4 个字节)。
* partition
分区的意义在于把一系列相似的单词分为同一个区。即单词归类处理,这样不同机器上的不同 map 任务输出的单词可以依据分区递交给相同的 reduce 做处理。
注意:
* 相关类: HashPartitioner
* 这里的 “相似”,指的是:键(此例中为单词)的 hash 值在某一个范围内
* sort
map 排序阶段,在 buffer 中把数据按照 partion 和 key 两个关键字做升序排序,这个排序只需要移动 “元数据区” 中的每组数据顺序即可。排序结果是 “元数据区” 中的每组数据按照 partition 分区聚集在一起,同一个 partition 分区内的 key 按照字典顺序排序。
* combine(可选)
结合阶段,可以在 map 阶段简化数据输出,减少后边 spill 溢写过程中,spill 溢写文件的大小,例如:可以将 <cat, 1> <cat, 1 > 这样的数据在 map 阶段合并为 < cat, 2 > 这样的数据作为 map 输出,默认没有开启。
* spill
溢写阶段,当内存中的环形存储结构占用率达到一定程度(默认占用 80% 时,则开始溢写),则将环形数据区中的所有内容,刷入到当前本地硬盘能够存的下这些数据的目录中,以使内容腾出空间供后边继续使用。
相同的 partition 分区的数据写入到同一个文件中,类似:“spill10.out”,“spill11.out”这样的文件,每一个 partition 分区所产生的文件的存放位置和一些相关信息,存放在另一个 “元数据” 文件中,类似“spill10.out.index”,“spill11.out.index”(注意,这个元数据文件和刚才说的元数据区不是一码事)。
这个元数据文件包含:
** 起始位置
** 原始数据长度
** 压缩之后的数据长度
** crc32 的校验数据
该文件的作用是:标记某个 partition 对应的文件在哪个目录,哪个索引中存放。
注意:
* spill10.out.index 这样的文件不一定会产生,如果内存中放得下(针对这个文件数据的存放,内存只提供 1M 空间可用),就放在内存中。
* 内存占用达到 80%,开始溢写,那么此时 map 任务还在进行,还在往内存里添加数据,新的数据的起始点(0 点)为剩余空间的中间部分,然后数据区和元数据区分别往两边递增即可,溢写后释放内存后也不必改变什么,继续写入即可。
** map merge
map 融合阶段,将溢写阶段产生的多个文件,根据所属分区,把具有相同 partition 分区的 “元数据(从 spill10.out.index 这样的文件中读取的)” 放置于同一个 segment 列表中,最后根据 segment 列表,把数据从 spill 溢写出来的文件一个一个中读取出来,写入到 file.out 文件中,同时将这一批段的数据索引(元数据分区等)写入到 file.out.index 文件中,最终生成两个文件,file.out 和 file.out.index,其中包含了多段数据,每段数据对应一个分区。
** compress (可选)
map 压缩阶段,对 map merge 阶段产生的文件进行压缩处理,以便于在后边的网络传输过程中减少网络 IO 压力,提升效率。
至此,map 端的 shuffle 过程结束。
** sort merge
reduce 任务会根据分区数据段拉取每个 map 任务产生的数据,拉取后,因为可能涉及到多个 map 产生的数据,所以要进行排序,一边 copy 一边排序,最后把多个 map 产生的具有相同分区的数据合并为一个分区数据段,这个 merge 过程和 map 的 merge 算法过程一样。
在此完成 shuffle 阶段
** reduce
对于本例而言,此时产生的某个分区中的某个单词形式大概如下:
<cat, [1, 1, 1, 1, 1, 1]>,在调用 reduce 方法时,进行 values 各个元素的叠加操作即可。
** output
统计完成后,输出数据到文件目录,文件格式为 part-r-00000 这样形式的文件,存放于 HDFS 中。文件中 key 和 value 默认的分隔符为:\t
** Hadoop 集群部署
之前我们在 yarn 框架中运行 mapreduce 任务,或者操作 hdfs,其中的各种节点都是运行在一台虚拟机上的,现在我们要将 hadoop 部署在一个多台虚拟机构成的完全分布式集群中(全部都在一个机器节点上的叫做伪分布式,比如之前的方式)。部署前,我们先勾画一下各个节点的部署结构,如下图所示:
描述:
3 台机器共有进程:HDFS 的 datanode,yarn 的 nodemanager
其中,HDFS 的 namenode 开在 z01 这台机器上,secondarynamenode 开在 z03 这台机器上
YARN 的 resourcemanager 开在 z02 这台机器上。
注:SecondaryNameNode 是用来协助 NameNode 整合 fsimage 和 edits 的。
一、准备系统环境
1、修改主机名
# vi /etc/hostname
2、主机名和 ip 地址的映射
# vi /etc/hosts,我的机器修改如图,注意,三台机器都要这么设置:
3、关闭防火墙和 selinux
请跳转至 Linux 基础 04 查看相关方法。
4、创建普通用户
# useradd 用户名,如果已经有普通用户,则无需再次创建
# echo 666666 | passwd --stdin 用户名
5、配置静态 IP 和 DNS
请参看 Linux 基础 01 内容
6、把后面两个虚拟机的系统启动级别改成 “字符模式”(就是没有桌面,这样可以减少虚拟机负担,加速系统启动和运行)
# cat /etc/inittab,内容如图所示:
根据文件中的提示,可以使用命令:
systemctl set-default multi-user.target,来设置*面启动 linux
systemctl set-default graphical.target,来设置有界面启动 linux
7、卸载服务器 JDK
请参看 Linux 基础 02 中的内容
二、配置 NTP 时间服务器
对于我们当前这种案例,主要目标是把 z01 这台服务器设置为时间服务器,剩下的 z02,z03 这两台机器同步 z01 的时间,我们需要这样做的原因是因为,整个集群架构中的时间,要保持一致。
** 检查当前系统时区,使用命令:
# date -R,如图:
注意这里,如果显示的时区不是 + 0800,你可以删除 localtime 文件夹后,再关联一个正确时区的链接过去,命令如下:
# rm -rf /etc/localtime
# ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
** 同步时间
# ntpdate pool.ntp.org
** 检查软件包
查看 ntp 软件包是否已安装,使用命令:
# rpm -qa | grep ntp,如图,红框中的内容:
如果没有红框中的内容,则可以使用命令:
# yum -y install ntp,来进行安装
** 修改 ntp 配置文件
# vi /etc/ntp.conf
去掉下面这行前面的# , 并把网段修改成自己的网段:
restrict 192.168.122.0 mask 255.255.255.0 nomodify notrap
注释掉以下几行:
#server 0.centos.pool.ntp.org
#server 1.centos.pool.ntp.org
#server 2.centos.pool.ntp.org
把下面两行前面的 #号去掉, 如果没有这两行内容, 需要手动添加
server 127.127.1.0 # local clock
fudge 127.127.1.0 stratum 10
最后,如图所示:
** 重启 ntp 服务
# systemctl start ntpd.service,注意,如果是 centOS7 以下的版本,使用命令:service ntpd start
# systemctl enable ntpd.service,注意,如果是 centOS7 以下的版本,使用命令:chkconfig ntpd on
** z03,z03 去同步 z01 这台时间服务器时间
首先需要关闭这两台计算机的 ntp 服务
# systemctl stop ntpd.service,centOS7 以下,则:service ntpd stop
# systemctl disable ntpd.service,centOS7 以下,则:chkconfig ntpd off
# systemctl status ntpd,查看 ntp 服务状态
# pgrep ntpd,查看 ntp 服务进程 id
同步第一台服务器 z01 的时间:
# ntpdate z01,如图:
** 制定计划任务, 周期性同步时间
# crontab -e
*/10 * * * * /usr/sbin/ntpdate z01,如图所示:
重启定时任务:
# systemctl restart crond.service,centOS7 以下使用:service crond restart,z03 这台机器的配置同理
三、配置无密钥登录
配置 hadoop 集群,首先需要配置集群中的各个主机的 ssh 无密钥访问
在 z01 上,通过如下命令,生成一对公私钥对
$ ssh-keygen -t rsa,一顿回车操作,这条命令执行完毕后(注意使用普通用户执行该命令),会在 / home/z/.ssh / 目录下生成两个文件:id_rsa 和 id_rsa.pub,如图所示:
生成之后呢,把 z01 生成的公钥拷贝给 z01,z02,z03 这三台机器,对,没错,包含当前机器。
$ ssh-copy-id z01
$ ssh-copy-id z02
$ ssh-copy-id z03
完成后,z02 机器如图(z03 同理):
以上完成了 z01 生成私钥,公钥并把公钥拷贝给 z01,z02,z03 三台机器的过程,z02,z03 这两台机器也需要进行如上操作。全部完成后,我们可以在任意一台机器上,无密钥的连接到另外一台机器,比如,我们在 z01 连接到 z02 这台机器,使用命令:
$ ssh z02,如图:
这样就成功的在 z01 的机器登录到 z02 机器了。
四、安装配置 JDK
使用 root 用户,在后面两台机器上创建 / opt/modules 文件夹,并使该文件夹的所属改为普通用户。
接着便可以使用远程命令 scp,把已经在 z01 中安装好的 jdk 目录拷贝给另外两台机器。
$ scp -r /opt/modules/jdk1.7.0_67/ z02:/opt/modules/
$ scp -r /opt/modules/jdk1.7.0_67/ z03:/opt/modules/
注意中间有空格分开。配置完成后,记得去 z02,z03 修改 / etc/profile 环境变量
五、安装配置 Hadoop
** 首先,需要先删除 z01 中的 / opt/modules/hadoop-2.5.0/data 目录,执行命令:
$ rm -rf /opt/modules/hadoop-2.5.0/data
** 在如下文件中,修改 JAVA_HOME
hadoop-env.sh yarn-env.sh mapred-env.sh
export JAVA_HOME=/opt/modules/jdk1.8.0_121
** 修改 HDFS 默认地址、HDFS 临时存储路径
涉及文件:core-site.xml
fs.defaultFS:hdfs://z01:8020
hadoop.tmp.dir:/opt/modules/hadoop-2.5.0/data
如图:
** 声明哪些服务器是 datanode
涉及文件:slaves
z01
z02
z03
如图:
** 修改数据存放的副本数,SecondaryNameNode 节点地址
涉及文件:hdfs-site.xml
dfs.replication:3
dfs.namenode.secondary.http-address:z03:50090
dfs.namenode.http-address:z01:50070
dfs.permissions.enabled:false
如图:
**resourcemanager 节点配置,以及一些其他配置
涉及文件:yarn-site.xml
yarn.resourcemanager.hostname:z02
yarn.nodemanager.aux-services:mapreduce_shuffle
yarn.log-aggregation-enable:true
yarn.log-aggregation.retain-seconds:86400
如图:
** jobhistory 服务以及其他设置
涉及文件:mapred-site.xml
mapreduce.framework.name:yarn
mapreduce.jobhistory.address:z01:10020
mapreduce.jobhistory.webapp.address:z01:19888
如图:
** 配置好后,拷贝 hadoop 安装目录给其他服务器
$ rm -rf /opt/modules/hadoop-2.5.0/share/doc/,删除该文档目录,以减少远程拷贝的体积
$ scp -r /opt/modules/hadoop-2.5.0/ z02:/opt/modules/
$ scp -r/opt/modules/ hadoop-2.5.0/ z03:/opt/modules/
全部搞定后,接下来我们就可以启动这个分布式系统了
六、启动 Hadoop
** 在 z01 需要先格式化 hdfs 的 namenode:
$ bin/hdfs namenode -format
** 使用 start 的脚本启动集群中所有的 hdfs 服务,包含 namenode 和 datanode 节点
$ sbin/start-dfs.sh
** 在 z02 中启动 yarn 服务,包含 resourcemanager 和 nodemanager,注意,如果 resourcemanger 和 namenode 服务不在同一台机器上,那么启动 resourcemanager 服务必须在所在的机器启动,这里参看我们之前设定的集群配置图,所以需要在 z02 机器上执行如下命令:
$ sbin/start-yarn.sh
启动完成后,分别查看 z01,z02,z03 机器的 jps,如下图:
z01:
z02:
z03:
在对比一下之前的集群配置图,是符合我们的期望的。
** 总结
本节主要深入讨论 mapreduce 的运算原理及过程,以及如何配置一个 hadoop 完全分布式集群。
个人微博:http://weibo.com/seal13
QQ 大数据技术交流群(广告勿入):476966007
作者:Z尽际
链接:https://www.jianshu.com/p/0ad52ec23309
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。