graphframes package

Contents

class graphframes.GraphFrame(v: DataFrame, e: DataFrame)

Represents a graph with vertices and edges stored as DataFrames.

Parameters:
  • v – DataFrame holding vertex information. Must contain a column named “id” that stores unique vertex IDs.

  • e – DataFrame holding edge information. Must contain two columns “src” and “dst” storing source vertex IDs and destination vertex IDs of edges, respectively.

>>> from pyspark.sql import SparkSession
>>> from graphframes import GraphFrame
>>> spark = SparkSession.builder.getOrCreate()
>>> localVertices = [(1, "A"), (2, "B"), (3, "C")]
>>> localEdges = [(1, 2, "love"), (2, 1, "hate"), (2, 3, "follow")]
>>> v = spark.createDataFrame(localVertices, ["id", "name"])
>>> e = spark.createDataFrame(localEdges, ["src", "dst", "action"])
>>> g = GraphFrame(v, e)
DST: str = 'dst'
EDGE: str = 'edge'
ID: str = 'id'
SRC: str = 'src'
aggregateMessages(aggCol: list[Column | str] | Column, sendToSrc: list[Column | str] | Column | str | None = None, sendToDst: list[Column | str] | Column | str | None = None, intermediate_storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) → DataFrame

Aggregates messages from the neighbours.

When specifying the messages and aggregation function, the user may reference columns using the static methods in graphframes.lib.AggregateMessages.

See Scala documentation for more details.

Warning: the result of this method is a persisted DataFrame. Users should unpersist it when it is no longer needed to avoid possible memory leaks.

Parameters:
  • aggCol – the requested aggregation output either as a collection of pyspark.sql.Column or SQL expression string

  • sendToSrc – message sent to the source vertex of each triplet either as a collection of pyspark.sql.Column or SQL expression string (default: None)

  • sendToDst – message sent to the destination vertex of each triplet either as a collection of pyspark.sql.Column or SQL expression string (default: None)

  • intermediate_storage_level – the level of intermediate storage that will be used for both intermediate result and the output.

Returns:

Persisted DataFrame with columns for the vertex ID and the resulting aggregated message. The name of the resulting message column is based on the alias of the provided aggCol.
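The message-passing model described above can be illustrated without Spark. The following is a plain-Python sketch of the semantics only, not the GraphFrames implementation: the helper name `aggregate_messages` and its callback arguments are hypothetical stand-ins for the Column expressions that the real method accepts.

```python
from collections import defaultdict


def aggregate_messages(vertices, edges, send_to_src=None, send_to_dst=None, agg=sum):
    """Plain-Python model of per-vertex message aggregation (hypothetical helper).

    vertices: dict mapping vertex ID to an attribute dict
    edges: list of (src, dst, edge attribute dict)
    send_to_src / send_to_dst: callables taking (src_attrs, edge_attrs, dst_attrs)
    agg: callable that reduces the list of messages a vertex received
    """
    inbox = defaultdict(list)
    for src, dst, edge_attrs in edges:
        triplet = (vertices[src], edge_attrs, vertices[dst])
        if send_to_src is not None:
            inbox[src].append(send_to_src(*triplet))
        if send_to_dst is not None:
            inbox[dst].append(send_to_dst(*triplet))
    # In this sketch only vertices that received a message appear in the result.
    return {vid: agg(msgs) for vid, msgs in inbox.items()}


vertices = {1: {"age": 30}, 2: {"age": 40}, 3: {"age": 50}}
edges = [(1, 2, {}), (2, 1, {}), (2, 3, {})]

# Each vertex receives the age of the vertex at the other end of every edge.
neighbour_age_sum = aggregate_messages(
    vertices,
    edges,
    send_to_src=lambda s, e, d: d["age"],
    send_to_dst=lambda s, e, d: s["age"],
)
```

In the sketch a message is produced once per edge and per direction, so a pair of vertices connected by two edges exchanges two messages.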

as_undirected() → GraphFrame

Converts the directed graph into an undirected graph by ensuring that all directed edges are bidirectional. For every directed edge (src, dst), a corresponding edge (dst, src) is added.

Returns:

A new GraphFrame representing the undirected graph.
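As a minimal illustration of the conversion, the plain-Python sketch below adds the reverse of every edge to an edge list. It models the described behaviour only; collapsing duplicate edges with a set is an assumption of the sketch, since this page does not say how existing reverse edges are handled.

```python
def as_undirected(edges):
    """Return the edge list with a reverse edge added for every (src, dst)."""
    both = set(edges) | {(dst, src) for src, dst in edges}
    return sorted(both)


directed = [(1, 2), (2, 3)]
undirected = as_undirected(directed)
```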

bfs(fromExpr: str, toExpr: str, edgeFilter: str | None = None, maxPathLength: int = 10) → DataFrame

Breadth-first search (BFS).

See Scala documentation for more details.

Returns:

DataFrame with one Row for each shortest path between matching vertices.
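The search can be pictured as expanding paths one edge at a time until some path reaches a matching vertex. The sketch below is a plain-Python model under stated assumptions: `bfs_paths` is a hypothetical helper, Python predicates replace the SQL expression strings taken by `fromExpr` and `toExpr`, and paths are not allowed to revisit a vertex.

```python
def bfs_paths(vertices, edges, from_pred, to_pred, max_path_length=10):
    """Return all shortest paths from vertices matching from_pred to vertices matching to_pred."""
    adjacency = {}
    for src, dst in edges:
        adjacency.setdefault(src, []).append(dst)

    # Start with zero-length paths at every matching start vertex.
    frontier = [[v] for v in vertices if from_pred(v)]
    for _ in range(max_path_length + 1):
        found = [path for path in frontier if to_pred(path[-1])]
        if found:
            return found  # all paths of the smallest length that reach a target
        # Extend every path by one outgoing edge, skipping already visited vertices.
        frontier = [
            path + [nxt]
            for path in frontier
            for nxt in adjacency.get(path[-1], [])
            if nxt not in path
        ]
    return []


vertices = [1, 2, 3, 4]
edges = [(1, 2), (2, 3), (1, 3), (3, 4)]
paths = bfs_paths(vertices, edges, lambda v: v == 1, lambda v: v == 4)
too_short = bfs_paths(vertices, edges, lambda v: v == 1, lambda v: v == 4, max_path_length=1)
```

The second call shows the effect of the length limit: the only path from 1 to 4 needs two edges, so a limit of 1 yields no result.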

cache() → GraphFrame

Persist the dataframe representation of vertices and edges of the graph with the default storage level.

connectedComponents(algorithm: str = 'graphframes', checkpointInterval: int = 2, broadcastThreshold: int = 1000000, useLabelsAsComponents: bool = False, use_local_checkpoints: bool = False, max_iter: int = 31, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) → DataFrame

Computes the connected components of the graph.

See Scala documentation for more details.

Parameters:
  • algorithm – connected components algorithm to use (default: “graphframes”). Supported algorithms are “graphframes” and “graphx”.

  • checkpointInterval – checkpoint interval in terms of number of iterations (default: 2)

  • broadcastThreshold – broadcast threshold in propagating component assignments (default: 1000000). Passing -1 disables manual broadcasting and allows AQE to handle skewed joins. This mode is much faster and is recommended. The default value may change to -1 in future versions of GraphFrames.

  • useLabelsAsComponents – if True, uses the vertex labels as components, otherwise will use longs

  • use_local_checkpoints – whether to use local checkpoints (default: False). Local checkpoints are faster and do not require setting a persistent checkpointDir; on the other hand, they are less reliable and require executors to have sufficiently large local disks.

  • storage_level – storage level for both intermediate and final dataframes.

Returns:

DataFrame with new vertices column “component”
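To make the meaning of the “component” column concrete, here is a small single-machine sketch that uses union-find. It is not the distributed algorithm GraphFrames runs; the helper name and the choice of the smallest vertex ID as the component label are assumptions made for illustration.

```python
def connected_components(vertex_ids, edges):
    """Map every vertex ID to a component label (here: the smallest ID in its component)."""
    parent = {v: v for v in vertex_ids}

    def find(v):
        while parent[v] != v:
            parent[v] = parent[parent[v]]  # path halving keeps the trees shallow
            v = parent[v]
        return v

    for src, dst in edges:
        root_src, root_dst = find(src), find(dst)
        if root_src != root_dst:
            # Keep the smaller ID as the representative of the merged component.
            parent[max(root_src, root_dst)] = min(root_src, root_dst)

    return {v: find(v) for v in vertex_ids}


components = connected_components([1, 2, 3, 4, 5], [(1, 2), (2, 3), (4, 5)])
```

Edge direction is ignored: two vertices share a component whenever some chain of edges connects them.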

property degrees: DataFrame
The degree of each vertex in the graph, returned as a DataFrame with two columns:
  • “id”: the ID of the vertex

  • “degree” (integer): the degree of the vertex

Note that vertices with 0 edges are not returned in the result.

Returns:

DataFrame with new vertices column “degree”
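The note about vertices with 0 edges follows from how degrees are counted: only IDs that appear in the edge list get a row. A plain-Python sketch of the counting (the helper `degrees` is hypothetical and also covers the in-degree and out-degree variants):

```python
from collections import Counter


def degrees(edges):
    """Return (degree, in_degree, out_degree) dicts computed from (src, dst) pairs."""
    in_deg = Counter(dst for _, dst in edges)
    out_deg = Counter(src for src, _ in edges)
    total = in_deg + out_deg
    return dict(total), dict(in_deg), dict(out_deg)


# Suppose the graph also has a vertex 4 that takes part in no edge.
edges = [(1, 2), (2, 1), (2, 3)]
degree, in_degree, out_degree = degrees(edges)
```

Vertex 4 is absent from all three results, and vertex 3 is absent from `out_degree` because it has no outgoing edge.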

detectingCycles(checkpoint_interval: int = 2, use_local_checkpoints: bool = False, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) → DataFrame

Find all cycles in the graph.

An implementation of the Rocha–Thatte cycle detection algorithm. Rocha, Rodrigo Caetano, and Bhalchandra D. Thatte. “Distributed cycle detection in large-scale sparse graphs.” Proceedings of Simpósio Brasileiro de Pesquisa Operacional (SBPO’15) (2015): 1-11.

Returns a DataFrame with unique cycles.

Parameters:
  • checkpoint_interval – Pregel checkpoint interval, default is 2

  • use_local_checkpoints – should local checkpoints be used instead of checkpointDir

  • storage_level – the level of storage for both intermediate results and the output DataFrame

Returns:

Persisted DataFrame with all the cycles

dropIsolatedVertices() → GraphFrame

Drops isolated vertices, that is, vertices that are not contained in any edge.

Returns:

GraphFrame with filtered vertices.

property edges: DataFrame

DataFrame holding edge information, with unique columns “src” and “dst” storing source vertex IDs and destination vertex IDs of edges, respectively.

filterEdges(condition: str | Column) → GraphFrame

Filters the edges based on an expression, keeping all vertices.

Parameters:

condition – String or Column describing the condition expression for filtering.

Returns:

GraphFrame with filtered edges.

filterVertices(condition: str | Column) → GraphFrame

Filters the vertices based on an expression and removes edges containing any dropped vertices.

Parameters:

condition – String or Column describing the condition expression for filtering.

Returns:

GraphFrame with filtered vertices and edges.
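The difference from filterEdges is that dropping a vertex also drops every edge that touches it. A plain-Python sketch of that rule, with a hypothetical helper and a Python predicate in place of the condition expression:

```python
def filter_vertices(vertices, edges, keep):
    """Keep vertices whose attributes satisfy `keep`, and only edges between kept vertices."""
    kept = {vid: attrs for vid, attrs in vertices.items() if keep(attrs)}
    kept_edges = [(src, dst) for src, dst in edges if src in kept and dst in kept]
    return kept, kept_edges


vertices = {1: {"name": "A"}, 2: {"name": "B"}, 3: {"name": "C"}}
edges = [(1, 2), (2, 1), (2, 3)]
kept_vertices, kept_edges = filter_vertices(vertices, edges, lambda a: a["name"] != "C")
```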

find(pattern: str) → DataFrame

Motif finding.

See Scala documentation for more details.

Parameters:

pattern – String describing the motif to search for.

Returns:

DataFrame with one Row for each instance of the motif found
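As an illustration of what one instance of a motif means, the sketch below enumerates matches of the pattern "(a)-[e]->(b); (b)-[e2]->(a)" (pairs of vertices connected in both directions) over a plain edge list. The pattern string follows the motif syntax from the GraphFrames user guide; the helper `find_mutual_pairs` is hypothetical and handles only this one pattern.

```python
def find_mutual_pairs(edges):
    """Return every (a, b) such that both a -> b and b -> a are edges."""
    edge_set = set(edges)
    return sorted((a, b) for a, b in edge_set if (b, a) in edge_set)


edges = [(1, 2), (2, 1), (2, 3)]
matches = find_mutual_pairs(edges)
```

Each assignment of vertices to the names a and b counts as its own instance, so the mutual pair 1 and 2 produces two matches.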

property inDegrees: DataFrame
The in-degree of each vertex in the graph, returned as a DataFrame with two columns:
  • “id”: the ID of the vertex

  • “inDegree” (int) storing the in-degree of the vertex

Note that vertices with 0 in-edges are not returned in the result.

Returns:

DataFrame with new vertices column “inDegree”

k_core(checkpoint_interval: int = 2, use_local_checkpoints: bool = False, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) → DataFrame

The k-core is the maximal subgraph such that every vertex has at least degree k. The k-core metric is a measure of the centrality of a node in a network, based on its degree and the degrees of its neighbors. Nodes with higher k-core values are considered to be more central and influential within the network.

This implementation is based on the algorithm described in: Mandal, Aritra, and Mohammad Al Hasan. “A distributed k-core decomposition algorithm on spark.” 2017 IEEE International Conference on Big Data (Big Data). IEEE, 2017.

Parameters:
  • checkpoint_interval – Pregel checkpoint interval, default is 2

  • use_local_checkpoints – should local checkpoints be used instead of checkpointDir

  • storage_level – the level of storage for both intermediate results and an output DataFrame

Returns:

Persisted DataFrame with ID and k-core values (column “kcore”)
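The k-core value of a vertex can be computed on a single machine by repeatedly peeling off the vertex of smallest remaining degree. The sketch below uses that classic sequential procedure to illustrate the definition; it is not the distributed algorithm cited above, and the helper name is an assumption.

```python
def core_numbers(edges):
    """Return the k-core value of every vertex that appears in an undirected edge list."""
    neighbours = {}
    for src, dst in edges:
        if src == dst:
            continue  # self-loops are ignored in this sketch
        neighbours.setdefault(src, set()).add(dst)
        neighbours.setdefault(dst, set()).add(src)

    degree = {v: len(ns) for v, ns in neighbours.items()}
    remaining = set(neighbours)
    core = {}
    k = 0
    while remaining:
        # Peel the vertex with the smallest remaining degree.
        v = min(remaining, key=lambda u: degree[u])
        k = max(k, degree[v])
        core[v] = k
        remaining.remove(v)
        for u in neighbours[v]:
            if u in remaining:
                degree[u] -= 1
    return core


# A triangle (1, 2, 3) with a pendant vertex 4 attached to vertex 3.
kcore = core_numbers([(1, 2), (2, 3), (3, 1), (3, 4)])
```

The triangle vertices belong to the 2-core, while the pendant vertex only reaches the 1-core even though its neighbour has degree 3.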

labelPropagation(maxIter: int, algorithm: str = 'graphx', use_local_checkpoints: bool = False, checkpoint_interval: int = 2, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) → DataFrame

Runs static label propagation for detecting communities in networks.

See Scala documentation for more details.

Parameters:
  • maxIter – the number of iterations to be performed

  • algorithm – implementation to use, possible values are “graphframes” and “graphx”; “graphx” is faster for small-to-medium sized graphs, “graphframes” requires less memory

  • use_local_checkpoints – whether to use local checkpoints (default: False). Local checkpoints are faster and do not require setting a persistent checkpointDir; on the other hand, they are less reliable and require executors to have sufficiently large local disks.

  • storage_level – storage level for both intermediate and final dataframes.

  • checkpoint_interval – how often the intermediate result should be checkpointed. A large value may lead to huge logical plan growth due to the iterative nature of the algorithm.

Returns:

Persisted DataFrame with new vertices column “label”

maximal_independent_set(seed: int = 42, checkpoint_interval: int = 2, use_local_checkpoints: bool = False, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) → DataFrame

This method implements a distributed algorithm for finding a Maximal Independent Set (MIS) in a graph.

An MIS is a set of vertices such that no two vertices in the set are adjacent (i.e., there is no edge between any two vertices in the set), and the set is maximal, meaning that adding any other vertex to the set would violate the independence property. Note that this implementation finds a maximal (but not necessarily maximum) independent set; that is, it ensures no more vertices can be added to the set, but does not guarantee that the set has the largest possible number of vertices among all possible independent sets in the graph.

The algorithm implemented here is based on the paper: Ghaffari, Mohsen. “An improved distributed algorithm for maximal independent set.” Proceedings of the twenty-seventh annual ACM-SIAM symposium on Discrete algorithms. Society for Industrial and Applied Mathematics, 2016.

Note: This is a randomized, non-deterministic algorithm. The result may vary between runs even if a fixed random seed is provided because of how Apache Spark works.

Parameters:
  • seed – random seed used for tie-breaking in the algorithm (default: 42)

  • checkpoint_interval – checkpoint interval in terms of number of iterations (default: 2)

  • use_local_checkpoints – whether to use local checkpoints (default: False); local checkpoints are faster and do not require setting a persistent checkpoint directory; however, they are less reliable and require executors to have sufficient local disk space.

  • storage_level – storage level for both intermediate and final DataFrames (default: MEMORY_AND_DISK_DESER)

Returns:

DataFrame with new vertex column “selected”, where “true” indicates the vertex is part of the Maximal Independent Set
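The distinction between maximal and maximum can be checked on a small path graph. The sketch below is a sequential greedy procedure with a verifier, written in plain Python; it is not the distributed algorithm from the cited paper, and both helper names are hypothetical.

```python
import random


def greedy_mis(vertex_ids, edges, seed=42):
    """Pick vertices in random order, skipping any vertex adjacent to one already picked."""
    neighbours = {v: set() for v in vertex_ids}
    for src, dst in edges:
        if src != dst:
            neighbours[src].add(dst)
            neighbours[dst].add(src)
    order = list(vertex_ids)
    random.Random(seed).shuffle(order)
    selected = set()
    for v in order:
        if not (neighbours[v] & selected):
            selected.add(v)
    return selected


def is_maximal_independent(vertex_ids, edges, selected):
    """Check independence (no two selected vertices adjacent) and maximality."""
    adjacent = {(s, d) for s, d in edges} | {(d, s) for s, d in edges}
    independent = all(
        (a, b) not in adjacent for a in selected for b in selected if a != b
    )
    # Maximal: every unselected vertex has at least one selected neighbour.
    maximal = all(
        any((v, s) in adjacent for s in selected)
        for v in vertex_ids
        if v not in selected
    )
    return independent and maximal


# Path graph 1 - 2 - 3 - 4 - 5
vertex_ids = [1, 2, 3, 4, 5]
edges = [(1, 2), (2, 3), (3, 4), (4, 5)]
mis = greedy_mis(vertex_ids, edges)
```

On this path both {2, 4} and {1, 3, 5} pass the check, although only the second has the largest possible size, which is exactly the maximal versus maximum distinction.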

property nodes: DataFrame

Alias to vertices.

property outDegrees: DataFrame
The out-degree of each vertex in the graph, returned as a DataFrame with two columns:
  • “id”: the ID of the vertex

  • “outDegree” (integer) storing the out-degree of the vertex

Note that vertices with 0 out-edges are not returned in the result.

Returns:

DataFrame with new vertices column “outDegree”

pageRank(resetProbability: float = 0.15, sourceId: Any | None = None, maxIter: int | None = None, tol: float | None = None) → GraphFrame

Runs the PageRank algorithm on the graph. Note: Exactly one of maxIter or tol must be set.

See Scala documentation for more details.

Parameters:
  • resetProbability – Probability of resetting to a random vertex.

  • sourceId – (optional) the source vertex for a personalized PageRank.

  • maxIter – If set, the algorithm is run for a fixed number of iterations. This may not be set if the tol parameter is set.

  • tol – If set, the algorithm is run until the given tolerance. This may not be set if the maxIter parameter is set.

Returns:

GraphFrame with new vertices column “pagerank” and new edges column “weight”
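The two stopping modes (a fixed number of iterations or a convergence tolerance) can be illustrated with a textbook power iteration. The sketch below is plain Python under stated assumptions: the helper `pagerank` is hypothetical, ranks are normalized to sum to 1, and rank held by vertices without out-edges is spread evenly. The values GraphFrames reports may be scaled differently.

```python
def pagerank(edges, reset_probability=0.15, max_iter=None, tol=None):
    """Textbook PageRank by power iteration; exactly one of max_iter or tol must be set."""
    if (max_iter is None) == (tol is None):
        raise ValueError("exactly one of max_iter or tol must be set")

    vertices = sorted({v for edge in edges for v in edge})
    out_degree = {v: 0 for v in vertices}
    for src, _ in edges:
        out_degree[src] += 1

    n = len(vertices)
    rank = {v: 1.0 / n for v in vertices}
    iteration = 0
    while True:
        # Rank held by vertices without out-edges is redistributed evenly.
        dangling = sum(rank[v] for v in vertices if out_degree[v] == 0)
        base = reset_probability / n + (1 - reset_probability) * dangling / n
        new_rank = {v: base for v in vertices}
        for src, dst in edges:
            new_rank[dst] += (1 - reset_probability) * rank[src] / out_degree[src]
        delta = max(abs(new_rank[v] - rank[v]) for v in vertices)
        rank = new_rank
        iteration += 1
        if max_iter is not None and iteration >= max_iter:
            break
        if tol is not None and delta < tol:
            break
    return rank


edges = [(1, 3), (2, 3), (3, 1)]
ranks_by_tolerance = pagerank(edges, tol=1e-9)
ranks_fixed_iterations = pagerank(edges, max_iter=20)
```

Vertex 2 has no incoming edges, so it keeps only the reset share, while vertex 3 collects rank from both other vertices.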

parallelPersonalizedPageRank(resetProbability: float = 0.15, sourceIds: list[Any] | None = None, maxIter: int | None = None) → GraphFrame

Run the personalized PageRank algorithm on the graph, from the provided list of sources in parallel for a fixed number of iterations.

See Scala documentation for more details.

Parameters:
  • resetProbability – Probability of resetting to a random vertex

  • sourceIds – the source vertices for a personalized PageRank

  • maxIter – the fixed number of iterations this algorithm runs

Returns:

GraphFrame with new vertices column “pageranks” and new edges column “weight”

persist(storageLevel: StorageLevel = StorageLevel(False, True, False, False, 1)) → GraphFrame

Persist the dataframe representation of vertices and edges of the graph with the given storage level.

powerIterationClustering(k: int, maxIter: int, weightCol: str | None = None) → DataFrame

Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by Lin and Cohen. From the abstract: PIC finds a very low-dimensional embedding of a dataset using truncated power iteration on a normalized pair-wise similarity matrix of the data.

Parameters:
  • k – the number of clusters to create

  • maxIter – param for maximum number of iterations (>= 0)

  • weightCol – optional name of weight column, 1.0 is used if not provided

Returns:

DataFrame with new column “cluster”

property pregel: Pregel

Get the graphframes.classic.pregel.Pregel or graphframes.connect.graphframes_client.Pregel object for running Pregel.

See graphframes.lib.Pregel for more details.

shortestPaths(landmarks: list[str | int], algorithm: str = 'graphx', use_local_checkpoints: bool = False, checkpoint_interval: int = 2, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1), is_directed: bool = True) → DataFrame

Runs the shortest path algorithm from a set of landmark vertices in the graph.

See Scala documentation for more details.

Parameters:
  • landmarks – a set of one or more landmarks

  • algorithm – implementation to use, possible values are “graphframes” and “graphx”; “graphx” is faster for small-to-medium sized graphs, “graphframes” requires less memory

  • use_local_checkpoints – whether to use local checkpoints (default: False). Local checkpoints are faster and do not require setting a persistent checkpointDir; on the other hand, they are less reliable and require executors to have sufficiently large local disks.

  • checkpoint_interval – how often the intermediate result should be checkpointed. A large value may lead to huge logical plan growth due to the iterative nature of the algorithm.

  • storage_level – storage level for both intermediate and final dataframes.

  • is_directed – should algorithm find directed paths or any paths.

Returns:

Persisted DataFrame with new vertices column “distances”
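The effect of is_directed can be seen in a small breadth-first sketch. It is plain Python under stated assumptions: the helper `shortest_paths` is hypothetical, distances are counted in hops from each vertex to each landmark, and unreachable landmarks are simply left out of a vertex's map.

```python
from collections import deque


def shortest_paths(vertex_ids, edges, landmarks, is_directed=True):
    """Map each vertex to {landmark: number of hops needed to reach that landmark}."""
    # predecessors[v] lists the vertices that have an edge into v, so a search
    # started at a landmark walks the edges backwards.
    predecessors = {v: [] for v in vertex_ids}
    for src, dst in edges:
        predecessors[dst].append(src)
        if not is_directed:
            predecessors[src].append(dst)

    distances = {v: {} for v in vertex_ids}
    for landmark in landmarks:
        distances[landmark][landmark] = 0
        queue = deque([landmark])
        while queue:
            current = queue.popleft()
            for prev in predecessors[current]:
                if landmark not in distances[prev]:
                    distances[prev][landmark] = distances[current][landmark] + 1
                    queue.append(prev)
    return distances


vertex_ids = [1, 2, 3]
edges = [(1, 2), (2, 1), (2, 3)]
directed = shortest_paths(vertex_ids, edges, landmarks=[1])
undirected = shortest_paths(vertex_ids, edges, landmarks=[1], is_directed=False)
```

With directed paths vertex 3 cannot reach landmark 1, because its only edge points into it; once direction is ignored the distance is 2.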

stronglyConnectedComponents(maxIter: int) → DataFrame

Runs the strongly connected components algorithm on this graph.

See Scala documentation for more details.

Parameters:

maxIter – the number of iterations to run

Returns:

DataFrame with new vertex column “component”

svdPlusPlus(rank: int = 10, maxIter: int = 2, minValue: float = 0.0, maxValue: float = 5.0, gamma1: float = 0.007, gamma2: float = 0.007, gamma6: float = 0.005, gamma7: float = 0.015) → tuple[DataFrame, float]

Runs the SVD++ algorithm.

See Scala documentation for more details.

Returns:

Tuple of DataFrame with new vertex columns storing learned model, and loss value

triangleCount(storage_level: StorageLevel) → DataFrame

Counts the number of triangles passing through each vertex in this graph. This implementation is based on computing the intersection of vertex neighborhoods, which requires collecting the whole neighborhood of each vertex. It may fail with memory errors on graphs with a power-law degree distribution (graphs with a few very high-degree vertices). Consider edge sampling in that case to get an approximate count of triangles.

Parameters:

storage_level – storage level that is used for both intermediate and final dataframes.

Returns:

DataFrame with new vertex column “count”
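The neighborhood-intersection idea can be sketched in plain Python. The helper below is hypothetical and keeps every neighborhood in memory as a set, which also shows why a few very high-degree vertices make the approach expensive.

```python
def triangle_count(vertex_ids, edges):
    """Count the triangles passing through each vertex, ignoring edge direction."""
    neighbours = {v: set() for v in vertex_ids}
    for src, dst in edges:
        if src != dst:
            neighbours[src].add(dst)
            neighbours[dst].add(src)

    counts = {}
    for v in vertex_ids:
        # A common neighbour of v and u closes a triangle through v. Each
        # triangle is found twice (once from each of the other two corners).
        closed = sum(len(neighbours[v] & neighbours[u]) for u in neighbours[v])
        counts[v] = closed // 2
    return counts


# A triangle (1, 2, 3) with a pendant vertex 4 attached to vertex 3.
counts = triangle_count([1, 2, 3, 4], [(1, 2), (2, 3), (3, 1), (3, 4)])
```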

property triplets: DataFrame

The triplets (source vertex)-[edge]->(destination vertex) for all edges in the graph.

Returned as a DataFrame with three columns:
  • “src”: source vertex with schema matching ‘vertices’

  • “edge”: edge with schema matching ‘edges’

  • “dst”: destination vertex with schema matching ‘vertices’

Returns:

DataFrame with columns “src”, “edge”, and “dst”

type_degree(edge_type_col: str, edge_types: List[Any] | None = None) → DataFrame

The total degree of each vertex per edge type (both in and out), returned as a DataFrame with two columns:

  • “id”: the ID of the vertex

  • “degrees”: a struct with a field for each edge type, storing the total degree count

Parameters:
  • edge_type_col – Name of the column in edges DataFrame that contains edge types

  • edge_types – Optional list of edge type values. If None, edge types will be discovered automatically.

Returns:

DataFrame with columns “id” and “degrees” (struct type)
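A plain-Python sketch of the per-type counting may help to picture the shape of the “degrees” struct. The helper is hypothetical, and filling missing types with 0 is an assumption of the sketch rather than documented behaviour.

```python
from collections import Counter


def type_degree(edges, edge_types=None):
    """Return {vertex ID: {edge type: total degree}} for (src, dst, type) edges."""
    if edge_types is None:
        # Discover the edge types from the data when none are given.
        edge_types = sorted({edge_type for _, _, edge_type in edges})
    counts = Counter()
    for src, dst, edge_type in edges:
        counts[(src, edge_type)] += 1  # out-edge of src
        counts[(dst, edge_type)] += 1  # in-edge of dst
    vertex_ids = sorted({v for src, dst, _ in edges for v in (src, dst)})
    return {v: {t: counts[(v, t)] for t in edge_types} for v in vertex_ids}


edges = [(1, 2, "love"), (2, 1, "hate"), (2, 3, "follow")]
per_type = type_degree(edges)
love_only = type_degree(edges, edge_types=["love"])
```

Passing the edge types explicitly avoids the discovery pass over the edges and restricts the result to the listed types.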

type_in_degree(edge_type_col: str, edge_types: List[Any] | None = None) → DataFrame
The in-degree of each vertex per edge type, returned as a DataFrame with two columns:
  • “id”: the ID of the vertex

  • “inDegrees”: a struct with a field for each edge type, storing the in-degree count

Parameters:
  • edge_type_col – Name of the column in edges DataFrame that contains edge types

  • edge_types – Optional list of edge type values. If None, edge types will be discovered automatically.

Returns:

DataFrame with columns “id” and “inDegrees” (struct type)

type_out_degree(edge_type_col: str, edge_types: List[Any] | None = None) → DataFrame
The out-degree of each vertex per edge type, returned as a DataFrame with two columns:
  • “id”: the ID of the vertex

  • “outDegrees”: a struct with a field for each edge type, storing the out-degree count

Parameters:
  • edge_type_col – Name of the column in edges DataFrame that contains edge types

  • edge_types – Optional list of edge type values. If None, edge types will be discovered automatically.

Returns:

DataFrame with columns “id” and “outDegrees” (struct type)

unpersist(blocking: bool = False) → GraphFrame

Mark the dataframe representation of vertices and edges of the graph as non-persistent, and remove all blocks for it from memory and disk.

validate(check_vertices: bool = True, intermediate_storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) → None

Validates the consistency and integrity of a graph by performing checks on the vertices and edges.

Parameters:
  • check_vertices – a flag to indicate whether additional vertex consistency checks should be performed. If true, the method will verify that all vertices in the vertex DataFrame are represented in the edge DataFrame and vice versa. It is slow on big graphs.

  • intermediate_storage_level – the storage level to be used when persisting intermediate DataFrame computations during the validation process.

Returns:

None. The method performs validation checks and raises an exception if validation fails.

Raises:

ValueError – if there are any inconsistencies in the graph, such as duplicate vertices, mismatched vertices between edges and vertex DataFrames or missing connections.
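The kinds of checks described above can be sketched in plain Python. The helper below is a hypothetical single-machine model, not the GraphFrames implementation; it raises ValueError for duplicate vertex IDs and, when check_vertices is true, for any mismatch between the vertex IDs and the IDs used by edges.

```python
from collections import Counter


def validate(vertex_ids, edges, check_vertices=True):
    """Raise ValueError if the vertex list and edge list are inconsistent."""
    duplicates = sorted(v for v, n in Counter(vertex_ids).items() if n > 1)
    if duplicates:
        raise ValueError(f"duplicate vertex IDs: {duplicates}")

    if check_vertices:
        known = set(vertex_ids)
        endpoints = {v for edge in edges for v in edge}
        if known != endpoints:
            # Symmetric difference: IDs present on one side but not the other.
            raise ValueError(f"vertex/edge mismatch: {sorted(known ^ endpoints)}")


validate([1, 2, 3], [(1, 2), (2, 3)])  # consistent graph, returns None
validate([1, 2], [(1, 3)], check_vertices=False)  # mismatch check skipped
```

Skipping the vertex check trades safety for speed, which matches the note that the check is slow on big graphs.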

property vertices: DataFrame

DataFrame holding vertex information, with unique column “id” for vertex IDs.