一.简介
参考:https://www.cnblogs.com/yszd/p/10186556.html
二.代码实现
package big.data.analyse.graphx

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * Base class for the vertex attribute types declared below.
 *
 * NOTE(review): nothing in the code shown here references this hierarchy;
 * GraphXTest uses plain (String, String) tuples as vertex attributes.
 */
class VertexProperty()

/** Vertex attribute carrying only a name. */
case class UserProperty(name: String) extends VertexProperty

/** Vertex attribute carrying a name and a price. */
case class ProductProperty(name: String, price: Double) extends VertexProperty

/*class Graph[VD, ED]{
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
}*/

/**
 * Small GraphX demo: builds a property graph and prints its vertices,
 * edges, degrees and triplets.
 *
 * Created by zhen on 2019/10/4.
 */
object GraphXTest {
  // Raise the level of every logger under "org" to WARN. The statement sits in
  // the object body, so it runs when GraphXTest is initialised, before main.
  Logger.getLogger("org").setLevel(Level.WARN)

  /**
   * Builds a small property graph on a local two-thread Spark session and
   * prints, in order: one sentence per triplet, the vertices, the edges, the
   * degree of each vertex, the raw triplets, the edge count and the vertex count.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("GraphXTest").master("local[2]").getOrCreate()
    try {
      val sc = spark.sparkContext

      // Vertex RDD: (vertex id, (first attribute, second attribute)).
      val users: RDD[(VertexId, (String, String))] = sc.parallelize(
        Array((3L, ("Spark", "GraphX")), (7L, ("Hadoop", "Java")),
          (5L, ("HBase", "Mysql")), (2L, ("Hive", "Mysql"))))

      // Edge RDD: Edge(source id, destination id, relationship label).
      val relationships: RDD[Edge[String]] = sc.parallelize(
        Array(Edge(3L, 7L, "Fast"), Edge(5L, 3L, "Relation"),
          Edge(2L, 5L, "colleague"), Edge(5L, 7L, "colleague")))

      // Default vertex attribute handed to Graph(...) as its third argument.
      val defaultUser = ("Machical", "Missing")

      // Build the initial graph.
      val graph = Graph(users, relationships, defaultUser)

      // Use the triplet view to describe the relationship between vertices.
      val facts: RDD[String] = graph.triplets.map(triplet =>
        triplet.srcAttr._1 + " is the " + triplet.attr + " with " + triplet.dstAttr._1)
      facts.collect().foreach(println)

      // Everything below is collected first so that println runs on the driver;
      // RDD.foreach(println) would print wherever the executors run.
      graph.vertices.collect().foreach(println) // vertices
      graph.edges.collect().foreach(println) // edges
      graph.ops.degrees.collect().foreach(println) // degree of each vertex
      // Triplets are rendered to strings before collecting, so only plain
      // strings are sent back to the driver.
      graph.triplets.map(_.toString).collect().foreach(println) // source, destination, edge attribute
      println(graph.ops.numEdges) // number of edges
      println(graph.ops.numVertices) // number of vertices
    } finally {
      // Release the session even if one of the steps above throws.
      spark.stop()
    }
  }
}
三.结果
1.三元组视图
2.顶点
3.边
4.各顶点的度
5.三元组视图
6.边/顶点数量
四.源码分析
/**
 * Condensed listing of the operations available on a GraphX property graph.
 *
 * This is a signature summary for reading, not a compilable definition:
 * members are shown without bodies.
 *
 * @tparam VD vertex attribute type
 * @tparam ED edge attribute type
 */
class Graph[VD, ED] {
  // Information about the Graph
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]

  // Views of the graph as collections
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]

  // Functions for caching graphs
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] // default storage level is MEMORY_ONLY
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]

  // Change the partitioning heuristic
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

  // Transform vertex and edge attributes
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2]

  // Modify the graph structure
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD, ED] => Boolean, vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] // returns the subgraph common to this graph and `other`
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

  // Join RDDs with the graph
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]

  // Aggregate information about adjacent triplets
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All): VertexRDD[Msg]

  // Iterative graph-parallel computation
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A): Graph[VD, ED]

  // Basic graph algorithms
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}