Source code for IVaps.helpers

"""Helper functions"""
from pathlib import Path
from typing import Tuple, Dict, Union, Sequence, Optional
import onnxruntime as rt
import warnings
import numpy as np
import pandas as pd
import onnx
from onnx import helper, numpy_helper
from onnx import TensorProto
from onnxmltools.convert.common.data_types import FloatTensorType, DoubleTensorType, Int64TensorType, Int32TensorType, StringTensorType, BooleanTensorType
from numba import jit, njit
from numba.core.errors import NumbaDeprecationWarning, NumbaPendingDeprecationWarning

warnings.simplefilter('ignore', category=NumbaDeprecationWarning)
warnings.simplefilter('ignore', category=NumbaPendingDeprecationWarning)

def run_onnx_session(inputs: Sequence[np.ndarray], sess: "rt.InferenceSession", input_names: Sequence[str],
                     label_names: Optional[Sequence[str]] = None, fcn=None, **kwargs):
    """Convenience function to execute ONNX inference with an optional post-inference function

    Parameters
    -----------
    inputs: Sequence of array-likes
        ONNX inference inputs
    sess: onnxruntime InferenceSession
    input_names: Sequence of strings
        Input names to assign to inputs. Paired positionally with ``inputs``.
    label_names: Sequence of strings, default: all outputs
        Specific outputs to return from inference. ``None`` is forwarded to
        ``sess.run`` unchanged.
    fcn: Object, default: None
        Vectorized function to pass inference outputs through
    **kwargs: additional arguments to pass into fcn

    Returns
    -----------
    np.ndarray
        Outputs of ONNX inference or post-inference function, with
        length-one dimensions squeezed out.
    """
    feed_dict = dict(zip(input_names, inputs))
    ml_out = sess.run(label_names, feed_dict)

    # sess.run wraps the outputs in a list. With no explicit label_names the
    # number of outputs is only known from the result itself; previously
    # len(None) raised a TypeError for the documented default.
    if label_names is None:
        single_output = len(ml_out) == 1
    else:
        single_output = len(label_names) == 1
    if single_output:
        ml_out = ml_out[0]

    # Account for case in which output probabilities are in dictionary of class labels:
    # keep only the value stored under key 1 of every per-row dictionary.
    if isinstance(ml_out[0], dict):
        ml_out = np.array([d[1] for d in ml_out])

    if fcn is not None:
        ml_out = fcn(ml_out, **kwargs)

    # Remove unnecessary dims
    return np.squeeze(ml_out)
# Frameworks converted through onnxmltools' ``convert_<framework>`` functions.
_MLTOOLS_FRAMEWORKS = ("sklearn", "lightgbm", "xgboost", "catboost", "coreml", "libsvm", "sparkml", "keras")


def _normalize_dummy_input(dummy_input, framework: str):
    """Adjust a dummy input to the dimension the conversion code expects."""
    if dummy_input is None:
        return None
    if framework != "pytorch":
        # Non-PyTorch converters only need one flat sample: its length is used
        # as the feature count and its dtype for tensor type inference.
        dummy_input = np.array(dummy_input)
        if dummy_input.ndim > 1:
            dummy_input = dummy_input[0].flatten()
    elif dummy_input.ndim == 1:
        # PyTorch inputs are left as passed in (they must expose ``ndim``);
        # a 1-D input gets a trailing axis.
        dummy_input = dummy_input[:, np.newaxis]
    return dummy_input


def _export_pytorch_model(model, framework, dummy_input1, dummy_input2, output_path, input_names,
                          output_names, **kwargs):
    """Export a PyTorch model to ``output_path`` with a dynamic first axis on every named tensor."""
    import torch
    from torch.onnx import export

    if output_path is None:
        raise ValueError(f"Conversion from {framework} requires output_path.")
    if dummy_input1 is None:
        raise ValueError(f"Conversion from {framework} model requires a dummy input.")

    if output_names is None:
        output_names = ["output_0"]

    # Convert to tensor if necessary
    if not isinstance(dummy_input1, torch.Tensor):
        dummy_input1 = torch.tensor(dummy_input1)
    if dummy_input2 is None:
        export_args = dummy_input1
        export_input_names = [input_names[0]]
        dynamic_inputs = [input_names[0]]
    else:
        if not isinstance(dummy_input2, torch.Tensor):
            dummy_input2 = torch.tensor(dummy_input2)
        export_args = (dummy_input1, dummy_input2)
        export_input_names = list(input_names)
        dynamic_inputs = [input_names[0], input_names[1]]

    # Axis 0 of every input and output is marked dynamic under the name 'N'.
    d_axes = {name: {0: 'N'} for name in dynamic_inputs}
    d_axes.update({name: {0: 'N'} for name in output_names})

    export(model, export_args, output_path, input_names=export_input_names,
           output_names=output_names, dynamic_axes=d_axes, **kwargs)
    return True


def _run_tf2onnx(model, framework, output_path, tf_input_names, tf_output_names, target_opset):
    """Convert a saved Tensorflow model by invoking ``python -m tf2onnx.convert`` in a subprocess."""
    # convert_tensorflow and tensorflow are not referenced below; the imports are
    # kept so a missing install still raises ImportError here, as before.
    from onnxmltools import convert_tensorflow  # noqa: F401
    from IVaps.utils import get_extension
    import subprocess
    import tensorflow as tf  # noqa: F401

    if output_path is None:
        raise ValueError(f"Conversion from {framework} requires output_path.")
    if not isinstance(model, str):
        raise ValueError(f"Conversion from {framework} requires `model` to be a str path.")
    if target_opset is None:
        from onnxconverter_common.onnx_ex import get_maximum_opset_supported
        target_opset = get_maximum_opset_supported()
    # Command line arguments must be strings.
    target_opset = str(target_opset)

    # Run tf2onnx conversion
    # TODO: PIPE SUBPROCESS OUTPUT TO STDOUT
    source_flags = {"pb": "--input", "meta": "--checkpoint"}
    extension = get_extension(model)
    call = ["python", "-m", "tf2onnx.convert"]
    if extension in source_flags:
        # Frozen graphs (.pb) and checkpoints (.meta) need explicit graph input/output names.
        if not tf_input_names or not tf_output_names:
            raise ValueError(
                "Please provide --model_inputs_names and --model_outputs_names to convert Tensorflow graphdef models.")
        call += [source_flags[extension], model, "--output", output_path,
                 "--inputs", ",".join(tf_input_names), "--outputs", ",".join(tf_output_names)]
    else:
        # Any other path is handed to tf2onnx as a saved-model directory.
        call += ["--saved-model", model, "--output", output_path]
    call += ["--opset", target_opset, "--fold_const", "--target", "rs6"]
    subprocess.check_call(call)
    return True


def convert_to_onnx(model, framework: str, dummy_input1=None, dummy_input2=None, output_path: str = None,
                    input_names: Tuple[str, str] = ("c_inputs", "d_inputs"), output_names: Sequence = None,
                    tf_input_names: Sequence = None, tf_output_names: Sequence = None,
                    target_opset: int = None, **kwargs):
    """Convenience function to quickly convert and save ONNX model with expected input/output settings

    Parameters
    -----------
    model: object
        fitted model object (or path to saved model in Tensorflow case)
    framework: str
        Reference string for one of the implemented frameworks: "sklearn",
        "lightgbm", "xgboost", "catboost", "coreml", "libsvm", "sparkml",
        "keras", "cntk", "pytorch" or "tensorflow".
    dummy_input1: list-like, default: None
        Dummy input for first model input used for type inference and passed
        into downstream conversion functions
    dummy_input2: list-like, default: None
        Dummy input for second model input (if applicable) used for type
        inference and passed into downstream conversion functions
    output_path: str, default: None
        path to save ONNX model. Required for "cntk", "pytorch" and "tensorflow".
    input_names: Tuple[str, str], default: ("c_inputs", "d_inputs")
        input names to assign ONNX model
    output_names: list-like, default: None
        output names for later ONNX inference. Currently only used by the
        PyTorch path, where None defaults to ["output_0"]; renaming the outputs
        of onnxmltools conversions is still a TODO.
    tf_input_names: list-like, default: None
        Input names for Tensorflow graph. Only required when converting from
        Tensorflow using a frozen graph or checkpoints.
    tf_output_names: list-like, default: None
        Output names for Tensorflow graph. Only required when converting from
        Tensorflow using a frozen graph or checkpoints.
    target_opset: int, default: None
        Opset passed to the onnxmltools converters and to tf2onnx. Not
        forwarded to the PyTorch export.
    **kwargs: keyword arguments to be passed into the onnxmltools conversion
        function or ``torch.onnx.export``

    Returns
    -----------
    Object
        Converted ONNX model for the onnxmltools frameworks; True after a
        successful cntk, pytorch or tensorflow conversion; False if the
        framework is not implemented.

    Raises
    -----------
    ValueError
        If a required dummy input, output path or Tensorflow graph name is missing.
    """
    # Adjust dummy input(s) if incorrect dimension
    dummy_input1 = _normalize_dummy_input(dummy_input1, framework)
    dummy_input2 = _normalize_dummy_input(dummy_input2, framework)

    if framework in _MLTOOLS_FRAMEWORKS:
        if dummy_input1 is None:
            raise ValueError(f"Conversion from {framework} model requires a dummy input.")
        # One (name, tensor type) entry per model input; batch dimension left open.
        tensortype_1 = _guess_numpy_type(dummy_input1.dtype)
        initial_type = [(input_names[0], tensortype_1([None, len(dummy_input1)]))]
        if dummy_input2 is not None:
            tensortype_2 = _guess_numpy_type(dummy_input2.dtype)
            initial_type.append((input_names[1], tensortype_2([None, len(dummy_input2)])))

        # Resolves e.g. onnxmltools.convert_sklearn. The sparkml branch previously
        # imported ``sparkml`` but called the undefined name ``convert_sparkml``.
        import onnxmltools
        converter = getattr(onnxmltools, f"convert_{framework}")
        onx = converter(model, initial_types=initial_type, target_opset=target_opset, **kwargs)

        # TODO: Rename outputs according to output_names (a direct rename of
        # onx.graph.output[i].name was tried and does not work).
        if output_path is not None:
            with open(output_path, "wb") as f:
                f.write(onx.SerializeToString())
        return onx

    # Models that don't use initial_types
    if framework == "cntk":
        import cntk
        if output_path is None:
            raise ValueError(f"Conversion from {framework} requires output_path.")
        model.save(output_path, format=cntk.ModelFormat.ONNX)
        # Previously fell through to the "not yet implemented" branch and returned False.
        return True

    if framework == "pytorch":
        return _export_pytorch_model(model, framework, dummy_input1, dummy_input2, output_path,
                                     input_names, output_names, **kwargs)

    if framework == "tensorflow":
        return _run_tf2onnx(model, framework, output_path, tf_input_names, tf_output_names, target_opset)

    print(f"{framework} conversion not yet implemented for this function. "
          "Please see https://github.com/onnx/onnxmltools for more conversion functions.")
    return False
# Wraps check_model function in onnx_converter
def check_conversion(model_path: str, onnx_model_path: str, framework: str, test_input_path: str = None,
                     tf_input_names: Sequence = None, tf_output_names: Sequence = None, log_path: str = None):
    """Check successful conversion of ONNX model

    Parameters
    -----------
    model_path: str
        Path to original saved model
    onnx_model_path: str
        Path to converted ONNX model
    framework: str
        Reference string for one of the implemented frameworks
    test_input_path: str, default: None
        Path to folder with saved .pb test inputs
    tf_input_names: Sequence, default: None
        Names of inputs for Tensorflow model, if applicable
    tf_output_names: Sequence, default: None
        Names of outputs for Tensorflow model, if applicable
    log_path: str, default: None
        Path to save test results (written as JSON)

    Returns
    -----------
    bool
        True once the check has run to completion. The verification status
        itself is reported in the printed summary and in the log file.

    Raises
    -----------
    Exception
        Whatever ``generate_inputs`` raised, re-raised after the summary
        (marked "SKIPPED") has been printed and logged.
    """
    # Local import: ``json`` is not among the module-level imports.
    import json
    from IVaps.utils import check_model, generate_inputs

    output_template = {
        "output_onnx_path": onnx_model_path,  # The output path where the converted .onnx file is stored.
        "correctness_verified": "",  # SUCCEED, NOT SUPPORTED, SKIPPED
        "input_folder": "",
        "error_message": ""
    }

    def _write_summary():
        # Print the summary and, when requested, dump it to log_path as JSON.
        print("\n-------------\nMODEL CONVERSION SUMMARY\n")
        print(output_template)
        if log_path is not None:
            print(f"Writing log output to {log_path}...")
            with open(log_path, "w") as f:
                json.dump(output_template, f, indent=4)

    # Generate random inputs for the model if input files are not provided
    try:
        # Will search for .pb files in `test_input_path` if not None, otherwise checks `model_path`
        # and copies the files over to `test_data_set_0` in the onnx model directory if not already created
        inputs_path = generate_inputs(model_path, test_input_path, onnx_model_path)
    except Exception as e:
        output_template["error_message"] = str(e)
        output_template["correctness_verified"] = "SKIPPED"
        _write_summary()
        # Bare raise keeps the original traceback.
        raise
    output_template["input_folder"] = inputs_path

    print("\n-------------\nMODEL CORRECTNESS VERIFICATION\n")
    # Test correctness: check_model can be called with arbitrary inputs without output as well
    verify_status = check_model(model_path, onnx_model_path, inputs_path, framework, tf_input_names, tf_output_names)
    output_template["correctness_verified"] = verify_status

    # The final log write previously opened the undefined name ``output_json_path``.
    _write_summary()
    # NOTE(review): True is returned regardless of verify_status -- confirm whether
    # callers expect the return value to reflect the verification result.
    return True
def convert_data_to_pb(pickle_path: str, output_folder: str = "test_data_set_0", is_input=True):
    """
    Convert pickle test data file to ONNX .pb files.

    Parameters
    -----------
    pickle_path: str
        The path to your pickle file. The pickle file should contain a
        dictionary with the following format:
        {
            input_name_1: test_data_1,
            input_name_2: test_data_2,
            ...
        }
        If the path already ends in ".pb" nothing is converted.
    output_folder: str, default: "test_data_set_0"
        The folder to store .pb files. The folder should be empty and its name
        starts with test_data_*. It is created if it does not exist.
    is_input: bool, default: True
        Files are named "input_<idx>.pb" when true, "output_<idx>.pb" otherwise.

    Raises
    -----------
    ValueError
        If the file cannot be unpickled or does not contain a dictionary.
    """
    import os
    import pickle

    # Look at the real file extension: splitting on the first "." misreads paths
    # such as "./data/x.pb" and raises IndexError when there is no dot at all.
    if os.path.splitext(pickle_path)[1] == ".pb":
        print("Test Data already in .pb format. ")
        return

    # SECURITY NOTE(review): pickle.load executes arbitrary code embedded in the
    # file -- only use this on test data from a trusted source.
    try:
        with open(pickle_path, "rb") as f:
            test_data_dict = pickle.load(f)
    except Exception as err:
        raise ValueError("Cannot load test data with pickle. ") from err

    # Type check for the pickle file. Expect a dictionary with input names as keys
    # and data as values.
    if not isinstance(test_data_dict, dict):
        raise ValueError("Data type error. Expect a dictionary with input names as keys and data as values.")

    # Create a test_data_set folder if not exists
    os.makedirs(output_folder, exist_ok=True)

    file_prefix = "input_" if is_input else "output_"
    # One serialized TensorProto per dictionary entry, numbered in dictionary order.
    for idx, (name, data) in enumerate(test_data_dict.items()):
        tensor = numpy_helper.from_array(data)
        tensor.name = name
        pb_file_location = os.path.join(output_folder, f"{file_prefix}{idx}.pb")
        with open(pb_file_location, 'wb') as f:
            f.write(tensor.SerializeToString())
        print("Successfully stored input {} in {}".format(name, pb_file_location))
@jit(nopython=True)
def standardize(X):
    """
    Standardize 2D array of variables

    Each column is centred on its NaN-ignoring mean and scaled by its
    NaN-ignoring standard deviation.

    Returns a tuple ``(standardized X, column means, column standard deviations)``.
    """
    col_means = []
    col_stds = []
    n_cols = X.shape[1]
    for col in range(n_cols):
        column = X[:, col]
        col_means.append(np.nanmean(column))
        col_stds.append(np.nanstd(column))
    mu = np.array(col_means)
    sigma = np.array(col_stds)
    # mu and sigma broadcast across the rows of X.
    standardized = (X - mu) / sigma
    return (standardized, mu, sigma)
@jit(nopython=True)
def cumMean1D(X, S):
    """
    Return mean of every S rows

    X is walked in consecutive, non-overlapping windows of S rows; leftover
    rows that do not fill a complete window are ignored.
    """
    n_rows = X.shape[0]
    window_means = []
    start = 0
    stop = S
    while stop <= n_rows:
        window_means.append(np.mean(X[start:stop]))
        start = stop
        stop += S
    return np.array(window_means)
@jit(nopython=True)
def cumMean2D(X, S):
    """
    Return mean of every S rows for every delta

    Each element of X is reduced to the means of its consecutive,
    non-overlapping windows of length S, and those means become one column
    of the result (columns follow the iteration order of X).
    """
    n_windows = int(X[0].shape[0] / S)
    # Start with zero columns and append one column per element of X.
    stacked = np.empty((n_windows, 0))
    for series in X:
        window_means = []
        start = 0
        stop = S
        while stop <= series.shape[0]:
            window_means.append(np.mean(series[start:stop]))
            start = stop
            stop += S
        stacked = np.column_stack((stacked, np.array(window_means)))
    return stacked
def _olive_convert(model_name: str, framework: str, test_data_path: str = None, convert_from_pickle: bool = False,
                   input_pickle: str = None, output_pickle: str = None, output_folder: str = None,
                   model_path: str = "./", convert_directory: str = "./", convert_name: str = None,
                   update_sdk: bool = True, **kwargs):
    """Run the OLive conversion pipeline on a saved model (work in progress).

    Downloads the OLive Python SDK helper files into ``./python_sdk``, pulls the
    onnx-converter docker image and calls ``onnxpipeline.Pipeline.convert_model``.

    Parameters
    -----------
    model_name: str
        Passed to ``convert_model`` as ``model``.
    framework: str
        Passed to ``convert_model`` as ``model_type``.
    model_path: str, default: "./"
        First argument of ``onnxpipeline.Pipeline``.
    convert_directory: str, default: "./"
    convert_name: str, default: None
        Forwarded to ``onnxpipeline.Pipeline``.
    update_sdk: bool, default: True
        Re-download the SDK files even if they already exist locally. Missing
        files are always downloaded.
    test_data_path, convert_from_pickle, input_pickle, output_pickle, output_folder, **kwargs:
        Accepted but not used yet.
    """
    import os
    import subprocess
    # Local import: ``sys`` is not among the module-level imports.
    import sys

    import wget

    url = "https://raw.githubusercontent.com/microsoft/OLive/master/utils/"
    sdk_files = ["onnxpipeline.py", "convert_test_data.py", "config.py"]
    sdk_dir = "./python_sdk"
    os.makedirs(sdk_dir, exist_ok=True)

    # Download when an update is requested OR the file is missing. The old outer
    # ``update_sdk`` guard skipped missing files when update_sdk was False, so the
    # later ``import onnxpipeline`` could not succeed.
    for filename in sdk_files:
        target_file = os.path.join(sdk_dir, filename)
        if update_sdk or not os.path.exists(target_file):
            print("Downloading OLive Python SDK files...")
            # NOTE(review): confirm wget.download overwrites an existing target
            # rather than saving under a new, de-duplicated file name.
            wget.download(url + filename, target_file)
            print("Downloaded", filename)

    # Pull latest onnx-converter image from mcr
    print("Pulling latest onnx-converter image...")
    subprocess.run(["docker", "pull", "mcr.microsoft.com/onnxruntime/onnx-converter"])

    # TODO: convert test data when convert_from_pickle is set -- output path
    # will be same directory as converted model.

    # Initiate conversion pipeline in convert directory
    if sdk_dir not in sys.path:
        sys.path.append(sdk_dir)
    import onnxpipeline
    pipeline = onnxpipeline.Pipeline(model_path, convert_directory=convert_directory, convert_name=convert_name)

    # Different frameworks require different inputs
    pipeline.convert_model(model=model_name, model_type=framework)


def _guess_numpy_type(data_type):
    """Guess the ONNX tensortype from the given numpy dtype

    Returns the tensor type class (not an instance).

    Raises
    -----------
    NotImplementedError
        If the dtype has no supported ONNX tensor type.
    """
    if data_type == np.float32:
        return FloatTensorType
    if data_type == np.float64:
        return DoubleTensorType
    # np.str / np.bool were aliases of the builtins and were removed in NumPy 1.24;
    # np.str_ / np.bool_ compare equal to the same dtypes. Fixed-width unicode
    # dtypes (kind "U") never compare equal to np.str_, so the kind is checked too.
    if data_type == np.str_ or getattr(data_type, "kind", None) == "U":
        return StringTensorType
    if data_type in (np.int64, np.uint64):
        return Int64TensorType
    if data_type in (np.int32, np.uint32):
        return Int32TensorType
    if data_type == np.bool_:
        return BooleanTensorType
    raise NotImplementedError(
        "Unsupported data_type '{}'. You may raise an issue "
        "at https://github.com/onnx/sklearn-onnx/issues."
        "".format(data_type))