Traversals and Connectivity

Shortest paths

Computes shortest paths from each vertex to the given set of landmark vertices, where landmarks are specified by vertex ID. Note that this takes edge direction into account.

See Wikipedia for a background.
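To make the semantics concrete, here is a pure-Python sketch of the same computation on a toy edge list (an illustration only, not the distributed implementation). Because edge direction matters, a BFS from each landmark over reversed edges yields the distance from every vertex to that landmark:

```python
from collections import deque

def shortest_paths(edges, vertices, landmarks):
    """For each vertex, map landmark -> number of hops along directed edges."""
    # A BFS from landmark L over *reversed* edges gives the distance from
    # every vertex to L along the original edge directions.
    rev = {v: [] for v in vertices}
    for src, dst in edges:
        rev[dst].append(src)
    distances = {v: {} for v in vertices}
    for lm in landmarks:
        dist = {lm: 0}
        queue = deque([lm])
        while queue:
            u = queue.popleft()
            for w in rev[u]:
                if w not in dist:
                    dist[w] = dist[u] + 1
                    queue.append(w)
        for v, d in dist.items():
            distances[v][lm] = d
    return distances

# "a" reaches landmark "d" in 3 hops; "d" cannot reach landmark "a" at all
print(shortest_paths([("a", "b"), ("b", "c"), ("c", "d")],
                     ["a", "b", "c", "d"], ["a", "d"]))
```

Unreachable landmarks simply do not appear in a vertex's distance map, mirroring the `distances` column in the returned DataFrame.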


NOTE

Be aware that the returned DataFrame is persisted and should be unpersisted manually after processing to avoid memory leaks!


Python API

For API details, refer to the graphframes.GraphFrame.shortestPaths.

from graphframes.examples import Graphs

g = Graphs(spark).friends()  # Get example graph

results = g.shortestPaths(landmarks=["a", "d"])
results.select("id", "distances").show()

Scala API

For API details, refer to the org.graphframes.lib.ShortestPaths.

import org.graphframes.{examples, GraphFrame}

val g: GraphFrame = examples.Graphs.friends // get example graph

val results = g.shortestPaths.landmarks(Seq("a", "d")).run()
results.select("id", "distances").show()

Arguments

The list (Seq) of vertex IDs used as landmarks; shortest paths are computed from every vertex to each landmark.

Possible values are graphx and graphframes. Both implementations are based on the same logic. GraphX is faster for small-to-medium graphs but requires more memory due to less efficient RDD serialization and its triplets-based nature. GraphFrames requires much less memory thanks to efficient Tungsten serialization and because its core structures are edges and messages, not triplets.

For graphframes only. To avoid exponential growth of the Spark logical plan, DataFrame lineage, and query optimization time, periodic checkpointing is required. While checkpointing itself is not free, it is still recommended to set this value to something less than 5.

For graphframes only. By default, GraphFrames uses persistent checkpoints. They are reliable and reduce the error rate. The downside of persistent checkpoints is that they require setting up a checkpointDir in persistent storage such as S3 or HDFS. By passing use_local_checkpoints=True, the user can tell GraphFrames to use the local disks of Spark executors for checkpointing. Local checkpoints are faster but less reliable: if an executor is lost, for example preempted by a higher-priority job, its checkpoints are lost and the whole job fails.

The storage level for intermediate results and the output DataFrame. By default it is memory-and-disk deserialized, a good balance between performance and reliability. For very big graphs and out-of-core scenarios, DISK_ONLY may be faster.

By default this is true and the algorithm looks only for directed paths. By passing false, the graph is treated as undirected and the algorithm looks for any shortest path.

Breadth-first search (BFS)

Breadth-first search (BFS) finds the shortest path(s) from one vertex (or a set of vertices) to another vertex (or a set of vertices). The beginning and end vertices are specified as Spark DataFrame expressions.

See Wikipedia on BFS for more background.

The following code snippets use BFS to find the path from the vertex with name "Esther" to a vertex with age < 32.
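As a point of reference, the same search can be sketched in pure Python on a toy graph loosely modeled on the friends example (the vertex attributes and edge list here are assumptions for illustration, not the library's data):

```python
from collections import deque

# Toy data loosely modeled on the friends example (attributes are assumptions)
ages = {"a": 34, "b": 36, "c": 30, "d": 29, "e": 32, "f": 36}
names = {"a": "Alice", "b": "Bob", "c": "Charlie", "d": "David", "e": "Esther", "f": "Fanny"}
edges = [("a", "b", "friend"), ("b", "c", "follow"), ("c", "b", "follow"),
         ("f", "c", "follow"), ("e", "f", "follow"), ("e", "d", "friend"),
         ("d", "a", "friend"), ("a", "e", "friend")]

def bfs(from_pred, to_pred, edge_pred=lambda rel: True, max_path_length=10):
    """Return one shortest path from a vertex matching from_pred to a vertex
    matching to_pred, using only edges whose relationship passes edge_pred."""
    starts = [v for v in ages if from_pred(v)]
    queue = deque((s, [s]) for s in starts)
    seen = set(starts)
    while queue:
        v, path = queue.popleft()
        if len(path) > 1 and to_pred(v):  # require at least one hop
            return path
        if len(path) - 1 >= max_path_length:
            continue
        for src, dst, rel in edges:
            if src == v and edge_pred(rel) and dst not in seen:
                seen.add(dst)
                queue.append((dst, path + [dst]))
    return None

# Search from "Esther" for users of age < 32
print(bfs(lambda v: names[v] == "Esther", lambda v: ages[v] < 32))
# With an edge filter, like edgeFilter="relationship != 'friend'"
print(bfs(lambda v: names[v] == "Esther", lambda v: ages[v] < 32,
          edge_pred=lambda rel: rel != "friend", max_path_length=3))
```

Excluding "friend" edges forces the search onto a longer "follow" route, which is the effect the edgeFilter argument has in the real API.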

Python API

For API details, refer to the graphframes.GraphFrame.bfs.

from graphframes.examples import Graphs

g = Graphs(spark).friends()  # Get example graph

# Search from "Esther" for users of age < 32

paths = g.bfs("name = 'Esther'", "age < 32")
paths.show()

# Specify edge filters or max path lengths

g.bfs("name = 'Esther'", "age < 32",
      edgeFilter="relationship != 'friend'", maxPathLength=3)

Scala API

For API details, refer to org.graphframes.lib.BFS.

import org.graphframes.{examples, GraphFrame}

val g: GraphFrame = examples.Graphs.friends // get example graph

// Search from "Esther" for users of age < 32.
val paths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
paths.show()

// Specify edge filters or max path lengths.
val paths = {
  g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
    .edgeFilter("relationship != 'friend'")
    .maxPathLength(3).run()
}
paths.show()

Connected components

Computes the connected component membership of each vertex and returns a graph with each vertex assigned a component ID.

See Wikipedia for the background.
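For intuition, the output contract can be illustrated with a small sequential union-find sketch in pure Python (the distributed implementation works very differently; this only shows what "component membership" means):

```python
def connected_components(vertices, edges):
    """Sequential union-find; edge direction is ignored."""
    parent = {v: v for v in vertices}

    def find(v):
        while parent[v] != v:
            parent[v] = parent[parent[v]]  # path halving
            v = parent[v]
        return v

    for u, w in edges:
        ru, rw = find(u), find(w)
        if ru != rw:
            parent[max(ru, rw)] = min(ru, rw)  # smaller ID becomes the root

    return {v: find(v) for v in vertices}

print(connected_components(["a", "b", "c", "d", "e"],
                           [("a", "b"), ("b", "c"), ("d", "e")]))
```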


NOTE

With GraphFrames 0.3.0 and later releases, the default connected components algorithm requires setting a Spark checkpoint directory. Users can revert to the old algorithm using connectedComponents.setAlgorithm("graphx"). Starting from the GraphFrames 0.9.3 release, users can also use local checkpoints, which do not require setting a Spark checkpoint directory: set the config spark.graphframes.useLocalCheckpoints to true or use the API connectedComponents.setUseLocalCheckpoints(true). While local checkpoints provide better performance, they are not as reliable as persistent checkpointing.

NOTE

Be aware that the returned DataFrame is persisted and should be unpersisted manually after processing to avoid memory leaks!


Python API

For API details, refer to the graphframes.GraphFrame.connectedComponents.

from graphframes.examples import Graphs

sc.setCheckpointDir("/tmp/spark-checkpoints")

g = Graphs(spark).friends()  # Get example graph

result = g.connectedComponents()
result.select("id", "component").orderBy("component").show()

Scala API

For API details, refer to the org.graphframes.lib.ConnectedComponents.

import org.graphframes.{examples, GraphFrame}

val g: GraphFrame = examples.Graphs.friends // get example graph

val result = g.connectedComponents.setUseLocalCheckpoints(true).run()
result.select("id", "component").orderBy("component").show()

Arguments

Possible values are graphx and graphframes. The GraphX-based implementation is a naive one-by-one Pregel propagation. While it may be slightly faster on small-to-medium graphs, it has much worse convergence complexity and requires much more memory due to less efficient RDD serialization. The GraphFrames-based implementation follows the ideas of Kiveris et al., "Connected components in MapReduce and beyond," Proceedings of the ACM Symposium on Cloud Computing, 2014. It has much better convergence complexity and requires less memory.
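The convergence difference can be illustrated with a pure-Python sketch of naive minimum-label propagation, the Pregel-style approach: the number of rounds grows with the graph diameter, which is the cost the Kiveris et al. approach improves on. This is an illustration only, not either actual implementation:

```python
def min_label_propagation(vertices, edges):
    """Pregel-style: every vertex repeatedly adopts the minimum label among
    itself and its neighbors; at the fixpoint, equal labels = same component."""
    nbrs = {v: set() for v in vertices}
    for u, w in edges:  # treat edges as undirected
        nbrs[u].add(w)
        nbrs[w].add(u)
    label = {v: v for v in vertices}
    rounds = 0
    changed = True
    while changed:
        rounds += 1
        new_label = {v: min([label[v]] + [label[n] for n in nbrs[v]])
                     for v in vertices}
        changed = new_label != label
        label = new_label
    return label, rounds

# A chain needs a number of rounds proportional to its length (diameter)
labels, rounds = min_label_propagation([1, 2, 3, 4, 5, 6],
                                       [(1, 2), (2, 3), (3, 4), (5, 6)])
print(labels, rounds)
```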

For graphx only. Limits the maximal number of Pregel iterations. By default it is unbounded (Integer.MaxValue). It is recommended not to change this value: if the algorithm gets stuck, it is a problem with the graph, not the algorithm.

For graphframes only. To avoid exponential growth of the Spark logical plan, DataFrame lineage, and query optimization time, periodic checkpointing is required. While checkpointing itself is not free, it is still recommended to set this value to something less than 5.

For graphframes only. See this section for details.

For graphframes only. When the type of the input graph's vertex IDs is not one of Long, Int, Short, or Byte, the output labels (components) are random Long numbers by default. By passing use_labels_as_components=True, the user can ask GraphFrames to use the original vertex labels for the output components; in that case, the minimal value of the original IDs is used for each found component. This operation is not free and requires an additional groupBy + agg + join.
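The relabeling step can be sketched in plain Python, mirroring the groupBy + agg + join just described (illustrative data, not the library's internals):

```python
# rows of (vertex_id, component); components are opaque Long labels by default
rows = [("x", 101), ("y", 101), ("z", 202), ("w", 202), ("q", 202)]

# groupBy(component) + agg(min(id)): minimal original ID per component
min_id = {}
for vid, comp in rows:
    if comp not in min_id or vid < min_id[comp]:
        min_id[comp] = vid

# join back: replace each opaque label with the minimal original ID
relabeled = [(vid, min_id[comp]) for vid, comp in rows]
print(relabeled)
```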

For graphframes only. By default, GraphFrames uses persistent checkpoints. They are reliable and reduce the error rate. The downside of persistent checkpoints is that they require setting up a checkpointDir in persistent storage such as S3 or HDFS. By passing use_local_checkpoints=True, the user can tell GraphFrames to use the local disks of Spark executors for checkpointing. Local checkpoints are faster but less reliable: if an executor is lost, for example preempted by a higher-priority job, its checkpoints are lost and the whole job fails.

The storage level for intermediate results and the output DataFrame with components. By default it is memory-and-disk deserialized, a good balance between performance and reliability. For very big graphs and out-of-core scenarios, DISK_ONLY may be faster.

AQE-broadcast mode

Starting from 0.10.0

For the graphframes algorithm only. During iterations, this algorithm can generate new edges that lead to high skewness in joins and aggregates, because some vertices have a very high degree. In previous versions of GraphFrames this issue was addressed by manually broadcasting very high-degree nodes. Unfortunately, Apache Spark's Adaptive Query Execution (AQE) optimization fails on such cases, which is why AQE was disabled for connected components.

In newer versions of GraphFrames (0.10+) there is a way to disable manual broadcasting, enable AQE, and let it handle the skewness. To enable this mode, pass -1 to setBroadcastThreshold. Based on benchmarks, this mode provides about a 5x speed-up. It is possible that in future releases the default value of broadcastThreshold will be changed to -1.

Strongly connected components

Computes the strongly connected component (SCC) of each vertex and returns a graph with each vertex assigned to the SCC containing that vertex. At the moment, SCC in GraphFrames is a wrapper around the GraphX implementation.

See Wikipedia for the background.


NOTE

Be aware that the returned DataFrame is persisted and should be unpersisted manually after processing to avoid memory leaks!


Python API

For API details, refer to the graphframes.GraphFrame.stronglyConnectedComponents.

from graphframes.examples import Graphs

sc.setCheckpointDir("/tmp/spark-checkpoints")

g = Graphs(spark).friends()  # Get example graph

result = g.stronglyConnectedComponents(maxIter=10)
result.select("id", "component").orderBy("component").show()

Scala API

For API details, refer to the org.graphframes.lib.StronglyConnectedComponents.

import org.graphframes.{examples, GraphFrame}

val g: GraphFrame = examples.Graphs.friends // get example graph

val result = g.stronglyConnectedComponents.maxIter(10).run()
result.select("id", "component").orderBy("component").show()

Triangle count

Computes the number of triangles passing through each vertex.


WARNING!

The current implementation collects neighbor sets for vertices and computes their pairwise intersections. While this works for regular graphs, it will most probably fail on power-law graphs (graphs with a few very high-degree vertices), or at least require a lot of memory on the Spark cluster. Consider edge sampling strategies before running the algorithm to get an approximate triangle count.
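The approach the warning describes can be sketched in pure Python; note that the per-vertex work is quadratic in the neighbor-set size, which is exactly what hurts on high-degree vertices (illustration only, not the library's implementation):

```python
from itertools import combinations

def triangle_count(vertices, edges):
    """Per-vertex triangle counts via pairwise neighbor-set intersection.
    Work and memory grow with the square of the largest neighbor set,
    which is why power-law graphs are problematic."""
    nbrs = {v: set() for v in vertices}
    for u, w in edges:  # edge direction is ignored
        if u != w:
            nbrs[u].add(w)
            nbrs[w].add(u)
    counts = {v: 0 for v in vertices}
    for v in vertices:
        for a, b in combinations(nbrs[v], 2):
            if b in nbrs[a]:  # edge a-b closes the triangle v-a-b
                counts[v] += 1
    return counts

print(triangle_count(["a", "b", "c", "d"],
                     [("a", "b"), ("b", "c"), ("c", "a"), ("a", "d")]))
```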


Python API

For API details, refer to the graphframes.GraphFrame.triangleCount.

from graphframes.examples import Graphs

g = Graphs(spark).friends()  # Get example graph

results = g.triangleCount()
results.select("id", "count").show()

Scala API

For API details, refer to the org.graphframes.lib.TriangleCount.

import org.graphframes.{examples, GraphFrame}

val g: GraphFrame = examples.Graphs.friends // get example graph

val results = g.triangleCount.run()
results.select("id", "count").show()

Cycles Detection

GraphFrames provides an implementation of the Rocha–Thatte cycle detection algorithm.
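A much-simplified pure-Python sketch of the message-passing idea follows. Unlike the real algorithm, which deduplicates rotations of the same cycle, this version lets every vertex report cycles starting at itself, similar to the per-vertex output in the Python example below:

```python
def detect_cycles(edges, max_iter=10):
    """Simplified Rocha-Thatte-style message passing: candidate paths are
    forwarded along out-edges; a path that returns to its first vertex is a
    cycle. Every vertex here reports cycles that start at itself."""
    out = {}
    for u, v in edges:
        out.setdefault(u, []).append(v)
    paths = {(u,) for u in out}  # superstep 0: each vertex holds the path [v]
    cycles = set()
    for _ in range(max_iter):
        nxt = set()
        for p in paths:
            for w in out.get(p[-1], []):
                if w == p[0]:
                    cycles.add(p + (w,))  # closed a cycle back to the start
                elif w not in p:
                    nxt.add(p + (w,))     # still a simple path, keep forwarding
        if not nxt:
            break
        paths = nxt
    return sorted(cycles)

# Same graph as the Scala example below
for c in detect_cycles([(1, 2), (2, 1), (1, 3), (3, 1), (2, 5), (5, 1)]):
    print(c)
```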


NOTE

Be aware that the returned DataFrame is persisted and should be unpersisted manually after processing to avoid memory leaks!


Python API

from graphframes import GraphFrame

vertices = spark.createDataFrame(
    [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")], ["id", "attr"]
)
edges = spark.createDataFrame(
    [(1, 2), (2, 1), (1, 3), (3, 1), (2, 5), (5, 1)], ["src", "dst"]
)
graph = GraphFrame(vertices, edges)
res = graph.detectingCycles(
    checkpoint_interval=3,
    use_local_checkpoints=True,
)
res.show(truncate=False)

# Output:
# +----+--------------+
# | id | found_cycles |
# +----+--------------+
# |1   |[1, 3, 1]     |
# |1   |[1, 2, 1]     |
# |1   |[1, 2, 5, 1]  |
# |2   |[2, 1, 2]     |
# |2   |[2, 5, 1, 2]  |
# |3   |[3, 1, 3]     |
# |5   |[5, 1, 2, 5]  |
# +----+--------------+

Scala API

import org.graphframes.GraphFrame

val graph = GraphFrame(
  spark
    .createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"), (4L, "d"), (5L, "e")))
    .toDF("id", "attr"),
  spark
    .createDataFrame(Seq((1L, 2L), (2L, 1L), (1L, 3L), (3L, 1L), (2L, 5L), (5L, 1L)))
    .toDF("src", "dst"))
val res = graph.detectingCycles.setUseLocalCheckpoints(true).run()
res.show(false)

// Output:
// +--------------+
// | found_cycles |
// +--------------+
// |[1, 3, 1]     |
// |[1, 2, 1]     |
// |[1, 2, 5, 1]  |
// |...           |
// +--------------+

Arguments

For graphframes only. To avoid exponential growth of the Spark logical plan, DataFrame lineage, and query optimization time, periodic checkpointing is required. While checkpointing itself is not free, it is still recommended to set this value to something less than 5.

For graphframes only. By default, GraphFrames uses persistent checkpoints. They are reliable and reduce the error rate. The downside of persistent checkpoints is that they require setting up a checkpointDir in persistent storage such as S3 or HDFS. By passing use_local_checkpoints=True, the user can tell GraphFrames to use the local disks of Spark executors for checkpointing. Local checkpoints are faster but less reliable: if an executor is lost, for example preempted by a higher-priority job, its checkpoints are lost and the whole job fails.

The storage level for intermediate results and the output DataFrame. By default it is memory-and-disk deserialized, a good balance between performance and reliability. For very big graphs and out-of-core scenarios, DISK_ONLY may be faster.