graphframes package¶
Subpackages¶
Contents¶
- class graphframes.GraphFrame(v: DataFrame, e: DataFrame)[source]¶
Represents a graph with vertices and edges stored as DataFrames.
- Parameters:
v – DataFrame holding vertex information. Must contain a column named “id” that stores unique vertex IDs.
e – DataFrame holding edge information. Must contain two columns “src” and “dst” storing source vertex IDs and destination vertex IDs of edges, respectively.
>>> localVertices = [(1, "A"), (2, "B"), (3, "C")]
>>> localEdges = [(1, 2, "love"), (2, 1, "hate"), (2, 3, "follow")]
>>> v = spark.createDataFrame(localVertices, ["id", "name"])
>>> e = spark.createDataFrame(localEdges, ["src", "dst", "action"])
>>> g = GraphFrame(v, e)
- DST: str = 'dst'¶
- EDGE: str = 'edge'¶
- ID: str = 'id'¶
- SRC: str = 'src'¶
- aggregateMessages(aggCol: list[Column | str] | Column, sendToSrc: list[Column | str] | Column | str | None = None, sendToDst: list[Column | str] | Column | str | None = None, intermediate_storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]¶
Aggregates messages from the neighbours.
When specifying the messages and aggregation function, the user may reference columns using the static methods in graphframes.lib.AggregateMessages. See Scala documentation for more details.
Warning! The result of this method is a persisted DataFrame! Users should unpersist it when done to avoid memory leaks!
- Parameters:
aggCol – the requested aggregation output, either as a collection of pyspark.sql.Column or SQL expression strings
sendToSrc – message sent to the source vertex of each triplet, either as a collection of pyspark.sql.Column or SQL expression strings (default: None)
sendToDst – message sent to the destination vertex of each triplet, either as a collection of pyspark.sql.Column or SQL expression strings (default: None)
intermediate_storage_level – the storage level used for both intermediate results and the output.
- Returns:
Persisted DataFrame with columns for the vertex ID and the resulting aggregated message. The name of the resulting message column is based on the alias of the provided aggCol!
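A minimal sketch of message aggregation, assuming the GraphFrame g from above with an additional integer “age” column on its vertices (hypothetical here); each vertex sums the ages of its neighbours:
>>> from pyspark.sql import functions as F
>>> from graphframes.lib import AggregateMessages as AM
>>> agg = g.aggregateMessages(
...     F.sum(AM.msg).alias("summedAges"),  # the alias names the output column
...     sendToSrc=AM.dst["age"],  # send each destination's age to the source
...     sendToDst=AM.src["age"],  # send each source's age to the destination
... )
>>> agg.show()
>>> agg.unpersist()  # the result is persisted; release it when done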
- as_undirected() GraphFrame[source]¶
Converts the directed graph into an undirected graph by ensuring that all directed edges are bidirectional. For every directed edge (src, dst), a corresponding edge (dst, src) is added.
- Returns:
A new GraphFrame representing the undirected graph.
- bfs(fromExpr: str, toExpr: str, edgeFilter: str | None = None, maxPathLength: int = 10) DataFrame[source]¶
Breadth-first search (BFS).
See Scala documentation for more details.
- Returns:
DataFrame with one Row for each shortest path between matching vertices.
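For example, with the graph g from the class docstring, searching for shortest paths from vertex “A” to vertex “C”:
>>> paths = g.bfs(fromExpr="name = 'A'", toExpr="name = 'C'", maxPathLength=5)
>>> paths.show()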
- cache() GraphFrame[source]¶
Persist the dataframe representation of vertices and edges of the graph with the default storage level.
- connectedComponents(algorithm: str = 'graphframes', checkpointInterval: int = 2, broadcastThreshold: int = 1000000, useLabelsAsComponents: bool = False, use_local_checkpoints: bool = False, max_iter: int = 31, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]¶
Computes the connected components of the graph.
See Scala documentation for more details.
- Parameters:
algorithm – connected components algorithm to use (default: “graphframes”). Supported algorithms are “graphframes” and “graphx”.
checkpointInterval – checkpoint interval in terms of number of iterations (default: 2)
broadcastThreshold – broadcast threshold in propagating component assignments (default: 1000000). Passing -1 disables manual broadcasting and allows AQE to handle skewed joins. This mode is much faster and is recommended. The default value may be changed to -1 in future versions of GraphFrames.
useLabelsAsComponents – if True, uses the vertex labels as components; otherwise long values are used
use_local_checkpoints – whether to use local checkpoints (default: False); local checkpoints are faster and do not require setting a persistent checkpointDir; on the other hand, they are less reliable and require executors to have sufficient local disk space.
storage_level – storage level for both intermediate and final dataframes.
- Returns:
DataFrame with new vertices column “component”
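A minimal usage sketch for the graph g from above; the checkpoint directory path is a placeholder (a checkpoint directory is required unless use_local_checkpoints is enabled):
>>> spark.sparkContext.setCheckpointDir("/tmp/graphframes-checkpoints")
>>> components = g.connectedComponents()
>>> components.select("id", "component").show()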
- property degrees: DataFrame¶
- The degree of each vertex in the graph, returned as a DataFrame with two columns:
“id”: the ID of the vertex
“degree” (integer): the degree of the vertex
Note that vertices with 0 edges are not returned in the result.
- Returns:
DataFrame with new vertices column “degree”
- detectingCycles(checkpoint_interval: int = 2, use_local_checkpoints: bool = False, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]¶
Finds all cycles in the graph.
An implementation of the Rocha–Thatte cycle detection algorithm. Rocha, Rodrigo Caetano, and Bhalchandra D. Thatte. “Distributed cycle detection in large-scale sparse graphs.” Proceedings of Simpósio Brasileiro de Pesquisa Operacional (SBPO’15) (2015): 1-11.
Returns a DataFrame with unique cycles.
- Parameters:
checkpoint_interval – Pregel checkpoint interval, default is 2
use_local_checkpoints – whether to use local checkpoints instead of checkpointDir
storage_level – the level of storage for both intermediate results and the output DataFrame
- Returns:
Persisted DataFrame with all the cycles
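A minimal sketch for the graph g from above, assuming a checkpoint directory has already been set (see connectedComponents):
>>> cycles = g.detectingCycles()
>>> cycles.show()
>>> cycles.unpersist()  # the result is persisted; release it when done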
- dropIsolatedVertices() GraphFrame[source]¶
Drops isolated vertices, i.e. vertices that are not contained in any edges.
- Returns:
GraphFrame with filtered vertices.
- property edges: DataFrame¶
DataFrame holding edge information, with unique columns “src” and “dst” storing source vertex IDs and destination vertex IDs of edges, respectively.
- filterEdges(condition: str | Column) GraphFrame[source]¶
Filters the edges based on the given condition, keeping all vertices.
- Parameters:
condition – String or Column describing the condition expression for filtering.
- Returns:
GraphFrame with filtered edges.
- filterVertices(condition: str | Column) GraphFrame[source]¶
Filters the vertices based on the given condition, removing edges that contain any dropped vertices.
- Parameters:
condition – String or Column describing the condition expression for filtering.
- Returns:
GraphFrame with filtered vertices and edges.
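For instance, with the graph g defined above:
>>> follows = g.filterEdges("action = 'follow'")  # all vertices are kept
>>> sub = g.filterVertices("id > 1")  # edges touching dropped vertices are removed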
- find(pattern: str) DataFrame[source]¶
Motif finding.
See Scala documentation for more details.
- Parameters:
pattern – String describing the motif to search for.
- Returns:
DataFrame with one Row for each instance of the motif found
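A short motif-finding sketch on g, looking for pairs of vertices with edges in both directions:
>>> mutual = g.find("(a)-[e1]->(b); (b)-[e2]->(a)")
>>> mutual.show()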
- property inDegrees: DataFrame¶
- The in-degree of each vertex in the graph, returned as a DataFrame with two columns:
“id”: the ID of the vertex
“inDegree” (integer): the in-degree of the vertex
Note that vertices with 0 in-edges are not returned in the result.
- Returns:
DataFrame with new vertices column “inDegree”
- k_core(checkpoint_interval: int = 2, use_local_checkpoints: bool = False, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]¶
The k-core is the maximal subgraph such that every vertex has at least degree k. The k-core metric is a measure of the centrality of a node in a network, based on its degree and the degrees of its neighbors. Nodes with higher k-core values are considered to be more central and influential within the network.
This implementation is based on the algorithm described in: Mandal, Aritra, and Mohammad Al Hasan. “A distributed k-core decomposition algorithm on spark.” 2017 IEEE International Conference on Big Data (Big Data). IEEE, 2017.
- Parameters:
checkpoint_interval – Pregel checkpoint interval, default is 2
use_local_checkpoints – whether to use local checkpoints instead of checkpointDir
storage_level – the level of storage for both intermediate results and the output DataFrame
- Returns:
Persisted DataFrame with ID and k-core values (column “kcore”)
- labelPropagation(maxIter: int, algorithm: str = 'graphx', use_local_checkpoints: bool = False, checkpoint_interval: int = 2, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]¶
Runs static label propagation for detecting communities in networks.
See Scala documentation for more details.
- Parameters:
maxIter – the number of iterations to be performed
algorithm – implementation to use; possible values are “graphframes” and “graphx”; “graphx” is faster for small to medium sized graphs, while “graphframes” requires less memory
use_local_checkpoints – whether to use local checkpoints (default: False); local checkpoints are faster and do not require setting a persistent checkpointDir; on the other hand, they are less reliable and require executors to have sufficient local disk space
checkpoint_interval – how often the intermediate result should be checkpointed; a large value may lead to huge logical plan growth due to the iterative nature of the algorithm
storage_level – storage level for both intermediate and final dataframes.
- Returns:
Persisted DataFrame with new vertices column “label”
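A minimal call sketch on g:
>>> communities = g.labelPropagation(maxIter=5)
>>> communities.select("id", "label").show()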
- maximal_independent_set(seed: int = 42, checkpoint_interval: int = 2, use_local_checkpoints: bool = False, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[source]¶
This method implements a distributed algorithm for finding a Maximal Independent Set (MIS) in a graph.
An MIS is a set of vertices such that no two vertices in the set are adjacent (i.e., there is no edge between any two vertices in the set), and the set is maximal, meaning that adding any other vertex to the set would violate the independence property. Note that this implementation finds a maximal (but not necessarily maximum) independent set; that is, it ensures no more vertices can be added to the set, but does not guarantee that the set has the largest possible number of vertices among all possible independent sets in the graph.
The algorithm implemented here is based on the paper: Ghaffari, Mohsen. “An improved distributed algorithm for maximal independent set.” Proceedings of the twenty-seventh annual ACM-SIAM symposium on Discrete algorithms. Society for Industrial and Applied Mathematics, 2016.
Note: This is a randomized, non-deterministic algorithm. The result may vary between runs even if a fixed random seed is provided because of how Apache Spark works.
- Parameters:
seed – random seed used for tie-breaking in the algorithm (default: 42)
checkpoint_interval – checkpoint interval in terms of number of iterations (default: 2)
use_local_checkpoints – whether to use local checkpoints (default: False); local checkpoints are faster and do not require setting a persistent checkpoint directory; however, they are less reliable and require executors to have sufficient local disk space.
storage_level – storage level for both intermediate and final DataFrames (default: MEMORY_AND_DISK_DESER)
- Returns:
DataFrame with new vertex column “selected”, where “true” indicates the vertex is part of the Maximal Independent Set
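A brief call sketch on g:
>>> mis = g.maximal_independent_set(seed=42)
>>> mis.filter("selected").select("id").show()  # vertices in the set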
- property nodes: DataFrame¶
Alias for vertices.
- property outDegrees: DataFrame¶
- The out-degree of each vertex in the graph, returned as a DataFrame with two columns:
“id”: the ID of the vertex
“outDegree” (integer): the out-degree of the vertex
Note that vertices with 0 out-edges are not returned in the result.
- Returns:
DataFrame with new vertices column “outDegree”
- pageRank(resetProbability: float = 0.15, sourceId: Any | None = None, maxIter: int | None = None, tol: float | None = None) GraphFrame[source]¶
Runs the PageRank algorithm on the graph. Note: Exactly one of maxIter or tol must be set.
See Scala documentation for more details.
- Parameters:
resetProbability – Probability of resetting to a random vertex.
sourceId – (optional) the source vertex for a personalized PageRank.
maxIter – If set, the algorithm is run for a fixed number of iterations. This may not be set if the tol parameter is set.
tol – If set, the algorithm is run until the given tolerance. This may not be set if the maxIter parameter is set.
- Returns:
GraphFrame with new vertices column “pagerank” and new edges column “weight”
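Hedged call sketches on g, one per termination mode (set exactly one of tol or maxIter):
>>> pr = g.pageRank(resetProbability=0.15, tol=0.01)  # run to convergence
>>> pr.vertices.select("id", "pagerank").show()
>>> pr2 = g.pageRank(resetProbability=0.15, maxIter=10)  # fixed iteration count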
- parallelPersonalizedPageRank(resetProbability: float = 0.15, sourceIds: list[Any] | None = None, maxIter: int | None = None) GraphFrame[source]¶
Runs the personalized PageRank algorithm on the graph, from the provided list of source vertices in parallel, for a fixed number of iterations.
See Scala documentation for more details.
- Parameters:
resetProbability – Probability of resetting to a random vertex
sourceIds – the source vertices for a personalized PageRank
maxIter – the fixed number of iterations this algorithm runs
- Returns:
GraphFrame with new vertices column “pageranks” and new edges column “weight”
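For example, personalized PageRank from source vertices 1 and 2 of g:
>>> ppr = g.parallelPersonalizedPageRank(resetProbability=0.15, sourceIds=[1, 2], maxIter=10)
>>> ppr.vertices.select("id", "pageranks").show()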
- persist(storageLevel: StorageLevel = StorageLevel(False, True, False, False, 1)) GraphFrame[source]¶
Persist the dataframe representation of vertices and edges of the graph with the given storage level.
- powerIterationClustering(k: int, maxIter: int, weightCol: str | None = None) DataFrame[source]¶
Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by Lin and Cohen. From the abstract: PIC finds a very low-dimensional embedding of a dataset using truncated power iteration on a normalized pair-wise similarity matrix of the data.
- Parameters:
k – the number of clusters to create
maxIter – maximum number of iterations (>= 0)
weightCol – optional name of the weight column; a weight of 1.0 is used if not provided
- Returns:
DataFrame with new column “cluster”
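A brief call sketch on g (edge weights default to 1.0 since no weightCol is given):
>>> clusters = g.powerIterationClustering(k=2, maxIter=20)
>>> clusters.show()  # columns: id, cluster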
- property pregel: Pregel¶
Get the graphframes.classic.pregel.Pregel or graphframes.connect.graphframes_client.Pregel object for running Pregel.
See graphframes.lib.Pregel for more details.
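A sketch of PageRank expressed with the Pregel API, assuming the vertices DataFrame of g additionally carries an “outDegree” column:
>>> from pyspark.sql.functions import coalesce, lit, sum as sum_
>>> from graphframes.lib import Pregel
>>> ranks = g.pregel \
...     .setMaxIter(5) \
...     .withVertexColumn("rank", lit(1.0),
...         coalesce(Pregel.msg(), lit(0.0)) * lit(0.85) + lit(0.15)) \
...     .sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree")) \
...     .aggMsgs(sum_(Pregel.msg())) \
...     .run()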
- shortestPaths(landmarks: list[str | int], algorithm: str = 'graphx', use_local_checkpoints: bool = False, checkpoint_interval: int = 2, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1), is_directed: bool = True) DataFrame[source]¶
Runs the shortest path algorithm from a set of landmark vertices in the graph.
See Scala documentation for more details.
- Parameters:
landmarks – a set of one or more landmarks
algorithm – implementation to use; possible values are “graphframes” and “graphx”; “graphx” is faster for small to medium sized graphs, while “graphframes” requires less memory
use_local_checkpoints – whether to use local checkpoints (default: False); local checkpoints are faster and do not require setting a persistent checkpointDir; on the other hand, they are less reliable and require executors to have sufficient local disk space
checkpoint_interval – how often the intermediate result should be checkpointed; a large value may lead to huge logical plan growth due to the iterative nature of the algorithm
storage_level – storage level for both intermediate and final dataframes.
is_directed – whether the algorithm should follow edge directions (True) or treat edges as undirected (False).
- Returns:
Persisted DataFrame with new vertices column “distances”
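For example, distances from every vertex of g to landmark vertices 1 and 3:
>>> sp = g.shortestPaths(landmarks=[1, 3])
>>> sp.select("id", "distances").show()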
- stronglyConnectedComponents(maxIter: int) DataFrame[source]¶
Runs the strongly connected components algorithm on this graph.
See Scala documentation for more details.
- Parameters:
maxIter – the number of iterations to run
- Returns:
DataFrame with new vertex column “component”
- svdPlusPlus(rank: int = 10, maxIter: int = 2, minValue: float = 0.0, maxValue: float = 5.0, gamma1: float = 0.007, gamma2: float = 0.007, gamma6: float = 0.005, gamma7: float = 0.015) tuple[DataFrame, float][source]¶
Runs the SVD++ algorithm.
See Scala documentation for more details.
- Returns:
Tuple of DataFrame with new vertex columns storing learned model, and loss value
- triangleCount(storage_level: StorageLevel) DataFrame[source]¶
Counts the number of triangles passing through each vertex in this graph. This implementation is based on computing the intersection of vertex neighborhoods, which requires collecting the whole neighborhood of each vertex. It may fail with memory errors on graphs with a power-law degree distribution (graphs with a few very high-degree vertices). Consider sampling edges in that case to get an approximate triangle count.
- Parameters:
storage_level – storage level that is used for both intermediate and final dataframes.
- Returns:
DataFrame with new vertex column “count”
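A call sketch on g; MEMORY_AND_DISK is just an illustrative storage level:
>>> from pyspark import StorageLevel
>>> triangles = g.triangleCount(storage_level=StorageLevel.MEMORY_AND_DISK)
>>> triangles.select("id", "count").show()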
- property triplets: DataFrame¶
The triplets (source vertex)-[edge]->(destination vertex) for all edges in the graph.
- Returned as a DataFrame with three columns:
“src”: source vertex with schema matching ‘vertices’
“edge”: edge with schema matching ‘edges’
“dst”: destination vertex with schema matching ‘vertices’
- Returns:
DataFrame with columns ‘src’, ‘edge’, and ‘dst’
- type_degree(edge_type_col: str, edge_types: List[Any] | None = None) DataFrame[source]¶
The total degree of each vertex per edge type (both in and out), returned as a DataFrame with two columns:
“id”: the ID of the vertex
“degrees”: a struct with a field for each edge type, storing the total degree count
- Parameters:
edge_type_col – Name of the column in edges DataFrame that contains edge types
edge_types – Optional list of edge type values. If None, edge types will be discovered automatically.
- Returns:
DataFrame with columns “id” and “degrees” (struct type)
- type_in_degree(edge_type_col: str, edge_types: List[Any] | None = None) DataFrame[source]¶
- The in-degree of each vertex per edge type, returned as a DataFrame with two columns:
“id”: the ID of the vertex
“inDegrees”: a struct with a field for each edge type, storing the in-degree count
- Parameters:
edge_type_col – Name of the column in edges DataFrame that contains edge types
edge_types – Optional list of edge type values. If None, edge types will be discovered automatically.
- Returns:
DataFrame with columns “id” and “inDegrees” (struct type)
- type_out_degree(edge_type_col: str, edge_types: List[Any] | None = None) DataFrame[source]¶
- The out-degree of each vertex per edge type, returned as a DataFrame with two columns:
“id”: the ID of the vertex
“outDegrees”: a struct with a field for each edge type, storing the out-degree count
- Parameters:
edge_type_col – Name of the column in edges DataFrame that contains edge types
edge_types – Optional list of edge type values. If None, edge types will be discovered automatically.
- Returns:
DataFrame with columns “id” and “outDegrees” (struct type)
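For the example graph g, whose edges carry an “action” column:
>>> g.type_degree("action").show()  # total degree per action type
>>> g.type_in_degree("action").show()  # in-degree per action type
>>> g.type_out_degree("action").show()  # out-degree per action type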
- unpersist(blocking: bool = False) GraphFrame[source]¶
Mark the dataframe representation of vertices and edges of the graph as non-persistent, and remove all blocks for it from memory and disk.
- validate(check_vertices: bool = True, intermediate_storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) None[source]¶
Validates the consistency and integrity of a graph by performing checks on the vertices and edges.
- Parameters:
check_vertices – a flag indicating whether additional vertex consistency checks should be performed. If True, the method verifies that all vertices in the vertex DataFrame are represented in the edge DataFrame and vice versa; this check is slow on big graphs.
intermediate_storage_level – the storage level to be used when persisting intermediate DataFrame computations during the validation process.
- Returns:
None; the method performs validation checks and raises an exception if validation fails.
- Raises:
ValueError – if there are any inconsistencies in the graph, such as duplicate vertices, mismatched vertices between the edge and vertex DataFrames, or missing connections.
- property vertices: DataFrame¶
DataFrame holding vertex information, with unique column “id” for vertex IDs.