Hadoop Learning (4): A Brief Look at Map/Reduce Data Analysis - Example: Phone Call Records

Assume our cluster (or pseudo-distributed) Hadoop system is already set up. As most of us do, we first check that the system works by running the wordcount example provided on the official site or in other tutorials. If wordcount runs without problems, we can start writing our own M/R programs and begin analyzing data.

A Hadoop cluster also involves other components that need to be installed; they are not covered here, and we will set them aside for now, picking each one up when we actually run into it. All you need to do at this point is upload the data to be analyzed to HDFS. This post does not spend much time on concepts; the essential concepts and the steps a program goes through are what you must understand.
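If you want to do the upload from code rather than from the command line (hadoop fs -put also works), a minimal sketch using the Java FileSystem API could look roughly like this; the local source path is just a placeholder, and the HDFS paths match the ones used later in this post:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadToHdfs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // fs.default.name should point at the NameNode, e.g. hdfs://localhost:9000
        conf.set("fs.default.name", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);
        // copy the local call-record file into the HDFS input directory
        fs.copyFromLocalFile(new Path("/home/zhangzhen/tonghua.txt"),
                new Path("/home/zhangzhen/input/tonghua.txt"));
        fs.close();
    }
}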

Task: analyze the call records and find out, for each phone number, which numbers have called it.

  — We have a list of phone call records, each entry recording that user A called user B.

  — For every phone number, find out which numbers have called it.

For example: 120   13688893333 13733883333 1393302942  

Meaning: 120 received calls from the three numbers that follow.

The task above could also be handled in traditional ways, but once the data grows large enough those approaches hit a bottleneck:

1. Parse the data directly with a C or Java program.

2. Import the data into a database; a plain SELECT will also solve the problem.

3. But once the data reaches the TB scale, even a simple computation like this becomes a bottleneck for the traditional methods. This is where M/R comes in: Hadoop automatically distributes the data we upload to HDFS across the machines of the cluster.

4. The mapper reads the data line by line. For each line it splits the raw record, emits the fields we need, and handles malformed records (bad data that would otherwise crash the job). The mapper runs for every line of input, and its results are eventually written back to HDFS.

5. A job can run without a reduce function, depending on the situation. In that case the mapper's output goes straight to the output file, so the map output format must match the job's output format.

6. For a job with a reduce function, the framework first sends all mapper outputs that share the same key to the same reducer and then writes out the reducer's results; the map output format must match the reducer's input format (see the sketch below).
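In the job below the map output types and the final output types happen to be the same (Text/Text), so setting setOutputKeyClass/setOutputValueClass is enough. When the two differ, the map output types have to be declared separately; a small sketch, with types chosen purely for illustration:

        // only needed when the mapper's output types differ from the job's final output types
        job.setMapOutputKeyClass(Text.class);           // KEYOUT of the mapper = KEYIN of the reducer
        job.setMapOutputValueClass(LongWritable.class); // VALUEOUT of the mapper = VALUEIN of the reducer
        // final output types written by the reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);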


What the Mapper does:
1. Split the raw data
2. Emit the fields we need
3. Handle malformed data

What the Reducer does:
1. Each machine is responsible for a subset of the keys; the reduce function aggregates the values for each key and writes out the result:

Source data:
13599999999 10086
13633333333 120
12678998778    dada13690378933
13833238233 13690378933
12678998778 120
19837366873 10086
16389323733 10086
18373984487 120
19877367388 13690378933
18937933444 13840238422
Output data (key and value separated by a tab):
10086    13599999999|19837366873|16389323733|
120    13633333333|12678998778|18373984487|
13690378933    13833238233|19877367388|
13840238422    18937933444|
Test code:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;


public class PhoneTest extends Configured implements Tool {

    enum Counter {
        LINESKIP; // lines that failed to parse
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "PhoneTest"); // job name
        job.setJarByClass(PhoneTest.class); // class containing the job
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/home/zhangzhen/input")); // input path
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/home/zhangzhen/output")); // output path

        job.setMapperClass(Map.class); // use the Map class as the mapper
        job.setReducerClass(Reduce.class); // use the Reduce class as the reducer
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class); // output key type (KEYOUT)
        job.setOutputValueClass(Text.class); // output value type (VALUEOUT)
        job.waitForCompletion(true);
        
        return job.isSuccessful() ? 0 : 1;
    }

    public static class Map extends
            Mapper<LongWritable, Text, Text, Text> {    //<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            try {
                // key - byte offset of the line in the file; value - one line of text
                String line = value.toString();    // e.g. 13510000000 10086 (13510000000 called 10086)
                // split the record into its fields
                String[] lineSplit = line.split(" ");
                String phone1 = lineSplit[0];
                String phone2 = lineSplit[1];

                context.write(new Text(phone2), new Text(phone1));    // emit: callee as key, caller as value
            } catch (java.lang.ArrayIndexOutOfBoundsException e) {
                context.getCounter(Counter.LINESKIP).increment(1); // malformed line: increment the error counter
            }
        }

    }
    
    public static class Reduce extends Reducer<Text, Text, Text, Text> {    
        
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Context context)
                throws IOException, InterruptedException {
            String valueStr;
            String out = "";
            for(Text value:values){
                valueStr = value.toString() + "|";
                out += valueStr;
            }
            // emit key \t value (if the desired output is not in key \t value form, the key can be declared as NullWritable and the value can carry both fields combined)
            context.write(key, new Text(out));    
        }
    }
    
    public static void main(String[] args) throws Exception {
        // run the job
        int res = ToolRunner.run(new Configuration(), new PhoneTest(), args);
        System.exit(res);
    }
}
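The LINESKIP counter defined above can also be read back once the job finishes, to see how many malformed lines were skipped (the run below reports LINESKIP=1). A small sketch of what that check could look like, placed inside run() right after job.waitForCompletion(true):

        // read our custom counter back from the finished job
        long skipped = job.getCounters().findCounter(Counter.LINESKIP).getValue();
        System.out.println("Lines skipped due to parse errors: " + skipped);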

Eclipse console output:
14/01/22 10:44:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/01/22 10:44:47 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/01/22 10:44:47 INFO input.FileInputFormat: Total input paths to process : 1
14/01/22 10:44:47 WARN snappy.LoadSnappy: Snappy native library not loaded
14/01/22 10:44:48 INFO mapred.JobClient: Running job: job_local1140955208_0001
14/01/22 10:44:48 INFO mapred.LocalJobRunner: Waiting for map tasks
14/01/22 10:44:48 INFO mapred.LocalJobRunner: Starting task: attempt_local1140955208_0001_m_000000_0
14/01/22 10:44:48 INFO util.ProcessTree: setsid exited with exit code 0
14/01/22 10:44:48 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@99548b
14/01/22 10:44:48 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/home/zhangzhen/input/tonghua.txt:0+202
14/01/22 10:44:48 INFO mapred.MapTask: io.sort.mb = 100
14/01/22 10:44:48 INFO mapred.MapTask: data buffer = 79691776/99614720
14/01/22 10:44:48 INFO mapred.MapTask: record buffer = 262144/327680
14/01/22 10:44:48 INFO mapred.MapTask: Starting flush of map output
14/01/22 10:44:48 INFO mapred.MapTask: Finished spill 0
14/01/22 10:44:48 INFO mapred.Task: Task:attempt_local1140955208_0001_m_000000_0 is done. And is in the process of commiting
14/01/22 10:44:48 INFO mapred.LocalJobRunner: 
14/01/22 10:44:48 INFO mapred.Task: Task 'attempt_local1140955208_0001_m_000000_0' done.
14/01/22 10:44:48 INFO mapred.LocalJobRunner: Finishing task: attempt_local1140955208_0001_m_000000_0
14/01/22 10:44:48 INFO mapred.LocalJobRunner: Map task executor complete.
14/01/22 10:44:48 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1457048
14/01/22 10:44:48 INFO mapred.LocalJobRunner: 
14/01/22 10:44:48 INFO mapred.Merger: Merging 1 sorted segments
14/01/22 10:44:48 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 194 bytes
14/01/22 10:44:48 INFO mapred.LocalJobRunner: 
14/01/22 10:44:48 INFO mapred.Task: Task:attempt_local1140955208_0001_r_000000_0 is done. And is in the process of commiting
14/01/22 10:44:48 INFO mapred.LocalJobRunner: 
14/01/22 10:44:48 INFO mapred.Task: Task attempt_local1140955208_0001_r_000000_0 is allowed to commit now
14/01/22 10:44:48 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1140955208_0001_r_000000_0' to hdfs://localhost:9000/home/zhangzhen/output
14/01/22 10:44:49 INFO mapred.LocalJobRunner: reduce > reduce
14/01/22 10:44:49 INFO mapred.Task: Task 'attempt_local1140955208_0001_r_000000_0' done.
14/01/22 10:44:49 INFO mapred.JobClient:  map 100% reduce 100%
14/01/22 10:44:49 INFO mapred.JobClient: Job complete: job_local1140955208_0001
14/01/22 10:44:49 INFO mapred.JobClient: Counters: 23
14/01/22 10:44:49 INFO mapred.JobClient:   PhoneAnalyzer$Counter
14/01/22 10:44:49 INFO mapred.JobClient:     LINESKIP=1
14/01/22 10:44:49 INFO mapred.JobClient:   File Output Format Counters 
14/01/22 10:44:49 INFO mapred.JobClient:     Bytes Written=146
14/01/22 10:44:49 INFO mapred.JobClient:   File Input Format Counters 
14/01/22 10:44:49 INFO mapred.JobClient:     Bytes Read=202
14/01/22 10:44:49 INFO mapred.JobClient:   FileSystemCounters
14/01/22 10:44:49 INFO mapred.JobClient:     FILE_BYTES_READ=568
14/01/22 10:44:49 INFO mapred.JobClient:     HDFS_BYTES_READ=404
14/01/22 10:44:49 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=136400
14/01/22 10:44:49 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=146
14/01/22 10:44:49 INFO mapred.JobClient:   Map-Reduce Framework
14/01/22 10:44:49 INFO mapred.JobClient:     Reduce input groups=4
14/01/22 10:44:49 INFO mapred.JobClient:     Map output materialized bytes=198
14/01/22 10:44:49 INFO mapred.JobClient:     Combine output records=0
14/01/22 10:44:49 INFO mapred.JobClient:     Map input records=10
14/01/22 10:44:49 INFO mapred.JobClient:     Reduce shuffle bytes=0
14/01/22 10:44:49 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
14/01/22 10:44:49 INFO mapred.JobClient:     Reduce output records=4
14/01/22 10:44:49 INFO mapred.JobClient:     Spilled Records=18
14/01/22 10:44:49 INFO mapred.JobClient:     Map output bytes=174
14/01/22 10:44:49 INFO mapred.JobClient:     Total committed heap usage (bytes)=340262912
14/01/22 10:44:49 INFO mapred.JobClient:     CPU time spent (ms)=0
14/01/22 10:44:49 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
14/01/22 10:44:49 INFO mapred.JobClient:     SPLIT_RAW_BYTES=119
14/01/22 10:44:49 INFO mapred.JobClient:     Map output records=9
14/01/22 10:44:49 INFO mapred.JobClient:     Combine input records=0
14/01/22 10:44:49 INFO mapred.JobClient:     Reduce input records=9
After the run finishes, you can locate the output file under the path you configured and view it there; you can also refresh the DFS tree in Eclipse, as in the screenshots below, to see the input and output directories, and the files under them can be opened and read directly in Eclipse.
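If you prefer to check the result from code instead, a small sketch that reads the reducer output back from HDFS; part-r-00000 is the default name of the first reducer's output file, and the paths are the ones used above:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);
        Path result = new Path("/home/zhangzhen/output/part-r-00000");
        // print each "callee \t caller|caller|..." line written by the reducer
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(result)));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
        reader.close();
        fs.close();
    }
}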

Test screenshots: (screenshots omitted)


