class AggregateNeighbors extends Serializable with Logging with WithIntermediateStorageLevel with WithCheckpointInterval with WithLocalCheckpoints
A class for performing multi-hop neighbor aggregation on a graph.
AggregateNeighbors explores the graph up to a specified number of hops, accumulating values along each path with customizable accumulator expressions. It supports both stopping conditions (when to stop exploring a path) and target conditions (when to collect a result). If no target condition is provided, every traversal that reached the stopping condition is collected. The algorithm processes the graph in a breadth‑first manner.
Use the builder pattern to configure parameters, then call run() to execute.
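A minimal sketch of the builder chain, using only the setters listed on this page. It assumes Spark and GraphFrames are on the classpath, that the class lives in the `org.graphframes.lib` package, and that `agg` is an instance already bound to a graph (this page does not show how one is obtained). The `steps` accumulator and the seed filter on `id` are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}
import org.graphframes.lib.AggregateNeighbors // assumed package; adjust to your GraphFrames version

object BuilderSketch {
  // `agg` is assumed to be an AggregateNeighbors instance already bound to a graph.
  def countSteps(agg: AggregateNeighbors): DataFrame =
    agg
      .setStartingVertices(col("id") === lit(1L)) // hypothetical seed filter on the vertex `id`
      .setMaxHops(2)                              // never explore further than two hops
      .addAccumulator("steps", lit(0), col("steps") + lit(1)) // starts at 0, +1 per traversed edge
      .setStoppingCondition(col("steps") >= lit(2))           // stop a path once it has taken two steps
      .run()
}
```

Every setter returns `this.type`, so the calls can be chained in any order before `run()`.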
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- def +(other: String): String
- Implicit
- This member is added by an implicit conversion from AggregateNeighbors to any2stringadd[AggregateNeighbors] performed by method any2stringadd in scala.Predef.
- Definition Classes
- any2stringadd
- def ->[B](y: B): (AggregateNeighbors, B)
- Implicit
- This member is added by an implicit conversion from AggregateNeighbors to ArrowAssoc[AggregateNeighbors] performed by method ArrowAssoc in scala.Predef.
- Definition Classes
- ArrowAssoc
- Annotations
- @inline()
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- def addAccumulator(name: String, init: Column, update: Column): AggregateNeighbors.this.type
Adds a single accumulator to those already configured.
This method can be called multiple times to add several accumulators.
- name
accumulator column name
- init
Column expression for the accumulator's initial value
- update
Column expression that updates the accumulator when traversing an edge
- returns
this AggregateNeighbors instance for method chaining
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- val checkpointInterval: Int
- Attributes
- protected
- Definition Classes
- WithCheckpointInterval
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @IntrinsicCandidate() @native()
- def ensuring(cond: (AggregateNeighbors) => Boolean, msg: => Any): AggregateNeighbors
- Implicit
- This member is added by an implicit conversion from AggregateNeighbors to Ensuring[AggregateNeighbors] performed by method Ensuring in scala.Predef.
- Definition Classes
- Ensuring
- def ensuring(cond: (AggregateNeighbors) => Boolean): AggregateNeighbors
- Implicit
- This member is added by an implicit conversion from AggregateNeighbors to Ensuring[AggregateNeighbors] performed by method Ensuring in scala.Predef.
- Definition Classes
- Ensuring
- def ensuring(cond: Boolean, msg: => Any): AggregateNeighbors
- Implicit
- This member is added by an implicit conversion from AggregateNeighbors to Ensuring[AggregateNeighbors] performed by method Ensuring in scala.Predef.
- Definition Classes
- Ensuring
- def ensuring(cond: Boolean): AggregateNeighbors
- Implicit
- This member is added by an implicit conversion from AggregateNeighbors to Ensuring[AggregateNeighbors] performed by method Ensuring in scala.Predef.
- Definition Classes
- Ensuring
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def getCheckpointInterval: Int
Gets checkpoint interval.
- Definition Classes
- WithCheckpointInterval
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @IntrinsicCandidate() @native()
- def getIntermediateStorageLevel: StorageLevel
Gets storage level for intermediate datasets that require multiple passes.
- Definition Classes
- WithIntermediateStorageLevel
- def getUseLocalCheckpoints: Boolean
Gets whether local checkpoints are being used instead of regular checkpoints.
- returns
true if local checkpoints are enabled, false otherwise
- Definition Classes
- WithLocalCheckpoints
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @IntrinsicCandidate() @native()
- val intermediateStorageLevel: StorageLevel
- Attributes
- protected
- Definition Classes
- WithIntermediateStorageLevel
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- def logDebug(s: => String): Unit
- Attributes
- protected
- Definition Classes
- Logging
- def logInfo(s: => String): Unit
- Attributes
- protected
- Definition Classes
- Logging
- def logTrace(s: => String): Unit
- Attributes
- protected
- Definition Classes
- Logging
- def logWarn(s: => String): Unit
- Attributes
- protected
- Definition Classes
- Logging
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @IntrinsicCandidate() @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @IntrinsicCandidate() @native()
- def resultIsPersistent(): Unit
- Attributes
- protected
- Definition Classes
- Logging
- def run(): DataFrame
Executes the configured neighbor aggregation and returns the result DataFrame.
The result contains one row per (starting vertex, target vertex) pair that satisfied either the stopping condition or the target condition. Columns are:
- id: the vertex ID of the target (or stopping) vertex
- hop: the number of hops taken to reach it
- one column for each accumulator, holding its final value
- returns
DataFrame with aggregation results
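As a small illustration of the result layout described above, the following sketch selects the documented `id` and `hop` columns together with one accumulator column and sorts by hop count. It assumes Spark is on the classpath; the accumulator name is supplied by the caller and is whatever name was registered with `addAccumulator` or `setAccumulators`.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object ResultSketch {
  // `result` is the DataFrame returned by run(); `accumulator` is the name of
  // an accumulator column that was configured before the run.
  def closestFirst(result: DataFrame, accumulator: String): DataFrame =
    result
      .select(col("id"), col("hop"), col(accumulator))
      .orderBy(col("hop")) // rows reached in fewer hops come first
}
```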
- def setAccumulators(names: Seq[String], inits: Seq[Column], updates: Seq[Column]): AggregateNeighbors.this.type
Defines multiple accumulators used to compute values along each path.
Each accumulator has a name, an initial value expression, and an update expression. The update expression is evaluated at each hop and can refer to:
- the accumulator's previous value (using the accumulator's column name)
- source vertex attributes via srcAttr("attrName")
- destination vertex attributes via dstAttr("attrName")
- edge attributes via edgeAttr("attrName")
- names
sequence of accumulator column names
- inits
sequence of Column expressions that give the initial value for each accumulator (evaluated on starting vertices)
- updates
sequence of Column expressions that define how to update the accumulator when traversing an edge
- returns
this AggregateNeighbors instance for method chaining
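The three sequences are parallel: the i-th name, initial value, and update expression together define one accumulator. The sketch below registers a step counter and a path cost. It assumes Spark and GraphFrames on the classpath and the `org.graphframes.lib` package. Because this page does not say where `edgeAttr` is defined, the helper is passed in as a function parameter; `weight` is a hypothetical edge column.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit}
import org.graphframes.lib.AggregateNeighbors // assumed package; adjust to your GraphFrames version

object AccumulatorSketch {
  // `edgeAttr` stands for the edgeAttr("attrName") helper mentioned above;
  // the caller supplies it from wherever the library defines it.
  def pathCost(agg: AggregateNeighbors, edgeAttr: String => Column): AggregateNeighbors =
    agg.setAccumulators(
      names = Seq("steps", "cost"),
      inits = Seq(lit(0), lit(0.0)), // values on the starting vertices
      updates = Seq(
        col("steps") + lit(1),           // previous value, referenced by accumulator name
        col("cost") + edgeAttr("weight") // add the weight of the traversed edge
      )
    )
}
```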
- def setCheckpointInterval(value: Int): AggregateNeighbors.this.type
Sets the checkpoint interval as a number of iterations (default: 2). Checkpointing regularly helps recover from failures, clean shuffle files, shorten the lineage of the computation graph, and reduce the complexity of plan optimization. As of Spark 2.0, the complexity of plan optimization grows exponentially without checkpointing, so disabling checkpointing or setting a longer-than-default interval is not recommended. Checkpoint data is saved under org.apache.spark.SparkContext.getCheckpointDir with a prefix of the algorithm name. If the checkpoint directory is not set, this throws a java.io.IOException. Set a nonpositive value to disable checkpointing. This parameter is only used when the algorithm is set to "graphframes". Its default value might change in the future.
- Definition Classes
- WithCheckpointInterval
- See also
org.apache.spark.SparkContext.setCheckpointDir in the Spark API doc
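A short sketch of the checkpoint setup described above: set the checkpoint directory on the SparkContext first, then choose an interval. It assumes Spark and GraphFrames on the classpath and the `org.graphframes.lib` package; the directory path is a placeholder.

```scala
import org.apache.spark.sql.SparkSession
import org.graphframes.lib.AggregateNeighbors // assumed package; adjust to your GraphFrames version

object CheckpointSketch {
  // Regular checkpoints: the directory must be set before the algorithm runs.
  def withCheckpoints(spark: SparkSession, agg: AggregateNeighbors): AggregateNeighbors = {
    spark.sparkContext.setCheckpointDir("/tmp/graphframes-checkpoints") // placeholder path
    agg.setCheckpointInterval(2) // checkpoint every second iteration (the documented default)
  }

  // Local checkpoints: faster, but they do not survive node failures.
  def withLocalCheckpoints(agg: AggregateNeighbors): AggregateNeighbors =
    agg.setUseLocalCheckpoints(true).setCheckpointInterval(2)
}
```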
- def setEdgeFilter(value: Column): AggregateNeighbors.this.type
Filters which edges can be traversed during the aggregation.
Only edges satisfying the given column expression are considered. The default is true (all edges are traversable).
- value
a Boolean Column expression that filters edges
- returns
this AggregateNeighbors instance for method chaining
- def setIntermediateStorageLevel(value: StorageLevel): AggregateNeighbors.this.type
Sets storage level for intermediate datasets that require multiple passes (default: MEMORY_AND_DISK).
- Definition Classes
- WithIntermediateStorageLevel
- def setMaxHops(value: Int): AggregateNeighbors.this.type
Sets the maximum number of hops to explore from the starting vertices.
The algorithm will stop after this many iterations even if the stopping condition hasn't been reached for all paths. The default is 3. Be aware that a high maximum hop count on a dense graph can lead to a huge memory load and potential OOM errors.
- value
positive integer for maximum hop count
- returns
this AggregateNeighbors instance for method chaining
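To get a feel for the memory warning above, here is a rough back-of-the-envelope bound in plain Scala. It assumes every vertex has the same out-degree, that each path is tracked separately, and that no path is cut short by a stopping condition or edge filter. None of this is the library's own cost model.

```scala
object PathGrowth {
  // Upper bound on the number of paths of exactly `hop` edges from one seed,
  // assuming a uniform out-degree and no early stopping.
  def pathsAtHop(outDegree: Int, hop: Int): BigInt =
    BigInt(outDegree).pow(hop)

  // Upper bound on all paths of length 1..maxHops from one seed.
  def pathsUpTo(outDegree: Int, maxHops: Int): BigInt =
    (1 to maxHops).map(pathsAtHop(outDegree, _)).sum
}
```

With an out-degree of 10, `PathGrowth.pathsUpTo(10, 3)` gives 1110 paths per seed, while `PathGrowth.pathsUpTo(10, 6)` gives 1111110, which is why the hop limit matters far more on dense graphs.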
- def setRemoveLoops(value: Boolean): AggregateNeighbors.this.type
Controls whether self‑loops (edges where src == dst) are excluded.
- value
if true, self‑loop edges are filtered out
- returns
this AggregateNeighbors instance for method chaining
- def setRequiredEdgeAttributes(values: Seq[String]): AggregateNeighbors.this.type
Specifies which edge attributes should be carried through the traversal.
By default, all edge columns are carried. Specifying a subset can improve performance by reducing the amount of data shuffled.
- values
sequence of edge column names to keep
- returns
this AggregateNeighbors instance for method chaining
- def setRequiredVertexAttributes(values: Seq[String]): AggregateNeighbors.this.type
Specifies which vertex attributes should be carried through the traversal.
By default, all vertex columns are carried. Specifying a subset can improve performance by reducing the amount of data shuffled.
- values
sequence of vertex column names to keep
- returns
this AggregateNeighbors instance for method chaining
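A brief sketch of trimming what the traversal carries, combining the two attribute setters with `setRemoveLoops`. It assumes Spark and GraphFrames on the classpath and the `org.graphframes.lib` package; the column names `name` and `weight` are hypothetical.

```scala
import org.graphframes.lib.AggregateNeighbors // assumed package; adjust to your GraphFrames version

object PruningSketch {
  def slim(agg: AggregateNeighbors): AggregateNeighbors =
    agg
      .setRemoveLoops(true)                      // drop edges where src == dst
      .setRequiredVertexAttributes(Seq("name"))  // hypothetical vertex column to keep
      .setRequiredEdgeAttributes(Seq("weight"))  // hypothetical edge column to keep
}
```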
- def setStartingVertices(value: Column): AggregateNeighbors.this.type
Specifies which vertices to start the aggregation from.
Only vertices satisfying the given column expression will be used as seeds for the traversal. The default is true (all vertices are seeds).
- value
a Boolean Column expression that selects seed vertices
- returns
this AggregateNeighbors instance for method chaining
- def setStoppingCondition(value: Column): AggregateNeighbors.this.type
Sets a condition that, when true for a vertex, stops further exploration along that path.
The column expression can refer to:
- accumulator columns
- source vertex attributes via srcAttr("attrName")
- destination vertex attributes via dstAttr("attrName")
- edge attributes via edgeAttr("attrName")
Either setStoppingCondition or setTargetCondition must be called. When both are provided, only the accumulators of paths that satisfy the target condition are saved to the results.
- value
a Boolean Column expression that triggers stopping
- returns
this AggregateNeighbors instance for method chaining
- def setTargetCondition(value: Column): AggregateNeighbors.this.type
Sets a condition that, when true for a vertex, marks it as a target and saves the accumulator values.
Either setStoppingCondition or setTargetCondition must be called. When both are provided, only the accumulators of paths that satisfy the target condition are saved to the results.
- value
a Boolean Column expression that defines a target vertex
- returns
this AggregateNeighbors instance for method chaining
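The interplay of the two conditions can be illustrated with a toy model in plain Scala. This is one plausible reading of the documented behaviour, not the library's implementation: it runs on in-memory collections, tracks a single `cost` accumulator, and assumes that a path which hits a target keeps exploring unless the stopping condition also holds. All names here (`Edge`, `Row`, `TraversalModel`) are hypothetical.

```scala
object TraversalModel {
  final case class Edge(src: Int, dst: Int, weight: Double)
  // Mirrors the documented result columns: id, hop, and one accumulator.
  final case class Row(id: Int, hop: Int, cost: Double)

  def run(
      edges: Seq[Edge],
      seeds: Seq[Int],
      maxHops: Int,
      stop: Row => Boolean,
      target: Option[Row => Boolean]
  ): Seq[Row] = {
    // Without a target condition, paths that reach the stopping condition are collected.
    val collect: Row => Boolean = target.getOrElse(stop)
    // Hop 0: one live path per seed, accumulator at its initial value.
    var frontier = seeds.map(s => Row(s, 0, 0.0))
    val results = Seq.newBuilder[Row]
    for (hop <- 1 to maxHops) {
      // Breadth-first step: extend every live path along each outgoing edge.
      val reached = for {
        r <- frontier
        e <- edges if e.src == r.id
      } yield Row(e.dst, hop, r.cost + e.weight)
      results ++= reached.filter(collect)
      // Paths that satisfy the stopping condition are not explored further.
      frontier = reached.filterNot(stop)
    }
    results.result()
  }
}
```

On the chain 1 → 2 → 3 → 4 with weights 1.0, 2.0, and 4.0, seed 1, three hops, and the stopping condition `cost >= 3.0`, the model returns the single row `Row(3, 2, 3.0)` when no target is given. Adding the target condition `id == 2` returns `Row(2, 1, 1.0)` instead, because the path that later stops at vertex 3 does not satisfy the target.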
- def setUseLocalCheckpoints(value: Boolean): AggregateNeighbors.this.type
Sets whether to use local checkpoints instead of regular checkpoints (default: false). Local checkpoints are faster but less reliable as they don't survive node failures.
- value
true to use local checkpoints, false for regular checkpoints
- returns
this instance
- Definition Classes
- WithLocalCheckpoints
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- val useLocalCheckpoints: Boolean
- Attributes
- protected
- Definition Classes
- WithLocalCheckpoints
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
Deprecated Value Members
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable]) @Deprecated
- Deprecated
(Since version 9)
- def formatted(fmtstr: String): String
- Implicit
- This member is added by an implicit conversion from AggregateNeighbors to StringFormat[AggregateNeighbors] performed by method StringFormat in scala.Predef.
- Definition Classes
- StringFormat
- Annotations
- @deprecated @inline()
- Deprecated
(Since version 2.12.16) Use formatString.format(value) instead of value.formatted(formatString), or use the f"" string interpolator. In Java 15 and later, formatted resolves to the new method in String which has reversed parameters.
- def →[B](y: B): (AggregateNeighbors, B)
- Implicit
- This member is added by an implicit conversion from AggregateNeighbors to ArrowAssoc[AggregateNeighbors] performed by method ArrowAssoc in scala.Predef.
- Definition Classes
- ArrowAssoc
- Annotations
- @deprecated
- Deprecated
(Since version 2.13.0) Use -> instead. If you still wish to display it as one character, consider using a font with programming ligatures such as Fira Code.