Getting Started with Hadoop - 03: The Hadoop Java API

I. Installing Eclipse

1. Download and extract

Download: http://www.eclipse.org/downloads/

Extract: SHELL$ sudo tar -zxvf eclipse.tar.gz

2. Create a launcher shortcut

Right-click the Ubuntu desktop and create a launcher.


3. Create a Java Project


4. Add the required JARs

All of the required JARs can be found under the %Hadoop installation directory%/share/hadoop directory.


II. Basic Operations

This section covers only methods of the FileSystem class; there are far too many to list here, so see the API documentation for the full set. Every example below obtains the FileSystem with the same configuration boilerplate, sketched once right after this paragraph.
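As a minimal sketch (not code from the original post; the NameNode address 192.168.1.240:9000 is the placeholder used throughout this article), that boilerplate could be factored into a small hypothetical helper and reused:

// Hypothetical helper, assuming the NameNode address used in this article.
package com.cuiweiyou.hadooptest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HdfsHelper {
    public static FileSystem getHdfs() throws Exception {
        Configuration conf = new Configuration();
        // fs.default.name is the classic key; newer releases call it fs.defaultFS
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        return FileSystem.get(conf);
    }
}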

1. Walk directories and files: listStatus

// Vigiles blog: www.cuiweiyou.com
package com.cuiweiyou.hadooptest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

public class HdfsTest {

    private static FileSystem hdfs;

    @Test
    public void test() throws Exception {
        // 1. create the configuration
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); // fs.defaultFS in newer releases
        conf.set("mapred.job.tracker", "192.168.1.240:9001");
        // 2. create the file system
        hdfs = FileSystem.get(conf);
        // 3. walk the files and directories on HDFS
        FileStatus[] fs = hdfs.listStatus(new Path("hdfs:/"));
        if (fs.length > 0) {
            for (FileStatus f : fs) {
                showDir(f);
            }
        }
    }

    private static void showDir(FileStatus fs) throws Exception {
        Path path = fs.getPath();
        System.out.println(path);
        // if it is a directory, recurse into it
        //if (fs.isDir()) { // deprecated
        if (fs.isDirectory()) {
            FileStatus[] f = hdfs.listStatus(path);
            if (f.length > 0) {
                for (FileStatus file : f) {
                    showDir(file);
                }
            }
        }
    }
}


2. Walk files recursively: listFiles

    @Test
    public void test() throws Exception {
        // 1. configuration
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.job.tracker", "192.168.1.240:9001");
        // 2. file system
        hdfs = FileSystem.get(conf);
        // 3. recursively list the files (not directories) on HDFS
        RemoteIterator<LocatedFileStatus> fs = hdfs.listFiles(new Path("hdfs:/"), true);
        while (fs.hasNext()) {
            System.out.println(fs.next());
        }
    }


3. Check existence: exists

    @Test
    public void test() throws Exception {
        // 1. create the configuration
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.job.tracker", "192.168.1.240:9001");
        // 2. create the file system
        FileSystem hdfs = FileSystem.get(conf);
        // 3. build a path that Hadoop can use
        Path file = new Path("hdfs:/test.txt");
        // 4. check whether the file exists at the target path
        System.out.println("file exists: " + hdfs.exists(file));
    }

4. Distinguish directories from files: isDirectory / isFile

// Vigiles blog: www.cuiweiyou.com
package com.cuiweiyou.hadooptest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

public class HdfsTest {

    private static FileSystem hdfs;

    @Test
    public void test() throws Exception {
        // 1. configuration
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.job.tracker", "192.168.1.240:9001");
        // 2. file system
        hdfs = FileSystem.get(conf);
        // 3. walk the files and directories currently on HDFS
        FileStatus[] fs = hdfs.listStatus(new Path("hdfs:/"));
        if (fs.length > 0) {
            for (FileStatus f : fs) {
                showDir(f);
            }
        } else {
            System.out.println("nothing to list...");
        }
    }

    private static void showDir(FileStatus fs) throws Exception {
        Path path = fs.getPath();
        // if it is a directory, recurse into it
        if (fs.isDirectory()) {
            System.out.println("directory: " + path);
            FileStatus[] f = hdfs.listStatus(path);
            if (f.length > 0) {
                for (FileStatus file : f) {
                    showDir(file);
                }
            }
        } else {
            System.out.println("file: " + path);
        }
    }
}


5. Last modification time: getModificationTime

// Vigiles blog: www.cuiweiyou.com
package com.cuiweiyou.hadooptest;

import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

public class HdfsTest2 {

    private static FileSystem hdfs;

    @Test
    public void test() throws Exception {
        // 1. create the configuration
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.job.tracker", "192.168.1.240:9001");
        // 2. create the file system (bound to the HDFS URI above)
        hdfs = FileSystem.get(conf);
        // 3. list the files and directories currently on HDFS
        FileStatus[] fs = hdfs.listStatus(new Path("hdfs:/"));
        if (fs.length > 0) {
            for (FileStatus f : fs) {
                showDir(f);
            }
        }
    }

    private static void showDir(FileStatus fs) throws Exception {
        Path path = fs.getPath();
        // get the last modification time
        long time = fs.getModificationTime();
        System.out.println("last modified on HDFS: " + new Date(time));
        System.out.println(path);
        if (fs.isDirectory()) {
            FileStatus[] f = hdfs.listStatus(path);
            if (f.length > 0) {
                for (FileStatus file : f) {
                    showDir(file);
                }
            }
        }
    }
}


6. Block and replica locations: getFileBlockLocations

// Vigiles blog: www.cuiweiyou.com
package com.cuiweiyou.hadooptest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

public class HdfsTest2 {

    @Test
    public void test() throws Exception {
        // 1. configuration
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.job.tracker", "192.168.1.240:9001");
        // 2. file system
        FileSystem fs = FileSystem.get(conf);
        // 3. must already exist, and must be a file
        Path path = new Path("hdfs:/vigiles/dir/test3.txt");
        // 4. file status
        FileStatus status = fs.getFileStatus(path);
        // 5. file blocks
        //BlockLocation[] blockLocations = fs.getFileBlockLocations(status, 0, status.getLen()); // variant 1: pass the file's FileStatus
        BlockLocation[] blockLocations = fs.getFileBlockLocations(path, 0, status.getLen());      // variant 2: pass the file's Path
        int blockLen = blockLocations.length;
        System.err.println("number of blocks: " + blockLen); // a small file is not split, so this is 1
        for (int i = 0; i < blockLen; i++) {
            // size of this block
            long sizes = blockLocations[i].getLength();
            System.err.println("block size: " + sizes);
            // host names, one per replica
            String[] hosts = blockLocations[i].getHosts();
            for (String host : hosts) {
                System.err.println("host: " + host);
            }
            // datanode names (IP:port), one per replica
            String[] names = blockLocations[i].getNames();
            for (String name : names) {
                System.err.println("IP: " + name);
            }
        }
    }
}


7. Read a file: open

// Vigiles blog: www.cuiweiyou.com
package com.cuiweiyou.hadooptest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

public class HdfsTest2 {

    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.job.tracker", "192.168.1.240:9001");
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("hdfs:/vigiles/dir/test3.txt");
        FSDataInputStream is = fs.open(path);
        FileStatus stat = fs.getFileStatus(path);
        // read the whole file into a buffer sized to the file length
        byte[] buffer = new byte[(int) stat.getLen()];
        is.readFully(0, buffer);
        is.close();
        fs.close();
        System.out.println(new String(buffer));
    }
}

8. Upload by copying: copyFromLocalFile

    @Test
    public void test() throws Exception {
        // 1. create the configuration
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.job.tracker", "192.168.1.240:9001");
        // 2. create the file system
        FileSystem hdfs = FileSystem.get(conf);
        // 3. build paths that Hadoop can use
        Path src = new Path("file:/home/hadoop/桌面/copy_test.txt"); // local directory/file
        Path dst = new Path("hdfs:/");                               // target directory/file
        // 4. copy the local file up to HDFS (source file, target path)
        hdfs.copyFromLocalFile(src, dst);
        System.out.println("file uploaded to: " + conf.get("fs.default.name"));
        // 5. list the files on HDFS
        FileStatus[] fs = hdfs.listStatus(dst);
        for (FileStatus f : fs) {
            System.out.println(f.getPath());
        }
        // read the uploaded file back
        Path path = new Path("hdfs:/copy_test.txt");
        FSDataInputStream is = hdfs.open(path);
        FileStatus stat = hdfs.getFileStatus(path);
        byte[] buffer = new byte[(int) stat.getLen()];
        is.readFully(0, buffer);
        is.close();
        hdfs.close();
        System.out.println("file contents: " + new String(buffer));
    }


Note: moveFromLocalFile uploads by moving. It works like copyFromLocalFile, except that the local source file no longer exists afterwards; a minimal sketch follows.
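A minimal sketch of moveFromLocalFile, written in the same style as the test methods above; the local file move_test.txt is a hypothetical example, and moveToLocalFile (point 9 below) works the same way in the download direction:

    @Test
    public void testMoveFromLocal() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        FileSystem hdfs = FileSystem.get(conf);
        // upload by moving: the local source file is gone after the transfer
        Path src = new Path("file:/home/hadoop/桌面/move_test.txt"); // hypothetical local file
        Path dst = new Path("hdfs:/");
        hdfs.moveFromLocalFile(src, dst);
        System.out.println("exists on HDFS: " + hdfs.exists(new Path("hdfs:/move_test.txt")));
        hdfs.close();
    }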

9. Download by copying: copyToLocalFile

    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.job.tracker", "192.168.1.240:9001");
        FileSystem hdfs = FileSystem.get(conf);
        // HDFS source path and local target path
        Path src = new Path("hdfs:/copy_test.txt");            // HDFS directory/file
        Path dst = new Path("file:/home/hadoop/桌面/new.txt");  // local directory/file
        // copy the HDFS file down to the local file system (source, destination)
        hdfs.copyToLocalFile(src, dst);
    }

Note: moveToLocalFile is the moving counterpart of copyToLocalFile; after it runs, the source file on HDFS no longer exists.

10. Create a directory: mkdirs

    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.job.tracker", "192.168.1.240:9001");
        FileSystem hdfs = FileSystem.get(conf);
        // create the directory
        hdfs.mkdirs(new Path("hdfs:/eminem"));
    }

11. Create directories and files: create

    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.job.tracker", "192.168.1.240:9001");
        FileSystem hdfs = FileSystem.get(conf);
        // use an HDFS data output stream (write) to create a directory under the HDFS root and files inside it
        FSDataOutputStream out = hdfs.create(new Path("hdfs:/vigiles/eminem.txt"));
        // write one line into the file; UTF-8 must be used
        out.write("痞子阿姆,Hello !".getBytes("UTF-8"));
        out.close(); // close the first stream before creating the next file
        out = hdfs.create(new Path("/vigiles/alizee.txt"));
        out.write("艾莉婕,Hello !".getBytes("UTF-8"));
        out.close();
        // read the second file back
        FSDataInputStream is = hdfs.open(new Path("hdfs:/vigiles/alizee.txt"));
        FileStatus stat = hdfs.getFileStatus(new Path("hdfs:/vigiles/alizee.txt"));
        byte[] buffer = new byte[(int) stat.getLen()];
        is.readFully(0, buffer);
        is.close();
        hdfs.close();
        System.out.println(new String(buffer));
    }

12. Create an empty file: createNewFile

    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.job.tracker", "192.168.1.240:9001");
        FileSystem hdfs = FileSystem.get(conf);
        // create an empty file
        hdfs.createNewFile(new Path("hdfs:/newfile.txt"));
    }

13. Append to a file: append

    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.job.tracker", "192.168.1.240:9001");
        FileSystem hdfs = FileSystem.get(conf);
        // open the existing file for appending
        FSDataOutputStream out = hdfs.append(new Path("hdfs:/newfile.txt"));
        out.write("written with the append method\n".getBytes("UTF-8"));
        out.close();
        out = hdfs.append(new Path("/newfile.txt"));
        out.write("written again!!!\n".getBytes("UTF-8"));
        out.close();
    }

14. Rename a file: rename

    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.job.tracker", "192.168.1.240:9001");
        FileSystem fs = FileSystem.get(conf);
        // rename: fs.rename(source, target)
        boolean rename = fs.rename(new Path("/copy_test.txt"), new Path("/copy.txt"));
        System.out.println(rename);
    }

15. Delete a file: delete

    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.job.tracker", "192.168.1.240:9001");
        FileSystem fs = FileSystem.get(conf);
        // delete(path, recursive): with false, a non-empty directory is not deleted and an IOException is thrown
        boolean delete = fs.delete(new Path("hdfs:/test.txt"), true);
        System.out.println("deleted: " + delete);
        // deleteOnExit: the delete is performed when the FileSystem is closed
        boolean exit = fs.deleteOnExit(new Path("/out.txt"));
        System.out.println("delete on exit: " + exit);
        fs.close();
    }

III. Common MapReduce Algorithms


1. Word count

1) Prepare the data


2) Code

// Vigiles blog: www.cuiweiyou.com
package com.cuiweiyou.hadooptest;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Word count
 */
public class WordCount {

    /*
     * The mapper runs first, then the reducer.
     * Inner class: the mapper, Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
     */
    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

        // the count contribution: every occurrence found counts as 1
        private static final IntWritable one = new IntWritable(1);
        // the current word
        private Text word = new Text();

        /**
         * Override map to do the real work.
         * There is only one MyMapper instance, but its map method is called again and again.
         * Key1: offset of the first character of this line in the input. Value1: the text of this line. context: the context object.
         * Here K1/V1 look like [K, V].
         **/
        public void map(Object Key1, Text Value1, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // split the line into words; by default StringTokenizer splits on whitespace
            StringTokenizer itr = new StringTokenizer(Value1.toString());
            // iterate over all words in the line
            while (itr.hasMoreTokens()) {
                // put the word into the reusable Text variable
                this.word.set(itr.nextToken());
                // write (word, 1) to the context as the mapper output
                context.write(this.word, one);
            }
        }
    }

    /************************************************************************
     * Between the Mapper and the Reducer there is a shuffle phase, which    *
     * groups the v2 values belonging to the same k2 into v2[...]            *
     ************************************************************************/

    /*
     * After the mapper finishes, the reducer below runs.
     * Inner class: the reducer, Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
     */
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // the aggregated count
        private IntWritable result = new IntWritable();

        /**
         * Override reduce to do the real work.
         * There is also only one MyReducer instance, but its reduce method is called again and again.
         * Key2: the word. Values2: the collection of values, i.e. [1,1,1,...]. context: the context object.
         * Here K2/V2 look like [K, V[1,1,1,...]].
         **/
        public void reduce(Text Key2, Iterable<IntWritable> Values2, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            // add up the elements of V2: the number of 1s is the number of occurrences of this word
            for (IntWritable val : Values2) {
                sum += val.get();
            }
            this.result.set(sum);
            // finally write out the word and its total count
            context.write(Key2, this.result);
        }
    }

    public static void main(String[] args) throws Exception {
        // declare the configuration
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        // create the job (new Job(conf, ...) is deprecated in newer releases in favour of Job.getInstance)
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        // set the mapper and reducer
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // set the output types; they must match what is written to the Context
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // set the input and output paths
        FileInputFormat.addInputPath(job, new Path("hdfs:/input"));    // must already exist
        FileOutputFormat.setOutputPath(job, new Path("hdfs:/output")); // must not exist yet
        // run
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3) Result

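As a small illustration of what the job produces (the input here is assumed, not the data used in the post): an input file under hdfs:/input containing

hello world
hello hadoop

yields the following under hdfs:/output, one tab-separated word and count per line:

hadoop	1
hello	2
world	1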

2. Sort

1) Prepare the data


2) Code

package com.cuiweiyou.hadooptest;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hadoop's default sort order:
// if k2/v2 are Text, the result is in lexicographic (dictionary) order
// if k2/v2 are LongWritable (numbers), the result is in numeric order
public class TestSort {

    /**
     * Inner class: the mapper, Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
     */
    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
        /**
         * Override map
         */
        public void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // convert v1 into a numeric k2 and discard k1; null becomes v2
            context.write(new LongWritable(Long.parseLong(v1.toString())), NullWritable.get());
            // because v1 can repeat, k2 can also contain duplicates
        }
    }

    /*** Before reduce runs there is a shuffle phase, which groups the v2 values for the same k2 into v2[...] ***/

    /**
     * Inner class: the reducer, Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
     */
    public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
        /**
         * Override reduce
         */
        protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException, InterruptedException {
            // k2 => k3, v2[...] is discarded; null => v3
            context.write(k2, NullWritable.get());
            // duplicate k3 values collapse into a single k3, because the shuffle already grouped identical k2s
        }
    }

    public static void main(String[] args) throws Exception {
        // declare the configuration
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        // create the job
        Job job = new Job(conf, "Test Sort");
        job.setJarByClass(TestSort.class);
        // set the mapper and reducer
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // set the output types; they must match what is written to the Context
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("/input/"));
        FileOutputFormat.setOutputPath(job, new Path("/out"));
        // run
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3) Result


3. Deduplication

    /*
     * Inner class: the mapper, Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
     */
    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
        /****
         * Override map
         ****/
        public void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // the input is one number per line, so use it directly;
            // it does not matter how many times a number appears, only that it appears, so v2 is null
            context.write(new LongWritable(Long.parseLong(v1.toString())), NullWritable.get());
        }
    }

    /** Before reduce runs there is a shuffle phase, which groups the v2 values for the same k2 into v2[...] **/

    /*
     * Inner class: the reducer, Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
     */
    public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
        /****
         * Override reduce
         ****/
        protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException, InterruptedException {
            // duplicates of k2 were already merged by the shuffle, so only one k3 is kept per value,
            // which is exactly the deduplication we want; v3 is null and does not matter
            context.write(k2, NullWritable.get());
        }
    }

4. Filter

    /*
     * Inner class: the mapper, Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        String tmp = "8238"; // the value we are looking for

        /**
         * Override map. k1: offset of the first character of the line, v1: the text of the line
         **/
        protected void map(LongWritable k1, Text v1, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
            System.out.println(v1 + ", " + tmp);
            // keep the line only if it equals the target value
            if (v1.toString().equals(tmp)) {
                System.out.println("found it");
                // write it out (per the generic parameters, k2 is Text and v2 is NullWritable)
                context.write(v1, NullWritable.get());
            }
        }
    }

    /*
     * Inner class: the reducer, Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
     */
    public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        /**
         * Override reduce
         **/
        protected void reduce(Text k2, Iterable<NullWritable> v2, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
            context.write(k2, NullWritable.get());
        }
    }

If you get the error:

  Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, received org.apache.hadoop.io.Text
be sure to check these lines in the main method:

// set the output types; they must match what is written to the Context
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
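For reference, here is a minimal driver sketch for the filter job above, using the same imports as the TestSort driver earlier; the enclosing class name TestFilter, the job name and the input/output paths are assumptions, not part of the original post:

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        Job job = Job.getInstance(conf, "filter");  // assumed job name
        job.setJarByClass(TestFilter.class);        // TestFilter is the assumed enclosing class
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // must match what the mapper and reducer write, otherwise the type-mismatch error above appears
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("/input/")); // assumed input path, must exist
        FileOutputFormat.setOutputPath(job, new Path("/out"));   // must not exist yet
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }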

5. Top N

1) Largest value

    // map (the generics define the input and output types)
    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

        // a temporary variable initialised to the smallest storable value: Long.MIN_VALUE = -9223372036854775808
        long temp = Long.MIN_VALUE;

        // find the maximum: map keeps iterating over v1 and remembers the largest value seen
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // convert the text to a number
            long val = Long.parseLong(v1.toString());
            // if v1 is larger than the temporary variable, remember it
            if (temp < val) {
                temp = val;
            }
        }

        /** --- this method runs once after all map calls of the task have finished; only then is the maximum written out --- **/
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(temp), NullWritable.get());
            System.out.println("finished reading the file, keeping its maximum"); // printed twice, once per input file (one map task each)
        }
    }

    // reduce
    public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {

        // temporary variable
        Long temp = Long.MIN_VALUE;

        // each file yields one maximum; with two txt files we get two values, and reduce compares them to find the overall maximum
        protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException, InterruptedException {
            long val = Long.parseLong(k2.toString());
            // if k2 is larger than the temporary variable, remember it
            if (temp < val) {
                temp = val;
            }
        }

        /** !!! this method runs once after all reduce calls have finished; only the single maximum is written out !!! **/
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(temp), NullWritable.get());
        }
    }

2) Five largest values

    // map
    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

        // storage for the top 5 values; after sorting, tops[0] holds the smallest of the five
        long[] tops;

        /** this method is called from run() once, before any map call **/
        protected void setup(Context context) {
            // initialise the array with length 5
            tops = new long[5];
        }

        // collect the largest values
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // convert the text to a number
            final long val = Long.parseLong(v1.toString());
            // only a value larger than the current minimum (tops[0]) may enter the top 5
            if (val > tops[0]) {
                // overwrite the current minimum at index 0
                tops[0] = val;
                // after sorting, the largest value sits at the last index and values decrease towards [0]
                Arrays.sort(tops);
            }
        }

        /** --- this method runs once after all map calls of the task have finished; it writes out this task's top 5 --- **/
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (int i = 0; i < tops.length; i++) {
                context.write(new LongWritable(tops[i]), NullWritable.get());
            }
        }
    }

    // reduce
    public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {

        long[] tops;

        /** this method is called from run() once, before any reduce call **/
        protected void setup(Context context) {
            tops = new long[5];
        }

        // each file contributes 5 values; compare them all again to get the overall top 5
        protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException, InterruptedException {
            long top = Long.parseLong(k2.toString());
            if (top > tops[0]) {
                tops[0] = top;
                Arrays.sort(tops);
            }
        }

        /** --- this method runs once after all reduce calls have finished --- **/
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (int i = 0; i < tops.length; i++) {
                context.write(new LongWritable(tops[i]), NullWritable.get());
            }
        }
    }

3) Most frequent word


    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object Key1, Text Value1, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            String[] strings = Value1.toString().split(" ");
            for (String str : strings) {
                this.word.set(str);
                context.write(this.word, one);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // temporary variables holding the word with the largest count so far
        private String keyer;       // note: do not use a Hadoop type such as Text here
        private IntWritable valer;  // a plain Java type such as int would also be the safer choice here
        // the running maximum count
        private Integer temp = Integer.MIN_VALUE;

        public void reduce(Text Key2, Iterable<IntWritable> Values2, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            // count the occurrences
            for (IntWritable val : Values2) {
                sum += val.get();
            }
            // remember the word with the largest count
            if (sum > temp) {
                temp = sum;
                keyer = Key2.toString();
                valer = new IntWritable(temp);
            }
        }

        // finally output the word with the largest count
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new Text(keyer), valer);
        }
    }

6. Single-table join


    /*
     Input: one "parent child" pair per line, e.g.
     parent child
     child  grandchild
     1 2
     2 3
     A B
     B C
    */
    // map
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        // split the raw records
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // split the record on the separator; one line yields the two roles
            String[] splits = v1.toString().split(" ");
            // skip meaningless blank lines
            if (splits.length > 1) {
                // use the "parent" as k2; prefix the "child" with an underscore and use it as v2
                context.write(new Text(splits[0]), new Text("_" + splits[1]));
                // use the "child" as k2 and the "parent" as v2, i.e. the same pair with the roles swapped
                context.write(new Text(splits[1]), new Text(splits[0]));
            }
        }
        /**
         * parent      _child
         * child       parent
         *
         * child       _grandchild
         * grandchild  child
         **/
    }

    /**
     * after the shuffle:
     * k2      v2[...]
     * child   [parent, _grandchild]
     **/

    // reduce
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {

        // unpack the k2/v2[...] data
        protected void reduce(Text k2, Iterable<Text> v2, Context context) throws IOException, InterruptedException {
            String grandson = "";     // the "grandchild"
            String grandfather = "";  // the "grandparent"
            // iterate over v2[...]
            for (Text man : v2) {
                String p = man.toString();
                System.out.println("got: " + p);
                // a value starting with an underscore is a child of k2, i.e. a grandchild of k2's parent
                if (p.startsWith("_")) {
                    grandson = p.substring(1);
                }
                // a value without the underscore is k2's parent, i.e. the grandparent
                else {
                    grandfather = p;
                }
            }
            // only write when both sides were found
            if (!grandson.isEmpty() && !grandfather.isEmpty()) {
                // write out the result
                context.write(new Text(grandson), new Text(grandfather));
            }
            /**
             * k3 = grandchild, v3 = grandparent
             **/
        }
    }
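A short worked trace, assuming only the two input lines "1 2" and "2 3":

map output:        (1, _2)   (2, 1)   (2, _3)   (3, 2)
after the shuffle: 1 -> [_2]    2 -> [1, _3]    3 -> [2]
reduce for key 2:  grandson = 3, grandfather = 1  =>  writes (3, 1)

Keys 1 and 3 produce no output because only one side of the relationship is present.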


7. Two-table join


    // map
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        // split the raw records
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // split the record
            String[] splited = v1.toString().split(" ");
            // if the first column is a number (checked with a regex), this line comes from the song table;
            // which file is read first is up to Hadoop
            if (splited[0].matches("^[-+]?(([0-9]+)([.]([0-9]+))?|([.]([0-9]+))?)$")) {
                String id = splited[0];
                String song = splited[1];
                // prefix v2 with two underscores to mark it as a song
                context.write(new Text(id), new Text("__" + song));
            }
            // otherwise it comes from the singer table
            else {
                String singer = splited[0];
                String id = splited[1];
                // prefix v2 with two dashes to mark it as a singer
                context.write(new Text(id), new Text("--" + singer));
            }
            /**
             * 1 __LoseYourself    1 --Eminem
             **/
        }
    }

    // reduce
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {

        // unpack the k2/v2[...] data
        protected void reduce(Text k2, Iterable<Text> v2, Context context) throws IOException, InterruptedException {
            String song = "";    // the song
            String singer = "";  // the singer
            /**
             * e.g. 1, [__LoseYourself, --Eminem]
             **/
            for (Text text : v2) {
                String tmp = text.toString();
                if (tmp.startsWith("__")) {
                    // values starting with __ are songs
                    song = tmp.substring(2); // strip the prefix by taking the substring from index 2
                }
                if (tmp.startsWith("--")) {
                    // values starting with -- are singers
                    singer = tmp.substring(2);
                }
            }
            context.write(new Text(singer), new Text(song));
        }
        /**
         * k3 = Eminem, v3 = LoseYourself
         * Eminem   LoseYourself
         * Alizee   LaIslaBonita
         * Michael  YouAreNotAlone
         * Manson   FuckFrankie
         **/
    }

- end

Vigiles blog (威格灵博客): www.cuiweiyou.com
