Source code for ananke.estimation.automated_if

"""
Class for automated derivation of influence functions.
"""

import copy

from .counterfactual_mean import CausalEffect


[docs]class AutomatedIF: """ IF for a single treatment and single outcome. """ def __init__(self, graph, treatment, outcome): """ Constructor. :param graph: ADMG corresponding to substantive knowledge/model. :param treatment: name of vertex corresponding to the intervention. :param outcome: name of vertex corresponding to the outcome. """ # without loss of generality, focus on ancestors of Y self.graph = copy.deepcopy(graph) self.treatment = treatment self.outcome = outcome self.ace = CausalEffect(self.graph, self.treatment, self.outcome) if self.ace.strategy == "Not ID": raise RuntimeError("Query is not identified.") self.beta_primal_ = "" self.beta_dual_ = "" self.nonparametric_if_ = "" self.eff_if_ = "" if self.ace.strategy == "a-fixable": self._format_augmented_ipw() elif self.ace.strategy == "p-fixable": self._format_augmented_primal_ipw() else: raise NotImplementedError( "Influence functions are available only when the treatment is " + "a-fixable or p-fixable" ) def _format_density(self, V, cond_set, intervened=False, expectation=False): """ Provide appropriate formatting of a p(V | conditioning set). :param V: vertex of interest. :param cond_set: set of vertices to condition on. :param expectation: whether it is a conditional expectation instead of density i.e. E[ | ]. :return: string formatted appropriately. """ density = "" if len(cond_set) == 0: if expectation: density = "E[{}]".format(V) else: density = "p({})".format(V) else: if expectation: density = "E[{}|{}]".format(V, ",".join(cond_set)) else: density = "p({}|{})".format(V, ",".join(cond_set)) if intervened: density = density.replace( self.treatment, self.treatment + "=" + self.treatment.lower() ) return density def _format_augmented_ipw(self): """ Format the IF for augmented IPW and efficient augmented IPW if possible. :return: None. """ # get the Markov pillow of T mp_T = self.graph.markov_pillow([self.treatment], self.ace.p_order) # format the propensity score/IPW ipw_form = "I({}={}) x 1/".format( self.treatment, self.treatment.lower() ) ipw_form += self._format_density(self.treatment, mp_T) self.beta_primal_ += ipw_form + " x {}".format(self.outcome) # format the outcome regression self.beta_dual_ += self._format_density( self.outcome, mp_T.union({self.treatment}), expectation=True, intervened=True, ) # nonparametric IF self.nonparametric_if_ = ( ipw_form + " x ({} - {})".format(self.outcome, self.beta_dual_) + " + " + self.beta_dual_ + " - Ψ" ) # eff IF if nonparametric saturated is just nonparametric IF if self.graph.nonparametric_saturated(): self.eff_if_ = self.nonparametric_if_ # otherwise do projections if graph is mb-shielded elif self.ace.is_mb_shielded: contributions = [] for V in set(self.graph.vertices).difference({self.treatment}): mpV = self.graph.markov_pillow([V], self.ace.p_order) VmpV = set([V]).union(mpV) contributions.append( self._format_density("βprimal", VmpV, expectation=True) + " - " + self._format_density("βprimal", mpV, expectation=True) ) self.eff_if_ = " + ".join(contributions) # otherwise we do not know how to compute the efficient IF else: self.eff_if_ = "Cannot compute, graph is not mb-shielded." def _format_augmented_primal_ipw(self): """ Format the IF for augmented primal IPW and efficient augmented primal IPW if possible. :return: """ # C := pre-treatment vars and L := post-treatment vars in district of treatment C = self.graph.pre([self.treatment], self.ace.p_order) post = set(self.graph.vertices).difference(C) L = post.intersection(self.graph.district(self.treatment)) self.beta_primal_ = "I({}={}) x 1/[".format( self.treatment, self.treatment.lower() ) primal_terms = [] for Li in L.difference([self.outcome]): mpLi = self.graph.markov_pillow([Li], self.ace.p_order) primal_terms.append(self._format_density(Li, mpLi)) self.beta_primal_ += "".join(primal_terms) + "] x Σ_{} ".format( self.treatment ) self.beta_primal_ += "".join(primal_terms) if self.outcome in L: mpY = self.graph.markov_pillow([self.outcome], self.ace.p_order) self.beta_primal_ += " x " + self._format_density( self.outcome, mpY, expectation=True ) else: self.beta_primal_ += " x {}".format(self.outcome) # M := inverse Markov pillow of the treatment M = set( [ m for m in self.graph.vertices if self.treatment in self.graph.markov_pillow([m], self.ace.p_order) ] ) M = M.difference(self.graph.district(self.treatment)) dual_terms = [] for Mi in M.difference([self.outcome]): mpMi = self.graph.markov_pillow([Mi], self.ace.p_order) dual_fixed_term = self._format_density(Mi, mpMi, intervened=True) dual_random_term = self._format_density(Mi, mpMi) dual_terms.append( "[" + dual_fixed_term + "/" + dual_random_term + "]" ) self.beta_dual_ = " x ".join(dual_terms) if self.outcome in M: mpY = self.graph.markov_pillow([self.outcome], self.ace.p_order) self.beta_dual_ += " x " + self._format_density( self.outcome, mpY, expectation=True, intervened=True ) else: self.beta_dual_ += " x {}".format(self.outcome) # nonparametric IF contributions = [] for V in self.graph.vertices: preV = self.graph.pre([V], self.ace.p_order) VpreV = set([V]).union(preV) if V in M: contributions.append( self._format_density("βprimal", VpreV, expectation=True) + " - " + self._format_density("βprimal", preV, expectation=True) ) elif V in L: contributions.append( self._format_density("βdual", VpreV, expectation=True) + " - " + self._format_density("βdual", preV, expectation=True) ) else: contributions.append( self._format_density( "βprimal or βdual", VpreV, expectation=True ) + " - " + self._format_density( "βprimal or βdual", preV, expectation=True ) ) self.nonparametric_if_ = " + ".join(contributions) # eff IF if nonparametric saturated is just nonparametric IF if self.graph.nonparametric_saturated(): self.eff_if_ = self.nonparametric_if_ elif self.ace.is_mb_shielded: contributions = [] for V in self.graph.vertices: mpV = self.graph.markov_pillow([V], self.ace.p_order) VmpV = set([V]).union(mpV) if V in M: contributions.append( self._format_density("βprimal", VmpV, expectation=True) + " - " + self._format_density("βprimal", mpV, expectation=True) ) elif V in L: contributions.append( self._format_density("βdual", VmpV, expectation=True) + " - " + self._format_density("βdual", mpV, expectation=True) ) else: contributions.append( self._format_density( "βprimal or βdual", VmpV, expectation=True ) + " - " + self._format_density( "βprimal or βdual", mpV, expectation=True ) ) self.eff_if_ = " + ".join(contributions) # otherwise we do not know how to compute the efficient IF else: self.eff_if_ = "Cannot compute, graph is not mb-shielded."