class Pregel extends Logging
Implements a Pregel-like bulk-synchronous message-passing API based on DataFrame operations.
See Malewicz et al., "Pregel: a system for large-scale graph processing", for a detailed description of the Pregel algorithm.
You can construct a Pregel instance using either this constructor or org.graphframes.GraphFrame#pregel, then use the builder pattern to describe the operations, and finally call run to start a run. It returns a DataFrame of vertices from the last iteration.
When a run starts, it expands the vertices DataFrame using column expressions defined by withVertexColumn. Those additional vertex properties can be changed during Pregel iterations. In each Pregel iteration, there are three phases:
- Given each edge triplet, generate messages and specify the target vertices to send them to, as described by sendMsgToDst and sendMsgToSrc.
- Aggregate messages by target vertex ID, as described by aggMsgs.
- Update the additional vertex properties based on the aggregated messages and the states from the previous iteration, as described by withVertexColumn.
Please find what columns you can reference at each phase in the method API docs.
You can control the number of iterations with setMaxIter; see the API docs for advanced controls.
Example code for Page Rank:
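import org.apache.spark.sql.functions.{coalesce, lit, sum}
import org.graphframes.GraphFrame
import org.graphframes.lib.Pregel

val edges = ...
// Vertices carry an "outDegree" column, which the message expression divides by.
val vertices = GraphFrame.fromEdges(edges).outDegrees.cache()
val numVertices = vertices.count()
val graph = GraphFrame(vertices, edges)
val alpha = 0.15
val ranks = graph.pregel
  .withVertexColumn("rank", lit(1.0 / numVertices),
    coalesce(Pregel.msg, lit(0.0)) * (1.0 - alpha) + alpha / numVertices)
  .sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree"))
  .aggMsgs(sum(Pregel.msg))
  .run()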
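The three phases listed above can be mimicked with plain Scala collections. The following is a minimal sketch, not the GraphFrames implementation: it runs one PageRank-style iteration in memory, and the names `PregelSuperstepSketch`, `Edge`, and `superstep` are hypothetical helpers introduced only for illustration.

```scala
// Plain-Scala model of the three phases of one Pregel iteration (no Spark involved).
object PregelSuperstepSketch {
  final case class Edge(src: Long, dst: Long)

  /** One PageRank-style iteration over in-memory collections. */
  def superstep(
      ranks: Map[Long, Double],
      outDegree: Map[Long, Int],
      edges: Seq[Edge],
      alpha: Double): Map[Long, Double] = {
    val n = ranks.size
    // Phase 1: for each edge, generate a message addressed to the destination vertex.
    val messages: Seq[(Long, Double)] =
      edges.map(e => e.dst -> (ranks(e.src) / outDegree(e.src)))
    // Phase 2: aggregate messages by target vertex ID (here: sum).
    val aggregated: Map[Long, Double] =
      messages.groupBy(_._1).map { case (id, ms) => id -> ms.map(_._2).sum }
    // Phase 3: update every vertex from the aggregated message;
    // a vertex that received nothing is treated as having received 0.0.
    ranks.map { case (id, _) =>
      id -> (aggregated.getOrElse(id, 0.0) * (1.0 - alpha) + alpha / n)
    }
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq(Edge(1, 2), Edge(2, 3), Edge(3, 1))
    val outDegree = edges.groupBy(_.src).map { case (id, es) => id -> es.size }
    val initial = Map(1L -> (1.0 / 3), 2L -> (1.0 / 3), 3L -> (1.0 / 3))
    println(superstep(initial, outDegree, edges, alpha = 0.15))
  }
}
```

In the DataFrame-based API the same roles are played by sendMsgToDst (phase 1), aggMsgs (phase 2), and the update expression of withVertexColumn (phase 3).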
Instance Constructors
-
new
Pregel(graph: GraphFrame)
- graph
The graph that Pregel will run on.
Value Members
-
final
def
!=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
final
def
##(): Int
- Definition Classes
- AnyRef → Any
- def +(other: String): String
- def ->[B](y: B): (Pregel, B)
-
final
def
==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
def
aggMsgs(aggExpr: Column): Pregel.this.type
Defines how messages are aggregated after being grouped by target vertex ID.
- aggExpr
the message aggregation expression, such as sum(Pregel.msg). You can reference the message column by Pregel.msg and the vertex ID by GraphFrame.ID, though the latter is usually not used.
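As an illustration of an aggregation expression, the sketch below counts incoming edges per vertex in a single iteration by sending the constant 1 along every edge and summing the messages. It assumes Spark SQL and GraphFrames are on the classpath and that Pregel lives in `org.graphframes.lib`; the object name `AggMsgsSketch` and the column name `inDeg` are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, lit, sum}
import org.graphframes.GraphFrame
import org.graphframes.lib.Pregel

object AggMsgsSketch {
  /** Counts incoming edges per vertex with one Pregel iteration. */
  def inDegrees(graph: GraphFrame): DataFrame =
    graph.pregel
      // "inDeg" is a hypothetical column; vertices without messages keep 0.
      .withVertexColumn("inDeg", lit(0L), coalesce(Pregel.msg, lit(0L)))
      // Every edge sends the constant 1 to its destination vertex.
      .sendMsgToDst(lit(1L))
      // Messages are grouped by target vertex ID and summed.
      .aggMsgs(sum(Pregel.msg))
      .setMaxIter(1)
      .run()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("aggMsgs-sketch").getOrCreate()
    import spark.implicits._
    // Pregel checkpoints periodically by default, so a checkpoint directory is configured.
    spark.sparkContext.setCheckpointDir(
      java.nio.file.Files.createTempDirectory("pregel-ckpt").toString)
    val vertices = Seq(1L, 2L, 3L).toDF("id")
    val edges = Seq((1L, 3L), (2L, 3L), (3L, 1L)).toDF("src", "dst")
    inDegrees(GraphFrame(vertices, edges)).orderBy("id").show()
    spark.stop()
  }
}
```

Any other Spark aggregate over Pregel.msg, such as min or max, can be substituted for sum in the same position.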
-
final
def
asInstanceOf[T0]: T0
- Definition Classes
- Any
-
def
clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( ... ) @native() @HotSpotIntrinsicCandidate()
- def ensuring(cond: (Pregel) ⇒ Boolean, msg: ⇒ Any): Pregel
- def ensuring(cond: (Pregel) ⇒ Boolean): Pregel
- def ensuring(cond: Boolean, msg: ⇒ Any): Pregel
- def ensuring(cond: Boolean): Pregel
-
final
def
eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
def
equals(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
final
def
getClass(): Class[_]
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- val graph: GraphFrame
-
def
hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
-
final
def
isInstanceOf[T0]: Boolean
- Definition Classes
- Any
-
def
logDebug(s: ⇒ String): Unit
- Attributes
- protected
- Definition Classes
- Logging
-
def
logInfo(s: ⇒ String): Unit
- Attributes
- protected
- Definition Classes
- Logging
-
def
logTrace(s: ⇒ String): Unit
- Attributes
- protected
- Definition Classes
- Logging
-
def
logWarn(s: ⇒ String): Unit
- Attributes
- protected
- Definition Classes
- Logging
-
final
def
ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
final
def
notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
-
final
def
notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
-
def
run(): DataFrame
Runs the defined Pregel algorithm.
- returns
the result vertex DataFrame from the final iteration including both original and additional columns.
-
def
sendMsgToDst(msgExpr: Column): Pregel.this.type
Defines a message to send to the destination vertex of each edge triplet.
You can call it multiple times to send more than one message.
- msgExpr
the message expression to send to the destination vertex given a (src, edge, dst) triplet. Source/destination vertex properties and edge properties are nested under columns src, dst, and edge, respectively. You can reference them using Pregel.src, Pregel.dst, and Pregel.edge. Null messages are not included in message aggregation.
-
def
sendMsgToSrc(msgExpr: Column): Pregel.this.type
Defines a message to send to the source vertex of each edge triplet.
You can call it multiple times to send more than one message.
- msgExpr
the expression of the message to send to the source vertex given a (src, edge, dst) triplet. Source/destination vertex properties and edge properties are nested under columns src, dst, and edge, respectively. You can reference them using Pregel.src, Pregel.dst, and Pregel.edge. Null messages are not included in message aggregation.
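To illustrate sending messages in both directions, the sketch below combines sendMsgToDst and sendMsgToSrc so that the smallest vertex ID spreads across each weakly connected group of vertices. It is a simplified illustration rather than a tuned connected-components routine, and it assumes Spark SQL and GraphFrames are on the classpath; `SendMsgSketch`, `minLabels`, and the column name `label` are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, least, min}
import org.graphframes.GraphFrame
import org.graphframes.lib.Pregel

object SendMsgSketch {
  /** Spreads the smallest vertex ID along edges in both directions. */
  def minLabels(graph: GraphFrame, maxIter: Int): DataFrame =
    graph.pregel
      // "label" is a hypothetical column, initialised to the vertex's own ID.
      // least() skips nulls, so a vertex without messages keeps its label.
      .withVertexColumn("label", col(GraphFrame.ID), least(col("label"), Pregel.msg))
      // Forward direction: the source's label goes to the destination.
      .sendMsgToDst(Pregel.src("label"))
      // Backward direction: the destination's label goes to the source.
      .sendMsgToSrc(Pregel.dst("label"))
      .aggMsgs(min(Pregel.msg))
      .setMaxIter(maxIter)
      .run()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sendMsg-sketch").getOrCreate()
    import spark.implicits._
    // Pregel checkpoints periodically by default, so a checkpoint directory is configured.
    spark.sparkContext.setCheckpointDir(
      java.nio.file.Files.createTempDirectory("pregel-ckpt").toString)
    val vertices = Seq(1L, 2L, 3L, 4L, 5L).toDF("id")
    val edges = Seq((1L, 2L), (2L, 3L), (5L, 4L)).toDF("src", "dst")
    minLabels(GraphFrame(vertices, edges), maxIter = 3).orderBy("id").show()
    spark.stop()
  }
}
```

Both message expressions feed the same aggregation, so aggMsgs sees the messages from sendMsgToDst and sendMsgToSrc together for each target vertex.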
-
def
setCheckpointInterval(value: Int): Pregel.this.type
Sets the number of iterations between two checkpoints (default: 2).
This is an advanced control to balance query plan optimization and checkpoint data I/O cost. In most cases, you should keep the default value.
Checkpointing is disabled if this is set to 0.
-
def
setEarlyStopping(value: Boolean): Pregel.this.type
Sets whether Pregel should stop early when there are no new messages to send.
Early stopping allows Pregel to terminate before reaching maxIter by checking whether any non-null messages remain. While in some cases this may give a significant performance boost, in other cases it can degrade performance, because checking whether the messages DataFrame is empty is an action that requires materializing the Spark plan with some additional computation.
If you can estimate a good value for maxIter, it is recommended to leave this at the default, "false". If it is hard to estimate the number of iterations required for convergence, it is recommended to set this to "true" to avoid iterating past convergence until maxIter is reached. When this value is "true", maxIter can be set to a larger value without risk.
- value
whether Pregel should check the termination condition on each step
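The sketch below shows one situation where early stopping can pay off: hop counts from a single source vertex, where the number of required iterations depends on the graph. Messages are only non-null when they would improve the destination's value, so the run can end as soon as nothing changes. This is an illustration under assumptions, not a reference implementation: it assumes Spark SQL and GraphFrames are on the classpath, and `EarlyStoppingSketch`, `hopCounts`, and the column name `hops` are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, least, lit, min, when}
import org.graphframes.GraphFrame
import org.graphframes.lib.Pregel

object EarlyStoppingSketch {
  /** Hop counts from `source` along edge direction; unreachable vertices stay null. */
  def hopCounts(graph: GraphFrame, source: Long): DataFrame = {
    val candidate = Pregel.src("hops") + lit(1)
    // A message is useful only if the source is reached and the destination would improve.
    val improves = Pregel.src("hops").isNotNull &&
      (Pregel.dst("hops").isNull || candidate < Pregel.dst("hops"))
    graph.pregel
      // "hops" is a hypothetical column: 0 at the source, null elsewhere.
      .withVertexColumn(
        "hops",
        when(col(GraphFrame.ID) === lit(source), lit(0)),
        least(col("hops"), Pregel.msg)) // least() skips nulls
      // when() without otherwise() yields null, and null messages are not aggregated.
      .sendMsgToDst(when(improves, candidate))
      .aggMsgs(min(Pregel.msg))
      // A generous cap; early stopping is expected to end the run sooner.
      .setMaxIter(100)
      .setEarlyStopping(true)
      .run()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("earlyStop-sketch").getOrCreate()
    import spark.implicits._
    // Pregel checkpoints periodically by default, so a checkpoint directory is configured.
    spark.sparkContext.setCheckpointDir(
      java.nio.file.Files.createTempDirectory("pregel-ckpt").toString)
    val vertices = Seq(1L, 2L, 3L, 4L).toDF("id")
    val edges = Seq((1L, 2L), (2L, 3L)).toDF("src", "dst")
    hopCounts(GraphFrame(vertices, edges), source = 1L).orderBy("id").show()
    spark.stop()
  }
}
```

If the message expression always produced non-null values, early stopping would never trigger and the run would continue until maxIter.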
-
def
setInitialActiveVertexExpression(expression: Column): Pregel.this.type
Sets the initial expression for the active/non-active flag of each vertex.
In most cases the default expression (true for all vertices) should work fine. In some cases it makes sense to set a custom expression. A good example is the multiple-landmarks shortest-paths algorithm:
- the only initially active vertices in that case should be the landmarks, because only these vertices initially have non-null distances; all other vertices have null distances, so there is no reason to mark them active initially.
- expression
an initial expression that will be used to create an active-flag vertex column
-
def
setMaxIter(value: Int): Pregel.this.type
Sets the max number of iterations (default: 10).
-
def
setSkipMessagesFromNonActiveVertices(value: Boolean): Pregel.this.type
With a true value, Pregel will not generate messages from vertices marked as non-active.
For example, for Shortest Paths there is no reason to pass distances from vertices whose distances did not change in the latest iteration. This can significantly reduce the number of generated messages.
Be careful: for algorithms like Label Propagation or PageRank, messages must still be generated even if a vertex is not active; otherwise the algorithm will return an incorrect result!
- value
whether Pregel should skip generating messages for non-active vertices.
-
def
setStopIfAllNonActiveVertices(value: Boolean): Pregel.this.type
Sets whether Pregel should stop early when all vertices are marked as non-active.
This feature allows Pregel to terminate before reaching maxIter by checking whether any active vertices are left. A good example of an activity check is PageRank (see the part about voting to halt in Malewicz, Grzegorz, et al. "Pregel: a system for large-scale graph processing." Proceedings of the 2010 ACM SIGMOD International Conference on Management of data. 2010.):
- after each iteration we check whether the change in rank is less than the tolerance and, if so, mark the vertex as non-active
- if all the vertices are non-active, we stop iterating
- value
whether Pregel should stop early by vertex voting
-
def
setUpdateActiveVertexExpression(expression: Column): Pregel.this.type
Set an expression that will be used after each superstep to update the active-flag vertex column.
An example is the PageRank algorithm: in that case such an expression may look like abs(old_rank - new_rank) >= tolerance
- expression
an expression that will be used after each superstep to update the active-flag vertex column
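The active-flag controls (initial flag, per-superstep flag update, skipping messages from non-active vertices, and stopping when all vertices are non-active) can be modelled in plain Scala. The sketch below is an in-memory illustration of the semantics described in this documentation, not the GraphFrames implementation, and it uses the landmark shortest-paths example mentioned above; `ActiveVertexSketch`, `Edge`, `State`, and `run` are hypothetical names.

```scala
// Plain-Scala model of active flags and voting to halt (no Spark involved).
object ActiveVertexSketch {
  final case class Edge(src: Long, dst: Long)
  final case class State(dist: Option[Int], active: Boolean)

  /** Hop distances from landmarks along edge direction, plus the iterations used. */
  def run(
      vertexIds: Set[Long],
      edges: Seq[Edge],
      landmarks: Set[Long],
      maxIter: Int): (Map[Long, Option[Int]], Int) = {
    // Initial active flag: only landmarks have a distance, so only they start active.
    var state: Map[Long, State] = vertexIds.map { id =>
      val isLandmark = landmarks.contains(id)
      id -> State(if (isLandmark) Some(0) else None, active = isLandmark)
    }.toMap
    var iterations = 0
    // Stop when every vertex is non-active or the iteration cap is reached.
    while (iterations < maxIter && state.values.exists(_.active)) {
      // Skip message generation for non-active source vertices.
      val messages: Seq[(Long, Int)] = edges.flatMap { e =>
        val s = state(e.src)
        if (s.active) s.dist.map(d => e.dst -> (d + 1)).toSeq else Seq.empty
      }
      val aggregated: Map[Long, Int] =
        messages.groupBy(_._1).map { case (id, ms) => id -> ms.map(_._2).min }
      // Update the distance, then the flag: a vertex stays active only if it changed.
      state = state.map { case (id, old) =>
        val updated = (old.dist.toSeq ++ aggregated.get(id).toSeq).reduceOption(_ min _)
        id -> State(updated, active = updated != old.dist)
      }
      iterations += 1
    }
    (state.map { case (id, s) => id -> s.dist }, iterations)
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq(Edge(1, 2), Edge(2, 3), Edge(1, 3))
    println(run(Set(1L, 2L, 3L, 4L), edges, landmarks = Set(1L), maxIter = 10))
  }
}
```

In this model the run ends well before the iteration cap, because a vertex whose distance did not change neither sends messages nor keeps the loop alive.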
-
final
def
synchronized[T0](arg0: ⇒ T0): T0
- Definition Classes
- AnyRef
-
def
toString(): String
- Definition Classes
- AnyRef → Any
-
final
def
wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... ) @native()
-
final
def
wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
def
withVertexColumn(colName: String, initialExpr: Column, updateAfterAggMsgsExpr: Column): Pregel.this.type
Defines an additional vertex column at the start of run and how to update it in each iteration.
You can call it multiple times to add more than one additional vertex column.
- colName
the name of the additional vertex column. It cannot be an existing vertex column in the graph.
- initialExpr
the expression to initialize the additional vertex column. You can reference all original vertex columns in this expression.
- updateAfterAggMsgsExpr
the expression to update the additional vertex column after message aggregation. You can reference all original vertex columns, additional vertex columns, and the aggregated message column using Pregel.msg. If the vertex received no messages, the message column is null.
- def →[B](y: B): (Pregel, B)
Deprecated Value Members
-
def
finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( classOf[java.lang.Throwable] ) @Deprecated
- Deprecated
-
def
formatted(fmtstr: String): String
- Implicit
- This member is added by an implicit conversion from Pregel to StringFormat[Pregel] performed by method StringFormat in scala.Predef.
- Definition Classes
- StringFormat
- Annotations
- @deprecated @inline()
- Deprecated
(Since version 2.12.16) Use formatString.format(value) instead of value.formatted(formatString), or use the f"" string interpolator. In Java 15 and later, formatted resolves to the new method in String which has reversed parameters.