// SparkGraphXTest.scala

/**
* Created by root on 9/8/15.
*/
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

/**
 * Small GraphX walkthrough: builds two property graphs from in-memory RDDs,
 * runs a few vertex/edge filters, and prints one sentence per edge triplet.
 */
object SparkGraphXTest {

  /**
   * Entry point.
   *
   * The master URL falls back to `local` only when none was supplied
   * (e.g. via `spark-submit --master`), so a cluster submission is not
   * silently forced into local mode.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("graphx app")
      .setIfMissing("spark.master", "local")
    val sc = new SparkContext(conf)
    // Always release the context, even if a job below fails.
    try {
      // Vertex attribute: (name, position).
      val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
        (3L, ("rxin", "student")),
        (7L, ("jgonzal", "postdoc")),
        (5L, ("franklin", "prof")),
        (2L, ("istoica", "prof"))))
      // Edge attribute: relationship label from the source vertex to the destination vertex.
      val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
        Edge(3L, 7L, "collab"),
        Edge(5L, 3L, "advisor"),
        Edge(2L, 5L, "colleague"),
        Edge(5L, 7L, "pi")))
      // Attribute given to a vertex that appears in an edge but not in `users`.
      val defaultUser = ("John Doe", "Missing")
      val graph = Graph(users, relationships, defaultUser)

      // Number of vertices whose position is "postdoc".
      val count1 = graph.vertices.filter { case (_, (_, pos)) => pos == "postdoc" }.count()
      // Number of edges with srcId > dstId, computed two equivalent ways:
      // field access vs. pattern match on the Edge case class.
      val count2 = graph.edges.filter(e => e.srcId > e.dstId).count()
      val count3 = graph.edges.filter { case Edge(src, dst, _) => src > dst }.count()
      println(count1)
      println(count2)
      println(count3)

      // One sentence per edge, e.g. "rxin is the collab of jgonzal".
      // The user name is `_1`; `_2` is the position, which would not name the people involved.
      val facts: RDD[String] = graph.triplets.map { triplet =>
        s"${triplet.srcAttr._1} is the ${triplet.attr} of ${triplet.dstAttr._1}"
      }
      facts.collect().foreach(println)

      // Second graph: vertex attribute (name, position, extra string field).
      // NOTE(review): the third field ("20", "22", ...) is presumably an age — confirm.
      val users2: RDD[(VertexId, (String, String, String))] = sc.parallelize(Seq(
        (3L, ("rxin", "student", "20")),
        (7L, ("jgonzal", "postdoc", "22")),
        (5L, ("franklin", "prof", "24")),
        (2L, ("istoica", "prof", "26"))))
      val defaultUser2 = ("Amy Sun", "aaa", "18")
      // Same edge set as the first graph, so the existing RDD is reused.
      val graph2 = Graph(users2, relationships, defaultUser2)

      // Prints whole vertex tuples, e.g. "(rxin,student,20) is the collab of (jgonzal,postdoc,22)".
      val facts2: RDD[String] = graph2.triplets.map { triplet =>
        s"${triplet.srcAttr} is the ${triplet.attr} of ${triplet.dstAttr}"
      }
      facts2.collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
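/*
 * 上一篇:spring boot 启动遇到报错:Failed to configure a DataSource
 * (Previous post: Spring Boot fails to start with "Failed to configure a DataSource")
 *
 * 下一篇:DevExpress.XtraCharts曲线上的点所对应的坐标值
 * (Next post: coordinate values of points on a DevExpress.XtraCharts curve)
 */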