Spark Graph定义
object SparkGraph {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local").appName("Graph").getOrCreate()
val sc: SparkContext = spark.sparkContext
//创建Vertices
//注意元组的顶点的数据类型只能为Long,否则无法创图
val rddVertice: RDD[(Long, String)] = sc.makeRDD(Seq((1L,"A"),(2L,"B"),(3L,"C")))
//创建Edge
val rddEdge: RDD[Edge[String]] = sc.makeRDD(Seq(Edge(1L,2L,"aaa"),Edge(2L,3L,"bbb")))
//创建图Graph
val graph1 = Graph(rddVertice,rddEdge)
}
//输出graph
((1,A),(2,B),aaa)
((2,B),(3,C),bbb)
常用函数
//显示图的顶点集合视图
println("__________verices_______________")
graph1.vertices.foreach(println)
/*
(1,A)
(3,C)
(2,B)
*/
//显示图的边集合视图
println("__________edge_______________")
graph1.edges.foreach(println)
/*
Edge(1,2,aaa)
Edge(2,3,bbb)
*/
//显示图集合视图
println("__________triplets_______________")
graph1.triplets.foreach(println)
/*
((1,A),(2,B),aaa)
((2,B),(3,C),bbb)
*/
//针对Edge过滤数据
println("__________filter_______________")
graph1.edges.filter(x=>x.attr.equals("aaa")).foreach(println)
/*
Edge(1,2,aaa)
*/
//图中边的数量
println("____________numEdge__________________")
println(graph1.numEdges)
/*
2
*/
//图中顶点数量
println("______________vertices______________")
println(graph1.numVertices)
/*
3
*/
//顶点入度数
println("______________inDegrees__________________")
graph1.inDegrees.foreach(println)
/*
(3,1)
(2,1)
*/
//顶点出度数
println("_______________outDegrees_________________")
graph1.outDegrees.foreach(println)
/*
(1,1)
(2,1)
*/
//顶点出入度之和
println("______________degrees_____________")
graph1.degrees.foreach(println)
/*
(1,1)
(3,1)
(2,2)
*/
//mapVertices的使用有点特别,这个方法虽然传入的是整个顶点数据,但经过操作后的数据结果都只会做为新的value,而不会改变原本key的值
//def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
println("_________________mapVertices_______________________")
//只改变value,使所有顶点value加上”x“
graph1.mapVertices((x,y)=>(y+"x")).vertices.foreach(println)
/*
(1,Ax)
(3,Cx)
(2,Bx)
*/
//将key值加1,value加上”x“
//这里虽然改变了key值,但最后也会把整体作为新的元组存放在value的位置
graph1.mapVertices((x,y)=>(x+1,y+"x")).vertices.foreach(println)
/*
(1,(2,Ax))
(3,(4,Cx))
(2,(3,Bx))
*/
//mapEdge
//def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
//传入的是Edge,经过处理后的的数据结果,返回到的位置是两个顶点的之间的属性
println("_______________mapEdge_________________________")
graph1.mapEdges(x=>x.srcId+"/"+x.dstId+"/"+x.attr).edges.foreach(println)
/*
Edge(1,2,1/2/aaa)
Edge(2,3,2/3/bbb)
*/
//反转
println("________________reverse_______________________")
graph1.reverse.triplets.foreach(println)
/*
((2,B),(1,A),aaa)
((3,C),(2,B),bbb)
*/
//subgraph截取
//x传入的是每行图数据
println("_______________subgraph____________________")
graph1.subgraph(x=>x.attr.equals("aaa")).triplets.foreach(println)
/*
((1,A),(2,B),aaa)
*/
//joinVertice
//将图里的顶点与另一个RDD顶点进行join,最后将处理过后的值作为顶点新的属性
println("_______________joinVertice____________________")
val rddVertice2: RDD[(VertexId, PartitionID)] = sc.makeRDD(Array((1L,1),(2L,2),(3L,3),(4L,4)))
graph1.joinVertices(rddVertice2)((id,v1,v2)=>(v1+v2)).vertices.foreach(println)
/*
(1,A1)
(3,C3)
(2,B2)
*/
//outerJoinVertice
println("________________outerJoinVertice___________________________")
//定义样例类User,属性有ID,入度数,出度数
case class User(x:String,inDeg:Int,outDeg:Int)
//实现将ID与入度数保存在User中,出度数设为0
graph1.outerJoinVertices(graph1.inDegrees){ case(key,value,inDeg)=>User(value,inDeg.getOrElse(0),0)}
//在outerJoin顶点出度数
.outerJoinVertices(graph1.outDegrees){case(key,value,outDeg)=>User(value.x,value.inDeg,outDeg.getOrElse(0))}.vertices.foreach(println)
/*
(1,User(A,0,1))
(3,User(C,1,0))
(2,User(B,1,1))
*/
}