"""Treatment estimation functions"""
from linearmodels.iv import IV2SLS
from linearmodels.system.model import SUR
from statsmodels.api import add_constant
from statsmodels.multivariate.multivariate_ols import _MultivariateOLS
import numpy as np
import pandas as pd
[docs]def estimate_treatment_effect(aps = None, Y = None, Z = None, D = None, data = None, Y_ind = None, Z_ind = None, D_ind = None, aps_ind = None,
estimator: str = "2SLS", verbose: bool = True):
"""Main treatment effect estimation function
Parameters
-----------
aps: array-like, default: None
Array of estimated APS values
Y: array-like, default: None
Array of outcome variables
Z: array-like, default: None
Array of treatment recommendations
D: array-like, default: None
Array of treatment assignments
data: array-like, default: None
2D array of estimation inputs
Y_ind: int, default: None
Index of outcome variable in `data`
Z_ind: int, default: None
Index of treatment recommendation variable in `data`
D_ind: int, default: None
Index of treatment assignment variable in `data`
aps_ind: int, default: None
Index of APS variable in `data`
estimator: str, default: "2SLS"
Method of IV estimation
verbose: bool, default: True
Whether to print output of estimation
Returns
-----------
IVResults
Fitted IV model object
Notes
-----
Treatment effect is estimated using IV estimation. The default is to use the 2SLS method of estimation, with the equations illustrated below.
.. math::
D_i = \\gamma_0(1-I) + \\gamma_1 Z_i + \\gamma_2 p^s(X_i;\\delta) + v_i \\
Y_i = \\beta_0(1-I) + \\beta_1 D_i + \\beta_2 p^s(X_i;\\delta) + \\epsilon_i
:math:`\\beta_1` is our causal estimation of the treatment effect. :math:`I` is an indicator for if the ML funtion takes only a single nondegenerate value in the sample.
aps, Y, Z, D, and data should never have any overlapping columns. This is not checkable through the code, so please double check this when passing in the inputs.
"""
if data is not None:
data = np.array(data)
vals = {"Y": Y, "Z": Z, "D": D, "APS": aps}
# If `data` given, then use index inputs for values not explicitly passed
infer = []
to_del = []
if data is not None:
inds = {"Y": Y_ind, "Z": Z_ind, "D": D_ind, "APS": aps_ind}
for key, val in vals.items():
if val is None:
if inds[key] is not None:
vals[key] = data[:,inds[key]]
to_del.append(inds[key])
else:
infer.append(key)
data = np.delete(data, to_del, axis=1)
if len(infer) != 0:
print(f"Indices for {infer} not explicitly passed. Assuming remaining columns in order {infer}...")
for i in range(len(infer)):
vals[infer[i]] = data[:,i]
Y = vals["Y"]
Z = vals["Z"]
D = vals["D"]
aps = vals["APS"]
if aps is None or Y is None or Z is None or D is None:
raise ValueError("Treatment effect estimation requires all values aps, Y, Z, and D to be passed!")
lm_inp = pd.DataFrame({"Y": Y, "Z": Z, "D": D, "aps": aps})
# Use only observations where aps is nondegenerate
aps = np.array(aps)
obs_tokeep = np.nonzero((aps > 0) & (aps < 1))
print(f"We will fit on {len(obs_tokeep[0])} values out of {len(Y)} from the dataset for which the APS estimation is nondegenerate.")
assert len(obs_tokeep[0]) > 0
lm_inp = lm_inp.iloc[obs_tokeep[0],:]
# Check for single non-degeneracy
single_nondegen = True
if len(np.unique(aps[obs_tokeep])) > 1:
single_nondegen = False
lm_inp = add_constant(lm_inp)
if estimator == "2SLS":
if single_nondegen:
results = IV2SLS(lm_inp['Y'], lm_inp[['aps']], lm_inp['D'], lm_inp['Z']).fit(cov_type='robust')
else:
results = IV2SLS(lm_inp['Y'], lm_inp[['const', 'aps']], lm_inp['D'], lm_inp['Z']).fit(cov_type='robust')
elif estimator == "OLS":
if single_nondegen:
results = IV2SLS(lm_inp['Y'], lm_inp[['Z', 'aps']], None, None).fit(cov_type='unadjusted')
else:
results = IV2SLS(lm_inp['Y'], lm_inp[['const', 'Z', 'aps']], None, None).fit(cov_type='unadjusted')
else:
raise NotImplementedError(f"Estimator option {estimator} not implemented yet!")
if verbose:
print(results)
return results
[docs]def estimate_treatment_effect_controls(aps, Y, Z, D, W,
estimator: str = "2SLS", verbose: bool = True):
"""Main treatment effect estimation function with added controls
Parameters
-----------
aps: array-like, default: None
Array of estimated APS values
Y: array-like, default: None
Array of outcome variables
Z: array-like, default: None
Array of treatment recommendations
D: array-like, default: None
Array of treatment assignments
W: array-like, default: None
Array of control variables
estimator: str, default: "2SLS"
Method of IV estimation
verbose: bool, default: True
Whether to print output of estimation
Returns
-----------
IVResults
Fitted IV model object
Notes
-----
Treatment effect is estimated using IV estimation. The default is to use the 2SLS method of estimation, with the equations illustrated below.
.. math::
D_i = \\gamma_0(1-I) + \\gamma_1 Z_i + \\gamma_2 p^s(X_i;\\delta) + \\gamma_3 W_i + v_i \\
Y_i = \\beta_0(1-I) + \\beta_1 D_i + \\beta_2 p^s(X_i;\\delta) + + \\beta_3 W_i \\epsilon_i
:math:`\\beta_1` is our causal estimation of the treatment effect. :math:`I` is an indicator for if the ML funtion takes only a single nondegenerate value in the sample.
"""
aps = np.array(aps)
Y = np.array(Y)
Z = np.array(Z)
D = np.array(D)
W = np.array(W)
# Use only observations where aps is nondegenerate
obs_tokeep = np.nonzero((aps > 0) & (aps < 1))
print(f"We will fit on {len(obs_tokeep[0])} values out of {len(Y)} from the dataset for which the APS estimation is nondegenerate.")
assert len(obs_tokeep[0]) > 0
aps = aps[obs_tokeep[0]]
Y = Y[obs_tokeep[0]]
Z = Z[obs_tokeep[0]]
D = D[obs_tokeep[0]]
W = W[obs_tokeep[0]]
cols = {"aps":aps, "Y":Y, "Z":Z, "D":D}
exog = ["aps"]
constant = False
if len(W.shape) > 1:
for i in range(W.shape[1]):
cols["W"+str(i)] = W[:,i]
exog.append("W"+str(i))
constant = (len(np.unique(W[:,i])) == 1) | constant
else:
constant = len(np.unique(W)) == 1
cols["W"] = W
exog.append("W")
# Check for single non-degeneracy
constant = (len(np.unique(aps)) == 1) | constant
if not constant:
cols["const"] = np.ones(len(Y))
exog.append("const")
df = pd.DataFrame(cols)
if estimator == "2SLS":
results = IV2SLS(df['Y'], df[exog], df['D'], df['Z']).fit(cov_type='robust')
elif estimator == "OLS":
results = IV2SLS(df['Y'], df[exog+['Z']], None, None).fit(cov_type='unadjusted')
else:
raise NotImplementedError(f"Estimator option {estimator} not implemented yet!")
if verbose:
print(results)
return results
[docs]def estimate_counterfactual_ml(aps = None, Y = None, Z = None, ml_out = None, cf_ml_out = None, data = None, Y_ind = None,
Z_ind = None, ml_out_ind = None, cf_ml_out_ind = None, aps_ind = None,
cov_type: str = "unadjusted", single_nondegen: bool = False, verbose: bool = True):
"""Estimate counterfactual performance of a new algorithm
Parameters
-----------
aps: array-like, default: None
Array of estimated APS values
Y: array-like, default: None
Array of outcome variables
Z: array-like, default: None
Array of treatment recommendations
ml_out: array-like, default: None
Original ML function outputs
cf_ml_out: array-like, default: None
Counterfactual ML function outputs
data: array-like, default: None
2D array of estimation inputs
Y_ind: int, default: None
Index of outcome variable in `data`
Z_ind: int, default: None
Index of treatment recommendation variable in `data`
ml_out_ind: int, default: None
Index of original ML output variable in `data`
cf_ml_out_ind: int, default: None
Index of counterfactual ML output variable in `data`
aps_ind: int, default: None
Index of APS variable in `data`
estimator: str, default: "2SLS"
Method of IV estimation
single_nondegen: bool, default: False
Indicator for whether the original ML algorithm takes on a single non-degenerate value in the sample
verbose: bool, default: True
Whether to print output of estimation
Returns
-----------
tuple(np.ndarray, OLSResults)
Tuple containing array of predicted float value scores and fitted OLS results.
Notes
-----
The process of estimating counterfactual value works as follows.
First we fit the below OLS regression using historical recommendations and outcome ``Z`` and ``Y``.
.. math::
Y_i = \\beta_0 + \\beta_1 Z_i + \\beta_2 p^s(X_i;\\delta) + \\epsilon_i
:math:`\\beta_1` is our estimated effect of treatment recommendation.
Then we take the original ML output ``ML1`` and the counterfactual ML output ``ML2`` and estimate the below value equation.
.. math::
\\hat{V}(ML') = \\frac{1}{n} \\sum_{i = 1}^n (Y_i + \\hat{\\beta_{ols}}(ML'(X_i) - ML(X_i))
"""
if data is not None:
data = np.array(data)
vals = {"Y": Y, "Z": Z, "APS": aps , "ml_out": ml_out, "cf_ml_out": cf_ml_out}
# If `data` given, then use index inputs for values not explicitly passed
infer = []
to_del = []
if data is not None:
inds = {"Y": Y_ind, "Z": Z_ind, "APS": aps_ind, "ml_out": ml_out_ind, "cf_ml_out": cf_ml_out_ind}
for key, val in vals.items():
if val is None:
if inds[key] is not None:
vals[key] = data[:,inds[key]]
to_del.append(inds[key])
else:
infer.append(key)
data = np.delete(data, to_del, axis=1)
if len(infer) != 0:
print(f"Indices for {infer} not explicitly passed. Assuming remaining columns in order {infer}...")
for i in range(len(infer)):
vals[infer[i]] = data[:,i]
Y = vals["Y"]
Z = vals["Z"]
aps = vals["APS"]
ml_out = np.array(vals["ml_out"])
cf_ml_out = np.array(vals["cf_ml_out"])
if aps is None or Y is None or Z is None or ml_out is None or cf_ml_out is None:
raise ValueError("Treatment effect estimation requires all values aps, Y, Z, D and ML recommendations to be passed!")
lm_inp = pd.DataFrame({"Y": Y, "Z": Z, "aps": aps})
if not single_nondegen:
lm_inp = add_constant(lm_inp)
ols_results = IV2SLS(lm_inp['Y'], lm_inp[['const', 'Z', 'aps']], None, None).fit(cov_type='unadjusted')
if verbose:
print(ols_results)
b_ols = ols_results.params['Z']
v = Y + b_ols * (cf_ml_out - ml_out)
v_score = np.mean(v)
if verbose:
print(f"Counterfactual value of new ML function: {v_score}")
return (v, ols_results)
[docs]def covariate_balance_test(aps = None, X = None, Z = None, data = None, X_ind = None, Z_ind = None, aps_ind = None,
X_labels = None, cov_type = "robust", verbose: bool = True):
"""Covariate Balance Test
Parameters
-----------
aps: array-like, default: None
Array of estimated APS values
X: array-like, default: None
Array of covariates to test
Z: array-like, default: None
Array of treatment recommendations
data: array-like, default: None
2D array of estimation inputs
X_ind: int/array_of_int, default: None
Indices/indices of covariates in `data`
Z_ind: int, default: None
Index of treatment recommendation variable in `data`
aps_ind: int, default: None
Index of APS variable in `data`
X_labels: array-like, default: None
Array of string labels to associate with each covariate
cov_type: str, default: "robust"
Covariance type of SUR. Any value other than "robust" defaults to simple (nonrobust) covariance.
verbose: bool, default: True
Whether to print output for each test
Returns
-----------
tuple(SystemResults, dict(X_label, dict(stat_label, value)))
Tuple containing the fitted SUR model results and a dictionary containing the results of covariate balance estimation for each covariate as well as the joint hypothesis.
Notes
-----
This function estimates a system of Seemingly Unrelated Regression (SUR) as defined in the linearmodels package.
APS, X, Z, and data should never have any overlapping columns. This is not checkable through the code, so please double check this when passing in the inputs.
For APS, X, Z, either the variables themselves should be passed, or their indices in `data`. If neither is passed then an error is raised.
"""
# Error checking
if X is None and (X_ind is None or data is None):
raise ValueError("covariate_balance_test: No valid data passed for X. You must either pass the variable directly into `X` or its index along with a `data` object.")
if aps is None and (aps_ind is None or data is None):
raise ValueError("covariate_balance_test: No valid data passed for aps. You must either pass the variable directly into `X` or its index along with a `data` object.")
if Z is None and (Z is None or data is None):
raise ValueError("covariate_balance_test: No valid data passed for Z. You must either pass the variable directly into `X` or its index along with a `data` object.")
if X_labels is not None:
if X.ndim == 1:
if len(X_labels) > 1:
raise ValueError(f"Column labels {X_labels} not the same length as inputs.")
else:
if len(X_labels) != X.shape[1]:
raise ValueError(f"Column labels {X_labels} not the same length as inputs.")
# Construct covariate balance inputs
if data is not None:
data = np.array(data)
if X_ind is not None:
X = data[:, X_ind]
if aps_ind is not None:
aps = data[:, aps_ind]
if Z_ind is not None:
Z = data[:, Z_ind]
if isinstance(X, np.ndarray):
if X.ndim == 1 and X_labels is None:
X_labels = ['X1']
elif X_labels is None:
X_labels = [f"X{i}" for i in range(X.shape[1])]
X = pd.DataFrame(X, columns = X_labels)
elif X_labels is not None:
X.columns = X_labels
else:
X_labels = X.columns
aps = np.array(aps)
Z = np.array(Z)
# Use only observations where aps is nondegenerate
obs_tokeep = np.nonzero((aps > 0) & (aps < 1))
print(f"We will run balance testing on {len(obs_tokeep[0])} values out of {len(Z)} from the dataset for which the APS estimation is nondegenerate.")
assert len(obs_tokeep[0]) > 0
exog = np.column_stack((Z, aps))
exog = pd.DataFrame(exog, columns = ['Z', 'aps'])
exog = exog.iloc[obs_tokeep[0],:]
X.reset_index(drop=True, inplace=True)
X = X.iloc[obs_tokeep[0]]
if cov_type != "robust":
cov_type = "unadjusted"
# Check for single non-degeneracy
single_nondegen = True
if len(np.unique(aps[obs_tokeep])) > 1:
single_nondegen = False
exog = add_constant(exog)
# Covariate balance test
mv_ols_res = SUR.multivariate_ls(X, exog).fit(cov_type = cov_type)
if verbose == True:
print(mv_ols_res)
# Joint hypothesis test: use multivariate_OLS from statsmodels
# Edge case: single variable then joint test is the same as the original
if len(X_labels) > 1:
mv_ols_joint = _MultivariateOLS(X, exog).fit()
L = np.zeros((1,3))
L[:,1] = 1
mv_test_res = mv_ols_joint.mv_test([("Z", L)])
else:
mv_test_res = None
# Compile results
res_dict = {}
for x_var in X_labels:
res_dict[x_var] = {}
res_dict[x_var]['coef'] = mv_ols_res.params[f"{x_var}_Z"]
res_dict[x_var]['p'] = mv_ols_res.pvalues[f"{x_var}_Z"]
res_dict[x_var]['t'] = mv_ols_res.tstats[f"{x_var}_Z"]
res_dict[x_var]['n'] = mv_ols_res.nobs/len(X_labels)
res_dict[x_var]['stderr'] = mv_ols_res.std_errors[f"{x_var}_Z"]
if mv_test_res is None:
res_dict['joint'] = {}
res_dict['joint']['p'] = mv_ols_res.pvalues[f"{X_labels[0]}_Z"]
res_dict['joint']['t'] = mv_ols_res.tstats[f"{X_labels[0]}_Z"]
else:
res_dict['joint'] = {}
res_dict['joint']['p'] = mv_test_res.results['Z']['stat'].iloc[0, 4]
res_dict['joint']['f'] = mv_test_res.results['Z']['stat'].iloc[0, 3]
return (mv_ols_res, res_dict)
[docs]def covariate_balance_test_controls(aps, X, Z, W, cov_type = "robust", verbose: bool = True):
"""Covariate Balance Test
Parameters
-----------
aps: array-like, default: None
Array of estimated APS values
X: array-like, default: None
Array of covariates to test
Z: array-like, default: None
Array of treatment recommendations
W: array-like, default: None
Array of control variables
cov_type: str, default: "robust"
Covariance type of SUR. Any value other than "robust" defaults to simple (nonrobust) covariance.
verbose: bool, default: True
Whether to print output for each test
Returns
-----------
tuple(SystemResults, dict(X, dict(stat_label, value)))
Tuple containing the fitted SUR model results and a dictionary containing the results of covariate balance estimation for each covariate as well as the joint hypothesis.
Notes
-----
This function estimates a system of Seemingly Unrelated Regression (SUR) as defined in the linearmodels package.
"""
aps = np.array(aps)
X = np.array(X)
Z = np.array(Z)
W = np.array(W)
# Use only observations where aps is nondegenerate
obs_tokeep = np.nonzero((aps > 0) & (aps < 1))
print(f"We will fit on {len(obs_tokeep[0])} values out of {len(Y)} from the dataset for which the APS estimation is nondegenerate.")
assert len(obs_tokeep[0]) > 0
aps = aps[obs_tokeep[0]]
X = X[obs_tokeep[0]]
Z = Z[obs_tokeep[0]]
W = W[obs_tokeep[0]]
cols = {"aps":aps, "Z":Z}
dep = []
if len(X.shape) > 1:
for i in range(X.shape[1]):
cols["X"+str(i)] = X[:,i]
dep.append("X"+str(i))
else:
cols["X"] = X
dep.append("X")
exog = ["aps", "Z"]
constant = False
if len(W.shape) > 1:
for i in range(W.shape[1]):
cols["W"+str(i)] = W[:,i]
exog.append("W"+str(i))
constant = (len(np.unique(W[:,i])) == 1) | constant
else:
constant = len(np.unique(W)) == 1
cols["W"] = W
exog.append("W")
# Check for single non-degeneracy
constant = (len(np.unique(aps)) == 1) | constant
if not constant:
cols["const"] = np.ones(len(aps))
exog.append("const")
df = pd.DataFrame(cols)
# Covariate balance test
mv_ols_res = SUR.multivariate_ls(df[dep], df[exog]).fit(cov_type = cov_type)
if verbose == True:
print(mv_ols_res)
# Joint hypothesis test: use multivariate_OLS from statsmodels
# Edge case: single variable then joint test is the same as the original
if len(dep) > 1:
mv_ols_joint = _MultivariateOLS(df[dep], df[exog]).fit()
L = np.zeros((1,len(exog)))
L[:,1] = 1
mv_test_res = mv_ols_joint.mv_test([("Z", L)])
else:
mv_test_res = None
# Compile results
res_dict = {}
for x_var in dep:
res_dict[x_var] = {}
res_dict[x_var]['coef'] = mv_ols_res.params[f"{x_var}_Z"]
res_dict[x_var]['p'] = mv_ols_res.pvalues[f"{x_var}_Z"]
res_dict[x_var]['t'] = mv_ols_res.tstats[f"{x_var}_Z"]
res_dict[x_var]['n'] = mv_ols_res.nobs/len(dep)
res_dict[x_var]['stderr'] = mv_ols_res.std_errors[f"{x_var}_Z"]
if mv_test_res is None:
res_dict['joint'] = {}
res_dict['joint']['p'] = mv_ols_res.pvalues[f"{dep[0]}_Z"]
res_dict['joint']['t'] = mv_ols_res.tstats[f"{dep[0]}_Z"]
else:
res_dict['joint'] = {}
res_dict['joint']['p'] = mv_test_res.results['Z']['stat'].iloc[0, 4]
res_dict['joint']['f'] = mv_test_res.results['Z']['stat'].iloc[0, 3]
return (mv_ols_res, res_dict)