在MR程序的开发过程中,经常会遇到输入数据不是HDFS或者数据输出目的地不是HDFS的,MapReduce的设计已经考虑到这种情况,它为我们提供了两个组建,只需要我们自定义适合的InputFormat和OutputFormat,就可以完成这个需求,这里简单的介绍一个从MongoDB中读数据,并写出数据到MongoDB中的一种情况,只是一个Demo,所以数据随便找的一个。
一、自定义InputFormat
MapReduce中Map阶段的数据输入是由InputFormat决定的,我们查看org.apache.hadoop.mapreduce.InputFormat的源码可以看到以下代码内容,我们可以看到除了实现InputFormat抽象类以外,我们还需要自定义InputSplit和自定义RecordReader类,这两个类的主要作用分别是:split确定数据分片的大小以及数据的位置信息,recordReader具体的读取数据。
public abstract class InputFormat<K, V> {
public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; // 获取Map阶段的数据分片集合信息
public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; // 创建具体的数据读取对象
}
1、自定义InputSplit
自定义InputSplit主要需要实现的方法有一下几个:
public abstract class InputSplit {
public abstract long getLength() throws IOException, InterruptedException; // 获取当前分片的长度大小
public abstract String[] getLocations() throws IOException, InterruptedException; // 获取当前分片的位置信息
}
2、自定义RecordReader
自定义RecordReader的主要实现方法有一下几个:
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
public abstract void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; // 初始化,如果在构造函数中初始化了,那么该方法可以为空
public abstract boolean nextKeyValue() throws IOException, InterruptedException; //是否存在下一个key/value,如果存在返回true。否则返回false。
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException; // 获取当然key
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException; // 获取当然value
public abstract float getProgress() throws IOException, InterruptedException; // 获取进度信息
public abstract void close() throws IOException; // 关闭资源
}
二、自定义OutputFormat
MapReduce中Reducer阶段的数据输出是由OutputFormat决定的,决定数据的输出目的地和job的提交对象,我们查看org.apache.hadoop.mapreduce.OutputFormat的源码可以看到以下代码内容,我们可以看到除了实现OutputFormat抽象类以外,我们还需要自定义RecordWriter和自定义OutputCommitter类,其中OutputCommitter类由于不涉及到具体的输出目的地,所以一般情况下,不用重写,可直接使用FileOutputcommitter对象;RecordWriter类是具体的定义如何将数据写到目的地的。
public abstract class OutputFormat<K, V> {
public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException; // 获取具体的数据写出对象
public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException; // 检查输出配置信息是否正确
public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException; // 获取输出job的提交者对象
}
1、自定义RecordWriter
查看RecordWriter源码,我们可以看到主要需要实现的有下列三个方法,分别是:
public abstract class RecordWriter<K, V> {
public abstract void write(K key, V value) throws IOException, InterruptedException; // 具体的写数据的方法
public abstract void close(TaskAttemptContext context) throws IOException, InterruptedException; // 关闭资源
}
三、详细代码
自定义InputFormat&InputSplit
package com.gerry.mongo.hadoop2x.mr.mongodb.lib; import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger; import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoException; public class MongoDBInputFormat<T extends MongoDBWritable> extends InputFormat<LongWritable, T> implements Configurable {
private static final Logger LOG = Logger.getLogger(MongoDBInputFormat.class); /**
* 空的对象,主要作用是不进行任何操作,类似于NullWritable
*/
public static class NullMongoDBWritable implements MongoDBWritable, Writable {
@Override
public void write(DBCollection collection) throws MongoException {
// TODO Auto-generated method stub
} @Override
public void readFields(DBObject object) throws MongoException {
// TODO Auto-generated method stub
} @Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
} @Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
} @Override
public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
// TODO Auto-generated method stub
return old;
} } /**
* MongoDB的input split类
*/
public static class MongoDBInputSplit extends InputSplit implements Writable {
private long end = 0;
private long start = 0; /**
* 默认构造方法
*/
public MongoDBInputSplit() {
} /**
* 便利的构造方法
*
* @param start
* 集合中查询的文档开始行号
* @param end
* 集合中查询的文档结束行号
*/
public MongoDBInputSplit(long start, long end) {
this.start = start;
this.end = end;
} public long getEnd() {
return end;
} public long getStart() {
return start;
} @Override
public void write(DataOutput out) throws IOException {
out.writeLong(this.start);
out.writeLong(this.end);
} @Override
public void readFields(DataInput in) throws IOException {
this.start = in.readLong();
this.end = in.readLong();
} @Override
public long getLength() throws IOException, InterruptedException {
// 分片大小
return this.end - this.start;
} @Override
public String[] getLocations() throws IOException, InterruptedException {
// TODO 返回一个空的数组,表示不进行数据本地化的优化,那么map执行节点随机选择。
return new String[] {};
} } protected MongoDBConfiguration mongoConfiguration; // mongo相关配置信息
protected Mongo mongo; // mongo连接
protected String databaseName; // 连接的数据库名称
protected String collectionName; // 连接的集合名称
protected DBObject conditionQuery; // 选择条件
protected DBObject fieldQuery; // 需要的字段条件 @Override
public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
DBCollection dbCollection = null;
try {
dbCollection = this.getDBCollection();
// 获取数量大小
long count = dbCollection.count(this.getConditionQuery());
int chunks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
long chunkSize = (count / chunks); // 分片数量 // 开始分片,只是简单的分配每个分片的数据量
List<InputSplit> splits = new ArrayList<InputSplit>();
for (int i = 0; i < chunks; i++) {
MongoDBInputSplit split = null;
if ((i + 1) == chunks) {
split = new MongoDBInputSplit(i * chunkSize, count);
} else {
split = new MongoDBInputSplit(i * chunkSize, (i * chunkSize) + chunkSize);
}
splits.add(split);
}
return splits;
} catch (Exception e) {
throw new IOException(e);
} finally {
dbCollection = null;
closeConnection(); // 关闭资源的连接
}
} @Override
public RecordReader<LongWritable, T> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return createRecordReader((MongoDBInputSplit) split, context.getConfiguration());
} protected RecordReader<LongWritable, T> createRecordReader(MongoDBInputSplit split, Configuration conf) {
// 获取从mongodb中读取数据需要转换成的value class,默认为NullMongoDBWritable
Class<? extends MongoDBWritable> valueClass = this.mongoConfiguration.getValueClass();
return new MongoDBRecordReader<T>(split, valueClass, conf, getDBCollection(), getConditionQuery(), getFieldQuery());
} @Override
public void setConf(Configuration conf) {
mongoConfiguration = new MongoDBConfiguration(conf);
databaseName = this.mongoConfiguration.getInputDatabaseName(); // 输入数据的数据库
collectionName = this.mongoConfiguration.getInputCollectionName(); // 输入数据的集合
getMongo(); // 初始化
getConditionQuery(); // 初始化
getFieldQuery(); // 初始化
} @Override
public Configuration getConf() {
return this.mongoConfiguration.getConfiguration();
} public Mongo getMongo() {
try {
if (null == this.mongo) {
this.mongo = this.mongoConfiguration.getMongoConnection();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return mongo;
} public DBObject getConditionQuery() {
if (null == this.conditionQuery) {
Map<String, String> conditions = this.mongoConfiguration.getInputConditions();
BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
for (Map.Entry<String, String> entry : conditions.entrySet()) {
if (entry.getValue() != null) {
builder.append(entry.getKey(), entry.getValue());
} else {
builder.push(entry.getKey());
}
}
if (builder.isEmpty()) {
this.conditionQuery = new BasicDBObject();
} else {
this.conditionQuery = builder.get();
}
}
return this.conditionQuery;
} public DBObject getFieldQuery() {
if (fieldQuery == null) {
String[] fields = this.mongoConfiguration.getInputFieldNames();
if (fields != null && fields.length > 0) {
BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
for (String field : fields) {
builder.push(field);
}
fieldQuery = builder.get();
} else {
fieldQuery = new BasicDBObject();
}
}
return fieldQuery;
} protected DBCollection getDBCollection() {
DB db = getMongo().getDB(this.databaseName);
if (this.mongoConfiguration.isEnableAuth()) {
String username = this.mongoConfiguration.getUsername();
String password = this.mongoConfiguration.getPassword();
if (!db.authenticate(username, password.toCharArray())) {
throw new RuntimeException("authenticate failure with the username:" + username + ",pwd:" + password);
}
}
return db.getCollection(collectionName);
} protected void closeConnection() {
try {
if (null != this.mongo) {
this.mongo.close();
this.mongo = null;
}
} catch (Exception e) {
LOG.debug("Exception on close", e);
}
}
}
MongoDBInputFormat.java
自定义RecordReader
package com.gerry.mongo.hadoop2x.mr.mongodb.lib; import java.io.IOException; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils; import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject; public class MongoDBRecordReader<T extends MongoDBWritable> extends RecordReader<LongWritable, T> {
private Class<? extends MongoDBWritable> valueClass;
private LongWritable key;
private T value;
private long pos;
private Configuration conf;
private MongoDBInputFormat.MongoDBInputSplit split;
private DBCollection collection;
private DBObject conditionQuery;
private DBObject fieldQuery;
private DBCursor cursor; public MongoDBRecordReader(MongoDBInputFormat.MongoDBInputSplit split, Class<? extends MongoDBWritable> valueClass, Configuration conf, DBCollection collection, DBObject conditionQuery,
DBObject fieldQuery) {
this.split = split;
this.valueClass = valueClass;
this.collection = collection;
this.conditionQuery = conditionQuery;
this.fieldQuery = fieldQuery;
this.conf = conf;
} @Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
// do nothing
} @SuppressWarnings("unchecked")
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
try {
if (key == null) {
key = new LongWritable();
}
if (value == null) {
value = (T) ReflectionUtils.newInstance(valueClass, conf);
}
if (null == cursor) {
cursor = executeQuery();
}
if (!cursor.hasNext()) {
return false;
} key.set(pos + split.getStart()); // 设置key
value.readFields(cursor.next()); // 设置value
pos++;
} catch (Exception e) {
throw new IOException("Exception in nextKeyValue", e);
}
return true;
} protected DBCursor executeQuery() {
try {
return collection.find(conditionQuery, fieldQuery).skip((int) split.getStart()).limit((int) split.getLength());
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
} @Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return this.key;
} @Override
public T getCurrentValue() throws IOException, InterruptedException {
return this.value;
} @Override
public float getProgress() throws IOException, InterruptedException {
return pos;
} @Override
public void close() throws IOException {
if (collection != null) {
collection.getDB().getMongo().close();
}
} }
MongoDBRecordReader.java
自定义OutputFormat&RecordWriter
package com.gerry.mongo.hadoop2x.mr.mongodb.lib; import java.io.IOException; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger; import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo; public class MongoDBOutputFormat<K extends MongoDBWritable, V extends MongoDBWritable> extends OutputFormat<K, V> {
private static Logger LOG = Logger.getLogger(MongoDBOutputFormat.class); /**
* A RecordWriter that writes the reduce output to a MongoDB collection
*
* @param <K>
* @param <T>
*/
public static class MongoDBRecordWriter<K extends MongoDBWritable, V extends MongoDBWritable> extends RecordWriter<K, V> {
private Mongo mongo;
private String databaseName;
private String collectionName;
private MongoDBConfiguration dbConf;
private DBCollection dbCollection;
private DBObject dbObject;
private boolean enableFetchMethod; public MongoDBRecordWriter(MongoDBConfiguration dbConf, Mongo mongo, String databaseName, String collectionName) {
this.mongo = mongo;
this.databaseName = databaseName;
this.collectionName = collectionName;
this.dbConf = dbConf;
this.enableFetchMethod = this.dbConf.isEnableUseFetchMethod();
getDbCollection();// 创建连接
} protected DBCollection getDbCollection() {
if (null == this.dbCollection) {
DB db = this.mongo.getDB(this.databaseName);
if (this.dbConf.isEnableAuth()) {
String username = this.dbConf.getUsername();
String password = this.dbConf.getPassword();
if (!db.authenticate(username, password.toCharArray())) {
throw new RuntimeException("authenticate failure, the username:" + username + ", pwd:" + password);
}
}
this.dbCollection = db.getCollection(this.collectionName);
}
return this.dbCollection;
} @Override
public void write(K key, V value) throws IOException, InterruptedException {
if (this.enableFetchMethod) {
this.dbObject = key.fetchWriteDBObject(null);
this.dbObject = value.fetchWriteDBObject(this.dbObject);
// 写数据
this.dbCollection.insert(this.dbObject);// 在这里可以做一个缓存,一起提交,如果数据量大的情况下。
this.dbObject = null;
} else {
// 直接调用写方法
key.write(dbCollection);
value.write(dbCollection);
}
} @Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
if (this.mongo != null) {
this.dbCollection = null;
this.mongo.close();
}
}
} @Override
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
try {
MongoDBConfiguration dbConf = new MongoDBConfiguration(context.getConfiguration());
String databaseName = dbConf.getOutputDatabaseName();
String collectionName = dbConf.getOutputCollectionName();
Mongo mongo = dbConf.getMongoConnection();
return new MongoDBRecordWriter<K, V>(dbConf, mongo, databaseName, collectionName);
} catch (Exception e) {
LOG.error("Create the record writer occur exception.", e);
throw new IOException(e);
}
} @Override
public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
// 不进行检测
} @Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
// 由于outputcommitter主要作用是提交jar,分配jar的功能。所以我们这里直接使用FileOutputCommitter
return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
} /**
* 设置output属性
*
* @param job
* @param databaseName
* @param collectionName
*/
public static void setOutput(Job job, String databaseName, String collectionName) {
job.setOutputFormatClass(MongoDBOutputFormat.class);
job.setReduceSpeculativeExecution(false);
MongoDBConfiguration mdc = new MongoDBConfiguration(job.getConfiguration());
mdc.setOutputCollectionName(collectionName);
mdc.setOutputDatabaseName(databaseName);
} /**
* 静止使用fetch方法
*
* @param conf
*/
public static void disableFetchMethod(Configuration conf) {
conf.setBoolean(MongoDBConfiguration.OUTPUT_USE_FETCH_METHOD_PROPERTY, false);
}
}
MongoDBOutputFormat.java
其他涉及到的java代码
package com.gerry.mongo.hadoop2x.mr.mongodb.lib; import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.db.DBWritable; import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat.NullMongoDBWritable;
import com.mongodb.Mongo;
import com.mongodb.ServerAddress; public class MongoDBConfiguration {
public static final String BIND_HOST_PROPERTY = "mapreduce.mongo.host";
public static final String BIND_PORT_PROPERTY = "mapreduce.mongo.port";
public static final String AUTH_ENABLE_PROPERTY = "mapreduce.mongo.auth.enable";
public static final String USERNAME_PROPERTY = "mapreduce.mongo.username";
public static final String PASSWORD_PROPERTY = "mapreduce.mongo.password";
public static final String PARTITION_PROPERTY = "mapreduce.mongo.partition"; public static final String INPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.input.database.name";
public static final String INPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.input.collection.name";
public static final String INPUT_FIELD_NAMES_PROPERTY = "mapreduce.mongo.input.field.names";
public static final String INPUT_CONDITIONS_PROPERTY = "mapreduce.mongo.input.conditions";
public static final String INPUT_CLASS_PROPERTY = "mapreduce.mongo.input.class"; public static final String OUTPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.output.database.name";
public static final String OUTPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.output.collection.name";
// 在recordwriter中到底是否调用fetch方法,默认调用。如果设置为不调用,那么就直接使用writer方法
public static final String OUTPUT_USE_FETCH_METHOD_PROPERTY = "mapreduce.mongo.output.use.fetch.method"; private Configuration conf; public MongoDBConfiguration(Configuration conf) {
this.conf = conf;
} /**
* 获取Configuration对象
*
* @return
*/
public Configuration getConfiguration() {
return this.conf;
} /**
* 设置连接信息
*
* @param host
* @param port
* @return
*/
public MongoDBConfiguration configureDB(String host, int port) {
return this.configureDB(host, port, false, null, null);
} /**
* 设置连接信息
*
* @param host
* @param port
* @param enableAuth
* @param username
* @param password
* @return
*/
public MongoDBConfiguration configureDB(String host, int port, boolean enableAuth, String username, String password) {
this.conf.set(BIND_HOST_PROPERTY, host);
this.conf.setInt(BIND_PORT_PROPERTY, port);
if (enableAuth) {
this.conf.setBoolean(AUTH_ENABLE_PROPERTY, true);
this.conf.set(USERNAME_PROPERTY, username);
this.conf.set(PASSWORD_PROPERTY, password);
}
return this;
} /**
* 获取MongoDB的连接对象Connection对象
*
* @return
* @throws UnknownHostException
*/
public Mongo getMongoConnection() throws UnknownHostException {
return new Mongo(new ServerAddress(this.getBindHost(), this.getBindPort()));
} /**
* 获取设置的host
*
* @return
*/
public String getBindHost() {
return this.conf.get(BIND_HOST_PROPERTY, "localhost");
} /**
* 获取设置的port
*
* @return
*/
public int getBindPort() {
return this.conf.getInt(BIND_PORT_PROPERTY, 27017);
} /**
* 获取是否开启安全验证,默认的Mongodb是不开启的。
*
* @return
*/
public boolean isEnableAuth() {
return this.conf.getBoolean(AUTH_ENABLE_PROPERTY, false);
} /**
* 获取完全验证所需要的用户名
*
* @return
*/
public String getUsername() {
return this.conf.get(USERNAME_PROPERTY);
} /**
* 获取安全验证所需要的密码
*
* @return
*/
public String getPassword() {
return this.conf.get(PASSWORD_PROPERTY);
} public String getPartition() {
return conf.get(PARTITION_PROPERTY, "|");
} public MongoDBConfiguration setPartition(String partition) {
conf.set(PARTITION_PROPERTY, partition);
return this;
} public String getInputDatabaseName() {
return conf.get(INPUT_DATABASE_NAME_PROPERTY, "test");
} public MongoDBConfiguration setInputDatabaseName(String databaseName) {
conf.set(INPUT_DATABASE_NAME_PROPERTY, databaseName);
return this;
} public String getInputCollectionName() {
return conf.get(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "test");
} public void setInputCollectionName(String tableName) {
conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, tableName);
} public String[] getInputFieldNames() {
return conf.getStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
} public void setInputFieldNames(String... fieldNames) {
conf.setStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
} public Map<String, String> getInputConditions() {
Map<String, String> result = new HashMap<String, String>();
String[] conditions = conf.getStrings(INPUT_CONDITIONS_PROPERTY);
if (conditions != null && conditions.length > 0) {
String partition = this.getPartition();
String[] values = null;
for (String condition : conditions) {
values = condition.split(partition);
if (values != null && values.length == 2) {
result.put(values[0], values[1]);
} else {
result.put(condition, null);
}
}
}
return result;
} public void setInputConditions(Map<String, String> conditions) {
if (conditions != null && conditions.size() > 0) {
String[] values = new String[conditions.size()];
String partition = this.getPartition();
int k = 0;
for (Map.Entry<String, String> entry : conditions.entrySet()) {
if (entry.getValue() != null) {
values[k++] = entry.getKey() + partition + entry.getValue();
} else {
values[k++] = entry.getKey();
}
}
conf.setStrings(INPUT_CONDITIONS_PROPERTY, values);
}
} public Class<? extends MongoDBWritable> getValueClass() {
return conf.getClass(INPUT_CLASS_PROPERTY, NullMongoDBWritable.class, MongoDBWritable.class);
} public void setInputClass(Class<? extends DBWritable> inputClass) {
conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class);
} public String getOutputDatabaseName() {
return conf.get(OUTPUT_DATABASE_NAME_PROPERTY, "test");
} public MongoDBConfiguration setOutputDatabaseName(String databaseName) {
conf.set(OUTPUT_DATABASE_NAME_PROPERTY, databaseName);
return this;
} public String getOutputCollectionName() {
return conf.get(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, "test");
} public void setOutputCollectionName(String tableName) {
conf.set(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, tableName);
} public boolean isEnableUseFetchMethod() {
return conf.getBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, true);
} public void setOutputUseFetchMethod(boolean useFetchMethod) {
conf.setBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, useFetchMethod);
}
}
MongoDBConfiguration.java
package com.gerry.mongo.hadoop2x.mr.mongodb.lib; import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException; public interface MongoDBWritable {
/**
* 往mongodb的集合中写数据
*
* @param collection
* @throws MongoException
*/
public void write(DBCollection collection) throws MongoException; /**
* 获取要写的mongoDB对象
*
* @param old
* @return
* @throws MongoException
*/
public DBObject fetchWriteDBObject(DBObject old) throws MongoException; /**
* 从mongodb的集合中读数据
*
* @param collection
* @throws MongoException
*/
public void readFields(DBObject object) throws MongoException;
}
MongoDBWritable.java
package com.gerry.mongo.hadoop2x.mr.mongodb.nw; import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Set; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer; import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBConfiguration;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBOutputFormat;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBWritable;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException; public class Demo {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// 设置输入的mongodb的数据库和集合,以及对应的输入对象value,这里的数据库和集合要求存在,否则是没有数据的,当然没有数据不会出问题
conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "users");
conf.set(MongoDBConfiguration.INPUT_DATABASE_NAME_PROPERTY, "db_java");
conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, DemoInputValueAndOutputKey.class, MongoDBWritable.class);
Job job = Job.getInstance(conf, "mongodb-demo"); job.setJarByClass(Demo.class);
job.setMapperClass(DemoMapper.class);
job.setReducerClass(DemoReducer.class);
job.setOutputKeyClass(DemoInputValueAndOutputKey.class);
job.setOutputValueClass(DemoOutputValue.class);
job.setMapOutputKeyClass(DemoInputValueAndOutputKey.class);
job.setMapOutputValueClass(NullWritable.class);
job.setInputFormatClass(MongoDBInputFormat.class);
MongoDBOutputFormat.setOutput(job, "foobar2", "users"); // 这个可以不存在 job.waitForCompletion(true);
} public static class DemoOutputValue implements Writable, MongoDBWritable {
private Date clientTime;
private long count; @Override
public void write(DBCollection collection) throws MongoException {
throw new UnsupportedOperationException();
} @Override
public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
BasicDBObjectBuilder builder = null;
Set<String> keys = new HashSet<String>();
if (old != null) {
keys = old.keySet();
builder = BasicDBObjectBuilder.start(old.toMap());
} else {
builder = new BasicDBObjectBuilder();
}
// 添加当前对象的value值,如果存在同样的key,那么加序号
builder.append(getKey(keys, "time", 0), clientTime).append(getKey(keys, "count", 0), this.count);
return builder.get();
} @Override
public void readFields(DBObject object) throws MongoException {
throw new UnsupportedOperationException();
} @Override
public void write(DataOutput out) throws IOException {
out.writeLong(this.clientTime.getTime());
out.writeLong(this.count);
} @Override
public void readFields(DataInput in) throws IOException {
this.clientTime = new Date(in.readLong());
this.count = in.readLong();
} public Date getClientTime() {
return clientTime;
} public void setClientTime(Date clientTime) {
this.clientTime = clientTime;
} public long getCount() {
return count;
} public void setCount(long count) {
this.count = count;
}
} public static class DemoInputValueAndOutputKey implements MongoDBWritable, WritableComparable<DemoInputValueAndOutputKey> {
private String name;
private Integer age;
private String sex; @Override
public void write(DataOutput out) throws IOException {
if (this.name == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(this.name);
}
if (this.age == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeInt(this.age);
}
if (this.sex == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(this.sex);
}
} @Override
public void readFields(DataInput in) throws IOException {
this.name = in.readBoolean() ? in.readUTF() : null;
this.age = in.readBoolean() ? Integer.valueOf(in.readInt()) : null;
this.sex = in.readBoolean() ? in.readUTF() : null;
} @Override
public void write(DBCollection collection) throws MongoException {
DBObject object = new BasicDBObject();
object.put("name", this.name);
object.put("age", this.age.intValue());
object.put("sex", this.sex);
collection.insert(object);
} @Override
public void readFields(DBObject object) throws MongoException {
this.name = (String) object.get("name");
this.age = (Integer) object.get("age");
this.sex = (String) object.get("sex");
} @Override
public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
BasicDBObjectBuilder builder = null;
Set<String> keys = new HashSet<String>();
if (old != null) {
keys = old.keySet();
builder = BasicDBObjectBuilder.start(old.toMap());
} else {
builder = new BasicDBObjectBuilder();
}
// 添加当前对象的value值,如果存在同样的key,那么加序号
if (this.name != null) {
builder.append(getKey(keys, "name", 0), this.name);
}
if (this.age != null) {
builder.append(getKey(keys, "age", 0), this.age.intValue());
}
if (this.sex != null) {
builder.append(getKey(keys, "sex", 0), this.sex);
}
return builder.get();
} @Override
public String toString() {
return "DemoInputValue [name=" + name + ", age=" + age + ", sex=" + sex + "]";
} @Override
public int compareTo(DemoInputValueAndOutputKey o) {
int tmp;
if (this.name == null) {
if (o.name != null) {
return -1;
}
} else if (o.name == null) {
return 1;
} else {
tmp = this.name.compareTo(o.name);
if (tmp != 0) {
return tmp;
}
} if (this.age == null) {
if (o.age != null) {
return -1;
}
} else if (o.age == null) {
return 1;
} else {
tmp = this.age - o.age;
if (tmp != 0) {
return tmp;
}
} if (this.sex == null) {
if (o.sex != null) {
return -1;
}
} else if (o.sex == null) {
return 1;
} else {
return this.sex.compareTo(o.sex);
}
return 0;
} } /**
* 直接输出
*
* @author jsliuming
*
*/
public static class DemoMapper extends Mapper<LongWritable, DemoInputValueAndOutputKey, DemoInputValueAndOutputKey, NullWritable> {
@Override
protected void map(LongWritable key, DemoInputValueAndOutputKey value, Context context) throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
} /**
* 写出数据,只做一个统计操作
*
* @author jsliuming
*
*/
public static class DemoReducer extends Reducer<DemoInputValueAndOutputKey, NullWritable, DemoInputValueAndOutputKey, DemoOutputValue> {
private DemoOutputValue outputValue = new DemoOutputValue(); @Override
protected void reduce(DemoInputValueAndOutputKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
for (@SuppressWarnings("unused")
NullWritable value : values) {
sum++;
}
outputValue.setClientTime(new Date());
outputValue.setCount(sum);
context.write(key, outputValue);
}
} /**
* 转换key,作用是当key存在keys集合中的时候,在key后面添加序号
*
* @param keys
* @param key
* @param index
* @return
*/
public static String getKey(Set<String> keys, String key, int index) {
while (keys.contains(key)) {
key = key + (index++);
}
return key;
}
}
Demo
四、结果截图