MapReduce实践及Yarn资源调度

文章目录

1.MapReduce介绍

1.理解MapReduce思想

​ MapReduce思想在生活中处处可见。或多或少都曾接触过这种思想。MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。

Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。

Reduce(规约)负责“合”,即对map阶段的结果进行全局汇总。

这两个阶段合起来正是MapReduce思想的体现。

MapReduce实践及Yarn资源调度

还有一个比较形象的语言解释MapReduce:

我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。

现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。

2.Hadoop MapReduce设计构思

MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。

既然是做计算的框架,那么表现形式就是有个输入(input),MapReduce操作这个输入(input),通过本身定义好的计算模型,得到一个输出(output)。

如何对付大数据处理:分而治之

l 构建抽象模型:Map和Reduce

MapReduce借鉴了函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型。

Map: 对一组数据元素进行某种重复式的处理;

Reduce: 对Map的中间结果进行某种进一步的结果整理。

MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现:

map: (k1; v1) → [(k2; v2)]

reduce: (k2; [v2]) → [(k3; v3)]

MapReduce实践及Yarn资源调度

3.MapReduce编程规范及示例编写

编程规范

mapReduce编程模型的总结:

MapReduce的开发一共有八个步骤其中map阶段分为2个步骤,shuffle阶段4个步骤,reduce阶段分为2个步骤

Map阶段2个步骤

第一步:设置inputFormat类,将我们的数据切分成key,value对,输入到第二步

第二步:自定义map逻辑,处理我们第一步的输入数据,然后转换成新的key,value对进行输出

shuffle阶段4个步骤(了解,可以全部不用管)

第三步:对输出的key,value对进行分区

第四步:对不同分区的数据按照相同的key进行排序

第五步:对分组后的数据进行规约(combine操作),降低数据的网络拷贝(可选步骤)

第六步:对排序后的额数据进行分组,分组的过程中,将相同key的value放到一个集合当中

reduce阶段2个步骤

第七步:对多个map的任务进行合并,排序,写reduce函数自己的逻辑,对输入的key,value对进行处理,转换成新的key,value对进行输出

第八步:设置outputformat将输出的key,value对数据进行保存到文件中

4.WordCount实例

4.1准备数据并上传

cd /opt/servers
vim wordcount.txt

hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry,world
hadoop

hdfs dfs -mkdir /wordcount/input
hdfs dfs -put wordcount.txt /wordcount/input

4.2测试官方案例

hadoop jar /opt/servers/hadoop-2.7.7/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.7.jar wordcount /wordcount/input /wordcount/output

4.3定义一个mapper类

public class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWritable> {
     @Override
     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
         String line = value.toString();
         String[] split = line.split(",");
         for (String word : split) {
             context.write(new Text(word),new LongWritable(1));
         }
 
     }
 }

4.4定义一个reducer类

public class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
    /**
     * 自定义我们的reduce逻辑
     * 所有的key都是我们的单词,所有的values都是我们单词出现的次数
     * @param key
     * @param values
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key,new LongWritable(count));
    }
}

4.5定义一个主类,并提交job

public class JobMain {
     /**
     * 程序main函数的入口类
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
         Job job = Job.getInstance(configuration, JobMain.class.getSimpleName());
        //打包到集群上面运行时候,必须要添加以下配置,指定程序的main函数
        job.setJarByClass(JobMain.class);
        //第一步:读取输入文件解析成key,value对
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("hdfs://192.168.65.101:8020/wordcount"));

        //第二步:设置我们的mapper类
        job.setMapperClass(WordCountMapper.class);
        //设置我们map阶段完成之后的输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //第三步,第四步,第五步,第六步,省略
        //第七步:设置我们的reduce类
        job.setReducerClass(WordCountReducer.class);
        //设置我们reduce阶段完成之后的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //第八步:设置输出类以及输出路径
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("hdfs://192.168.65.101:8020/wordcount_out"));
        waitForCompletion(true);
        
    }
}

5.hadoop中分片

split,默认情况下一个块对应一个片。

400m ,128 128 44 128 1个map 128 1个map 44 1个map

130m 128 2 130 1个map

2.Yarn资源调度

1.yarn集群的监控管理界面:

http://192.168.65.101:8088/cluster

2.Yarn介绍

yarn是hadoop集群当中的资源管理系统模块,从hadoop2.x开始引入yarn来进行管理集群当中的资源(主要是服务器的各种硬件资源,包括CPU,内存,磁盘,网络IO等)以及运行在yarn上面的各种任务。

总结一句话就是说:yarn主要就是为了调度资源,管理任务等

一级调度管理:
计算资源管理(CPU,内存,网络IO,磁盘)
App生命周期管理 (每一个应用执行的情况,都需要汇报给ResourceManager)
二级调度管理:
	任务内部的计算模型管理  (AppMaster的任务精细化管理)
	多样化的计算模型 

3.yarn当中的调度器介绍:

第一种调度器:FIFO Scheduler (队列调度器)

把应用按提交的顺序排成一个队列,这是一个先进先出队列,在进行资源分配的时候,先给队列中最头上的应用进行分配资源,待最头上的应用需求满足后再给下一个分配,以此类推。

FIFO Scheduler是最简单也是最容易理解的调度器,也不需要任何配置,但它并不适用于共享集群。大的应用可能会占用所有集群资源,这就导致其它应用被阻塞。在共享集群中,更适合采用Capacity Scheduler或Fair Scheduler,这两个调度器都允许大任务和小任务在提交的同时获得一定的系统资源。
MapReduce实践及Yarn资源调度

第二种调度器:capacity scheduler(容量调度器,apache版本默认使用的调度器)

Capacity 调度器允许多个组织共享整个集群,每个组织可以获得集群的一部分计算能力。通过为每个组织分配专门的队列,然后再为每个队列分配一定的集群资源,这样整个集群就可以通过设置多个队列的方式给多个组织提供服务了。除此之外,队列内部又可以垂直划分,这样一个组织内部的多个成员就可以共享这个队列资源了,在一个队列内部,资源的调度是采用的是先进先出(FIFO)策略。
MapReduce实践及Yarn资源调度

第三种调度器:Fair Scheduler(公平调度器,CDH版本的hadoop默认使用的调度器)

Fair调度器的设计目标是为所有的应用分配公平的资源(对公平的定义可以通过参数来设置)。公平调度在也可以在多个队列间工作。举个例子,假设有两个用户A和B,他们分别拥有一个队列。当A启动一个job而B没有任务时,A会获得全部集群资源;当B启动一个job后,A的job会继续运行,不过一会儿之后两个任务会各自获得一半的集群资源。如果此时B再启动第二个job并且其它job还在运行,则它将会和B的第一个job共享B这个队列的资源,也就是B的两个job会用于四分之一的集群资源,而A的job仍然用于集群一半的资源,结果就是资源最终在两个用户之间平等的共享

MapReduce实践及Yarn资源调度

使用哪种调度器取决于yarn-site.xml当中的

yarn.resourcemanager.scheduler.class 这个属性的配置

上一篇:[dubbo] Dubbo API 笔记——配置参考


下一篇:[转帖]一文看懂mysql数据库本质及存储引擎innodb+myisam