使用Spark读写Elasticsearch数据

/**
 * Entry point: builds a local two-thread Spark context configured for the
 * Elasticsearch connector and runs the read example.
 *
 * The write example is left commented out, as in the original; enable it
 * to populate the "spark/docs" resource before reading.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf()
    .setAppName("DecisionTree1")
    .setMaster("local[2]")
    // Connector settings; per the elasticsearch-hadoop configuration docs this
    // lets the connector create a missing index on write.
    .set("es.index.auto.create", "true")
    .set("es.nodes", "10.3.162.202")
    .set("es.port", "9200")

  val sc = new SparkContext(sparkConf)
  try {
    // write2Es(sc)
    read4Es(sc)
  } finally {
    // Always release the context, even if the job throws.
    sc.stop()
  }
}

/**
 * Writes two sample documents (each a Map) to the "spark/docs" resource.
 *
 * @param sc the Spark context used to build the RDD
 */
def write2Es(sc: SparkContext): Unit = {
  val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
  val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")
  // Each Map in the sequence is saved as one document.
  val rdd = sc.makeRDD(Seq(numbers, airports))
  EsSpark.saveToEs(rdd, "spark/docs")
  println("--------------------End-----------------")
}

/**
 * Reads every document from the "spark/docs" resource and prints the key of
 * each record followed by the entries of its value.
 *
 * Printing happens inside `RDD.foreach`, so the output appears wherever the
 * tasks run (the local JVM when the master is "local[2]").
 *
 * @param sc the Spark context used to build the RDD
 */
def read4Es(sc: SparkContext): Unit = {
  val rdd = EsSpark.esRDD(sc, "spark/docs")
  rdd.foreach { case (key, value) =>
    println(s"------------------key:$key")
    value.foreach { case (key1, value1) =>
      println(s"------------------key1:$key1")
      println(s"------------------value1:$value1")
    }
  }
}

例子依赖jar:elasticsearch-spark_2.10-2.1.0.jar

上一篇:Python之网路编程之-互斥锁与进程间的通信(IPC)及生产者消费者模型


下一篇:Stern-Brocot树 及 法里级数分析