graphframes package


class graphframes.GraphFrame(v: DataFrame, e: DataFrame)[source]

Represents a graph with vertices and edges stored as DataFrames.

Parameters:
  • v – DataFrame holding vertex information. Must contain a column named “id” that stores unique vertex IDs.

  • e – DataFrame holding edge information. Must contain two columns “src” and “dst” storing source vertex IDs and destination vertex IDs of edges, respectively.

>>> localVertices = [(1,"A"), (2,"B"), (3, "C")]
>>> localEdges = [(1,2,"love"), (2,1,"hate"), (2,3,"follow")]
>>> v = spark.createDataFrame(localVertices, ["id", "name"])
>>> e = spark.createDataFrame(localEdges, ["src", "dst", "action"])
>>> g = GraphFrame(v, e)
DST: str = 'dst'
EDGE: str = 'edge'
ID: str = 'id'
SRC: str = 'src'
WEIGHT: str = 'weight'
aggregateMessages(aggCol: list[Column | str] | Column, sendToSrc: list[Column | str] | Column | str | None = None, sendToDst: list[Column | str] | Column | str | None = None, intermediate_storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]

Aggregates messages from the neighbors.

When specifying the messages and aggregation function, the user may reference columns using the static methods in graphframes.lib.AggregateMessages.

See Scala documentation for more details.

Warning: the result of this method is a persisted DataFrame. Call .unpersist() on it when you are done to avoid memory leaks.

Parameters:
  • aggCol – the requested aggregation output, either as a collection of pyspark.sql.Column or as a SQL expression string

  • sendToSrc – message sent to the source vertex of each triplet, either as a collection of pyspark.sql.Column or as a SQL expression string (default: None)

  • sendToDst – message sent to the destination vertex of each triplet, either as a collection of pyspark.sql.Column or as a SQL expression string (default: None)

  • intermediate_storage_level – the storage level used for both intermediate results and the output

Returns:

Persisted DataFrame with columns for the vertex ID and the resulting aggregated message. The name of the resulting message column is based on the alias of the provided aggCol.
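For example, the following minimal sketch sums, for each vertex, the ages of the vertices pointing to it, assuming the vertex DataFrame carries an integer column “age” (not part of the example graph above):

>>> from pyspark.sql import functions as F
>>> from graphframes.lib import AggregateMessages as AM
>>> agg = g.aggregateMessages(
...     F.sum(AM.msg).alias("summedAges"),
...     sendToDst=AM.src["age"])  # send each source's age to the destination
>>> agg.show()
>>> agg.unpersist()  # release the persisted result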

aggregate_neighbors(starting_vertices: Column | str, accumulator_names: list[str], accumulator_inits: list[Column | str], accumulator_updates: list[Column | str], max_hops: int = 3, stopping_condition: Column | str | None = None, target_condition: Column | str | None = None, required_vertex_attributes: list[str] | None = None, required_edge_attributes: list[str] | None = None, edge_filter: Column | str | None = None, remove_loops: bool = False, checkpoint_interval: int = 2, use_local_checkpoints: bool = False, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]

Multi-hop neighbor aggregation with customizable accumulators.

AggregateNeighbors allows you to explore the graph up to a specified number of hops, accumulating values along paths using customizable accumulator expressions. It supports both stopping conditions (when to stop exploring) and target conditions (when to collect a result).

Basic Example:

>>> from pyspark.sql import functions as F
>>> from graphframes.lib import AggregateNeighbors  # assumed import path
>>> g = GraphFrame(vertices, edges)
>>> result = g.aggregate_neighbors(
...     starting_vertices=F.col("id") == 1,
...     max_hops=3,
...     accumulator_names=["path_length"],
...     accumulator_inits=[F.lit(0)],
...     accumulator_updates=[F.col("path_length") + 1],
...     target_condition=AggregateNeighbors.dst_attr("id") == 4
... )

Using Accumulators:

Accumulators track values as the algorithm traverses the graph. Each accumulator has:

  • A name (becomes a column in the result)

  • An initial value expression (evaluated on starting vertices)

  • An update expression (evaluated when traversing each edge)

In update expressions, you can reference:

  • Source vertex attributes via src_attr("attrName")

  • Destination vertex attributes via dst_attr("attrName")

  • Edge attributes via edge_attr("attrName")

Example with Multiple Accumulators:

>>> result = g.aggregate_neighbors(
...     starting_vertices=F.col("id") == 1,
...     max_hops=5,
...     accumulator_names=["sum_values", "product_weights"],
...     accumulator_inits=[F.lit(0), F.lit(1.0)],
...     accumulator_updates=[
...         F.col("sum_values") + AggregateNeighbors.dst_attr("value"),
...         F.col("product_weights") * AggregateNeighbors.edge_attr("weight")
...     ],
...     target_condition=F.col("dst.id") == 10
... )

Stopping vs Target Conditions:

  • stopping_condition: Stops traversal along a path when true. Useful for avoiding cycles or limiting search depth based on custom criteria.

  • target_condition: Marks vertices as results when true. Only accumulators reaching target vertices are returned.

  • If both are provided, only accumulators that reach target_condition are saved.

  • At least one must be provided.

Performance Considerations:

  • Use required_vertex_attributes and required_edge_attributes to limit the columns carried through traversal, reducing memory usage.

  • Use edge_filter to limit traversable edges.

  • Set remove_loops=True to exclude self-loop edges.

  • Use checkpoint_interval to prevent logical plan growth on deep traversals.

  • Be cautious with high max_hops on dense graphs (risk of OOM).

Warning: The result is a persisted DataFrame. Call .unpersist() when done to release memory.

Parameters:
  • starting_vertices – Column expression selecting seed vertices (e.g., F.col("id") == 1)

  • max_hops – Maximum number of hops to explore (positive integer)

  • accumulator_names – List of names for accumulators (become result columns)

  • accumulator_inits – List of initial value expressions for accumulators

  • accumulator_updates – List of update expressions for accumulators

  • stopping_condition – Optional condition to stop traversal along a path

  • target_condition – Optional condition to mark vertices as results

  • required_vertex_attributes – Vertex columns to carry (None = all columns)

  • required_edge_attributes – Edge columns to carry (None = all columns)

  • edge_filter – Optional condition to filter traversable edges

  • remove_loops – If True, exclude self-loop edges (default: False)

  • checkpoint_interval – Checkpoint every N iterations; 0 disables checkpointing (default: 2)

  • use_local_checkpoints – Use local checkpoints (faster but less reliable)

  • storage_level – Storage level for intermediate results

Returns:

DataFrame with columns: id (target vertex), hop (path length), and one column per accumulator with its final value

as_undirected() GraphFrame[source]

Converts the directed graph into an undirected graph by ensuring that all directed edges are bidirectional. For every directed edge (src, dst), a corresponding edge (dst, src) is added.

Returns:

A new GraphFrame representing the undirected graph.

bfs(fromExpr: str, toExpr: str, edgeFilter: str | None = None, maxPathLength: int = 10) DataFrame[source]

Breadth-first search (BFS).

See Scala documentation for more details.

Returns:

DataFrame with one Row for each shortest path between matching vertices.
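A minimal sketch using the example graph constructed above, searching from vertex “A” to vertex “C”:

>>> paths = g.bfs(fromExpr="name = 'A'", toExpr="name = 'C'")
>>> paths.show()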

cache() GraphFrame[source]

Persist the dataframe representation of vertices and edges of the graph with the default storage level.

connectedComponents(algorithm: str = 'graphframes', checkpointInterval: int = 2, broadcastThreshold: int = 1000000, useLabelsAsComponents: bool = False, use_local_checkpoints: bool = False, max_iter: int = 31, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]

Computes the connected components of the graph.

See Scala documentation for more details.

Parameters:
  • algorithm – connected components algorithm to use (default: “graphframes”). Supported algorithms are “two_phase”, “randomized_contraction”, “graphframes” (a deprecated alias for “two_phase”), and “graphx”.

  • checkpointInterval – checkpoint interval in terms of number of iterations (default: 2)

  • broadcastThreshold – broadcast threshold in propagating component assignments (default: 1000000). Passing -1 disables manual broadcasting and lets AQE handle skewed joins; this mode is much faster and is recommended. The default may change to -1 in a future version of GraphFrames.

  • useLabelsAsComponents – if True, uses the vertex labels as components; otherwise long IDs are used

  • use_local_checkpoints – whether to use local checkpoints (default: False); local checkpoints are faster and do not require setting a persistent checkpointDir, but they are less reliable and require executors to have sufficiently large local disks.

  • storage_level – storage level for both intermediate and final dataframes.

Returns:

DataFrame with new vertices column “component”
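A minimal sketch (the checkpoint directory path is illustrative; a persistent checkpoint directory is required unless local checkpoints are used):

>>> spark.sparkContext.setCheckpointDir("/tmp/graphframes-checkpoints")
>>> components = g.connectedComponents()
>>> components.select("id", "component").show()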

property degrees: DataFrame
The degree of each vertex in the graph, returned as a DataFrame with two columns:
  • “id”: the ID of the vertex

  • “degree” (integer): the degree of the vertex

Note that vertices with 0 edges are not returned in the result.

Returns:

DataFrame with new vertices column “degree”

detectingCycles(checkpoint_interval: int = 2, use_local_checkpoints: bool = False, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]

Find all cycles in the graph.

An implementation of the Rocha–Thatte cycle detection algorithm. Rocha, Rodrigo Caetano, and Bhalchandra D. Thatte. “Distributed cycle detection in large-scale sparse graphs.” Proceedings of Simpósio Brasileiro de Pesquisa Operacional (SBPO’15) (2015): 1-11.

Returns a DataFrame with unique cycles.

Parameters:
  • checkpoint_interval – Pregel checkpoint interval (default: 2)

  • use_local_checkpoints – whether to use local checkpoints instead of checkpointDir

  • storage_level – the storage level for both intermediate results and the output DataFrame

Returns:

Persisted DataFrame with all the cycles
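In the example graph constructed above, the edges (1, 2) and (2, 1) form a cycle. A minimal sketch:

>>> cycles = g.detectingCycles()
>>> cycles.show()
>>> cycles.unpersist()  # release the persisted result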

dropIsolatedVertices() GraphFrame[source]

Drops isolated vertices, i.e. vertices that are not contained in any edge.

Returns:

GraphFrame with filtered vertices.

property edges: DataFrame

DataFrame holding edge information, with unique columns “src” and “dst” storing source vertex IDs and destination vertex IDs of edges, respectively.

filterEdges(condition: str | Column) GraphFrame[source]

Filters the edges based on the given expression, keeping all vertices.

Parameters:

condition – String or Column describing the condition expression for filtering.

Returns:

GraphFrame with filtered edges.
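For example, keeping only the “love” edge of the example graph above while retaining all three vertices:

>>> g2 = g.filterEdges("action = 'love'")
>>> g2.edges.show()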

filterVertices(condition: str | Column) GraphFrame[source]

Filters the vertices based on the given expression, removing edges that contain any dropped vertices.

Parameters:

condition – String or Column describing the condition expression for filtering.

Returns:

GraphFrame with filtered vertices and edges.
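For example, dropping vertex 3 of the example graph above also removes the (2, 3, “follow”) edge:

>>> g3 = g.filterVertices("id < 3")
>>> g3.edges.show()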

find(pattern: str) DataFrame[source]

Motif finding.

See Scala documentation for more details.

Parameters:

pattern – String describing the motif to search for.

Returns:

DataFrame with one Row for each instance of the motif found
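For example, the following finds pairs of vertices with edges in both directions; in the example graph above, vertices 1 and 2 match:

>>> motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
>>> motifs.show()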

property inDegrees: DataFrame
The in-degree of each vertex in the graph, returned as a DataFrame with two columns:
  • “id”: the ID of the vertex

  • “inDegree” (integer): the in-degree of the vertex

Note that vertices with 0 in-edges are not returned in the result.

Returns:

DataFrame with new vertices column “inDegree”

k_core(checkpoint_interval: int = 2, use_local_checkpoints: bool = False, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]

The k-core is the maximal subgraph such that every vertex has at least degree k. The k-core metric is a measure of the centrality of a node in a network, based on its degree and the degrees of its neighbors. Nodes with higher k-core values are considered to be more central and influential within the network.

This implementation is based on the algorithm described in: Mandal, Aritra, and Mohammad Al Hasan. “A distributed k-core decomposition algorithm on spark.” 2017 IEEE International Conference on Big Data (Big Data). IEEE, 2017.

Parameters:
  • checkpoint_interval – Pregel checkpoint interval (default: 2)

  • use_local_checkpoints – whether to use local checkpoints instead of checkpointDir

  • storage_level – the level of storage for both intermediate results and an output DataFrame

Returns:

Persisted DataFrame with ID and k-core values (column “kcore”)
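A minimal sketch using the defaults; remember to unpersist the persisted result:

>>> kcores = g.k_core()
>>> kcores.show()
>>> kcores.unpersist()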

labelPropagation(maxIter: int, algorithm: str = 'graphx', use_local_checkpoints: bool = False, checkpoint_interval: int = 2, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]

Runs static label propagation for detecting communities in networks.

See Scala documentation for more details.

Parameters:
  • maxIter – the number of iterations to run

  • algorithm – implementation to use; possible values are “graphframes” and “graphx”; “graphx” is faster for small to medium-sized graphs, while “graphframes” requires less memory

  • use_local_checkpoints – whether to use local checkpoints (default: False); local checkpoints are faster and do not require setting a persistent checkpointDir, but they are less reliable and require executors to have sufficiently large local disks.

  • checkpoint_interval – how often the intermediate result should be checkpointed; a large value may lead to huge logical plan growth due to the iterative nature of the algorithm

  • storage_level – storage level for both intermediate and final DataFrames

Returns:

Persisted DataFrame with new vertices column “label”
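A minimal sketch:

>>> communities = g.labelPropagation(maxIter=5)
>>> communities.select("id", "label").show()
>>> communities.unpersist()  # release the persisted result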

maximal_independent_set(seed: int = 42, checkpoint_interval: int = 2, use_local_checkpoints: bool = False, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]

This method implements a distributed algorithm for finding a Maximal Independent Set (MIS) in a graph.

An MIS is a set of vertices such that no two vertices in the set are adjacent (i.e., there is no edge between any two vertices in the set), and the set is maximal, meaning that adding any other vertex to the set would violate the independence property. Note that this implementation finds a maximal (but not necessarily maximum) independent set; that is, it ensures no more vertices can be added to the set, but does not guarantee that the set has the largest possible number of vertices among all possible independent sets in the graph.

The algorithm implemented here is based on the paper: Ghaffari, Mohsen. “An improved distributed algorithm for maximal independent set.” Proceedings of the twenty-seventh annual ACM-SIAM symposium on Discrete algorithms. Society for Industrial and Applied Mathematics, 2016.

Note: This is a randomized, non-deterministic algorithm. The result may vary between runs even if a fixed random seed is provided because of how Apache Spark works.

Parameters:
  • seed – random seed used for tie-breaking in the algorithm (default: 42)

  • checkpoint_interval – checkpoint interval in terms of number of iterations (default: 2)

  • use_local_checkpoints – whether to use local checkpoints (default: False); local checkpoints are faster and do not require setting a persistent checkpoint directory; however, they are less reliable and require executors to have sufficient local disk space.

  • storage_level – storage level for both intermediate and final DataFrames (default: MEMORY_AND_DISK_DESER)

Returns:

DataFrame with new vertex column “selected”, where “true” indicates the vertex is part of the Maximal Independent Set
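A minimal sketch selecting the members of the set:

>>> mis = g.maximal_independent_set(seed=42)
>>> mis.filter("selected").select("id").show()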

property nodes: DataFrame

Alias to vertices.

property outDegrees: DataFrame
The out-degree of each vertex in the graph, returned as a DataFrame with two columns:
  • “id”: the ID of the vertex

  • “outDegree” (integer): the out-degree of the vertex

Note that vertices with 0 out-edges are not returned in the result.

Returns:

DataFrame with new vertices column “outDegree”

pageRank(resetProbability: float = 0.15, sourceId: Any | None = None, maxIter: int | None = None, tol: float | None = None) GraphFrame[source]

Runs the PageRank algorithm on the graph. Note: exactly one of maxIter or tol must be set.

See Scala documentation for more details.

Parameters:
  • resetProbability – Probability of resetting to a random vertex.

  • sourceId – (optional) the source vertex for a personalized PageRank.

  • maxIter – If set, the algorithm is run for a fixed number of iterations. This may not be set if the tol parameter is set.

  • tol – If set, the algorithm is run until the given tolerance. This may not be set if the maxIter parameter is set.

Returns:

GraphFrame with new vertices column “pagerank” and new edges column “weight”
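A minimal sketch running until convergence to the given tolerance:

>>> results = g.pageRank(resetProbability=0.15, tol=0.01)
>>> results.vertices.select("id", "pagerank").show()
>>> results.edges.select("src", "dst", "weight").show()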

parallelPersonalizedPageRank(resetProbability: float = 0.15, sourceIds: list[Any] | None = None, maxIter: int | None = None) GraphFrame[source]

Run the personalized PageRank algorithm on the graph, from the provided list of sources in parallel for a fixed number of iterations.

See Scala documentation for more details.

Parameters:
  • resetProbability – Probability of resetting to a random vertex

  • sourceIds – the source vertices for a personalized PageRank

  • maxIter – the fixed number of iterations this algorithm runs

Returns:

GraphFrame with new vertices column “pageranks” and new edges column “weight”
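A minimal sketch using the example graph above, personalized to source vertices 1 and 2:

>>> results = g.parallelPersonalizedPageRank(
...     resetProbability=0.15, sourceIds=[1, 2], maxIter=10)
>>> results.vertices.select("id", "pageranks").show()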

persist(storageLevel: StorageLevel = StorageLevel(False, True, False, False, 1)) GraphFrame[source]

Persist the dataframe representation of vertices and edges of the graph with the given storage level.

powerIterationClustering(k: int, maxIter: int, weightCol: str | None = None) DataFrame[source]

Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by Lin and Cohen. From the abstract: PIC finds a very low-dimensional embedding of a dataset using truncated power iteration on a normalized pair-wise similarity matrix of the data.

Parameters:
  • k – the number of clusters to create

  • maxIter – param for maximum number of iterations (>= 0)

  • weightCol – optional name of weight column, 1.0 is used if not provided

Returns:

DataFrame with new column “cluster”

property pregel: Pregel

Get the graphframes.classic.pregel.Pregel or graphframes.connect.graphframes_client.Pregel object for running Pregel.

See graphframes.lib.Pregel for more details.

shortestPaths(landmarks: list[str | int], algorithm: str = 'graphx', use_local_checkpoints: bool = False, checkpoint_interval: int = 2, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1), is_directed: bool = True) DataFrame[source]

Runs the shortest path algorithm from a set of landmark vertices in the graph.

See Scala documentation for more details.

Parameters:
  • landmarks – a set of one or more landmarks

  • algorithm – implementation to use; possible values are “graphframes” and “graphx”; “graphx” is faster for small to medium-sized graphs, while “graphframes” requires less memory

  • use_local_checkpoints – whether to use local checkpoints (default: False); local checkpoints are faster and do not require setting a persistent checkpointDir, but they are less reliable and require executors to have sufficiently large local disks.

  • checkpoint_interval – how often the intermediate result should be checkpointed; a large value may lead to huge logical plan growth due to the iterative nature of the algorithm

  • storage_level – storage level for both intermediate and final DataFrames

  • is_directed – whether the algorithm should respect edge direction (True) or treat edges as undirected (False)

Returns:

Persisted DataFrame with new vertices column “distances”
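A minimal sketch computing distances to landmark vertices 1 and 3 of the example graph above:

>>> distances = g.shortestPaths(landmarks=[1, 3])
>>> distances.select("id", "distances").show()
>>> distances.unpersist()  # release the persisted result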

stronglyConnectedComponents(maxIter: int) DataFrame[source]

Runs the strongly connected components algorithm on this graph.

See Scala documentation for more details.

Parameters:

maxIter – the number of iterations to run

Returns:

DataFrame with new vertex column “component”
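In the example graph above, vertices 1 and 2 form a strongly connected component via the edges (1, 2) and (2, 1). A minimal sketch:

>>> scc = g.stronglyConnectedComponents(maxIter=10)
>>> scc.select("id", "component").show()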

svdPlusPlus(rank: int = 10, maxIter: int = 2, minValue: float = 0.0, maxValue: float = 5.0, gamma1: float = 0.007, gamma2: float = 0.007, gamma6: float = 0.005, gamma7: float = 0.015) tuple[DataFrame, float][source]

Runs the SVD++ algorithm for Collaborative Filtering.

Based on the paper “Factorization Meets the Neighborhood: a Multifaceted Collaborative Filtering Model” by Yehuda Koren (2008).

Algorithm Description

SVD++ improves upon standard Matrix Factorization by incorporating implicit feedback (the history of items a user has interacted with) alongside explicit ratings. The prediction rule is:

r_ui = µ + b_u + b_i + q_i^T * (p_u + |N(u)|^-0.5 * sum(y_j for j in N(u)))

Input Requirements

The input graph must be a directed bipartite graph:

  • Vertices: a mix of Users and Items.

  • Edges: directed strictly from User (src) -> Item (dst).

  • Edge attribute: represents the rating (weight).

Parameters:
  • rank – The number of latent factors (embedding size).

  • maxIter – The maximum number of iterations.

  • minValue – The minimum possible rating value (used for clipping predictions).

  • maxValue – The maximum possible rating value (used for clipping predictions).

  • gamma1 – Learning rate for bias parameters (b_u, b_i).

  • gamma2 – Learning rate for factor parameters (p_u, q_i, y_j).

  • gamma6 – Regularization coefficient for bias parameters.

  • gamma7 – Regularization coefficient for factor parameters.

Returns:

A tuple (v, loss) where:

  • v is a DataFrame of vertices containing the trained model parameters (embeddings).

  • loss is the final training loss (double).

Output DataFrame Columns

The returned DataFrame v contains the following new columns holding the model parameters:

  • column1 (Array[Double]): Primary Latent Factors (Explicit Embedding).
    • For Users: Preferences vector (p_u).

    • For Items: Characteristics vector (q_i).

  • column2 (Array[Double]): Implicit Factors (Implicit Embedding).
    • For Items: Influence vector (y_i).

    • For Users: Unused/Zero (users aggregate y from neighbors).

  • column3 (Double): Bias term.
    • For Users: User bias (b_u).

    • For Items: Item bias (b_i).

  • column4 (Double): Implicit Normalization term.
    • For Users: Precomputed |N(u)|^-0.5.

    • For Items: Unused.

triangleCount(storage_level: StorageLevel, algorithm: str = 'exact', lg_nom_entries: int = 12) DataFrame[source]

Computes the number of triangles passing through each vertex. This algorithm identifies sets of three vertices where each pair is connected by an edge.

The implementation provides two algorithms:

  • “exact”: Computes the exact triangle count using set intersection of neighbor lists. Note: this can fail or encounter OOM errors on power-law graphs or graphs with very high-degree nodes, as it requires collecting and intersecting the full neighbor lists for the source and destination vertices of every edge.

  • “approx”: Uses DataSketches (Theta sketches) to estimate the triangle count. This trades off perfect accuracy for significantly improved performance and lower memory overhead, making it suitable for large-scale or dense graphs.

Parameters:
  • storage_level – Storage level for caching intermediate DataFrames.

  • algorithm – The triangle counting algorithm to use, “exact” or “approx” (default: “exact”).

  • lg_nom_entries – The log2 of the nominal entries for the Theta sketch (only used if algorithm=”approx”). Higher values increase accuracy at the cost of memory. (default: 12).

Returns:

A DataFrame containing the vertex “id” and the triangle “count”.
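A minimal sketch; note that storage_level has no default here, and the level shown is just one reasonable choice:

>>> from pyspark import StorageLevel
>>> counts = g.triangleCount(storage_level=StorageLevel.MEMORY_AND_DISK)
>>> counts.select("id", "count").show()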

property triplets: DataFrame

The triplets (source vertex)-[edge]->(destination vertex) for all edges in the graph.

Returned as a DataFrame with three columns:
  • “src”: source vertex with schema matching “vertices”

  • “edge”: edge with schema matching “edges”

  • “dst”: destination vertex with schema matching “vertices”

Returns:

DataFrame with columns “src”, “edge”, and “dst”

type_degree(edge_type_col: str, edge_types: list[Any] | None = None) DataFrame[source]

The total degree of each vertex per edge type (both in and out), returned as a DataFrame with two columns:

  • “id”: the ID of the vertex

  • “degrees”: a struct with a field for each edge type, storing the total degree count

Parameters:
  • edge_type_col – Name of the column in edges DataFrame that contains edge types

  • edge_types – Optional list of edge type values. If None, edge types will be discovered automatically.

Returns:

DataFrame with columns “id” and “degrees” (struct type)
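For example, using the “action” edge column of the example graph above (type_in_degree and type_out_degree are called the same way):

>>> deg = g.type_degree("action")
>>> deg.select("id", "degrees").show()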

type_in_degree(edge_type_col: str, edge_types: list[Any] | None = None) DataFrame[source]
The in-degree of each vertex per edge type, returned as a DataFrame with two columns:
  • “id”: the ID of the vertex

  • “inDegrees”: a struct with a field for each edge type, storing the in-degree count

Parameters:
  • edge_type_col – Name of the column in edges DataFrame that contains edge types

  • edge_types – Optional list of edge type values. If None, edge types will be discovered automatically.

Returns:

DataFrame with columns “id” and “inDegrees” (struct type)

type_out_degree(edge_type_col: str, edge_types: list[Any] | None = None) DataFrame[source]
The out-degree of each vertex per edge type, returned as a DataFrame with two columns:
  • “id”: the ID of the vertex

  • “outDegrees”: a struct with a field for each edge type, storing the out-degree count

Parameters:
  • edge_type_col – Name of the column in edges DataFrame that contains edge types

  • edge_types – Optional list of edge type values. If None, edge types will be discovered automatically.

Returns:

DataFrame with columns “id” and “outDegrees” (struct type)

unpersist(blocking: bool = False) GraphFrame[source]

Mark the dataframe representation of vertices and edges of the graph as non-persistent, and remove all blocks for it from memory and disk.

validate(check_vertices: bool = True, intermediate_storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) None[source]

Validates the consistency and integrity of a graph by performing checks on the vertices and edges.

Parameters:
  • check_vertices – a flag indicating whether additional vertex consistency checks should be performed. If True, the method verifies that all vertices in the vertex DataFrame are represented in the edge DataFrame and vice versa. This check is slow on big graphs.

  • intermediate_storage_level – the storage level to be used when persisting intermediate DataFrame computations during the validation process.

Returns:

None; the method performs validation checks and raises an exception if validation fails.

Raises:

ValueError – if there are any inconsistencies in the graph, such as duplicate vertices, mismatched vertices between the edge and vertex DataFrames, or missing connections.
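A minimal sketch:

>>> g.validate(check_vertices=True)  # raises ValueError if the graph is inconsistent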

property vertices: DataFrame

DataFrame holding vertex information, with unique column “id” for vertex IDs.