import os import pandas as pd import numpy as np import matplotlib.pyplot as plt import pickle import logging from sklearn.preprocessing import MinMaxScaler #self.logger.basicConfig(level=self.logger.INFO, format='%(asctime)s - %(levelname)s - %(message)s') class GasSensorDataLoader: def __init__(self, label_file, force_overwrite=False, output_format="png", lower_limit=-0.01, threshold=0.7, target_list=['C2H2', 'CH4', 'C3H6', 'CO', 'C2H6', 'C3H8', 'C2H4', 'H2', 'O2'], source_channels=["MQ 8", "MQ 9", "MQ 135", "TGS 813", "TGS 821", "TGS 2600", "TGS 2602", "TGS 2611-0", "TGS 2612", "TGS 2620"], debug=False): self.label_file = label_file self.main_file = f"{self.label_file}/{self.label_file}.xlsx" self.data_folder = os.path.splitext(label_file)[0] self.state_file = f"{self.label_file}.pkl" self.lower_limit = lower_limit self.smooth = None self.data = None self.debug = debug self.threshold = threshold self.dataset = {} self.dataset['threshold'] = self.threshold self.dataset['range'] = {} self.samples = {} self.target_list = sorted(target_list) self.target = '_'.join(self.target_list) self.target_len = len(self.target_list) self.source_channels = sorted(source_channels) self.force_overwrite = force_overwrite self.output_format = output_format.lower() self.logger = logging.getLogger("GasSensorDataLoader") if self.debug: self.logger.setLevel(logging.DEBUG) else: self.logger.setLevel(logging.INFO) self.pics_folder = self.create_pics_folder() if self.output_format not in ["png", "pdf"]: raise ValueError("Invalid output format. Choose either 'png' or 'pdf'.") if not os.path.isdir(self.data_folder): raise FileNotFoundError(f"Data folder '{self.data_folder}' does not exist.") if not os.path.exists(self.main_file): raise FileNotFoundError(f"Main Excel file '{self.main_file}' not found.") if not isinstance(threshold, (int, float)) or not 0 <= threshold <= 1: raise ValueError("threshold must be a number between 0 and 1") if os.path.exists(self.state_file): if False:#not self.force_overwrite and not self._compare_state_with_main(): raise ValueError("State file differs from the main Excel file. Use 'force_overwrite=True' to overwrite.") else: self.logger.info(f"Init for {len(self.target_list)} targets => {self.target_list}") self.load_state() else: self.logger.info("State file not found. Loading dataset.") self.load_dataset() def _compare_state_with_main(self): try: existing_labels = pd.read_excel(self.main_file) with open(self.state_file, 'rb') as f: saved_data = pickle.load(f) saved_labels = pd.DataFrame([saved_data[key]['label'] for key in saved_data]) return existing_labels.equals(saved_labels) except Exception as e: self.logger.error(f"Error comparing state file: {e}") return False def reset(self): self.dataset = {} self.dataset['threshold'] = self.threshold self.dataset['range'] = {} if isinstance(self.target_list, list): self.target_list = sorted(self.target_list) elif isinstance(self.target_list, str): self.target_list = list(self.target_list) self.target = '_'.join(self.target_list) self.target_len = len(self.target_list) self.logger.info(f"Reset requested. Init for {len(self.target_list)} targets => {self.target}") if hasattr(self, "delta_data"): delattr(self, "delta_data") if hasattr(self, "scaled_data"): delattr(self, "scaled_data") self.init_minmax() self.stats() def load_dataset(self): self.logger.info("Loading dataset from Excel files.") labels = pd.read_excel(self.main_file) data_dict = {} samples, measuraments = 0, 0 for _, row in labels.iterrows(): file_path = os.path.join(self.data_folder, "Raw_data", '{}{}'.format(row['Raw_data'], '.xlsx')) if os.path.exists(file_path): self.logger.info(f"Loading data from {file_path}.") df = pd.read_excel(file_path, header=0, usecols=self.source_channels) # Ensure first row is used as column names data_dict[row['Raw_data']] = { 'label': row.to_dict(), 'data': df, 'sampleId': samples } samples += 1 measuraments += df.shape[0] else: raise FileNotFoundError(f"measurament file not found: {file_path}") self.data = data_dict self.save_state() self.logger.info("Dataset loaded. {} samples in {} measuraments".format(samples, measuraments)) def save_state(self): with open(self.state_file, 'wb') as f: pickle.dump(self.data, f) self.logger.info("State saved.") def load_state(self): with open(self.state_file, 'rb') as f: self.data = pickle.load(f) self.logger.info("State loaded.") def create_pics_folder(self): pics_folder = os.path.join(self.data_folder, "pics") if not os.path.exists(pics_folder): os.makedirs(pics_folder) self.logger.info(f"Created folder: {pics_folder}") return pics_folder def init_delta(self): self.logger.info("Initializing dataset delta values.") data_copy = {key: {'label': value['label'], 'sampleId': value['sampleId'], 'data': value['data'].copy()} for key, value in self.data.items()} if self.smooth == 'conv3': self.logger.info("Noise filter: Convolution [0.2, 0.6, 0.2] selected.") kernel = np.array([0.2, 0.6, 0.2]) for key in data_copy: tempdf = pd.DataFrame() for col in data_copy[key]['data'].columns: tempdf[col] = np.convolve(data_copy[key]['data'][col], kernel, mode='valid') data_copy[key]['data'] = tempdf.copy() lower_limit = pd.concat([data_copy[key]['data'] for key in data_copy], axis=0).max() * self.lower_limit self.logger.debug("Lower limit {}.".format(lower_limit)) for key in data_copy: data_copy[key]['data'] = data_copy[key]['data'] - data_copy[key]['data'].iloc[0] for column in data_copy[key]['data'].columns: data_copy[key]['data'][column] = data_copy[key]['data'][column].where(data_copy[key]['data'][column] >= lower_limit[column], other=lower_limit[column]) self.delta_data = data_copy def init_minmax(self): if not hasattr(self, 'delta_data'): self.init_delta() self.logger.info("Initializing dataset using MinMaxScaler.") concatenated_data = pd.concat([self.delta_data[key]['data'] for key in self.delta_data], axis=0) scaler = MinMaxScaler() scaler.fit(concatenated_data) self.scaled_data = {key: {'label': value['label'], 'sampleId': value['sampleId'], 'data': pd.DataFrame(scaler.transform(value['data']), columns=value['data'].columns)} for key, value in self.delta_data.items()} def plot_channel(self, data_instance, channel_name, save=False, tag="", title=""): self.logger.debug(f"{title}All measurament Sensor readings for: {channel_name}.") plt.figure(figsize=(12, 6)) for key in data_instance: if channel_name in data_instance[key]['data'].columns: plt.plot(data_instance[key]['data'][channel_name]) plt.xlabel("Time") plt.ylabel("Sensor Array Readings") plt.title(f"{title} Sensor Channel: {channel_name}") if save: filename = os.path.join(self.pics_folder, f"{channel_name}_{tag}.{self.output_format}") plt.savefig(filename, format=self.output_format) self.logger.info(f"Saved plot as {filename}") else: plt.show() plt.close() def plot_measurament(self, data_instance, measurament_name, save=False, tag="", title="", limits=[]): self.logger.debug(f"{title}All sensor readings for measurament: {measurament_name}.") if measurament_name in data_instance: data_instance[measurament_name]['data'].plot(figsize=(12, 6), title=f"{title} measurament: {measurament_name}") plt.xlabel("sample") plt.ylabel(f"{title} value") plt.legend(bbox_to_anchor=(0.95, 0.5), loc="center left") for xvalue, txtcolor in limits: plt.vlines(x=xvalue, ymin=0, ymax=1, colors=txtcolor) if save: filename = os.path.join(self.pics_folder, f"{measurament_name}_{tag}.{self.output_format}") plt.savefig(filename, format=self.output_format) self.logger.info(f"Saved plot as {filename}") else: plt.show() plt.close() def plotRawdata(self, save=True): self.logger.debug("Plotting raw data for all measuraments and channels.") for measurament in self.data: self.plot_measurament(self.data, measurament, save=save, tag="rawv", title="[V]") for channel in self.data[next(iter(self.data))]['data'].columns: self.plot_channel(self.data, channel, save=save, tag="rawv", title="[V]") def plotDeltadata(self, save=True): if not hasattr(self, 'delta_data'): self.init_delta() self.logger.debug("Plotting raw data for all measuraments and channels.") for measurament in self.delta_data: self.plot_measurament(self.delta_data, measurament, save=save, tag="delta", title="[$\\Delta V$] ") for channel in self.delta_data[next(iter(self.delta_data))]['data'].columns: self.plot_channel(self.delta_data, channel, save=save, tag="delta", title="[$\\Delta V$] ") def plotScaleddata(self, save=True): if not hasattr(self, 'scaled_data'): self.init_minmax() self.logger.debug("Plotting raw data for all measuraments and channels.") for measurament in self.scaled_data: self.plot_measurament(self.scaled_data, measurament, save=save, tag="scaled", title="[$\\Delta \\bar{V}$] ") for channel in self.scaled_data[next(iter(self.scaled_data))]['data'].columns: self.plot_channel(self.scaled_data, channel, save=save, tag="scaled", title="[$\\Delta \\bar{V}$] ") def plotScaledBoundaries(self, save=True): if not hasattr(self, 'scaled_data'): self.init_minmax() if self.smooth is None: tag = "raw" else: tag = "denoise" # tag = self.smooth self.logger.debug("Plotting raw data for all measuraments and channels.") for measurament in self.scaled_data: r, l, m = self.findIndicesAbovethreshold(self.scaled_data[measurament]['data']) self.plot_measurament(self.scaled_data, measurament, save=save, tag="{}_bound".format(tag), title=f"[$\\Delta \\bar{{V}}_{{{tag}}}$] ", limits=[(r, 'blue'), (l, 'blue'), (m, 'red')]) def findIndicesAbovethreshold(self, df): if not isinstance(df, pd.DataFrame): raise TypeError("Input must be a pandas DataFrame.") row_sums = df.sum(axis=1) threshold = row_sums.max() * self.threshold above_threshold_indices = row_sums[row_sums > threshold].index if not above_threshold_indices.empty: first_index = above_threshold_indices[0] last_index = above_threshold_indices[-1] return first_index, last_index, row_sums.idxmax() else: return None, None, None def load_dataset_window(self, ws): self.logger.info(f"Requested sample with threshold {self.threshold} and window size {ws}") if not hasattr(self, 'scaled_data'): self.init_minmax() if not hasattr(self, 'dataset'): self.logger.debug(f"Empty dataset") self.dataset = {} if 'threshold' in self.dataset: self.logger.debug(f"threshold in dataset") if self.dataset['threshold'] != self.threshold: self.logger.info(f"wrong threshold {self.dataset['threshold']} {self.threshold}") self.dataset = {} self.dataset['threshold'] = self.threshold self.dataset['range'] = {} self.stats() else: self.logger.debug(f"correct threshold {self.dataset['threshold']} {self.threshold}") else: self.dataset['threshold'] = self.threshold self.dataset['range'] = {} self.stats() self.logger.debug(f"No threshold {self.dataset['threshold']} {self.threshold}") if ws in self.dataset: return self.dataset[ws] g_output = np.empty((0, 1)) y_output = np.empty((0, self.target_len)) x_output = np.empty((0, ws, len(self.source_channels))) sample_size = self.min_sample - ws + 1 self.logger.info(f"Computing sample with threshold {self.threshold} and window size {ws}") for measurament, (r, l) in self.dataset['range'].items(): self.logger.debug('{} | {} | {}-{} | {}'.format(measurament, self.scaled_data[measurament]['data'].shape, r, l, self.scaled_data[measurament]['data'].iloc[r:l].shape)) x_sample, y_sample, g_sample = self.create_conv1d_dataset(measurament, r, l, ws) g_output = np.concatenate((g_output, g_sample)) x_output = np.concatenate((x_output, x_sample)) y_output = np.concatenate((y_output, y_sample)) target_scaler = MinMaxScaler() y_output = target_scaler.fit_transform(y_output) self.dataset[ws] = (x_output, y_output, g_output) return self.dataset[ws] def create_conv1d_dataset(self, measurament, r, l, window): X_data = self.scaled_data[measurament]['data'].iloc[r:l] Y_value = np.array([[self.scaled_data[measurament]['label'][key] for key in self.target_list]]) G_value = self.scaled_data[measurament]['sampleId'] total_samples = X_data.shape[0] - window + 1 sample_size = self.min_sample - window + 1 if sample_size > total_samples: self.logger.warn(f"sample_size ({sample_size}) exceeds available samples ({total_samples}). Using available samples.") sample_size = total_samples random_indices = np.random.choice(total_samples, size=sample_size, replace=False) g_sample = np.zeros((sample_size, 1)) y_sample = np.zeros((sample_size, self.target_len)) x_samples = np.zeros((sample_size, window, self.data_channels)) for i, index in enumerate(random_indices): x_samples[i] = X_data.iloc[index:index + window].values y_sample[i] = Y_value g_sample[i] = G_value return x_samples, y_sample, g_sample def load_dataset_xboost(self): self.logger.info(f"Requested sample with threshold {self.threshold} for xboost") if not hasattr(self, 'scaled_data'): self.init_minmax() if not hasattr(self, 'dataset'): self.logger.debug(f"Empty dataset") self.dataset = {} if 'threshold' in self.dataset: self.logger.debug(f"threshold in dataset") if self.dataset['threshold'] != self.threshold: self.logger.info(f"wrong threshold {self.dataset['threshold']} {self.threshold}") self.dataset = {} self.dataset['threshold'] = self.threshold self.dataset['range'] = {} self.stats() else: self.logger.debug(f"correct threshold {self.dataset['threshold']} {self.threshold}") else: self.dataset['threshold'] = self.threshold self.dataset['range'] = {} self.stats() self.logger.debug(f"No threshold {self.dataset['threshold']} {self.threshold}") if 'xboost' in self.dataset: return self.dataset['xboost'] g_output = np.empty((0, 1)) y_output = np.empty((0, self.target_len)) x_output = np.empty((0, self.data_channels)) self.logger.info(f"Computing sample with threshold {self.threshold} for xboost") for measurament, (r, l) in self.dataset['range'].items(): self.logger.debug('{} | {} | {}-{} | {}'.format(measurament, self.scaled_data[measurament]['data'].shape, r, l, self.scaled_data[measurament]['data'].iloc[r:l].shape)) x_sample, y_sample, g_sample = self.create_xboost_dataset(measurament, r, l) g_output = np.concatenate((g_output, g_sample)) x_output = np.concatenate((x_output, x_sample)) y_output = np.concatenate((y_output, y_sample)) target_scaler = MinMaxScaler() y_output = target_scaler.fit_transform(y_output) self.dataset['xboost'] = (x_output, y_output, g_output) return self.dataset['xboost'] def create_xboost_dataset(self, measurament, r, l): X_data = self.scaled_data[measurament]['data'].iloc[r:l] Y_value = np.array([[self.scaled_data[measurament]['label'][key] for key in self.target_list]]) G_value = self.scaled_data[measurament]['sampleId'] total_samples = X_data.shape[0] sample_size = self.min_sample self.logger.debug(f"{measurament}: ({total_samples}) values, sampling ({sample_size}). (l-r {l-r}) (l {l}) (r {r}).") if sample_size > total_samples: self.logger.warn(f"sample_size ({sample_size}) exceeds available samples ({total_samples}). Using available samples.") sample_size = total_samples random_indices = np.random.choice(total_samples, size=sample_size, replace=False) g_sample = np.zeros((sample_size, 1)) y_sample = np.zeros((sample_size, self.target_len)) x_samples = np.zeros((sample_size, self.data_channels)) for i, index in enumerate(random_indices): x_samples[i] = X_data.iloc[index] y_sample[i] = Y_value g_sample[i] = G_value return x_samples, y_sample, g_sample def stats(self): channel_columns = {} sample_columns = {} for key in self.data: for col in self.data[key]['data'].columns: if col in sample_columns: sample_columns[col].append(key) else: sample_columns[col] = [key] for col in self.data[key]['label']: if col in channel_columns: channel_columns[col].append(key) else: channel_columns[col] = [key] self.data_channels = len(set(sample_columns)) sorted_channel = sorted(channel_columns.items(), key=lambda x: len(x[1]), reverse=True) sorted_samples = sorted(sample_columns.items(), key=lambda x: len(x[1]), reverse=True) self.logger.debug("{:=^60}".format("CHANNELS")) for i, (col, keys) in enumerate(sorted_channel): self.logger.debug(f"{i} | {col}: {len(keys)}") self.logger.debug("{:=^60}".format("SAMPLES")) for i, (col, keys) in enumerate(sorted_samples): self.logger.debug(f"{i} | {col}: {len(keys)}") if not hasattr(self, 'scaled_data'): self.init_minmax() self.logger.debug("{:=^60}".format(f"DATASET SIZE for {self.threshold}")) valid_dataset = [] for measurament in self.scaled_data: r, l, m = self.findIndicesAbovethreshold(self.scaled_data[measurament]['data']) self.logger.debug(f"{measurament}: {l} - {r} = {l-r}") valid_dataset.append(l-r) self.dataset['range'][measurament] = (r,l) self.min_sample = np.min(valid_dataset) self.logger.info("{:=^60}".format(f"DATASET STATS for {self.threshold}")) self.logger.info(f"Min: {np.min(valid_dataset)}") self.logger.info(f"Max: {np.max(valid_dataset)}") self.logger.info(f"Mean: {np.mean(valid_dataset)}") self.logger.info(f"Median: {np.median(valid_dataset)}") # Example usage: # loader = GasSensorDataLoader("enose_dataset", output_format="png", threshold=0.9) # loader.threshold = 0.8 # loader.plotRawdata(save=True) # loader.plotDeltadata(save=True) # loader.plotScaledBoundaries(save=True) # # loader.threshold = 0.90, smooth=None # print(loader.load_dataset_window(128).shape) # loader.threshold = 0.85 # print(loader.load_dataset_window(128).shape) # loader.threshold = 0.80 # print(loader.load_dataset_window(128).shape) # loader.threshold = 0.75 # print(loader.load_dataset_window(128).shape)