Connecting Spark to MongoDB

1. Add the dependencies

The Hadoop-MongoDB connector:
<dependency>
    <groupId>org.mongodb.mongo-hadoop</groupId>
    <artifactId>mongo-hadoop-core</artifactId>
    <version>1.4.2</version>
</dependency>
The MongoDB Java driver:
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>2.13.0</version>
</dependency>
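
Before wiring Spark in, it can be worth checking that the driver dependency resolves and a local mongod is reachable. The following is a minimal standalone sketch against the 2.x Java driver API; the class name is just for illustration, and the host, port, and database/collection names match the Spark example below and are assumptions about your setup.

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;

public class PingMongo {
    public static void main(String[] args) throws Exception {
        // Assumes a mongod on localhost:27017 holding the lang.sanlu collection
        MongoClient client = new MongoClient("127.0.0.1", 27017);
        DB db = client.getDB("lang");
        DBCollection sanlu = db.getCollection("sanlu");
        System.out.println("documents in lang.sanlu: " + sanlu.count());
        client.close();
    }
}

If this prints a count, the Spark job below should be able to read the same collection.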
2. Usage example
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.bson.BSONObject;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Date;
import java.util.List;

/**
 * Created by Administrator on 2015/12/8.
 */
public class ConnectMongo {

    /** Simple POJO holding the fields extracted from each document. */
    public static class Text implements Serializable {
        String title;
        Date date;
        List<String> paragraph;

        public Text(String title, Date date, List<String> paragraph) {
            this.title = title;
            this.date = date;
            this.paragraph = paragraph;
        }
    }

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "test");
        Configuration config = new Configuration();
        // URI format: mongodb://host:port/database.collection
        config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/lang.sanlu");
        config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/lang.output");

        // Read: each record is an (_id, document) pair
        JavaPairRDD<Object, BSONObject> mongoRDD = sc.newAPIHadoopRDD(
                config, MongoInputFormat.class, Object.class, BSONObject.class);

        // BSONObject -> Text
        JavaRDD<Text> result = mongoRDD.map(
                new Function<Tuple2<Object, BSONObject>, Text>() {
                    public Text call(Tuple2<Object, BSONObject> v1) throws Exception {
                        String title = (String) v1._2().get("title");
                        Date date = (Date) v1._2().get("date");
                        List<String> paragraph = (List<String>) v1._2().get("paragraph");
                        return new Text(title, date, paragraph);
                    }
                }
        );
        // Transformations are lazy; run an action so the map actually executes
        System.out.println("parsed documents: " + result.count());

        // Copy lang.sanlu to lang.output. MongoOutputFormat ignores the file path
        // argument and writes to mongo.output.uri instead.
        mongoRDD.saveAsNewAPIHadoopFile("file:///copy",
                Object.class, BSONObject.class, MongoOutputFormat.class, config);

        sc.stop();
    }
}
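
The save above writes the raw documents through unchanged. To persist the transformed records instead, the Text objects can be turned back into BSON documents and written through the same output format. This is a sketch under the same configuration: the field names mirror the ones read above, a null key lets MongoDB assign each document's _id, and the dummy file path is again ignored in favor of mongo.output.uri.

// Drop-in for main(), after 'result' is built; needs two extra imports:
// org.apache.spark.api.java.function.PairFunction and org.bson.BasicBSONObject.
JavaPairRDD<Object, BSONObject> output = result.mapToPair(
        new PairFunction<Text, Object, BSONObject>() {
            public Tuple2<Object, BSONObject> call(Text t) throws Exception {
                BasicBSONObject doc = new BasicBSONObject();
                doc.put("title", t.title);
                doc.put("date", t.date);
                doc.put("paragraph", t.paragraph);
                // null key: let MongoDB generate the _id
                return new Tuple2<Object, BSONObject>(null, doc);
            }
        }
);
output.saveAsNewAPIHadoopFile("file:///unused",
        Object.class, BSONObject.class, MongoOutputFormat.class, config);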
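
One more note on reads: rather than pulling the whole collection and filtering in Spark, mongo-hadoop can push a query down to MongoDB through the mongo.input.query configuration key, set before building the RDD. A small assumed example (the title field comes from the documents above):

// Only read documents that actually have a title
config.set("mongo.input.query", "{\"title\": {\"$exists\": true}}");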