"""
Base class for all graphs.
TODO: Add error checking
"""
import collections
import copy
from .vertex import Vertex
[docs]class Graph:
def __init__(
self,
vertices=[],
di_edges=set(),
bi_edges=set(),
ud_edges=set(),
**kwargs,
):
"""
Constructor.
:param vertices: iterable of names of vertices.
:param di_edges: iterable of tuples of directed edges i.e. (X, Y) = X -> Y.
:param bi_edges: iterable of tuples of bidirected edges i.e. (X, Y) = X <-> Y.
:param ud_edges: iterable of tuples of undirected edges i.e. (X, Y) = X - Y.
"""
assert not kwargs, "Unrecognised kwargs: {}".format(kwargs)
# initialize vertices
if isinstance(vertices, collections.abc.Mapping):
if all(
isinstance(x, str) or isinstance(x, int) or x is None
for x in vertices.values()
):
self.vertices = {
v: Vertex(name=v, cardinality=c)
for v, c in vertices.items()
}
elif all(isinstance(x, Vertex) for x in vertices.values()):
self.vertices = vertices
else:
raise ValueError("Unrecognized mapping of vertices")
else:
if all(isinstance(x, Vertex) for x in vertices):
self.vertices = {x.name: x for x in vertices}
elif all(isinstance(x, str) for x in vertices):
self.vertices = {v: Vertex(v) for v in vertices}
else:
raise ValueError("Unrecognized iterable of vertices")
# initialize edges
self.di_edges = set()
self.bi_edges = set()
self.ud_edges = set()
# read in directed edges
# explicit reference to Graph so that overridden functions
# aren't called that make calls to recompute districts etc.
for parent, child in di_edges:
Graph.add_diedge(self, parent, child)
# read in bidirected edges
for sib1, sib2 in bi_edges:
Graph.add_biedge(self, sib1, sib2)
# read in undirected edges
for neb1, neb2 in ud_edges:
Graph.add_udedge(self, neb1, neb2)
[docs] def add_vertex(self, name, cardinality=None, **kwargs):
"""
Add a vertex to the graph.
:param name: name of vertex.
:return: None.
"""
if name not in self.vertices:
self.vertices[name] = Vertex(
name=name, cardinality=cardinality, **kwargs
)
[docs] def delete_vertex(self, name):
"""
Delete a vertex from the graph. Additionally removes any edges from other vertices into the
given vertex
:param name: name of vertex.
:return: None.
"""
vertex = self.vertices[name]
for child in [x.name for x in vertex.children]:
self.delete_diedge(name, child)
for parent in [x.name for x in vertex.parents]:
self.delete_diedge(parent, name)
for sib in [x.name for x in vertex.siblings]:
self.delete_biedge(sib, name)
for neb in [x.name for x in vertex.neighbors]:
self.delete_udedge(neb, name)
del self.vertices[name]
[docs] def add_diedge(self, parent, child):
"""
Add a directed edge to the graph.
:param parent: tail of edge.
:param child: head of edge.
:return: None.
"""
if (parent, child) not in self.di_edges:
self.di_edges.add((parent, child))
self.vertices[parent].children.add(self.vertices[child])
self.vertices[child].parents.add(self.vertices[parent])
[docs] def delete_diedge(self, parent, child):
"""
Deleted given directed edge from the graph.
:param parent: tail of edge.
:param child: head of edge.
:return: None.
"""
self.di_edges.remove((parent, child))
self.vertices[parent].children.remove(self.vertices[child])
self.vertices[child].parents.remove(self.vertices[parent])
[docs] def add_biedge(self, sib1, sib2):
"""
Add a bidirected edge to the graph.
:param sib1: endpoint 1 of edge.
:param sib2: endpoint 2 of edge.
:return: None.
"""
if (sib1, sib2) and (sib2, sib1) not in self.bi_edges:
self.bi_edges.add((sib1, sib2))
self.vertices[sib1].siblings.add(self.vertices[sib2])
self.vertices[sib2].siblings.add(self.vertices[sib1])
[docs] def delete_biedge(self, sib1, sib2):
"""
Delete given bidirected edge from the graph.
:param sib1: endpoint 1 of edge.
:param sib2: endpoint 2 of edge.
:return: None.
"""
try:
self.bi_edges.remove((sib1, sib2))
except KeyError:
self.bi_edges.remove((sib2, sib1))
self.vertices[sib1].siblings.remove(self.vertices[sib2])
self.vertices[sib2].siblings.remove(self.vertices[sib1])
[docs] def has_biedge(self, sib1, sib2):
"""
Check existence of a bidirected edge.
:param sib1: endpoint 1 of edge.
:param sib2: endpoint 2 of edge.
:return: boolean result of existence.
"""
if (sib1, sib2) in self.bi_edges or (sib2, sib1) in self.bi_edges:
return True
return False
[docs] def has_udedge(self, neb1, neb2):
"""
Check existence of an undirected edge.
:param neb1: endpoint 1 of edge.
:param neb2: endpoint 2 of edge.
:return: boolean result of existence.
"""
if (neb1, neb2) in self.ud_edges or (neb2, neb1) in self.ud_edges:
return True
return False
[docs] def add_udedge(self, neb1, neb2):
"""
Add an undirected edge to the graph.
:param neb1: endpoint 1 of edge.
:param neb2: endpoint 2 of edge.
:return: None.
"""
if (neb1, neb2) and (neb2, neb1) not in self.ud_edges:
self.ud_edges.add((neb1, neb2))
self.vertices[neb1].neighbors.add(self.vertices[neb2])
self.vertices[neb2].neighbors.add(self.vertices[neb1])
[docs] def delete_udedge(self, neb1, neb2):
"""
Delete given undirected edge from the graph.
:param neb1: endpoint 1 of edge.
:param neb2: endpoint 2 of edge.
:return: None.
"""
try:
self.ud_edges.remove((neb1, neb2))
except KeyError:
self.ud_edges.remove((neb2, neb1))
self.vertices[neb1].neighbors.remove(self.vertices[neb2])
self.vertices[neb2].neighbors.remove(self.vertices[neb1])
#### GENEALOGICAL HELPERS ####
[docs] def parents(self, vertices):
"""
Get parents of a vertex or set of vertices.
:param vertices: iterable of vertex names.
:return: set of parents.
"""
parents = set()
if type(vertices) == str:
vertices = [vertices]
for v in vertices:
for p in self.vertices[v].parents:
parents.add(p.name)
return parents
[docs] def children(self, vertices):
"""
Get children of a vertex or set of vertices.
:param vertices: iterable of vertex names.
:return: set of children.
"""
children = set()
if type(vertices) == str:
vertices = [vertices]
for v in vertices:
for c in self.vertices[v].children:
children.add(c.name)
return children
[docs] def neighbors(self, vertices):
"""
Get neighbors of a vertex or set of vertices.
:param vertices: iterable of vertex names.
:return: set of neighbors.
"""
neighbors = set()
if type(vertices) == str:
vertices = [vertices]
for v in vertices:
for n in self.vertices[v].neighbors:
neighbors.add(n.name)
return neighbors
[docs] def siblings(self, vertices):
"""
Get siblings of a vertex or set of vertices.
:param vertices: vertex name or iterable of vertex names.
:return: set of neighbors.
"""
siblings = set()
if type(vertices) == str:
vertices = [vertices]
for v in vertices:
for s in self.vertices[v].siblings:
siblings.add(s.name)
return siblings
[docs] def ancestors(self, vertices):
"""
Get the ancestors of a vertex or set of vertices.
:param vertices: single vertex name or iterable of vertex names to find ancestors for.
:return: set of ancestors.
"""
if type(vertices) == str:
vertices = [vertices]
ancestors = set()
visit_stack = list([self.vertices[v] for v in vertices])
while visit_stack:
v = visit_stack.pop()
ancestors.add(v.name)
visit_stack.extend(p for p in v.parents if p not in ancestors)
return ancestors
[docs] def descendants(self, vertices):
"""
Get the descendants of a vertex or set of vertices.
:param vertices: single vertex name or iterable of vertex names to find descendants for.
:return: set of descendants.
"""
if type(vertices) == str:
vertices = [vertices]
descendants = set()
visit_stack = list([self.vertices[v] for v in vertices])
while visit_stack:
v = visit_stack.pop()
descendants.add(v.name)
visit_stack.extend(c for c in v.children if c not in descendants)
return descendants
def _bfs_directed_paths(self, source, sink):
"""
Use BFS to find directed paths from a source vertex to sink vertices.
:param source: name of a single source.
:param sink: iterable of possibly multiple sinks.
:return: list of directed paths.
"""
sink_vertices = [self.vertices[v] for v in sink]
queue = [(self.vertices[source], [])]
while queue:
(current_vertex, edge_path) = queue.pop(0)
for c in current_vertex.children:
if c in sink_vertices:
yield edge_path + [(current_vertex.name, c.name)]
else:
queue.append(
(c, edge_path + [(current_vertex.name, c.name)])
)
[docs] def directed_paths(self, source, sink):
"""
Get all directed paths between sets of source vertices and sink vertices.
:param source: a set of vertices that serve as the source.
:param sink: a set of vertices that serve as the sink.
:return: list of directed paths.
"""
# we use BFS for finding all paths
directed_paths = []
for u in source:
directed_paths += self._bfs_directed_paths(u, sink)
return directed_paths
[docs] def copy(self):
"""
Returns copy of a graph
"""
return copy.deepcopy(self)
[docs] def topological_sort(self):
"""
Perform a topological sort from roots (parentless nodes)
to leaves, as ordered by directed edges on the graph.
:return: list corresponding to a valid topological order.
"""
# create a copy of the graph
G = copy.deepcopy(self)
# initialize roots -- vertices that have no incoming edges
roots = [v for v in self.vertices if len(G.parents([v])) == 0]
top_order = []
# iterate until we have explored all nodes
while roots:
# pick a current root
r = roots.pop()
top_order.append(r)
# iterate over all its children
children = G.children([r])
for c in children:
# delete the edge
G.delete_diedge(r, c)
# if r was the only remaining parent, this is now a root
if len(G.parents([c])) == 0:
roots.append(c)
# return the order
return top_order
[docs] def pre(self, vertices, top_order):
"""
Find all nodes prior to the given set of vertices under a topological order.
:param vertices: iterable of vertex names.
:param top_order: a valid topological order.
:return: list corresponding to the order up until the given vertices.
"""
# find all elements that are previous in the topological order
# by iterating over the order until we encounter one of the vertices
pre = []
for v in top_order:
if v in vertices:
break
pre.append(v)
return pre
# def post(self, vertices, top_order):
# """
# Find all nodes that succeed the given set of vertices under a topological order.
#
# :param vertices: iterable of vertex names.
# :param top_order: a valid topological order.
# :return: list corresponding to the order up until the given vertices.
# """
#
# # find all elements that are previous in the topological order
# # by iterating over the order until we encounter one of the vertices
# post = []
# for v in top_order:
# if v in vertices:
# break
# pre.append(v)
#
# return post
[docs] def subgraph(self, vertices):
"""
Return a subgraph on the given vertices (i.e. a graph containing only
the specified vertices and edges between them).
:param vertices: set containing names of vertices in the subgraph.
:return: a new Graph object corresponding to the subgraph.
"""
subgraph = copy.deepcopy(self)
for v in set(self.vertices) - set(vertices):
subgraph.delete_vertex(v)
return subgraph
[docs] def draw(self, direction=None):
"""
Visualize the graph.
:return : dot language representation of the graph.
"""
from graphviz import Digraph
dot = Digraph()
# set direction from left to right if that's preferred
if direction == "LR":
dot.graph_attr["rankdir"] = direction
for v in self.vertices.values():
dot.node(
v.name,
shape="square" if v.fixed else "plaintext",
height=".5",
width=".5",
)
for parent, child in self.di_edges:
# special clause for SWIGs
if parent.lower() == child:
dot.edge(parent, child, style="invis")
else:
dot.edge(parent, child, color="blue")
for sib1, sib2 in self.bi_edges:
dot.edge(sib1, sib2, dir="both", color="red")
for neb1, neb2 in self.ud_edges:
dot.edge(neb1, neb2, dir="none", color="brown")
return dot