浅谈Hadoop的应用

今天没有多少的心思学习新的内容,但是对手机的依赖也并不是很大,就干脆来发布一条心得,来谈谈Hadoop的实战吧

hadoop

介绍

学过大数据的我们都知道,Mapreduce是一种模式。 Hadoop是一种框架。 Hadoop是一个实现了mapreduce模式的开源的分布式并行编程框架。MapReduce是一种简化的分布式的编程模式,让程序自动分配到一个由普普通通的机器组成的超大集群上并发执行的。mapreduce的run-time系统会解决输入数据的分布细节,跨越机器集群的执行调度,处理机器的实效,并且管理机器之间的通讯请求。这样的模式允许程序员可以不需要有什么并发处理或者分布式系统的经验,就可以处理超大的分布式系统得资源。

概论

如果想做一名Hadoop的程序员,那么要做的事情就是

1. 定义Mapper,处理输入的Key-Value对,然后输出中间的结果。
2. 定义Reducer,(这个可选),对中间的结果进行规约,输出结果
3. 定义InputFormat和OutputFormat,这个步骤也是可选的
    接触过Hadoop的肯定对这些有很大的了解,InputFormat将每行输入文件的内容转换成Java类供Mapper函数使用,如果这个时候没有定义的话,默认的就是String。
4. 定义main函数,在main函数中定义一个Job,并且执行它。
5. 将写好的Java文件用maven打包,(当然,可以使用其他的方法打包),然后上传到Hadoop的madreduce文件夹中。
6. 应用这个.jar的包去处理hdfs中已经上传的文件
7. 然后剩下的事情就交给了系统。

我们来说说,文件系统和一些方法的基本概念和用法

我们肯定知道,Hadoop的hdfs实现了GooGle的GFS的文件系统,
Hadoop安装完成以后,我们通过jps可以查看他有很多的进程,当然
我们肯定也都知道这些进程是什么意思:
1.Hadoop的HDFS实现了google的GFS文件系统,NameNode作为文件
系统的负责调度运行在master,DataNode运行在每个机器上。同时
Hadoop实现了Google的MapReduce,JobTracker作为MapReduce
的总调度运行在master,TaskTracker则运行在每个机器上执行Task。

2.main()函数,主要就是创建JobConf,定义一下Mapper。
Reducer,Input/OutputFormat来输入输出文件目录,最后
把Job提交给JobTracker。

3. JobTracker,创建一个InputFormat的实例,调用getSplit()方法,
把输入目录的文件拆分成FileSplit作为task的输入,然后加入Queue。

4. TaskTrack向JobTracker索求下一个Map/Reduce。

 Mapper Task先从InputFormat创建RecordReader,循环读入FileSplits的内容生成Key与Value,传给Mapper函数,处理完后中间结果写成SequenceFile.
 Reducer Task 从运行Mapper的TaskTracker的Jetty上使用http协议获取所需的中间内容(33%),Sort/Merge后(66%),执行Reducer函数,最后按照OutputFormat写入结果目录。 

  TaskTracker 每10秒向JobTracker报告一次运行情况,每完成一个Task10秒后,就会向JobTracker索求下一个Task。

Nutch项目的全部数据处理都构建在Hadoop之上,详情可以看我的博客Scalable Computing with Hadoop。
这里写一个简单的分布式Grep,简单对输入文件进行逐行的正则匹配,如果符合就将该行打印到输出文件。因为是简单的全部输出,所以我们只要写Mapper函数,不用写Reducer函数,也不用定义Input/Output Format。

import java.io.IOException;
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class testGrep {
    public static class GrepMapper extends
            Mapper<Object, Text, Text, IntWritable> {

        @Override
        public void map(Object obj, Text text, Context context)
                throws IOException, InterruptedException {
            String pattern = context.getConfiguration().get("grep");
            // System.out.println(split.getPath().toString());
            String str = text.toString();
            Pattern r = Pattern.compile(pattern);
            Matcher m = r.matcher(str);
            if (m.find()) {
                FileSplit split = (FileSplit) context.getInputSplit();
                String filename = split.getPath().getName();
                context.write(new Text(filename), new IntWritable(1));
            }
        }
    }

    public static class GrepReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text text, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable t : values) {
                sum += t.get();
            }
            context.write(text, new IntWritable(sum));
        }
    }


    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String pattern = ".22*.";//匹配含有w字符,这里修改我们需要匹配的模式
        conf.set("grep", pattern);// 在这里设置需要匹配的正则表达式
        Job job = Job.getInstance(conf, "grep");
        job.setJarByClass(testGrep.class);
        job.setMapperClass(GrepMapper.class);
        job.setReducerClass(GrepReducer.class);
        job.setCombinerClass(GrepReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //下面根据具体情况进行修改
        String args1 = "hdfs://master:9000/user/grep_input";
        String args2 = "hdfs://master:9000/user/grep_output";
        FileSystem fs = FileSystem.newInstance(URI.create(args1), conf);
        fs.delete(new Path(args2), true);
        FileInputFormat.addInputPath(job, new Path(args1));
        FileOutputFormat.setOutputPath(job, new Path(args2));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

Hadoop的运行

Hadoop的运行分为两种,一种是本地模式,一种是集群模式,这里集群模式参考我之前的文章,Hadoop的安装教程

  1 )执行bin/hadoop dfs 可以看到它所支持的文件操作指令。   

  2) 创建目录输入input:   
       $ bin/hadoop dfs -mkdir input    

  3)上传文件xx.log到指定目录 input :   
       $ bin/hadoop dfs -put xx.log input

   4 )  执行 bin/hadoop demo.hadoop.HadoopGrep input  output 
         (jar包运行:bin/hadoop jar HadoopGrep.jar  HadoopGrep  input   /tmp/output  "[a-b]" ) 
   5 ) 查看输出文件:

       将输出文件从分布式文件系统拷贝到本地文件系统查看: 
        $ bin/hadoop fs -get output output 
        $ cat output/* 
        或者
        在分布式文件系统上查看输出文件:
        $ bin/hadoop fs -cat output/*

        重新执行前,运行hadoop/bin/hadoop dfs -rm output删除output目录 

   7.运行hadoop/bin/stop-all.sh 结束。 

总结

我们可以发现,其实Hadoop并不适用于文件特别小的情况,
我们知道hdfs的一个block块的大小是128M,整体有一个
10%的溢出情况,即便这样,你的数据如果只有几M或者只
有几十、几百k的情况,需要对你的海量的数据进行整合处
理,要不然资源进行了浪费,就连带宽,如果文件小、数量
小、集群数量小,处理的复杂度小的时候,也不是很有优势。
例如:
如果我们不用Hadoop用java写简单的grep函数处理100M的log文
件主需要4s,用了hadoop local的方式运行是14秒,用了hadoop
单机集群的方式是30秒,用双机集群10M网口的话则更慢。

这里的内容是我学习Hadoop以后的心得体会,

上一篇:Hadoop实践小项目——统计学生成绩,按总成绩排序


下一篇:03_MapReduce框架原理_3.2 Job提交流程(源码)