training code

main
Israel Figueroa 2024-12-06 19:08:51 -03:00
parent ccf814e7de
commit a56a9b1d4e
4 changed files with 1036 additions and 0 deletions

BIN
MODY_data.xlsx 100644

Binary file not shown.

322
load_dataset.py 100644

@@ -0,0 +1,322 @@
import numpy as np
import os
import xlsxwriter
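# Convert decimal-comma strings (e.g. '5,2') to float; return NaN when conversion fails.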
def safe_float(x):
try:
return float(x.replace(',', '.'))
except ValueError:
return np.nan
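# Load the cached MODY_data.xlsx if it exists; otherwise rebuild the labelled datasets
# from the raw HC.xlsx clinical records and cache the result.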
def load_data(Reload=False):
if os.path.isfile('MODY_data.xlsx'):
import pandas as pd
with pd.ExcelFile("MODY_data.xlsx") as xls:
dsm1_complete = pd.read_excel(xls, sheet_name='Dataset MODY1')
dsm2_complete = pd.read_excel(xls, sheet_name='Dataset MODY2')
dsm3_complete = pd.read_excel(xls, sheet_name='Dataset MODY3')
dsm5_complete = pd.read_excel(xls, sheet_name='Dataset MODY5')
else:
print("========================================================================================")
if not os.path.isfile('HC.xlsx'):
raise FileNotFoundError('NoDatasetToLoad')
import pandas as pd
with pd.ExcelFile("HC.xlsx") as xls:
raw_data = pd.read_excel(xls, header=0)
# pd.read_excel('HC.xlsx', header=0)
# Drop the columns that are not of interest
drop_columns=['HC', 'probando', 'procedencia','apellido','fecha ingreso','edad','pago','factura','monto','Pendiente','método','Referencias','Analisis','aclar_pagos','tratamiento','notas','nro de familia', 'resultado']
raw_data.drop(columns=drop_columns, inplace=True)
for index, var in raw_data.iterrows():
if not pd.isna(var['IMC']) and isinstance(var['IMC'], str):
raw_data.loc[index, 'IMC'] = safe_float(var['IMC'])
if not pd.isna(var['A1c']) and isinstance(var['A1c'], str):
raw_data.loc[index, 'A1c'] = safe_float(var['A1c'])
if not pd.isna(var['edad diag']) and isinstance(var['edad diag'], str):
raw_data.loc[index, 'edad diag'] = round(safe_float(var['edad diag']),0)
if not pd.isna(var['glu ayu']) and isinstance(var['glu ayu'], str):
raw_data.loc[index, 'glu ayu'] = round(safe_float(var['glu ayu']),0)
if not pd.isna(var['glu 120']) and isinstance(var['glu 120'], str):
raw_data.loc[index, 'glu 120'] = round(safe_float(var['glu 120']),0)
raw_data['IMC'] = raw_data['IMC'].astype(np.float64)
raw_data['A1c'] = raw_data['A1c'].astype(np.float64)
raw_data['edad diag'] = raw_data['edad diag'].astype(np.float64)
raw_data['glu ayu'] = raw_data['glu ayu'].astype(np.float64)
raw_data['glu 120'] = raw_data['glu 120'].astype(np.float64)
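# Summary of the recorded diagnoses per suspicion group (MODY2, MODY3, other)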
diagnosticos = []
for index, var in raw_data.iterrows():
if var['sospecha MODY'] == '2':
diagnosticos.append(var['diagnostico'])
print("Total elementos en el dataset con sospecha MODY2:\t{}".format(len(diagnosticos)))
print("Diagnosticos del grupo:")
diagnosticos = list(set(diagnosticos))
for diagnostico in diagnosticos:
print("- '{}'".format(diagnostico))
print("========================================================================================")
diagnosticos = []
for index, var in raw_data.iterrows():
if var['sospecha MODY'] == '3':
diagnosticos.append(var['diagnostico'])
print("Total elementos en el dataset con sospecha MODY3:\t{}".format(len(diagnosticos)))
print("Diagnosticos del grupo:")
diagnosticos = list(set(diagnosticos))
for diagnostico in diagnosticos:
print("- '{}'".format(diagnostico))
print("========================================================================================")
diagnosticos = []
for index, var in raw_data.iterrows():
if var['sospecha MODY'] not in ['2', '3']:
diagnosticos.append(var['diagnostico'])
print("Total elementos en el dataset con sospechas diferentes a 2 o 3:\t{}".format(len(diagnosticos)))
print("Diagnosticos del grupo:")
diagnosticos = list(set(diagnosticos))
for diagnostico in diagnosticos:
print("- '{}'".format(diagnostico))
## Class generation based on whether the suspicion was confirmed
raw_data['MODY1_pos'] = False
raw_data['MODY1_neg'] = False
raw_data['MODY2_pos'] = False
raw_data['MODY2_neg'] = False
raw_data['MODY3_pos'] = False
raw_data['MODY3_neg'] = False
raw_data['MODY5_pos'] = False
raw_data['MODY5_neg'] = False
raw_data['SiEntiqueta'] = False
raw_data['Normal'] = False
raw_data.loc[ (raw_data['diagnostico'].str.contains('Diagnóstico MODY1', case=False, na=False)), 'MODY1_pos'] = True
raw_data.loc[ (raw_data['diagnostico'].str.contains('Diagnóstico MODY2', case=False, na=False)), 'MODY2_pos'] = True
raw_data.loc[ (raw_data['diagnostico'].str.contains('Diagnóstico MODY3', case=False, na=False)), 'MODY3_pos'] = True
raw_data.loc[ (raw_data['diagnostico'].str.contains('Diagnóstico MODY5', case=False, na=False)), 'MODY5_pos'] = True
raw_data.loc[ (raw_data['sospecha MODY'] == '1') & (raw_data['diagnostico'].str.contains('No se confirma', case=False, na=False)), 'MODY1_neg'] = True
raw_data.loc[ (raw_data['sospecha MODY'] == '2') & (raw_data['diagnostico'].str.contains('No se confirma', case=False, na=False)), 'MODY2_neg'] = True
raw_data.loc[ (raw_data['sospecha MODY'] == '3') & (raw_data['diagnostico'].str.contains('No se confirma', case=False, na=False)), 'MODY3_neg'] = True
raw_data.loc[ (raw_data['sospecha MODY'] == '5') & (raw_data['diagnostico'].str.contains('No se confirma', case=False, na=False)), 'MODY5_neg'] = True
raw_data.loc[ (raw_data['diagnostico'].str.contains('Normal', case=False, na=False)), 'Normal' ] = True
raw_data.loc[ (raw_data['diagnostico'].str.contains('No se hace', case=False, na=False)), 'SiEntiqueta'] = True
raw_data.loc[ (raw_data['diagnostico'].str.contains('Sin diagnóstico', case=False, na=False)), 'SiEntiqueta'] = True
raw_data.loc[ (raw_data['diagnostico'].str.contains('Otros', case=False, na=False)), 'SiEntiqueta'] = True
raw_data.loc[ (raw_data['diagnostico'].str.contains('No es MODY', case=False, na=False)), 'SiEntiqueta'] = True
raw_data.loc[ (raw_data['diagnostico'].str.contains('Falta definir', case=False, na=False)), 'SiEntiqueta'] = True
raw_data.loc[ (~raw_data['sospecha MODY'].isin(['1', '2', '3', '5'])) & (raw_data['diagnostico'].str.contains('No se confirma', case=False, na=False)), 'SiEntiqueta'] = True
raw_data.loc[ pd.isna(raw_data['diagnostico']), 'SiEntiqueta'] = True
print("================== Datos sin confirmar/descartar ningún MODY ===========================")
tipos = ['MODY1_pos', 'MODY1_neg', 'MODY2_pos', 'MODY2_neg', 'MODY3_pos', 'MODY3_neg','MODY5_pos', 'MODY5_neg','Normal','SiEntiqueta']
sinconfirmar = 0
## Records that do not meet any criterion
for index, var in raw_data.iterrows(): # print the records that do not belong to any category:
if not any(var[col] for col in tipos):
print("sujeto: {} \t| sospecha: {} \t| diagnostico: {:18} \t | historial: {} ".format(var['protocolo'],var['sospecha MODY'], var['diagnostico'], var['historial']))
sinconfirmar += 1
print("====================== Diagnosticos confirmados/descartados ==========================")
contador = {}
for tipo in tipos:
contador[tipo] = 0
for index, var in raw_data.iterrows():
for tipo in tipos:
if var[tipo]:
contador[tipo] += 1
for tipo in tipos:
print("{:20} \t {} ({}%)".format(tipo, contador[tipo], round((contador[tipo]/len(raw_data))*100, 2)))
print("=========================== ==================== ==================================")
label_vars = ['protocolo', 'nombre', 'edad diag', 'IMC', 'antecedentes fam', 'glu ayu', 'glu 120', 'A1c','MODY1_pos', 'MODY1_neg', 'MODY2_pos', 'MODY2_neg', 'MODY3_pos', 'MODY3_neg','MODY5_pos', 'MODY5_neg','Normal']
pre_labeled_data = raw_data[raw_data['SiEntiqueta'] == False][label_vars]
pre_labeled_data.head()
"""## 2.2. Antecedentes familiares
Se genera el campo a partir del comentario del grupo familiar
"""
pre_labeled_data['diabetes_familia'] = np.nan
## -1 == no family history of diabetes
pre_labeled_data.loc[pre_labeled_data['antecedentes fam'].str.lower().str.startswith('no', na=False), 'diabetes_familia'] = -1.0
pre_labeled_data.loc[ pre_labeled_data['antecedentes fam'].str.contains('no dm', case=False, na=False), 'diabetes_familia'] = -1.0
## 1 == family history of diabetes present
pre_labeled_data.loc[ pre_labeled_data['antecedentes fam'].str.lower().str.startswith('si', na=False), 'diabetes_familia'] = 1.0
pre_labeled_data.loc[ pre_labeled_data['antecedentes fam'].str.lower().str.startswith('her', na=False), 'diabetes_familia'] = 1.0 # hermana/hermano (sister or brother)
pre_labeled_data.loc[ pre_labeled_data['antecedentes fam'].str.lower().str.startswith('pad', na=False), 'diabetes_familia'] = 1.0
pre_labeled_data.loc[ pre_labeled_data['antecedentes fam'].str.lower().str.startswith('mad', na=False), 'diabetes_familia'] = 1.0
pre_labeled_data.loc[ pre_labeled_data['antecedentes fam'].str.lower().str.startswith('amb', na=False), 'diabetes_familia'] = 1.0
pre_labeled_data.loc[ pre_labeled_data['antecedentes fam'].str.lower().str.startswith('hij', na=False), 'diabetes_familia'] = 1.0 # hija/hijo (daughter or son)
pre_labeled_data.loc[ pre_labeled_data['antecedentes fam'].str.lower().str.startswith('multi', na=False), 'diabetes_familia'] = 1.0
pre_labeled_data.loc[ pre_labeled_data['antecedentes fam'].str.lower().str.startswith('ti', na=False), 'diabetes_familia'] = 1.0 # tia/tio (aunt or uncle)
pre_labeled_data.loc[ pre_labeled_data['antecedentes fam'].str.lower().str.startswith('abu', na=False), 'diabetes_familia'] = 1.0 # abuela/abuelo (grandmother or grandfather)
pre_labeled_data.loc[ pre_labeled_data['antecedentes fam'].str.lower().str.startswith('diab', na=False), 'diabetes_familia'] = 1.0
## 0 == unknown: no information (e.g., adopted)
# clean_data.loc[ clean_data['diabetes_familia'] == 0, 'antecedentes fam'].unique() # shows the values that did not match any of the rules above
pre_labeled_data.loc[ pre_labeled_data['antecedentes fam'].str.lower().str.startswith('mare', na=False), 'diabetes_familia'] = 1.0 # anomaly: 'madre' misspelled
print("==================================== Clasificados =============================================")
for value, count in pre_labeled_data[~pre_labeled_data['diabetes_familia'].isna()]['diabetes_familia'].value_counts(dropna=False).items():
print(f"Value: {value}, Count: {count}")
print("==================================== No se pudo Clasificar =============================================")
for value, count in pre_labeled_data[pre_labeled_data['diabetes_familia'].isna()]['antecedentes fam'].value_counts(dropna=False).items():
print(f"Value: {value}, Count: {count}")
"""## 2.3. Sexo
Se infiere el sexo a partir de los nombres
"""
pre_labeled_data['sexo'] = np.nan
## 1 == Female
nombres_f = ['andrea', 'agustina', 'antonella', 'angelica', 'alicia', 'alejandra', 'ariana', 'ayelen', 'ayleen', 'belen', 'bianca',
'camila', 'carolina', 'catalina', 'claudia', 'delfina', 'eliana', 'estefania', 'eva', 'karina', 'florencia', 'gabriela',
'georgina', 'geraldine', 'guillermina', 'jazmin', 'jessica', 'julieta', 'karen', 'laura', 'lidia', 'lucia', 'magali', 'mina',
'mabel', 'malena', 'malena', 'mariana', 'marina', 'martina', 'micaela', 'micalela', 'milagros', 'milena',
'miriam', 'morena', 'natalia', 'noemi', 'nayla', 'rocio', 'rosa', 'sandra', 'sara', 'sasha', 'silvia', 'silvana',
'sofia', 'solange', 'soledad', 'valentina', 'victoria', 'vanina', 'vanesa', 'virginia', 'yanina', 'zamira',
'abril', 'adriana', 'ailen', 'aixa', 'ambar', 'ana', 'ana esmerlada', 'ana iris', 'anahi', 'analia', 'aylen', 'barbara',
'brenda', 'brisa', 'candela', 'carmela (carmen)', 'chiara', 'elizabeth', 'ema', 'emilia', 'emma', 'eugenia', 'fiorella',
'flavia', 'franca', 'francesca', 'graciela', 'helena', 'isabela', 'isabella', 'jacinta', 'jesica', 'jorgelina', 'julia', 'lorena',
'lucila', 'lucía', 'magdalena', 'maricruz', 'mariel', 'mariela', 'marilina', 'marixa', 'martha', 'maría emilia', 'maría verónica',
'melany', 'mercedes', 'monica', 'nancy rosa alba', 'nerina', 'oriana', 'paola', 'patricia', 'paula', 'pilar', 'priscila', 'renata',
'romina', 'roxana', 'ruth', 'shirley', 'tamara', 'valeria' ]
nombres_f.append('zahirah') # names I am unsure about are appended individually
nombres_f.append('antu')
nombres_f.append('tali')
nombres_f.append('ma laura')
nombres_f.append('qian') # female name of Chinese origin
nombres_f.append('maria')
for nombre_f in nombres_f:
pre_labeled_data.loc[ pre_labeled_data['sexo'].isna() & (pre_labeled_data['nombre'].str.lower().str.startswith(nombre_f, na=False)), 'sexo'] = 1.0
## -1 == Male
nombres_h = ['agustin', 'alejandro', 'alvaro', 'augusto', 'benjamin', 'bruno', 'camilo', 'cristian', 'damian', 'dario', 'daniel', 'dante',
'david', 'diego', 'emiliano', 'elian', 'enzo', 'ezequiel', 'facundo', 'federico', 'felipe', 'fernando', 'felix', 'franco', 'german',
'gonzalo', 'gustavo', 'guillermo', 'ignacio', 'ian','joaquin', 'juan', 'julian', 'leandro', 'lorenzo', 'lucas', 'luka', 'marcelo',
'marcos', 'martin', 'martin', 'maximiliano', 'mateo', 'matias', 'pablo', 'nehemias', 'nicolas', 'ramiro', 'rogelio', 'rodrigo',
'santiago', 'santino', 'sebastian', 'thiago', 'tomas',
'alan', 'alfredo', 'antonio', 'axel', 'benicio', 'carlos', 'carlos gonzalo', 'claudio', 'dylan', 'eduardo', 'emanuel', 'ernesto',
'fabian', 'farid', 'fidel', 'francisco', 'gabriel facundo', 'gael', 'gerardo', 'gerónimo', 'hernan', 'ivan', 'javier', 'jorge',
'julio', 'mauricio', 'miguel angel', 'oscar', 'pedro', 'raul', 'rene', 'ricardo', 'roberto', 'sergio', 'teo', 'tiago', 'tobias', 'walter']
nombres_h.append('agustín')
for nombre_h in nombres_h:
pre_labeled_data.loc[ pre_labeled_data['sexo'].isna() & (pre_labeled_data['nombre'].str.lower().str.startswith(nombre_h, na=False)), 'sexo'] = -1.0
print("==================================== Clasificados =============================================")
for value, count in pre_labeled_data[~pre_labeled_data['sexo'].isna()]['sexo'].value_counts(dropna=False).items():
print(f"Value: {value}, Count: {count}")
listnames = []
print("==================================== No se pudo Clasificar =============================================")
for value, count in pre_labeled_data[pre_labeled_data['sexo'].isna()]['nombre'].value_counts(dropna=False).items():
print(f"Value: {value}, Count: {count}")
listnames.append(value)
print(sorted([x for x in listnames if isinstance(x, str)]))
"""## 2.1. Registros incompletos
Se desplegan información sobre valores faltantes en las variables de interés, sujetos sin datos y se genera una versión que solo incluye los registros que contienen toda la información para poder ser usados en el entrenamiento.
"""
import pandas as pd
variables = ['sexo', 'diabetes_familia','edad diag', 'IMC', 'glu ayu', 'glu 120', 'A1c']
print("========================================================================================")
print("Total registros en el dataset etiquetado:\t{}".format(pre_labeled_data.shape[0]))
print("Variables:\t{}".format(str(variables)))
print("==================== Desglose por N de variables faltantes ==============================")
for num in range(len(variables)+1):
nrows = len(pre_labeled_data[pre_labeled_data[variables].isnull().sum(axis=1) == num])
print("Le faltan {}/{} variables:\t{}\t({}%)".format(num, len(variables), nrows, round(nrows*100/pre_labeled_data.shape[0], 2)))
print("============================ Desglose por variables =====000=============================")
for var in variables:
nrows = pre_labeled_data[var].isna().astype(int).sum()
print("Variable {} ausente en \t\t {} ({}%) registros ".format(var, nrows, round(nrows*100/pre_labeled_data.shape[0], 2)))
pre_labeled_data['MODY1_label'] = np.nan
pre_labeled_data.loc[pre_labeled_data['MODY1_pos'], 'MODY1_label'] = 1
pre_labeled_data.loc[pre_labeled_data['MODY1_neg'], 'MODY1_label'] = 0#-1
pre_labeled_data['MODY2_label'] = np.nan
pre_labeled_data.loc[pre_labeled_data['MODY2_pos'], 'MODY2_label'] = 1
pre_labeled_data.loc[pre_labeled_data['MODY2_neg'], 'MODY2_label'] = 0#-1
pre_labeled_data['MODY3_label'] = np.nan
pre_labeled_data.loc[pre_labeled_data['MODY3_pos'], 'MODY3_label'] = 1
pre_labeled_data.loc[pre_labeled_data['MODY3_neg'], 'MODY3_label'] = 0#-1
pre_labeled_data['MODY5_label'] = np.nan
pre_labeled_data.loc[pre_labeled_data['MODY5_pos'], 'MODY5_label'] = 1
pre_labeled_data.loc[pre_labeled_data['MODY5_neg'], 'MODY5_label'] = 0#-1
"""# 3. Datos iniciales"""
dsm1_complete = pre_labeled_data[~pre_labeled_data['MODY1_label'].isna()][variables+['MODY1_label']]
dsm2_complete = pre_labeled_data[~pre_labeled_data['MODY2_label'].isna()][variables+['MODY2_label']]
dsm3_complete = pre_labeled_data[~pre_labeled_data['MODY3_label'].isna()][variables+['MODY3_label']]
dsm5_complete = pre_labeled_data[~pre_labeled_data['MODY5_label'].isna()][variables+['MODY5_label']]
dsnormal_complete = pre_labeled_data[pre_labeled_data['Normal']][variables]
"""# 4. Salida intermedia de los datos para verificación manual
Guarda los dataframes en un excel para verificación
"""
with pd.ExcelWriter("MODY_data.xlsx", engine='xlsxwriter') as xls:
raw_data.to_excel(xls, sheet_name='HC Original', index=False)
pre_labeled_data.to_excel(xls, sheet_name='Datos etiquetados', index=False)
raw_data[raw_data['SiEntiqueta'] == True].to_excel(xls, sheet_name='Datos excluídos', index=False)
dsm1_complete.to_excel(xls, sheet_name='Dataset MODY1', index=False)
dsm1_complete.dropna().to_excel(xls, sheet_name='Dataset MODY1 sin ausentes', index=False)
dsm2_complete.to_excel(xls, sheet_name='Dataset MODY2', index=False)
dsm2_complete.dropna().to_excel(xls, sheet_name='Dataset MODY2 sin ausentes', index=False)
dsm3_complete.to_excel(xls, sheet_name='Dataset MODY3', index=False)
dsm3_complete.dropna().to_excel(xls, sheet_name='Dataset MODY3 sin ausentes', index=False)
dsm5_complete.to_excel(xls, sheet_name='Dataset MODY5', index=False)
dsm5_complete.dropna().to_excel(xls, sheet_name='Dataset MODY5 sin ausentes', index=False)
dsnormal_complete.to_excel(xls, sheet_name='Sin Diabetes', index=False)
return dsm1_complete, dsm2_complete, dsm3_complete, dsm5_complete

19
train.py 100644

@@ -0,0 +1,19 @@
from load_dataset import load_data
from trainer import BinaryTuner
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings("ignore")
_, dms2, dms3, _ = load_data()
mody2 = BinaryTuner(dms2, 'MODY2_label', seeds=[231964], drop_ratio=0.2)
mody2.fit()
mody2.explain_model('GaussianNB', 'fulldataset-oversampled-mice', 231964)
mody2.wrap_and_save()
mody3 = BinaryTuner(dms3, 'MODY3_label', seeds=[536202], drop_ratio=0.2)
mody3.fit()
mody3.explain_model('RandomForestClassifier', 'fulldataset-original-mice', 536202)
mody3.wrap_and_save()

695
trainer.py 100644

@@ -0,0 +1,695 @@
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams['text.usetex'] = True
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV, StratifiedShuffleSplit
from sklearn.preprocessing import StandardScaler
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import KNNImputer, IterativeImputer
from sklearn.metrics import accuracy_score, recall_score, f1_score, roc_auc_score, confusion_matrix
from sklearn.utils.multiclass import type_of_target
from sklearn.linear_model import LogisticRegression, Perceptron, SGDClassifier
from sklearn.cross_decomposition import PLSRegression
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from imblearn.over_sampling import RandomOverSampler
from xgboost import XGBClassifier
from ray import tune
import ray
from keras.callbacks import TensorBoard
from keras.models import Sequential
from keras.callbacks import EarlyStopping
from keras.utils import set_random_seed
from keras.metrics import AUC
from keras.layers import Dense, BatchNormalization, Dropout
from kerastuner.tuners import RandomSearch, Hyperband, GridSearch
from kerastuner.engine.trial import TrialStatus
import shap
from datetime import datetime
import enlighten
import logging
import joblib
import zipfile
import pickle
import time
import json
import os
from scipy.stats import loguniform, randint
#tf.config.experimental.enable_op_determinism()
#from sklearn.experimental import enable_halving_search_cv # noqa
#from sklearn.model_selection import HalvingRandomSearchCV
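# BinaryTuner trains and tunes a battery of binary classifiers for one label column:
# it builds complete-case and imputed (MICE/KNN) dataset variants, oversamples unbalanced
# classes, grid-searches each model per seed, and records every run in an Excel ledger.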
class BinaryTuner:
def __init__(self, dataFrame, label_class, seeds=None, dnn=False, drop_ratio=0.2, test_prio=0.9, tuneScoring=None, debug=False, n_seeds=3):
self.ledger = pd.DataFrame(columns=["node", "ts", "Dataset", "Model", "Params", "Seed", "Ratio", "Accuracy", "Specificity", "Recall", "F1", "ROC_AUC"])
self.name = label_class
os.makedirs(self.name, exist_ok=True)
self.start = int(time.time())
log_format = '%(asctime)s | %(levelname)-8s | %(name)-15s | %(message)s'
date_format = '%Y-%m-%d %H:%M:%S'
logging.basicConfig(format=log_format, datefmt=date_format)
target_log = '{}/load-{}.log'.format(self.name, self.start)
fh = logging.FileHandler(target_log)
self.debug = debug
self.test_prio = test_prio
self.tuneScoring = tuneScoring
self.dataFrame = dataFrame.copy()
self.dnn = dnn
self.logger = logging.getLogger("BinaryTuners")
if self.debug:
self.logger.setLevel(logging.DEBUG)
fh.setLevel(logging.DEBUG)
else:
self.logger.setLevel(logging.INFO)
fh.setLevel(logging.INFO)
self.logger.addHandler(fh)
self.last_ping = self.start
self.ratio = drop_ratio
if not isinstance(seeds, list):
self.seeds = [np.random.randint(1, 2**20) for _ in range(n_seeds)]
else:
self.seeds = seeds
self.logger.info('{:#^60}'.format(label_class))
self.loadCheckPoint()
self.__metaVars()
def __metaVars(self):
len_models = len(self.get_model_train()) + self.dnn
self.logger.info("Len models: {}".format(len_models))
len_seeds = len(self.seeds)
self.logger.info("Len seeds: {}".format(len_seeds))
full = self.dataFrame.drop(self.name, axis=1)
self.nvars = full.shape[1]
self.logger.info("Nvars: {}".format(self.nvars))
label_data_drop = self.dataFrame.dropna()[self.name]
label_data_full = self.dataFrame[self.name]
if label_data_drop.shape[0] != label_data_full.shape[0]: # missing values in the dataset
valsize = int(self.nvars/2)
self.logger.info("Valsize: {}".format(valsize))
self.noMissingDataset = self.dataFrame.dropna().copy()
self.missingDatasets = []
self.missingDatasets.append((self.dataFrame.copy(), 'fulldataset'))
self.missingDatasets.append((self.dataFrame[self.dataFrame.isna().sum(axis=1) <= valsize].copy(), 'drop{}'.format(valsize)))
self.logger.info("Len noMissingDataset: {}".format(self.noMissingDataset.shape[0]))
for i, df in enumerate(self.missingDatasets):
self.logger.info("Len MissingDataset {}: {}".format(i, df[0].shape[0]))
else:
self.noMissingDataset = self.dataFrame.copy()
self.missingDatasets = []
os.makedirs("{}/nomissing-original".format(self.name), exist_ok=True)
len_datasets = 1 + 2*len(self.missingDatasets)
self.logger.info("Len datasets: {}".format(len_datasets))
len_unbalanced = 0
if not self.is_balanced(self.noMissingDataset[self.name]):
len_unbalanced += 1
os.makedirs("{}/nomissing-oversampled".format(self.name), exist_ok=True)
for dfData, dfname in self.missingDatasets:
os.makedirs("{}/{}-original".format(self.name, dfname), exist_ok=True)
if not self.is_balanced(dfData[self.name]):
len_unbalanced += 2
os.makedirs("{}/{}-oversampled".format(self.name, dfname), exist_ok=True)
self.logger.info("Len unbalanced: {}".format(len_unbalanced))
total_models = len_seeds * len_models * (len_datasets + len_unbalanced)
self.logger.info("Total Models to be trained: {}".format(total_models))
self.logger.info("Total Models in the ledger: {}".format(self.trained))
self.total_models = total_models
self.logger.info("{:=^60}".format("######"))
def addSeed(self, n_seeds=None, seeds=None):
if isinstance(seeds, list):
self.seeds = list(set(self.seeds + seeds))
elif isinstance(n_seeds, int):
seeds = [np.random.randint(1, 2**20) for _ in range(n_seeds)]
self.seeds = list(set(self.seeds + seeds))
else:
seeds = [np.random.randint(1, 2**20)]
self.seeds = list(set(self.seeds + seeds))
self.saveCheckPoint()
self.__metaVars()
def is_balanced(self, dfData):
balance_count = dfData.value_counts()
total_len = balance_count[0] + balance_count[1] # dataset length
balance_ratio = int(100*abs((balance_count[0] - balance_count[1])/total_len))
return balance_ratio < 5
def ping(self, msg):
curtime = int(time.time())
delta = curtime - self.last_ping
self.last_ping = curtime
self.logger.info("{:<50}\t|{:4}m {:2}s".format(msg, int(delta//60), int(delta%60)))
def loadCheckPoint(self):
if not os.path.isfile('{}/Simulaciones.xlsx'.format(self.name)):
self.saveCheckPoint()
with pd.ExcelFile('{}/Simulaciones.xlsx'.format(self.name)) as xls:
self.ledger = pd.read_excel(xls, sheet_name='Historial')
self.trained = self.ledger.shape[0]
with pd.ExcelFile('{}/Dataset.xlsx'.format(self.name)) as xls:
self.dataFrame = pd.read_excel(xls, sheet_name=self.name)
with open('{}/vars.pickle'.format(self.name), 'rb') as pfile:
self.name, self.seeds, self.dnn, self.ratio, self.test_prio, self.tuneScoring = pickle.load(pfile)
def saveCheckPoint(self):
with pd.ExcelWriter('{}/Simulaciones.xlsx'.format(self.name), engine='xlsxwriter') as xls:
self.ledger.to_excel(xls, sheet_name='Historial', index=False)
with pd.ExcelWriter('{}/Dataset.xlsx'.format(self.name), engine='xlsxwriter') as xls:
self.dataFrame.to_excel(xls, sheet_name=self.name, index=False)
with open('{}/vars.pickle'.format(self.name), 'wb') as pfile:
pickle.dump((self.name, self.seeds, self.dnn, self.ratio, self.test_prio, self.tuneScoring), pfile, protocol=pickle.HIGHEST_PROTOCOL)
self.trained = self.ledger.shape[0]
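# Keras model builder used by keras-tuner: two dense blocks with batch normalization
# and dropout, sigmoid output for binary classification.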
def get_model_train_keras(self, hp):
model = Sequential()
model.add(Dense(units=hp.Int('units_input', min_value=48, max_value=56, step=8), input_dim=self.nvars, activation='relu'))
model.add(BatchNormalization())
model.add(Dropout(rate=hp.Float('dropout_input', min_value=0.1, max_value=0.1, step=0.1)))
model.add(Dense(units=hp.Int('units_hidden', min_value=32, max_value=48, step=8), activation='relu'))
model.add(BatchNormalization())
model.add(Dropout(rate=hp.Float('dropout_hidden', min_value=0.4, max_value=0.4, step=0.1)))
model.add(Dense(1, activation='sigmoid'))
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy', AUC()])
return model
def train_and_score_model_keras(self, X_train, X_test, y_train, y_test, seed, label):
# set_random_seed(seed)
ntrials = 6
tuner = RandomSearch(
self.get_model_train_keras,
objective='val_loss', #val_loss
# seed=seed,
max_trials=ntrials,
# executions_per_trial=1, # number of executions per configuration
directory=self.name,
project_name='{}-{}'.format(label,seed))
self.logger.info("{:~^60}".format(' {}-{} '.format(label,seed)))
search_dir = "{}/keras-tuner-{}/".format(self.name,label)
os.makedirs(search_dir, exist_ok=True)
search_callback = TensorBoard(log_dir=search_dir)
early_stopping_search = EarlyStopping(monitor='val_loss', patience=13, min_delta=0.005, start_from_epoch=7, restore_best_weights=True)
tuner.search(X_train, y_train, epochs=150, batch_size=10, validation_data=(X_test, y_test), callbacks=[early_stopping_search, search_callback])
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
# best_worse = float(0)
# model_seq = 0
# best_hps = ''
# optimized_model = None
#
# for current, trial in enumerate(tuner.oracle.get_best_trials(num_trials=ntrials)):
# if trial.status == TrialStatus.COMPLETED:
# # Retrieve the training and validation metrics for the last step
# auc = trial.metrics.get_last_value("auc")
# val_auc = trial.metrics.get_last_value("val_auc")
# if auc is not None and val_auc is not None:
# worse = min(auc, val_auc)
#
# # Update the best trial if this difference is the smallest
# if worse > best_worse:
# best_worse = worse
# model_seq = current
# best_auc, best_val_auc = auc, val_auc
# optimized_model = tuner.load_model(trial)
# best_hps = trial.hyperparameters
#
# self.logger.info(f"Selected trial with (auc, val_auc) : ({best_auc}, {best_val_auc})")
optimized_model = tuner.get_best_models(num_models=1)[0]
# if optimized_model is None:
# raise('model load failed')
# # Train the model
# optimized_model = Sequential()
# optimized_model.add(Dense(units=best_hps.get('units_input'), input_dim=X_train.shape[1], activation='relu'))
# optimized_model.add(BatchNormalization())
# optimized_model.add(Dropout(rate=best_hps.get('dropout_input')))
# optimized_model.add(Dense(units=best_hps.get('units_hidden'), activation='relu'))
# optimized_model.add(BatchNormalization())
# optimized_model.add(Dropout(rate=best_hps.get('dropout_hidden')))
# optimized_model.add(Dense(1, activation='sigmoid'))
# optimized_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy', 'auc'])
#
#
fit_dir = "{}/keras-fit-{}/".format(self.name, label)
os.makedirs(fit_dir, exist_ok=True)
train_callback = TensorBoard(log_dir=fit_dir)
#
model_params = "UI:{}, DI:{}, UH: {}, DH: {}".format(best_hps.get('units_input'), best_hps.get('dropout_input'), best_hps.get('units_hidden'), best_hps.get('dropout_hidden'))
self.logger.info("Model Params: {}".format(model_params))
early_stopping_train = EarlyStopping(monitor='val_loss', start_from_epoch=7, patience=43, restore_best_weights=True)
optimized_model.fit(X_train, y_train, epochs=200, batch_size=10, validation_data=(X_test, y_test), callbacks=[early_stopping_train, train_callback])
y_pred = optimized_model.predict(X_test)
if type_of_target(y_pred) == "continuous":
# make a numpy array from y_pred where all the values > 0.5 become 1 and all remaining values are 0
y_pred = np.where(y_pred > 0.5, 1, 0)
accuracy = accuracy_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
roc_auc = roc_auc_score(y_test, y_pred)
tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()
self.logger.info(confusion_matrix(y_test, y_pred))
specificity = tn / (tn + fp)
self.logger.info(f"True Negativ : {tn}")
self.logger.info(f"True Positive : {tp}")
self.logger.info(f"False Negative : {fn}")
self.logger.info(f"False Positive : {fp}")
self.logger.info(f"Returned model val_auc : {roc_auc}")
self.trained += 1
self.bar.update()
return accuracy, specificity, recall, f1, roc_auc, optimized_model, model_params
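# Candidate scikit-learn style classifiers trained on every dataset variant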
def get_model_train(self):
return [
LogisticRegression(),
XGBClassifier(),
RandomForestClassifier(),
Perceptron(),
SGDClassifier(),
SVC(),
GaussianNB(),
KNeighborsClassifier(),
# GradientBoostingClassifier(),
PLSRegression(),
LinearDiscriminantAnalysis()
]
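# Hyper-parameter grid searched by GridSearchCV for each model family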
def get_tunable_params(self, model):
if isinstance(model, LogisticRegression):
return {
"C": np.logspace(-2, 2, 15),
"max_iter": [80, 100, 150]
}
elif isinstance(model, XGBClassifier):
return {
"n_estimators": [50, 100, 200],
"learning_rate": np.logspace(-4, -1, 8),
"max_depth": [3, 5, 7]
}
elif isinstance(model, RandomForestClassifier):
return {
"n_estimators": [50, 100, 200],
"max_depth": [5, 10, 15],
"max_features": [2, 5, 10] #['n', 'max_depth', 'max_features', 'max_leaf_nodes', 'max_samples', 'min_impurity_decrease', 'min_samples_leaf', 'min_samples_split', 'min_weight_fraction_leaf', 'monotonic_cst', 'n_estimators', 'n_jobs', 'oob_score', 'random_state', 'verbose', 'warm_start']
}
elif isinstance(model, Perceptron):
return {
"penalty": ["l2", "l1", "elasticnet"],
"max_iter": [50, 100, 200]
}
elif isinstance(model, SGDClassifier):
return {
"alpha": np.logspace(-4, -1, 8),
"max_iter": [100, 300, 500],
"penalty": ["l2", "l1", "elasticnet"]
}
elif isinstance(model, SVC):
return {
"C": np.logspace(-1, 2, 15),
"kernel": ["linear", "poly", "rbf", "sigmoid"]
}
elif isinstance(model, LinearDiscriminantAnalysis):
return {
"solver": ["svd", "lsqr", "eigen"],
"shrinkage": [None, "auto"]
}
elif isinstance(model, PLSRegression):
return {
"n_components": [2, 3, 5]
}
elif isinstance(model, GaussianNB):
return {
"var_smoothing": np.logspace(-11, -8, 10)
}
elif isinstance(model, KNeighborsClassifier):
return {
"n_neighbors": [3, 5, 7, 9],
"weights": ["uniform", "distance"],
"p": [1, 2]
}
elif isinstance(model, GradientBoostingClassifier):
return {
"n_estimators": [50, 100, 200],
"learning_rate": np.logspace(-4, -1, 10),
"max_depth": [3, 5, 7]
}
else:
return {}
def train_and_score_model(self, model, X_train, X_test, y_train, y_test, seed):
param_dist = self.get_tunable_params(model)
rsh = GridSearchCV(estimator=model, param_grid=param_dist, cv=StratifiedKFold(3, shuffle=True, random_state=seed), scoring=self.tuneScoring, verbose=(self.debug > 3))
rsh.fit(X_train, y_train)
optimized_model = model.set_params(**rsh.best_params_)
optimized_model.fit(X_train, y_train)
y_pred = optimized_model.predict(X_test)
# make a numpy array from y_pred where all the values > 0.5 become 1 and all remaining values are 0
if type_of_target(y_pred) == "continuous":
y_pred = np.where(y_pred > 0.5, 1, 0)
accuracy = accuracy_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
roc_auc = roc_auc_score(y_test, y_pred)
tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()
specificity = tn / (tn + fp)
self.trained += 1
self.bar.update()
return accuracy, specificity, recall, f1, roc_auc, optimized_model, json.dumps(rsh.best_params_)
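# Tune and score every candidate model (plus the optional DNN) on one train/test split,
# skipping combinations already recorded in the ledger.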
def run_dataset(self, label, X_train, X_test, y_train, y_test, seed, sublabel=None):
node = os.uname()[1]
for model in self.get_model_train():
if sublabel is None:
model_file = '{}/{}/{}_{}'.format(self.name, label, type(model).__name__, seed )
model_label = "{}".format(label)
else:
model_file = '{}/{}/{}_{}_{}'.format(self.name, label, sublabel, type(model).__name__, seed )
model_label = "{}-{}".format(label, sublabel)
inEntry = ((self.ledger['Dataset']==model_label) & (self.ledger['Model']==type(model).__name__) & (self.ledger['Seed'] == seed)).any()
if inEntry:
if os.path.isfile(model_file):
continue
else:
self.trained -= 1
self.ledger = self.ledger.drop(self.ledger[(self.ledger['Dataset']==model_label) & (self.ledger['Model']==type(model).__name__) & (self.ledger['Seed'] == seed)].index)
accuracy, specificity, recall, f1, roc_auc, optimized_model, parms = self.train_and_score_model(model, X_train, X_test, y_train, y_test, seed)
ts = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
joblib.dump(optimized_model, model_file)
#[, , "Parms", "Seed", "Ratio", , , , "F1", , "ts", "node"]
newrow = pd.DataFrame( [{"node": node,
"ts": ts,
"Dataset": model_label,
"Model": type(model).__name__,
"Params": parms,
"Seed": seed,
"Ratio": self.ratio,
"Accuracy": accuracy,
"Specificity": specificity,
"Recall": recall,
"F1": f1,
"ROC_AUC": roc_auc,
}] )
self.ledger = pd.concat([self.ledger, newrow], ignore_index=True)
if self.dnn:
if sublabel is None:
model_file = '{}/{}/DNN_{}'.format(self.name, label, seed )
model_label = "{}".format(label)
else:
model_file = '{}/{}/{}_DNN_{}'.format(self.name, label, sublabel, seed )
model_label = "{}-{}".format(label, sublabel)
inEntry = ((self.ledger['Dataset']==model_label) & (self.ledger['Model']=='DNN') & (self.ledger['Seed'] == seed)).any()
if inEntry:
if os.path.isfile(model_file):
return
else:
self.trained -= 1
self.ledger = self.ledger.drop(self.ledger[(self.ledger['Dataset']==model_label) & (self.ledger['Model']=='DNN') & (self.ledger['Seed'] == seed)].index)
accuracy, specificity, recall, f1, roc_auc, optimized_model, parms = self.train_and_score_model_keras(X_train, X_test, y_train, y_test, seed, model_label)
ts = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
# joblib.dump(optimized_model, model_file)
#[, , "Parms", "Seed", "Ratio", , , , "F1", , "ts", "node"]
newrow = pd.DataFrame( [{"node": node,
"ts": ts,
"Dataset": model_label,
"Model": 'DNN',
"Params": parms,
"Seed": seed,
"Ratio": self.ratio,
"Accuracy": accuracy,
"Specificity": specificity,
"Recall": recall,
"F1": f1,
"ROC_AUC": roc_auc
}] )
self.ledger = pd.concat([self.ledger, newrow], ignore_index=True)
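# Train every candidate model for every seed: complete-case data first, then each imputed
# variant (MICE, KNN), with oversampled versions whenever the classes are unbalanced.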
def fit(self):
self.logger.info("{:=^60}".format(' Begin Fit {} Models '.format(self.total_models-self.trained)))
manager = enlighten.get_manager()
self.bar = manager.counter(total=self.total_models,
count=self.trained,
format='{desc}{desc_pad}{percentage:3.0f}%|{bar}| {count:{len_total}d}/{total:d} [{elapsed}<{eta}, {rate:.2f}{unit_pad}{unit}/s]',
desc='Tuning',
unit='Models')
for seed in self.seeds:
X = self.noMissingDataset.drop(self.name, axis=1)
y = self.noMissingDataset[self.name]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=self.ratio, random_state=seed, stratify=y)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
joblib.dump(scaler, '{}/nomissing-original/StandardScaler_{}'.format(self.name, seed) )
self.run_dataset('nomissing-original', X_train_scaled, X_test_scaled, y_train, y_test, seed)
if not self.is_balanced(y):
ros = RandomOverSampler(random_state=seed)
Xr_train_scaled, yr_train = ros.fit_resample(X_train_scaled, y_train)
self.run_dataset('nomissing-oversampled', Xr_train_scaled, X_test_scaled, yr_train, y_test, seed)
self.saveCheckPoint()
for dfData, dfname in self.missingDatasets:
mice = IterativeImputer(max_iter=10, random_state=seed)
df_mice = dfData.copy()
X = df_mice.drop(self.name, axis=1)
y = df_mice[self.name]
X_mice = mice.fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(X_mice, y, test_size=self.ratio, random_state=seed, stratify=y)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
joblib.dump(scaler, '{}/{}-original/mice_StandardScaler_{}'.format(self.name, dfname, seed) )
self.run_dataset('{}-original'.format(dfname), X_train_scaled, X_test_scaled, y_train, y_test, seed, 'mice')
if not self.is_balanced(y):
ros = RandomOverSampler(random_state=seed)
Xr_train_scaled, yr_train = ros.fit_resample(X_train_scaled, y_train)
self.run_dataset('{}-oversampled'.format(dfname), Xr_train_scaled, X_test_scaled, yr_train, y_test, seed, 'mice')
self.saveCheckPoint()
for dfData, dfname in self.missingDatasets:
knn = KNNImputer(n_neighbors=5)
df_knn = dfData.copy()
X = df_knn.drop(self.name, axis=1)
y = df_knn[self.name]
X_knn = knn.fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(X_knn, y, test_size=self.ratio, random_state=seed, stratify=y)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
joblib.dump(scaler, '{}/{}-original/knn_StandardScaler_{}'.format(self.name, dfname, seed) )
self.run_dataset('{}-original'.format(dfname), X_train_scaled, X_test_scaled, y_train, y_test, seed, 'knn')
if not self.is_balanced(y):
ros = RandomOverSampler(random_state=seed)
Xr_train_scaled, yr_train = ros.fit_resample(X_train_scaled, y_train)
self.run_dataset('{}-oversampled'.format(dfname), Xr_train_scaled, X_test_scaled, yr_train, y_test, seed, 'knn')
self.saveCheckPoint()
self.bar.close()
def get_best_models(self):
return self.ledger.groupby(["Dataset", "Model"])["ROC_AUC"].agg(['mean', 'std'])
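# Reload a trained model and its scaler for the given dataset/seed and produce
# SHAP decision and waterfall plots.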
def explain_model(self, modelname=None, dataset=None, seed=None):
self.logger.info("{:=^60}".format(' Begin SHAP Explainer: {} {} {} '.format(modelname, dataset, seed)))
Xbase = self.noMissingDataset.drop(self.name, axis=1)
ybase = self.noMissingDataset[self.name]
X_1 = self.noMissingDataset[ybase == 1].drop(self.name, axis=1)
X_0 = self.noMissingDataset[ybase == 0].drop(self.name, axis=1)
X_raw_explain = pd.concat([X_1[:5], X_0[:5]], ignore_index=True)
self.logger.info("Model: {}".format(modelname))
self.logger.info("Seed: {}".format(seed))
pieces = dataset.split('-')
dataset = pieces[0]
sample = pieces[1]
self.logger.info("Dataset: {}".format(dataset))
self.logger.info("Sample: {}".format(sample))
if pieces[-1] in (['mice', 'knn']):
imputer = pieces[2]
scaler_path = "{}/{}-original/{}_StandardScaler".format(self.name,dataset, imputer)
model_path = "{}/{}-{}/{}_{}".format(self.name, dataset, sample, imputer, modelname)
if dataset == 'fulldataset':
X_na = self.missingDatasets[0][0].drop(self.name, axis=1)
y = self.missingDatasets[0][0][self.name]
else:
X_na = self.missingDatasets[1][0].drop(self.name, axis=1)
y = self.missingDatasets[1][0][self.name]
if imputer == 'knn':
knn = KNNImputer(n_neighbors=5)
X = knn.fit_transform(X_na)
else:
imputer = None
scaler_path = "{}/{}-original/StandardScaler".format(self.name, '-'.join(pieces[:-1]))
model_path = "{}/{}-{}/{}".format(self.name, dataset, sample, modelname)
X = self.noMissingDataset.drop(self.name, axis=1)
y = self.noMissingDataset[self.name]
all_shap_base_values = []
base_dim = []
all_shap_values = []
if imputer == 'mice':
mice = IterativeImputer(max_iter=10, random_state=seed)
X = mice.fit_transform(X_na)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=4, random_state=seed, stratify=y)
scaler = joblib.load('{}_{}'.format(scaler_path, seed))
model = joblib.load('{}_{}'.format(model_path, seed))
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)
X_explain = scaler.transform(X_raw_explain)
X_model = scaler.transform(Xbase)
if not self.is_balanced(y):
ros = RandomOverSampler(random_state=seed)
X_train, y_train = ros.fit_resample(X_train, y_train)
# explainer_model = shap.Explainer(model)
# expected_value = explainer_model.expected_value
# if isinstance(expected_value, list):
# expected_value = expected_value[1]
# shap_values = explainer.shap_values(X_test)[1]
self.logger.info("Columns: {}".format(Xbase.columns))
eng_columns = ['sex', 'family hist', 'age diag', 'BMI', 'base glu', 'glu 120','HbA1c']
explainer = shap.Explainer(model.predict, X_train, seed=seed)
shap_values = explainer(X_model)
exp = shap.Explanation(shap_values,
data=X_model,
feature_names=eng_columns)
#
# shap.plots.initjs()
shap.plots.decision(exp.base_values[0], exp.values, features=eng_columns, show=False)
# shap.plots.force(exp.base_values, exp.values, feature_names=Xbase.columns, show=False)
# shap.plots.force(exp.base_values[0], exp.values[0, :], feature_names=Xbase.columns, matplotlib=True, show=False)
# shap.plots.force(expected_values[0], shap_values.values, Xbase.columns , show=False)
plt.title(r"{0}".format(modelname))
plt.savefig("{}/shap_{}_{}_{}.png".format(self.name, modelname, dataset, seed),dpi=150, bbox_inches='tight')
plt.close()
shap_values = explainer(X_explain)
exp = shap.Explanation(shap_values,
data=X_explain,
feature_names=eng_columns)
for i in range(5):
shap.plots.waterfall(exp[i], show=False)
plt.title(r"{0} $y_{{{1}}}=0$".format(modelname, i))
plt.savefig("{}/pos_{}_{}_{}_{}.png".format(self.name, i, modelname, dataset, seed),dpi=150, bbox_inches='tight')
plt.close()
for i in range(5, 10):
shap.plots.waterfall(exp[i], show=False)
plt.title(r"{0} $y_{{{1}}}=1$".format(modelname, i-5))
plt.savefig("{}/neg_{}_{}_{}_{}.png".format(self.name, i-5, modelname, dataset, seed),dpi=150, bbox_inches='tight')
plt.close()
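# Write the results summary and archive the whole output directory into a zip file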
def wrap_and_save(self):
self.logger.info("{:=^60}".format(' Saving Summary and Wrap the output in a ZipFile '))
with pd.ExcelWriter('{}/Summary.xlsx'.format(self.name) , engine='xlsxwriter') as xls:
self.get_best_models().to_excel(xls, sheet_name='Results')
with zipfile.ZipFile('{}-{}.zip'.format(self.name, self.start), 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(self.name):
for file in files:
zipf.write(os.path.join(root, file))