CI2 : Régularisation et optimisation
Dans ce TP, vous allez explorer les techniques de régularisation, les optimizers avancés et les métriques de classification. Vous apprendrez également à créer votre propre dataset en PyTorch et à utiliser les fonctionnalités avancées des datasets. Enfin, vous utiliserez Tensorboard pour comparer les résultats et analyser vos approches.
Objectifs
- Comprendre et implémenter les régularisations L1 et L2.
- Observer les effets du weight decay sur l'entraînement.
- Analyser et interpréter les métriques de classification.
- Tester et comparer différents optimizers : ADAM, Momentum, AdaGrad, RMSProp.
- Créer et manipuler votre propre dataset en PyTorch.
- Utiliser Tensorboard pour suivre et comparer vos expériences.
Création d'un dataset personnalisé
Nous allons commencer par voir comment créer notre propre dataset en PyTorch. Pour cela, nous allons utiliser l'interface fournie et implémenter les méthodes manquantes.
Pour l'instant, on demande que le dataset retourne (à travers __getitem__) un dictionnaire composé des features au format numpy, et des labels au format numpy. Dans le __init__, on effectuera les transformations suivantes à l'aide de sklearn :
- Supprimer les duplicats
- Supprimer la colonne id
- Transformer les colonnes catégorielles avec un one-hot encoding ("gender", "cholesterol", "gluc")
- Normalisation des features
class CardioDataset(Dataset):
    """Cardiovascular-disease dataset loaded from ``cardio_train.csv``.

    Preprocessing performed once in ``__init__``:
      - drop duplicate rows and the ``id`` column,
      - one-hot encode the categorical columns (gender, cholesterol, gluc),
      - standardize every feature column with ``StandardScaler``.

    ``__getitem__`` returns ``{"features": float32 ndarray, "labels": int ndarray}``.
    """

    def __init__(self):
        df = pd.read_csv("cardio_train.csv", sep=";")
        df = df.drop_duplicates().drop("id", axis=1)

        # Expand each categorical column into one-hot indicator columns
        # named "<feature>_<category>".
        categorical_features = ["gender", "cholesterol", "gluc"]
        for feature in categorical_features:
            one_hot = pd.get_dummies(df[feature], dtype=np.float32)
            one_hot.columns = [f"{feature}_{c}" for c in one_hot.columns]
            df = df.drop(feature, axis=1)
            df = pd.concat([df, one_hot], axis=1)

        # Everything except the target column is a feature.
        features_names = [c for c in df.columns if c != "cardio"]
        scaler = StandardScaler()
        self.features = scaler.fit_transform(df[features_names]).astype(np.float32)
        # Labels shaped (N, 1) so they align with the model's sigmoid output.
        self.labels = df["cardio"].to_numpy().astype(int).reshape(-1, 1)

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        # Some samplers hand us a tensor index; convert to plain Python ints.
        if torch.is_tensor(idx):
            idx = idx.tolist()
        return {"features": self.features[idx], "labels": self.labels[idx]}
if __name__ == '__main__':
    # Print the first five (features, label) samples to sanity-check the dataset.
    dataset = CardioDataset()
    for index in range(5):
        sample = dataset[index]
        print(sample["features"], sample["labels"])
if __name__ == '__main__':
    dataset = CardioDataset()
    data_loader = DataLoader(dataset, batch_size=32, shuffle=True)
    # Inspect a single shuffled batch and its tensor shapes.
    first_batch = next(iter(data_loader))
    print(first_batch["features"], first_batch["labels"])
    print(first_batch["features"].shape, first_batch["labels"].shape)
# 80 / 10 / 10 split into train / validation / test subsets.
train, val, test = random_split(dataset, [0.8, 0.1, 0.1])
print(len(train), len(val), len(test))
# One shuffled loader per split, all sharing the same batch size.
train_loader, val_loader, test_loader = (
    DataLoader(split, batch_size=32, shuffle=True) for split in (train, val, test)
)
Régularisation L1 et L2
import torch.nn as nn


class MyModel(nn.Module):
    """Fully-connected binary classifier: hidden ReLU layers + sigmoid output.

    Args:
        input_size: number of input features.
        hidden_size: width of every hidden layer.
        output_size: number of outputs (1 for binary classification).
        n_layers: number of hidden layers (>= 1).
    """

    def __init__(self, input_size, hidden_size, output_size, n_layers):
        # Python 3 zero-argument form; the explicit (MyModel, self) form is legacy.
        super().__init__()
        layers = [nn.Linear(input_size, hidden_size), nn.ReLU()]
        # Each additional hidden layer keeps the same width.
        for _ in range(n_layers - 1):
            layers.append(nn.Linear(hidden_size, hidden_size))
            layers.append(nn.ReLU())
        # Sigmoid squashes the logit into [0, 1] so BCELoss applies directly.
        layers.append(nn.Linear(hidden_size, output_size))
        layers.append(nn.Sigmoid())
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return class probabilities of shape (batch, output_size)."""
        return self.model(x)


model = MyModel(dataset.features.shape[1], 10000, 1, 1)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

dataset = CardioDataset()
train, val, test = random_split(dataset, [0.8, 0.1, 0.1])
print(len(train), len(val), len(test))

model = MyModel(dataset.features.shape[1], 10000, 1, 1).to(device)
print(model)

train_loader = DataLoader(train, batch_size=32, shuffle=True)
val_loader = DataLoader(val, batch_size=32, shuffle=True)
test_loader = DataLoader(test, batch_size=32, shuffle=True)
print(len(train_loader), len(val_loader), len(test_loader))

optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
criterion = nn.BCELoss()

# Regularization strengths: L1 disabled, small hand-rolled L2 penalty.
l1_lambda = 0
l2_lambda = 1e-4

for epoch in range(200):
    # ---- training ----
    model.train()
    running_loss = 0.0
    total_l2_loss = 0.0
    correct = 0
    total = 0
    for batch in train_loader:
        inputs = batch["features"].to(device)
        targets = batch["labels"].to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets.float())

        # Penalties summed over every parameter tensor of the model.
        l1_loss = sum(p.abs().sum() for p in model.parameters())
        l2_loss = sum(p.pow(2).sum() for p in model.parameters())
        total_loss = loss + l1_lambda * l1_loss + l2_lambda * l2_loss
        total_loss.backward()
        optimizer.step()

        running_loss += loss.item()
        # BUGFIX: accumulate a Python float, not the raw tensor — keeping the
        # tensor retains every batch's autograd graph and leaks memory.
        total_l2_loss += l2_loss.item()

        predicted = (outputs > 0.5).float()
        total += targets.size(0)
        correct += (predicted == targets).sum().item()

    metric = correct / total
    print(f"Accuracy Train : {metric:.2f}")
    print(f"Époque {epoch + 1}, Train Perte : {running_loss / len(train_loader)}")
    print(f"Époque {epoch + 1}, L2 loss : {total_l2_loss / len(train_loader)}")

    # ---- validation (no gradients) ----
    model.eval()
    running_loss_val = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch in val_loader:
            inputs = batch["features"].to(device)
            targets = batch["labels"].to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets.float())
            running_loss_val += loss.item()
            predicted = (outputs > 0.5).float()
            total += targets.size(0)
            correct += (predicted == targets).sum().item()
    metric = correct / total
    print(f"Accuracy Val : {metric:.2f}")
    print(f"Époque {epoch + 1}, Val Perte : {running_loss_val / len(val_loader)}")
Comparaison des optimizers
Nous voulons maintenant comparer les performances de différents optimizers sur notre dataset.
# Candidate optimizers, all using the same learning rate for a fair comparison.
optimizers = {
    "SGD": torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9),
    "ADAM": torch.optim.Adam(model.parameters(), lr=0.01),
    "RMSProp": torch.optim.RMSprop(model.parameters(), lr=0.01),
    "AdaGrad": torch.optim.Adagrad(model.parameters(), lr=0.01),
}

for name, optimizer in optimizers.items():
    print(f"Training with {name}")
    # Re-train the model with this optimizer and plot the results.
Analyse des métriques
Finalement, nous voulons calculer et analyser les différentes métriques de classification.
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score

# Toy example: five ground-truth labels and the model's predicted probabilities.
true_labels = [0, 1, 0, 1, 1]
predicted_probs = [0.2, 0.8, 0.1, 0.6, 0.9]
# Threshold the probabilities at 0.5 to obtain hard class predictions.
predicted_labels = [1 if p > 0.5 else 0 for p in predicted_probs]

precision = precision_score(true_labels, predicted_labels)
recall = recall_score(true_labels, predicted_labels)
f1 = f1_score(true_labels, predicted_labels)
# AUC is computed from the raw probabilities, not the thresholded labels.
auc = roc_auc_score(true_labels, predicted_probs)
print(f"Precision: {precision}, Recall: {recall}, F1: {f1}, AUC: {auc}")
import torch
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score
from sklearn.preprocessing import StandardScaler
from torch.utils.data import Dataset, DataLoader, random_split
import pandas as pd
import torch.nn as nn
import numpy as np
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm


class CardioDataset(Dataset):
    """Cardio dataset: preprocessed features / labels from ``cardio_train.csv``.

    ``__getitem__`` returns ``{"features": float32 ndarray, "labels": int ndarray}``.
    """

    def __init__(self):
        df = pd.read_csv("cardio_train.csv", sep=";")
        df = df.drop_duplicates().drop("id", axis=1)
        # One-hot encode the categorical columns as "<feature>_<category>".
        for feature in ["gender", "cholesterol", "gluc"]:
            one_hot = pd.get_dummies(df[feature], dtype=np.float32)
            one_hot.columns = [f"{feature}_{c}" for c in one_hot.columns]
            df = df.drop(feature, axis=1)
            df = pd.concat([df, one_hot], axis=1)
        features_names = [c for c in df.columns if c != "cardio"]
        # Standardize every feature column (zero mean, unit variance).
        scaler = StandardScaler()
        self.features = scaler.fit_transform(df[features_names]).astype(np.float32)
        # Labels shaped (N, 1) so they align with the model's sigmoid output.
        self.labels = df["cardio"].to_numpy().astype(int).reshape(-1, 1)

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        # Some samplers hand us a tensor index; convert to plain Python ints.
        if torch.is_tensor(idx):
            idx = idx.tolist()
        return {"features": self.features[idx], "labels": self.labels[idx]}


class MyModel(nn.Module):
    """MLP binary classifier: ``n_layers`` hidden ReLU layers + sigmoid output."""

    def __init__(self, input_size, hidden_size, output_size, n_layers):
        super().__init__()
        layers = [nn.Linear(input_size, hidden_size), nn.ReLU()]
        for _ in range(n_layers - 1):
            layers.append(nn.Linear(hidden_size, hidden_size))
            layers.append(nn.ReLU())
        # Sigmoid output so BCELoss can be applied directly.
        layers.append(nn.Linear(hidden_size, output_size))
        layers.append(nn.Sigmoid())
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return class probabilities of shape (batch, output_size)."""
        return self.model(x)


def _evaluate(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without gradients.

    Returns ``(avg_loss, accuracy, precision, recall, f1, auc)``, every metric
    computed over the WHOLE split. (The previous version summed precision over
    batches without averaging and kept only the LAST batch's recall/f1/auc.)
    """
    model.eval()
    running_loss = 0.0
    all_targets, all_probs = [], []
    with torch.no_grad():
        for batch in loader:
            inputs = batch["features"].to(device)
            targets = batch["labels"].to(device)
            outputs = model(inputs)
            running_loss += criterion(outputs, targets.float()).item()
            all_targets.append(targets.cpu())
            all_probs.append(outputs.cpu())
    targets = torch.cat(all_targets).numpy()
    probs = torch.cat(all_probs).numpy()
    predicted = (probs > 0.5).astype(int)
    accuracy = float((predicted == targets).mean())
    precision = precision_score(targets, predicted)
    recall = recall_score(targets, predicted)
    f1 = f1_score(targets, predicted)
    # AUC must be computed from the probabilities, not the hard labels.
    auc = roc_auc_score(targets, probs)
    return running_loss / len(loader), accuracy, precision, recall, f1, auc


if __name__ == '__main__':
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Hyper-parameters, logged to Tensorboard through add_hparams below.
    HIDDEN_SIZE = 1000
    N_LAYERS = 1
    BATCH_SIZE = 64
    LR = 0.001

    dataset = CardioDataset()
    train, val, test = random_split(dataset, [0.8, 0.1, 0.1])
    print(len(train), len(val), len(test))

    model = MyModel(dataset.features.shape[1], HIDDEN_SIZE, 1, N_LAYERS).to(device)
    print(model)

    train_loader = DataLoader(train, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val, batch_size=BATCH_SIZE, shuffle=True)
    test_loader = DataLoader(test, batch_size=BATCH_SIZE, shuffle=True)
    print(len(train_loader), len(val_loader), len(test_loader))

    optimizer = torch.optim.SGD(model.parameters(), lr=LR)
    criterion = nn.BCELoss()
    # L1 disabled, small hand-rolled L2 penalty.
    l1_lambda = 0
    l2_lambda = 1e-4

    writer = SummaryWriter('runs/cardio')
    for epoch in tqdm(list(range(200))):
        # ---- training ----
        model.train()
        running_loss = 0.0
        total_l2_loss = 0.0
        correct = 0
        total = 0
        for batch in train_loader:
            inputs = batch["features"].to(device)
            targets = batch["labels"].to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets.float())
            l1_loss = sum(p.abs().sum() for p in model.parameters())
            l2_loss = sum(p.pow(2).sum() for p in model.parameters())
            (loss + l1_lambda * l1_loss + l2_lambda * l2_loss).backward()
            optimizer.step()

            running_loss += loss.item()
            # BUGFIX: accumulate a float, not the graph-attached tensor —
            # the tensor version retains every batch's autograd graph.
            total_l2_loss += l2_loss.item()
            predicted = (outputs > 0.5).float()
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        writer.add_scalar('train/accuracy', correct / total, epoch)
        writer.add_scalar("train/loss", running_loss / len(train_loader), epoch)
        writer.add_scalar("train/l2_loss", total_l2_loss / len(train_loader), epoch)

        # ---- validation ----
        val_loss, val_acc, precision, recall, f1, auc = _evaluate(
            model, val_loader, criterion, device)
        writer.add_scalar("val/accuracy", val_acc, epoch)
        writer.add_scalar("val/loss", val_loss, epoch)
        writer.add_scalar("val/precision", precision, epoch)
        writer.add_scalar("val/recall", recall, epoch)
        writer.add_scalar("val/f1", f1, epoch)
        writer.add_scalar("val/auc", auc, epoch)

    # ---- final held-out evaluation, logged with the hyper-parameters ----
    test_loss, test_acc, precision, recall, f1, auc = _evaluate(
        model, test_loader, criterion, device)
    hparams_dict = {"lr": LR, "hidden_size": HIDDEN_SIZE, "n_layers": N_LAYERS,
                    "batch_size": BATCH_SIZE, "L1": l1_lambda, "L2": l2_lambda}
    value_dict = {"accuracy": test_acc, "loss": test_loss, "precision": precision,
                  "recall": recall, "f1": f1, "auc": auc}
    writer.add_hparams(hparams_dict, value_dict)
    writer.close()