大数据基础
定义
大数据是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力来适应海量、高增长率和多样化的信息资产。
大数据技术的战略意义不在于掌握庞大的数据信息,而在于对这些含有意义的数据进行专业化处理。
大数据的意义不在于数量,在于挖掘数据的价值,探究海量数据间的相关性
基本特征
容量(Volume):数据的大小决定所考虑的数据的价值和潜在的信息
种类(Variety):数据类型的多样性
速度(Velocity):获得数据的速度
可变性(Variability):妨碍处理和有效管理数据的过程
真实性(Veracity):数据的质量
复杂性(Complexity):数据量巨大,来源多渠道
价值(Value):合理运用大数据,以低成本创造高价值
Hadoop
Hadoop是一个由Apache基金会所开发的分布式系统基础架构,是一个开源框架,允许使用简单的编程模型在跨计算机集群的分布式环境中存储和处理大数据。
它的设计是从单个服务器扩展到千数个机器,每个提供本地计算和存储。
Hadoop框架实现分布式最核心的设计:HDFS 和 MapReduce 其中HDFS为海量的数据提供了存储,MapReduce为海量的数据提供了计算。以及在Hadoop2.x内,YARN框架实现了分布式资源调度。
Hadoop 1.0到Hadoop 2.0架构的变化图如下
在Hadoop Core中最底层的是HDFS,在其之上的是YARN层,YARN层之上是不同的计算框架。所以不同的计算框架可以共享同一个HDFS集群上的数据,享受整体的资源调度,进而提高集群资源的利用率。
一些分布式计算框架(MapReduce、Spark等)作为YARN应用运行在YARN层上。Hadoop 2.0 绑定了两个YARN应用程序,其中一个就是MapReduce。如pig、hive和crunch都是运行在MapReduce、Spark或Tez之上的处理框架,不和YARN直接打交道。
一、HDFS(Hadoop Distributed File System)分布式文件系统
是基于流数据模式访问和处理超大文件的需要而开发的,可以运行与廉价的服务器上。它所具有的高容错、高可靠性、高扩展性、高获得性、高吞吐率等特征为海量数据提供了不怕故障的存储,为超大数据集的应用带来了很多便利。
1. HDFS基本架构
Master/Slave架构
四个主要部分:HDFS Client、NameNode、DataNode和Secondary NameNode。
一个HDFS集群是由一个NameNode和一定数目的DataNode组成的。
-
NameNode是集群的中心服务器,用来管理文件系统以及管控Client对集群中文件的访问。负责构建命名空间,管理文件的元数据。
-
DataNode有多个,通常来说,集群中的一个节点有一个DataNode,负责实际存储数据和读写工作。
-
用户可以将文件保存在集群中,但是在集群内部,文件将被划分成一组block,这些block会被保存在一组DataNode中。
-
The File System Namespace HDFS支持传统文件系统层级。用户或者应用可以创建文件夹,并在文件夹下保存文件。文件系统命名空间层级和大多数系统的文件层级相同,也同样支持file或者block的增删改查。但是HDFS的文件系统暂时不支持硬链接和软链接,不支持用户配额(user quotas)【配额是操作系统的一个可选功能,它允许管理员以文件系统为单元,限制分派给用户或组成员所使用的磁盘空间大小或是使用的总文件数量】
在NameNode上,用户或者应用可以进行对**文件夹或文件**的操作,如打开、关闭、重命名。在NameNode上也确定了block和各DataNode之间的映射。
而DataNode负责对Client的读或写请求作出反馈,并且根据NameNode的部署结构执行对block的创建、删除或是复制。
NameNode和DataNode本质上是运行在机器上的软件。通常来说,这些机器运行的应该是Linux/GUN系统。由于HDFS是用java语言编写的,只要是支持JAVA的机器又可以用来运行NameNode或者DataNode。
目前,一个机器只有一个DataNode在上面运行,但是Hadoop并不将同一机器上运行多个DataNode的情况摈弃。
HDFS中,单个NameNode的设计大大简化了系统的架构,NameNode是所有HDFS元数据的仲裁者和存储仓库,所有的用户数据不会流经NameNode。
但是这样的设计可能造成集群的单点故障,NameNode不可用时,整个文件系统将不可用。HDFS针对单点故障提供了两种解决方案: ① 备份持久化元数据 ② Secondary NameNode
hadoop apache 官方文档: http://hadoop.apache.org/docs/r1.2.1/hdfs_design.html#Introduction
图片来源于https://blog.****.net/weixin_43275864
2. HDFS原理
图片来源于 https://blog.****.net/xjz729827161
metadata
metadata是存储在NameNode上的元数据信息,它存储到磁盘的文件名为:fsimage。并且HDFS有一个叫edits的文件记录对metadata的操作日志。总体来说,fsimage和edits文件记录了metadata中的权限信息和文件系统目录数、文件包含哪些块、确定block到DataNode的映射、block存放在哪些DataNode上(这个信息由DataNode启动时上报)。
NameNode将这些信息加载到内存并进行拼装,就成为了一个完整的元数据信息。
block
block是磁盘操作的最小单元,读写操作均以block为最小单元。一般的磁盘里通常block的大小为512Byte,而文件系统再物理block之上抽象了另一层概念,文件系统block概念是物理磁盘block的整数倍,通常为几KB。
HDFS的block与文件系统的block概念相同,但是HDFS的block块比一般单机文件系统大得多,默认为128M。HDFS的文件比拆分成block-sized的chunk,chunk作为独立单元存储。比block小的文件不会占用整个block,只会占据实际大小。
HDFS block数据块大小的设置规则
block的大小默认为128M,但是修改hdfs-site.xml文件中的dfs.blocksize值来改变block的大小。
HDFS读取文件中的某一个block的时间为 block寻址时间 + 传输时间
block越大 —> 寻址时间越短,传输时间越长
block越小 —> 寻址时间越长,传输时间越长
HDFS的平均寻址时间为10ms,经过大量测试,寻址时间为传输时间的1%时,为最佳状态,则最佳传输时间为10ms/0.01=1s
如果磁盘的传输速度为100MB/s,则最佳block大小为100MB/s × 1s = 100MB。如果磁盘传输速度为200MB/s,则block的最佳大小为256MB,以此类推。
将文件拆分成block使得单个文件大小可以大于整个磁盘的容量,构成文件的Block可以分布在整个集群。并且也简化了存储系统,对于block无需关注其权限、所有者等内容(这些信息在文件级别进行控制)。HDFS以block为单位复制,作为容错和高可用机制。
NameNode
NameNode就是HDFS的master架构,主要负责HDFS文件系统的管理工作,具体包括命名空间管理和文件block管理
(1) 命名空间namespace管理:它维护这文件系统树(filesystem tree)以及文件树中所有的文件和文件夹的元数据。管理这些信息的文件有两个,分别是Namespace镜像文件(fsimage)和操作日志文件(edit log)
- fsimage用于维护文件系统树以及文件树中所有的文件和文件夹的元数据
- edit log记录了所有针对文件创建、删除、重命名等操作。
拆成两个文件的原因是fsimage文件很大,如果任何操作都要同步更改这个文件执行效率会很低。所以将后续增量的修改放到edit log中,fsimage和 edit log进行合并后会得到一个新的fsimage。这两个文件会被持久化存储在本地磁盘
(2) 文件block管理:NameNode记录着每个文件中各个块所在的数据节点的位置信息(元数据信息),从NameNode中你可以获得每个文件的每个块所在的DataNode。但集群并不持久化存储这些信息,因为这些信息NameNode会在每次启动系统时动态地重建这些信息。
存在两种映射: ① 文件名 —> block组 (持久化在本地磁盘)
② block —> DataNode列表 (不持久化,通过DataNode报给NameNode)
DataNode
DataNode是HDFS的slave架构,负责存储和提取block,读写请求可能来自NameNode或者Client,并且周期性地(心跳定时)向NameNode汇报自己节点上所存储的block相关信息,维护了block_id与DataNode本地文件的映射。
一个block会在多个DataNode中进行冗余备份,而一个DataNode对于一个block最多只包含一个备份。
block caching:频繁使用的block可以在内存中缓存,通常一个block只有一个DataNode会缓存。用户或者应用可以向NameNode发送缓存指令,缓存池的概念用于管理一组缓存的权限和资源。
Client
HDFS的client是用户和HDFS进行交互的手段,HDFS提供了各种各样的客户端,包括命令行接口、JAVA API、C语言库、用户空间文件系统等。
对于一个Hadoop集群,client就是用来访问这个hadoop文件系统的机器。它既可以是装有hadoop的机器,也可以是没有装hadoop的机器。client通过NameNode和DataNode交互访问HDFS中的文件。client提供了一个类似POSIX的文件系统接口供用户调用。
POSIX表示可移植操作系统接口(Portable Operating System Interface),这个标准定义了操作系统应该为应用程序提供的接口标准,是IEEE为要在各种UNIX操作系统上运行的软件而定义的一系列API标准的总称,其正式称呼为IEEE 1003。
**访问HDFS的程序或者HDFS shell命令都可以称为HDFS的客户端(client)。**在HDFS的客户端中至少需要指定HDFS集群配置中的NameNode地址以及端口号信息,或者通过配置HDFS的core-site.xml配置文件来指定。一般可以把客户端和HDFS节点服务器放在同一台机器上。
Secondary NameNode
HDFS针对单点故障提供了两种解决机制:
1)备份持久化元数据
将文件系统的元数据同时写到多个文件系统, 例如同时将元数据写到本地文件系统及NFS。这些备份操作都是同步的、原子的。
2)Secondary NameNode
NameNode的备胎,Secondary NameNode定期合并主NameNode的fsimage和edit log, 避免edit log过大,通过创建检查点checkpoint来合并。它会维护一个合并后的fsimage副本, 可用于在NameNode完全崩溃时恢复数据。也可以手工将其设置为主机。
合并的具体操作如图:
图片来源于 https://blog.****.net/xjz729827161
整个合并操作过程会消耗大量内存和cpu,且不是实时的,所以Secondary NameNode的数据落后NameNode,当NameNode完全狗带的时候,即使用Secondary NameNode进行数据恢复,也必定会有数据丢失。
3. HDFS 2.0
由于HDFS瑕疵很大,所以apache出台了一系列新的机制来修复这些问题。
HDFS HA(高可用HDFS的热备份)
主要是为了解决NameNode的单点故障问题,HDFS HA泳有两个NameNode,其中一个NameNode的状态为 active-活跃,另一个为standby-待命。HDFS应用共享存储系统(比如NFS、QJM等)来实现两个NameNode之间的状态同步。Zookeeper来确保一个NameNode在对外服务。DataNode同时向两个NameNode汇报信息。
Federation
由于NameNode的内存大小会制约文件数量,HDFS Federation提供了一种横向扩展NameNode的方式。在Federation模式中,每个NameNode管理命名空间的一部分,例如一个NameNode管理/user目录下的文件, 另一个NameNode管理/share目录下的文件。 各NameNode之间是独立的,一个节点的失败不会导致其他节点管理的文件不可用。
其实就是把一整个NameNode肢解了…
Federation使用了多个独立的NameNode/namespace。这些NameNode之间是联合的,也就是说,他们之间相互独立且不需要互相协调,各自分工,管理自己的区域。分布式的DataNode被用作通用的数据块存储存储设备。每个DataNode要向集群中所有的NameNode注册,且周期性地向所有NameNode发送心跳和块报告,并执行来自所有NameNode的命令。
**一个block pool由属于同一个namespace的数据块组成,每个DataNode可能会存储集群中所有block pool的数据块。**每个block pool内部自治,也就是说各自管理各自的block,不会与其他block pool交流。
某个NameNode上的namespace和它对应的block pool一起被称为namespace volume(命名空间卷)。它是管理的基本单位。当一个NameNode/namespace被删除后,其所有DataNode上对应的block pool也会被删除。当集群升级时,每个namespace volume作为一个基本单元进行升级。
Federation关键技术点
Federation中存在多个命名空间,如何划分和管理这些命名空间非常关键。在Federation中并采用 文件名hash(目录打散算法,其目的是防止上传文件到服务器的时候造成某个目录下的文件数过多) 的方法,因为该方法的locality非常差,比如:查看某个目录下面的文件,如果采用文件名hash的方法存放文件,则这些文件可能被放到不同namespace中,HDFS需要访问所有namespace,代价过大。为了方便管理多个命名空间,HDFS Federation采用了经典的Client Side Mount Table。
如上图所示,下面四个深色三角形代表一个独立的命名空间,上方浅色的三角形代表从客户角度去访问的子命名空间。各个深色的命名空间Mount到浅色的表中,客户可以访问不同的挂载点来访问不同的命名空间,这就如同在Linux系统中访问不同挂载点一样。这就是HDFS Federation中命名空间管理的基本原理:将各个命名空间挂载到全局mount-table中,就可以做将数据到全局共享;同样的命名空间挂载到个人的mount-table中,这就成为应用程序可见的命名空间视图。
二、分布式资源调度——YARN框架
Hadoop 2.0后的特性
MapReduce是一个分布式框架和编程模型,允许在多节点集群上执行基于批处理的并行化供做,但是MapReduce不适合实时的数据处理。
YARN可以弥补这一不足,其核心是分布式调度程序,主要负责响应客户端创建容器的请求和监控正在运行的请求。
有两类长期运行的守护进程(Daemon),一是运行在Hadoop cluster的master节点上的资源管理器(ResourceManager)和运行在Hadoop cluster中所有节点上且能启动和监控容器(container,这里容器的含义与linux container的概念相同)的节点管理器(NodeManager)。
1. YARN框架的组件
① ResourceManager(RM)
一个Hadoop cluster同一时间提供服务的RM只有一个,负责集群资源的统一管理和调度,以及处理客户端的请求并响应客户端创建容器。RM是YARN的主进程。
② NodeManager
NodeManager是在集群每个节点上运行的从属进程,主要作用是创建、监控和杀死容器。它为来自ResourceManager和ApplicationMaster的请求创建容器,并向ResourceManager报告容器的状态。
2. YARN应用程序的组件
一个YARN应用程序(MapReduce就是YARN应用程序的一个示例,每个应用程序对应一个AM,比如MapReduce就要对应一个AM)包含三个组件:客户端、ApplicationMaster(AM)、容器(Container)。
启动新的YARN应用程序步骤如下:
① YARN客户端与ResourceManager通信以创建新的YARN ApplicationMaster实例(这个实例会被创建在某一个slave节点中)
② YARN客户端会让RM通知ApplicationMaster物理资源要求。AM负责应用程序的管理,为应用程序向RM申请资源(Core、MEMO)
③ ApplicationMaster是YARN应用程序的主进程,AM不会执行任何特定于应用程序的工作,这些函数将会被分配给容器(Container)。AM将通知NM创建容器,并将资源分配给内部的task。
④ RM根据资源要求安排工作。
容器是NM替AM创建的特定于应用程序的进程。AM本身也是由RM创建的容器。task也是运行在Container之内的。
三、MapReduce编程模型(2.0)
MapReduce是一个并行计算模型,包含Map(映射)和Reduce(归约)两个过程。
1. MapReduce执行流程
MapReduce程序的输入和输出都来自HDFS,通常来说,MapReduce的计算节点和存储节点是同一个(不愧是计算靠近数据!)换句话说,就是MapReduce计算框架和HDFS运行在相同的节点上。
运行一个MapReduce程序需要一个ResourceManager,cluster中每个节点需要一个NodeManager(RM、NM不是程序特定的),以及一个与MapReduce应用程序对应的ApplicationMaster。
对于一个MapReduce程序,输入输出类型以下所示:
(input) <k1, v1> ->
map -> <k2, v2> ->
combine -> <k2, v2> ->
reduce -> <k3, v3>
(output)
InputFormat、Mapper、Reducer、OutputFormat都是用户可以实现的,不过通常情况下用户只需要实现Mapper和Reducer,其他默认就可以了。
MapReduce中存在4个独立的实体
1. Job Client
运行在Client上,负责将MapReduce程序打包成Jar包存储到HDFS,并把Jar包的路径提交到 Job Tracker上,由Job Tracker进行任务的分配和监控。
2. Job Tracker
运行在NameNode上,负责接收Job Client提交的Job,调度Job的每一个子Task运行于Task Tracker之上,并监控它们,如果有运行失败的task就重新运行它。
3. Task Tracker
运行在DataNode上,负责主动与Job Tracker通信,接收作业,并直接执行每一个任务。
4. HDFS
用来与Job Client、Job Tracker和Task Tracker共享作业文件。
各实体间通过以下过程完成一次MapReduce作业
- 在客户端启动一个作业。
- 向Job Tracker请求一个Job ID。
- 将运行作业所需要的资源文件复制到HDFS上,包括MapReduce程序打包的JAR文件、配置文件和客户端计算所得的输入划分信息。这些文件都存放在Job Tracker专门为该作业创建的文件夹中。文件夹名为该作业的Job ID。JAR文件默认会有10个副本(mapred.submit.replication属性控制);输入划分信息告诉了Job Tracker应该为这个作业启动多少个map任务等信息。
- Job Tracker接收到作业后,将其放在一个作业队列里,等待作业调度器对其进行调度,当作业调度器根据自己的调度算法调度到该作业时,会根据输入划分信息为每个划分创建一个map任务,并将map任务分配给Task Tracker执行。对于map和reduce任务,Task Tracker根据主机核的数量和内存的大小有固定数量的map槽和reduce槽。这里需要强调的是:map任务不是随随便便地分配给某个Task Tracker的,这里有个概念叫:数据本地化(Data-Local)。意思是:将map任务分配给含有该map处理的数据块的Task Tracker上,同时将程序JAR包复制到该Task Tracker上来运行,这叫“运算移动,数据不移动”。而分配reduce任务时并不考虑数据本地化。
- Task Tracker每隔一段时间会给Job Tracker发送一个心跳,告诉Job Tracker它依然在运行,同时心跳中还携带着很多的信息,比如当前map任务完成的进度等信息。当Job Tracker收到作业的最后一个任务完成信息时,便把该作业设置成“成功”。当Job Client查询状态时,它将得知任务已完成,便显示一条消息给用户。
2. Map和Reduce原理
① Map
-
Splitting: 读取HDFS中的file(由多个block组成),文件将被分割成许多split(默认情况下以HDFS一个块的大小为单位)。每一个split对应一个Mapper Task对其进行处理。输入分片存储的并非数据本身,而是分片长度和一个记录数据位置的数组。InputFormat组件用来解析输入数据的存放格式,会将输入数据解析成一系列键值对<k,v>
-
Mapping: map线程(Mapper Task)将输入的split拆分成一组键值对。**map函数需要程序员重写。**map输出的结果会暂时放在一个环形内存缓存区中,当这个缓冲区快要溢出时(默认缓冲区的80%),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件。所以对于一个map线程来说,它可能会产生一系列的溢出文件。
-
Shuffling: shuffle阶段负责将map端生成的数据传递给reduce端,因此shuffle阶段分为两部分,一部分在map端执行,另一部分在reduce端执行。
① map端部分:分析数据结果属于哪一个partition,一个partition对应一个reduce,然后将map数据写入Memory Buffer中,达到80%的阈值,开启溢写进磁盘过程,同时进行key排序,如果有combiner步骤,则会对相同的key做归并处理,最终多个溢写文件合并成一个文件。
② reduce部分:reduce节点从各个节点拉取存储在磁盘上的数据放入Memory Buffer中,同理将各个map的数据进行合并并存在磁盘,最终磁盘的数据和缓存区剩下的20%合并传递给reduce阶段。
数据分组 Partition
Mapper数据处理之后输出之前(如上文所说,为shuffling阶段的一部分),输入key会经过Partitioner分组(分桶)选择不同的reduce。默认情况下,Partitioner会对key进行hash取模。比如有6个Reduce Task,那么就会对key取hash code以后mod 6。
② reduce
对shuffle阶段传来的数据进行最后的整理合并。reduce阶段由一定数量Reduce Task组成,这些Reduce Task可以并行运行。
- 数据运程拷贝:Reduce Task要运程拷贝每个mapper Task处理的结果,从每个结果里读取一部分(Partitioner决定每个Reduce Task拷贝哪一部分)
- 数据按照key排序:Reduce Task读取完数据之后,要按照key进行排序,相同的key被放到一组,交给同一个Reduce Task处理
- 数据处理,即执行Reducer函数。
- 数据输出,OutputFormat,Reducer统计的结果将按照OutputFormat格式输出。
3. 示例:Word Count
Word Count是MapReduce最简单的实例,其描述为:
假如有大量文件,里面存储的都是单词,如何统计每个单词出现的次数。
解决方案:首先,分别统计每个文件中单词出现的次数,然后累加不同文件中同一个单词出现的次数。
1. Input
Job Client输入输入文件的存储位置,在本例中,输入文件中包含了三行英文语句。
2. Splitting
Job Client通过InputFormat接口设计分割的逻辑,默认按照HDFS文件分割(这里说明其实MapReduce能处理的不只是HDFS架构的文件)
Hadoop把文件再次分割为<k, v>对。在Word Count中,将该文件分成三个split所得到的的三个<k, v>键值对,其中k代表的是行首的偏移量,v的值则为该行文件的内容。所以在本例中:
<k1, v1> = <0, Dear Bear River>
<k2, v2> = <16, Car Car River>
<k3, v3> = <30, Dear Car Bear>
3. Mapping
Map通过RecordReader读取分割好的<k, v>对(Input文件分割出的每一个split对应一个Map Task),Map Task会根据用户自定义的map函数,运行完毕后,将这一个键值对处理成一系列键值对,并将其写到Hadoop的内存缓冲区中。以第一个键值对为例,在第一个Map Task中,它将会被解析成为:
<0, Dear Bear River> -> <Dear, 1>
<Bear, 1>
<River, 1>
4. Map Shuffling
上一步解析出来的键值对在缓冲区中会按照key排序,然后partitioner会将其分区,在word count中,首先会对key根据ASCII码排序,然后key相同的分为同一个partition(这里假设partitioner设置的分区机制就是相同单词放在同一个Reduce Task中),并将相同partition的键值对进行合并。存储缓冲区空间达到80%的时候,内容将被溢写进本地磁盘的文件里,这个文件称为Split file(在真实例子中,由于一个split文件的大小一般在128M左右,即一个block的大小,所以可能会产生多个split文件,每个split文件内都有关于partition的划分)。多个split文件中相同的partition划分再在磁盘中排序合并。
所以本例最终map shuffling的结果如下(由于例子中数据量太少,无法体现以上内容):
map1 -> <Bear, 1> <Dear, 1> <River, 1>
map2 -> <Car, 1, 1> <River, 1>
map3 -> <Bear, 1> <Car, 1> <Dear, 1>
5. Reduce Shuffling + Reducing
在本例中,假设设定一个单词对应一个Reduce Task,所以有4个Reduce Task,每个Reduce Task将去每个Map Task的输出结果中拉取属于它的partition的部分,比如Reduce Task 1负责统计单词Bear的个数,那么它需要去map 1、map 2、map 3中读取key为Bear的键值对。
然后将读取到的键值对根据key进行排序,再合并(由于这个例子中,partition设置的机制就是相同key的为同一个分区,所以看不出来排序的效果),则Reduce Task 1的执行流程既是:
map 1
<Bear, 1>
map 2 排序 合并 <Bear, 2>
<Bear, 1>
map 3
6. Final Result
本例中,最终结果如下所示,结果将会被写进HDFS的文件中。
reduce task 1 -> <Bear, 2>
reduce task 2 -> <Car, 3>
reduce task 3 -> <Dear, 2>
reduce task 4 -> <River, 2>
注:该篇文章中图片都是从网上搜的,但真的找不到具体地址了。