Packages

package lib


Type Members

  1. class AggregateMessages extends Arguments with Serializable with WithIntermediateStorageLevel with Logging

    This is a primitive for implementing graph algorithms. This method aggregates messages from the neighboring edges and vertices of each vertex.

    For each triplet (source vertex, edge, destination vertex) in GraphFrame.triplets, this can send a message to the source and/or destination vertices.

    • AggregateMessages.sendToSrc() sends a message to the source vertex of each triplet
    • AggregateMessages.sendToDst() sends a message to the destination vertex of each triplet
    • AggregateMessages.agg specifies an aggregation function for aggregating the messages sent to each vertex. It also runs the aggregation, computing a DataFrame with one row for each vertex that receives at least one message. The DataFrame has 2 columns:
      • vertex ID column (named GraphFrame.ID)
      • aggregate of the messages sent to the vertex (with the name given to the Column specified in AggregateMessages.agg())

    When specifying the messages and aggregation function, the user may reference columns using:

    • AggregateMessages.src: column for source vertex attributes
    • AggregateMessages.edge: column for edge attributes
    • AggregateMessages.dst: column for destination vertex attributes

    Note: If you use this operation to write an iterative algorithm, you may want to use checkpoint() (localCheckpoint()) as a workaround for caching issues.

    Example:
    1. We can use this function to compute the in-degree of each vertex

      val g: GraphFrame = ... // load or construct a GraphFrame
      val inDeg: DataFrame =
        g.aggregateMessages.sendToDst(lit(1)).agg(sum(AggregateMessages.msg).alias("inDegree"))
  2. class AggregateNeighbors extends Serializable with Logging with WithIntermediateStorageLevel with WithCheckpointInterval with WithLocalCheckpoints

    A class for performing multi-hop neighbor aggregation on a graph.

    AggregateNeighbors allows you to explore the graph up to a specified number of hops, accumulating values along paths using customizable accumulator expressions. It supports both stopping conditions (when to stop exploring) and target conditions (when to collect a result). If no target condition is provided, all traversals that reach the stopping condition are collected. The algorithm processes the graph in a breadth-first manner.

    Use the builder pattern to configure parameters, then call run() to execute.

  3. class BFS extends Arguments with Serializable

    Breadth-first search (BFS)

    This method returns a DataFrame of valid shortest paths from vertices matching fromExpr to vertices matching toExpr. If multiple shortest paths are valid, the DataFrame will contain one Row for each path. If no paths are valid, the DataFrame will be empty. Note: "Shortest" means globally shortest path. I.e., if the shortest valid path is of length 5 (edges), then all paths returned by BFS will have length 5, even if longer valid paths exist between other matching pairs of vertices.

    The returned DataFrame will have the following columns:

    • from start vertex of path
    • e[i] edge i in the path, indexed from 0
    • v[i] intermediate vertex i in the path, indexed from 1
    • to end vertex of path

    Each of these columns is a StructType whose fields are the same as the columns of GraphFrame.vertices or GraphFrame.edges.

    For example, suppose we have a graph g. Say the vertices DataFrame of g has columns "id" and "job", and the edges DataFrame of g has columns "src", "dst", and "relation".

    // Search from vertex "Joe" to find the closest vertices with attribute job = CEO.
    g.bfs(col("id") === "Joe", col("job") === "CEO").run()

    If we found a path of 3 edges, each row would have columns:

    from | e0 | v1 | e1 | v2 | e2 | to

    In the above row, each vertex column (from, v1, v2, to) would have fields "id" and "job" (just like g.vertices). Each edge column (e0, e1, e2) would have fields "src", "dst", and "relation".

    If there are ties, then each of the equal paths will be returned as a separate Row.

    If one or more vertices match both the from and to conditions, then there is a 0-hop path. The returned DataFrame will have the "from" and "to" columns (as above); however, the "from" and "to" columns will be exactly the same. There will be one row for each vertex in GraphFrame.vertices matching both fromExpr and toExpr.

    Parameters:

    • fromExpr Spark SQL expression specifying valid starting vertices for the BFS. This condition will be matched against each vertex's id or attributes. To start from a specific vertex, this could be "id = [start vertex id]". To start from multiple valid vertices, this can operate on vertex attributes.
    • toExpr Spark SQL expression specifying valid target vertices for the BFS. This condition will be matched against each vertex's id or attributes.
    • maxPathLength Limit on the length of paths. If no valid paths of length <= maxPathLength are found, then the BFS is terminated. (default = 10)
    • edgeFilter Spark SQL expression specifying edges which may be used in the search. This allows the user to disallow crossing certain edges. Such filters could be applied post-hoc after the BFS is run, but specifying the filter here is more efficient.

    Returns:

    • DataFrame of valid shortest paths found in the BFS
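
    The parameters above compose through the builder. A sketch, assuming a GraphFrame g whose vertices have "id" and "job" columns and whose edges have a "relation" column (as in the example above):

```scala
import org.apache.spark.sql.functions.col

// Shortest paths from "Joe" to any CEO, at most 3 edges long,
// never crossing edges labeled "enemy".
val paths = g.bfs(col("id") === "Joe", col("job") === "CEO")
  .maxPathLength(3)
  .edgeFilter(col("relation") =!= "enemy")
  .run()
```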
  4. class ConnectedComponents extends Arguments with Logging with WithCheckpointInterval with WithBroadcastThreshold with WithIntermediateStorageLevel with WithUseLabelsAsComponents with WithMaxIter with WithLocalCheckpoints

    Connected Components algorithm.

    Computes the connected component membership of each vertex and returns a DataFrame of vertex information with each vertex assigned a component ID.

    The resulting DataFrame contains all the vertex information and one additional column:

    • component (LongType): unique ID for this component
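
    A minimal usage sketch (assuming a GraphFrame g and an active SparkSession spark; the default implementation requires a checkpoint directory):

```scala
// The default Connected Components implementation checkpoints periodically,
// so a checkpoint directory must be set first.
spark.sparkContext.setCheckpointDir("/tmp/graphframes-checkpoints")

val components = g.connectedComponents.run()
components.select("id", "component").show()
```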
  5. class DetectingCycles extends Arguments with Serializable with Logging with WithIntermediateStorageLevel with WithLocalCheckpoints with WithCheckpointInterval
  6. class KCore extends Serializable with WithIntermediateStorageLevel with WithCheckpointInterval with WithLocalCheckpoints

    K-Core decomposition algorithm implementation for GraphFrames.

    This object provides the run method to compute the k-core decomposition of a graph, which assigns each vertex the maximum k such that the vertex is part of a k-core. A k-core is a maximal connected subgraph in which every vertex has degree at least k.

    The algorithm is based on the distributed k-core decomposition approach described in:

    Mandal, Aritra, and Mohammad Al Hasan. "A distributed k-core decomposition algorithm on spark." 2017 IEEE International Conference on Big Data (Big Data). IEEE, 2017.

    Edge representation: K-core decomposition is defined for undirected graphs. Since GraphFrames represents edges as directed, each undirected edge {u, v} should be supplied as a single directed edge in either direction — the algorithm symmetrizes internally. Supplying both (u, v) and (v, u) will double-count the edge and produce incorrect results.

  7. class LabelPropagation extends Arguments with WithAlgorithmChoice with WithCheckpointInterval with WithMaxIter with WithLocalCheckpoints with WithIntermediateStorageLevel with Logging

    Run static Label Propagation for detecting communities in networks.

    Each node in the network is initially assigned to its own community. At every iteration, nodes send their community affiliation to all neighbors and update their state to the mode (most frequent) community affiliation among incoming messages.

    LPA is a standard community detection algorithm for graphs. It is very inexpensive computationally, although (1) convergence is not guaranteed and (2) one can end up with trivial solutions (all nodes end up in a single community).

    The resulting DataFrame contains all the original vertex information and one additional column:

    • label (LongType): label of community affiliation
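
    For example (a sketch, assuming a GraphFrame g):

```scala
// Run LPA for a fixed number of iterations; since convergence is not
// guaranteed, maxIter serves as the stopping condition.
val communities = g.labelPropagation.maxIter(5).run()
communities.select("id", "label").show()
```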
  8. class MaximalIndependentSet extends Serializable with WithIntermediateStorageLevel with WithCheckpointInterval with WithLocalCheckpoints

    This class implements a distributed algorithm for finding a Maximal Independent Set (MIS) in a graph.

    An MIS is a set of vertices such that no two vertices in the set are adjacent (i.e., there is no edge between any two vertices in the set), and the set is maximal, meaning that adding any other vertex to the set would violate the independence property. Note that this implementation finds a maximal (but not necessarily maximum) independent set; that is, it ensures no more vertices can be added to the set, but does not guarantee that the set has the largest possible number of vertices among all possible independent sets in the graph.

    The algorithm implemented here is based on the paper: Ghaffari, Mohsen. "An improved distributed algorithm for maximal independent set." Proceedings of the twenty-seventh annual ACM-SIAM symposium on Discrete algorithms. Society for Industrial and Applied Mathematics, 2016.

    Note: This is a randomized, non-deterministic algorithm. The result may vary between runs even if a fixed random seed is provided, because of how Apache Spark distributes and schedules computation.

  9. class PageRank extends Arguments with Logging

    PageRank algorithm implementation. There are two implementations of PageRank.

    The first one uses the org.apache.spark.graphx.Graph interface with aggregateMessages and runs PageRank for a fixed number of iterations. This can be executed by setting maxIter. Conceptually, the algorithm does the following:

    var PR = Array.fill(n)( 1.0 )
    var oldPR = Array.fill(n)( 1.0 )
    for( iter <- 0 until maxIter ) {
      swap(oldPR, PR)
      for( i <- 0 until n ) {
        PR[i] = alpha + (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
      }
    }

    The second implementation uses the org.apache.spark.graphx.Pregel interface and runs PageRank until convergence and this can be run by setting tol. Conceptually, the algorithm does the following:

    var PR = Array.fill(n)( 1.0 )
    var oldPR = Array.fill(n)( 0.0 )
    while( max(abs(PR - oldPR)) > tol ) {
      swap(oldPR, PR)
      for( i <- 0 until n if abs(PR[i] - oldPR[i]) > tol ) {
        PR[i] = alpha + (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
      }
    }

    alpha is the random reset probability (typically 0.15), inNbrs[i] is the set of neighbors which link to i and outDeg[j] is the out degree of vertex j.

    Note that this is not the "normalized" PageRank and as a consequence pages that have no inlinks will have a PageRank of alpha. In particular, the pageranks may have some values greater than 1.

    The resulting vertices DataFrame contains one additional column:

    • pagerank (DoubleType): the pagerank of this vertex

    The resulting edges DataFrame contains one additional column:

    • weight (DoubleType): the normalized weight of this edge after running PageRank
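
    The two implementations map onto the builder as follows (a sketch, assuming a GraphFrame g):

```scala
// Fixed number of iterations (first implementation).
val fixed = g.pageRank.resetProbability(0.15).maxIter(10).run()

// Run until convergence within a tolerance (second implementation).
val converged = g.pageRank.resetProbability(0.15).tol(0.01).run()

fixed.vertices.select("id", "pagerank").show()
fixed.edges.select("src", "dst", "weight").show()
```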
  10. class ParallelPersonalizedPageRank extends Arguments with WithMaxIter with Logging

    Parallel Personalized PageRank algorithm implementation.

    This implementation uses the standalone GraphFrame interface and runs personalized PageRank in parallel for a fixed number of iterations. This can be run by setting maxIter. The source vertex Ids are set in sourceIds. A simple local implementation of this algorithm is as follows.

    var oldPR = Array.fill(n)( 1.0 )
    var PR = (0 until n).map(i => if (sourceIds.contains(i)) alpha else 0.0).toArray
    for( iter <- 0 until maxIter ) {
      swap(oldPR, PR)
      for( i <- 0 until n ) {
        PR[i] = (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
        if (sourceIds.contains(i)) PR[i] += alpha
      }
    }

    alpha is the random reset probability (typically 0.15), inNbrs[i] is the set of neighbors which link to i and outDeg[j] is the out degree of vertex j.

    Note that this is not the "normalized" PageRank and as a consequence pages that have no inlinks will have a PageRank of alpha. In particular, the pageranks may have some values greater than 1.

    The resulting vertices DataFrame contains one additional column:

    • pageranks (VectorType): the pageranks of this vertex from all input source vertices

    The resulting edges DataFrame contains one additional column:

    • weight (DoubleType): the normalized weight of this edge after running PageRank
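
    For example (a sketch, assuming a GraphFrame g with Long vertex IDs 1 and 2):

```scala
// Personalized PageRank computed in parallel from two source vertices.
val results = g.parallelPersonalizedPageRank
  .resetProbability(0.15)
  .maxIter(10)
  .sourceIds(Array[Any](1L, 2L))
  .run()

// "pageranks" holds one score per source vertex, as a vector.
results.vertices.select("id", "pageranks").show()
```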
  11. class Pregel extends Logging with WithLocalCheckpoints with WithIntermediateStorageLevel

    Implements a Pregel-like bulk-synchronous message-passing API based on DataFrame operations.

    See Malewicz et al., Pregel: a system for large-scale graph processing for a detailed description of the Pregel algorithm.

    You can construct a Pregel instance using either this constructor or org.graphframes.GraphFrame#pregel, then use the builder pattern to describe the operations, and then call run to start a run. It returns a DataFrame of vertices from the last iteration.

    When a run starts, it expands the vertices DataFrame using column expressions defined by withVertexColumn. Those additional vertex properties can be changed during Pregel iterations. In each Pregel iteration, there are three phases:

    • Given each edge triplet, generate messages and specify target vertices to send, described by sendMsgToDst and sendMsgToSrc.
    • Aggregate messages by target vertex IDs, described by aggMsgs.
    • Update additional vertex properties based on aggregated messages and states from previous iteration, described by withVertexColumn.

    See the method API docs for the columns you can reference in each phase.

    You can control the number of iterations with setMaxIter; check the API docs for advanced controls.

    Example code for Page Rank:

    val edges = ...
    val vertices = GraphFrame.fromEdges(edges).outDegrees.cache()
    val numVertices = vertices.count()
    val graph = GraphFrame(vertices, edges)
    val alpha = 0.15
    val ranks = graph.pregel
      .withVertexColumn("rank", lit(1.0 / numVertices),
        coalesce(Pregel.msg, lit(0.0)) * (1.0 - alpha) + alpha / numVertices)
      .sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree"))
      .aggMsgs(sum(Pregel.msg))
      .run()

    See also

    org.graphframes.GraphFrame#pregel

    Malewicz et al., Pregel: a system for large-scale graph processing.

  12. class SVDPlusPlus extends Arguments with WithMaxIter with Logging

    Arguments for SVD++ algorithm.

    This class implements the SVD++ algorithm for Collaborative Filtering, primarily used for Recommender Systems (Link Prediction).

    Based on the paper "Factorization Meets the Neighborhood: a Multifaceted Collaborative Filtering Model" by Yehuda Koren (2008), available at https://dl.acm.org/citation.cfm?id=1401944.

    Problem Definition

    The algorithm predicts unknown ratings in a user-item system. It accounts for:

    • Explicit preferences (user ratings).
    • Implicit feedback (the history of items a user has interacted with).
    • User and Item biases.

    The prediction rule for a rating r_ui (user u, item i) is:

    r_ui = µ + b_u + b_i + q_i^T * (p_u + |N(u)|^-0.5 * sum(y_j for j in N(u)))

    Where N(u) is the set of items user u has interacted with (implicit feedback).

    Input Requirements

    !!! IMPORTANT !!! The input graph MUST be a **Directed Bipartite Graph** representing interactions:

    • **Vertices**: A mix of Users and Items.
    • **Edges**: Directed strictly from **User (src) -> Item (dst)**.
    • **Edge Attribute**: A numeric column (default "weight") representing the rating.

    DO NOT use this on general/undirected graphs (e.g., social networks), as the algorithm relies on the asymmetry between Users (who provide feedback) and Items (who receive it).

    Output Model (Node Embeddings)

    The algorithm returns a DataFrame of vertices with the trained model parameters. These parameters function as embeddings:

    • column1 (Array[Double]): **Primary Latent Factors (Explicit Embedding)**.
      • For Users: Represents preferences (p_u).
      • For Items: Represents characteristics (q_i).
    • column2 (Array[Double]): **Implicit Factors (Implicit Embedding)**.
      • For Items: Represents the influence of the item (y_i) on a user's profile based on viewing history.
      • For Users: Generally unused/zero.
    • column3 (Double): **Bias**.
      • For Users: User bias (b_u).
      • For Items: Item bias (b_i).
    • column4 (Double): **Implicit Normalization Term**.
      • For Users: Precomputed |N(u)|^-0.5.
      • For Items: Unused.

    Parameter Tuning Guide

    Constraints:

    • minValue / maxValue: Hard bounds for predicted ratings. Predictions outside this range are clipped. Set these to your rating scale limits (e.g., 1.0 and 5.0).

    Learning Rates (Step sizes for Gradient Descent):

    • gamma1: Learning rate for **Biases** (b_u, b_i).
    • gamma2: Learning rate for **Embeddings/Factors** (p_u, q_i, y_j).

      Tip: Increase if convergence is too slow. Decrease if the loss explodes (NaN).

    Regularization (Preventing Overfitting):

    • gamma6: Regularization for **Biases**.
    • gamma7: Regularization for **Embeddings/Factors**.

      Tip: Increase these if the model performs well on training data but poorly on test data.
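
    Putting the tuning parameters together (a sketch, assuming a GraphFrame g whose edges point from users to items and carry a numeric "weight" rating on a 1-5 scale; the gamma values below follow the defaults suggested in Koren's paper):

```scala
val result = g.svdPlusPlus
  .maxIter(5)
  .minValue(1.0)   // lower bound of the rating scale
  .maxValue(5.0)   // upper bound of the rating scale
  .gamma1(0.007)   // learning rate for biases
  .gamma2(0.007)   // learning rate for factors
  .gamma6(0.005)   // regularization for biases
  .gamma7(0.015)   // regularization for factors
  .run()
```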
  13. class ShortestPaths extends Arguments with WithAlgorithmChoice with WithCheckpointInterval with WithLocalCheckpoints with WithIntermediateStorageLevel with WithDirection

    Computes shortest paths from every vertex to the given set of landmark vertices. Note that this takes edge direction into account.

    The returned DataFrame contains all the original vertex information as well as one additional column:

    • distances (MapType[vertex ID type, IntegerType]): For each vertex v, a map containing the shortest-path distance to each reachable landmark vertex.
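
    For example (a sketch, assuming a GraphFrame g with string vertex IDs):

```scala
// Shortest-path distance from every vertex to landmarks "a" and "d".
val results = g.shortestPaths.landmarks(Seq("a", "d")).run()
results.select("id", "distances").show()
```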
  14. class StronglyConnectedComponents extends Arguments with WithMaxIter with Logging

    Compute the strongly connected component (SCC) of each vertex and return a DataFrame with each vertex assigned to the SCC containing that vertex.

    The resulting DataFrame contains all the original vertex information and one additional column:

    • component (LongType): unique ID for this component
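
    For example (a sketch, assuming a GraphFrame g):

```scala
// maxIter must be set before running SCC.
val components = g.stronglyConnectedComponents.maxIter(10).run()
components.select("id", "component").show()
```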
  15. class TriangleCount extends Arguments with Serializable with WithIntermediateStorageLevel

    Triangle count implementation.

    This class provides two algorithms for counting triangles:

    • A direct version that computes exact triangle counts using set intersection of neighbor lists.
    • An approximate version based on the DataSketches library (Theta sketches), which trades off accuracy for performance on large-scale graphs.

    The output DataFrame contains two columns:

    • "id": the vertex id
    • "count": the number of triangles passing through the vertex
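
    For example (a sketch, assuming a GraphFrame g; the default run computes exact counts):

```scala
// Exact triangle count per vertex.
val counts = g.triangleCount.run()
counts.select("id", "count").show()
```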

Value Members

  1. object AggregateMessages extends Logging with Serializable
  2. object AggregateNeighbors extends Serializable
  3. object ConnectedComponents extends Logging
  4. object DetectingCycles extends Serializable
  5. object KCore extends Serializable with Logging
  6. object MaximalIndependentSet extends Serializable with Logging
  7. object Pregel extends Serializable

    Constants and utilities for the Pregel algorithm.

  8. object SVDPlusPlus
