本节书摘来异步社区《Hadoop实战手册》一书中的第1章,第1.7节,作者: 【美】Jonathan R. Owens , Jon Lentz , Brian Femiano 译者: 傅杰 , 赵磊 , 卢学裕 责编: 杨海玲,更多章节内容可以访问云栖社区“异步社区”公众号查看。
1.7 从HDFS导出数据到MongoDB
本节将使用MongoOutputFormat类加载HDFS中的数据并收集到MongoDB中。
准备工作
使用Mongo Hadoop适配器最简单的方法是从GitHub上克隆 Mongo-Hadoop工程,并且将该工程编译到一个特定的Hadoop版本。克隆该工程需要安装一个Git客户端。
本节假定你使用的Hadoop版本是CDH3。
Git客户端官方的下载地址是:http://git-scm.com/downloads。
在Windows操作系统上可以通过http://windows.github.com/访问GitHub。
在Mac操作系统上可以通过http://mac.github.com/访问GitHub。
可以通过https://github.com/mongodb/mongo-hadoop获取到Mongo Hadoop适配器。该工程需要编译在特定的Hadoop版本上。编译完的JAR文件需要复制到Hadoop集群每个节点的$HADOOP_HOME/lib目录下。
Mongo Java的驱动包也需要安装到Hadoop集群每个节点的$HADOOP_HOME/lib目录下。该驱动包可从https://github.com/mongodb/mongo-java-driver/downloads下载。
操作步骤
完成下面步骤实现将HDFS数据复制到MongoDB。
1.通过下面的命令实现克隆mongo-hadoop工程:
git clone https://github.com/mongodb/mongo-hadoop.git
2.切换到稳定发布的1.0分支版本:
git checkout release-1.0
3.必须保持mongo-hadoop与Hadoop的版本一致。使用文本编辑器打开mongo-hadoop克隆目录下的build.sbt文件,将下面这行:
hadoopRelease in ThisBuild := "default"
修改为:
hadoopRelease in ThisBuild := "cdh3"
4.编译mongo-hadoop:
./sbt package.
这将会在core/target文件夹下生成一个名为mongo-hadoop-core_cdh3u3-1.0.0.jar的JAR文件。
5.从https://github.com/mongodb/mongo-java-driver/downloads下载MongoDB 2.8.0版本的Java驱动包。
6.复制mongo-hadoop和MongoDB Java驱动包到Hadoop集群每个节点的$HADOOP_HOME/lib:
cp mongo-hadoop-core_cdh3u3-1.0.0.jar mongo-2.8.0.jar $HADOOP_HOME/lib
7.编写MapReduce读取HDFS上weblog_entries.txt文件并通过MongoOutputFormat类将数据写入MongoDB中:
import java.io.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.*;
import org.bson.*;
import org.bson.types.ObjectId;
import com.mongodb.hadoop.*;
import com.mongodb.hadoop.util.*;
public class ExportToMongoDBFromHDFS {
private static final Log log =
LogFactory.getLog(ExportToMongoDBFromHDFS.class);
public static class ReadWeblogs extends Mapper<LongWritable, Text,
ObjectId, BSONObject>{
public void map(Text key, Text value, Context context) throws
IOException, InterruptedException{
System.out.println("Key: " + key);
System.out.println("Value: " + value);
String[] fields = value.toString().split("\t");
String md5 = fields[0];
String url = fields[1];
String date = fields[2];
String time = fields[3];
String ip = fields[4];
BSONObject b = new BasicBSONObject();
b.put("md5", md5);
b.put("url", url);
b.put("date", date);
b.put("time", time);
b.put("ip", ip);
context.write( new ObjectId(), b);
}
}
public static void main(String[] args) throws Exception{
final Configuration conf = new Configuration();
MongoConfigUtil.setOutputURI(conf,
"mongodb://<HOST>:<PORT>/test. weblogs");
System.out.println("Configuration: " + conf);
final Job job = new Job(conf, "Export to Mongo");
Path in = new Path("/data/weblogs/weblog_entries.txt");
FileInputFormat.setInputPaths(job, in);
job.setJarByClass(ExportToMongoDBFromHDFS.class);
job.setMapperClass(ReadWeblogs.class);
job.setOutputKeyClass(ObjectId.class);
job.setOutputValueClass(BSONObject.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(MongoOutputFormat.class);
job.setNumReduceTasks(0);
System.exit(job.waitForCompletion(true) ? 0 : 1 );
}
}
8.导出为一个可运行的JAR文件,并运行该作业:
hadoop jar ExportToMongoDBFromHDFS.jar
9.在Mongo shell上验证weblogs已经导入MongoDB:
db.weblogs.find();
工作原理
Mongo Hadoop适配器提供了一种新的兼容Hadoop的文件系统实现包括MongoInputFormat和MongoOutputFormat。这些抽象实现使得访问MongoDB和访问任何兼容Hadoop的文件系统一样。