class Pregel extends Logging

Implements a Pregel-like bulk-synchronous message-passing API based on DataFrame operations.

See Malewicz et al., Pregel: a system for large-scale graph processing for a detailed description of the Pregel algorithm.

You can construct a Pregel instance using either this constructor or org.graphframes.GraphFrame#pregel, then use the builder pattern to describe the operations, and finally call run to start a run. The run returns a DataFrame of vertices from the last iteration.

When a run starts, it expands the vertices DataFrame using column expressions defined by withVertexColumn. Those additional vertex properties can be changed during Pregel iterations. In each Pregel iteration, there are three phases:

  • Given each edge triplet, generate messages and specify target vertices to send, described by sendMsgToDst and sendMsgToSrc.
  • Aggregate messages by target vertex IDs, described by aggMsgs.
  • Update additional vertex properties based on aggregated messages and states from the previous iteration, described by withVertexColumn.
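
As a rough mental model, the three phases above can be sketched with plain Scala collections. This is only an illustration and not how GraphFrames implements Pregel, which works on DataFrames; the names Vertex, Edge, SuperstepSketch, and superstep are hypothetical.

```scala
// Hypothetical in-memory model of one Pregel iteration (not the GraphFrames implementation).
final case class Vertex(id: Long, rank: Double, outDegree: Int)
final case class Edge(src: Long, dst: Long)

object SuperstepSketch {
  def superstep(vertices: Seq[Vertex], edges: Seq[Edge], alpha: Double): Seq[Vertex] = {
    val byId = vertices.map(v => v.id -> v).toMap
    val n = vertices.size

    // Phase 1: for each edge triplet, emit (target vertex ID, message).
    val messages: Seq[(Long, Double)] = edges.map { e =>
      val src = byId(e.src)
      (e.dst, src.rank / src.outDegree)
    }

    // Phase 2: aggregate messages by target vertex ID (here: sum).
    val aggregated: Map[Long, Double] =
      messages.groupBy(_._1).map { case (id, msgs) => (id, msgs.map(_._2).sum) }

    // Phase 3: update each vertex from its previous state and its aggregated message.
    // A vertex that received nothing has no entry, which plays the role of a null message.
    vertices.map { v =>
      v.copy(rank = aggregated.getOrElse(v.id, 0.0) * (1.0 - alpha) + alpha / n)
    }
  }
}
```

For a two-vertex cycle where both vertices start at rank 0.5 and alpha is 0.15, each vertex receives 0.5 and stays at 0.5 * 0.85 + 0.075 = 0.5.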

See the method API docs for the columns you can reference at each phase.

You can control the number of iterations with setMaxIter; see the method API docs for advanced controls.

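Example code for PageRank:

import org.apache.spark.sql.functions.{coalesce, lit, sum}
import org.graphframes.GraphFrame
import org.graphframes.lib.Pregel

val edges = ...
// Vertices carry their out-degree; cached because they are counted and then reused.
val vertices = GraphFrame.fromEdges(edges).outDegrees.cache()
val numVertices = vertices.count()
val graph = GraphFrame(vertices, edges)
val alpha = 0.15
val ranks = graph.pregel
  // Start every vertex at 1/N, then apply the damped update to the summed messages.
  .withVertexColumn("rank", lit(1.0 / numVertices),
    coalesce(Pregel.msg, lit(0.0)) * (1.0 - alpha) + alpha / numVertices)
  // Each vertex spreads its rank evenly over its outgoing edges.
  .sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree"))
  .aggMsgs(sum(Pregel.msg))
  .run()

See also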

org.graphframes.GraphFrame#pregel

Malewicz et al., Pregel: a system for large-scale graph processing.

Linear Supertypes
Logging, AnyRef, Any

Instance Constructors

  1. new Pregel(graph: GraphFrame)

    graph

    The graph that Pregel will run on.

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##(): Int
    Definition Classes
    AnyRef → Any
  3. def +(other: String): String
    Implicit
    This member is added by an implicit conversion from Pregel to any2stringadd[Pregel] performed by method any2stringadd in scala.Predef.
    Definition Classes
    any2stringadd
  4. def ->[B](y: B): (Pregel, B)
    Implicit
    This member is added by an implicit conversion from Pregel to ArrowAssoc[Pregel] performed by method ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc
    Annotations
    @inline()
  5. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  6. def aggMsgs(aggExpr: Column): Pregel.this.type

    Defines how messages are aggregated after grouped by target vertex IDs.

    aggExpr

    the message aggregation expression, such as sum(Pregel.msg). You can reference the message column by Pregel.msg and the vertex ID by GraphFrame.ID, although the latter is rarely needed.
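
A minimal sketch of a non-sum aggregation, assuming a toy graph with Long vertex IDs: each vertex keeps the smallest ID it has heard of, and min(Pregel.msg) picks the smallest incoming candidate. The object name MinLabelSketch, the column name "label", and the data are made up for illustration. Checkpointing is switched off with setCheckpointInterval(0) so the toy run needs no checkpoint directory.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, least, min}
import org.graphframes.GraphFrame
import org.graphframes.lib.Pregel

object MinLabelSketch {
  // Returns vertex ID -> smallest vertex ID seen after propagating along edges in both directions.
  def labels(spark: SparkSession): Map[Long, Long] = {
    import spark.implicits._
    // Toy graph (illustrative data) with two components: {1, 2, 3} and {4, 5}.
    val vertices = Seq(1L, 2L, 3L, 4L, 5L).toDF("id")
    val edges = Seq((1L, 2L), (2L, 3L), (4L, 5L)).toDF("src", "dst")

    val result = GraphFrame(vertices, edges).pregel
      // Keep the smaller of the current label and the aggregated message (null if none arrived).
      .withVertexColumn("label", col("id"),
        least(col("label"), coalesce(Pregel.msg, col("label"))))
      .sendMsgToDst(Pregel.src("label"))
      .sendMsgToSrc(Pregel.dst("label"))
      // Aggregate all candidates per target vertex with min instead of sum.
      .aggMsgs(min(Pregel.msg))
      .setMaxIter(3)
      .setCheckpointInterval(0) // no checkpointing for this toy run
      .run()

    result.select("id", "label").as[(Long, Long)].collect().toMap
  }
}
```

You could call MinLabelSketch.labels(spark) with any local SparkSession; three iterations are enough here because the longest chain in the toy graph has two edges.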

  7. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  8. def clone(): AnyRef
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( ... ) @native() @HotSpotIntrinsicCandidate()
  9. def ensuring(cond: (Pregel) ⇒ Boolean, msg: ⇒ Any): Pregel
    Implicit
    This member is added by an implicit conversion from Pregel to Ensuring[Pregel] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  10. def ensuring(cond: (Pregel) ⇒ Boolean): Pregel
    Implicit
    This member is added by an implicit conversion from Pregel to Ensuring[Pregel] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  11. def ensuring(cond: Boolean, msg: ⇒ Any): Pregel
    Implicit
    This member is added by an implicit conversion from Pregel to Ensuring[Pregel] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  12. def ensuring(cond: Boolean): Pregel
    Implicit
    This member is added by an implicit conversion from Pregel to Ensuring[Pregel] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  13. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  14. def equals(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  15. final def getClass(): Class[_]
    Definition Classes
    AnyRef → Any
    Annotations
    @native() @HotSpotIntrinsicCandidate()
  16. val graph: GraphFrame
  17. def hashCode(): Int
    Definition Classes
    AnyRef → Any
    Annotations
    @native() @HotSpotIntrinsicCandidate()
  18. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  19. def logDebug(s: ⇒ String): Unit
    Attributes
    protected
    Definition Classes
    Logging
  20. def logInfo(s: ⇒ String): Unit
    Attributes
    protected
    Definition Classes
    Logging
  21. def logTrace(s: ⇒ String): Unit
    Attributes
    protected
    Definition Classes
    Logging
  22. def logWarn(s: ⇒ String): Unit
    Attributes
    protected
    Definition Classes
    Logging
  23. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  24. final def notify(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native() @HotSpotIntrinsicCandidate()
  25. final def notifyAll(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native() @HotSpotIntrinsicCandidate()
  26. def run(): DataFrame

    Runs the defined Pregel algorithm.

    returns

    the result vertex DataFrame from the final iteration including both original and additional columns.

  27. def sendMsgToDst(msgExpr: Column): Pregel.this.type

    Defines a message to send to the destination vertex of each edge triplet.

    You can call it multiple times to send more than one message.

    msgExpr

    the message expression to send to the destination vertex given a (src, edge, dst) triplet. Source/destination vertex properties and edge properties are nested under columns src, dst, and edge, respectively. You can reference them using Pregel.src, Pregel.dst, and Pregel.edge. Null messages are not included in message aggregation.

    See also

    sendMsgToSrc
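
A small sketch of how null messages can act as a filter, assuming a hypothetical "age" vertex column: a when(...) without otherwise yields null for triplets that do not match, so only sources older than 30 contribute to the count. The object name, column names, and data are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, lit, sum, when}
import org.graphframes.GraphFrame
import org.graphframes.lib.Pregel

object NullMessageSketch {
  // Returns vertex ID -> number of in-neighbors whose (hypothetical) "age" exceeds 30.
  def olderInNeighbors(spark: SparkSession): Map[Long, Long] = {
    import spark.implicits._
    val vertices = Seq((1L, 25), (2L, 40), (3L, 35)).toDF("id", "age")
    val edges = Seq((1L, 3L), (2L, 3L), (3L, 1L)).toDF("src", "dst")

    val result = GraphFrame(vertices, edges).pregel
      // A vertex that received no (non-null) message sees a null message column.
      .withVertexColumn("olderInNeighbors", lit(0L), coalesce(Pregel.msg, lit(0L)))
      // when(...) without otherwise(...) evaluates to null, so non-matching triplets send nothing.
      .sendMsgToDst(when(Pregel.src("age") > 30, lit(1L)))
      .aggMsgs(sum(Pregel.msg))
      .setMaxIter(1)
      .setCheckpointInterval(0) // no checkpointing for this toy run
      .run()

    result.select("id", "olderInNeighbors").as[(Long, Long)].collect().toMap
  }
}
```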

  28. def sendMsgToSrc(msgExpr: Column): Pregel.this.type

    Defines a message to send to the source vertex of each edge triplet.

    You can call it multiple times to send more than one message.

    msgExpr

    the message expression to send to the source vertex given a (src, edge, dst) triplet. Source/destination vertex properties and edge properties are nested under columns src, dst, and edge, respectively. You can reference them using Pregel.src, Pregel.dst, and Pregel.edge. Null messages are not included in message aggregation.

    See also

    sendMsgToDst
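
To illustrate combining sendMsgToSrc with sendMsgToDst, here is a sketch that sends each edge's weight to both endpoints and sums it per vertex, which gives the total incident weight in one iteration. The "weight" edge column, the object name, and the data are assumptions made for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, lit, sum}
import org.graphframes.GraphFrame
import org.graphframes.lib.Pregel

object IncidentWeightSketch {
  // Returns vertex ID -> sum of the weights of all edges touching that vertex.
  def strengths(spark: SparkSession): Map[Long, Double] = {
    import spark.implicits._
    val vertices = Seq(1L, 2L, 3L, 4L).toDF("id") // vertex 4 is isolated
    val edges = Seq((1L, 2L, 0.5), (2L, 3L, 1.5), (1L, 3L, 2.0)).toDF("src", "dst", "weight")

    val result = GraphFrame(vertices, edges).pregel
      .withVertexColumn("strength", lit(0.0), coalesce(Pregel.msg, lit(0.0)))
      // The same edge property is sent to both ends of every triplet.
      .sendMsgToDst(Pregel.edge("weight"))
      .sendMsgToSrc(Pregel.edge("weight"))
      .aggMsgs(sum(Pregel.msg))
      .setMaxIter(1)
      .setCheckpointInterval(0) // no checkpointing for this toy run
      .run()

    result.select("id", "strength").as[(Long, Double)].collect().toMap
  }
}
```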

  29. def setCheckpointInterval(value: Int): Pregel.this.type

    Sets the number of iterations between two checkpoints (default: 2).

    This is an advanced control to balance query plan optimization and checkpoint data I/O cost. In most cases, you should keep the default value.

    Checkpoint is disabled if this is set to 0.
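
A hedged sketch of a run with a non-default interval. It assumes that Pregel checkpoints through Spark's regular DataFrame checkpointing, which requires a checkpoint directory on the SparkContext; that detail is not stated in this page. The temporary directory, the object name, and the toy chain graph are illustrative.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, least, min}
import org.graphframes.GraphFrame
import org.graphframes.lib.Pregel

object CheckpointIntervalSketch {
  // Propagates the smallest vertex ID down a chain, checkpointing every 3 iterations.
  def labels(spark: SparkSession): Map[Long, Long] = {
    import spark.implicits._
    // Assumption: checkpointing needs a directory; a temp directory is enough for a local run.
    spark.sparkContext.setCheckpointDir(
      Files.createTempDirectory("pregel-checkpoints").toString)

    val vertices = (1L to 6L).toDF("id")
    val edges = (1L to 5L).map(i => (i, i + 1)).toDF("src", "dst") // chain 1 -> 2 -> ... -> 6

    val result = GraphFrame(vertices, edges).pregel
      .withVertexColumn("label", col("id"),
        least(col("label"), coalesce(Pregel.msg, col("label"))))
      .sendMsgToDst(Pregel.src("label"))
      .aggMsgs(min(Pregel.msg))
      .setMaxIter(6)
      .setCheckpointInterval(3) // fewer checkpoints than the default of 2
      .run()

    result.select("id", "label").as[(Long, Long)].collect().toMap
  }
}
```

A larger interval writes less checkpoint data but lets the query plan grow longer between checkpoints, which is the trade-off described above.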

  30. def setEarlyStopping(value: Boolean): Pregel.this.type

    Should Pregel stop earlier in case of no new messages to send?

    Early stopping allows Pregel to terminate before reaching maxIter by checking whether any non-null message remains. While in some cases it may give a significant performance boost, in other cases it can degrade performance, because checking whether the messages DataFrame is empty is an action and requires materialization of the Spark plan with some additional computation.

    If you can assume a good value of maxIter, it is recommended to leave this value at the default "false". If it is hard to estimate the number of iterations required for convergence, it is recommended to set this value to "true" to avoid continuing to iterate after convergence until maxIter is reached. When this value is "true", maxIter can be set to a larger value without risk.

    value

    whether Pregel should check for the termination condition on each step
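
A sketch of an algorithm whose messages dry up on their own, assuming a toy hop-count computation from vertex 1: a message is sent only when it would improve the destination's value, so once every reachable vertex is settled all messages are null and early stopping can end the run well before maxIter. The object name, column name, and data are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, least, lit, min, when}
import org.graphframes.GraphFrame
import org.graphframes.lib.Pregel

object EarlyStoppingSketch {
  // Returns vertex ID -> number of hops from vertex 1 (None if unreachable).
  def hops(spark: SparkSession): Map[Long, Option[Int]] = {
    import spark.implicits._
    val vertices = Seq(1L, 2L, 3L, 4L, 5L).toDF("id") // vertex 5 is unreachable
    val edges = Seq((1L, 2L), (2L, 3L), (3L, 4L)).toDF("src", "dst")

    val result = GraphFrame(vertices, edges).pregel
      // Only the source starts with a value; least(...) skips nulls.
      .withVertexColumn("hops", when(col("id") === 1L, lit(0)), least(col("hops"), Pregel.msg))
      // Send only improving candidates; everything else evaluates to null and is dropped.
      .sendMsgToDst(when(
        Pregel.src("hops").isNotNull &&
          (Pregel.dst("hops").isNull || Pregel.dst("hops") > Pregel.src("hops") + 1),
        Pregel.src("hops") + 1))
      .aggMsgs(min(Pregel.msg))
      .setEarlyStopping(true) // stop once no non-null message is left
      .setMaxIter(20)         // generous upper bound; not expected to be reached here
      .setCheckpointInterval(0)
      .run()

    result.select("id", "hops").as[(Long, Option[Int])].collect().toMap
  }
}
```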

  31. def setInitialActiveVertexExpression(expression: Column): Pregel.this.type

    Set the initial expression for the active/non-active flag per vertex.

    In most cases the default expression (true for all vertices) should work fine. In some cases it makes sense to set a custom expression. A good example is the multiple-landmarks shortest-paths algorithm:

    • the only initially active vertices should be the landmarks, because only these vertices initially have non-null distances; all other vertices have null distances, so there is no reason to mark them active initially.

    expression

    an initial expression that will be used to create an active-flag vertex column

  32. def setMaxIter(value: Int): Pregel.this.type

    Sets the max number of iterations (default: 10).

  33. def setSkipMessagesFromNonActiveVertices(value: Boolean): Pregel.this.type

    When set to true, Pregel does not generate messages from vertices marked as non-active.

    For example, for Shortest Paths there is no reason to send distances from vertices whose distances did not change in the latest iteration. This can significantly reduce the number of generated messages.

    Be careful: for algorithms like Label Propagation or PageRank, messages must still be generated even if a vertex is not active; otherwise the algorithm will return an incorrect result!

    value

    should Pregel skip generation of messages for non active vertices.
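
The effect on message volume can be illustrated with a plain-Scala model of single-landmark hop distances. This is a sketch of the idea only, not the GraphFrames implementation; the names ActiveVertexSketch and shortestHops are hypothetical, and a vertex is treated as active when its distance changed in the latest iteration (initially only the landmark).

```scala
object ActiveVertexSketch {
  final case class Edge(src: Int, dst: Int)

  // Returns (hop distance per vertex, total number of messages generated).
  def shortestHops(
      numVertices: Int,
      edges: Seq[Edge],
      landmark: Int,
      skipInactive: Boolean,
      maxIter: Int): (Vector[Option[Int]], Int) = {
    var dist: Vector[Option[Int]] =
      Vector.tabulate(numVertices)(v => if (v == landmark) Some(0) else None)
    // Initially only the landmark is active: it is the only vertex with a known distance.
    var active: Vector[Boolean] = Vector.tabulate(numVertices)(_ == landmark)
    var sent = 0

    for (_ <- 1 to maxIter) {
      // Generate messages along edges, optionally skipping non-active sources.
      val msgs: Seq[(Int, Int)] = for {
        e <- edges
        if !skipInactive || active(e.src)
        d <- dist(e.src).toList
      } yield (e.dst, d + 1)
      sent += msgs.size

      // Aggregate by target vertex with min, then keep the smaller of old and new distance.
      val agg: Map[Int, Int] = msgs.groupBy(_._1).map { case (v, ms) => (v, ms.map(_._2).min) }
      val next: Vector[Option[Int]] = dist.zipWithIndex.map { case (old, v) =>
        (old.toList ++ agg.get(v).toList).reduceOption(_ min _)
      }

      // A vertex is active in the next iteration only if its distance just changed.
      active = next.zip(dist).map { case (n, o) => n != o }
      dist = next
    }
    (dist, sent)
  }
}
```

On the chain 0 → 1 → 2 → 3 with landmark 0 and four iterations, both settings produce the same distances, but skipping non-active sources generates 3 messages instead of 9.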

  34. def setStopIfAllNonActiveVertices(value: Boolean): Pregel.this.type

    Should Pregel stop earlier in case all the vertices are marked as non active.

    This feature allows Pregel to terminate before reaching maxIter by checking whether any active vertices are left. A good example of an activity check is PageRank (see the part about voting to halt in Malewicz, Grzegorz, et al. "Pregel: a system for large-scale graph processing." Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data. 2010):

    • after each iteration, check whether the change in rank is less than the tolerance and, if so, mark the vertex as non-active
    • if all vertices are non-active, stop iterating

    value

    should Pregel stop earlier by vertices voting
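
The voting-to-halt idea can be modelled in plain Scala as follows. This is a sketch of the stopping rule only, not the GraphFrames implementation; the names VotingToHaltSketch and pageRank are hypothetical, and every vertex is assumed to have at least one outgoing edge.

```scala
object VotingToHaltSketch {
  // Runs PageRank-style updates until no vertex changes by at least `tolerance`
  // or `maxIter` is reached. Returns (ranks, number of iterations executed).
  def pageRank(
      outEdges: Map[Int, Seq[Int]],
      alpha: Double,
      tolerance: Double,
      maxIter: Int): (Map[Int, Double], Int) = {
    val n = outEdges.size
    var ranks: Map[Int, Double] = outEdges.keys.map(v => (v, 1.0 / n)).toMap
    var iterations = 0
    var anyActive = true

    while (anyActive && iterations < maxIter) {
      // Each vertex spreads its rank evenly over its outgoing edges; sum per target.
      val incoming: Map[Int, Double] = outEdges.toSeq
        .flatMap { case (src, dsts) => dsts.map(dst => (dst, ranks(src) / dsts.size)) }
        .groupBy(_._1)
        .map { case (v, ms) => (v, ms.map(_._2).sum) }

      val next: Map[Int, Double] = ranks.map { case (v, _) =>
        (v, incoming.getOrElse(v, 0.0) * (1.0 - alpha) + alpha / n)
      }

      // A vertex stays active only while abs(old_rank - new_rank) >= tolerance.
      anyActive = next.exists { case (v, r) => math.abs(ranks(v) - r) >= tolerance }
      ranks = next
      iterations += 1
    }
    (ranks, iterations)
  }
}
```

On a symmetric three-vertex cycle the uniform starting ranks are already the fixed point, so every vertex becomes non-active after the first iteration and the loop stops there regardless of maxIter.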

  35. def setUpdateActiveVertexExpression(expression: Column): Pregel.this.type

    Set an expression that will be used after each superstep to update the active-flag vertex column.

    An example is the PageRank algorithm, where such an expression may look like abs(old_rank - new_rank) >= tolerance.

    expression

    an expression that will be used after each superstep to update the active-flag vertex column

  36. final def synchronized[T0](arg0: ⇒ T0): T0
    Definition Classes
    AnyRef
  37. def toString(): String
    Definition Classes
    AnyRef → Any
  38. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  39. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... ) @native()
  40. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  41. def withVertexColumn(colName: String, initialExpr: Column, updateAfterAggMsgsExpr: Column): Pregel.this.type

    Defines an additional vertex column at the start of run and how to update it in each iteration.

    You can call it multiple times to add more than one additional vertex column.

    colName

    the name of the additional vertex column. It cannot be an existing vertex column in the graph.

    initialExpr

    the expression to initialize the additional vertex column. You can reference all original vertex columns in this expression.

    updateAfterAggMsgsExpr

    the expression to update the additional vertex column after message aggregation. You can reference all original vertex columns, additional vertex columns, and the aggregated message column using Pregel.msg. If the vertex received no messages, the message column is null.
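
A sketch with two additional vertex columns, assuming a hypothetical "score" vertex column: "best" is initialized from the original column and keeps the largest value seen, while "received" uses the null message column to record whether anything ever arrived. The object name, column names, and data are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, greatest, lit, max}
import org.graphframes.GraphFrame
import org.graphframes.lib.Pregel

object VertexColumnsSketch {
  // Returns vertex ID -> (largest score seen upstream or locally, whether any message arrived).
  def bestScores(spark: SparkSession): Map[Long, (Double, Boolean)] = {
    import spark.implicits._
    val vertices = Seq((1L, 5.0), (2L, 1.0), (3L, 3.0)).toDF("id", "score")
    val edges = Seq((1L, 2L), (2L, 3L)).toDF("src", "dst")

    val result = GraphFrame(vertices, edges).pregel
      // initialExpr may reference original vertex columns such as "score".
      .withVertexColumn("best", col("score"),
        greatest(col("best"), coalesce(Pregel.msg, col("best"))))
      // The message column is null for vertices that received nothing.
      .withVertexColumn("received", lit(false), col("received") || Pregel.msg.isNotNull)
      .sendMsgToDst(Pregel.src("best"))
      .aggMsgs(max(Pregel.msg))
      .setMaxIter(2)
      .setCheckpointInterval(0) // no checkpointing for this toy run
      .run()

    result.select("id", "best", "received").as[(Long, Double, Boolean)].collect()
      .map { case (id, best, received) => (id, (best, received)) }
      .toMap
  }
}
```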

  42. def →[B](y: B): (Pregel, B)
    Implicit
    This member is added by an implicit conversion from Pregel to ArrowAssoc[Pregel] performed by method ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc

Deprecated Value Members

  1. def finalize(): Unit
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] ) @Deprecated
    Deprecated
  2. def formatted(fmtstr: String): String
    Implicit
    This member is added by an implicit conversion from Pregel to StringFormat[Pregel] performed by method StringFormat in scala.Predef.
    Definition Classes
    StringFormat
    Annotations
    @deprecated @inline()
    Deprecated

    (Since version 2.12.16) Use formatString.format(value) instead of value.formatted(formatString), or use the f"" string interpolator. In Java 15 and later, formatted resolves to the new method in String which has reversed parameters.
