Assume our cluster or pseudo-distributed Hadoop system is already set up. Most of us verify the setup with the WordCount example from the official site or other references. Assuming WordCount runs without problems, we can start writing MapReduce (M/R) programs and begin analyzing data.
A Hadoop cluster involves other components that also need to be installed; they are not covered here, so we set them aside for now. All you need to do is upload the data to be analyzed to HDFS; learn the remaining components as you encounter them. This article keeps conceptual discussion to a minimum, but the essential concepts and the program's execution steps must be understood.
Task: analyze call records and, for each phone number, find which numbers have called it.
- Given a call log in which each record means user A called user B
- For each phone number, find all the numbers that called it
Example: 120 13688893333 13733883333 1393302942
Explanation: 120 was called by the three numbers that follow it.
Traditional approaches can implement the same functionality, but they hit a bottleneck once the data grows large enough:
1. Extract the fields directly with a C or Java program.
2. Import the data into a database; a simple SELECT also solves the problem.
3. Once the data reaches the TB scale, however, even simple computations overwhelm these traditional methods. This is where M/R comes in: Hadoop automatically distributes the data we upload to HDFS across the machines in the cluster.
4. The mapper program reads the data line by line. For each line it splits the raw record, emits the fields we need, and handles malformed records (which would otherwise crash the job). Every line of input runs through this mapper, and the final results are eventually written back to HDFS.
5. A reduce function is optional, depending on the task. Without a reducer, the map output is sent straight to the output files, so the map output format must match the program's output format.
6. With a reducer, the framework first routes all map outputs that share the same key to the same reducer, then writes out the reducer's results; the map output format must match the reduce input format.
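The map step described above (split a record, swap the two fields, skip malformed lines) can be sketched in plain Java outside of Hadoop. The class and method names here are ours, for illustration only:

```java
import java.util.AbstractMap;
import java.util.Map;

public class MapSketch {
    // Mimics the map step: "caller callee" -> (key = callee, value = caller).
    // Returns null for a malformed record, mirroring the error-counter idea.
    static Map.Entry<String, String> mapRecord(String line) {
        String[] parts = line.split(" ");
        if (parts.length < 2) {
            return null; // bad line: the real job would increment a counter instead
        }
        return new AbstractMap.SimpleEntry<>(parts[1], parts[0]);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> kv = mapRecord("13599999999 10086");
        System.out.println(kv.getKey() + "\t" + kv.getValue()); // 10086	13599999999
        System.out.println(mapRecord("badline") == null);       // true
    }
}
```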
What reduce does:
1. Each reducer is responsible for a subset of the keys; the reduce function aggregates the values for each key and emits the result.
Source data:
13599999999 10086
13633333333 120
12678998778 dada13690378933
13833238233 13690378933
12678998778 120
19837366873 10086
16389323733 10086
18373984487 120
19877367388 13690378933
18937933444 13840238422
Output data (key and value separated by a tab):
10086 13599999999|19837366873|16389323733|
120 13633333333|12678998778|18373984487|
13690378933 13833238233|19877367388|
13840238422 18937933444|
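The shuffle-and-reduce behavior that produces output like the above can be simulated locally in plain Java. This is a sketch with in-memory data and names of our own choosing; note that Hadoop does not guarantee the order of values within a key, whereas this simulation keeps input order:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ReduceSketch {
    // Groups callers by callee and joins them with "|", like the reduce step.
    static Map<String, String> groupCalls(String[] records) {
        Map<String, String> out = new LinkedHashMap<>(); // keeps first-seen key order
        for (String record : records) {
            String[] parts = record.split(" ");
            if (parts.length < 2) continue;                  // skip malformed lines
            String caller = parts[0], callee = parts[1];
            out.merge(callee, caller + "|", String::concat); // "shuffle" + per-key concat
        }
        return out;
    }

    public static void main(String[] args) {
        String[] records = {
            "13599999999 10086", "13633333333 120", "13833238233 13690378933",
            "12678998778 120", "19837366873 10086"
        };
        groupCalls(records).forEach((k, v) -> System.out.println(k + "\t" + v));
    }
}
```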
Test code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class PhoneTest extends Configured implements Tool {

    enum Counter {
        LINESKIP; // lines skipped because of a parse error
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "PhoneTest");  // job name
        job.setJarByClass(PhoneTest.class);    // main class of the job
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/home/zhangzhen/input"));    // input path
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/home/zhangzhen/output")); // output path
        job.setMapperClass(Map.class);         // use the Map class as the mapper
        job.setReducerClass(Reduce.class);     // use the Reduce class as the reducer
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);     // output key type (KEYOUT)
        job.setOutputValueClass(Text.class);   // output value type (VALUEOUT)
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static class Map extends Mapper<LongWritable, Text, Text, Text> { // <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            try {
                // key - byte offset of the line; value - the line's text
                String line = value.toString(); // e.g. "13510000000 10086" (13510000000 called 10086)
                // data processing
                String[] lineSplit = line.split(" ");
                String phone1 = lineSplit[0];
                String phone2 = lineSplit[1];
                context.write(new Text(phone2), new Text(phone1)); // emit key \t value
            } catch (java.lang.ArrayIndexOutOfBoundsException e) {
                context.getCounter(Counter.LINESKIP).increment(1); // bad line: counter + 1
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String valueStr;
            String out = "";
            for (Text value : values) {
                valueStr = value.toString() + "|";
                out += valueStr;
            }
            // Emits key \t value. (If the desired output is not in key \t value form,
            // use NullWritable as the key and pack both parts into the value.)
            context.write(key, new Text(out));
        }
    }

    public static void main(String[] args) throws Exception {
        // run the job
        int res = ToolRunner.run(new Configuration(), new PhoneTest(), args);
        System.exit(res);
    }
}
Console output in Eclipse:
14/01/22 10:44:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/01/22 10:44:47 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/01/22 10:44:47 INFO input.FileInputFormat: Total input paths to process : 1
14/01/22 10:44:47 WARN snappy.LoadSnappy: Snappy native library not loaded
14/01/22 10:44:48 INFO mapred.JobClient: Running job: job_local1140955208_0001
14/01/22 10:44:48 INFO mapred.LocalJobRunner: Waiting for map tasks
14/01/22 10:44:48 INFO mapred.LocalJobRunner: Starting task: attempt_local1140955208_0001_m_000000_0
14/01/22 10:44:48 INFO util.ProcessTree: setsid exited with exit code 0
14/01/22 10:44:48 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@99548b
14/01/22 10:44:48 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/home/zhangzhen/input/tonghua.txt:0+202
14/01/22 10:44:48 INFO mapred.MapTask: io.sort.mb = 100
14/01/22 10:44:48 INFO mapred.MapTask: data buffer = 79691776/99614720
14/01/22 10:44:48 INFO mapred.MapTask: record buffer = 262144/327680
14/01/22 10:44:48 INFO mapred.MapTask: Starting flush of map output
14/01/22 10:44:48 INFO mapred.MapTask: Finished spill 0
14/01/22 10:44:48 INFO mapred.Task: Task:attempt_local1140955208_0001_m_000000_0 is done. And is in the process of commiting
14/01/22 10:44:48 INFO mapred.LocalJobRunner:
14/01/22 10:44:48 INFO mapred.Task: Task 'attempt_local1140955208_0001_m_000000_0' done.
14/01/22 10:44:48 INFO mapred.LocalJobRunner: Finishing task: attempt_local1140955208_0001_m_000000_0
14/01/22 10:44:48 INFO mapred.LocalJobRunner: Map task executor complete.
14/01/22 10:44:48 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1457048
14/01/22 10:44:48 INFO mapred.LocalJobRunner:
14/01/22 10:44:48 INFO mapred.Merger: Merging 1 sorted segments
14/01/22 10:44:48 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 194 bytes
14/01/22 10:44:48 INFO mapred.LocalJobRunner:
14/01/22 10:44:48 INFO mapred.Task: Task:attempt_local1140955208_0001_r_000000_0 is done. And is in the process of commiting
14/01/22 10:44:48 INFO mapred.LocalJobRunner:
14/01/22 10:44:48 INFO mapred.Task: Task attempt_local1140955208_0001_r_000000_0 is allowed to commit now
14/01/22 10:44:48 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1140955208_0001_r_000000_0' to hdfs://localhost:9000/home/zhangzhen/output
14/01/22 10:44:49 INFO mapred.LocalJobRunner: reduce > reduce
14/01/22 10:44:49 INFO mapred.Task: Task 'attempt_local1140955208_0001_r_000000_0' done.
14/01/22 10:44:49 INFO mapred.JobClient: map 100% reduce 100%
14/01/22 10:44:49 INFO mapred.JobClient: Job complete: job_local1140955208_0001
14/01/22 10:44:49 INFO mapred.JobClient: Counters: 23
14/01/22 10:44:49 INFO mapred.JobClient: PhoneAnalyzer$Counter
14/01/22 10:44:49 INFO mapred.JobClient: LINESKIP=1
14/01/22 10:44:49 INFO mapred.JobClient: File Output Format Counters
14/01/22 10:44:49 INFO mapred.JobClient: Bytes Written=146
14/01/22 10:44:49 INFO mapred.JobClient: File Input Format Counters
14/01/22 10:44:49 INFO mapred.JobClient: Bytes Read=202
14/01/22 10:44:49 INFO mapred.JobClient: FileSystemCounters
14/01/22 10:44:49 INFO mapred.JobClient: FILE_BYTES_READ=568
14/01/22 10:44:49 INFO mapred.JobClient: HDFS_BYTES_READ=404
14/01/22 10:44:49 INFO mapred.JobClient: FILE_BYTES_WRITTEN=136400
14/01/22 10:44:49 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=146
14/01/22 10:44:49 INFO mapred.JobClient: Map-Reduce Framework
14/01/22 10:44:49 INFO mapred.JobClient: Reduce input groups=4
14/01/22 10:44:49 INFO mapred.JobClient: Map output materialized bytes=198
14/01/22 10:44:49 INFO mapred.JobClient: Combine output records=0
14/01/22 10:44:49 INFO mapred.JobClient: Map input records=10
14/01/22 10:44:49 INFO mapred.JobClient: Reduce shuffle bytes=0
14/01/22 10:44:49 INFO mapred.JobClient: Physical memory (bytes) snapshot=0
14/01/22 10:44:49 INFO mapred.JobClient: Reduce output records=4
14/01/22 10:44:49 INFO mapred.JobClient: Spilled Records=18
14/01/22 10:44:49 INFO mapred.JobClient: Map output bytes=174
14/01/22 10:44:49 INFO mapred.JobClient: Total committed heap usage (bytes)=340262912
14/01/22 10:44:49 INFO mapred.JobClient: CPU time spent (ms)=0
14/01/22 10:44:49 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
14/01/22 10:44:49 INFO mapred.JobClient: SPLIT_RAW_BYTES=119
14/01/22 10:44:49 INFO mapred.JobClient: Map output records=9
14/01/22 10:44:49 INFO mapred.JobClient: Combine input records=0
14/01/22 10:44:49 INFO mapred.JobClient: Reduce input records=9
After the job finishes, you can find the output files at the path you configured and inspect them there. Alternatively, refresh the DFS directory in Eclipse, as shown in the screenshot below, to see the input and output directories; the files under them can also be opened and viewed inside Eclipse. Test screenshot:
Copyright © BUAA