Source code for ananke.models.linear_gaussian_sem

"""
Class for Linear Gaussian SEMs parametrized
by a matrix B representing regression coefficients
and a matrix Omega representing correlated errors
"""

import functools

import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy.optimize import minimize


def is_positive_definite(X):
    """Check whether a matrix is positive definite via a Cholesky factorization."""
    try:
        np.linalg.cholesky(X)
        return True
    except np.linalg.LinAlgError:
        return False
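
# Illustrative note (not from the original module): for a symmetric matrix the
# Cholesky check above succeeds exactly when the matrix is positive definite, e.g.
#   is_positive_definite(np.eye(2))                      # True
#   is_positive_definite(np.array([[1.0, 2.0],
#                                  [2.0, 1.0]]))         # False (eigenvalues 3, -1)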


class LinearGaussianSEM:
    def __init__(self, graph):
        """
        Fits a linear Gaussian structural equation model to an :class:`ADMG`
        using the recursive iterative conditional fitting algorithm as
        proposed by Drton and Richardson.

        :param graph: graph (:class:`ADMG`) corresponding to the linear Gaussian SEM.
        """

        self.graph = graph
        # for a linear Gaussian SEM each edge is a parameter + noise for each vertex
        self.n_params = (
            len(graph.di_edges) + len(graph.bi_edges) + len(graph.vertices)
        )
        self._vertex_index_map = {v: i for i, v in enumerate(graph.vertices)}
        self._parent_index_map = {
            v: [self._vertex_index_map[p] for p in graph.parents([v])]
            for v in graph.vertices
        }
        self._sibling_index_map = {
            v: [self._vertex_index_map[s] for s in graph.siblings([v])]
            for v in graph.vertices
        }

        self.X_ = None  # data matrix
        self.S_ = None  # sample covariance matrix
        self.B_ = None  # direct edge coefficients
        self.omega_ = None  # covariance of the error terms
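
    # Note on the parameterization (comment added for clarity): B_ is read
    # row-wise, so B_[i, j] holds the coefficient on vertex j in the structural
    # equation for vertex i, and omega_[i, j] holds the error covariance
    # between vertices i and j, with the error variances on the diagonal.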

    def neg_loglikelihood(self, X):
        """
        Calculate the negative log-likelihood of the data given the model.

        :param X: a N x M dimensional data matrix.
        :return: a float corresponding to the negative log-likelihood.
        """

        assert self.B_ is not None
        S = np.cov(X.T)
        assert is_positive_definite(
            S
        ), "Sample covariance matrix is not positive definite"
        n, d = X.shape

        # covariance matrix implied by the model parameters B and omega
        sigma = (
            np.linalg.inv(np.eye(d) - self.B_)
            @ self.omega_
            @ np.linalg.inv((np.eye(d) - self.B_).T)
        )
        # assert is_positive_definite(sigma)
        neg_lld = (n / 2) * (
            np.log(np.linalg.det(sigma))
            + np.trace(np.dot(np.linalg.inv(sigma), S))
        )
        return neg_lld
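
    # The method above evaluates the covariance matrix implied by the model,
    #     Sigma = (I - B)^{-1} Omega (I - B)^{-T},
    # and returns (n / 2) * (log det(Sigma) + trace(Sigma^{-1} S)), i.e. the
    # negative log-likelihood up to an additive constant that does not depend
    # on the parameters.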

    def bic(self, X):
        """
        Calculate the Bayesian information criterion of the data given the model.

        :param X: a N x M dimensional data matrix.
        :return: a float corresponding to the BIC.
        """

        return 2 * self.neg_loglikelihood(X) + np.log(len(X)) * self.n_params

    def fit(self, X, tol=1e-6, disp=None, standardize=False, max_iters=100):
        """
        Fit the model to data via maximum likelihood estimation.

        :param X: data -- a N x M dimensional pandas data frame.
        :param tol: convergence tolerance on the change in negative log-likelihood.
        :param disp: currently unused.
        :param standardize: if True, standardize each column of the data before fitting.
        :param max_iters: maximum number of fitting sweeps over the vertices.
        :return: self.
        """

        # make a copy of the data and standardize if necessary
        self.X_ = X.values.copy()
        n, d = self.X_.shape
        if standardize:
            self.X_ = (self.X_ - np.mean(self.X_, axis=0)) / np.std(
                self.X_, axis=0
            )

        # compute sample covariance
        self.S_ = np.cov(self.X_.T)

        # initialize B and omega
        self.B_, self.omega_ = np.zeros((d, d)), np.eye(d)

        # keep going until desired convergence or the iteration cap is reached
        cur_lld = self.neg_loglikelihood(self.X_)
        lld_diff = tol + 1
        n_iters = 0
        while np.abs(lld_diff) > tol and n_iters < max_iters:
            n_iters += 1
            for var in self.graph.vertices:

                # get index of the vertex
                var_index = self._vertex_index_map[var]

                # get epsilon_minusi
                epsilon = self.X_ - self.X_ @ self.B_.T
                epsilon_minusi = np.delete(epsilon, var_index, axis=1)

                # get omega and calculate pseudo variables Zi
                omega_minusi = np.delete(self.omega_, var_index, axis=0)
                omega_minusii = np.delete(omega_minusi, var_index, axis=1)
                omega_minusii_inv = np.linalg.inv(omega_minusii)

                # calculate Z_minusi
                Z_minusi = (omega_minusii_inv @ epsilon_minusi.T).T
                # insert a column of zeros to maintain the shape
                Z = np.insert(Z_minusi, var_index, 0, axis=1)

                Y = self.X_[:, var_index]
                parent_mat = self.X_[:, self._parent_index_map[var]]
                pseudo_mat = Z[:, self._sibling_index_map[var]]
                Xmat = np.hstack((np.ones((n, 1)), parent_mat, pseudo_mat))
                results = sm.OLS(Y, Xmat).fit()

                # need to now extract results and put them in the right spots
                i = 1
                for idx in self._parent_index_map[var]:
                    self.B_[var_index, idx] = results.params[i]
                    i += 1

                for idx in self._sibling_index_map[var]:
                    self.omega_[var_index, idx] = results.params[i]
                    self.omega_[idx, var_index] = results.params[i]
                    i += 1

                parent_mat = np.hstack((np.ones((n, 1)), parent_mat))
                _, cols = parent_mat.shape
                omega_i_minusi = np.delete(self.omega_, var_index, axis=1)[
                    var_index, :
                ]
                omega_minusi_i = np.delete(self.omega_, var_index, axis=0)[
                    :, var_index
                ]
                omega_ii = results.scale + (
                    omega_i_minusi @ omega_minusii_inv @ omega_minusi_i
                )
                self.omega_[var_index, var_index] = omega_ii

            new_lld = self.neg_loglikelihood(self.X_)
            lld_diff = new_lld - cur_lld
            cur_lld = new_lld

        return self
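
    # Each sweep of the while-loop above updates one vertex at a time: the
    # current residuals epsilon of the other vertices are mapped to
    # pseudo-variables Z, and the vertex is regressed via OLS on its parents
    # plus the pseudo-variables of its siblings, which refreshes the
    # corresponding row of B_ and entries of omega_.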

    def total_effect(self, A, Y):
        """
        Calculate the total causal effect of a set of treatments A on a set of outcomes Y.

        :param A: iterable corresponding to variable names that act as treatments.
        :param Y: iterable corresponding to variable names that act as outcomes.
        :return: a float corresponding to the total causal effect.
        """

        directed_paths = self.graph.directed_paths(A, Y)

        # if there are no directed paths, the effect is 0
        if len(directed_paths) == 0:
            return 0

        # otherwise do path analysis
        total_effect = 0
        for path in directed_paths:
            path_effect = 1
            for u, v in path:
                path_effect *= self.B_[
                    self._vertex_index_map[v], self._vertex_index_map[u]
                ]
            total_effect += path_effect

        return total_effect
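
    # Illustrative note: for a single directed path such as A -> M -> Y the
    # loop above contributes B_[M, A] * B_[Y, M] (indices taken through
    # _vertex_index_map), and the total effect is the sum of such products
    # over all directed paths from A to Y.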

    def draw(self, direction=None):
        """
        Visualize the graph.

        :return: dot language representation of the graph.
        """

        from graphviz import Digraph

        if self.B_ is None:
            raise AssertionError("Model must be fit before model can be drawn.")

        dot = Digraph()

        # set direction from left to right if that's preferred
        if direction == "LR":
            dot.graph_attr["rankdir"] = direction

        for v in self.graph.vertices.values():
            dot.node(v.name, shape="plaintext", height=".5", width=".5")

        for parent, child in self.graph.di_edges:
            i, j = self._vertex_index_map[child], self._vertex_index_map[parent]
            dot.edge(
                parent,
                child,
                color="blue",
                label=str(round(self.B_[i, j], 2)),
                fontsize="12",
            )

        for sib1, sib2 in self.graph.bi_edges:
            i, j = self._vertex_index_map[sib1], self._vertex_index_map[sib2]
            dot.edge(
                sib1,
                sib2,
                dir="both",
                color="red",
                label=str(round(self.omega_[i, j], 2)),
                fontsize="12",
            )

        return dot
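

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). It assumes the ADMG
# class from ananke.graphs with the constructor ADMG(vertices, di_edges,
# bi_edges); the variable names and simulated data below are illustrative.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from ananke.graphs import ADMG

    rng = np.random.default_rng(0)
    n_samples = 1000

    # simulate a chain A -> M -> Y with an unobserved common cause of A and Y,
    # which the ADMG represents with the bidirected edge A <-> Y
    u = rng.normal(size=n_samples)
    a = u + rng.normal(size=n_samples)
    m = 0.5 * a + rng.normal(size=n_samples)
    y = 0.7 * m + u + rng.normal(size=n_samples)

    # note: as written above, fit() uses X.values directly, so the column
    # order of the data frame should match the vertex order of the graph
    data = pd.DataFrame({"A": a, "M": m, "Y": y})

    graph = ADMG(
        vertices=["A", "M", "Y"],
        di_edges=[("A", "M"), ("M", "Y")],
        bi_edges=[("A", "Y")],
    )

    model = LinearGaussianSEM(graph).fit(data)

    # the estimated total effect should be roughly 0.5 * 0.7 = 0.35 under
    # this simulation
    print("total effect of A on Y:", model.total_effect(["A"], ["Y"]))
    print("BIC:", model.bic(data.values))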