• 图算法
    • PageRank算法
    • 连通体算法
    • 三角形计数算法

    图算法

    GraphX包括一组图算法来简化分析任务。这些算法包含在org.apache.spark.graphx.lib包中,可以被直接访问。

    PageRank算法

    PageRank度量一个图中每个顶点的重要程度,假定从u到v的一条边代表v的重要性标签。例如,一个Twitter用户被许多其它人粉,该用户排名很高。GraphX带有静态和动态PageRank的实现方法
    ,这些方法在PageRank object中。静态的PageRank运行固定次数
    的迭代,而动态的PageRank一直运行,直到收敛。GraphOps允许直接调用这些算法作为图上的方法。

    GraphX包含一个我们可以运行PageRank的社交网络数据集的例子。用户集在graphx/data/users.txt中,用户之间的关系在graphx/data/followers.txt中。我们通过下面的方法计算
    每个用户的PageRank。

    1. // Load the edges as a graph
    2. val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
    3. // Run PageRank
    4. val ranks = graph.pageRank(0.0001).vertices
    5. // Join the ranks with the usernames
    6. val users = sc.textFile("graphx/data/users.txt").map { line =>
    7. val fields = line.split(",")
    8. (fields(0).toLong, fields(1))
    9. }
    10. val ranksByUsername = users.join(ranks).map {
    11. case (id, (username, rank)) => (username, rank)
    12. }
    13. // Print the result
    14. println(ranksByUsername.collect().mkString("\n"))

    连通体算法

    连通体算法用id标注图中每个连通体,将连通体中序号最小的顶点的id作为连通体的id。例如,在社交网络中,连通体可以近似为集群。GraphX在ConnectedComponents object
    中包含了一个算法的实现,我们通过下面的方法计算社交网络数据集中的连通体。

    1. / Load the graph as in the PageRank example
    2. val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
    3. // Find the connected components
    4. val cc = graph.connectedComponents().vertices
    5. // Join the connected components with the usernames
    6. val users = sc.textFile("graphx/data/users.txt").map { line =>
    7. val fields = line.split(",")
    8. (fields(0).toLong, fields(1))
    9. }
    10. val ccByUsername = users.join(cc).map {
    11. case (id, (username, cc)) => (username, cc)
    12. }
    13. // Print the result
    14. println(ccByUsername.collect().mkString("\n"))

    三角形计数算法

    一个顶点有两个相邻的顶点以及相邻顶点之间的边时,这个顶点是一个三角形的一部分。GraphX在TriangleCount object
    中实现了一个三角形计数算法,它计算通过每个顶点的三角形的数量。需要注意的是,在计算社交网络数据集的三角形计数时,TriangleCount需要边的方向是规范的方向(srcId < dstId),
    并且图通过Graph.partitionBy分片过。

    1. // Load the edges in canonical order and partition the graph for triangle count
    2. val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
    3. // Find the triangle count for each vertex
    4. val triCounts = graph.triangleCount().vertices
    5. // Join the triangle counts with the usernames
    6. val users = sc.textFile("graphx/data/users.txt").map { line =>
    7. val fields = line.split(",")
    8. (fields(0).toLong, fields(1))
    9. }
    10. val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
    11. (username, tc)
    12. }
    13. // Print the result
    14. println(triCountByUsername.collect().mkString("\n"))