Pregel API

The Pregel API is one of the core building blocks of GraphFrames. It implements the Pregel algorithm in terms of relational operations on top of the Apache Spark DataFrame / Dataset API.

API

For the API details, please refer to:

Arguments

Message Columns

Internally, GraphFrames handles all the messages via a specific column:

val pregelMessageColumn = org.graphframes.lib.Pregel.msg
val pregelMessageColumnName = org.graphframes.lib.Pregel.MSG_COL_NAME

This name is applied to both raw and aggregated messages and should be used in conditions and aggregations. For example, in the simplest implementation of the PageRank algorithm it may look like this:

import org.apache.spark.sql.functions.{coalesce, lit, sum}
import org.graphframes.lib.Pregel
val aggregateMessagesExpression = sum(Pregel.msg)
val updateExpression = coalesce(Pregel.msg, lit(0.0)) * (1 - alpha) + alpha / numVertices
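
To show where these two expressions plug in, here is a minimal sketch of a complete PageRank run, written for `spark-shell` or a notebook with GraphFrames on the classpath. The toy edge list, the column names `rank` and `outDegree`, the value `alpha = 0.15`, and the temporary checkpoint directory are illustrative assumptions, not part of the text above. The sketch also assumes every vertex has at least one outgoing edge, because `outDegrees` omits vertices without one.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, lit, sum}
import org.graphframes.GraphFrame
import org.graphframes.lib.Pregel

val spark = SparkSession.builder().master("local[*]").appName("pregel-pagerank").getOrCreate()
import spark.implicits._
// Pregel may checkpoint intermediate vertex state, which needs a checkpoint directory.
spark.sparkContext.setCheckpointDir(Files.createTempDirectory("pregel-ckpt").toString)

// Hypothetical toy graph: every vertex has at least one outgoing edge.
val edges = Seq(("a", "b"), ("b", "c"), ("c", "a"), ("a", "c")).toDF("src", "dst")
// outDegrees yields (id, outDegree); the degree is used to split a vertex's rank among its neighbours.
val vertices = GraphFrame.fromEdges(edges).outDegrees.cache()
val numVertices = vertices.count()
val graph = GraphFrame(vertices, edges)

val alpha = 0.15 // teleport probability (assumed value)

val ranks = graph.pregel
  .setMaxIter(10)
  .withVertexColumn(
    "rank",
    lit(1.0 / numVertices), // initial rank
    // Update: vertices that received no message get null, hence the coalesce.
    coalesce(Pregel.msg, lit(0.0)) * (1 - alpha) + alpha / numVertices)
  // Raw message: each source sends an equal share of its rank along every outgoing edge.
  .sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree"))
  // Aggregated message: the same column name, now holding the sum per destination vertex.
  .aggMsgs(sum(Pregel.msg))
  .run()

ranks.select("id", "rank").show()
```

Note that `Pregel.msg` appears twice with different meanings: inside `aggMsgs` it refers to the raw messages, while inside the update expression it refers to the aggregated result.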

Triplet Column

At the message-generation stage, GraphFrames internally creates triplets that contain StructType columns for the source vertex, the edge, and the destination vertex. These structs contain all the columns of the original source vertex, destination vertex, and edge: not only the initial attributes, but also the Pregel columns, such as the activity column and all the vertex columns. To access this data, use the following API:

val sourceVertexColumn1 = Pregel.src("vertexColumn1")
val sourceVertexColumn2 = Pregel.src("vertexColumn2")
val destinationVertexColumn1 = Pregel.dst("vertexColumn1")
val edgeWeight = Pregel.edge("weight")

Under the hood, the column name you pass is resolved to the corresponding field of the matching triplet struct.
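
As an illustration of combining a vertex column and an edge column in one message, the following sketch computes single-source shortest paths on a small weighted graph. The graph, the source vertex `"a"`, the column name `dist`, and the temporary checkpoint directory are hypothetical choices made for this example; it is meant for `spark-shell` or a notebook with GraphFrames on the classpath.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, least, lit, min, when}
import org.graphframes.GraphFrame
import org.graphframes.lib.Pregel

val spark = SparkSession.builder().master("local[*]").appName("pregel-sssp").getOrCreate()
import spark.implicits._
// Pregel may checkpoint intermediate vertex state, which needs a checkpoint directory.
spark.sparkContext.setCheckpointDir(Files.createTempDirectory("pregel-ckpt").toString)

// Hypothetical weighted graph.
val vertices = Seq("a", "b", "c").toDF("id")
val edges = Seq(("a", "b", 1.0), ("b", "c", 2.0), ("a", "c", 5.0)).toDF("src", "dst", "weight")
val graph = GraphFrame(vertices, edges)

val distances = graph.pregel
  .setMaxIter(2) // |V| - 1 rounds are enough for this 3-vertex graph
  .withVertexColumn(
    "dist",
    // Initial value: 0 for the source vertex, infinity elsewhere.
    when(col("id") === "a", lit(0.0)).otherwise(lit(Double.PositiveInfinity)),
    // Update: keep the smaller of the current and the proposed distance.
    // least skips nulls, so vertices without messages keep their value.
    least(col("dist"), Pregel.msg))
  // Message: distance of the source vertex plus the weight of the edge being traversed.
  .sendMsgToDst(Pregel.src("dist") + Pregel.edge("weight"))
  .aggMsgs(min(Pregel.msg))
  .run()

distances.select("id", "dist").show()
```

Here `Pregel.src("dist")` reads a Pregel vertex column from the source struct, while `Pregel.edge("weight")` reads an original edge attribute from the edge struct.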

Sending Messages

The GraphFrames Pregel API supports an arbitrary number of messages per vertex. Inside the Pregel API, graphs are always considered directed: every edge has a source vertex and a destination vertex, and a message is addressed explicitly to one of them. To emulate an undirected graph, the user can send the same kind of message to both the source and the destination vertex of each edge.

graph
  .pregel
  .sendMsgToDst(Pregel.src("vertexColumn")) // Sending the vertex column of the source vertex to the destination vertex.
  .sendMsgToSrc(Pregel.dst("vertexColumn")) // Sending the vertex column of the destination vertex to the source vertex.
  .run()
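
To make the undirected emulation concrete, the sketch below labels connected components by propagating the smallest vertex id in both directions along every edge. The toy graph, the column name `label`, the iteration count, and the temporary checkpoint directory are assumptions made for illustration; the snippet is meant for `spark-shell` or a notebook with GraphFrames on the classpath.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, least, min}
import org.graphframes.GraphFrame
import org.graphframes.lib.Pregel

val spark = SparkSession.builder().master("local[*]").appName("pregel-undirected").getOrCreate()
import spark.implicits._
// Pregel may checkpoint intermediate vertex state, which needs a checkpoint directory.
spark.sparkContext.setCheckpointDir(Files.createTempDirectory("pregel-ckpt").toString)

// Hypothetical graph with two components: {1, 2, 3} and {4, 5}.
// Vertex 1 has no outgoing edges, so its label can only spread against edge direction.
val vertices = Seq(1L, 2L, 3L, 4L, 5L).toDF("id")
val edges = Seq((2L, 1L), (2L, 3L), (4L, 5L)).toDF("src", "dst")
val graph = GraphFrame(vertices, edges)

val labels = graph.pregel
  .setMaxIter(5) // generous bound; the longest path here has two edges
  .withVertexColumn(
    "label",
    col("id"), // every vertex starts as its own component
    least(col("label"), Pregel.msg)) // least skips nulls, so isolated vertices keep their label
  .sendMsgToDst(Pregel.src("label")) // along the edge direction
  .sendMsgToSrc(Pregel.dst("label")) // against the edge direction
  .aggMsgs(min(Pregel.msg))
  .run()

labels.select("id", "label").show()
```

With only `sendMsgToDst`, vertex 3 could never learn about vertex 1 in this graph; sending in both directions is what makes the edges behave as undirected.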

Aggregation

The GraphFrames Pregel API requires the user to specify an aggregation function for the messages. It is applied to all the messages sent to a vertex, regardless of the direction they arrived from.

graph.pregel.aggMsgs(sum(Pregel.msg))
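
A small sketch may help show that one aggregation covers messages from both directions. It counts the total degree of each vertex in a single iteration by sending the constant `1` to both endpoints of every edge and summing. The toy graph, the column name `degree`, and the temporary checkpoint directory are hypothetical; the snippet is meant for `spark-shell` or a notebook with GraphFrames on the classpath.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, lit, sum}
import org.graphframes.GraphFrame
import org.graphframes.lib.Pregel

val spark = SparkSession.builder().master("local[*]").appName("pregel-degree").getOrCreate()
import spark.implicits._
// Pregel may checkpoint intermediate vertex state, which needs a checkpoint directory.
spark.sparkContext.setCheckpointDir(Files.createTempDirectory("pregel-ckpt").toString)

// Hypothetical graph: "a" points to both "b" and "c".
val vertices = Seq("a", "b", "c").toDF("id")
val edges = Seq(("a", "b"), ("a", "c")).toDF("src", "dst")
val graph = GraphFrame(vertices, edges)

val degrees = graph.pregel
  .setMaxIter(1) // one round of messages is enough to count incident edges
  .withVertexColumn(
    "degree",
    lit(0L),
    coalesce(Pregel.msg, lit(0L))) // vertices that receive no message keep degree 0
  .sendMsgToDst(lit(1L)) // one message per incoming edge
  .sendMsgToSrc(lit(1L)) // one message per outgoing edge
  .aggMsgs(sum(Pregel.msg)) // a single sum over messages from both directions
  .run()

degrees.select("id", "degree").show()
```

Both message expressions must produce the same type (here a long), because they end up in the same message column before aggregation.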

Termination Conditions

GraphFrames Pregel API provides the following termination conditions: