Source code for med_bench.get_simulated_data

import numpy as np
from numpy.random import default_rng
from scipy import stats
from scipy.special import expit


[docs] def simulate_data(n, rg, mis_spec_m=False, mis_spec_y=False, dim_x=1, dim_m=1, seed=None, type_m='binary', sigma_y=0.5, sigma_m=0.5, beta_t_factor=1, beta_m_factor=1): """Simulate data for mediation analysis Parameters ---------- n: :obj:`int`, Number of samples to generate. rg: RandomState instance, Controls the pseudo random number generator used to generate the data at fit time. mis_spec_m: obj:`bool`, Whether the mediator generation is misspecified or not defaults to False mis_spec_y: obj:`bool`, Whether the output model is misspecified or not defaults to False dim_x: :obj:`int`, optional, Number of covariates in the input. Defaults to 1 dim_m: :obj:`int`, optional, Number of mediatiors to generate. Defaults to 1 seed: :obj:`int` or None, optional, Controls the pseudo random number generator used to generate the coefficients of the model. Pass an int for reproducible output across multiple function calls. Defaults to None type_m: :obj:`str`, Whether the mediator is binary or continuous Defaults to 'binary', sigma_y: :obj:`float`, noise variance on outcome Defaults to 0.5, sigma_m :obj:`float`, noise variance on mediator Defaults to 0.5, beta_t_factor: :obj:`float`, scaling factor on treatment effect, Defaults to 1, beta_m_factor: :obj:`float`, scaling factor on mediator, Defaults to 1, returns ------- x: ndarray of shape (n, dim_x) the simulated covariates t: ndarray of shape (n, 1) the simulated treatment m: ndarray of shape (n, dim_m) the simulated mediators y: ndarray of shape (n, 1) the simulated outcome total: :obj:`float`, the total simulated effect theta_1: :obj:`float`, the natural direct effect on the treated, theta_0: :obj:`float`, the natural direct effect on the untreated, delta_1: :obj:`float`, the natural indirect effect on the treated, delta_0: :obj:`float`, the natural indirect effect on the untreated, p_t: ndarray of shape (n, 1), Propensity score th_p_t_mx: ndarray of shape (n, 1), overlap """ rg_coef = default_rng(seed) x = rg.standard_normal(n * dim_x).reshape((n, dim_x)) alphas = np.ones(dim_x) / dim_x p_t = expit(alphas.dot(x.T)) t = rg.binomial(1, p_t, n).reshape(-1, 1) t0 = np.zeros((n, 1)) t1 = np.ones((n, 1)) # generate the mediator M beta_x = rg_coef.standard_normal((dim_x, dim_m)) * 1 / (dim_m * dim_x) beta_t = np.ones((1, dim_m)) * beta_t_factor if mis_spec_m: beta_xt = rg_coef.standard_normal((dim_x, dim_m)) * 1 / (dim_m * dim_x) else: beta_xt = np.zeros((dim_x, dim_m)) if type_m == 'binary': p_m0 = expit(x.dot(beta_x) + beta_t * t0 + x.dot(beta_xt) * t0) p_m1 = expit(x.dot(beta_x) + beta_t * t1 + x.dot(beta_xt) * t1) pre_m = rg.random(n) m0 = ((pre_m < p_m0.ravel()) * 1).reshape(-1, 1) m1 = ((pre_m < p_m1.ravel()) * 1).reshape(-1, 1) m_2d = np.hstack((m0, m1)) m = m_2d[np.arange(n), t[:, 0]].reshape(-1, 1) else: random_noise = sigma_m * rg.standard_normal((n, dim_m)) m0 = x.dot(beta_x) + t0.dot(beta_t) + t0 * \ (x.dot(beta_xt)) + random_noise m1 = x.dot(beta_x) + t1.dot(beta_t) + t1 * \ (x.dot(beta_xt)) + random_noise m = x.dot(beta_x) + t.dot(beta_t) + t * (x.dot(beta_xt)) + random_noise # generate the outcome Y gamma_m = np.ones((dim_m, 1)) * 0.5 / dim_m * beta_m_factor gamma_x = np.ones((dim_x, 1)) / dim_x**2 gamma_t = 1.2 if mis_spec_y: gamma_t_m = np.ones((dim_m, 1)) * 0.5 / dim_m else: gamma_t_m = np.zeros((dim_m, 1)) y = x.dot(gamma_x) + gamma_t * t + m.dot(gamma_m) + \ m.dot(gamma_t_m) * t + sigma_y * rg.standard_normal((n, 1)) # Compute differents types of effects if type_m == 'binary': theta_1 = gamma_t + gamma_t_m * np.mean(p_m1) theta_0 = gamma_t + gamma_t_m * np.mean(p_m0) delta_1 = np.mean( (p_m1 - p_m0) * (gamma_m.flatten() + gamma_t_m.dot(t1.T))) delta_0 = np.mean( (p_m1 - p_m0) * (gamma_m.flatten() + gamma_t_m.dot(t0.T))) else: # to do mean(m1) pour avoir un vecteur de taille dim_m theta_1 = gamma_t + gamma_t_m.T.dot(np.mean(m1, axis=0)) theta_0 = gamma_t + gamma_t_m.T.dot(np.mean(m0, axis=0)) delta_1 = (gamma_t * t1 + m1.dot(gamma_m) + m1.dot(gamma_t_m) * t1 - (gamma_t * t1 + m0.dot(gamma_m) + m0.dot(gamma_t_m) * t1)).mean() delta_0 = (gamma_t * t0 + m1.dot(gamma_m) + m1.dot(gamma_t_m) * t0 - (gamma_t * t0 + m0.dot(gamma_m) + m0.dot(gamma_t_m) * t0)).mean() if type_m == 'binary': pre_pm = np.hstack((p_m0.reshape(-1, 1), p_m1.reshape(-1, 1))) pre_pm[m.ravel() == 0, :] = 1 - pre_pm[m.ravel() == 0, :] pm = pre_pm[:, 1].reshape(-1, 1) else: p_m0 = np.prod(stats.norm.pdf((m - x.dot(beta_x)) - t0.dot(beta_t) - t0 * (x.dot(beta_xt)) / sigma_m), axis=1) p_m1 = np.prod(stats.norm.pdf((m - x.dot(beta_x)) - t1.dot(beta_t) - t1 * (x.dot(beta_xt)) / sigma_m), axis=1) pre_pm = np.hstack((p_m0.reshape(-1, 1), p_m1.reshape(-1, 1))) pm = pre_pm[:, 1].reshape(-1, 1) px = np.prod(stats.norm.pdf(x), axis=1) pre_pt = np.hstack(((1-p_t).reshape(-1, 1), p_t.reshape(-1, 1))) double_px = np.hstack((px.reshape(-1, 1), px.reshape(-1, 1))) denom = np.sum(pre_pm * pre_pt * double_px, axis=1) num = pm.ravel() * p_t.ravel() * px.ravel() th_p_t_mx = num.ravel() / denom return (x, t, m, y, theta_1.flatten()[0] + delta_0.flatten()[0], theta_1.flatten()[0], theta_0.flatten()[0], delta_1.flatten()[0], delta_0.flatten()[0], p_t, th_p_t_mx)