# Source code for ananke.graphs.sg

"""
Class for segregated graphs (SGs).
"""
import copy
import logging

from .graph import Graph

logger = logging.getLogger(__name__)


class SG(Graph):
    """
    Segregated graph (SG): a mixed graph with directed, bidirected and
    undirected edges in which no vertex touches both a bidirected and an
    undirected edge, and which has no directed or partially directed cycles.

    Both properties are validated at construction time; a ``TypeError`` is
    raised if either fails.
    """

    def __init__(
        self,
        vertices=None,
        di_edges=None,
        bi_edges=None,
        ud_edges=None,
        **kwargs,
    ):
        """
        Constructor

        :param vertices: iterable of names of vertices (defaults to empty).
        :param di_edges: iterable of tuples of directed edges i.e. (X, Y) = X -> Y.
        :param bi_edges: iterable of tuples of bidirected edges i.e. (X, Y) = X <-> Y.
        :param ud_edges: iterable of tuples of undirected edges i.e. (X, Y) = X - Y.
        :param kwargs: forwarded unchanged to the ``Graph`` constructor.
        :raises TypeError: if the graph is not segregated or not acyclic.
        """

        # None sentinels avoid sharing one mutable default object between
        # calls; the base class still receives the same empty containers
        # it would have received from the old defaults.
        super().__init__(
            [] if vertices is None else vertices,
            di_edges=set() if di_edges is None else di_edges,
            bi_edges=set() if bi_edges is None else bi_edges,
            ud_edges=set() if ud_edges is None else ud_edges,
            **kwargs,
        )

        if not self._segregated():
            raise TypeError("Graph is not segregated")
        if not self._acyclic():
            raise TypeError("Graph is not acyclic")

        # a mapping of vertex names to their district ids
        # and a list of districts; both start empty and are filled by
        # _calculate_districts()
        self._district_map = {}
        self._districts = []

        # a mapping of vertex names to their block ids
        # and a list of blocks; both start empty and are filled by
        # _calculate_blocks()
        # NOTE(review): the original comment stated that only blocks of
        # size >= 2 are considered in an SG, but _calculate_blocks() also
        # produces singleton blocks -- confirm the intended contract.
        self._block_map = {}
        self._blocks = []

    #### VALID GRAPH CHECKS ####
    def _acyclic(self):
        """
        Checks if the graph is directed and partially directed cycle free.

        :return: boolean indicator whether graph is acyclic.
        """

        return not any(
            self._dfs_cycle(vertex) for vertex in self.vertices.values()
        )

    def _dfs_cycle(self, vertex):
        """
        Check if there is a directed or partially directed cycle that leaves
        the given vertex through a directed edge.

        The search starts at the children of ``vertex`` and walks along
        directed edges (parent to child) and undirected edges. If ``vertex``
        is reached again, the walk closed a cycle containing at least one
        directed edge. Purely undirected cycles are never reported, because
        the first step is always a directed edge.

        Every partially directed cycle contains some directed edge u -> v,
        so calling this for every vertex (as ``_acyclic`` does) detects all
        of them.

        :param vertex: vertex object to start DFS at.
        :return: boolean indicating whether there is a cycle.
        """

        visited = set()
        visit_stack = list(vertex.children)

        while visit_stack:
            v = visit_stack.pop()

            # arriving back at the start closes a cycle; this also covers a
            # self-loop and the degenerate case of a directed and an
            # undirected edge between the same pair of vertices
            if v == vertex:
                return True

            if v in visited:
                continue
            visited.add(v)
            visit_stack.extend(v.children.union(v.neighbors))

        return False

    def _segregated(self):
        """
        Checks if graph is segregated i.e. lacks Z <-> X - Y.

        :return: boolean indicator whether graph is segregated.
        """

        # a violation is any vertex that has both a bidirected edge
        # (sibling) and an undirected edge (neighbor)
        return not any(
            len(vertex.siblings) > 0 and len(vertex.neighbors) > 0
            for vertex in self.vertices.values()
        )

    #### DISTRICT CODE ####
    @property
    def districts(self):
        """
        Returns list of all districts in the graph.

        The districts are recomputed on every access.

        :return: list of sets corresponding to districts in the graph.
        """

        return self._calculate_districts()

    def _calculate_districts(self):
        """
        Update districts in the graph.

        Rebuilds ``self._district_map`` and ``self._districts`` from scratch.
        Fixed vertices are never used as the start of a district.

        :return: List of districts (sets of vertex names).
        """

        self._district_map = {}
        district_counter = 0

        # assign a fresh district id to every unfixed vertex that has not
        # been reached by an earlier search
        for name, vertex in self.vertices.items():
            if name not in self._district_map and not vertex.fixed:
                self._dfs_district(vertex, district_counter)
                district_counter += 1

        # now process the district_map into a list of sets
        districts = [set() for _ in range(district_counter)]
        for name, district_id in self._district_map.items():
            districts[district_id].add(name)

        self._districts = districts
        return districts

    def _dfs_district(self, vertex, district_id):
        """
        DFS from vertex to discover its district.

        Follows sibling (bidirected) links and records every reached vertex
        name in ``self._district_map``.

        :param vertex: vertex object to start the DFS at.
        :param district_id: int corresponding to district ID
        :return: None
        """

        visit_stack = [vertex]
        while visit_stack:
            v = visit_stack.pop()
            self._district_map[v.name] = district_id
            visit_stack.extend(
                s for s in v.siblings if s.name not in self._district_map
            )

    def district(self, vertex):
        """
        Returns the district of a vertex.

        Uses the cached districts when available and recomputes them when
        the cache is empty or does not know the vertex.

        :param vertex: name of the vertex.
        :return: set corresponding to district.
        :raises KeyError: if the vertex has no district even after
            recomputation (e.g. because it is fixed).
        """

        if not self._districts or vertex not in self._district_map:
            self._calculate_districts()
        return self._districts[self._district_map[vertex]]

    #### BLOCK CODE ####
    @property
    def blocks(self):
        """
        Returns list of all blocks in the graph.

        The blocks are recomputed on every access.

        :return: list of sets corresponding to blocks in the graph.
        """

        return self._calculate_blocks()

    def _calculate_blocks(self):
        """
        Update blocks in the graph.

        Rebuilds ``self._block_map`` and ``self._blocks`` from scratch.
        Fixed vertices are never used as the start of a block.

        :return: List of blocks (sets of vertex names).
        """

        self._block_map = {}
        block_counter = 0

        # assign a fresh block id to every unfixed vertex that has not
        # been reached by an earlier search
        for name, vertex in self.vertices.items():
            if name not in self._block_map and not vertex.fixed:
                self._dfs_block(vertex, block_counter)
                block_counter += 1

        # now process the block_map into a list of sets
        self._blocks = [set() for _ in range(block_counter)]
        for name, block_id in self._block_map.items():
            self._blocks[block_id].add(name)

        return self._blocks

    def _dfs_block(self, vertex, block_id):
        """
        DFS from vertex to discover its block.

        Follows neighbor (undirected) links and records every reached vertex
        name in ``self._block_map``.

        :param vertex: vertex object to start the DFS at.
        :param block_id: int corresponding to block ID.
        :return: None
        """

        visit_stack = [vertex]
        while visit_stack:
            v = visit_stack.pop()
            self._block_map[v.name] = block_id
            visit_stack.extend(
                n for n in v.neighbors if n.name not in self._block_map
            )

    def block(self, vertex):
        """
        Returns the block of a vertex.

        Uses the cached blocks when available and recomputes them when the
        cache is empty or does not know the vertex.

        :param vertex: name of the vertex.
        :return: set corresponding to block.
        :raises KeyError: if the vertex has no block even after
            recomputation (e.g. because it is fixed).
        """

        if not self._blocks or vertex not in self._block_map:
            self._calculate_blocks()
        return self._blocks[self._block_map[vertex]]

    def add_biedge(self, sib1, sib2, recompute=True):
        """
        Add a bidirected edge to the graph. Overridden to recompute districts.

        :param sib1: endpoint 1 of edge.
        :param sib2: endpoint 2 of edge.
        :param recompute: boolean indicating whether districts should be recomputed.
        :return: None.
        """

        super().add_biedge(sib1, sib2)
        if recompute:
            # _calculate_districts() stores its result in self._districts
            self._calculate_districts()

    def delete_biedge(self, sib1, sib2, recompute=True):
        """
        Delete given bidirected edge from the graph. Overridden to recompute districts.

        :param sib1: endpoint 1 of edge.
        :param sib2: endpoint 2 of edge.
        :param recompute: boolean indicating whether districts should be recomputed.
        :return: None.
        """

        super().delete_biedge(sib1, sib2)
        if recompute:
            # _calculate_districts() stores its result in self._districts
            self._calculate_districts()

    def add_udedge(self, neb1, neb2, recompute=True):
        """
        Add an undirected edge to the graph. Overridden to recompute blocks

        :param neb1: endpoint 1 of edge.
        :param neb2: endpoint 2 of edge.
        :param recompute: boolean indicating whether blocks should be recomputed.
        :return: None.
        """

        super().add_udedge(neb1, neb2)
        if recompute:
            # _calculate_blocks() stores its result in self._blocks
            self._calculate_blocks()

    def delete_udedge(self, neb1, neb2, recompute=True):
        """
        Delete given undirected edge from the graph. Overridden to recompute blocks.

        :param neb1: endpoint 1 of edge.
        :param neb2: endpoint 2 of edge.
        :param recompute: boolean indicating whether blocks should be recomputed.
        :return: None.
        """

        super().delete_udedge(neb1, neb2)
        if recompute:
            # _calculate_blocks() stores its result in self._blocks
            self._calculate_blocks()

    def fix(self, vertices):
        """
        Perform the graphical operation of fixing on a set of vertices.

        Does not check that vertices are fixable. The graph is modified in
        place: each vertex is marked fixed, its incoming directed edges and
        its bidirected edges are deleted, and undirected edges are deleted
        only when the neighbor at the other end is itself fixed.

        :param vertices: iterable of vertex names to be fixed, or a single
            vertex name given as a string.
        :return: the graph itself, to allow chaining.
        """

        # a lone string would otherwise be iterated character by character
        if isinstance(vertices, str):
            vertices = [vertices]

        for v in vertices:
            vertex = self.vertices[v]
            vertex.fixed = True

            # Names are snapshotted into lists before deleting; presumably
            # the delete_* calls mutate the underlying parent/sibling/
            # neighbor collections.

            # delete incoming directed edges
            for p in [p.name for p in vertex.parents]:
                self.delete_diedge(p, v)

            # delete bidirected edges
            for s in [s.name for s in vertex.siblings]:
                self.delete_biedge(s, v, recompute=False)

            # delete undirected edges, but only those to fixed neighbors
            for n in [n.name for n in vertex.neighbors]:
                if self.vertices[n].fixed:
                    self.delete_udedge(n, v, recompute=False)

        # recompute the districts and blocks once, as they may have changed
        self._calculate_districts()
        self._calculate_blocks()

        return self

    def fixable(self, vertices):
        """
        Check if there exists a valid fixing order for the given vertices.

        Works on a deep copy, so the graph itself is left untouched.
        Candidates are tried in sorted order; after every successful fix the
        scan restarts from the first remaining candidate.

        :param vertices: set of vertices to check fixability for.
        :return: a tuple ``(success, fixing_order)``: a boolean indicating
            whether the whole set was fixable, and the list of vertices in
            the order they were fixed. On failure the list holds the order
            up to the point of failure.
        """

        G = copy.deepcopy(self)
        remaining_vertices = sorted(vertices)
        fixing_order = []

        while remaining_vertices:
            for v in remaining_vertices:
                # v is fixable when its district and its descendants share
                # exactly one vertex, i.e. nothing is reachable via both
                # -> and <-> (presumably descendants() includes v itself,
                # hence the comparison against 1)
                if len(G.descendants([v]).intersection(G.district(v))) == 1:
                    G.fix([v])
                    remaining_vertices.remove(v)
                    fixing_order.append(v)
                    break
            else:
                # a full pass fixed nothing: report failure together with
                # the fixing order up until the point of failure
                return False, fixing_order

        return True, fixing_order