Hadoop上路-03_Hadoop JavaAPI

一。Eclipse安装

1.下载解压

下载:http://www.eclipse.org/downloads/  

解压:SHELL$ sudo tar -zxvf eclipse.tar.gz  

 

2.快捷方式

右键Ubuntu桌面,创建启动器

 Hadoop上路-03_Hadoop JavaAPI

 

3.创建一个JavaProject

 Hadoop上路-03_Hadoop JavaAPI

 

4.添加必须jar

全部jar都可以在%Hadoop安装目录%/share/hadoop目录中找到。

 Hadoop上路-03_Hadoop JavaAPI

 

 

二。基本操作

这里仅限FileSystem中的方法,其数量繁多,具体查看API

1.遍历目录和文件 listStatus

Hadoop上路-03_Hadoop JavaAPI
package hadooptest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

public class HdfsTest {

    private static FileSystem hdfs;
    
    @Test
    public void test() throws Exception {
        // 1.创建配置器 
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        // 2.创建文件系统 
        hdfs = FileSystem.get(conf);
        // 3.遍历HDFS上的文件和目录 
        FileStatus[] fs = hdfs.listStatus(new Path("hdfs:/")); 
        if (fs.length > 0) { 
            for (FileStatus f : fs) { 
                showDir(f);
            }
        }
    }

    private static void showDir(FileStatus fs) throws Exception {
        Path path = fs.getPath();
        System.out.println(path);
        // 如果是目录
        //if (fs.isDir()) {    //已过期
        if (fs.isDirectory()) {
            FileStatus[] f = hdfs.listStatus(path);
            if (f.length > 0) {
                for (FileStatus file : f) {
                    showDir(file);
                }
            }
        }
    }
}
Hadoop上路-03_Hadoop JavaAPI

  Hadoop上路-03_Hadoop JavaAPI

 

2.遍历文件 listFiles

Hadoop上路-03_Hadoop JavaAPI
    @Test
    public void test() throws Exception {
        // 1.配置器
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        // 2.文件系统
        hdfs = FileSystem.get(conf);
        // 3.遍历HDFS上的文件
        RemoteIterator<LocatedFileStatus> fs = hdfs.listFiles(new Path("hdfs:/"), true);
        while(fs.hasNext()){
            System.out.println(fs.next());
        }
    }
Hadoop上路-03_Hadoop JavaAPI

  Hadoop上路-03_Hadoop JavaAPI

 

3.判断存在 exists

Hadoop上路-03_Hadoop JavaAPI
    @Test
    public void test() throws Exception {
        // 1.创建配置器  
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        //2.创建文件系统  
        FileSystem hdfs = FileSystem.get(conf);  
        //3.创建可供hadoop使用的文件系统路径
        Path file = new Path("hdfs:/test.txt");  
        // 4.判断文件是否存在(文件目标路径)    
        System.out.println("文件存在:" + hdfs.exists(file));
    }
Hadoop上路-03_Hadoop JavaAPI

 

4.判断目录/文件 isDirectory/isFile

Hadoop上路-03_Hadoop JavaAPI
package hadooptest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

public class HdfsTest {
    private static FileSystem hdfs;

    @Test
    public void test() throws Exception {
        // 1.配置器
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        // 2.文件系统
        hdfs = FileSystem.get(conf);
        // 3.遍历HDFS上目前拥有的文件和目录
        FileStatus[] fs = hdfs.listStatus(new Path("hdfs:/"));
        if (fs.length > 0) {
            for (FileStatus f : fs) {
                showDir(f);
            }
        }
        else{
            System.out.println("没什么好遍历的...");
        }
    }

    private static void showDir(FileStatus fs) throws Exception {
        Path path = fs.getPath();
        // 如果是目录
        if (fs.isDirectory()) {
            
            System.out.println("目录:" + path);
            
            FileStatus[] f = hdfs.listStatus(path);
            if (f.length > 0) {
                for (FileStatus file : f) {
                    showDir(file);
                }
            }
        } else {
            System.out.println("文件:" + path);
        }
    }
}
Hadoop上路-03_Hadoop JavaAPI

  Hadoop上路-03_Hadoop JavaAPI

 

5.最后修改时间 getModificationTime

Hadoop上路-03_Hadoop JavaAPI
package hadooptest;

import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

public class HdfsTest2 {
    private static FileSystem hdfs;

    @Test
    public void test() throws Exception {
        // 1.创建配置器
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        // 2.创建文件系统(指定为HDFS文件系统到URI)
        hdfs = FileSystem.get(conf);
        // 3.列出HDFS上目前拥有的文件和目录
        FileStatus[] fs = hdfs.listStatus(new Path("hdfs:/"));
        if(fs.length>0){
            for (FileStatus f : fs) {
                showDir(f);
            }
        }
    }

    private static void showDir(FileStatus fs) throws Exception {
        Path path = fs.getPath(); 
        //获取最后修改时间
        long time = fs.getModificationTime();   
        System.out.println("HDFS文件的最后修改时间:"+new Date(time));   
        System.out.println(path);
        if (fs.isDirectory()) {
            FileStatus[] f = hdfs.listStatus(path);
            if(f.length>0){
                for (FileStatus file : f) {
                    showDir(file);
                }
            }
        }
    }
}
Hadoop上路-03_Hadoop JavaAPI

  Hadoop上路-03_Hadoop JavaAPI

 

6.文件备份状态 getFileBlockLocations

Hadoop上路-03_Hadoop JavaAPI
package hadooptest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

public class HdfsTest2 { 

    @Test
    public void test() throws Exception {
        //1.配置器
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        //2.文件系统
        FileSystem fs = FileSystem.get(conf);
        //3.已存在的,必须是文件
        Path path = new Path("hdfs:/vigiles/dir/test3.txt");
        //4.文件状态
        FileStatus status = fs.getFileStatus(path);
        //5.文件块
        //BlockLocation[] blockLocations = fs.getFileBlockLocations(status, 0, status.getLen());    //方法1,传入文件的FileStatus
        BlockLocation[] blockLocations = fs.getFileBlockLocations(path, 0, status.getLen());    //方法2,传入文件的Path 
        int blockLen = blockLocations.length;
        System.err.println("块数量:"+blockLen);    //如果文件不够大,就不会分块,即得到1
        for (int i = 0; i < blockLen; i++) {
            //得到块文件大小
            long sizes = blockLocations[i].getLength();
            System.err.println("块大小:"+sizes);
            
            //按照备份数量得到全部主机名
            String[] hosts = blockLocations[i].getHosts();
            for (String host : hosts) {
                System.err.println("主机名:"+host);
            }
            
            //按照备份数量得到全部主机名
            String[] names = blockLocations[i].getNames();
            for (String name : names) {
                System.err.println("IP:"+ name);
            }
        }
    }
}
Hadoop上路-03_Hadoop JavaAPI

  Hadoop上路-03_Hadoop JavaAPI

 

7.读取文件 open

Hadoop上路-03_Hadoop JavaAPI
package hadooptest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

public class HdfsTest2 {

    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("hdfs:/vigiles/dir/test3.txt");
        FSDataInputStream is = fs.open(path);
        FileStatus stat = fs.getFileStatus(path);
        byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];
        is.readFully(0, buffer);
        is.close();
        fs.close();
        System.out.println(new String(buffer));
    }
}
Hadoop上路-03_Hadoop JavaAPI

 

8.复制上传文件 copyFromLocalFile

Hadoop上路-03_Hadoop JavaAPI
@Test
    public void test() throws Exception {
        // 1.创建配置器  
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        //2.创建文件系统  
        FileSystem hdfs = FileSystem.get(conf);  
        //3.创建可供hadoop使用的文件系统路径  
        Path src = new Path("file:/home/hadoop/桌面/copy_test.txt"); //本地目录/文件  
        Path dst = new Path("hdfs:/");  //目标目录/文件 
        // 4.拷贝本地文件上传(本地文件,目标路径)  
        hdfs.copyFromLocalFile(src, dst);  
        System.out.println("文件上传成功至:" + conf.get("fs.default.name"));  
        // 5.列出HDFS上的文件  
        FileStatus[] fs = hdfs.listStatus(dst);  
        for (FileStatus f : fs) {   
            System.out.println(f.getPath());  
        }
        
        Path path = new Path("hdfs:/copy_test.txt");
        FSDataInputStream is = hdfs.open(path);
        FileStatus stat = hdfs.getFileStatus(path);
        byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];
        is.readFully(0, buffer);
        is.close();
        hdfs.close();
        System.out.println("文件内容:" + new String(buffer));
    }
Hadoop上路-03_Hadoop JavaAPI

  Hadoop上路-03_Hadoop JavaAPI

另:移动上传moveFromLocalFile,和copyFromLocalFile类似,但其操作后源文件将不存在。

 

9.复制下载文件 copyToLocalFile

Hadoop上路-03_Hadoop JavaAPI
    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        FileSystem hdfs = FileSystem.get(conf);  
        //创建HDFS源路径和本地目标路径
        Path src = new Path("hdfs:/copy_test.txt");  //目标目录/文件 
        Path dst = new Path("file:/home/hadoop/桌面/new.txt"); //本地目录/文件  
        //拷贝本地文件上传(本地文件,目标路径)  
        hdfs.copyToLocalFile(src, dst);  
    }
Hadoop上路-03_Hadoop JavaAPI

 另:moveToLocalFile,其操作后源文件将不存在。

 

10.创建目录 mkdirs

Hadoop上路-03_Hadoop JavaAPI
    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        FileSystem hdfs = FileSystem.get(conf);
        //创建目录
        hdfs.mkdirs(new Path("hdfs:/eminem"));
    }
Hadoop上路-03_Hadoop JavaAPI

 

11.创建目录/文件 create

Hadoop上路-03_Hadoop JavaAPI
  @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        FileSystem hdfs = FileSystem.get(conf);
        
        // 使用HDFS数据输出流(写)对象 在HDSF上根目录创建一个文件夹,其内再创建文件
        FSDataOutputStream out = hdfs.create(new Path("hdfs:/vigiles/eminem.txt"));
        // 在文件中写入一行数据,必须使用UTF-8
        out.write("痞子阿姆,Hello !".getBytes("UTF-8"));
        
        out = hdfs.create(new Path("/vigiles/alizee.txt"));
        out.write("艾莉婕,Hello !".getBytes("UTF-8"));
        
        out.close();
        
        FSDataInputStream is = hdfs.open(new Path("hdfs:/vigiles/alizee.txt"));
        FileStatus stat = hdfs.getFileStatus(new Path("hdfs:/vigiles/alizee.txt"));
        byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];
        is.readFully(0, buffer);
        is.close();
        hdfs.close();
        System.out.println(new String(buffer));
    }
Hadoop上路-03_Hadoop JavaAPI

 

12.创建空文件 createNewFile

Hadoop上路-03_Hadoop JavaAPI
    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        FileSystem hdfs = FileSystem.get(conf);
        //创建空文件
        hdfs.createNewFile(new Path("hdfs:/newfile.txt"));
    }
Hadoop上路-03_Hadoop JavaAPI

 

13.写入文件 append

Hadoop上路-03_Hadoop JavaAPI
    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        FileSystem hdfs = FileSystem.get(conf);
        //创建空文件
        FSDataOutputStream out = hdfs.append(new Path("hdfs:/newfile.txt"));
        out.write("使用append方法写入文件\n".getBytes("UTF-8"));
        out.close();
        
        out = hdfs.append(new Path("/newfile.txt"));
        out.write("再次写入!!!\n".getBytes("UTF-8"));
        out.close();
    }
Hadoop上路-03_Hadoop JavaAPI

 

14.重命名文件 rename

Hadoop上路-03_Hadoop JavaAPI
    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();

        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        FileSystem fs = FileSystem.get(conf);
        //重命名:fs.rename(源文件,新文件)
        boolean rename = fs.rename(new Path("/copy_test.txt"), new Path("/copy.txt"));
        System.out.println(rename);
    }
Hadoop上路-03_Hadoop JavaAPI

 

15.删除文件 delete

Hadoop上路-03_Hadoop JavaAPI
    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        FileSystem fs = FileSystem.get(conf);
        //判断删除(路径,true。false=非空时不删除,抛RemoteException、IOException异常)
        boolean delete = fs.delete(new Path("hdfs:/test.txt"), true);
        System.out.println("执行删除:"+delete);
        //FileSystem关闭时执行
        boolean exit = fs.deleteOnExit(new Path("/out.txt"));
        System.out.println("执行删除:"+exit);
        fs.close();
    }
Hadoop上路-03_Hadoop JavaAPI

 

 

三。MapReduce常用算法

 Hadoop上路-03_Hadoop JavaAPI

1.计数

1)数据准备

 Hadoop上路-03_Hadoop JavaAPI

2)代码

Hadoop上路-03_Hadoop JavaAPI
  1 package hadooptest;
  2 
  3 import java.io.IOException;
  4 import java.util.StringTokenizer;
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.fs.Path;
  7 import org.apache.hadoop.io.IntWritable;
  8 import org.apache.hadoop.io.Text;
  9 import org.apache.hadoop.mapreduce.Job;
 10 import org.apache.hadoop.mapreduce.Mapper;
 11 import org.apache.hadoop.mapreduce.Reducer;
 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 14 
 15 /*
 16  * 单词计数
 17  */
 18 public class WordCount {
 19     
 20     /*
 21      * 先经过mapper运算,然后才是reducer。
 22      * 内部类:映射器 Mapper<Key_IN, Value_IN, Key_OUT, Value_OUT>
 23      */
 24     public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
 25     
 26         //计数,查到一个就占个坑
 27         private static final IntWritable one = new IntWritable(1);
 28         //文本
 29         private Text word = new Text();
 30 
 31         /** 
 32          * 重写map方法,实现理想效果
 33          * MyMapper的实例只有一个,但实例的这个map方法却一直在执行
 34          * Key1:文本行号。Value1:指定行的文本。context:上下文对象
 35          * 这里K1、V1像这样[K,V]
 36         **/
 37         public void map(Object Key1, Text Value1, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
 38             //拆分字符串,返回单词集合。默认以空格拆分
 39             StringTokenizer itr = new StringTokenizer(Value1.toString());
 40             //遍历一行的全部单词
 41             while (itr.hasMoreTokens()) {
 42                 //将文本转为临时Text变量
 43                 this.word.set(itr.nextToken());
 44                 //将单词保存到上下文对象中(单词,占坑),输出
 45                 context.write(this.word, one);
 46             }
 47         }
 48     }
 49 
 50     /************************************************************************
 51      *  在Mapper后,Reducer前,有个shuffle过程,会根据k2将对应的v2归并为v2[...]  *
 52      *************************************************************************/
 53     
 54     /*
 55      * mapper结束后,执行现在的reducer。
 56      * 内部类:拆分器 Reducer<Key_IN, Value_IN, Key_OUT, Value_OUT>
 57      */
 58     public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 59     
 60         //个数统计
 61         private IntWritable result = new IntWritable();
 62         
 63         /** 
 64          * 重写reduce方法,实现理想效果
 65          * MyReducer的实例也只有一个,但实例的这个reduce方法却一直在执行
 66          * Key2:单词。Values2:value的集合,也就是[1,1,1,...]。context:上下文对象
 67          * 这里这里K2、V2像这样[K,V[1,1,1,...]]
 68         **/
 69         public void reduce(Text Key2, Iterable<IntWritable> Values2, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
 70             int sum = 0;
 71             //累加V2的元素,有多少个 1 ,即有多少个指定单词
 72             for (IntWritable val : Values2) {
 73                 sum += val.get();
 74             }
 75             this.result.set(sum);
 76             //终于将单词和总个数再次输出
 77             context.write(Key2, this.result);
 78         }
 79     }
 80     
 81     public static void main(String[] args) throws Exception {
 82          // 声明配置信息
 83         Configuration conf = new Configuration();
 84         conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
 85          // 创建作业
 86         Job job = new Job(conf, "word count");
 87         job.setJarByClass(WordCount.class);
 88          // 设置mr
 89         job.setMapperClass(MyMapper.class);
 90         job.setReducerClass(MyReducer.class);
 91          // 设置输出类型,和Context上下文对象write的参数类型一致
 92         job.setOutputKeyClass(Text.class);
 93         job.setOutputValueClass(IntWritable.class);
 94          // 设置输入输出路径
 95         FileInputFormat.addInputPath(job, new Path("hdfs:/input"));    //文件已经存在 
 96         FileOutputFormat.setOutputPath(job, new Path("hdfs:/output"));    //尚未存在 
 97          // 执行
 98         System.exit(job.waitForCompletion(true) ? 0 : 1);
 99     }
100 }
Hadoop上路-03_Hadoop JavaAPI

 

3)结果

 Hadoop上路-03_Hadoop JavaAPI

 

2.排序

1)数据准备

 Hadoop上路-03_Hadoop JavaAPI

2)代码

Hadoop上路-03_Hadoop JavaAPI
 1 package hadooptest;
 2 
 3 import *
 4 
 5 //hadoop默认排序:
 6 //如果k2、v2类型是Text-文本,结果是按照字典顺序
 7 //如果k2、v2类型是LongWritable-数字,结果是按照数字大小顺序
 8 
 9 public class TestSort {
10     /**
11      * 内部类:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
12      */
13     public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
14         /**
15          * 重写map方法
16          */
17         public void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
18             //这里v1转为k2-数字类型,舍弃k1。null为v2
19             context.write(new LongWritable(Long.parseLong(v1.toString())), NullWritable.get());
20             //因为v1可能重复,这时,k2也是可能有重复的
21         }
22     }
23 
24     /*** 在此方法执行前,有个shuffle过程,会根据k2将对应的v2归并为v2[...] ***/
25 
26     /**
27      * 内部类:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
28      */
29     public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
30         /**
31          * 重写reduce方法
32          */
33         protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException, InterruptedException {
34             //k2=>k3, v2[...]舍弃。null => v3
35             context.write(k2, NullWritable.get());
36             //此时,k3如果发生重复,根据默认算法会发生覆盖,即最终仅保存一个k3
37         }
38     }
39 
40     public static void main(String[] args) throws Exception {
41         // 声明配置信息
42         Configuration conf = new Configuration();
43         conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
44 
45         // 创建作业
46         Job job = new Job(conf, "Test Sort");
47         job.setJarByClass(TestSort.class);
48 
49         // 设置mr
50         job.setMapperClass(MyMapper.class);
51         job.setReducerClass(MyReducer.class);
52 
53         // 设置输出类型,和Context上下文对象write的参数类型一致
54         job.setOutputKeyClass(LongWritable.class);
55         job.setOutputValueClass(NullWritable.class);
56 
57         // 设置输入输出路径
58         FileInputFormat.setInputPaths(job, new Path("/input/"));
59         FileOutputFormat.setOutputPath(job, new Path("/out"));
60 
61         // 执行
62         System.exit(job.waitForCompletion(true) ? 0 : 1);
63     }
64 }
Hadoop上路-03_Hadoop JavaAPI

 

3)结果

 Hadoop上路-03_Hadoop JavaAPI

 

3.去重

Hadoop上路-03_Hadoop JavaAPI
 1     /*
 2      * 内部类:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
 3      */
 4     public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
 5         /****
 6          * 重写map方法
 7         ****/
 8         public void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
 9             //因为我们读入的数据就是一行一个数字,直接使用
10             //这个数字有几个都无所谓,只有知道有这么一个数字即可,所以输出的v2为null
11             context.write(new LongWritable(Long.parseLong(v1.toString())), NullWritable.get());
12         }
13     }
14     
15     /** 在此方法执行前,有个shuffle过程,会根据k2将对应的v2归并为v2[...] **/
16 
17     /*
18      * 内部类:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
19      */
20     public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
21         /****
22          * 重写reduce方法
23         ****/
24         protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException, InterruptedException {
25             //此时,k3(即眼前的k2)如果发生重复,根据默认算法会发生覆盖,即最终仅保存一个k3,达到去重到效果,而v3是null无所谓
26             context.write(k2, NullWritable.get());
27 
28         }
29     }
Hadoop上路-03_Hadoop JavaAPI

 

4.过滤

Hadoop上路-03_Hadoop JavaAPI
 1     /*
 2      * 内部类:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
 3      */
 4     public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
 5         String tmp = "8238";
 6         
 7         /**
 8          * 重写map方法。k1:行首字符索引,v1:这一行文本
 9         **/
10         protected void map(LongWritable k1, Text v1, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException ,InterruptedException {
11             System.out.println(v1+", "+tmp);
12             //如果行文本是指定值,过滤之
13             if(v1.toString().equals(tmp)){
14                 System.out.println("有了");
15                 //保存(按照泛型限制,k2是Text,v2是Nullritable)
16                 context.write(v1, NullWritable.get());
17             }
18         }
19     }
20 
21     /*
22      * 内部类:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
23      */
24     public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
25         /**
26          * 重写reduce方法
27         **/
28         protected void reduce(Text k2, Iterable<NullWritable> v2, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException ,InterruptedException {
29             context.write(k2, NullWritable.get());
30         }
31     }
Hadoop上路-03_Hadoop JavaAPI

 如果报错:

  Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, received org.apache.hadoop.io.Text
一定要检查main方法里:

Hadoop上路-03_Hadoop JavaAPI
// 设置输出类型,和Context上下文对象write的参数类型一致
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
Hadoop上路-03_Hadoop JavaAPI

 

5.TopN

1)数值最大

Hadoop上路-03_Hadoop JavaAPI
 1     // map(泛型定义了输入和输出类型)
 2     public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
 3 
 4         // 首先创建一个临时变量,保存一个可存储的最小值:Long.MIN_VALUE=-9223372036854775808
 5         long temp = Long.MIN_VALUE;
 6 
 7         // 找出最大值。这个map不断迭代v1,最终保存最大值
 8         protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
 9             
10             // 将文本转数值
11             long val = Long.parseLong(v1.toString());
12             // 如果v1比临时变量大,则保存v1的值
13             if (temp < val) {
14                 temp = val;
15             }
16         }
17 
18         /** ---此方法在全部的map任务结束后执行一次。这时仅输出临时变量到最大值--- **/
19         protected void cleanup(Context context) throws IOException, InterruptedException {
20             context.write(new LongWritable(temp), NullWritable.get());
21             System.out.println("文件读取完毕,保存最大值");    //输出两次,对应两个文本文件
22         }
23     }
24 
25     // reduce
26     public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
27         // 临时变量
28         Long temp = Long.MIN_VALUE;
29 
30         // 因为一个文件得到一个最大值,我们有两个txt文件会得到两个值。再次将这些值比对,得到最大的
31         protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException, InterruptedException {
32 
33             long val = Long.parseLong(k2.toString());
34             // 如果k2比临时变量大,则保存k2的值
35             if (temp < val) {
36                 temp = val;
37             }
38         }
39 
40         /** !!!此方法在全部的reduce任务结束后执行一次。这时仅输出唯一最大值!!! **/
41         protected void cleanup(Context context) throws IOException, InterruptedException {
42             context.write(new LongWritable(temp), NullWritable.get());
43         }
44     }
Hadoop上路-03_Hadoop JavaAPI

 

2)数值前5

Hadoop上路-03_Hadoop JavaAPI
 1     // map
 2     public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
 3 
 4         // 首先创建一个临时变量,保存一个可存储的最小值:Long.MIN_VALUE=-9223372036854775808
 5         long temp = Long.MIN_VALUE;
 6         // Top5存储空间,我们取前5个
 7         long[] tops;
 8 
 9         /** 这个方法在run中调用,在全部map之前执行一次 **/
10         protected void setup(Context context) {
11             // 初始化数组长度为5
12             tops = new long[5];
13         }
14 
15         // 找出最大值
16         protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
17             
18             // 将文本转数值
19             final long val = Long.parseLong(v1.toString());
20             // 保存在0索引
21             tops[0] = val;
22             // 排序后最大值在最后一个索引,这样从[5]到[0]依次减小。每执行一次map,最小的[0]都会赋予新值
23             Arrays.sort(tops);
24         }
25 
26         /** ---此方法在全部到map任务结束后执行一次。输出map后得到的前5个最大值--- **/
27         protected void cleanup(Context context) throws IOException, InterruptedException {
28             for (int i = 0; i < tops.length; i++) {
29                 context.write(new LongWritable(tops[i]), NullWritable.get());
30             }
31         }
32     }
33 
34     // reduce
35     public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
36         Long temp = Long.MIN_VALUE;
37         long[] tops;
38 
39         /** 次方法在run中调用,在全部map之前执行一次 **/
40         protected void setup(Context context) {
41             tops = new long[5];
42         }
43 
44         // 因为每个文件都得到5个值,再次将这些值比对,得到最大的
45         protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException, InterruptedException {
46             long top = Long.parseLong(k2.toString());
47             tops[0] = top;
48             Arrays.sort(tops);
49         }
50 
51         /** ---此方法在全部到reduce任务结束后执行一次--- **/
52         protected void cleanup(Context context) throws IOException, InterruptedException {
53             for (int i = 0; i < tops.length; i++) {
54                 context.write(new LongWritable(tops[i]), NullWritable.get());
55             }
56         }
57     }
Hadoop上路-03_Hadoop JavaAPI

 

3)数量最大

 Hadoop上路-03_Hadoop JavaAPI

Hadoop上路-03_Hadoop JavaAPI
    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object Key1, Text Value1, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            String[] strings = Value1.toString().split(" ");
            for (String str : strings) {
                this.word.set(str);
                context.write(this.word, one);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        //临时变量,保存最大数量的单词
        private String keyer;    //注意这里不能用Hadoop的类型,如Text 
        private IntWritable valer;    //这里最好也是基本的java数据类型,如int
        //计数
        private Integer temp = Integer.MIN_VALUE;

        public void reduce(Text Key2, Iterable<IntWritable> Values2, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            //统计数量
            for (IntWritable val : Values2) {
                sum += val.get();
            }
            //保存最大数量值
            if (sum > temp) {
                temp = sum;

                keyer = Key2.toString();
                valer = new IntWritable(temp);
            }
        }

        //最终输出最大数量的单词
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new Text(keyer), valer);
        }
    }
Hadoop上路-03_Hadoop JavaAPI

 

6.单表关联

 Hadoop上路-03_Hadoop JavaAPI

Hadoop上路-03_Hadoop JavaAPI
    /*
        父 子
        子 孙
        1 2
        2 3
        A B
        B C
     */
    // map
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        // 拆分原始数据
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // 按制表符拆分记录。一行拆出两个角色
            String[] splits = v1.toString().split(" ");
            //针对无意义的换行过滤
            if (splits.length > 1) {
                // 把“父”作为k2;“子“加下划线区分,作为v2
                context.write(new Text(splits[0]), new Text("_" + splits[1]));
                
                // 把“子”作为k2;“父”辈作为v2。就是把原两个单词调换位置保存
                context.write(new Text(splits[1]), new Text(splits[0]));
            }
        }

        /**
         * 父 _子
         * 子 父
         * 
         * 子 _孙
         * 孙 子
         **/
    }
    
    /**
     * k2 v2[...]
     * 子 [父,_孙]
     **/

    // reduce
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        // 拆分k2v2[...]数据
        protected void reduce(Text k2, Iterable<Text> v2, Context context) throws IOException, InterruptedException {
            String grandson = ""; // “孙” 
            String grandfather = ""; // “父” 

            // 从迭代中遍历v2[...]
            for (Text man : v2) {
                String p = man.toString();
                System.out.println("得到:" + p);
                // 如果单词是以下划线开始的
                if (p.startsWith("_")) {
                    grandson = p.substring(1);
                }
                // 如果单词没有下划线起始
                else {
                    // 直接赋值给孙辈变量
                    grandfather = p;
                }
            }

            // 在得到有效数据的情况下
            if (grandson != "" && grandfather != "") {
                // 写出得到的结果。
                context.write(new Text(grandson), new Text(grandfather));
            }

            /**
             * k3=父,v3=孙
             **/
        }
    }
Hadoop上路-03_Hadoop JavaAPI

  Hadoop上路-03_Hadoop JavaAPI

 

7.双表关联

 Hadoop上路-03_Hadoop JavaAPI

Hadoop上路-03_Hadoop JavaAPI
// map
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        // 拆分原始数据
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // 拆分记录
            String[] splited = v1.toString().split(" ");
            // 如果第一列是数字(使用正则判断),就是歌曲表。先读入那个文件由hadoop决定
            if (splited[0].matches("^[-+]?(([0-9]+)([.]([0-9]+))?|([.]([0-9]+))?)$")) {
                String id = splited[0];
                String song = splited[1];
                //v2加两条下划线作为前缀标识为歌曲
                context.write(new Text(id), new Text("__" + song));
            }
            // 否则就是歌手表
            else {
                String singer = splited[0];
                String id = splited[1];
                //v2-加两条横线作为前缀标识为歌手
                context.write(new Text(id), new Text("--" + singer));
            }
            /**
             * 1 __Eminem 1 --LoseYourself
             **/
        }
    }

    // reduce
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        // 拆分k2v2[...]数据
        protected void reduce(Text k2, Iterable<Text> v2, Context context) throws IOException, InterruptedException {
            String song = ""; // 歌曲
            String singer = ""; // 歌手
            /**
             * 1, [__Eminem, --LoseYourself]
             **/
            for (Text text : v2) {
                String tmp = text.toString();

                if (tmp.startsWith("__")) {
                    // 如果是__开头的是song
                    song = tmp.substring(2); // 从索引2开始截取字符串
                }
                if (tmp.startsWith("--")) {
                    // 如果是--开头的是歌手
                    singer = tmp.substring(2);
                }
            }
            context.write(new Text(singer), new Text(song));
        }
        /**
         * k3=Eminem,v3=LoseYourself
         *
        
        Eminem    LoseYourself
        Alizee    LaIslaBonita
        Michael    YouAreNotAlone
        Manson    FuckFrankie

         *
         **/
    }
Hadoop上路-03_Hadoop JavaAPI

 

- end

Hadoop上路-03_Hadoop JavaAPI,布布扣,bubuko.com

Hadoop上路-03_Hadoop JavaAPI

上一篇:js判断当前网络状态


下一篇:栈的图文解析 和 对应3种语言的实现(C/C++/Java)