Hadoop MapReduce编程 API入门系列之join(二十六)(未完)

  不多说,直接上代码。

Hadoop MapReduce编程 API入门系列之join(二十六)(未完)

Hadoop MapReduce编程 API入门系列之join(二十六)(未完)

天气记录数据库

Station ID Timestamp Temperature

Hadoop MapReduce编程 API入门系列之join(二十六)(未完)

气象站数据库

Station ID Station Name

Hadoop MapReduce编程 API入门系列之join(二十六)(未完)

气象站和天气记录合并之后的示意图如下所示。

Station ID Station Name Timestamp Temperature
011990-99999 SIHCCAJAVRI 195005150700 0
011990-99999 SIHCCAJAVRI 195005151200 22
011990-99999 SIHCCAJAVRI 195005151800 -11
012650-99999 TYNSET-HANSMOEN 194903241200 111
012650-99999 TYNSET-HANSMOEN 194903241800 78

连接操作的具体实现技术取决于数据集的规模及分区方式。如果一个数据集很大而另外一个集合很小,以至于可以分发到集群中的每一个节点之中, 则可以执行一个 MapReduce 作业,将各个气象站的天气记录放到一块,从而实现连接。mapper 或 reducer 根据各气象站 ID 从较小的数据集合中找到气象站元数据,从而完成气象站数据和天气记录数据的合并。

连接操作如果由 mapper 执行,则称为 “map 端连接” ;如果由 reducer 执行,则称为 “reduce 端连接”。

如果两个数据集的规模均很大,以至于没有哪个数据集可以被完全复制到集群的每个节点,我们仍然可以使用 MapReduce 来进行连接,至于到底采用 map 端连接还是 reduce 端连接,则取决于数据的组织方式。

map 端连接

在两个大规模输入数据集之间的 map 端连接会在数据达到 map 函数之前就执行连接操作。为达到该目的,各 map 的输入数据必须先分区并且以特定方式排序。 各个输入数据集被划分成相同数量的分区,并且均按相同的键(连接键)排序。同一键的所有记录均会放在同一分区之中。听起来似乎要求非常严格,但这的确合乎 MapReduce 作业的输出。

map 端连接操作可以连接多个作业的输出,只要这些作业的 reducer 数量相同、键相同并且输出文件是不可切分的(例如,小于一个 HDFS 块,或 gzip 压缩)。 在上面讲的天气例子中,如果气象站文件以气象站ID部分排序,记录文件也以气象站 ID 部分排序,而且 reducer 的数量相同,则就满足了执行 map 端连接的前提条件。

利用 org.apache.hadoop.mapreduce.join 包中的 CompositeInputFormat 类来运行一个 map 端连接。CompositeInputFormat 类的输入源和连接类型(内连接或外连接)可以通过一个连接表达式进行配置, 连接表达式的语法简单。详情与示例可参考包文档。此种方法不常用,了解即可,这里不再赘述。

reduce 端连接

由于 reduce 端连接并不要求输入数据集符合特定结构,因而 reduce端连接比 map 端连接更为常用。但是,由于两个数据集均需经过 MapReduce 的 shuffle 过程, 所以 reduce 端连接的效率往往要低一些。基本思路是 mapper 为各个记录标记源,并且使用连接键作为 map 输出键,使键相同的记录放在同一 reducer 中。 我们通过下面两种技术实现 reduce 端连接。

1、多输入

数据集的输入源往往有多种格式,因此可以使用 MultipleInputs 类来方便地解析和标注各个数据源。MultipleInputs 的用法,在输入格式课程已经详细介绍,这里就不再赘述。

2、二次排序

如前所述,reducer 将两个数据源中选出键相同的记录并不介意这些记录是否已排好序。此外,为了更好地执行连接操作,先将某一个数据源传输到 reducer 会非常重要。 还以上面的天气数据连接为例,当天气记录发送到 reducer 的时候,与这些记录有相同键的气象站信息最好也已经放在 reducer ,使得 reducer 能够将气象站名称填到天气记录之中就马上输出。 虽然也可以不指定数据传输次序,并将待处理的记录缓存在内存之中,但应该尽量避免这种情况,因为其中任何一组的记录数量可能非常庞大,远远超出 reducer 的可用内存容量。 因此我们用到二次排序技术,对 map 阶段输出的每个键的值进行排序,实现这一效果。

我们使用 TextPair 类构建组合键,包括气象站 ID 和 “标记”。在这里,“标记” 是一个虚拟的字段,其唯一目的是对记录排序,使气象站记录比天气记录先到达。一种简单的做法就是:对于气象站记录, “标记” 的值设为 0;对于天气记录,“标记” 的值设为1。

JoinStationMapper 处理来自气象站数据,代码如下所示。

public class JoinStationMapper extends Mapper< LongWritable,Text,TextPair,Text>{
protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
String line = value.toString();
String[] arr = line.split("\\s+");//解析气象站数据
int length = arr.length;
if(length==2){//满足这种数据格式
//key=气象站id value=气象站名称
context.write(new TextPair(arr[0],"0"),new Text(arr[1]));
}
}
}

JoinRecordMapper 处理来自天气记录数据,代码如下所示。

public class JoinRecordMapper extends Mapper< LongWritable,Text,TextPair,Text>{
protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
String line = value.toString();
String[] arr = line.split("\\s+",2);//解析天气记录数据
int length = arr.length;
if(length==2){
//key=气象站id value=天气记录数据
context.write(new TextPair(arr[0],"1"),new Text(arr[1]));
}
}
}

由于 TextPair 经过了二次排序,所以 reducer 会先接收到气象站数据。因此从中抽取气象站名称,并将其作为后续每条输出记录的一部分写到输出文件。JoinReducer 的代码如下所示。

public class JoinReducer extends Reducer< TextPair,Text,Text,Text>{
protected void reduce(TextPair key, Iterable< Text> values,Context context) throws IOException,InterruptedException{
Iterator< Text> iter = values.iterator();
Text stationName = new Text(iter.next());//气象站名称
while(iter.hasNext()){
Text record = iter.next();//天气记录的每条数据
Text outValue = new Text(stationName.toString()+"\t"+record.toString());
context.write(key.getFirst(),outValue);
}
}
}

上面 JoinReducer 里面的代码,假设天气记录的每个气象站 ID 恰巧与气象站数据集中的一条记录准确匹配。 如果该假设不成立,则需要泛化代码,使用另一个 TextPair 将标记放入值的对象中。reduce() 方法在处理天气记录之前,要能够区分哪些记录是气象站名称, 处理缺失或重复的记录。

下面我们定义作业的驱动类 JoinRecordWithStationName,在该类中,关键在于根据组合键的第一个字段(即气象站 ID)进行分区和分组,即使用一个自定义的 Partitioner 和 一个自定义的分组 comparator 作为TextPair 的嵌套类。JoinRecordWithStationName 类的代码如下所示。

public class JoinRecordWithStationName extends Configured implements Tool{
public static class KeyPartitioner extends Partitioner< TextPair,Text>{
@overwrite
public int getPartition(TextPair key,Text value,int numPartitions){
return (key.getFirst().hashCode()&Integer.MAX_VALUE) % numPartitions;
}
} @overwrite
public int run(String[] args) throws Exception{
Configuration conf = new Configuration();// 读取配置文件 Job job = Job.getInstance();// 新建一个任务
job.setJarByClass(JoinRecordWithStationName.class);// 主类 Path recordInputPath = new Path(args[0]);//天气记录数据源
Path stationInputPath = new Path(args[1]);//气象站数据源
Path outputPath = new Path(args[2]);//输出路径 MultipleInputs.addInputPath(job,recordInputPath,TextInputFormat.class,JoinRecordMapper.class);//读取天气记录Mapper
MultipleInputs.addInputPath(job,stationInputPath,TextInputFormat.class,JoinStationMapper.class);//读取气象站Mapper
FileOutputFormat.setOutputPath(job,outputPath); job.setPartitionerClass(KeyPartitioner.class);//自定义分区
job.setGroupingComparatorClass(TextPair.FirstComparator.class);//自定义分组 job.setMapOutputKeyClass(TextPair.class); job.setReducerClass(JoinReducer.class);// Reducer job.setOutputKeyClass(Text.class); return job.waitForCompletion(true)?0:1;
} public static void main(String[] args) throws Exception{
int exitCode = ToolRunner.run(new JoinRecordWithStationName(),args);
System.exit(exitCode);
}
}

该样本数据上运行程序,获得以下输出结果。

011990-99999	SIHCCAJAVRI	195005150700	0
011990-99999 SIHCCAJAVRI 195005151200 22
011990-99999 SIHCCAJAVRI 195005151800 -11
012650-99999 TYNSET-HANSMOEN 194903241200 111
012650-99999 TYNSET-HANSMOEN 194903241800 78

分布式缓存

当 MapReduce 处理大型数据集间的 join 操作时,此时如果一个数据集很大而另外一个集合很小,以至于可以分发到集群中的每个节点之中。 这种情况下,我们就用到了 Hadoop 的分布式缓存机制,它能够在任务运行过程中及时地将文件和存档复制到任务节点以供使用。为了节约网络宽带,在每一个作业中, 各个文件通常只需要复制到一个节点一次。

1、用法

Hadoop 命令行选项中,有三个命令可以实现文件复制分发到任务的各个节点。

1)用户可以使用 -files 选项指定待分发的文件,文件内包含以逗号隔开的 URL 列表。文件可以存放在本地文件系统、HDFS、或其它 Hadoop 可读文件系统之中。 如果尚未指定文件系统,则这些文件被默认是本地的。即使默认文件系统并非本地文件系统,这也是成立的。

2)用户可以使用 -archives 选项向自己的任务中复制存档文件,比如JAR 文件、ZIP 文件、tar 文件和 gzipped tar文件,这些文件会被解档到任务节点。

3)用户可以使用 -libjars 选项把 JAR 文件添加到 mapper 和 reducer 任务的类路径中。如果作业 JAR 文件并非包含很多库 JAR 文件,这点会很有用。

2、工作机制

当用户启动一个作业,Hadoop 会把由 -files、-archives、和 -libjars 等选项所指定的文件复制到分布式文件系统之中。接着,在任务运行之前, tasktracker 将文件从分布式文件系统复制到本地磁盘(缓存)使任务能够访问文件。此时,这些文件就被视为 “本地化” 了。从任务的角度来看, 这些文件就已经在那儿了,它并不关心这些文件是否来自 HDFS 。此外,有 -libjars 指定的文件会在任务启动前添加到任务的类路径(classpath)中。

3、分布式缓存 API

由于可以通过 Hadoop 命令行间接使用分布式缓存,大多数应用不需要使用分布式缓存 API。然而,一些应用程序需要用到分布式缓存的更高级的特性,这就需要直接使用 API 了。 API 包括两部分:将数据放到缓存中的方法,以及从缓存中读取数据的方法。

  1)首先掌握数据放到缓存中的方法,以下列举 Job 中可将数据放入到缓存中的相关方法:

public void addCacheFile(URI uri);
public void addCacheArchive(URI uri);//以上两组方法将文件或存档添加到分布式缓存
public void setCacheFiles(URI[] files);
public void setCacheArchives(URI[] archives);//以上两组方法将一次性向分布式缓存中添加一组文件或存档
public void addFileToClassPath(Path file);
public void addArchiveToClassPath(Path archive);//以上两组方法将文件或存档添加到 MapReduce 任务的类路径
public void createSymlink();      在缓存中可以存放两类对象:文件(files)和存档(achives)。文件被直接放置在任务节点上,而存档则会被解档之后再将具体文件放置在任务节点上。

2)其次掌握在 map 或者 reduce 任务中,使用 API 从缓存中读取数据。

public Path[] getLocalCacheFiles() throws IOException;
public Path[] getLocalCacheArchives() throws IOException;
public Path[] getFileClassPaths();
public Path[] getArchiveClassPaths();

我们可以使用 getLocalCacheFiles()和getLocalCacheArchives()方法获取缓存中的文件或者存档的引用。 当处理存档时,将会返回一个包含解档文件的目的目录。相应的,用户可以通过 getFileClassPaths()和getArchivesClassPaths()方法获取被添加到任务的类路径下的文件和文档。

下面我们仍然以前面的气象站数据和天气记录数据为例,使用分布式缓存 API,完成两个数据集的连接操作。完整的 MapReduce 程序如下所示。

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Hashtable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; public class JoinRecordWithStationName extends Configured implements Tool { public static class TemperatureMapper extends
Mapper< LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] arr = value.toString().split("\t", 2);
if (arr.length == 2) {
context.write(new Text(arr[0]), value);
} }
} public static class TemperatureReducer extends
Reducer< Text, Text, Text, Text> {
private Hashtable< String, String> table = new Hashtable< String, String>();//定义Hashtable存放缓存数据 /**
* 获取分布式缓存文件
*/
protected void setup(Context context) throws IOException,
InterruptedException {
Path[] localPaths = (Path[]) context.getLocalCacheFiles();//返回本地文件路径
if (localPaths.length == 0) {
throw new FileNotFoundException(
"Distributed cache file not found.");
}
FileSystem fs = FileSystem.getLocal(context.getConfiguration());//获取本地 FileSystem 实例
FSDataInputStream in = null; in = fs.open(new Path(localPaths[0].toString()));// 打开输入流
BufferedReader br = new BufferedReader(new InputStreamReader(in));// 创建BufferedReader读取器
String infoAddr = null;
while (null != (infoAddr = br.readLine())) {// 按行读取并解析气象站数据
String[] records = infoAddr.split("\t");
table.put(records[0], records[1]);//key为stationID,value为stationName
}
} public void reduce(Text key, Iterable< Text> values, Context context)
throws IOException, InterruptedException {
String stationName = table.get(key.toString());//天气记录根据stationId 获取stationName
for (Text value : values) {
context.write(new Text(stationName), value);
}
}
} @Override
public int run(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration(); FileSystem hdfs = FileSystem.get(new URI(
"hdfs://single.hadoop.dajiangtai.com:9000"), conf);
Path out = new Path(args[1]);
if (hdfs.isDirectory(out)) {
hdfs.delete(out, true);
} Job job = Job.getInstance();//获取一个job实例
job.setJarByClass(JoinRecordWithStationName.class); FileInputFormat.addInputPath(job,
new org.apache.hadoop.fs.Path(args[0]));
FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(
args[1]));
//添加分布式缓存文件 station.txt
job.addCacheFile(new URI("hdfs://HadoopMaster:9000/middle/temperature/station.txt")); job.setMapperClass(TemperatureMapper.class);
job.setReducerClass(TemperatureReducer.class); job.setOutputKeyClass(Text.class);// 输出key类型
job.setOutputValueClass(Text.class);// 输出value类型 return job.waitForCompletion(true)?0:1;
} public static void main(String[] args) throws Exception {
String[] arg = {
"hdfs://HadoopMaster:9000/middle/temperature/records.txt",
"hdfs://HadoopMaster:9000/middle/temperature/out/" };
int ec = ToolRunner.run(new Configuration(),
new JoinRecordWithStationName(), arg);
System.exit(ec);
}
}

添加分布式缓存文件相对简单,只需使用job.addCacheFile(new URI(cacheFilePath))方法添加缓存文件即可。需要注意的是,在获取获取缓存文件时,文件将以 “本地的” Path 对象的形式返回。为了读取文件,用户需要首先使用 getLocal()方法获得一个 Hadoop 本地 FileSystem 实例。本程序中,我们在 Reduce 的 setup() 方法中获取缓存文件。

以下是示例数据集的输出结果,达到我们预期的效果。

SIHCCAJAVRI	011990-99999	195005151800	-11
SIHCCAJAVRI 011990-99999 195005151200 22
SIHCCAJAVRI 011990-99999 195005150700 0
TRNSET-HANSMOEN 012650-99999 194903241800 78
TRNSET-HANSMOEN 012650-99999 194903241200 111

Hadoop MapReduce编程 API入门系列之join(二十六)(未完)

Hadoop MapReduce编程 API入门系列之join(二十六)(未完)

Hadoop MapReduce编程 API入门系列之join(二十六)(未完)

Hadoop MapReduce编程 API入门系列之join(二十六)(未完)

Hadoop MapReduce编程 API入门系列之join(二十六)(未完)

Hadoop MapReduce编程 API入门系列之join(二十六)(未完)

Hadoop MapReduce编程 API入门系列之join(二十六)(未完)

Hadoop MapReduce编程 API入门系列之join(二十六)(未完)

 得到结果

SIHCCAJAVRI	011990-99999	195005151800	-11
SIHCCAJAVRI 011990-99999 195005151200 22
SIHCCAJAVRI 011990-99999 195005150700 0
TRNSET-HANSMOEN 012650-99999 194903241800 78
TRNSET-HANSMOEN 012650-99999 194903241200 111

 代码版本1

package zhouls.bigdata.myMapReduce.Join;

import java.util.Set;

import java.io.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPair implements WritableComparable<TextPair>
{
private Text first; //Text 类型的实例变量first
private Text second;//Text 类型的实例变量second

public TextPair() //无参构造方法
{
set(new Text(),new Text());
}
public TextPair(String first,String second) // Sting类型参数的构造方法
{
set(new Text(first),new Text(second));
}
public TextPair(Text first,Text second) // Text类型参数的构造方法
{
set(first,second);
}
public void set(Text first,Text second) //set方法
{
this.first=first;
this.second=second;
}
public Text getFirst() //getFirst方法
{
return first;
}
public Text getSecond() //getSecond方法
{
return second;
}

//将对象转换为字节流并写入到输出流out中
public void write(DataOutput out) throws IOException //write方法
{
first.write(out);
second.write(out);
}

//从输入流in中读取字节流反序列化为对象
public void readFields(DataInput in) throws IOException //readFields方法
{
first.readFields(in);
second.readFields(in);
}

@Override
public int hashCode() //在mapreduce中,通过hashCode来选择reduce分区
{
return first.hashCode() *163+second.hashCode();
}

@Override
public boolean equals(Object o) //equals方法,这里是两个对象的内容之间比较
{
if (o instanceof TextPair)
{
TextPair tp=(TextPair) o;
return first.equals(tp.first) && second.equals(tp.second);
}
return false;
}

@Override
public String toString() //toString方法
{
return first +"\t"+ second;
}
public int compareTo(TextPair o)
{
// TODO Auto-generated method stub
if(!first.equals(o.first))
{
return first.compareTo(o.first);
}
else if(!second.equals(o.second))
{
return second.compareTo(o.second);
}
return 0;
}

}

package zhouls.bigdata.myMapReduce.Join;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import zhouls.bigdata.myMapReduce.Join.TextPair;

public class JoinStationMapper extends Mapper<LongWritable,Text,TextPair,Text>
{
protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
{
String line = value.toString();
String[] arr = line.split("\\s+");//解析气象站数据
int length = arr.length;
if(length==2)
{//满足这种数据格式
//key=气象站id value=气象站名称
System.out.println("station="+arr[0]+"0");
context.write(new TextPair(arr[0],"0"),new Text(arr[1]));
}
}
}

package zhouls.bigdata.myMapReduce.Join;

import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer< TextPair,Text,Text,Text>
{
protected void reduce(TextPair key, Iterable< Text> values,Context context) throws IOException,InterruptedException
{
Iterator< Text> iter = values.iterator();
Text stationName = new Text(iter.next());//气象站名称
while(iter.hasNext()){
Text record = iter.next();//天气记录的每条数据
Text outValue = new Text(stationName.toString()+"\t"+record.toString());
context.write(key.getFirst(),outValue);
}
}
}

package zhouls.bigdata.myMapReduce.Join;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JoinRecordMapper extends Mapper< LongWritable,Text,TextPair,Text>
{
protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
{
String line = value.toString();
String[] arr = line.split("\\s+",2);//解析天气记录数据
int length = arr.length;
if(length==2){
//key=气象站id value=天气记录数据
context.write(new TextPair(arr[0],"1"),new Text(arr[1]));
}
}
}

package zhouls.bigdata.myMapReduce.Join;

import java.io.BufferedReader;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Hashtable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinRecordWithStationName extends Configured implements Tool
{
public static class TemperatureMapper extends Mapper< LongWritable, Text, Text, Text>
{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String[] arr = value.toString().split("\t", 2);
if (arr.length == 2)
{
context.write(new Text(arr[0]), value);
}

}
}

public static class TemperatureReducer extends Reducer< Text, Text, Text, Text>
{
private Hashtable< String, String> table = new Hashtable< String, String>();//定义Hashtable存放缓存数据

/**
* 获取分布式缓存文件
*/
protected void setup(Context context) throws IOException,
InterruptedException
{
Path[] localPaths = (Path[]) context.getLocalCacheFiles();//返回本地文件路径
if (localPaths.length == 0)
{
throw new FileNotFoundException("Distributed cache file not found.");
}
FileSystem fs = FileSystem.getLocal(context.getConfiguration());//获取本地 FileSystem 实例
FSDataInputStream in = null;

in = fs.open(new Path(localPaths[0].toString()));// 打开输入流
BufferedReader br = new BufferedReader(new InputStreamReader(in));// 创建BufferedReader读取器
String infoAddr = null;
while (null != (infoAddr = br.readLine()))
{// 按行读取并解析气象站数据
String[] records = infoAddr.split("\t");
table.put(records[0], records[1]);//key为stationID,value为stationName
}
}

public void reduce(Text key, Iterable< Text> values, Context context)
throws IOException, InterruptedException
{
String stationName = table.get(key.toString());//天气记录根据stationId 获取stationName
for (Text value : values)
{
context.write(new Text(stationName), value);
}
}
}

public int run(String[] args) throws Exception
{
// TODO Auto-generated method stub
Configuration conf = new Configuration();

// FileSystem hdfs = FileSystem.get(new URI("hdfs://HadoopMaster:9000"), conf);
// Path out = new Path(args[1]);
// if (hdfs.isDirectory(out))
// {
// hdfs.delete(out, true);
// }

Job job = Job.getInstance();//获取一个job实例
job.setJarByClass(JoinRecordWithStationName.class);

// FileInputFormat.addInputPath(job,
// new org.apache.hadoop.fs.Path(args[0]));
// FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(args[1]));

FileInputFormat.addInputPath(job,
new org.apache.hadoop.fs.Path("./data/join/station.txt"));
FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path("./out/join/"));

//添加分布式缓存文件 station.txt
// job.addCacheFile(new URI("hdfs://HadoopMaster:9000/join/station.txt"));
job.addCacheFile(new URI("./data/join/station.txt"));

job.setMapperClass(TemperatureMapper.class);
job.setReducerClass(TemperatureReducer.class);

job.setOutputKeyClass(Text.class);// 输出key类型
job.setOutputValueClass(Text.class);// 输出value类型

return job.waitForCompletion(true)?0:1;
}

public static void main(String[] args) throws Exception
{
// String[] arg = {
// "hdfs://HadoopMaster:9000/join/records.txt",
// "hdfs://HadoopMaster:9000/join/out/" };
//
String[] arg = {
"./data/join/records.txt",
"./out/join/" };

int ec = ToolRunner.run(new Configuration(),new JoinRecordWithStationName(), arg);
System.exit(ec);
}
}

package zhouls.bigdata.myMapReduce.Join;

public class JoinRecordAndStationName
{

/**
* @param args
*/
public static void main(String[] args)
{
// TODO Auto-generated method stub

}

}

代码版本2

package zhouls.bigdata.myMapReduce.Join;

import java.util.Set;

import java.io.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPair implements WritableComparable<TextPair>
{
private Text first; //Text 类型的实例变量first
private Text second;//Text 类型的实例变量second

public TextPair() //无参构造方法
{
set(new Text(),new Text());
}
public TextPair(String first,String second) // Sting类型参数的构造方法
{
set(new Text(first),new Text(second));
}
public TextPair(Text first,Text second) // Text类型参数的构造方法
{
set(first,second);
}
public void set(Text first,Text second) //set方法
{
this.first=first;
this.second=second;
}
public Text getFirst() //getFirst方法
{
return first;
}
public Text getSecond() //getSecond方法
{
return second;
}

//将对象转换为字节流并写入到输出流out中
public void write(DataOutput out) throws IOException //write方法
{
first.write(out);
second.write(out);
}

//从输入流in中读取字节流反序列化为对象
public void readFields(DataInput in) throws IOException //readFields方法
{
first.readFields(in);
second.readFields(in);
}

@Override
public int hashCode() //在mapreduce中,通过hashCode来选择reduce分区
{
return first.hashCode() *163+second.hashCode();
}

@Override
public boolean equals(Object o) //equals方法,这里是两个对象的内容之间比较
{
if (o instanceof TextPair)
{
TextPair tp=(TextPair) o;
return first.equals(tp.first) && second.equals(tp.second);
}
return false;
}

@Override
public String toString() //toString方法
{
return first +"\t"+ second;
}
public int compareTo(TextPair o)
{
// TODO Auto-generated method stub
if(!first.equals(o.first))
{
return first.compareTo(o.first);
}
else if(!second.equals(o.second))
{
return second.compareTo(o.second);
}
return 0;
}

}

package zhouls.bigdata.myMapReduce.Join;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import zhouls.bigdata.myMapReduce.Join.TextPair;

public class JoinStationMapper extends Mapper<LongWritable,Text,TextPair,Text>
{
protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
{
String line = value.toString();
String[] arr = line.split("\\s+");//解析气象站数据
int length = arr.length;
if(length==2)
{//满足这种数据格式
//key=气象站id value=气象站名称
System.out.println("station="+arr[0]+"0");
context.write(new TextPair(arr[0],"0"),new Text(arr[1]));
}
}
}

package zhouls.bigdata.myMapReduce.Join;

import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer< TextPair,Text,Text,Text>
{
protected void reduce(TextPair key, Iterable< Text> values,Context context) throws IOException,InterruptedException
{
Iterator< Text> iter = values.iterator();
Text stationName = new Text(iter.next());//气象站名称
while(iter.hasNext()){
Text record = iter.next();//天气记录的每条数据
Text outValue = new Text(stationName.toString()+"\t"+record.toString());
context.write(key.getFirst(),outValue);
}
}
}

package zhouls.bigdata.myMapReduce.Join;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JoinRecordMapper extends Mapper< LongWritable,Text,TextPair,Text>
{
protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
{
String line = value.toString();
String[] arr = line.split("\\s+",2);//解析天气记录数据
int length = arr.length;
if(length==2){
//key=气象站id value=天气记录数据
context.write(new TextPair(arr[0],"1"),new Text(arr[1]));
}
}
}

//版本2
package zhouls.bigdata.myMapReduce.Join;

import java.io.InputStream;
import org.apache.hadoop.util.Tool;
import java.io.OutputStream;
import java.util.Set;

import javax.lang.model.SourceVersion;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;

public class JoinRecordWithStationName extends Configured implements Tool
{
public static class KeyPartitioner extends Partitioner< TextPair,Text>
{

public int getPartition(TextPair key,Text value,int numPartitions)
{
return (key.getFirst().hashCode()&Integer.MAX_VALUE) % numPartitions;
}
}

public static class GroupingComparator extends WritableComparator
{
protected GroupingComparator()
{
super(TextPair.class,true);
}
@Override
public int compare(WritableComparable w1,WritableComparable w2)
{
TextPair ip1=(TextPair) w1;
TextPair ip2=(TextPair) w2;
Text l=ip1.getFirst();
Text r=ip2.getFirst();
return l.compareTo(r);

}
}
public int run(String[] args) throws Exception
{
Configuration conf = new Configuration();// 读取配置文件

Path mypath=new Path(args[2]);
FileSystem hdfs=mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath))
{
hdfs.delete(mypath,true);
}

Job job = Job.getInstance(conf,"join");// 新建一个任务
job.setJarByClass(JoinRecordWithStationName.class);// 主类

Path recordInputPath = new Path(args[0]);//天气记录数据源,这里是牵扯到多路径输入和多路径输出的问题。默认是从args[0]开始
Path stationInputPath = new Path(args[1]);//气象站数据源
Path outputPath = new Path(args[2]);//输出路径

//若只有一个输入和一个输出,则输入是args[0],输出是args[1]。
//若有两个输入和一个输出,则输入是args[0]和args[1],输出是args[2]

MultipleInputs.addInputPath(job,recordInputPath,TextInputFormat.class,JoinRecordMapper.class);//读取天气记录Mapper
MultipleInputs.addInputPath(job,stationInputPath,TextInputFormat.class,JoinStationMapper.class);//读取气象站Mapper
FileOutputFormat.setOutputPath(job,outputPath);
job.setReducerClass(JoinReducer.class);// Reducer
job.setNumReduceTasks(2);

job.setPartitionerClass(KeyPartitioner.class);//自定义分区
job.setGroupingComparatorClass(GroupingComparator.class);//自定义分组

job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(Text.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

return job.waitForCompletion(true)?0:1;
}

public static void main(String[] args) throws Exception
{
String[] args0={"hdfs://HadoopMaster:9000/join/records.txt"
,"hdfs://HadoopMaster:9000/join/station.txt"
,"hdfs://HadoopMaster:9000/join/out"
};
int exitCode=ToolRunner.run( new JoinRecordWithStationName(), args0);
System.exit(exitCode);
}
}

package zhouls.bigdata.myMapReduce.Join;

public class JoinRecordAndStationName
{

/**
* @param args
*/
public static void main(String[] args)
{
// TODO Auto-generated method stub

}

}

上一篇:BUAA OO 第4单元总结


下一篇:测试环境搭建——JDK环境配置