例题
- 词频统计(word count)
- 一篇文章用哈希表统计即可
- 对互联网所有网页的词频进行统计(Google搜索引擎的需求),无法将所有网页读入内存
- 过程
- map:将单词提取出来,对每个单词输入一个<word,1>这样的<k,v>对,进而将相同的数据放在一起,形成<word,<1,1,1,...>>这样的<k,v集合>
- reduce:将集合里的1求和,再将单词和这个和组成<word,sum>输出
- 一个map函数仅对一个HDFS数据块上的数据进行计算,从而实现大数据的分布式计算
- 在分布式集群中调度执行MapReduce程序的计算框架也叫MapReduce
- 求部门工资总额
- SQL:select deptno,sum(sal) from emp gruop by deptno;
- 分析数据类型,套用模板重写map()、reduce()
- 导出jar包,指定main class
- 把数据保存在hdfs中
- hadoop jar s1.jar /input/emp.csv /output/0910/s1
SalaryTotalMapper.java
View Code
SalaryTotalReducer.java
View Code
SalaryTotalMain.java
View Code
- 数据去重
- select distinct depno, job from emp;
- 用MR实现
- 只有Mapper没有Reducer(排序)
DistinctMain.java
View Code
DistinctMapper.java
View Code
DistinctReducer.java
View Code
- 多表查询
- select dname,ename from dept, emp where emp.deptno=dept.deptno;
- 关系型数据库的子查询会转换成多表查询(通过执行计划看SQL语句的执行过程和效率)
- 笛卡尔积:列数相加,行数相乘,得到全集
- 用连接条件(如 emp.deptno=dept.deptno)去掉全集中的错误数据
- 连接条件至少N-1个(N为表的个数)
- 根据连接条件不同,分为:
- 等值连接 / 不等值连接
- 外连接 / 自连接
-
- 通过MR实现连接
EqualJoinMain.java
View Code
EqualJoinMapper.java
View Code
EqualJoinReducer.java
View Code
- 自连接
- 通过表的别名,将同一张表视为多张表
- 一个人是下级的老板,同时是上级的员工
- 同一条数据输出两次,一次作为老板,一次作为员工(看作两张表)
- 相同的k2,其value2会被同一个reducer处理
- 存在非法数据需进行清洗(如把大老板的老板编号置为-1)
- 老板和员工同时存在才输出(员工树的根节点和叶子节点不显示)
SelfJoinMain.java
View Code
SelfJoinMapper.java
View Code
SelfJoinReducer.java
View Code
- 倒排索引
- 关系型数据库的索引
-
- 数据存储在HDFS后会建立索引,提高查找效率
- MR实现倒排索引
- 记录一个单词在一个文件中出现的次数
- Combiner对同一文件中重复出现的单词进行求和
- Reducer对不同文件中出现的单词进行汇总
- 保证有无Combiner前后数据类型一样
RevertedIndexMain.java
1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.Path; 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Job; 8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 9 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 10 11 12 public class RevertedIndexMain { 13 14 public static void main(String[] args) throws Exception { 15 //1、创建一个任务 16 Job job = Job.getInstance(new Configuration()); 17 job.setJarByClass(RevertedIndexMain.class); //任务的入口 18 19 //2、指定任务的map和map输出的数据类型 20 job.setMapperClass(RevertedIndexMapper.class); 21 job.setMapOutputKeyClass(Text.class); //k2的数据类型 22 job.setMapOutputValueClass(Text.class); //v2的类型 23 24 //指定任务的Combiner 25 job.setCombinerClass(RevertedIndexCombiner.class); 26 27 //3、指定任务的reduce和reduce的输出数据的类型 28 job.setReducerClass(RevertedIndexReducer.class); 29 job.setOutputKeyClass(Text.class); //k4的类型 30 job.setOutputValueClass(Text.class); //v4的类型 31 32 //4、指定任务的输入路径、任务的输出路径 33 FileInputFormat.setInputPaths(job, new Path(args[0])); 34 FileOutputFormat.setOutputPath(job, new Path(args[1])); 35 36 //5、执行任务 37 job.waitForCompletion(true); 38 } 39 40 }
RevertedIndexMapper.java
1 import java.io.IOException; 2 3 import org.apache.hadoop.io.LongWritable; 4 import org.apache.hadoop.io.Text; 5 import org.apache.hadoop.mapreduce.Mapper; 6 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 7 8 public class RevertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> { 9 10 @Override 11 protected void map(LongWritable key1, Text value1, Context context) 12 throws IOException, InterruptedException { 13 //数据:/indexdata/data01.txt 14 //得到对应文件名 15 String path = ((FileSplit)context.getInputSplit()).getPath().toString(); 16 17 //解析出文件名 18 //得到最后一个斜线的位置 19 int index = path.lastIndexOf("/"); 20 String fileName = path.substring(index+1); 21 22 //数据:I love Beijing and love Shanghai 23 String data = value1.toString(); 24 String[] words = data.split(" "); 25 26 //输出 27 for(String word:words){ 28 context.write(new Text(word+":"+fileName), new Text("1")); 29 } 30 } 31 }
RevertedIndexCombiner.java
1 import java.io.IOException; 2 3 import org.apache.hadoop.io.Text; 4 import org.apache.hadoop.mapreduce.Reducer; 5 6 public class RevertedIndexCombiner extends Reducer<Text, Text, Text, Text> { 7 8 @Override 9 protected void reduce(Text k21, Iterable<Text> v21, Context context) 10 throws IOException, InterruptedException { 11 // 求和:对同一个文件中的单词进行求和 12 int total = 0; 13 for(Text v:v21){ 14 total = total + Integer.parseInt(v.toString()); 15 } 16 17 //k21是:love:data01.txt 18 String data = k21.toString(); 19 //找到:冒号的位置 20 int index = data.indexOf(":"); 21 22 String word = data.substring(0, index); //单词 23 String fileName = data.substring(index + 1); //文件名 24 25 //输出: 26 context.write(new Text(word), new Text(fileName+":"+total)); 27 } 28 }
RevertedIndexReducer.java
1 import java.io.IOException; 2 3 import org.apache.hadoop.io.Text; 4 import org.apache.hadoop.mapreduce.Reducer; 5 6 public class RevertedIndexReducer extends Reducer<Text, Text, Text, Text> { 7 8 @Override 9 protected void reduce(Text k3, Iterable<Text> v3, Context context) 10 throws IOException, InterruptedException { 11 String str = ""; 12 13 for(Text t:v3){ 14 str = "("+t.toString()+")"+str; 15 } 16 17 context.write(k3, new Text(str)); 18 } 19 20 }
- Hadoop自带例题
- start-all.sh
- /root/training/hadoop-2.7.3/share/hadoop/mapreduce
- hadoop jar hadoop-mapreduce-examples-2.7.3.jar
- wordcount: A map/reduce program that counts the words in the input files.
- hadoop jar hadoop-mapreduce-examples-2.7.3.jar wordcount
- Usage: wordcount <in> [<in>...] <out>
- hadoop jar hadoop-mapreduce-examples-2.7.3.jar wordcount /input/data.txt /output/0402/wc