"""APS estimation functions"""
from pathlib import Path
from typing import Tuple, Dict, Set, Union, Sequence, Optional
import onnxruntime as rt
import warnings
import numpy as np
import pandas as pd
import os
import gc
from numba import jit, njit
from numba.core.errors import NumbaDeprecationWarning, NumbaPendingDeprecationWarning
from IVaps import run_onnx_session, standardize, cumMean1D, cumMean2D
warnings.simplefilter('ignore', category=NumbaDeprecationWarning)
warnings.simplefilter('ignore', category=NumbaPendingDeprecationWarning)
def _computeAPS(onnx, X_c: np.ndarray, X_d: np.ndarray, L_inds: Tuple[np.ndarray, np.ndarray], L_vals: np.ndarray,
types: Tuple[np.dtype, np.dtype], S: int, delta: float, mu: np.ndarray, sigma: np.ndarray,
input_type: int, input_names: Tuple[str, str], fcn, cpu: bool, parallel: bool, **kwargs):
"""Compute APS for a single row of data
Approximate propensity score estimation involves taking draws :math:`X_c^1, \\ldots,X_c^S` from the uniform distribution on :math:`N(X_{ci}, \\delta)`, where :math:`N(X_{ci},\\delta)` is the :math:`p_c` dimensional ball centered at :math:`X_{ci}` with radius :math:`\\delta`.
:math:`X_c^1, \\ldots,X_c^S` are destandardized before passed for ML inference. The estimation equation is :math:`p^s(X_i;\\delta) = \\frac{1}{S} \\sum_{s=1}^{S} ML(X_c^s, X_{di})`.
Parameters
-----------
onnx: str
Path to saved ONNX model
X_c: array-like
1D vector of standardized continuous inputs
X_d: array-like
1D vector of discrete inputs
L_inds: tuple
Tuple of indices for mixed values in X_c
L_vals: array-like
1D vector of original mixed discrete values
types: list-like, length(2)
Numpy dtypes for continuous and discrete data
S: int
Number of draws
delta: float
Radius of sampling ball
mu: array-like, shape(n_continuous,)
1D vector of means of continuous variables
sigma: array-like, shape(n_continuous,)
1D vector of standard deviations of continuous variables
input_type: 1 or 2
Whether the model takes continuous/discrete inputs together or separately
input_names: tuple, length(2)
Names of input nodes if separate continuous and discrete inputs
fcn: Object
Vectorized decision function to wrap ML output
cpu: bool
Whether to run inference on CPU
parallel: bool
Whether function is being called in a parallelized process
**kwargs: keyword arguments to pass into decision function
Returns
-----------
np.ndarray
Estimated aps for the observation row. If list of deltas given, then returns 2D array with every column corresponding to a different delta. Otherwise, returns 1D array.
"""
# APS estimation ----------------------------------------------------------------------------------------------------------
nrows = X_c.shape[0]
p_c = X_c.shape[1]
standard_draws = np.random.normal(size = (nrows, S, p_c))
u_draws = np.random.uniform(size=(nrows, S))
if isinstance(delta, Sequence):
multi_delta = True
inference_draws_list = _drawAPS2D(X_c, standard_draws, u_draws, L_inds, L_vals, S, delta, mu, sigma)
inference_draws_list = [d.reshape((nrows*S, p_c)) for d in inference_draws_list]
else:
multi_delta = False
inference_draws = _drawAPS1D(X_c, standard_draws, u_draws, L_inds, L_vals, S, delta, mu, sigma)
# Run ONNX inference ----------------------------------------------------------------------------------------------------------
sess = rt.InferenceSession(onnx)
options = rt.SessionOptions()
# Set CPU provider
if cpu == True:
sess.set_providers(["CPUExecutionProvider"])
else: # If on GPU then put input and output on CUDA: don't need to implement this yet
if sess.get_providers()[0] == "CUDAExecutionProvider":
pass
cts_type = types[0]
disc_type = types[1]
# Set session threads if parallelizing
if parallel == True:
os.environ["OMP_NUM_THREADS"] = '1'
options.inter_op_num_threads = 1
options.intra_op_num_threads = 1
# Multi-output models are typically in the order [label, probabilities], so this is what we'll assume for now
if len(sess.get_outputs()) > 1:
label_name = sess.get_outputs()[1].name
else:
label_name = sess.get_outputs()[0].name
input_name = sess.get_inputs()[0].name
if multi_delta == True:
ml_out = []
for inference_draws in inference_draws_list:
# Adapt input based on settings
if X_d is None:
inputs = inference_draws.astype(cts_type)
ml_out_tmp = run_onnx_session([inputs], sess, [input_name], [label_name], fcn, **kwargs)
else:
X_d_long = np.repeat(X_d, S, axis=0)
if input_type == 2:
disc_inputs = X_d_long.astype(disc_type)
cts_inputs = inference_draws.astype(cts_type)
ml_out_tmp = run_onnx_session([cts_inputs, disc_inputs], sess, input_names, [label_name], fcn, **kwargs)
else:
# If input type = 1, then coerce all to the continuous type
inputs = np.append(inference_draws, X_d_long, axis=1).astype(cts_type)
ml_out_tmp = run_onnx_session([inputs], sess, [input_name], [label_name], fcn, **kwargs)
ml_out.append(ml_out_tmp)
ml_out = np.stack(ml_out)
else:
# Adapt input based on settings
if X_d is None:
inputs = inference_draws.astype(cts_type)
ml_out = run_onnx_session([inputs], sess, [input_name], [label_name], fcn, **kwargs)
else:
X_d_long = np.repeat(X_d, S, axis=0)
if input_type == 2:
disc_inputs = X_d_long.astype(disc_type)
cts_inputs = inference_draws.astype(cts_type)
ml_out = run_onnx_session([cts_inputs, disc_inputs], sess, input_names, [label_name], fcn, **kwargs)
else:
# If input type = 1, then coerce all to the continuous type
inputs = np.append(inference_draws, X_d_long, axis=1).astype(cts_type)
ml_out = run_onnx_session([inputs], sess, [input_name], [label_name], fcn, **kwargs)
# Explicitly delete ONNX session
del sess
# Return means of every S rows
if multi_delta == True:
aps = cumMean2D(ml_out, S)
else:
aps = cumMean1D(ml_out, S)
return aps
@jit(nopython = True)
def _drawAPS1D(X_c: np.ndarray, standard_draws: np.ndarray, u_draws: np.ndarray, L_inds: Tuple[np.ndarray, np.ndarray],
L_vals: np.ndarray, S: int, delta: float, mu: np.ndarray, sigma: np.ndarray):
nrows = X_c.shape[0]
p_c = X_c.shape[1]
na_inds = np.where(np.isnan(X_c))
# For each row in X_c, run separate sampling procedure
for i in range(len(na_inds[0])):
standard_draws[na_inds[0][i], :, na_inds[1][i]] = np.nan
scaled_draws = np.empty_like(standard_draws)
for i in range(standard_draws.shape[0]):
for s in range(standard_draws.shape[1]):
row = standard_draws[i, s, :]
scaled = row/np.sqrt(np.sum(row[~np.isnan(row)]**2))
scaled_draws[i, s] = scaled
na_counts = np.empty(nrows)
for i in range(X_c.shape[0]):
na_counts[i] = np.sum(np.isnan(X_c[i, :]))
non_na_cts = p_c - na_counts # Count of non-na draws for each row
u = np.empty_like(u_draws)
for i in range(len(non_na_cts)):
ct = non_na_cts[i]
if ct != 0:
u[i] = u_draws[i]**(1/ct)
else:
u[i] = np.array([np.nan] * len(u[i]))
# Draw from uniform distribution
uniform_draws = scaled_draws * np.expand_dims(u, 2) * np.array(delta) + np.expand_dims(X_c, 1) # Scale by sampled u and ball mean/radius to get the final uniform draws (nrow x S x p_c)
# De-standardize each of the variables
destandard_draws = np.add(np.multiply(uniform_draws, sigma), mu) # This applies the transformations continuous variable-wise
# Add back the original discrete mixed values
if L_inds is not None:
for i in range(len(L_inds[0])):
destandard_draws[L_inds[0][i], :, L_inds[1][i]] = L_vals[i]
# Collapse to 2D for inference
inference_draws = destandard_draws.reshape((nrows*S, p_c))
return inference_draws
@jit(nopython = True)
def _drawAPS2D(X_c: np.ndarray, standard_draws: np.ndarray, u_draws: np.ndarray, L_inds: Tuple[np.ndarray, np.ndarray],
L_vals: np.ndarray, S: int, delta: Sequence, mu: np.ndarray, sigma: np.ndarray):
nrows = X_c.shape[0]
p_c = X_c.shape[1]
na_inds = np.where(np.isnan(X_c))
# For each row in X_c, run separate sampling procedure
for i in range(len(na_inds[0])):
standard_draws[na_inds[0][i], :, na_inds[1][i]] = np.nan
scaled_draws = np.empty_like(standard_draws)
for i in range(standard_draws.shape[0]):
for s in range(standard_draws.shape[1]):
row = standard_draws[i, s, :]
scaled = row/np.sqrt(np.sum(row[~np.isnan(row)]**2))
scaled_draws[i, s] = scaled
na_counts = np.empty(nrows)
for i in range(X_c.shape[0]):
na_counts[i] = np.sum(np.isnan(X_c[i, :]))
non_na_cts = p_c - na_counts # Count of non-na draws for each row
u = np.empty_like(u_draws)
for i in range(len(non_na_cts)):
ct = non_na_cts[i]
if ct != 0:
u[i] = u_draws[i]**(1/ct)
else:
u[i] = np.array([np.nan] * len(u[i]))
# If list of deltas, then create new set of draws for each
uniform_draws = [scaled_draws * np.expand_dims(u, 2) * d + np.expand_dims(X_c, 1) for d in delta]
# De-standardize each of the variables
destandard_draws = [np.add(np.multiply(unif, sigma), mu) for unif in uniform_draws]
# Add back the original discrete mixed values
if L_inds is not None:
for d in destandard_draws:
for i in range(len(L_inds[0])):
d[L_inds[0][i], :, L_inds[1][i]] = L_vals[i]
# Collapse draws for each delta to 2D for inference
return destandard_draws
def _preprocessMixedVars(X_c, L_keys, L_vals):
# Get indices of mixed vars to replace for each row
mixed_og_rows = [np.where(np.isin(X_c[:,L_keys[i]], list(L_vals[i])))[0] for i in range(len(L_keys))] # List of row indices for each mixed variable column
mixed_og_cols = [np.repeat(L_keys[i], len(mixed_og_rows[i])) for i in range(len(mixed_og_rows))]
mixed_rows = np.concatenate(mixed_og_rows)
mixed_cols = np.concatenate(mixed_og_cols)
mixed_og_inds = (mixed_rows, mixed_cols)
# Save original discrete values
mixed_og_vals = X_c[mixed_og_inds]
# Replace values at indices with NA
X_c[mixed_og_inds] = np.nan
return (X_c, mixed_og_vals, mixed_og_inds)
[docs]def estimate_aps_onnx(onnx: str, X_c = None, X_d = None, data = None, C: Sequence = None, D: Sequence = None, L: Dict[int, Set] = None,
S: int = 100, delta: float = 0.8, seed: int = None, types: Tuple[np.dtype, np.dtype] = (None, None), input_type: int = 1,
input_names: Tuple[str, str]=("c_inputs", "d_inputs"), fcn = None, vectorized: bool = False, cpu: bool = False, iobound: bool = False,
parallel: bool = False, nprocesses: int = None, ntasks: int = 1, **kwargs):
"""Estimate APS for given dataset and ONNX model
Approximate propensity score estimation involves taking draws :math:`X_c^1, \\ldots,X_c^S` from the uniform distribution on :math:`N(X_{ci}, \\delta)`, where :math:`N(X_{ci},\\delta)` is the :math:`p_c` dimensional ball centered at :math:`X_{ci}` with radius :math:`\\delta`.
:math:`X_c^1, \\ldots,X_c^S` are destandardized before passed for ML inference. The estimation equation is :math:`p^s(X_i;\\delta) = \\frac{1}{S} \\sum_{s=1}^{S} ML(X_c^s, X_{di})`.
Parameters
-----------
onnx: str
String path to ONNX model
X_c: array-like, default: None
1D/2D vector of continuous input variables
X_d: array-like, default: None
1D/2D vector of discrete input variables
data: array-like, default: None
Dataset containing ML input variables
C: array-like, default: None
Integer column indices for continous variables
D: array-like, default: None
Integer column indices for discrete variables
L: Dict[int, Set]
Dictionary with keys as indices of X_c and values as sets of discrete values
S: int, default: 100
Number of draws for each APS estimation
delta: float/list, default: 0.8
Radius of sampling ball. If list, then APS is recomputed for each delta in list.
seed: int, default: None
Seed for sampling
types: Tuple[np.dtype, np.dtype], default: (None, None)
Numpy dtypes for continuous and discrete data; by default types are inferred
input_type: int, default: 1
Whether the model takes continuous/discrete inputs together (1) or separately (2)
input_names: Tuple[str,str], default: ("c_inputs", "d_inputs")
Names of input nodes of ONNX model
fcn: Object, default: None
Decision function to apply to ML output
vectorized: bool, default: False
Indicator for whether decision function is already vectorized
cpu: bool, default False
Run inference on CPU; defaults to GPU if available
parallel: bool, default: False
Whether to parallelize the APS estimation
nprocesses: int, default: None
Number of processes to parallelize. Defaults to number of processors on machine.
ntasks: int, default: 1
Number of tasks to send to each worker process.
Returns
-----------
np.ndarray
Array of estimated APS for each observation in sample
Notes
------
X_c, X_d, and data should never have any overlapping columns. This is not checkable through the code, so please double check this when passing in the inputs.
"""
# Set X_c and X_d based on inputs
if X_c is None and data is None:
raise ValueError("APS estimation requires continuous data!")
# Prioritize explicitly passed variables
if X_c is not None:
X_c = np.array(X_c).astype("float")
if X_d is not None:
X_d = np.array(X_d).astype("float")
if data is not None:
data = np.array(data).astype("float")
# If X_c not given, but data is, then we assume all of data is X_c
if X_c is None and X_d is not None and data is not None:
print("`X_c` not given but both `X_d` and `data` given. We will assume that all the variables in `data` are continuous.")
X_c = data
# If X_d not given, but data is, then we assume all of data is X_d
if X_c is not None and X_d is None and data is not None:
print("`X_d` not given but both `X_c` and `data` given. We will assume that all the variables in `data` are discrete.")
X_d = data
# If both X_c and X_d are none, then use indices
if X_c is None and X_d is None:
if C is None and D is None:
print("`data` given but no indices passed. We will assume that all the variables in `data` are continuous.")
X_c = data
elif C is None:
if isinstance(D, int):
d_len = 1
else:
d_len = len(D)
X_d = data[:,D]
if d_len >= data.shape[1]:
raise ValueError(f"Passed discrete indices of length {d_len} for input data of shape {data.shape}. Continuous variables are necessary to conduct APS estimation.")
else:
print(f"Passed discrete indices of length {d_len} for input data of shape {data.shape}. Remaining columns of `data` will be assumed to be continuous variables.")
X_c = np.delete(data, D, axis = 1)
elif D is None:
if isinstance(C, int):
c_len = 1
else:
c_len = len(C)
X_c = data[:,C]
if c_len < data.shape[1]:
print(f"Passed continuous indices of length {c_len} for input data of shape {data.shape}. Remaining columns of `data` will be assumed to be discrete variables.")
X_d = np.delete(data, C, axis = 1)
else:
X_c = data[:,C]
X_d = data[:,D]
# Force data to be 2d arrays
if X_c.ndim == 1:
X_c = X_c[:,np.newaxis]
if X_d is not None:
if X_d.ndim == 1:
X_d = X_d[:,np.newaxis]
# Vectorize decision function if not
if fcn is not None and vectorized == False:
fcn = np.vectorize(fcn)
# Preprocess mixed variables
if L is not None:
L_keys = np.array(list(L.keys()))
L_vals = np.array(list(L.values()))
X_c, mixed_og_vals, mixed_og_inds = _preprocessMixedVars(X_c, L_keys, L_vals)
mixed_rows, mixed_cols = mixed_og_inds
else:
mixed_og_vals = None
mixed_og_inds = None
# If types not given, then infer from data
types = list(types)
if types[0] is None:
types[0] = X_c.dtype
if types[1] is None:
if X_d is not None:
types[1] = X_d.dtype
# Standardize cts vars
# Formula: (X_ik - u_k)/o_k; k represents a continuous variable
X_c, mu, sigma = standardize(X_c)
if seed is not None:
np.random.seed(seed)
# If parallelizing, then force inference on CPU
if parallel == True:
cpu = True
# # Need to force Windows implementation of spawning on Linux
# import multiprocess.context as ctx
# ctx._force_start_method('spawn')
import pathos
from functools import partial
from itertools import repeat
computeAPS_frozen = partial(_computeAPS, types = types, S = S, delta = delta, mu = mu, sigma = sigma, input_type = input_type,
input_names = input_names, fcn = fcn, cpu = cpu, parallel = parallel, **kwargs)
mp = pathos.helpers.mp
p = mp.Pool(nprocesses)
#p = pathos.pools._ProcessPool(nprocesses)
if nprocesses is None:
workers = "default (# processors)"
nprocesses = mp.cpu_count()
else:
workers = nprocesses
print(f"Running APS estimation with {workers} workers...")
# Split input arrays into chunked rows
nchunks = ntasks * nprocesses
X_c_split = np.array_split(X_c, nchunks)
iter_c = iter(X_c_split)
if X_d is None:
iter_d = repeat(None)
else:
iter_d = iter(np.array_split(X_d, nchunks))
if L is None:
iter_L_ind = repeat(None)
iter_L_val = repeat(None)
else:
# Split indices depending on which chunk they fall into
chunksizes = np.append([0], np.cumsum([c.shape[0] for c in X_c_split]))
chunked_inds = [(mixed_rows[np.where(np.isin(mixed_rows, range(chunksizes[i], chunksizes[i+1])))] - chunksizes[i],
mixed_cols[np.where(np.isin(mixed_rows, range(chunksizes[i], chunksizes[i+1])))]) for i in range(len(chunksizes) - 1)]
chunked_vals = [mixed_og_vals[np.where(np.isin(mixed_rows, range(chunksizes[i], chunksizes[i+1])))] for i in range(len(chunksizes) - 1)]
iter_L_ind = iter(chunked_inds)
iter_L_val = iter(chunked_vals)
iter_args = zip(repeat(onnx), iter_c, iter_d, iter_L_ind, iter_L_val)
p_out = p.starmap(computeAPS_frozen, iter_args)
p.close()
p.join()
aps_vec = np.concatenate(p_out)
else:
aps_vec = _computeAPS(onnx, X_c, X_d, mixed_og_inds, mixed_og_vals, types, S, delta, mu, sigma, input_type, input_names, fcn, cpu, parallel, **kwargs) # Compute APS for each individual i
aps_vec = np.array(aps_vec)
gc.collect()
return aps_vec
def _computeUserAPS(X_c: np.ndarray, X_d: np.ndarray, L_inds: np.ndarray, L_vals: np.ndarray, ml, S: int, delta: float, mu: np.ndarray, sigma: np.ndarray,
pandas: bool, pandas_cols: Sequence, order: Sequence, reorder: Sequence, **kwargs):
"""Compute APS using a user-defined input function.
Approximate propensity score estimation involves taking draws :math:`X_c^1, \\ldots,X_c^S` from the uniform distribution on :math:`N(X_{ci}, \\delta)`, where :math:`N(X_{ci},\\delta)` is the :math:`p_c` dimensional ball centered at :math:`X_{ci}` with radius :math:`\\delta`.
:math:`X_c^1, \\ldots,X_c^S` are destandardized before passed for ML inference. The estimation equation is :math:`p^s(X_i;\\delta) = \\frac{1}{S} \\sum_{s=1}^{S} ML(X_c^s, X_{di})`.
Parameters
-----------
X_c: array-like
1D vector of standardized continuous inputs
X_d: array-like
1D vector of discrete inputs
L_inds: tuple
Tuple of indices for mixed values in X_c
L_vals: array-like
1D vector of original mixed discrete values
ml: Object
User-defined vectorized ML function
S: int
Number of draws
delta: float
Radius of sampling ball
mu: array-like, shape(n_continuous,)
1D vector of means of continuous variables
sigma: array-like, shape(n_continuous,)
1D vector of standard deviations of continuous variables
pandas: bool
Whether to convert input to pandas DataFrame before sending into function
pandas_cols: Sequence
Column names for pandas input. Pandas defaults to integer names.
order: Sequence
Reording the columns after ordering into [cts vars, discrete vars]
reorder: Sequence
Indices to reorder the data assuming original order `order`
seed: int
Numpy random seed
**kwargs: keyword arguments to pass into user function
Returns
-----------
float
Estimated aps for the observation row
"""
# =================================== APS estimation with full matrix form ===================================
nrows = X_c.shape[0]
p_c = X_c.shape[1]
standard_draws = np.random.normal(size = (nrows, S, p_c))
u_draws = np.random.uniform(size=(nrows, S))
if isinstance(delta, Sequence):
multi_delta = True
inference_draws_list = _drawAPS2D(X_c, standard_draws, u_draws, L_inds, L_vals, S, delta, mu, sigma)
inference_draws_list = [d.reshape((nrows*S, p_c)) for d in inference_draws_list]
else:
multi_delta = False
inference_draws = _drawAPS1D(X_c, standard_draws, u_draws, L_inds, L_vals, S, delta, mu, sigma)
# Run ML inference ----------------------------------------------------------------------------------------------------------
# We will assume that ML always takes a single concatenated matrix as input
if multi_delta == True:
ml_out = []
for inference_draws in inference_draws_list:
if X_d is None:
inputs = inference_draws
else:
X_d_long = np.repeat(X_d, S, axis=0)
inputs = np.append(inference_draws, X_d_long, axis=1)
# Reorder if specified
if order is not None:
inputs = inputs[:,order]
if reorder is not None:
inputs = inputs[:,reorder]
# Create pandas input if specified
if pandas:
inputs = pd.DataFrame(inputs, columns = pandas_cols)
ml_out_tmp = np.squeeze(np.array(ml(inputs, **kwargs)))
ml_out.append(ml_out_tmp)
ml_out = np.stack(ml_out)
else:
if X_d is None:
inputs = inference_draws
else:
X_d_long = np.repeat(X_d, S, axis=0)
inputs = np.append(inference_draws, X_d_long, axis=1)
# Reorder if specified
if order is not None:
inputs = inputs[:,order]
if reorder is not None:
inputs = inputs[:,reorder]
# Create pandas input if specified
if pandas:
inputs = pd.DataFrame(inputs, columns = pandas_cols)
ml_out = np.squeeze(np.array(ml(inputs, **kwargs)))
# Return means of every S rows
if multi_delta == True:
aps = cumMean2D(ml_out, S)
else:
aps = cumMean1D(ml_out, S)
return aps
def _get_og_order(n, C, D):
order = None
if C is None and D is None:
pass
elif C is None:
order = []
c_len = n - len(D)
c_ind = 0
for i in range(n):
if i in D:
order.append(c_ind + c_len)
c_ind += 1
else:
order.append(i - c_ind)
else:
order = []
c_len = len(C)
c_ind = 0
for i in range(n):
if i in C:
order.append(i - c_ind)
else:
order.append(c_ind + c_len)
c_ind += 1
return order
[docs]def estimate_aps_user_defined(ml, X_c = None, X_d = None, data = None, C: Sequence = None, D: Sequence = None, L: Dict[int, Set] = None,
S: int = 100, delta: float = 0.8, seed: int = None, pandas: bool = False, pandas_cols: Sequence = None,
keep_order: bool = False, reorder: Sequence = None, parallel: bool = False, nprocesses: int = None, ntasks: int = 1, **kwargs):
"""Estimate APS for given dataset and user defined ML function
Approximate propensity score estimation involves taking draws :math:`X_c^1, \\ldots,X_c^S` from the uniform distribution on :math:`N(X_{ci}, \\delta)`, where :math:`N(X_{ci},\\delta)` is the :math:`p_c` dimensional ball centered at :math:`X_{ci}` with radius :math:`\\delta`.
:math:`X_c^1, \\ldots,X_c^S` are destandardized before passed for ML inference. The estimation equation is :math:`p^s(X_i;\\delta) = \\frac{1}{S} \\sum_{s=1}^{S} ML(X_c^s, X_{di})`.
Parameters
-----------
ml: Object
User defined ml function
X_c: array-like, default: None
1D/2D vector of continuous input variables
X_d: array-like, default: None
1D/2D vector of discrete input variables
data: array-like, default: None
Dataset containing ML input variables
C: array-like, default: None
Integer column indices for continous variables
D: array-like, default: None
Integer column indices for discrete variables
L: Dict[int, Set]
Dictionary with keys as indices of X_c and values as sets of discrete values
S: int, default: 100
Number of draws for each APS estimation
delta: float, default: 0.8
Radius of sampling ball
seed: int, default: None
Seed for sampling
pandas: bool, default: False
Whether to cast inputs into pandas dataframe
pandas_cols: Sequence, default: None
Columns names for dataframe input
keep_order: bool, default: False
Whether to maintain the column order if data passed as a single 2D array
reorder: Sequence, default: False
Indices to reorder the data assuming original order [X_c, X_d]
parallel: bool, default: False
Whether to parallelize the APS estimation
nprocesses: int, default: None
Number of processes to parallelize. Defaults to number of processors on machine.
ntasks: int, default: 1
Number of tasks to send to each worker process.
**kwargs: keyword arguments to pass into user function
Returns
-----------
np.ndarray
Array of estimated APS for each observation in sample
Notes
------
X_c, X_d, and data should never have any overlapping variables. This is not checkable through the code, so please double check this when passing in the inputs.
The arguments `keep_order`, `reorder`, and `pandas_cols` are applied sequentially, in that order. This means that if `keep_order` is set, then `reorder` will reorder the columns from the original column order as `data`. `pandas_cols` will then be the names of the new ordered dataset.
The default ordering of inputs is [X_c, X_d], where the continuous variables and discrete variables will be in the original order regardless of how their input is passed. If `reorder` is called without `keep_order`, then the reordering will be performed on this default ordering.
Parallelization uses the `Pool` module from pathos, which will NOT be able to deal with execution on GPU. If the user function enables inference on GPU, then it is recommended to implement parallelization within the user function as well.
The optimal settings for nprocesses and nchunks are specific to each machine, and it is highly recommended that the user pass these arguments to maximize the performance boost. `This SO thread <https://stackoverflow.com/questions/42074501/python-concurrent-futures-processpoolexecutor-performance-of-submit-vs-map>`_ recommends setting nchunks to be 14 * # of workers for optimal performance.
"""
# Set X_c and X_d based on inputs
if X_c is None and data is None:
raise ValueError("APS estimation requires continuous data!")
# Prioritize explicitly passed variables
if X_c is not None:
X_c = np.array(X_c).astype(float)
if X_d is not None:
X_d = np.array(X_d).astype(float)
if data is not None:
data = np.array(data).astype(float)
# If X_c not given, but data is, then we assume all of data is X_c
if X_c is None and X_d is not None and data is not None:
print("`X_c` not given but both `X_d` and `data` given. We will assume that all the variables in `data` are continuous.")
X_c = data
# If X_d not given, but data is, then we assume all of data is X_d
if X_c is not None and X_d is None and data is not None:
print("`X_d` not given but both `X_c` and `data` given. We will assume that all the variables in `data` are discrete.")
X_d = data
# If both X_c and X_d are none, then use indices
order = None
if X_c is None and X_d is None:
# Save original order if keep order in place
if keep_order:
order = _get_og_order(data.shape[1], C, D)
if C is None and D is None:
print("`data` given but no indices passed. We will assume that all the variables in `data` are continuous.")
X_c = data
elif C is None:
if isinstance(D, int):
d_len = 1
else:
d_len = len(D)
X_d = data[:,D]
if d_len >= data.shape[1]:
raise ValueError(f"Passed discrete indices of length {d_len} for input data of shape {data.shape}. Continuous variables are necessary to conduct APS estimation.")
else:
print(f"Passed discrete indices of length {d_len} for input data of shape {data.shape}. Remaining columns of `data` will be assumed to be continuous variables.")
X_c = np.delete(data, D, axis = 1)
elif D is None:
if isinstance(C, int):
c_len = 1
else:
c_len = len(C)
X_c = data[:,C]
if c_len < data.shape[1]:
print(f"Passed continuous indices of length {c_len} for input data of shape {data.shape}. Remaining columns of `data` will be assumed to be discrete variables.")
X_d = np.delete(data, C, axis = 1)
else:
X_c = data[:,C]
X_d = data[:,D]
# Force X_c to be 2d array
if X_c.ndim == 1:
X_c = X_c[:,np.newaxis]
if X_d is not None:
if X_d.ndim == 1:
X_d = X_d[:,np.newaxis]
# === Preprocess mixed variables ===
if L is not None:
L_keys = np.array(list(L.keys()))
L_vals = np.array(list(L.values()))
X_c, mixed_og_vals, mixed_og_inds = _preprocessMixedVars(X_c, L_keys, L_vals)
mixed_rows, mixed_cols = mixed_og_inds
else:
mixed_og_vals = None
mixed_og_inds = None
# === Standardize continuous variables ===
# Formula: (X_ik - u_k)/o_k; k represents a continuous variable
X_c, mu, sigma = standardize(X_c)
if seed is not None:
np.random.seed(seed)
# If parallelizing, then force inference on CPU
if parallel == True:
cpu = True
import pathos
from functools import partial
from itertools import repeat
computeUserAPS_frozen = partial(_computeUserAPS, ml = ml, S = S, delta = delta, mu = mu, sigma = sigma, pandas = pandas,
pandas_cols = pandas_cols, order = order, reorder = reorder, **kwargs)
mp = pathos.helpers.mp
p = mp.Pool(nprocesses)
if nprocesses is None:
workers = "default (# processors)"
nprocesses = mp.cpu_count()
else:
workers = nprocesses
print(f"Running APS estimation with {workers} workers...")
# Split input arrays into chunked rows
nchunks = ntasks * nprocesses
X_c_split = np.array_split(X_c, nchunks)
iter_c = iter(X_c_split)
if X_d is None:
iter_d = repeat(None)
else:
iter_d = iter(np.array_split(X_d, nchunks))
if L is None:
iter_L_ind = repeat(None)
iter_L_val = repeat(None)
else:
# Split indices depending on which chunk they fall into
chunksizes = np.append([0], np.cumsum([c.shape[0] for c in X_c_split]))
chunked_inds = [(mixed_rows[np.where(np.isin(mixed_rows, range(chunksizes[i], chunksizes[i+1])))] - chunksizes[i],
mixed_cols[np.where(np.isin(mixed_rows, range(chunksizes[i], chunksizes[i+1])))]) for i in range(len(chunksizes) - 1)]
chunked_vals = [mixed_og_vals[np.where(np.isin(mixed_rows, range(chunksizes[i], chunksizes[i+1])))] for i in range(len(chunksizes) - 1)]
iter_L_ind = iter(chunked_inds)
iter_L_val = iter(chunked_vals)
iter_args = zip(iter_c, iter_d, iter_L_ind, iter_L_val)
p_out = p.starmap(computeUserAPS_frozen, iter_args)
p.close()
p.join()
aps_vec = np.concatenate(p_out)
else:
aps_vec = _computeUserAPS(X_c, X_d, mixed_og_inds, mixed_og_vals, ml, S, delta, mu, sigma, pandas, pandas_cols, order, reorder, **kwargs) # Compute APS for each individual i
aps_vec = np.array(aps_vec)
return aps_vec