Graph operators (property operators, structural operators, join operators, counting user followers, and the difference between joinVertices and outerJoinVertices)

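I. Graph operators

1. Property operators

These work like the map operation on an RDD: each one transforms vertex or edge attributes and leaves the graph structure unchanged.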

class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
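
The transcripts in this post use a graph named `userCallGraph` whose construction is not shown. A minimal sketch that reproduces the vertices and edges printed later in this post might look like the following. The construction is an assumption reconstructed from those outputs, and obtaining the context through `SparkSession.builder().getOrCreate()` is just one way to let the snippet run both inside and outside spark-shell.

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Reuses the running session in spark-shell, or starts a local one otherwise.
val spark = SparkSession.builder().master("local[*]").appName("graphx-operators").getOrCreate()
val sc = spark.sparkContext

// Vertices: (id, (name, age)); values taken from the vertex output shown in this post.
val users: RDD[(VertexId, (String, Int))] = sc.parallelize(Seq(
  (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)),
  (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50))
))

// Edges: Edge(srcId, dstId, attr); values taken from the edge output shown in this post.
val calls: RDD[Edge[Int]] = sc.parallelize(Seq(
  Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3),
  Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3)
))

// Assumed construction of the graph used throughout the transcripts.
val userCallGraph: Graph[(String, Int), Int] = Graph(users, calls)
```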

(1) mapVertices

Iterates over all vertices; only the vertex attributes change.

// Build a new graph: (vertexId,(name,age)) => (vertexId,(vertexId,name))
// Approach 1: pattern-match on the attribute tuple
scala> val t1_graph=userCallGraph.mapVertices{case(vertexId,(name,age))=>(vertexId,name)}
t1_graph: org.apache.spark.graphx.Graph[(org.apache.spark.graphx.VertexId, String),Int] = org.apache.spark.graphx.impl.GraphImpl@527afebf

scala> t1_graph.vertices.collect.foreach(println)
(4,(4,David))
(1,(1,Alice))
(6,(6,Fran))
(3,(3,Charlie))
(5,(5,Ed))
(2,(2,Bob))

// Approach 2: access the attribute tuple by position
scala> val t2_graph=userCallGraph.mapVertices((id,attr)=>(id,attr._1))
t2_graph: org.apache.spark.graphx.Graph[(org.apache.spark.graphx.VertexId, String),Int] = org.apache.spark.graphx.impl.GraphImpl@7c1958de

scala> t2_graph.vertices.collect.foreach(println)
(4,(4,David))
(1,(1,Alice))
(6,(6,Fran))
(3,(3,Charlie))
(5,(5,Ed))
(2,(2,Bob))

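(2) mapEdges

Iterates over all edges; only the edge attributes change.

// The function's return value becomes the new edge attribute. Returning Edge(srcId,dstId,attr*7.0) here
// therefore stores a whole Edge[Double] as the attribute, which is why the output below is nested.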
scala> val t3_graph=userCallGraph.mapEdges(e=>Edge(e.srcId,e.dstId,e.attr*7.0))
t3_graph: org.apache.spark.graphx.Graph[(String, Int),org.apache.spark.graphx.Edge[Double]] = org.apache.spark.graphx.impl.GraphImpl@4777360f


scala> t3_graph.edges.collect.foreach(println)
Edge(2,1,Edge(2,1,49.0))
Edge(2,4,Edge(2,4,14.0))
Edge(3,2,Edge(3,2,28.0))
Edge(3,6,Edge(3,6,21.0))
Edge(4,1,Edge(4,1,7.0))
Edge(5,2,Edge(5,2,14.0))
Edge(5,3,Edge(5,3,56.0))
Edge(5,6,Edge(5,6,21.0))
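
Because the function passed to `mapEdges` returns the new edge attribute rather than a new edge, returning `Edge(...)` as above nests an `Edge` inside each edge. If only a scaled weight is wanted, a sketch along these lines should be enough. The two-edge graph and the names `g` and `scaled` are made up for illustration.

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

// Reuses the running session in spark-shell, or starts a local one otherwise.
val spark = SparkSession.builder().master("local[*]").appName("mapEdges-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical two-edge graph; every vertex gets the default attribute "user".
val g: Graph[String, Int] =
  Graph.fromEdges(sc.parallelize(Seq(Edge(2L, 1L, 7), Edge(2L, 4L, 2))), "user")

// Return only the new attribute; GraphX keeps the source and destination ids.
val scaled: Graph[String, Double] = g.mapEdges(e => e.attr * 7.0)

// Collect as plain tuples so the result is easy to inspect.
val scaledEdges = scaled.edges.collect().map(e => (e.srcId, e.dstId, e.attr)).toSet
```

With this form the edge type is `Double` instead of `Edge[Double]`, so later operators can use `e.attr` directly.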

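(3) mapTriplets

Iterates over all triplets; only the edge attribute changes, but the function can also read the source and destination vertex attributes.

// New edge attribute: (srcId+10, attr*10)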
scala>  val t3_graph=userCallGraph.mapTriplets(x=>(x.srcId+10,x.attr*10))
t3_graph: org.apache.spark.graphx.Graph[(String, Int),(Long, Int)] = org.apache.spark.graphx.impl.GraphImpl@4f88d48

scala> t3_graph.triplets.collect.foreach(println)
((2,(Bob,27)),(1,(Alice,28)),(12,70))
((2,(Bob,27)),(4,(David,42)),(12,20))
((3,(Charlie,65)),(2,(Bob,27)),(13,40))
((3,(Charlie,65)),(6,(Fran,50)),(13,30))
((4,(David,42)),(1,(Alice,28)),(14,10))
((5,(Ed,55)),(2,(Bob,27)),(15,20))
((5,(Ed,55)),(3,(Charlie,65)),(15,80))
((5,(Ed,55)),(6,(Fran,50)),(15,30))
// New edge attribute: (srcId+10, dstId, attr*10)
scala>  val t3_graph=userCallGraph.mapTriplets(x=>(x.srcId+10,x.dstId,x.attr*10))
t3_graph: org.apache.spark.graphx.Graph[(String, Int),(Long, org.apache.spark.graphx.VertexId, Int)] = org.apache.spark.graphx.impl.GraphImpl@1878e1e7

scala> t3_graph.triplets.collect.foreach(println)
((2,(Bob,27)),(1,(Alice,28)),(12,1,70))
((2,(Bob,27)),(4,(David,42)),(12,4,20))
((3,(Charlie,65)),(2,(Bob,27)),(13,2,40))
((3,(Charlie,65)),(6,(Fran,50)),(13,6,30))
((4,(David,42)),(1,(Alice,28)),(14,1,10))
((5,(Ed,55)),(2,(Bob,27)),(15,2,20))
((5,(Ed,55)),(3,(Charlie,65)),(15,3,80))
((5,(Ed,55)),(6,(Fran,50)),(15,6,30))
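
The examples above only use `srcId`, `dstId` and `attr`, which `mapEdges` could also provide. What `mapTriplets` adds is access to `srcAttr` and `dstAttr`. A small sketch on a hypothetical graph whose vertex attribute is just an age:

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

// Reuses the running session in spark-shell, or starts a local one otherwise.
val spark = SparkSession.builder().master("local[*]").appName("mapTriplets-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical data: vertex attribute is an age, one edge 2 -> 1.
val vertices = sc.parallelize(Seq((1L, 28), (2L, 27)))
val edges = sc.parallelize(Seq(Edge(2L, 1L, 7)))
val g = Graph(vertices, edges)

// New edge attribute: absolute age gap between the two endpoints.
val gaps = g.mapTriplets(t => math.abs(t.srcAttr - t.dstAttr))

val gapEdges = gaps.edges.collect().map(e => (e.srcId, e.dstId, e.attr)).toSet
```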

2. Structural operators

class Graph[VD, ED] {
  def reverse: Graph[VD, ED] // reverses the direction of every edge
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean,
      vpred: (VertexId, VD) => Boolean): Graph[VD, ED] // keeps only the vertices and edges that satisfy the predicates
}

(1) reverse

reverse: returns a new graph with the direction of every edge reversed.

scala> userCallGraph.edges.collect.foreach(println)
Edge(2,1,7)
Edge(2,4,2)
Edge(3,2,4)
Edge(3,6,3)
Edge(4,1,1)
Edge(5,2,2)
Edge(5,3,8)
Edge(5,6,3)

scala> val reverseuserCallGraph=userCallGraph.reverse
reverseuserCallGraph: org.apache.spark.graphx.Graph[(String, Int),Int] = org.apache.spark.graphx.impl.GraphImpl@7323e669

scala> reverseuserCallGraph.edges.collect.foreach(println)
Edge(1,2,7)
Edge(1,4,1)
Edge(2,3,4)
Edge(2,5,2)
Edge(3,5,8)
Edge(4,2,2)
Edge(6,3,3)
Edge(6,5,3)

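(2) subgraph

subgraph: returns a graph containing only the vertices that satisfy vpred and the edges that satisfy epred and whose two endpoints both satisfy vpred. Both predicates default to always-true, so either one can be passed on its own as a named argument.

// vpred: keep only the vertices whose age is less than 30.
// The println inside the predicate is just a debugging side effect: it prints every vertex that gets tested
// (the "subgraph in" lines below) and does not change any vertex attribute.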
scala> val t0_graph=userCallGraph.subgraph(vpred=(id,attr)=>{println("subgraph in",id,attr);attr._2<30})
t0_graph: org.apache.spark.graphx.Graph[(String, Int),Int] = org.apache.spark.graphx.impl.GraphImpl@561a4a53

scala> t0_graph.vertices.collect.foreach(println)
(subgraph in,4,(David,42))
(subgraph in,1,(Alice,28))
(subgraph in,6,(Fran,50))
(subgraph in,3,(Charlie,65))
(subgraph in,5,(Ed,55))
(subgraph in,2,(Bob,27))
(1,(Alice,28))
(2,(Bob,27))

// epred: keep only the edges whose source vertex has age less than 65 (all vertices are kept)
scala> userCallGraph.triplets.collect.foreach(println)
((2,(Bob,27)),(1,(Alice,28)),7)
((2,(Bob,27)),(4,(David,42)),2)
((3,(Charlie,65)),(2,(Bob,27)),4)
((3,(Charlie,65)),(6,(Fran,50)),3)
((4,(David,42)),(1,(Alice,28)),1)
((5,(Ed,55)),(2,(Bob,27)),2)
((5,(Ed,55)),(3,(Charlie,65)),8)
((5,(Ed,55)),(6,(Fran,50)),3)


scala> userCallGraph.subgraph(epred=(ep)=>ep.srcAttr._2<65).triplets.collect.foreach(println)
((2,(Bob,27)),(1,(Alice,28)),7)
((2,(Bob,27)),(4,(David,42)),2)
((4,(David,42)),(1,(Alice,28)),1)
((5,(Ed,55)),(2,(Bob,27)),2)
((5,(Ed,55)),(3,(Charlie,65)),8)
((5,(Ed,55)),(6,(Fran,50)),3)
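
One point the two transcripts suggest but do not print: `epred` alone removes edges and keeps every vertex, while `vpred` removes vertices together with any edge that touches a removed vertex. A small sketch, assuming a hypothetical three-vertex graph rather than `userCallGraph`:

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

// Reuses the running session in spark-shell, or starts a local one otherwise.
val spark = SparkSession.builder().master("local[*]").appName("subgraph-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical data: (id, (name, age)) and two edges, 1 -> 2 and 3 -> 2.
val vertices = sc.parallelize(Seq((1L, ("Ann", 25)), (2L, ("Ben", 40)), (3L, ("Cat", 70))))
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(3L, 2L, 1)))
val g = Graph(vertices, edges)

// epred only: the edge 3 -> 2 is dropped, but vertex 3 stays in the graph.
val byEdge = g.subgraph(epred = t => t.srcAttr._2 < 65)

// vpred only: vertex 3 is dropped, and the edge 3 -> 2 goes with it.
val byVertex = g.subgraph(vpred = (id, attr) => attr._2 < 65)

val byEdgeVertexCount = byEdge.vertices.count()
val byEdgeEdgeCount = byEdge.edges.count()
val byVertexVertexCount = byVertex.vertices.count()
val byVertexEdgeCount = byVertex.edges.count()
```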

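3. Join operators

These load data from an external RDD and use it to modify the vertex attributes.

The graph's vertex table is joined with the RDD on the vertex id, and the map function receives three arguments: the id, the current vertex attribute, and the matching value from the RDD.

Shapes of the two sides:

vertex table: (id, attribute)

RDD: (id, other value)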

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}

(1) joinVertices

// Define a new RDD to join against
scala> val two =sc.makeRDD(Array((1L,"kgc.cn"),(2L,"qq.com"),(3L,"163.COM"),(7L,"sohu.com")))
two: org.apache.spark.rdd.RDD[(Long, String)] = ParallelCollectionRDD[415] at makeRDD at <console>:30

scala> userCallGraph.vertices.collect.foreach(println)
(4,(David,42))
(1,(Alice,28))
(6,(Fran,50))
(3,(Charlie,65))
(5,(Ed,55))
(2,(Bob,27))

scala> two.collect.foreach(println)
(1,kgc.cn)
(2,qq.com)
(3,163.COM)
(7,sohu.com)

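// The ids are unchanged; for each vertex whose id appears in two, the name gets the matching suffix appended.
// Vertices without a match (4, 5, 6) keep their attributes, and id 7 in two is ignored.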
scala> val joinGraph=userCallGraph.joinVertices(two)((id,v,cmpy)=>(v._1+"@"+cmpy,v._2))
joinGraph: org.apache.spark.graphx.Graph[(String, Int),Int] = org.apache.spark.graphx.impl.GraphImpl@2d0023b6

scala> joinGraph.vertices.collect.foreach(println)
(4,(David,42))
(1,(Alice@kgc.cn,28))
(6,(Fran,50))
(3,(Charlie@163.COM,65))
(5,(Ed,55))
(2,(Bob@qq.com,27))

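(2) outerJoinVertices

Example: counting each user's followers

// Define a case class
case class User(name: String, age: Int, inDeg: Int, outDeg: Int)

// Replace the vertex attributes with User, initializing inDeg and outDeg to 0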
val initialUserGraph: Graph[User, Int] =userCallGraph.mapVertices{ case (id, (name, age)) =>User(name, age, 0, 0) }

scala> initialUserGraph.vertices.collect.foreach(println)
(4,User(David,42,0,0))
(1,User(Alice,28,0,0))
(6,User(Fran,50,0,0))
(3,User(Charlie,65,0,0))
(5,User(Ed,55,0,0))
(2,User(Bob,27,0,0))

// Store each vertex's in-degree and out-degree in its attribute
scala> val userGraph=initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees){case(id,u,inDegOpt)=>User(u.name,u.age,inDegOpt.getOrElse(0),u.outDeg)}.outerJoinVertices(initialUserGraph.outDegrees){case(id,u,outDegOpt)=>User(u.name,u.age,u.inDeg,outDegOpt.getOrElse(0))}
userGraph: org.apache.spark.graphx.Graph[User,Int] = org.apache.spark.graphx.impl.GraphImpl@6ae8c1f8

scala> userGraph.vertices.collect.foreach(println)
(4,User(David,42,1,1))
(1,User(Alice,28,2,0))
(6,User(Fran,50,2,0))
(3,User(Charlie,65,1,2))
(5,User(Ed,55,0,3))
(2,User(Bob,27,2,2))
// Count each user's followers: a vertex's in-degree is its follower count
scala> for((id,property)<-userGraph.vertices.collect)
     | println(s"User $id is ${property.name} and is liked by ${property.inDeg} people")
User 4 is David and is liked by 1 people
User 1 is Alice and is liked by 2 people
User 6 is Fran and is liked by 2 people
User 3 is Charlie and is liked by 1 people
User 5 is Ed and is liked by 0 people
User 2 is Bob and is liked by 2 people
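
The `getOrElse(0)` calls above matter because `inDegrees` and `outDegrees` only contain vertices that have at least one incoming or outgoing edge, so a vertex such as Ed (in-degree 0) has no entry to join against. A short sketch on a hypothetical two-vertex graph:

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

// Reuses the running session in spark-shell, or starts a local one otherwise.
val spark = SparkSession.builder().master("local[*]").appName("degrees-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical graph with a single edge 5 -> 2: vertex 5 has no incoming edge.
val g = Graph.fromEdges(sc.parallelize(Seq(Edge(5L, 2L, 1))), "user")

// inDegrees has an entry for vertex 2 only.
val inDeg = g.inDegrees.collect().toMap

// outerJoinVertices passes None for vertex 5, which getOrElse turns into 0.
val followers = g
  .outerJoinVertices(g.inDegrees)((id, attr, degOpt) => degOpt.getOrElse(0))
  .vertices.collect().toMap
```

Using `joinVertices` here would not work as a drop-in replacement, since it cannot change the vertex type and would leave vertex 5 untouched.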

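(3) Spark GraphX join operations: the difference between joinVertices and outerJoinVertices

What they have in common:

Both take an RDD of (id, value) pairs (each vertex id in the original graph should match at most one entry in it), update the vertex attributes of the original graph with the given map function, and return a new graph.

How they differ:

They treat a vertex whose id does not appear in the RDD differently, and they differ in whether the vertex attribute type may change.

joinVertices keeps the original attribute for such a vertex, and the map function must return the same type VD.

outerJoinVertices calls the map function for every vertex and passes None as the Option[U] argument when there is no match, so the function usually uses getOrElse to supply a default value. Its result type VD2 may differ from VD.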
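
To make the contrast concrete, the sketch below runs both operators against the same lookup RDD. The data is hypothetical (loosely modeled on the `two` RDD above), and the names `g`, `domains`, `joined` and `outer` are illustrative only.

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

// Reuses the running session in spark-shell, or starts a local one otherwise.
val spark = SparkSession.builder().master("local[*]").appName("join-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical graph: two named vertices and one edge 4 -> 1.
val vertices = sc.parallelize(Seq((1L, "Alice"), (4L, "David")))
val edges = sc.parallelize(Seq(Edge(4L, 1L, 1)))
val g: Graph[String, Int] = Graph(vertices, edges)

// Lookup table: an entry for vertex 1, none for vertex 4, and an extra id 7.
val domains = sc.parallelize(Seq((1L, "kgc.cn"), (7L, "sohu.com")))

// joinVertices: unmatched vertex 4 keeps its attribute; the type stays String.
val joined = g
  .joinVertices(domains)((id, name, domain) => name + "@" + domain)
  .vertices.collect().toMap

// outerJoinVertices: every vertex is mapped and vertex 4 sees None;
// the vertex type becomes (String, Option[String]).
val outer = g
  .outerJoinVertices(domains)((id, name, domainOpt) => (name, domainOpt))
  .vertices.collect().toMap
```

In both cases id 7 is ignored, because it does not correspond to any vertex in the graph.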
