• Pregel API

    Pregel API

    图本身是递归数据结构,顶点的属性依赖于它们邻居的属性,这些邻居的属性又依赖于自己邻居的属性。所以许多重要的图算法都是迭代的重新计算每个顶点的属性,直到满足某个确定的条件。
    一系列的graph-parallel抽象已经被提出来用来表达这些迭代算法。GraphX公开了一个类似Pregel的操作,它是广泛使用的Pregel和GraphLab抽象的一个融合。

    在GraphX中,更高级的Pregel操作是一个约束到图拓扑的批量同步(bulk-synchronous)并行消息抽象。Pregel操作者执行一系列的超级步骤(super steps),在这些步骤中,顶点从
    之前的超级步骤中接收进入(inbound)消息的总和,为顶点属性计算一个新的值,然后在以后的超级步骤中发送消息到邻居顶点。不像Pregel而更像GraphLab,消息作为一个边三元组的函数被并行
    计算,消息计算既访问了源顶点特征也访问了目的顶点特征。在超级步中,没有收到消息的顶点被跳过。当没有消息遗留时,Pregel操作停止迭代并返回最终的图。

    注意,与更标准的Pregel实现不同的是,GraphX中的顶点仅仅能发送信息给邻居顶点,并利用用户自定义的消息函数构造消息。这些限制允许在GraphX进行额外的优化。

    一下是@pregel[A](A,Int,EdgeDirection"> Pregel操作((VertexId,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexId,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED])的类型签名以及实现草图(注意,访问graph.cache已经被删除)

    1. class GraphOps[VD, ED] {
    2. def pregel[A]
    3. (initialMsg: A,
    4. maxIter: Int = Int.MaxValue,
    5. activeDir: EdgeDirection = EdgeDirection.Out)
    6. (vprog: (VertexId, VD, A) => VD,
    7. sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    8. mergeMsg: (A, A) => A)
    9. : Graph[VD, ED] = {
    10. // Receive the initial message at each vertex
    11. var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
    12. // compute the messages
    13. var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
    14. var activeMessages = messages.count()
    15. // Loop until no messages remain or maxIterations is achieved
    16. var i = 0
    17. while (activeMessages > 0 && i < maxIterations) {
    18. // Receive the messages: -----------------------------------------------------------------------
    19. // Run the vertex program on all vertices that receive messages
    20. val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
    21. // Merge the new vertex values back into the graph
    22. g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
    23. // Send Messages: ------------------------------------------------------------------------------
    24. // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
    25. // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked
    26. // on edges in the activeDir of vertices in newVerts
    27. messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
    28. activeMessages = messages.count()
    29. i += 1
    30. }
    31. g
    32. }
    33. }

    注意,pregel有两个参数列表(graph.pregel(list1)(list2))。第一个参数列表包含配置参数初始消息、最大迭代数、发送消息的边的方向(默认是沿边方向出)。第二个参数列表包含用户
    自定义的函数用来接收消息(vprog)、计算消息(sendMsg)、合并消息(mergeMsg)。

    我们可以用Pregel操作表达计算单源最短路径( single source shortest path)。

    1. import org.apache.spark.graphx._
    2. // Import random graph generation library
    3. import org.apache.spark.graphx.util.GraphGenerators
    4. // A graph with edge attributes containing distances
    5. val graph: Graph[Int, Double] =
    6. GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
    7. val sourceId: VertexId = 42 // The ultimate source
    8. // Initialize the graph such that all vertices except the root have distance infinity.
    9. val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
    10. val sssp = initialGraph.pregel(Double.PositiveInfinity)(
    11. (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
    12. triplet => { // Send Message
    13. if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
    14. Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    15. } else {
    16. Iterator.empty
    17. }
    18. },
    19. (a,b) => math.min(a,b) // Merge Message
    20. )
    21. println(sssp.vertices.collect.mkString("\n"))