"""
Class for Intrinsic Graphs (IGs) -- a mixed graph used to compute intrinsic sets
of an ADMG in polynomial time.
"""
import itertools
import logging
from .graph import Graph
logger = logging.getLogger(__name__)
[docs]class IG(Graph):
def __init__(self, admg):
"""
Constructor.
:param admg: ADMG object to calculate intrinsic sets for.
"""
self.admg = admg
self.iset_cadmg_map = (
{}
) # dictionary mapping intrinsic sets to the reachable CADMG
self.iset_fixing_order_map = {}
super().__init__()
# the IG is initialized with vertices corresponding to
# reachable closures of singletons (these are guaranteed to be intrinsic)
for v in admg.vertices:
if not admg.vertices[v].fixed:
rc, fixing_order, G = self.admg.reachable_closure([v])
rc = frozenset(rc)
self.add_vertex(rc)
self.iset_cadmg_map[rc] = G
self.iset_fixing_order_map[rc] = fixing_order
# add di/bi edges that fulfill subset relation
for i in self.vertices:
if i < rc:
self.add_diedge(i, rc)
elif rc < i:
self.add_diedge(rc, i)
if not (
i in self.ancestors([rc]) or rc in self.ancestors([i])
) and self.bidirected_connected(i, rc):
self.add_biedge(i, rc)
[docs] def bidirected_connected(self, s1, s2):
"""
Check if two sets are bidirected connected in the original ADMG
:param s1: First set corresponding to a vertex in the IG.
:param s2: Second set corresponding to a vertex in the IG.
:return: boolean corresponding to connectedness.
"""
possible_endpoints = itertools.product(*[s1, s2])
for combination in possible_endpoints:
if self.admg.has_biedge(*combination):
return True
return False
[docs] def maintain_subset_relation(self, s):
"""
Add di edges to a newly inserted vertex s so as to maintain
the subset relation.
:param s: Frozenset corresponding to the new vertex.
:return: None.
"""
for i in self.vertices:
if i < s:
self.add_diedge(i, s)
elif s < i:
self.add_diedge(s, i)
[docs] def add_new_biedges(self, s):
"""
Naive O(``|I(G)|`` choose 2) implementation. Must ensure that biedges not added to ancestors.
:param s: Frozen set corresponding to the new vertex.
:return: None.
"""
ancestors_s = self.ancestors([s])
for i in self.vertices:
if i not in ancestors_s and self.bidirected_connected(i, s):
self.add_biedge(i, s)
[docs] def merge(self, s1, s2):
"""
Merge operation on two sets s1 and s2 to give a (possibly)
new intrinsic set s3.
:param s1: First set corresponding to a vertex in the IG.
:param s2: Second set corresponding to a vertex in the IG.
:return: None.
"""
s3c = set(s1)
s3c.update(set(s2))
s3, fixing_order, G = self.admg.reachable_closure(s3c)
s3 = frozenset(s3)
self.delete_biedge(s1, s2)
# if the intrinsic set already exists, ignore it
if s3 in self.vertices:
return
# add the vertex and add di edges
self.add_vertex(s3)
self.iset_cadmg_map[s3] = G
self.iset_fixing_order_map[s3] = fixing_order
self.maintain_subset_relation(s3)
# add new bi edges to the added vertex
self.add_new_biedges(s3)
[docs] def get_intrinsic_sets(self):
"""
Get all intrinsic sets for given ADMG.
:return:
"""
# keep merging vertices that are connected
# by bidirected edges to obtain new intrinsic
# sets until there are none left
while len(self.bi_edges) > 0:
u, v = next(iter(self.bi_edges))
self.merge(u, v)
return set(self.vertices)
[docs] def get_heads_tails(self):
"""
Iterate through all intrinsic sets and get the recursive heads and tails.
:return: List of tuples corresponding to (head, tail).
"""
# make sure that the graph is fully initialized
try:
assert len(self.bi_edges) == 0
except AssertionError:
print(
"There are still bidirected edges indicating more intrinsic sets can be merged"
)
heads_and_tails = []
# go through each intrinsic set
for iset in self.vertices:
# look at the reachable CADMG to determine head/tail
reachable_cadmg = self.iset_cadmg_map[iset]
head = set()
# heads are vertices that are childless in the corresponding reachable CADMG
for v in iset:
if len(reachable_cadmg.children([v])) == 0:
head.add(v)
# tail is the remaining intrinsic set + parents of the intrinsic set
tail = set(iset - head)
tail = tail.union(reachable_cadmg.parents(iset))
heads_and_tails.append((head, tail))
return heads_and_tails