一、DistributedCache简介
DistributedCache是hadoop框架提供的一种机制,可以将job指定的文件,在job执行前,先行分发到task执行的机器上,并有相关机制对cache文件进行管理.
常见的应用场景有:
分发第三方库(jar,so等);分发算法需要的词典文件;分发程序运行需要的配置;分发多表数据join时小表数据简便处理等
主要的注意事项有:
1.DistributedCache只能应用于分布式的情况,包括伪分布式,完全分布式.有些api在这2种情况下有移植性问题.
2.需要分发的文件,必须提前放到hdfs上.默认的路径前缀是hdfs://的,不是file://
3.需要分发的文件,最好在运行期间是只读的.
4.不建议分发较大的文件,比如压缩文件,可能会影响task的启动速度.
二、DistributedCache的相关配置
MRv1
属性名 | 默认值 | 备注 |
${hadoop.tmp.dir}/mapred/local |
The local directory where MapReduce stores intermediate data files. May be a comma-separated list of directories on different devices in order to spread disk i/o. Directories that do not exist are ignored. |
|
10737418240(10G) |
The number of bytes to allocate in each local TaskTracker directory for holding Distributed Cache data. |
|
10000 |
The maximum number of subdirectories that should be created in any particular distributed cache store. After this many directories have been created, cache items will be expunged regardless of whether the total size threshold has been exceeded. |
|
mapreduce.tasktracker.cache.local.keep.pct |
0.95(作用于上面2个参数) |
It is the target percentage of the local distributed cache that should be kept in between garbage collection runs. In practice it will delete unused distributed cache entries in LRU order until the size of the cache is less than mapreduce.tasktracker.cache.local.keep.pct of the maximum cache size. This is a floating point value between 0.0 and 1.0. The default is 0.95. |
MRv2
新的yarn架构的代码还没有看过,不过从配置里可以看出相关的如下配置,本文主要基于MRv1.
yarn.nodemanager.delete.debug-delay-sec
yarn.nodemanager.local-cache.max-files-per-directory
三、DistributedCache的使用方式
1.通过配置
可以配置这三个属性值:
mapred.cache.files,
mapred.cache.archives,
mapred.create.symlink (值设为yes 如果要建link的话)
如果要分发的文件有多个的话,要以逗号分隔(貌似在建link的时候,逗号分隔前后还不能有空格,否则会报错)
2.使用命令行
在pipes和streaming里面可能会用到
-files Specify comma-separated files to be copied to the Map/Reduce cluster
-libjars Specify comma-separated jar files to include in the classpath
-archives Specify comma-separated archives to be unarchived on the compute machines
例如:
-files hdfs://host:fs_port/user/testfile.txt
-files hdfs://host:fs_port/user/testfile.txt#testfile
-files hdfs://host:fs_port/user/testfile1.txt,hdfs://host:fs_port/user/testfile2.txt
-archives hdfs://host:fs_port/user/testfile.jar
-archives hdfs://host:fs_port/user/testfile.tgz#tgzdir
3.代码调用
DistributedCache.addCacheFile(URI,conf) / DistributedCache.addCacheArchive(URI,conf)
DistributedCache.setCacheFiles(URIs,conf) / DistributedCache.setCacheArchives(URIs,conf)
如果要建link,需要增加DistributedCache.createSymlink(Configuration)
获取cache文件可以使用
getLocalCacheFiles(Configuration conf)
getLocalCacheArchives(Configuration conf)代码调用常常会有各样的问题,一般我比较倾向于通过createSymlink的方式来使用,就把cache当做当前目录的文件来操作,简单很多.
常见的通过代码来读取cache文件的问题如下:
a.getLocalCacheFiles在伪分布式情况下,常常返回null.
b.getLocalCacheFiles其实是把DistributedCache中的所有文件都返回.需要自己筛选出所需的文件.archives也有类似的问题.
c.getLocalCacheFiles返回的是tt机器本地文件系统的路径,使用的时候要注意,因为很多地方默认的都是hdfs://,可以自己加上file://来避免这个问题
4.symlink
给分发的文件,在task运行的当前工作目录建立软连接,在使用起来的时候会更方便.没有上面的各种麻烦
mapred.create.symlink 需要设置为yes,不是true或Y之类哦
5.实际文件存放情况
下图显示的为tt机器上实际文件的状况 (只有yarn集群的截图)
四、DistributedCache的内部基本流程
1.每个tasktracker启动时,都会产生一个TrackerDistributedCacheManager对象,用来管理该tt机器上所有的task的cache文件.
2.在客户端提交job时,在JobClient内,对即将cache的文件,进行校验
以确定文件是否存在,文件的大小,文件的修改时间,以及文件的权限是否是private or public.
3.当task在tt初始化job时,会由TrackerDistributedCacheManager产生一个TaskDistributedCacheManager对象,来管理本task的cache文件.
4.和本task相关联的TaskDistributedCacheManager,获取并解压相关cache文件到本地相应目录
如果本tt机器上已经有了本job的其他task,并已经完成了相应cache文件的获取和解压工作,则不会重复进行.
如果本地已经有了cache文件,则比较修改时间和hdfs上的文件是否一致,如果一致则可以使用.
5.当task结束时,会对该cache进行ref减一操作.
6.TrackerDistributedCacheManager有一个clearup线程,每隔1min会去处理那些无人使用的,目录大小大于local.cache.size或者子目录个数大于mapreduce.tasktracker.cache.local.numberdirectories的cache目录.