class Pregel extends AnyRef
Implements a Pregel-like bulk-synchronous message-passing API based on DataFrame operations.
See Malewicz et al., "Pregel: a system for large-scale graph processing," for a detailed description of the Pregel algorithm.
You can construct a Pregel instance using either this constructor or org.graphframes.GraphFrame#pregel, then use the builder pattern to describe the operations, and then call run to start a run. The run returns a DataFrame of vertices from the last iteration.
When a run starts, it expands the vertices DataFrame using column expressions defined by withVertexColumn. Those additional vertex properties can be changed during Pregel iterations. In each Pregel iteration, there are three phases:
- Given each edge triplet, generate messages and specify the target vertices to send them to, as described by sendMsgToDst and sendMsgToSrc.
- Aggregate messages by target vertex ID, as described by aggMsgs.
- Update additional vertex properties based on the aggregated messages and the states from the previous iteration, as described by withVertexColumn.
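The three phases can be sketched with plain Scala collections in place of DataFrames. This is a hypothetical, minimal model for illustration only: `PregelPhasesSketch`, `Edge`, `superstep`, and `minLabelStep` are not part of the GraphFrames API, each vertex carries a single `Long` value, and `None` stands in for a null message. The example rule propagates the smallest label along edge direction.

```scala
// Hypothetical in-memory model of one Pregel iteration (not the GraphFrames API).
// Vertex values live in a Map; Option[Long] stands in for a nullable message column.
object PregelPhasesSketch {
  final case class Edge(src: Long, dst: Long)

  def superstep(
      values: Map[Long, Long],
      edges: Seq[Edge],
      sendToDst: (Long, Long) => Option[Long], // (srcValue, dstValue) => message or "null"
      agg: (Long, Long) => Long,               // combines two messages for the same target
      update: (Long, Option[Long]) => Long     // (oldValue, aggregated message) => newValue
  ): Map[Long, Long] = {
    // Phase 1: generate (targetId, message) pairs from each edge triplet; None is dropped.
    val msgs: Seq[(Long, Long)] =
      edges.flatMap(e => sendToDst(values(e.src), values(e.dst)).map(m => e.dst -> m))
    // Phase 2: aggregate the messages by target vertex ID.
    val aggregated: Map[Long, Long] =
      msgs.groupBy(_._1).map { case (id, ms) => id -> ms.map(_._2).reduce(agg) }
    // Phase 3: update every vertex; a vertex that received nothing sees None.
    values.map { case (id, v) => id -> update(v, aggregated.get(id)) }
  }

  // Example rule: send the source label when it is smaller, keep the minimum.
  def minLabelStep(values: Map[Long, Long], edges: Seq[Edge]): Map[Long, Long] =
    superstep(
      values,
      edges,
      (src, dst) => if (src < dst) Some(src) else None,
      (a, b) => math.min(a, b),
      (old, msg) => math.min(old, msg.getOrElse(old))
    )
}
```

Running `minLabelStep` twice on the path 1 → 2 → 3 first yields labels `1, 1, 2` and then `1, 1, 1`; vertex 1 never receives a message, so its update sees `None` and keeps its value.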
See the method API docs for the columns you can reference in each phase.
You can control the number of iterations with setMaxIter; see the API docs for advanced controls.
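Example code for PageRank:

    import org.apache.spark.sql.functions.{coalesce, lit, sum}
    import org.graphframes.GraphFrame
    import org.graphframes.lib.Pregel

    val edges = ... // edges DataFrame with "src" and "dst" columns
    // One row per vertex with its "id" and "outDegree".
    val vertices = GraphFrame.fromEdges(edges).outDegrees.cache()
    val numVertices = vertices.count()
    val graph = GraphFrame(vertices, edges)
    val alpha = 0.15
    val ranks = graph.pregel
      // rank starts at 1/N and becomes (1 - alpha) * (sum of messages) + alpha / N
      .withVertexColumn("rank", lit(1.0 / numVertices),
        coalesce(Pregel.msg, lit(0.0)) * (1.0 - alpha) + alpha / numVertices)
      // every vertex sends rank / outDegree along each outgoing edge
      .sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree"))
      .aggMsgs(sum(Pregel.msg))
      .run()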
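To see what the PageRank example computes, its update rule can be replayed on a tiny in-memory graph. This is a hypothetical sketch on Scala collections, not GraphFrames code: `PageRankSketch`, `step`, and `run` are illustrative names. Each vertex sends `rank / outDegree` to its destinations, messages are summed per target, and the new rank is `coalesce(msg, 0.0) * (1 - alpha) + alpha / numVertices`, with `getOrElse(0.0)` playing the role of `coalesce`. The test graph gives every vertex an outgoing edge, matching the example, where vertices come from outDegrees.

```scala
// Hypothetical replay of the PageRank example's update rule on Scala collections.
object PageRankSketch {
  def step(ranks: Map[Long, Double], edges: Seq[(Long, Long)], alpha: Double): Map[Long, Double] = {
    val n = ranks.size
    // outDegree per source vertex, like the outDegrees DataFrame in the example.
    val outDegree: Map[Long, Int] = edges.groupBy(_._1).map { case (id, es) => id -> es.size }
    // sendMsgToDst(src.rank / src.outDegree) followed by aggMsgs(sum(msg)).
    val msgSums: Map[Long, Double] = edges
      .map { case (s, d) => d -> ranks(s) / outDegree(s) }
      .groupBy(_._1)
      .map { case (id, ms) => id -> ms.map(_._2).sum }
    // coalesce(msg, 0.0) * (1 - alpha) + alpha / n
    ranks.map { case (id, _) => id -> (msgSums.getOrElse(id, 0.0) * (1.0 - alpha) + alpha / n) }
  }

  def run(edges: Seq[(Long, Long)], maxIter: Int, alpha: Double = 0.15): Map[Long, Double] = {
    val ids = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    val init = ids.map(id => id -> 1.0 / ids.size).toMap // lit(1.0 / numVertices)
    (1 to maxIter).foldLeft(init)((r, _) => step(r, edges, alpha))
  }
}
```

On the graph 1 → 2, 1 → 3, 2 → 3, 3 → 1 with alpha = 0.15, one iteration gives ranks of 1/3, about 0.19167, and 0.475 for vertices 1, 2, and 3. Because every vertex here has an outgoing edge, the ranks keep summing to 1 across iterations.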
Instance Constructors
- new Pregel(graph: GraphFrame)
  - graph: the graph that Pregel will run on.
Value Members
- final def !=(arg0: Any): Boolean
  - Definition Classes: AnyRef → Any
- final def ##(): Int
  - Definition Classes: AnyRef → Any
- def +(other: String): String
- def ->[B](y: B): (Pregel, B)
- final def ==(arg0: Any): Boolean
  - Definition Classes: AnyRef → Any
- def aggMsgs(aggExpr: Column): Pregel.this.type
  Defines how messages are aggregated after being grouped by target vertex ID.
  - aggExpr: the message aggregation expression, such as sum(Pregel.msg). You can reference the message column by Pregel.msg and the vertex ID by GraphFrame.ID, although the latter is rarely needed.
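The aggregation phase can be modeled on Scala collections as "drop null messages, group by target ID, reduce". This is a hypothetical sketch: `AggMsgsSketch.aggregate` is an illustrative name, `None` stands in for a null message, and the `agg` function plays the role of an aggregation expression such as `sum(Pregel.msg)`.

```scala
// Hypothetical model of aggMsgs on (targetId, message) pairs; None models a null message.
object AggMsgsSketch {
  def aggregate(
      msgs: Seq[(Long, Option[Double])],
      agg: Seq[Double] => Double
  ): Map[Long, Double] =
    msgs
      .collect { case (id, Some(m)) => id -> m } // null messages never reach the aggregate
      .groupBy(_._1)                             // group by target vertex ID
      .map { case (id, ms) => id -> agg(ms.map(_._2)) }
}
```

A vertex whose messages were all null gets no aggregated entry here, which corresponds to the message column being null for that vertex in the update phase.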
- final def asInstanceOf[T0]: T0
  - Definition Classes: Any
- def clone(): AnyRef
  - Attributes: protected[lang]
  - Definition Classes: AnyRef
  - Annotations: @throws( ... ) @native() @HotSpotIntrinsicCandidate()
- def ensuring(cond: (Pregel) ⇒ Boolean, msg: ⇒ Any): Pregel
- def ensuring(cond: (Pregel) ⇒ Boolean): Pregel
- def ensuring(cond: Boolean, msg: ⇒ Any): Pregel
- def ensuring(cond: Boolean): Pregel
- final def eq(arg0: AnyRef): Boolean
  - Definition Classes: AnyRef
- def equals(arg0: Any): Boolean
  - Definition Classes: AnyRef → Any
- final def getClass(): Class[_]
  - Definition Classes: AnyRef → Any
  - Annotations: @native() @HotSpotIntrinsicCandidate()
- val graph: GraphFrame
- def hashCode(): Int
  - Definition Classes: AnyRef → Any
  - Annotations: @native() @HotSpotIntrinsicCandidate()
- final def isInstanceOf[T0]: Boolean
  - Definition Classes: Any
- final def ne(arg0: AnyRef): Boolean
  - Definition Classes: AnyRef
- final def notify(): Unit
  - Definition Classes: AnyRef
  - Annotations: @native() @HotSpotIntrinsicCandidate()
- final def notifyAll(): Unit
  - Definition Classes: AnyRef
  - Annotations: @native() @HotSpotIntrinsicCandidate()
- def run(): DataFrame
  Runs the defined Pregel algorithm.
  - returns: the result vertex DataFrame from the final iteration, including both original and additional columns.
- def sendMsgToDst(msgExpr: Column): Pregel.this.type
  Defines a message to send to the destination vertex of each edge triplet.
  You can call it multiple times to send more than one message.
  - msgExpr: the message expression to send to the destination vertex given a (src, edge, dst) triplet. Source/destination vertex properties and edge properties are nested under columns src, dst, and edge, respectively. You can reference them using Pregel.src, Pregel.dst, and Pregel.edge. Null messages are not included in message aggregation.
- def sendMsgToSrc(msgExpr: Column): Pregel.this.type
  Defines a message to send to the source vertex of each edge triplet.
  You can call it multiple times to send more than one message.
  - msgExpr: the message expression to send to the source vertex given a (src, edge, dst) triplet. Source/destination vertex properties and edge properties are nested under columns src, dst, and edge, respectively. You can reference them using Pregel.src, Pregel.dst, and Pregel.edge. Null messages are not included in message aggregation.
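Sending along both directions can be modeled by treating each sendMsgToDst or sendMsgToSrc call as one rule that is evaluated once per triplet. The sketch is hypothetical: `SendMsgSketch`, `Triplet`, `SendRule`, `ToSrc`, and `ToDst` are illustrative names, the triplet is simplified to two vertex IDs plus one edge property, and `None` models a null message.

```scala
// Hypothetical model of sendMsgToDst / sendMsgToSrc: each call registers one rule,
// and every rule is evaluated once per (src, edge, dst) triplet.
object SendMsgSketch {
  final case class Triplet(src: Long, dst: Long, weight: Double)

  sealed trait Target
  case object ToSrc extends Target
  case object ToDst extends Target

  final case class SendRule(target: Target, msg: Triplet => Option[Double])

  // Produces (targetId, message) pairs; rules returning None contribute nothing.
  def generate(triplets: Seq[Triplet], rules: Seq[SendRule]): Seq[(Long, Double)] =
    for {
      t <- triplets
      r <- rules
      m <- r.msg(t).toSeq // None models a null message, which is dropped
    } yield {
      val target = r.target match {
        case ToSrc => t.src
        case ToDst => t.dst
      }
      target -> m
    }
}
```

With one rule sending 1.0 to the destination and another sending 1.0 to the source, summing the messages per vertex counts each vertex's edges in both directions. A rule that returns `None` for some triplets simply produces fewer messages.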
- def setCheckpointInterval(value: Int): Pregel.this.type
  Sets the number of iterations between two checkpoints (default: 2).
  This is an advanced control for balancing query plan optimization against checkpoint data I/O cost. In most cases, you should keep the default value.
  Checkpointing is disabled if this is set to 0.
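Reading the interval as "checkpoint every `value` iterations", the default of 2 means every second iteration. The helper below is a hypothetical sketch of that reading only: `CheckpointScheduleSketch.checkpointIterations` is an illustrative name, and the exact iteration on which GraphFrames triggers a checkpoint is an assumption here (any positive multiple of the interval, counting iterations from 1).

```scala
// Hypothetical schedule: assumes a checkpoint after every iteration i (1-based)
// that is a positive multiple of the interval; 0 (or less) disables checkpointing.
object CheckpointScheduleSketch {
  def checkpointIterations(maxIter: Int, interval: Int): Seq[Int] =
    if (interval <= 0) Seq.empty
    else (1 to maxIter).filter(_ % interval == 0)
}
```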
- def setEarlyStopping(value: Boolean): Pregel.this.type
  Should Pregel stop early when there are no new messages to send?
  Early stopping allows Pregel to terminate before reaching maxIter by checking whether there are any non-null messages. In some cases this can yield a significant performance boost; in others it can degrade performance, because checking whether the messages DataFrame is empty is an action that requires materializing the Spark plan with some additional computation.
  If you can estimate a good value for maxIter, it is recommended to leave this at the default "false". If it is hard to estimate the number of iterations required for convergence, it is recommended to set this to "true" to avoid continuing to iterate after convergence until maxIter is reached. When this value is "true", maxIter can safely be set to a larger value.
  - value: whether Pregel should check the termination condition at each step
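The trade-off can be modeled with a plain loop. This sketch is hypothetical: `EarlyStoppingSketch`, `run`, and `countdown` are illustrative names, and checking for messages before applying the iteration's update is an assumption of this sketch rather than a description of the GraphFrames implementation.

```scala
// Hypothetical model of setEarlyStopping: `step` returns the next state and the
// number of non-null messages it produced.
object EarlyStoppingSketch {
  def run[S](init: S, maxIter: Int, earlyStopping: Boolean)(step: S => (S, Int)): (S, Int) = {
    var state = init
    var iterations = 0
    var stop = false
    while (!stop && iterations < maxIter) {
      val (next, messagesSent) = step(state)
      if (earlyStopping && messagesSent == 0) {
        stop = true // emptiness check: nothing to aggregate, so end the run here
      } else {
        state = next
        iterations += 1
      }
    }
    (state, iterations) // final state and the number of iterations applied
  }

  // Toy step: sends one message per iteration until the counter reaches 0.
  def countdown(s: Int): (Int, Int) = if (s > 0) (s - 1, 1) else (s, 0)
}
```

With a counter starting at 3 and maxIter = 10, early stopping ends after 3 applied iterations, while disabling it runs all 10 iterations even though the last 7 change nothing. In Spark, the emptiness check is itself an action, which is the cost described above.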
- def setMaxIter(value: Int): Pregel.this.type
  Sets the maximum number of iterations (default: 10).
- final def synchronized[T0](arg0: ⇒ T0): T0
  - Definition Classes: AnyRef
- def toString(): String
  - Definition Classes: AnyRef → Any
- final def wait(arg0: Long, arg1: Int): Unit
  - Definition Classes: AnyRef
  - Annotations: @throws( ... )
- final def wait(arg0: Long): Unit
  - Definition Classes: AnyRef
  - Annotations: @throws( ... ) @native()
- final def wait(): Unit
  - Definition Classes: AnyRef
  - Annotations: @throws( ... )
- def withVertexColumn(colName: String, initialExpr: Column, updateAfterAggMsgsExpr: Column): Pregel.this.type
  Defines an additional vertex column at the start of run and how to update it in each iteration.
  You can call it multiple times to add more than one additional vertex column.
  - colName: the name of the additional vertex column. It cannot be an existing vertex column in the graph.
  - initialExpr: the expression to initialize the additional vertex column. You can reference all original vertex columns in this expression.
  - updateAfterAggMsgsExpr: the expression to update the additional vertex column after message aggregation. You can reference all original vertex columns, the additional vertex columns, and the aggregated message column using Pregel.msg. If the vertex received no messages, the message column is null.
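The two expressions passed to withVertexColumn can be modeled as two functions over a vertex record. This sketch is hypothetical: `VertexColumnSketch`, `Vertex`, `initial`, and `update` are illustrative names, the additional column is a single-source distance `dist`, and `None` models the null message column of a vertex that received no messages. In the DataFrame API, the PageRank example handles the same null case with `coalesce(Pregel.msg, lit(0.0))`.

```scala
// Hypothetical model of withVertexColumn with one additional column, "dist".
object VertexColumnSketch {
  // Original columns (id, isSource) plus the additional column dist.
  final case class Vertex(id: Long, isSource: Boolean, dist: Double)

  // initialExpr: may reference only the original vertex columns.
  def initial(id: Long, isSource: Boolean): Vertex =
    Vertex(id, isSource, if (isSource) 0.0 else Double.PositiveInfinity)

  // updateAfterAggMsgsExpr: may reference original columns, additional columns,
  // and the aggregated message; None means the vertex received no messages.
  def update(v: Vertex, msg: Option[Double]): Vertex =
    v.copy(dist = math.min(v.dist, msg.getOrElse(Double.PositiveInfinity)))
}
```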
- def →[B](y: B): (Pregel, B)
Deprecated Value Members
- def finalize(): Unit
  - Attributes: protected[lang]
  - Definition Classes: AnyRef
  - Annotations: @throws( classOf[java.lang.Throwable] ) @Deprecated
  - Deprecated
- def formatted(fmtstr: String): String
  - Implicit: This member is added by an implicit conversion from Pregel to StringFormat[Pregel] performed by method StringFormat in scala.Predef.
  - Definition Classes: StringFormat
  - Annotations: @deprecated @inline()
  - Deprecated: (Since version 2.12.16) Use formatString.format(value) instead of value.formatted(formatString), or use the f"" string interpolator. In Java 15 and later, formatted resolves to the new method in String which has reversed parameters.