[DB] MapReduce 例题

例题

  • 词频统计(word count)
    • 一篇文章用哈希表统计即可
    • 对互联网所有网页的词频进行统计(Google搜索引擎的需求),无法将所有网页读入内存
    • 过程
      • map:将单词提取出来,对每个单词输入一个<word,1>这样的<k,v>对,进而将相同的数据放在一起,形成<word,<1,1,1,...>>这样的<k,v集合>
      • reduce:将集合里的1求和,再将单词和这个和组成<word,sum>输出
    • 一个map函数仅对一个HDFS数据块上的数据进行计算,从而实现大数据的分布式计算
    • 在分布式集群中调度执行MapReduce程序的计算框架也叫MapReduce
  • 求部门工资总额
    • SQL:select deptno,sum(sal) from emp gruop by deptno;
    • 分析数据类型,套用模板重写map()、reduce()
    • 导出jar包,指定main class
    • 把数据保存在hdfs中
    • hadoop jar s1.jar /input/emp.csv /output/0910/s1

[DB] MapReduce 例题

[DB] MapReduce 例题

SalaryTotalMapper.java

[DB] MapReduce 例题View Code

SalaryTotalReducer.java

[DB] MapReduce 例题View Code

SalaryTotalMain.java

[DB] MapReduce 例题View Code
  • 数据去重
    • select distinct depno, job from emp;
    • 用MR实现
    • 只有Mapper没有Reducer(排序)

DistinctMain.java

[DB] MapReduce 例题View Code

DistinctMapper.java

[DB] MapReduce 例题View Code

DistinctReducer.java

[DB] MapReduce 例题View Code
  • 多表查询
    • select dname,ename from dept, emp where emp.deptno=dept.deptno;
    • 关系型数据库的子查询会转换成多表查询(通过执行计划看SQL语句的执行过程和效率)
    • 笛卡尔积:列数相加,行数相乘,得到全集
    • 用连接条件(如 emp.deptno=dept.deptno)去掉全集中的错误数据
    • 连接条件至少N-1个(N为表的个数)
    • 根据连接条件不同,分为:
      • 等值连接 / 不等值连接
      • 外连接 / 自连接

[DB] MapReduce 例题  [DB] MapReduce 例题

[DB] MapReduce 例题

    • 通过MR实现连接

EqualJoinMain.java

[DB] MapReduce 例题View Code

EqualJoinMapper.java

[DB] MapReduce 例题View Code

EqualJoinReducer.java

[DB] MapReduce 例题View Code

[DB] MapReduce 例题

  • 自连接
    • 通过表的别名,将同一张表视为多张表
    • 一个人是下级的老板,同时是上级的员工
    • 同一条数据输出两次,一次作为老板,一次作为员工(看作两张表)
    • 相同的k2,其value2会被同一个reducer处理
    • 存在非法数据需进行清洗(如把大老板的老板编号置为-1)
    • 老板和员工同时存在才输出(员工树的根节点和叶子节点不显示)

[DB] MapReduce 例题

[DB] MapReduce 例题

SelfJoinMain.java

[DB] MapReduce 例题View Code

SelfJoinMapper.java

[DB] MapReduce 例题View Code

SelfJoinReducer.java

[DB] MapReduce 例题View Code

[DB] MapReduce 例题  [DB] MapReduce 例题

  • 倒排索引
    • 关系型数据库的索引  

[DB] MapReduce 例题

    • 数据存储在HDFS后会建立索引,提高查找效率

 [DB] MapReduce 例题

 [DB] MapReduce 例题

  • MR实现倒排索引
    • 记录一个单词在一个文件中出现的次数
    • Combiner对同一文件中重复出现的单词进行求和
    • Reducer对不同文件中出现的单词进行汇总
    • 保证有无Combiner前后数据类型一样

[DB] MapReduce 例题[DB] MapReduce 例题

RevertedIndexMain.java

[DB] MapReduce 例题
 1 import java.io.IOException;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.Path;
 5 import org.apache.hadoop.io.IntWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Job;
 8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 9 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
10 
11 
12 public class RevertedIndexMain {
13 
14     public static void main(String[] args) throws Exception {
15         //1、创建一个任务
16         Job job = Job.getInstance(new Configuration());
17         job.setJarByClass(RevertedIndexMain.class); //任务的入口        
18         
19         //2、指定任务的map和map输出的数据类型
20         job.setMapperClass(RevertedIndexMapper.class);
21         job.setMapOutputKeyClass(Text.class);  //k2的数据类型
22         job.setMapOutputValueClass(Text.class);  //v2的类型
23     
24         //指定任务的Combiner
25         job.setCombinerClass(RevertedIndexCombiner.class);
26         
27         //3、指定任务的reduce和reduce的输出数据的类型
28         job.setReducerClass(RevertedIndexReducer.class);
29         job.setOutputKeyClass(Text.class); //k4的类型
30         job.setOutputValueClass(Text.class); //v4的类型
31         
32         //4、指定任务的输入路径、任务的输出路径
33         FileInputFormat.setInputPaths(job, new Path(args[0]));
34         FileOutputFormat.setOutputPath(job, new Path(args[1]));
35         
36         //5、执行任务
37         job.waitForCompletion(true);
38     }
39 
40 }
View Code

RevertedIndexMapper.java

[DB] MapReduce 例题
 1 import java.io.IOException;
 2 
 3 import org.apache.hadoop.io.LongWritable;
 4 import org.apache.hadoop.io.Text;
 5 import org.apache.hadoop.mapreduce.Mapper;
 6 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 7 
 8 public class RevertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
 9 
10     @Override
11     protected void map(LongWritable key1, Text value1, Context context)
12             throws IOException, InterruptedException {
13         //数据:/indexdata/data01.txt
14         //得到对应文件名
15         String path = ((FileSplit)context.getInputSplit()).getPath().toString();
16         
17         //解析出文件名
18         //得到最后一个斜线的位置
19         int index = path.lastIndexOf("/");
20         String fileName = path.substring(index+1);
21         
22         //数据:I love Beijing and love Shanghai
23         String data = value1.toString();
24         String[] words = data.split(" ");
25         
26         //输出
27         for(String word:words){
28             context.write(new Text(word+":"+fileName), new Text("1"));
29         }
30     }
31 }
View Code

RevertedIndexCombiner.java

[DB] MapReduce 例题
 1 import java.io.IOException;
 2 
 3 import org.apache.hadoop.io.Text;
 4 import org.apache.hadoop.mapreduce.Reducer;
 5 
 6 public class RevertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
 7 
 8     @Override
 9     protected void reduce(Text k21, Iterable<Text> v21, Context context)
10             throws IOException, InterruptedException {
11         // 求和:对同一个文件中的单词进行求和
12         int total = 0;
13         for(Text v:v21){
14             total = total + Integer.parseInt(v.toString());
15         }
16         
17         //k21是:love:data01.txt
18         String data = k21.toString();
19         //找到:冒号的位置
20         int index = data.indexOf(":");
21         
22         String word = data.substring(0, index);        //单词
23         String fileName = data.substring(index + 1);   //文件名
24         
25         //输出:
26         context.write(new Text(word), new Text(fileName+":"+total));
27     }
28 }
View Code

RevertedIndexReducer.java

[DB] MapReduce 例题
 1 import java.io.IOException;
 2 
 3 import org.apache.hadoop.io.Text;
 4 import org.apache.hadoop.mapreduce.Reducer;
 5 
 6 public class RevertedIndexReducer extends Reducer<Text, Text, Text, Text> {
 7 
 8     @Override
 9     protected void reduce(Text k3, Iterable<Text> v3, Context context)
10             throws IOException, InterruptedException {
11         String str = "";
12         
13         for(Text t:v3){
14             str = "("+t.toString()+")"+str;
15         }
16         
17         context.write(k3, new Text(str));
18     }
19 
20 }
View Code
  • Hadoop自带例题
    • start-all.sh
    • /root/training/hadoop-2.7.3/share/hadoop/mapreduce
    • hadoop jar hadoop-mapreduce-examples-2.7.3.jar
    • wordcount: A map/reduce program that counts the words in the input files.
    • hadoop jar hadoop-mapreduce-examples-2.7.3.jar wordcount
    • Usage: wordcount <in> [<in>...] <out>
    • hadoop jar hadoop-mapreduce-examples-2.7.3.jar wordcount /input/data.txt /output/0402/wc

[DB] MapReduce 例题

上一篇:[DB] Memcache


下一篇:mysql中获取当前时间的前一天