CSC 8607 – Introduction au deep learning

Portail informatique

CI5 : Image Captioning avec RNN et Attention sur ResNet

Dans ce TP, vous allez implémenter un modèle d’image captioning utilisant un ResNet pré-entraîné comme extracteur de caractéristiques, combiné à un RNN avec un module d’attention. Ce projet permettra de pratiquer le transfer learning, la gestion du learning rate scheduling, et la mise en œuvre d’un module d’attention personnalisé.

L'objectif de cette application est de prendre une image comme entrée et de produire une légende textuelle pertinente en sortie, c’est-à-dire un modèle de "captioning d'image". Pour cela, nous allons utiliser le dataset Flickr30k contenant plus de 30 000 images avec des descriptions associées.

Objectifs

  • Comprendre et manipuler un dataset d'image captioning (Flickr30k).
  • Mettre en place une pipeline de chargement de données avec des transformations adaptées.
  • Pratiquer le transfert d'apprentissage en utilisant un ResNet50 pré-entraîné.
  • Apprendre à geler les paramètres d'un modèle pour en extraire des caractéristiques fixes.
  • Implémenter un module d'attention personnalisé sur les sorties d'une couche convolutionnelle.
  • Créer une architecture LSTM modifiée intégrant un mécanisme d'attention.
  • Utiliser des embeddings pré-entraînés (Word2Vec) pour la représentation des séquences textuelles.
  • Maîtriser les concepts de tokenisation et détokenisation pour la manipulation de séquences.
  • Concevoir une boucle d'entraînement incluant une planification du taux d'apprentissage (Step Decay).
  • Évaluer l'évolution du modèle en générant des légendes pour des images au fil des epochs.

Chargement des données

Le dataset Flickr30k peut être chargé à l’aide de torchvision.datasets.Flickr30k. Ce dataset contient des images ainsi que leurs annotations textuelles. Pour préparer les données, vous allez devoir commencer par effectuer les transformations nécessaires.

Chargez le dataset Flickr30k en utilisant les chemins adaptés à votre environnement. Consultez la documentation associée si besoin. Sur notre cluster, vous pouvez utiliser :
  • root='/array/data/flickr30k/flickr30k-images'
  • ann_file="/array/data/flickr30k/results_20130124.token"
dataset = datasets.Flickr30k(root='/array/data/flickr30k/flickr30k-images', ann_file="/array/data/flickr30k/results_20130124.token")

Appliquez les transformations suivantes aux images :
  • Redimensionnement des images à une taille de (224, 224)
  • Conversion des images en tenseurs
transform = transforms.Compose([ transforms.Resize((224, 224)), transforms.ToTensor(), ]) dataset = datasets.Flickr30k(root='/array/data/flickr30k/flickr30k-images', ann_file='/array/data/flickr30k/results_20130124.token', transform=transform)

Séparez le dataset en ensembles d’entraînement et de test avec un ratio de 80/20. Créez ensuite des DataLoader pour chaque ensemble avec une taille de batch de 32.

train_dataset, test_dataset = torch.utils.data.random_split(dataset, [0.8, 0.2]) train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True) test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

Pour accélérer l'apprentissage, nous utiliserons des embeddings pré-entrainés (Word2Vec). Voici les fonctions déjà implémentées pour charger ces embeddings et effectuer la tokenisation :

print("Computing vocabulary") vocabulary = set() with open("/array/data/flickr30k/results_20130124.token") as f: for line in f: _, words = line.strip().split("\t") for word in words.split(" "): vocabulary.add(word) print("Number of words", len(vocabulary)) def read_until(open_file): while True: char = open_file.read(1) if char == b' ': break yield char def read_word2vec(): with open("/array/data/word2vec/GoogleNews-vectors-negative300.bin", "rb") as f: first_line = f.readline() n_words, n_dim = map(int, first_line.split()) for n_word in range(n_words): word = b''.join(read_until(f)).decode('utf-8') vector = struct.unpack('f' * n_dim, f.read(n_dim * 4)) yield word, vector def create_embedding_layer(vocab, randomized=False): if randomized: idx_to_word = ["</s>"] word_to_idx = dict() word_to_idx["</s>"] = 0 weights = [] for i, word in enumerate(vocab): idx_to_word.append(word) word_to_idx[word] = i + 1 weights.append(np.random.rand(1, 300)) else: weights = [] idx_to_word = [] word_to_idx = dict() idx = 0 for word, vector in read_word2vec(): if word in vocab or idx == 0: idx_to_word.append(word) word_to_idx[word] = idx weights.append(np.array([vector])) idx += 1 n_words = len(weights) n_dim = len(weights[0][0]) weights = np.concat(weights) emb_layer = nn.Embedding(n_words, n_dim) emb_layer.load_state_dict({"weight": torch.Tensor(weights)}) if not randomized: emb_layer.requires_grad = False return emb_layer, idx_to_word, word_to_idx def tokenize(sentence, word2idx, max_length=20): res = [0] sentence = sentence.lower() for word in sentence.split(): if word in word2idx: res.append(word2idx.get(word, 0)) if len(res) > max_length: return res[:max_length] return res + [0] * (max_length - len(res)) def untokenize(tokens, idx2word): res = [] for x in tokens: res.append(idx2word[x]) return res print("Loading embeddings.") pretrained_embeddings, idx2word, word2idx = create_embedding_layer(vocabulary, RANDOMIZED) print("Vocabulary size", len(idx2word))

Recopiez ce code. Vous pourrez utiliser pretrained_embeddings plus tard dans votre code.

Nous voulons maintenant charger le modèle ResNet50 qui est déjà pré-entrainé.

Chargez un modèle ResNet50 pré-entraîné à l’aide de torchvision.models.

Affichez le modèle pour voir son architecture. Modifiez le modèle pour conserver toutes les couches sauf les deux dernières couches pleinement connectées.
resnet_pretrained = models.resnet50() cnn_features = nn.Sequential(*list(resnet_pretrained.children())[:-2])

Nous souhaitons maintenant geler les poids.

Pourquoi est-il important de geler les poids du ResNet dans ce TP ?

Implémentez cette opération pour que les poids ne soient pas modifiés pendant l'entraînement.

for param in resnet_pretrained.parameters(): param.requires_grad = False

Nous allons maintenant créer un module d’attention personnalisé. Voici les équations clés :

  • attention_scores = W_{att} * concat(features, hidden_state)
  • context_vector = sum(softmax(attention_scores) * features)

Implémentez le module d'attention. Il prendra en entrée la sortie de la dernière convolution de ResNet et le dernier état caché du RNN.
Utilisez l'opération unsqueeze pour rajouter des dimensions. L'opération repeat permet de répéter un tenseur. view permet de réorganiser les dimensions des tenseurs.
Il faut bien faire attention aux dimensions ici.
class AttentionModule(nn.Module): def __init__(self, feature_dim, hidden_dim): super(AttentionModule, self).__init__() self.attention_layer = nn.Conv2d(feature_dim + hidden_dim, 1, 1) def forward(self, features, hidden_state): hidden_state_expanded = hidden_state.unsqueeze(2).unsqueeze(3).repeat(1, 1, features.size(2), features.size(2)) concat_features = torch.cat((features, hidden_state_expanded), dim=1) attention_scores = self.attention_layer(concat_features).squeeze(-1) attention_weights = F.softmax(attention_scores.view(attention_scores.size(0), -1), dim=1)\ .view(attention_scores.size(0), attention_scores.size(1), attention_scores.size(2), attention_scores.size(3)) context_vector = torch.sum(torch.sum(features * attention_weights, dim=2), dim=2) return context_vector, attention_weights

Nous allons maintenant créer un LSTM personnalisé. Nous ne pouvons pas utiliser directement le module LSTM de PyTorch, car nous voulons ajouter une partie attention. Voici les équations :

  • i_t = σ(W_i * concat(x_t, h_{t-1}, att_t))
  • f_t = σ(W_f * concat(x_t, h_{t-1}, att_t))
  • c_t = f_t * c_{t-1} + i_t * tanh(W_c * concat(x_t, h_{t-1}, att_t))
  • o_t = σ(W_o * concat(x_t, h_{t-1}, att_t))
  • h_t = o_t * tanh(c_t)

Implémentez le module LSTMWithAttention. Le constructeur prendra en entrée input_size, hidden_size, attention_module, attention_size et la méthode forward prendra la séquence d'entrée du LSTM et la sortie de la dernière couche de convolution de ResNet50.
class LSTMWithAttention(nn.Module): def init_weights(self): stdv = 1.0 / math.sqrt(self.hidden_size) for weight in self.parameters(): weight.data.uniform_(-stdv, stdv) def __init__(self, input_size, hidden_size, attention_module, attention_size): super().__init__() self.input_size = input_size self.hidden_size = hidden_size self.attention_module = attention_module # i_t self.lin_i = nn.Linear(input_size + hidden_size + attention_size, hidden_size) # f_t self.lin_f = nn.Linear(input_size + hidden_size + attention_size, hidden_size) # c_t self.lin_c = nn.Linear(input_size + hidden_size + attention_size, hidden_size) # o_t self.lin_o = nn.Linear(input_size + hidden_size + attention_size, hidden_size) self.init_weights() def forward(self, x, features, init_states=None): # Assumes x.shape represents (batch_size, sequence_size, input_size) bs, seq_sz, _ = x.size() hidden_seq = [] if init_states is None: h_t, c_t = ( torch.zeros(bs, self.hidden_size).to(x.device), torch.zeros(bs, self.hidden_size).to(x.device), ) else: h_t, c_t = init_states for t in range(seq_sz): x_t = x[:, t, :] att_t, _ = self.attention_module(features, h_t) concat = torch.cat((att_t, x_t, h_t), dim=1) i_t = torch.sigmoid(self.lin_i(concat)) f_t = torch.sigmoid(self.lin_f(concat)) g_t = torch.tanh(self.lin_c(concat)) o_t = torch.sigmoid(self.lin_o(concat)) c_t = f_t * c_t + i_t * g_t h_t = o_t * torch.tanh(c_t) hidden_seq.append(h_t.unsqueeze(-1)) hidden_seq = torch.cat(hidden_seq, dim=-1) return hidden_seq, (h_t, c_t)

Maintenant, nous voulons tout mettre dans un module unique qui combinera les différentes composantes.

Implémentez la classe ImageCaptioningModel combinant les différentes parties du modèle : embeddings, ResNet, LSTM et attention. Le constructeur prend en argument cnn_model, embedding_model, lstm_hidden_dim et la méthode forward les images et les captions.
class ImageCaptioningModel(nn.Module): def __init__(self, cnn_model, embedding_model, lstm_hidden_dim): super(ImageCaptioningModel, self).__init__() # Utilisation de ResNet pré-entrainé sans la dernière couche self.cnn = nn.Sequential(*list(cnn_model.children())[:-2]) self.attention = AttentionModule(feature_dim=2048, hidden_dim=lstm_hidden_dim) self.embeddings = embedding_model vocab_size = embedding_model.weight.shape[0] self.lstm = LSTMWithAttention(embedding_model.weight.shape[1], lstm_hidden_dim, self.attention, 2048) self.conv1d = nn.Conv1d(lstm_hidden_dim, vocab_size, 1) def forward(self, images, captions): # Extraire les caractéristiques visuelles cnn_features = self.cnn(images) emb = self.embeddings(captions) lstm_outputs, (hidden_state, _) = self.lstm(emb, cnn_features) output = self.conv1d(lstm_outputs) return output

Nous voulons maintenant entrainer notre modèle. Voici les hyperparamètres suggérés :

  • learning_rate = 0.001
  • num_epochs = 1000
  • Utilisation de l’optimiseur Adam
  • Scheduler StepLR avec step_size=100 et gamma=0.1

Implémentez une boucle d'entraînement avec l'évaluation sur le test set qui :
  • Enregistre la perte d'entraînement dans TensorBoard
  • Affiche un exemple de texte généré après chaque epoch
  • Fait une step du scheduler à chaque epoch.

On pourra logger la loss sur le train toutes les 10 itérations.

La méthode untokenize permet de transformer les identifiants des mots en textes.
for epoch in range(num_epochs): model.train() total_loss = 0 for i, (images, captions) in tqdm(enumerate(train_loader), total=len(train_loader)): optimizer.zero_grad() images = images.to(device) captions = torch.LongTensor([tokenize(x, word2idx) for x in captions[0]]).to(device) outputs = model(images, captions[:, :-1]) loss = criterion(outputs, captions[:, 1:]) loss.backward() optimizer.step() total_loss += loss.item() if (i + 1) % 10 == 0: writer.add_scalar('Training Loss', total_loss / 10, epoch * len(train_loader) + i) total_loss = 0 model.eval() val_loss = 0 outputs = [] captions = [] for i, (images, captions) in tqdm(enumerate(test_loader), total=len(test_loader)): images = images.to(device) captions = torch.LongTensor([tokenize(x, word2idx) for x in captions[0]]).to(device) outputs = model(images, captions[:, :-1]) loss = criterion(outputs, captions[:, 1:]) val_loss += loss.item() predictions = outputs[-1].argmax(dim=-2) print(f"Epoch {epoch + 1} - Train Loss: {total_loss:.4f}, " f"Validation Loss: {val_loss:.4f}, prediction: \"", " ".join(untokenize(predictions, idx2word)), "\" instead of: \"", " ".join(untokenize(captions[-1], idx2word)), "\".") writer.add_scalar('Validation Loss', val_loss / len(test_loader), epoch) scheduler.step() writer.add_scalar('Learning Rate', scheduler.get_last_lr()[0], epoch)