graphframes package¶
Subpackages¶
Contents¶
- class graphframes.GraphFrame(v: DataFrame, e: DataFrame)[source]¶
Represents a graph with vertices and edges stored as DataFrames.
- Parameters:
v – DataFrame holding vertex information. Must contain a column named “id” that stores unique vertex IDs.
e – DataFrame holding edge information. Must contain two columns “src” and “dst” storing source vertex IDs and destination vertex IDs of edges, respectively.
>>> from graphframes import GraphFrame
>>> # `spark` is assumed to be an existing SparkSession
>>> localVertices = [(1, "A"), (2, "B"), (3, "C")]
>>> localEdges = [(1, 2, "love"), (2, 1, "hate"), (2, 3, "follow")]
>>> v = spark.createDataFrame(localVertices, ["id", "name"])
>>> e = spark.createDataFrame(localEdges, ["src", "dst", "action"])
>>> g = GraphFrame(v, e)
- DST: str = 'dst'¶
- EDGE: str = 'edge'¶
- ID: str = 'id'¶
- SRC: str = 'src'¶
- WEIGHT: str = 'weight'¶
- aggregateMessages(aggCol: list[Column | str] | Column, sendToSrc: list[Column | str] | Column | str | None = None, sendToDst: list[Column | str] | Column | str | None = None, intermediate_storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]¶
Aggregates messages from the neighbours.
When specifying the messages and aggregation function, the user may reference columns using the static methods in graphframes.lib.AggregateMessages.
See Scala documentation for more details.
Warning: The result of this method is a persisted DataFrame. Call .unpersist() on it when done to avoid memory leaks.
- Parameters:
aggCol – the requested aggregation output, either as a collection of pyspark.sql.Column or as an SQL expression string
sendToSrc – message sent to the source vertex of each triplet, either as a collection of pyspark.sql.Column or as an SQL expression string (default: None)
sendToDst – message sent to the destination vertex of each triplet, either as a collection of pyspark.sql.Column or as an SQL expression string (default: None)
intermediate_storage_level – the storage level used for both the intermediate result and the output.
- Returns:
Persisted DataFrame with columns for the vertex ID and the resulting aggregated message. The name of the resulting message column is based on the alias of the provided aggCol.
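To make the message-passing model concrete, here is a minimal plain-Python sketch of what "send a message along each triplet, then aggregate per vertex" means. It is not the GraphFrames implementation and does not use Spark; the function `aggregate_messages`, the `ages` mapping, and the toy edges are hypothetical names introduced only for illustration.

```python
from collections import defaultdict

# Hypothetical toy data: vertex id -> age, and directed edges (src, dst).
ages = {"a": 34, "b": 36, "c": 30}
edges = [("a", "b"), ("b", "c"), ("c", "b")]


def aggregate_messages(edges, send_to_src=None, send_to_dst=None, agg=sum):
    """Plain-Python model: collect per-vertex messages, then aggregate them."""
    inbox = defaultdict(list)
    for src, dst in edges:
        if send_to_src is not None:
            inbox[src].append(send_to_src(src, dst))
        if send_to_dst is not None:
            inbox[dst].append(send_to_dst(src, dst))
    # In this model, vertices that receive no message are absent from the result.
    return {vertex: agg(messages) for vertex, messages in inbox.items()}


# Every vertex receives the age of the vertex at the other end of each edge.
summed_ages = aggregate_messages(
    edges,
    send_to_src=lambda src, dst: ages[dst],
    send_to_dst=lambda src, dst: ages[src],
)
```

In the real API the two message callbacks correspond to the sendToSrc and sendToDst expressions, and `agg` corresponds to aggCol.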
- aggregate_neighbors(starting_vertices: Column | str, accumulator_names: list[str], accumulator_inits: list[Column | str], accumulator_updates: list[Column | str], max_hops: int = 3, stopping_condition: Column | str | None = None, target_condition: Column | str | None = None, required_vertex_attributes: list[str] | None = None, required_edge_attributes: list[str] | None = None, edge_filter: Column | str | None = None, remove_loops: bool = False, checkpoint_interval: int = 2, use_local_checkpoints: bool = False, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]¶
Multi-hop neighbor aggregation with customizable accumulators.
AggregateNeighbors allows you to explore the graph up to a specified number of hops, accumulating values along paths using customizable accumulator expressions. It supports both stopping conditions (when to stop exploring) and target conditions (when to collect a result).
Basic Example:
>>> from pyspark.sql import functions as F
>>> g = GraphFrame(vertices, edges)
>>> result = g.aggregate_neighbors(
...     starting_vertices=F.col("id") == 1,
...     max_hops=3,
...     accumulator_names=["path_length"],
...     accumulator_inits=[F.lit(0)],
...     accumulator_updates=[F.col("path_length") + 1],
...     target_condition=AggregateNeighbors.dst_attr("id") == 4
... )
Using Accumulators:
Accumulators track values as the algorithm traverses the graph. Each accumulator has:
- A name (becomes a column in the result)
- An initial value expression (evaluated on starting vertices)
- An update expression (evaluated when traversing each edge)
In update expressions, you can reference:
- Source vertex attributes via src_attr("attrName")
- Destination vertex attributes via dst_attr("attrName")
- Edge attributes via edge_attr("attrName")
Example with Multiple Accumulators:
>>> result = g.aggregate_neighbors(
...     starting_vertices=F.col("id") == 1,
...     max_hops=5,
...     accumulator_names=["sum_values", "product_weights"],
...     accumulator_inits=[F.lit(0), F.lit(1.0)],
...     accumulator_updates=[
...         F.col("sum_values") + AggregateNeighbors.dst_attr("value"),
...         F.col("product_weights") * AggregateNeighbors.edge_attr("weight")
...     ],
...     target_condition=F.col("dst.id") == 10
... )
Stopping vs Target Conditions:
stopping_condition: Stops traversal along a path when true. Useful for avoiding cycles or limiting search depth based on custom criteria.
target_condition: Marks vertices as results when true. Only accumulators reaching target vertices are returned.
If both are provided, only accumulators that reach target_condition are saved. At least one must be provided.
Performance Considerations:
- Use required_vertex_attributes and required_edge_attributes to limit the columns carried through traversal, reducing memory usage.
- Use edge_filter to limit traversable edges.
- Set remove_loops=True to exclude self-loop edges.
- Use checkpoint_interval to prevent logical plan growth on deep traversals.
- Be cautious with high max_hops on dense graphs (risk of OOM).
Warning: The result is a persisted DataFrame. Call .unpersist() when done to release memory.
- Parameters:
starting_vertices – Column expression selecting seed vertices (e.g., F.col("id") == 1)
max_hops – Maximum number of hops to explore (positive integer)
accumulator_names – List of names for accumulators (become result columns)
accumulator_inits – List of initial value expressions for accumulators
accumulator_updates – List of update expressions for accumulators
stopping_condition – Optional condition to stop traversal along a path
target_condition – Optional condition to mark vertices as results
required_vertex_attributes – Vertex columns to carry (None = all columns)
required_edge_attributes – Edge columns to carry (None = all columns)
edge_filter – Optional condition to filter traversable edges
remove_loops – If True, exclude self-loop edges (default: False)
checkpoint_interval – Checkpoint every N iterations, 0 = disabled (default: 2)
use_local_checkpoints – Use local checkpoints (faster but less reliable)
storage_level – Storage level for intermediate results
- Returns:
DataFrame with columns: id (target vertex), hop (path length), and one column per accumulator with its final value
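The traversal described above can be modelled in a few lines of plain Python. This is only a sketch of the documented behaviour under simplifying assumptions (a single accumulator, a single seed vertex, and paths that end as soon as they hit a target); `aggregate_neighbors_model` and the weighted toy edges are hypothetical and are not part of GraphFrames.

```python
def aggregate_neighbors_model(edges, start, max_hops, init, update, is_target):
    """Expand paths hop by hop, updating one accumulator per traversed edge."""
    frontier = [(start, init)]  # (current vertex, accumulator value)
    results = []                # (target id, hop, accumulator value)
    for hop in range(1, max_hops + 1):
        next_frontier = []
        for vertex, acc in frontier:
            for src, dst, weight in edges:
                if src != vertex:
                    continue
                new_acc = update(acc, weight)
                if is_target(dst):
                    # Assumption of this sketch: a path ends at its target.
                    results.append((dst, hop, new_acc))
                else:
                    next_frontier.append((dst, new_acc))
        frontier = next_frontier
    return results


# Hypothetical weighted edges (src, dst, weight).
weighted_edges = [(1, 2, 1.0), (2, 4, 2.0), (1, 3, 5.0), (3, 4, 1.0)]
paths_to_4 = aggregate_neighbors_model(
    weighted_edges,
    start=1,
    max_hops=3,
    init=0.0,
    update=lambda acc, weight: acc + weight,  # accumulate total path weight
    is_target=lambda vertex: vertex == 4,
)
```

Each result tuple mirrors the documented output columns: the target id, the hop count, and the final accumulator value for one path.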
- as_undirected() GraphFrame[source]¶
Converts the directed graph into an undirected graph by ensuring that all directed edges are bidirectional. For every directed edge (src, dst), a corresponding edge (dst, src) is added.
- Returns:
A new GraphFrame representing the undirected graph.
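As a rough illustration of the edge transformation, the following plain-Python sketch adds a reverse edge for every directed edge. Deduplicating with a set is an assumption of the sketch; whether GraphFrames removes duplicate edges is not stated here. `as_undirected_edges` is a hypothetical helper name.

```python
def as_undirected_edges(edges):
    """Return the edge list with (dst, src) added for every (src, dst)."""
    undirected = set(edges)
    undirected.update((dst, src) for src, dst in edges)
    return sorted(undirected)


directed = [(1, 2), (2, 3)]
undirected = as_undirected_edges(directed)
```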
- bfs(fromExpr: str, toExpr: str, edgeFilter: str | None = None, maxPathLength: int = 10) DataFrame[source]¶
Breadth-first search (BFS).
See Scala documentation for more details.
- Returns:
DataFrame with one Row for each shortest path between matching vertices.
- cache() GraphFrame[source]¶
Persist the dataframe representation of vertices and edges of the graph with the default storage level.
- connectedComponents(algorithm: str = 'graphframes', checkpointInterval: int = 2, broadcastThreshold: int = 1000000, useLabelsAsComponents: bool = False, use_local_checkpoints: bool = False, max_iter: int = 31, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]¶
Computes the connected components of the graph.
See Scala documentation for more details.
- Parameters:
algorithm – connected components algorithm to use (default: “graphframes”) Supported algorithms are “two_phase”, “randomized_contraction”, “graphframes” (deprecated alias for “two_phase”) and “graphx”.
checkpointInterval – checkpoint interval in terms of number of iterations (default: 2)
broadcastThreshold – broadcast threshold in propagating component assignments (default: 1000000). Passing -1 disables manual broadcasting and allows AQE to handle skewed joins. This mode is much faster and is recommended. The default value may change to -1 in future versions of GraphFrames.
useLabelsAsComponents – if True, uses the vertex labels as components, otherwise will use longs
use_local_checkpoints – whether to use local checkpoints (default: False). Local checkpoints are faster and do not require setting a persistent checkpointDir; on the other hand, they are less reliable and require executors to have sufficiently large local disks.
storage_level – storage level for both intermediate and final dataframes.
- Returns:
DataFrame with new vertices column “component”
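For readers unfamiliar with the output, this plain-Python sketch shows what a "component" assignment looks like on a tiny graph. It uses a union-find structure, which is a different technique from the distributed algorithms listed above, and it labels each component with its smallest vertex id, which is an assumption of the sketch rather than a guarantee of GraphFrames.

```python
def connected_components(vertices, edges):
    """Assign every vertex the smallest vertex id in its (undirected) component."""
    parent = {v: v for v in vertices}

    def find(v):
        while parent[v] != v:
            parent[v] = parent[parent[v]]  # path halving
            v = parent[v]
        return v

    for src, dst in edges:
        root_src, root_dst = find(src), find(dst)
        if root_src != root_dst:
            # Keep the smaller id as the representative of the merged set.
            parent[max(root_src, root_dst)] = min(root_src, root_dst)
    return {v: find(v) for v in vertices}


components = connected_components([1, 2, 3, 4, 5], [(1, 2), (2, 3), (4, 5)])
```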
- property degrees: DataFrame¶
- The degree of each vertex in the graph, returned as a DataFrame with two columns:
“id”: the ID of the vertex
“degree” (integer): the degree of the vertex
Note that vertices with 0 edges are not returned in the result.
- Returns:
DataFrame with new vertices column “degree”
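The relationship between the three degree properties can be checked by hand on the edges from the class example. The snippet below is a plain-Python sketch, not Spark code; it also shows why a vertex with no edges (a hypothetical vertex 4 here) never appears in the result.

```python
from collections import Counter

# Edges (src, dst) taken from the class-level example; vertex 4 is isolated.
vertex_ids = [1, 2, 3, 4]
edges = [(1, 2), (2, 1), (2, 3)]

out_degrees = Counter(src for src, _ in edges)
in_degrees = Counter(dst for _, dst in edges)
# Total degree is the sum of in-degree and out-degree.
degrees = out_degrees + in_degrees

# Only vertices that occur in at least one edge have an entry.
isolated = [v for v in vertex_ids if v not in degrees]
```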
- detectingCycles(checkpoint_interval: int = 2, use_local_checkpoints: bool = False, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]¶
Find all cycles in the graph.
An implementation of the Rocha–Thatte cycle detection algorithm. Rocha, Rodrigo Caetano, and Bhalchandra D. Thatte. “Distributed cycle detection in large-scale sparse graphs.” Proceedings of Simpósio Brasileiro de Pesquisa Operacional (SBPO’15) (2015): 1-11.
Returns a DataFrame with unique cycles.
- Parameters:
checkpoint_interval – Pregel checkpoint interval, default is 2
use_local_checkpoints – should local checkpoints be used instead of checkpointDir
storage_level – the storage level for both intermediate results and the output DataFrame
- Returns:
Persisted DataFrame with all the cycles
- dropIsolatedVertices() GraphFrame[source]¶
Drops isolated vertices, i.e. vertices that are not contained in any edge.
- Returns:
GraphFrame with filtered vertices.
- property edges: DataFrame¶
DataFrame holding edge information, with unique columns “src” and “dst” storing source vertex IDs and destination vertex IDs of edges, respectively.
- filterEdges(condition: str | Column) GraphFrame[source]¶
Filters the edges based on an expression, keeping all vertices.
- Parameters:
condition – String or Column describing the condition expression for filtering.
- Returns:
GraphFrame with filtered edges.
- filterVertices(condition: str | Column) GraphFrame[source]¶
Filters the vertices based on an expression and removes edges containing any dropped vertices.
- Parameters:
condition – String or Column describing the condition expression for filtering.
- Returns:
GraphFrame with filtered vertices and edges.
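The difference between the two filters is easiest to see side by side. The following plain-Python sketch models the documented semantics only; `filter_vertices`, `filter_edges`, and the toy data are hypothetical and do not call GraphFrames.

```python
def filter_vertices(vertices, edges, keep):
    """Keep matching vertices and drop every edge touching a dropped vertex."""
    kept = {vid: attrs for vid, attrs in vertices.items() if keep(attrs)}
    kept_edges = [(s, d) for s, d in edges if s in kept and d in kept]
    return kept, kept_edges


def filter_edges(vertices, edges, keep):
    """Keep matching edges; the vertex set is left untouched."""
    return dict(vertices), [edge for edge in edges if keep(edge)]


vertices = {
    1: {"name": "A", "age": 30},
    2: {"name": "B", "age": 17},
    3: {"name": "C", "age": 45},
}
edges = [(1, 2), (2, 3), (3, 1)]

adults, adult_edges = filter_vertices(vertices, edges, lambda a: a["age"] >= 18)
all_vertices, edges_not_from_2 = filter_edges(vertices, edges, lambda e: e[0] != 2)
```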
- find(pattern: str) DataFrame[source]¶
Motif finding.
See Scala documentation for more details.
- Parameters:
pattern – String describing the motif to search for.
- Returns:
DataFrame with one Row for each instance of the motif found
- property inDegrees: DataFrame¶
- The in-degree of each vertex in the graph, returned as a DataFrame with two columns:
“id”: the ID of the vertex
“inDegree” (int) storing the in-degree of the vertex
Note that vertices with 0 in-edges are not returned in the result.
- Returns:
DataFrame with new vertices column “inDegree”
- k_core(checkpoint_interval: int = 2, use_local_checkpoints: bool = False, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]¶
The k-core is the maximal subgraph such that every vertex has at least degree k. The k-core metric is a measure of the centrality of a node in a network, based on its degree and the degrees of its neighbors. Nodes with higher k-core values are considered to be more central and influential within the network.
This implementation is based on the algorithm described in: Mandal, Aritra, and Mohammad Al Hasan. “A distributed k-core decomposition algorithm on spark.” 2017 IEEE International Conference on Big Data (Big Data). IEEE, 2017.
- Parameters:
checkpoint_interval – Pregel checkpoint interval, default is 2
use_local_checkpoints – should local checkpoints be used instead of checkpointDir
storage_level – the level of storage for both intermediate results and an output DataFrame
- Returns:
Persisted DataFrame with ID and k-core values (column “kcore”)
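To show what the "kcore" value of a vertex means, here is a small sequential sketch that repeatedly peels the lowest-degree vertex. This is the classic single-machine peeling approach, not the distributed algorithm cited above, and it treats the graph as undirected, which is an assumption of the sketch. `core_numbers` is a hypothetical name.

```python
def core_numbers(adjacency):
    """Return the core number of every vertex of an undirected graph."""
    degree = {v: len(neighbours) for v, neighbours in adjacency.items()}
    remaining = set(adjacency)
    core = {}
    k = 0
    while remaining:
        # Peel the vertex with the smallest remaining degree (simple, O(n^2)).
        v = min(remaining, key=degree.get)
        k = max(k, degree[v])
        core[v] = k
        remaining.remove(v)
        for u in adjacency[v]:
            if u in remaining:
                degree[u] -= 1
    return core


# A triangle 1-2-3 with a pendant vertex 4 attached to vertex 3.
adjacency = {1: {2, 3}, 2: {1, 3}, 3: {1, 2, 4}, 4: {3}}
kcore = core_numbers(adjacency)
```

The triangle vertices end up with core number 2, while the pendant vertex has core number 1.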
- labelPropagation(maxIter: int, algorithm: str = 'graphx', use_local_checkpoints: bool = False, checkpoint_interval: int = 2, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]¶
Runs static label propagation for detecting communities in networks.
See Scala documentation for more details.
- Parameters:
maxIter – the number of iterations to be performed
algorithm – implementation to use; possible values are “graphframes” and “graphx”. “graphx” is faster for small and medium-sized graphs, while “graphframes” requires less memory
use_local_checkpoints – whether to use local checkpoints (default: False). Local checkpoints are faster and do not require setting a persistent checkpointDir; on the other hand, they are less reliable and require executors to have sufficiently large local disks.
storage_level – storage level for both intermediate and final DataFrames.
checkpoint_interval – how often the intermediate result should be checkpointed. A large value may lead to huge logical plan growth due to the iterative nature of the algorithm.
- Returns:
Persisted DataFrame with new vertices column “label”
- maximal_independent_set(seed: int = 42, checkpoint_interval: int = 2, use_local_checkpoints: bool = False, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]¶
This method implements a distributed algorithm for finding a Maximal Independent Set (MIS) in a graph.
An MIS is a set of vertices such that no two vertices in the set are adjacent (i.e., there is no edge between any two vertices in the set), and the set is maximal, meaning that adding any other vertex to the set would violate the independence property. Note that this implementation finds a maximal (but not necessarily maximum) independent set; that is, it ensures no more vertices can be added to the set, but does not guarantee that the set has the largest possible number of vertices among all possible independent sets in the graph.
The algorithm implemented here is based on the paper: Ghaffari, Mohsen. “An improved distributed algorithm for maximal independent set.” Proceedings of the twenty-seventh annual ACM-SIAM symposium on Discrete algorithms. Society for Industrial and Applied Mathematics, 2016.
Note: This is a randomized, non-deterministic algorithm. The result may vary between runs even if a fixed random seed is provided because of how Apache Spark works.
- Parameters:
seed – random seed used for tie-breaking in the algorithm (default: 42)
checkpoint_interval – checkpoint interval in terms of number of iterations (default: 2)
use_local_checkpoints – whether to use local checkpoints (default: False); local checkpoints are faster and do not require setting a persistent checkpoint directory; however, they are less reliable and require executors to have sufficient local disk space.
storage_level – storage level for both intermediate and final DataFrames (default: MEMORY_AND_DISK_DESER)
- Returns:
DataFrame with new vertex column “selected”, where “true” indicates the vertex is part of the Maximal Independent Set
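Because the result is randomized, a useful sanity check is to verify the two defining properties directly. The sketch below is a plain-Python checker, assuming an undirected adjacency map without self-loops; `is_maximal_independent_set` is a hypothetical helper, not part of GraphFrames. The star graph also illustrates the "maximal but not maximum" distinction.

```python
def is_maximal_independent_set(adjacency, selected):
    """Check independence and maximality of `selected` in an undirected graph."""
    selected = set(selected)
    # Independence: no selected vertex has a selected neighbour.
    independent = all(adjacency[v].isdisjoint(selected) for v in selected)
    # Maximality: every unselected vertex has at least one selected neighbour.
    maximal = all(
        v in selected or not adjacency[v].isdisjoint(selected)
        for v in adjacency
    )
    return independent and maximal


# Star graph: centre 0 connected to leaves 1, 2 and 3.
star = {0: {1, 2, 3}, 1: {0}, 2: {0}, 3: {0}}

centre_only = is_maximal_independent_set(star, {0})        # maximal, size 1
all_leaves = is_maximal_independent_set(star, {1, 2, 3})   # maximal, size 3
two_leaves = is_maximal_independent_set(star, {1, 2})      # leaf 3 could be added
centre_and_leaf = is_maximal_independent_set(star, {0, 1})  # 0 and 1 are adjacent
```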
- property nodes: DataFrame¶
Alias to vertices.
- property outDegrees: DataFrame¶
- The out-degree of each vertex in the graph, returned as a DataFrame with two columns:
“id”: the ID of the vertex
“outDegree” (integer) storing the out-degree of the vertex
Note that vertices with 0 out-edges are not returned in the result.
- Returns:
DataFrame with new vertices column “outDegree”
- pageRank(resetProbability: float = 0.15, sourceId: Any | None = None, maxIter: int | None = None, tol: float | None = None) GraphFrame[source]¶
Runs the PageRank algorithm on the graph. Note: Exactly one of maxIter or tol must be set.
See Scala documentation for more details.
- Parameters:
resetProbability – Probability of resetting to a random vertex.
sourceId – (optional) the source vertex for a personalized PageRank.
maxIter – If set, the algorithm is run for a fixed number of iterations. This may not be set if the tol parameter is set.
tol – If set, the algorithm is run until the given tolerance. This may not be set if the maxIter parameter is set.
- Returns:
GraphFrame with new vertices column “pagerank” and new edges column “weight”
- parallelPersonalizedPageRank(resetProbability: float = 0.15, sourceIds: list[Any] | None = None, maxIter: int | None = None) GraphFrame[source]¶
Run the personalized PageRank algorithm on the graph, from the provided list of sources in parallel for a fixed number of iterations.
See Scala documentation for more details.
- Parameters:
resetProbability – Probability of resetting to a random vertex
sourceIds – the source vertices for a personalized PageRank
maxIter – the fixed number of iterations this algorithm runs
- Returns:
GraphFrame with new vertices column “pageranks” and new edges column “weight”
- persist(storageLevel: StorageLevel = StorageLevel(False, True, False, False, 1)) GraphFrame[source]¶
Persist the dataframe representation of vertices and edges of the graph with the given storage level.
- powerIterationClustering(k: int, maxIter: int, weightCol: str | None = None) DataFrame[source]¶
Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by Lin and Cohen. From the abstract: PIC finds a very low-dimensional embedding of a dataset using truncated power iteration on a normalized pair-wise similarity matrix of the data.
- Parameters:
k – the number of clusters to create
maxIter – param for maximum number of iterations (>= 0)
weightCol – optional name of weight column, 1.0 is used if not provided
- Returns:
DataFrame with new column “cluster”
- property pregel: Pregel¶
Get the graphframes.classic.pregel.Pregel or graphframes.connect.graphframes_client.Pregel object for running Pregel.
See graphframes.lib.Pregel for more details.
- shortestPaths(landmarks: list[str | int], algorithm: str = 'graphx', use_local_checkpoints: bool = False, checkpoint_interval: int = 2, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1), is_directed: bool = True) DataFrame[source]¶
Runs the shortest path algorithm from a set of landmark vertices in the graph.
See Scala documentation for more details.
- Parameters:
landmarks – a set of one or more landmarks
algorithm – implementation to use; possible values are “graphframes” and “graphx”. “graphx” is faster for small and medium-sized graphs, while “graphframes” requires less memory
use_local_checkpoints – whether to use local checkpoints (default: False). Local checkpoints are faster and do not require setting a persistent checkpointDir; on the other hand, they are less reliable and require executors to have sufficiently large local disks.
checkpoint_interval – how often the intermediate result should be checkpointed. A large value may lead to huge logical plan growth due to the iterative nature of the algorithm.
storage_level – storage level for both intermediate and final dataframes.
is_directed – should algorithm find directed paths or any paths.
- Returns:
Persisted DataFrame with new vertices column “distances”
- stronglyConnectedComponents(maxIter: int) DataFrame[source]¶
Runs the strongly connected components algorithm on this graph.
See Scala documentation for more details.
- Parameters:
maxIter – the number of iterations to run
- Returns:
DataFrame with new vertex column “component”
- svdPlusPlus(rank: int = 10, maxIter: int = 2, minValue: float = 0.0, maxValue: float = 5.0, gamma1: float = 0.007, gamma2: float = 0.007, gamma6: float = 0.005, gamma7: float = 0.015) tuple[DataFrame, float][source]¶
Runs the SVD++ algorithm for Collaborative Filtering.
Based on the paper “Factorization Meets the Neighborhood: a Multifaceted Collaborative Filtering Model” by Yehuda Koren (2008).
Algorithm Description
SVD++ improves upon standard Matrix Factorization by incorporating implicit feedback (the history of items a user has interacted with) alongside explicit ratings. The prediction rule is:
r_ui = µ + b_u + b_i + q_i^T * (p_u + |N(u)|^-0.5 * sum(y_j for j in N(u)))
Input Requirements
The input graph must be a Directed Bipartite Graph:
- Vertices: A mix of Users and Items.
- Edges: Directed strictly from User (src) -> Item (dst).
- Edge Attribute: Represents the rating (weight).
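The prediction rule stated above can be evaluated by hand for a single user and item. The following plain-Python sketch does exactly that and clips the result to [minValue, maxValue]; the function name `predict_rating` and all numeric values are hypothetical, and the sketch says nothing about how GraphFrames trains the parameters.

```python
def predict_rating(mu, b_u, b_i, q_i, p_u, implicit_factors,
                   min_value=0.0, max_value=5.0):
    """Evaluate r_ui = mu + b_u + b_i + q_i^T (p_u + |N(u)|^-0.5 * sum(y_j))."""
    n = len(implicit_factors)            # |N(u)|: items the user interacted with
    norm = n ** -0.5 if n else 0.0
    user_vector = [
        p + norm * sum(y[k] for y in implicit_factors)
        for k, p in enumerate(p_u)
    ]
    raw = mu + b_u + b_i + sum(q * u for q, u in zip(q_i, user_vector))
    # Clip the prediction into the allowed rating range.
    return min(max(raw, min_value), max_value)


# Hypothetical parameters with rank 2 and four rated items.
y_vectors = [[0.5, 0.5]] * 4
rating = predict_rating(
    mu=3.0, b_u=0.5, b_i=-0.25,
    q_i=[0.2, 0.1], p_u=[0.5, 0.0],
    implicit_factors=y_vectors,
)
clipped = predict_rating(
    mu=3.0, b_u=0.5, b_i=-0.25,
    q_i=[1.0, 2.0], p_u=[0.5, 0.0],
    implicit_factors=y_vectors,
)
```

With four rated items the normalization term is 4^-0.5 = 0.5, so the combined user vector is [1.5, 1.0] in both calls; the second call exceeds the upper bound and is clipped.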
- Parameters:
rank – The number of latent factors (embedding size).
maxIter – The maximum number of iterations.
minValue – The minimum possible rating value (used for clipping predictions).
maxValue – The maximum possible rating value (used for clipping predictions).
gamma1 – Learning rate for bias parameters (b_u, b_i).
gamma2 – Learning rate for factor parameters (p_u, q_i, y_j).
gamma6 – Regularization coefficient for bias parameters.
gamma7 – Regularization coefficient for factor parameters.
- Returns:
A tuple (v, loss) where:
- v is a DataFrame of vertices containing the trained model parameters (embeddings).
- loss is the final training loss (double).
Output DataFrame Columns
The returned DataFrame v contains the following new columns containing the model parameters:
- column1 (Array[Double]): Primary Latent Factors (Explicit Embedding).
For Users: Preferences vector (p_u).
For Items: Characteristics vector (q_i).
- column2 (Array[Double]): Implicit Factors (Implicit Embedding).
For Items: Influence vector (y_i).
For Users: Unused/Zero (users aggregate y from neighbors).
- column3 (Double): Bias term.
For Users: User bias (b_u).
For Items: Item bias (b_i).
- column4 (Double): Implicit Normalization term.
For Users: Precomputed |N(u)|^-0.5.
For Items: Unused.
- triangleCount(storage_level: StorageLevel, algorithm: str = 'exact', lg_nom_entries: int = 12) DataFrame[source]¶
Computes the number of triangles passing through each vertex. This algorithm identifies sets of three vertices where each pair is connected by an edge.
The implementation provides two algorithms:
- “exact”: Computes the exact triangle count using set intersection of neighbor lists. Note: This method can fail or encounter OOM errors on power-law graphs or graphs with very high-degree nodes, as it requires collecting and intersecting the full neighbor lists for the source and destination vertices of every edge.
- “approx”: Uses DataSketches (Theta sketches) to estimate the triangle count. This trades off perfect accuracy for significantly improved performance and lower memory overhead, making it suitable for large-scale or dense graphs.
- Parameters:
storage_level – Storage level for caching intermediate DataFrames.
algorithm – The triangle counting algorithm to use, “exact” or “approx” (default: “exact”).
lg_nom_entries – The log2 of the nominal entries for the Theta sketch (only used if algorithm=”approx”). Higher values increase accuracy at the cost of memory. (default: 12).
- Returns:
A DataFrame containing the vertex “id” and the triangle “count”.
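The neighbor-set intersection idea behind the “exact” mode can be sketched in plain Python. This is a single-machine illustration under the assumption that edges are treated as undirected; `triangle_counts` is a hypothetical name and the code does not reflect how GraphFrames distributes the work.

```python
def triangle_counts(edges):
    """Count the triangles passing through each vertex via set intersection."""
    neighbours = {}
    for src, dst in edges:
        if src == dst:
            continue  # a self-loop cannot be part of a triangle
        neighbours.setdefault(src, set()).add(dst)
        neighbours.setdefault(dst, set()).add(src)

    counts = {}
    for v, neigh in neighbours.items():
        # |N(v) & N(u)| is the number of triangles containing the edge v-u.
        shared = sum(len(neigh & neighbours[u]) for u in neigh)
        # Each triangle through v contains two edges incident to v.
        counts[v] = shared // 2
    return counts


# A triangle 1-2-3 plus an extra edge from 3 to 4.
counts = triangle_counts([(1, 2), (2, 3), (3, 1), (3, 4)])
```

The memory concern mentioned above is visible here: every vertex keeps its full neighbor set, which becomes expensive for very high-degree vertices.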
- property triplets: DataFrame¶
The triplets (source vertex)-[edge]->(destination vertex) for all edges in the graph.
Returned as a DataFrame with three columns:
“src”: source vertex with schema matching “vertices”
“edge”: edge with schema matching “edges”
“dst”: destination vertex with schema matching “vertices”
- Returns:
DataFrame with columns “src”, “edge”, and “dst”
- type_degree(edge_type_col: str, edge_types: list[Any] | None = None) DataFrame[source]¶
The total degree of each vertex per edge type (both in and out), returned as a DataFrame with two columns:
“id”: the ID of the vertex
“degrees”: a struct with a field for each edge type, storing the total degree count
- Parameters:
edge_type_col – Name of the column in edges DataFrame that contains edge types
edge_types – Optional list of edge type values. If None, edge types will be discovered automatically.
- Returns:
DataFrame with columns “id” and “degrees” (struct type)
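A plain-Python sketch may help picture the “degrees” struct: one counter per edge type for every vertex. The helper `type_degrees` is hypothetical, and representing the struct as a dict and filling missing types with 0 are assumptions of this sketch.

```python
from collections import defaultdict


def type_degrees(edges, edge_types=None):
    """Total degree (in plus out) of each vertex, split by edge type."""
    if edge_types is None:
        # Discover the edge types from the data when none are given.
        edge_types = sorted({edge_type for _, _, edge_type in edges})
    result = defaultdict(lambda: dict.fromkeys(edge_types, 0))
    for src, dst, edge_type in edges:
        if edge_type not in edge_types:
            continue
        result[src][edge_type] += 1
        result[dst][edge_type] += 1
    return dict(result)


# Edges (src, dst, action) from the class-level example.
typed_edges = [(1, 2, "love"), (2, 1, "hate"), (2, 3, "follow")]
per_type = type_degrees(typed_edges)
only_love = type_degrees(typed_edges, edge_types=["love"])
```

Counting only `src` or only `dst` in the loop would give the per-type out-degree or in-degree instead.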
- type_in_degree(edge_type_col: str, edge_types: list[Any] | None = None) DataFrame[source]¶
- The in-degree of each vertex per edge type, returned as a DataFrame with two columns:
“id”: the ID of the vertex
“inDegrees”: a struct with a field for each edge type, storing the in-degree count
- Parameters:
edge_type_col – Name of the column in edges DataFrame that contains edge types
edge_types – Optional list of edge type values. If None, edge types will be discovered automatically.
- Returns:
DataFrame with columns “id” and “inDegrees” (struct type)
- type_out_degree(edge_type_col: str, edge_types: list[Any] | None = None) DataFrame[source]¶
- The out-degree of each vertex per edge type, returned as a DataFrame with two columns:
“id”: the ID of the vertex
“outDegrees”: a struct with a field for each edge type, storing the out-degree count
- Parameters:
edge_type_col – Name of the column in edges DataFrame that contains edge types
edge_types – Optional list of edge type values. If None, edge types will be discovered automatically.
- Returns:
DataFrame with columns “id” and “outDegrees” (struct type)
- unpersist(blocking: bool = False) GraphFrame[source]¶
Mark the dataframe representation of vertices and edges of the graph as non-persistent, and remove all blocks for it from memory and disk.
- validate(check_vertices: bool = True, intermediate_storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) None[source]¶
Validates the consistency and integrity of a graph by performing checks on the vertices and edges.
- Parameters:
check_vertices – a flag to indicate whether additional vertex consistency checks should be performed. If true, the method will verify that all vertices in the vertex DataFrame are represented in the edge DataFrame and vice versa. It is slow on big graphs.
intermediate_storage_level – the storage level to be used when persisting intermediate DataFrame computations during the validation process.
- Returns:
None. The method performs validation checks and raises an exception if validation fails.
- Raises:
ValueError – if there are any inconsistencies in the graph, such as duplicate vertices, mismatched vertices between edges and vertex DataFrames or missing connections.
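The kinds of inconsistency listed above can be pictured with a small plain-Python sketch. It models only two checks (duplicate vertex IDs and edge endpoints missing from the vertex list) and is not the GraphFrames implementation; `validate_graph` here is a hypothetical stand-alone function.

```python
from collections import Counter


def validate_graph(vertex_ids, edges, check_vertices=True):
    """Raise ValueError on duplicate vertex ids or unknown edge endpoints."""
    duplicates = sorted(v for v, n in Counter(vertex_ids).items() if n > 1)
    if duplicates:
        raise ValueError(f"duplicate vertex ids: {duplicates}")
    if check_vertices:
        known = set(vertex_ids)
        missing = {v for edge in edges for v in edge} - known
        if missing:
            raise ValueError(f"edges reference unknown vertices: {sorted(missing)}")


def fails_validation(vertex_ids, edges):
    """Small helper for the example: True if validation raises ValueError."""
    try:
        validate_graph(vertex_ids, edges)
    except ValueError:
        return True
    return False


ok = fails_validation([1, 2, 3], [(1, 2), (2, 3)])          # consistent graph
has_duplicates = fails_validation([1, 1, 2], [(1, 2)])      # id 1 appears twice
has_dangling_edge = fails_validation([1, 2], [(1, 2), (2, 9)])  # 9 is unknown
```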
- property vertices: DataFrame¶
DataFrame holding vertex information, with unique column “id” for vertex IDs.