enose_2025/LoaderClass.py

435 lines
19 KiB
Python

import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pickle
import logging
from sklearn.preprocessing import MinMaxScaler
#self.logger.basicConfig(level=self.logger.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class GasSensorDataLoader:
def __init__(self, label_file, force_overwrite=False, output_format="png", lower_limit=-0.01, threshold=0.7, target_list=['C2H2', 'CH4', 'C3H6', 'CO', 'C2H6', 'C3H8', 'C2H4', 'H2', 'O2'], source_channels=["MQ 8", "MQ 9", "MQ 135", "TGS 813", "TGS 821", "TGS 2600", "TGS 2602", "TGS 2611-0", "TGS 2612", "TGS 2620"], debug=False):
self.label_file = label_file
self.main_file = f"{self.label_file}/{self.label_file}.xlsx"
self.data_folder = os.path.splitext(label_file)[0]
self.state_file = f"{self.label_file}.pkl"
self.lower_limit = lower_limit
self.data = None
self.debug = debug
self.threshold = threshold
self.dataset = {}
self.dataset['threshold'] = self.threshold
self.dataset['range'] = {}
self.samples = {}
self.target_list = sorted(target_list)
self.target_len = len(self.target_list)
self.source_channels = sorted(source_channels)
self.force_overwrite = force_overwrite
self.output_format = output_format.lower()
self.pics_folder = self.create_pics_folder()
self.logger = logging.getLogger("GasSensorDataLoader")
if self.debug:
self.logger.setLevel(logging.DEBUG)
else:
self.logger.setLevel(logging.INFO)
if self.output_format not in ["png", "pdf"]:
raise ValueError("Invalid output format. Choose either 'png' or 'pdf'.")
if not os.path.isdir(self.data_folder):
raise FileNotFoundError(f"Data folder '{self.data_folder}' does not exist.")
if not os.path.exists(self.main_file):
raise FileNotFoundError(f"Main Excel file '{self.main_file}' not found.")
if not isinstance(threshold, (int, float)) or not 0 <= threshold <= 1:
raise ValueError("threshold must be a number between 0 and 1")
if os.path.exists(self.state_file):
if False:#not self.force_overwrite and not self._compare_state_with_main():
raise ValueError("State file differs from the main Excel file. Use 'force_overwrite=True' to overwrite.")
else:
self.load_state()
else:
self.logger.info("State file not found. Loading dataset.")
self.load_dataset()
def _compare_state_with_main(self):
try:
existing_labels = pd.read_excel(self.main_file)
with open(self.state_file, 'rb') as f:
saved_data = pickle.load(f)
saved_labels = pd.DataFrame([saved_data[key]['label'] for key in saved_data])
return existing_labels.equals(saved_labels)
except Exception as e:
self.logger.error(f"Error comparing state file: {e}")
return False
def load_dataset(self):
self.logger.info("Loading dataset from Excel files.")
labels = pd.read_excel(self.main_file)
data_dict = {}
samples, measuraments = 0, 0
for _, row in labels.iterrows():
file_path = os.path.join(self.data_folder, "Raw_data", '{}{}'.format(row['Raw_data'], '.xlsx'))
if os.path.exists(file_path):
self.logger.info(f"Loading data from {file_path}.")
df = pd.read_excel(file_path, header=0, usecols=self.source_channels) # Ensure first row is used as column names
data_dict[row['Raw_data']] = {
'label': row.to_dict(),
'data': df,
'sampleId': samples
}
samples += 1
measuraments += df.shape[0]
else:
raise FileNotFoundError(f"measurament file not found: {file_path}")
self.data = data_dict
self.save_state()
self.logger.info("Dataset loaded. {} samples in {} measuraments".format(samples, measuraments))
def save_state(self):
with open(self.state_file, 'wb') as f:
pickle.dump(self.data, f)
self.logger.info("State saved.")
def load_state(self):
with open(self.state_file, 'rb') as f:
self.data = pickle.load(f)
self.logger.info("State loaded.")
def create_pics_folder(self):
pics_folder = os.path.join(self.data_folder, "pics")
if not os.path.exists(pics_folder):
os.makedirs(pics_folder)
self.logger.info(f"Created folder: {pics_folder}")
return pics_folder
def init_delta(self):
self.logger.info("Initializing dataset delta values.")
data_copy = {key: {'label': value['label'], 'sampleId': value['sampleId'], 'data': value['data'].copy()} for key, value in self.data.items()}
lower_limit = pd.concat([data_copy[key]['data'] for key in data_copy], axis=0).max() * self.lower_limit
self.logger.debug("Lower limit {}.".format(lower_limit))
for key in data_copy:
data_copy[key]['data'] = data_copy[key]['data'] - data_copy[key]['data'].iloc[0]
for column in data_copy[key]['data'].columns:
data_copy[key]['data'][column] = data_copy[key]['data'][column].where(data_copy[key]['data'][column] >= lower_limit[column], other=lower_limit[column])
self.delta_data = data_copy
def init_minmax(self):
if not hasattr(self, 'delta_data'):
self.init_delta()
self.logger.info("Initializing dataset using MinMaxScaler.")
concatenated_data = pd.concat([self.delta_data[key]['data'] for key in self.delta_data], axis=0)
scaler = MinMaxScaler()
scaler.fit(concatenated_data)
self.scaled_data = {key: {'label': value['label'], 'sampleId': value['sampleId'], 'data': pd.DataFrame(scaler.transform(value['data']), columns=value['data'].columns)} for key, value in self.delta_data.items()}
def plot_channel(self, data_instance, channel_name, save=False, tag="", title=""):
self.logger.debug(f"{title}All measurament Sensor readings for: {channel_name}.")
plt.figure(figsize=(12, 6))
for key in data_instance:
if channel_name in data_instance[key]['data'].columns:
plt.plot(data_instance[key]['data'][channel_name])
plt.xlabel("Time")
plt.ylabel("Sensor Reading")
plt.title(f"{title} Sensor Channel: {channel_name}")
if save:
filename = os.path.join(self.pics_folder, f"{channel_name}_{tag}.{self.output_format}")
plt.savefig(filename, format=self.output_format)
self.logger.info(f"Saved plot as {filename}")
else:
plt.show()
plt.close()
def plot_measurament(self, data_instance, measurament_name, save=False, tag="", title="", limits=[]):
self.logger.debug(f"{title}All sensor readings for measurament: {measurament_name}.")
if measurament_name in data_instance:
data_instance[measurament_name]['data'].plot(figsize=(12, 6), title=f"{title} measurament: {measurament_name}")
plt.xlabel("Time")
plt.ylabel("Sensor Readings")
plt.legend(bbox_to_anchor=(0.95, 0.5), loc="center left")
for xvalue, txtcolor in limits:
plt.vlines(x=xvalue, ymin=0, ymax=1, colors=txtcolor)
if save:
filename = os.path.join(self.pics_folder, f"{measurament_name}_{tag}.{self.output_format}")
plt.savefig(filename, format=self.output_format)
self.logger.info(f"Saved plot as {filename}")
else:
plt.show()
plt.close()
def plotRawdata(self, save=True):
self.logger.debug("Plotting raw data for all measuraments and channels.")
for measurament in self.data:
self.plot_measurament(self.data, measurament, save=save, tag="raw", title="[Original] ")
for channel in self.data[next(iter(self.data))]['data'].columns:
self.plot_channel(self.data, channel, save=save, tag="raw", title="[Original] ")
def plotDeltadata(self, save=True):
if not hasattr(self, 'delta_data'):
self.init_delta()
self.logger.debug("Plotting raw data for all measuraments and channels.")
for measurament in self.delta_data:
self.plot_measurament(self.delta_data, measurament, save=save, tag="delta", title="[$\Delta V$] ")
for channel in self.delta_data[next(iter(self.delta_data))]['data'].columns:
self.plot_channel(self.delta_data, channel, save=save, tag="delta", title="[$\delta V$] ")
def plotScaleddata(self, save=True):
if not hasattr(self, 'scaled_data'):
self.init_minmax()
self.logger.debug("Plotting raw data for all measuraments and channels.")
for measurament in self.scaled_data:
self.plot_measurament(self.scaled_data, measurament, save=save, tag="scaled", title="[Scaled] ")
for channel in self.scaled_data[next(iter(self.scaled_data))]['data'].columns:
self.plot_channel(self.scaled_data, channel, save=save, tag="scaled", title="[Scaled] ")
def plotScaledBoundaries(self, save=True):
if not hasattr(self, 'scaled_data'):
self.init_minmax()
self.logger.debug("Plotting raw data for all measuraments and channels.")
for measurament in self.scaled_data:
r, l, m = self.findIndicesAbovethreshold(self.scaled_data[measurament]['data'])
self.plot_measurament(self.scaled_data, measurament, save=save, tag="train", title=f"[Interval {self.threshold} max] ", limits=[(r, 'blue'), (l, 'blue'), (m, 'red')])
def findIndicesAbovethreshold(self, df):
if not isinstance(df, pd.DataFrame):
raise TypeError("Input must be a pandas DataFrame.")
row_sums = df.sum(axis=1)
threshold = row_sums.max() * self.threshold
above_threshold_indices = row_sums[row_sums > threshold].index
if not above_threshold_indices.empty:
first_index = above_threshold_indices[0]
last_index = above_threshold_indices[-1]
return first_index, last_index, row_sums.idxmax()
else:
return None, None, None
def load_dataset_window(self, ws):
self.logger.info(f"Requested sample with threshold {self.threshold} and window size {ws}")
if not hasattr(self, 'scaled_data'):
self.init_minmax()
if not hasattr(self, 'dataset'):
self.logger.debug(f"Empty dataset")
self.dataset = {}
if 'threshold' in self.dataset:
self.logger.debug(f"threshold in dataset")
if self.dataset['threshold'] != self.threshold:
self.logger.info(f"wrong threshold {self.dataset['threshold']} {self.threshold}")
self.dataset = {}
self.dataset['threshold'] = self.threshold
self.dataset['range'] = {}
self.stats()
else:
self.logger.debug(f"correct threshold {self.dataset['threshold']} {self.threshold}")
else:
self.dataset['threshold'] = self.threshold
self.dataset['range'] = {}
self.stats()
self.logger.debug(f"No threshold {self.dataset['threshold']} {self.threshold}")
if ws in self.dataset:
return self.dataset[ws]
g_output = np.empty((0, 1))
y_output = np.empty((0, self.target_len))
x_output = np.empty((0, ws, len(self.source_channels)))
sample_size = self.min_sample - ws + 1
self.logger.info(f"Computing sample with threshold {self.threshold} and window size {ws}")
for measurament, (r, l) in self.dataset['range'].items():
self.logger.debug('{} | {} | {}-{} | {}'.format(measurament, self.scaled_data[measurament]['data'].shape, r, l, self.scaled_data[measurament]['data'].iloc[r:l].shape))
x_sample, y_sample, g_sample = self.create_conv1d_dataset(measurament, r, l, ws)
g_output = np.concatenate((g_output, g_sample))
x_output = np.concatenate((x_output, x_sample))
y_output = np.concatenate((y_output, y_sample))
self.dataset[ws] = (x_output, y_output, g_output)
return self.dataset[ws]
def load_dataset_xboost(self):
self.logger.info(f"Requested sample with threshold {self.threshold} for xboost")
if not hasattr(self, 'scaled_data'):
self.init_minmax()
if not hasattr(self, 'dataset'):
self.logger.debug(f"Empty dataset")
self.dataset = {}
if 'threshold' in self.dataset:
self.logger.debug(f"threshold in dataset")
if self.dataset['threshold'] != self.threshold:
self.logger.info(f"wrong threshold {self.dataset['threshold']} {self.threshold}")
self.dataset = {}
self.dataset['threshold'] = self.threshold
self.dataset['range'] = {}
self.stats()
else:
self.logger.debug(f"correct threshold {self.dataset['threshold']} {self.threshold}")
else:
self.dataset['threshold'] = self.threshold
self.dataset['range'] = {}
self.stats()
self.logger.debug(f"No threshold {self.dataset['threshold']} {self.threshold}")
if 'xboost' in self.dataset:
return self.dataset['xboost']
g_output = np.empty((0, 1))
y_output = np.empty((0, self.target_len))
x_output = np.empty((0, self.data_channels))
self.logger.info(f"Computing sample with threshold {self.threshold} for xboost")
for measurament, (r, l) in self.dataset['range'].items():
self.logger.debug('{} | {} | {}-{} | {}'.format(measurament, self.scaled_data[measurament]['data'].shape, r, l, self.scaled_data[measurament]['data'].iloc[r:l].shape))
x_sample, y_sample, g_sample = self.create_xboost_dataset(measurament, r, l)
g_output = np.concatenate((g_output, g_sample))
x_output = np.concatenate((x_output, x_sample))
y_output = np.concatenate((y_output, y_sample))
self.dataset['xboost'] = (x_output, y_output, g_output)
return self.dataset['xboost']
def create_xboost_dataset(self, measurament, r, l):
X_data = self.scaled_data[measurament]['data'].iloc[r:l]
Y_value = np.array([[self.scaled_data[measurament]['label'][key] for key in self.target_list]])
G_value = self.scaled_data[measurament]['sampleId']
total_samples = X_data.shape[0]
sample_size = self.min_sample
self.logger.debug(f"{measurament}: ({total_samples}) values, sampling ({sample_size}). (l-r {l-r}) (l {l}) (r {r}).")
if sample_size > total_samples:
self.logger.warn(f"sample_size ({sample_size}) exceeds available samples ({total_samples}). Using available samples.")
sample_size = total_samples
random_indices = np.random.choice(total_samples, size=sample_size, replace=False)
g_sample = np.zeros((sample_size, 1))
y_sample = np.zeros((sample_size, self.target_len))
x_samples = np.zeros((sample_size, self.data_channels))
for i, index in enumerate(random_indices):
x_samples[i] = X_data.iloc[index]
y_sample[i] = Y_value
g_sample[i] = G_value
return x_samples, y_sample, g_sample
def create_conv1d_dataset(self, measurament, r, l, window):
X_data = self.scaled_data[measurament]['data'].iloc[r:l]
Y_value = np.array([[self.scaled_data[measurament]['label'][key] for key in self.target_list]])
G_value = self.scaled_data[measurament]['sampleId']
total_samples = X_data.shape[0] - window + 1
sample_size = self.min_sample - window + 1
if sample_size > total_samples:
self.logger.warn(f"sample_size ({sample_size}) exceeds available samples ({total_samples}). Using available samples.")
sample_size = total_samples
random_indices = np.random.choice(total_samples, size=sample_size, replace=False)
g_sample = np.zeros((sample_size, 1))
y_sample = np.zeros((sample_size, self.target_len))
x_samples = np.zeros((sample_size, window, self.data_channels))
for i, index in enumerate(random_indices):
x_samples[i] = X_data.iloc[index:index + window].values
y_sample[i] = Y_value
g_sample[i] = G_value
return x_samples, y_sample, g_sample
def stats(self):
channel_columns = {}
sample_columns = {}
for key in self.data:
for col in self.data[key]['data'].columns:
if col in sample_columns:
sample_columns[col].append(key)
else:
sample_columns[col] = [key]
for col in self.data[key]['label']:
if col in channel_columns:
channel_columns[col].append(key)
else:
channel_columns[col] = [key]
self.data_channels = len(set(sample_columns))
sorted_channel = sorted(channel_columns.items(), key=lambda x: len(x[1]), reverse=True)
sorted_samples = sorted(sample_columns.items(), key=lambda x: len(x[1]), reverse=True)
self.logger.debug("{:=^60}".format("CHANNELS"))
for i, (col, keys) in enumerate(sorted_channel):
self.logger.debug(f"{i} | {col}: {len(keys)}")
self.logger.debug("{:=^60}".format("SAMPLES"))
for i, (col, keys) in enumerate(sorted_samples):
self.logger.debug(f"{i} | {col}: {len(keys)}")
if not hasattr(self, 'scaled_data'):
self.init_minmax()
self.logger.debug("{:=^60}".format(f"DATASET SIZE for {self.threshold}"))
valid_dataset = []
for measurament in self.scaled_data:
r, l, m = self.findIndicesAbovethreshold(self.scaled_data[measurament]['data'])
self.logger.debug(f"{measurament}: {l} - {r} = {l-r}")
valid_dataset.append(l-r)
self.dataset['range'][measurament] = (r,l)
self.min_sample = np.min(valid_dataset)
self.logger.info("{:=^60}".format(f"DATASET STATS for {self.threshold}"))
self.logger.info(f"Min: {np.min(valid_dataset)}")
self.logger.info(f"Max: {np.max(valid_dataset)}")
self.logger.info(f"Mean: {np.mean(valid_dataset)}")
self.logger.info(f"Median: {np.median(valid_dataset)}")
# Example usage:
# loader = GasSensorDataLoader("enose_dataset", output_format="png", threshold=0.9)
# loader.threshold = 0.8
# loader.plotRawdata(save=True)
# loader.plotDeltadata(save=True)
# loader.plotScaledBoundaries(save=True)
# # loader.threshold = 0.90
# print(loader.load_dataset_window(128).shape)
# loader.threshold = 0.85
# print(loader.load_dataset_window(128).shape)
# loader.threshold = 0.80
# print(loader.load_dataset_window(128).shape)
# loader.threshold = 0.75
# print(loader.load_dataset_window(128).shape)