CSC 8607 – Introduction au deep learning

Portail informatique

CI2 : Régularisation et optimisation

Dans ce TP, vous allez explorer les techniques de régularisation, les optimizers avancés et les métriques de classification. Vous apprendrez également à créer votre propre dataset en PyTorch et à utiliser les fonctionnalités avancées des datasets. Enfin, vous utiliserez Tensorboard pour comparer les résultats et analyser vos approches.

Objectifs

  • Comprendre et implémenter les régularisations L1 et L2.
  • Observer les effets du weight decay sur l'entraînement.
  • Analyser et interpréter les métriques de classification.
  • Tester et comparer différents optimizers : ADAM, Momentum, AdaGrad, RMSProp.
  • Créer et manipuler votre propre dataset en PyTorch.
  • Utiliser Tensorboard pour suivre et comparer vos expériences.

Création d'un dataset personnalisé

Nous allons commencer à voir comment créer notre propre dataset en PyTorch. Pour cela, nous allons utiliser l'interface fournie et implémenter les méthodes manquantes.

Implémentez une classe PyTorch pour charger un dataset simple. Dans cette exercice, nous vous demandons d'utiliser le Cardiovascular Disease dataset disponible sur Kaggle. Téléchargez-le et mettez le dans le répertoire de développement. Ensuire, créez le dataset en vous assurant que votre classe hérite de torch.utils.data.Dataset (suivez les instructions sur la documentation de PyTorch).
Pour l'instant, on demande que le dataset retourne (à travers __get_item__) un dictionnaire composé des features au format numpy, et des labels au format numpy. Dans le __init__, on effectura les transformations suivantes à l'aide de sklearn :
  • Supprimer les duplicats
  • Supprimer la colonne id
  • Transformer les colonnes catégorielles avec un one-hot encoding ("gender", "cholesterol", "gluc")
  • Normalisation des features
class CardioDataset(Dataset): def __init__(self): df = pd.read_csv("cardio_train.csv", sep=";") df = df.drop_duplicates().drop("id", axis=1) categorical_features = ["gender", "cholesterol", "gluc"] for feature in categorical_features: one_hot = pd.get_dummies(df[feature], dtype=np.float32) one_hot.columns = [feature + "_" + str(c) for c in one_hot.columns] df.drop(feature, axis=1, inplace=True) df = pd.concat([df, one_hot], axis=1) print(len(df.columns)) print(list(df.columns)) features_names = list(df.columns) features_names.remove("cardio") self.features = df[features_names] scaler = StandardScaler() self.features = scaler.fit_transform(self.features).astype(np.float32) self.labels = df["cardio"].to_numpy().astype(int).reshape(-1, 1) def __len__(self): return len(self.features) def __getitem__(self, idx): if torch.is_tensor(idx): idx = idx.tolist() local_features = self.features[idx] local_labels = self.labels[idx] res = {"features": local_features, "labels": local_labels} return res
Quand on manipule de larges jeux de données ou que les accès aléatoires sont coûteux, la classe Dataset n'est plus adaptée car elle demande que l'on charge tout en mémoire. On préfèrera alors la classe IterableDataset.

Écrivez une fonction main permettant de tester que vous pouvez bien itérer sur le dataset.
if __name__ == '__main__': for i, fl in enumerate(CardioDataset()): if i == 5: break print(fl["features"], fl["labels"])

Modifiez la fonction main et chargez ce dataset avec un DataLoader en utilisant torch.utils.data.DataLoader. Expérimentez avec différentes tailles de batch. Quel paramètre de la classe DataLoader permet de mélanger les données ? Quel est le type des données en sortie du data loader ?
if __name__ == '__main__': dataset = CardioDataset() data_loader = DataLoader(dataset, batch_size=32, shuffle=True) for i, fl in enumerate(data_loader): print(fl["features"], fl["labels"]) print(fl["features"].shape, fl["labels"].shape) break

Dans la fonction main, séparer le dataset en trois sous-parties pour le train/val/test. On pourra utiliser la fonction random_split de PyTorch. Ajoutez également un data loader pour chaque split.
train, val, test = random_split(dataset, [0.8, 0.1, 0.1]) print(len(train), len(val), len(test)) train_loader = DataLoader(train, batch_size=32, shuffle=True) val_loader = DataLoader(val, batch_size=32, shuffle=True) test_loader = DataLoader(test, batch_size=32, shuffle=True)

Régularisation L1 et L2

Créez un perceptron multi-couche avec PyTorch pour résoudre un problème de classification binaire. Notre MLP prendra en entrée la taille de l'entrée, la taille de la sortie, la taille des couches cachées, et le nombre de couches cachées. On pourra utiliser nn.Sequential pour faciliter la création d'un nombre variable de couches.
N'oubliez pas la fonction d'activation
import torch.nn as nn class MyModel(nn.Module): def __init__(self, input_size, hidden_size, output_size, n_layers): super(MyModel, self).__init__() layers = [nn.Linear(input_size, hidden_size)] layers.append(nn.ReLU()) for _ in range(n_layers - 1): layers.append(nn.Linear(hidden_size, hidden_size)) layers.append(nn.ReLU()) layers.append(nn.Linear(hidden_size, output_size)) layers.append(nn.Sigmoid()) self.model = nn.Sequential(*layers) def forward(self, x): return self.model(x) model = MyModel(dataset.features.shape[1], 10000, 1,1)

Modifiez la fonction main pour entrainez votre modèle sur le dataset d'entrainement comme cela a été fait au dernier TP.

Implémentez la régularisation L1 et L2 dans la fonction de perte. Expérimentez avec différents coefficients pour la régularisation. En particulier, qu'observez-vous quand la régularisation est trop forte ?
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") dataset = CardioDataset() train, val, test = random_split(dataset, [0.8, 0.1, 0.1]) print(len(train), len(val), len(test)) model = MyModel(dataset.features.shape[1], 10000, 1,1).to(device) print(model) train_loader = DataLoader(train, batch_size=32, shuffle=True) val_loader = DataLoader(val, batch_size=32, shuffle=True) test_loader = DataLoader(test, batch_size=32, shuffle=True) print(len(train_loader), len(val_loader), len(test_loader)) optimizer = torch.optim.SGD(model.parameters(), lr=0.001) criterion = nn.BCELoss() l1_lambda = 0 l2_lambda = 1e-4 for epoch in range(200): running_loss = 0.0 total_l2_loss = 0.0 model.train() correct = 0 total = 0 for batch in train_loader: inputs = batch["features"].to(device) targets = batch["labels"].to(device) optimizer.zero_grad() outputs = model(inputs) loss = criterion(outputs, targets.float()) l1_loss = sum(p.abs().sum() for p in model.parameters()) l2_loss = sum(p.pow(2).sum() for p in model.parameters()) total_loss = loss + l1_lambda * l1_loss + l2_lambda * l2_loss total_loss.backward() optimizer.step() running_loss += loss.item() total_l2_loss += l2_loss predicted = (outputs > 0.5).float() total += targets.size(0) correct += (predicted == targets).sum().item() metric = correct / total print(f"Accuracy Train : {metric:.2f}") print(f"Époque {epoch + 1}, Train Perte : {running_loss / len(train_loader)}") print(f"Époque {epoch + 1}, L2 loss : {total_l2_loss / len(train_loader)}") model.eval() running_loss_val = 0.0 with torch.no_grad(): correct = 0 total = 0 for batch in val_loader: inputs = batch["features"].to(device) targets = batch["labels"].to(device) outputs = model(inputs) loss = criterion(outputs, targets.float()) running_loss_val += loss.item() predicted = (outputs > 0.5).float() total += targets.size(0) correct += (predicted == targets).sum().item() metric = correct / total print(f"Accuracy Val : {metric:.2f}") print(f"Époque {epoch + 1}, Val Perte : {running_loss_val / len(val_loader)}")

Comparaison des optimizers

Nous voulons maintenant comparer les performances de différents optimizers sur notre dataset.

Testez les optimizers suivants : SGD (avec momentum), ADAM, RMSProp, et AdaGrad. Comparez les courbes de perte et la précision sur le dataset de validation.
optimizers = { "SGD": torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9), "ADAM": torch.optim.Adam(model.parameters(), lr=0.01), "RMSProp": torch.optim.RMSprop(model.parameters(), lr=0.01), "AdaGrad": torch.optim.Adagrad(model.parameters(), lr=0.01) } for name, optimizer in optimizers.items(): print(f"Training with {name}") # Re-entraînez le modèle avec cet optimizer et tracez les résultats.

Analyse des métriques

Finalement, nous voulons calculer et analyser les différentes métriques de classification.

Implémentez les métriques suivantes : précision, rappel, F1-score et AUC. Utilisez sklearn.metrics pour les calculs.
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score # Exemple d'utilisation avec des données factices true_labels = [0, 1, 0, 1, 1] predicted_probs = [0.2, 0.8, 0.1, 0.6, 0.9] predicted_labels = [int(p > 0.5) for p in predicted_probs] precision = precision_score(true_labels, predicted_labels) recall = recall_score(true_labels, predicted_labels) f1 = f1_score(true_labels, predicted_labels) auc = roc_auc_score(true_labels, predicted_probs) print(f"Precision: {precision}, Recall: {recall}, F1: {f1}, AUC: {auc}")
import torch from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score from sklearn.preprocessing import StandardScaler from torch.utils.data import Dataset, DataLoader, random_split import pandas as pd import torch.nn as nn import numpy as np from torch.utils.tensorboard import SummaryWriter from tqdm import tqdm class CardioDataset(Dataset): def __init__(self): df = pd.read_csv("cardio_train.csv", sep=";") df = df.drop_duplicates().drop("id", axis=1) categorical_features = ["gender", "cholesterol", "gluc"] for feature in categorical_features: one_hot = pd.get_dummies(df[feature], dtype=np.float32) one_hot.columns = [feature + "_" + str(c) for c in one_hot.columns] df.drop(feature, axis=1, inplace=True) df = pd.concat([df, one_hot], axis=1) print(len(df.columns)) print(list(df.columns)) features_names = list(df.columns) features_names.remove("cardio") self.features = df[features_names] scaler = StandardScaler() self.features = scaler.fit_transform(self.features).astype(np.float32) self.labels = df["cardio"].to_numpy().astype(int).reshape(-1, 1) def __len__(self): return len(self.features) def __getitem__(self, idx): if torch.is_tensor(idx): idx = idx.tolist() local_features = self.features[idx] local_labels = self.labels[idx] res = {"features": local_features, "labels": local_labels} return res class MyModel(nn.Module): def __init__(self, input_size, hidden_size, output_size, n_layers): super(MyModel, self).__init__() layers = [nn.Linear(input_size, hidden_size)] layers.append(nn.ReLU()) for _ in range(n_layers - 1): layers.append(nn.Linear(hidden_size, hidden_size)) layers.append(nn.ReLU()) layers.append(nn.Linear(hidden_size, output_size)) layers.append(nn.Sigmoid()) self.model = nn.Sequential(*layers) def forward(self, x): return self.model(x) if __name__ == '__main__': device = torch.device("cuda" if torch.cuda.is_available() else "cpu") HIDDEN_SIZE = 1000 N_LAYERS = 1 BATCH_SIZE = 64 LR = 0.001 dataset = CardioDataset() train, val, test = random_split(dataset, [0.8, 0.1, 0.1]) print(len(train), len(val), len(test)) model = MyModel(dataset.features.shape[1], HIDDEN_SIZE, 1,N_LAYERS).to(device) print(model) train_loader = DataLoader(train, batch_size=BATCH_SIZE, shuffle=True) val_loader = DataLoader(val, batch_size=BATCH_SIZE, shuffle=True) test_loader = DataLoader(test, batch_size=BATCH_SIZE, shuffle=True) print(len(train_loader), len(val_loader), len(test_loader)) optimizer = torch.optim.SGD(model.parameters(), lr=LR) criterion = nn.BCELoss() l1_lambda = 0 l2_lambda = 1e-4 writer = SummaryWriter('runs/cardio') for epoch in tqdm(list(range(200))): running_loss = 0.0 total_l2_loss = 0.0 model.train() correct = 0 total = 0 for batch in train_loader: inputs = batch["features"].to(device) targets = batch["labels"].to(device) optimizer.zero_grad() outputs = model(inputs) loss = criterion(outputs, targets.float()) l1_loss = sum(p.abs().sum() for p in model.parameters()) l2_loss = sum(p.pow(2).sum() for p in model.parameters()) total_loss = loss + l1_lambda * l1_loss + l2_lambda * l2_loss total_loss.backward() optimizer.step() running_loss += loss.item() total_l2_loss += l2_loss predicted = (outputs > 0.5).float() total += targets.size(0) correct += (predicted == targets).sum().item() accuracy = correct / total writer.add_scalar('train/accuracy', accuracy, epoch) writer.add_scalar("train/loss", running_loss / len(train_loader), epoch) writer.add_scalar("train/l2_loss", total_l2_loss / len(train_loader), epoch) model.eval() running_loss_val = 0.0 precision = 0 recall = 0 f1 = 0 auc = 0 with torch.no_grad(): correct = 0 total = 0 for batch in val_loader: inputs = batch["features"].to(device) targets = batch["labels"].to(device) outputs = model(inputs) loss = criterion(outputs, targets.float()) running_loss_val += loss.item() predicted = (outputs > 0.5).float() total += targets.size(0) correct += (predicted == targets).sum().item() targets = targets.cpu().numpy() predicted = predicted.cpu().numpy() precision += precision_score(targets, predicted) recall = recall_score(targets, predicted) f1 = f1_score(targets, predicted) auc = roc_auc_score(targets, predicted) accuracy = correct / total writer.add_scalar("val/accuracy", accuracy, epoch) writer.add_scalar("val/loss", running_loss_val / len(val_loader), epoch) writer.add_scalar("val/precision", precision, epoch) writer.add_scalar("val/recall", recall, epoch) writer.add_scalar("val/f1", f1, epoch) writer.add_scalar("val/auc", auc, epoch) model.eval() running_loss_val = 0.0 precision = 0 recall = 0 f1 = 0 auc = 0 with torch.no_grad(): correct = 0 total = 0 for batch in test_loader: inputs = batch["features"].to(device) targets = batch["labels"].to(device) outputs = model(inputs) loss = criterion(outputs, targets.float()) running_loss_val += loss.item() predicted = (outputs > 0.5).float() total += targets.size(0) correct += (predicted == targets).sum().item() targets = targets.cpu().numpy() predicted = predicted.cpu().numpy() precision += precision_score(targets, predicted) recall = recall_score(targets, predicted) f1 = f1_score(targets, predicted) auc = roc_auc_score(targets, predicted) accuracy = correct / total hparams_dict = {"lr": LR, "hidden_size": HIDDEN_SIZE, "n_layers": N_LAYERS, "batch_size": BATCH_SIZE, "L1": l1_lambda, "L2": l2_lambda, } value_dict = { "accuracy": accuracy, "loss": running_loss_val / len(test_loader), "precision": precision, "recall": recall, "f1": f1, "auc": auc } writer.add_hparams(hparams_dict, value_dict) writer.close()