Spark Graphx常用函数

Spark Graph定义

object SparkGraph {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local").appName("Graph").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    //创建Vertices
    //注意元组的顶点的数据类型只能为Long,否则无法创图
    val rddVertice: RDD[(Long, String)] = sc.makeRDD(Seq((1L,"A"),(2L,"B"),(3L,"C")))
    //创建Edge
    val rddEdge: RDD[Edge[String]] = sc.makeRDD(Seq(Edge(1L,2L,"aaa"),Edge(2L,3L,"bbb")))
    //创建图Graph
    val graph1 = Graph(rddVertice,rddEdge)
    }
    //输出graph
    ((1,A),(2,B),aaa)
	((2,B),(3,C),bbb)

常用函数

	//显示图的顶点集合视图
    println("__________verices_______________")
    graph1.vertices.foreach(println)
    /*
    (1,A)
	(3,C)
	(2,B)
    */
    //显示图的边集合视图
    println("__________edge_______________")
    graph1.edges.foreach(println)
    /*
    Edge(1,2,aaa)
	Edge(2,3,bbb)
    */
    //显示图集合视图
    println("__________triplets_______________")
    graph1.triplets.foreach(println)
    /*
    ((1,A),(2,B),aaa)
	((2,B),(3,C),bbb)
    */
    //针对Edge过滤数据
    println("__________filter_______________")
    graph1.edges.filter(x=>x.attr.equals("aaa")).foreach(println)
    /*
    Edge(1,2,aaa)
    */
    //图中边的数量
    println("____________numEdge__________________")
    println(graph1.numEdges)
    /*
    2
    */
    //图中顶点数量
    println("______________vertices______________")
    println(graph1.numVertices)
    /*
    3
    */
    //顶点入度数
    println("______________inDegrees__________________")
    graph1.inDegrees.foreach(println)
    /*
    (3,1)
	(2,1)
    */
    //顶点出度数
    println("_______________outDegrees_________________")
    graph1.outDegrees.foreach(println)
    /*
    (1,1)
	(2,1)
    */
    //顶点出入度之和
    println("______________degrees_____________")
    graph1.degrees.foreach(println)
    /*
    (1,1)
	(3,1)
	(2,2)
    */
    
    //mapVertices的使用有点特别,这个方法虽然传入的是整个顶点数据,但经过操作后的数据结果都只会做为新的value,而不会改变原本key的值
    //def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
    println("_________________mapVertices_______________________")
    //只改变value,使所有顶点value加上”x“
    graph1.mapVertices((x,y)=>(y+"x")).vertices.foreach(println)
    /*
    (1,Ax)
    (3,Cx)
    (2,Bx)
     */
    //将key值加1,value加上”x“
    //这里虽然改变了key值,但最后也会把整体作为新的元组存放在value的位置
    graph1.mapVertices((x,y)=>(x+1,y+"x")).vertices.foreach(println)
    /*
    (1,(2,Ax))
    (3,(4,Cx))
    (2,(3,Bx))
     */

    //mapEdge
    //def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
    //传入的是Edge,经过处理后的的数据结果,返回到的位置是两个顶点的之间的属性
    println("_______________mapEdge_________________________")
    graph1.mapEdges(x=>x.srcId+"/"+x.dstId+"/"+x.attr).edges.foreach(println)
	/*
    Edge(1,2,1/2/aaa)
	Edge(2,3,2/3/bbb)
    */

    //反转
    println("________________reverse_______________________")
    graph1.reverse.triplets.foreach(println)
	/*
    ((2,B),(1,A),aaa)
	((3,C),(2,B),bbb)
    */
    
    //subgraph截取
    //x传入的是每行图数据
    println("_______________subgraph____________________")
    graph1.subgraph(x=>x.attr.equals("aaa")).triplets.foreach(println)
	/*
    ((1,A),(2,B),aaa)
    */
    
    //joinVertice
    //将图里的顶点与另一个RDD顶点进行join,最后将处理过后的值作为顶点新的属性
    println("_______________joinVertice____________________")
    val rddVertice2: RDD[(VertexId, PartitionID)] = sc.makeRDD(Array((1L,1),(2L,2),(3L,3),(4L,4)))
    graph1.joinVertices(rddVertice2)((id,v1,v2)=>(v1+v2)).vertices.foreach(println)
	/*
    (1,A1)
	(3,C3)
	(2,B2)
    */
    
    //outerJoinVertice
    println("________________outerJoinVertice___________________________")
    //定义样例类User,属性有ID,入度数,出度数
    case class User(x:String,inDeg:Int,outDeg:Int)
    //实现将ID与入度数保存在User中,出度数设为0
    graph1.outerJoinVertices(graph1.inDegrees){ case(key,value,inDeg)=>User(value,inDeg.getOrElse(0),0)}
    //在outerJoin顶点出度数
      .outerJoinVertices(graph1.outDegrees){case(key,value,outDeg)=>User(value.x,value.inDeg,outDeg.getOrElse(0))}.vertices.foreach(println)
     /*
    (1,User(A,0,1))
	(3,User(C,1,0))
	(2,User(B,1,1))
    */
  }
上一篇:Spark GraphX图计算结构详解【图构建器、顶点、边】


下一篇:spark graphX作图计算