Le dataset Flickr30k peut être chargé à l’aide de torchvision.datasets.Flickr30k.
Ce dataset contient des images ainsi que leurs annotations textuelles. Pour préparer les données,
vous allez devoir commencer par effectuer les transformations nécessaires.
Chargez le dataset Flickr30k en utilisant les chemins adaptés à votre environnement. Consultez la documentation
associée si besoin. Sur notre cluster, vous pouvez utiliser :
- root='/array/data/flickr30k/flickr30k-images'
- ann_file="/array/data/flickr30k/results_20130124.token"
dataset = datasets.Flickr30k(root='/array/data/flickr30k/flickr30k-images',
ann_file="/array/data/flickr30k/results_20130124.token")
Appliquez les transformations suivantes aux images :
- Redimensionnement des images à une taille de (224, 224)
- Conversion des images en tenseurs
transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
])
dataset = datasets.Flickr30k(root='/array/data/flickr30k/flickr30k-images',
ann_file='/array/data/flickr30k/results_20130124.token',
transform=transform)
Séparez le dataset en ensembles d’entraînement et de test avec un ratio de 80/20. Créez ensuite des
DataLoader pour chaque ensemble avec une taille de batch de 32.
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [0.8, 0.2])
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
Pour accélérer l'apprentissage, nous utiliserons des embeddings pré-entrainés (Word2Vec).
Voici les fonctions déjà implémentées pour charger ces embeddings et effectuer la tokenisation :
print("Computing vocabulary")
vocabulary = set()
with open("/array/data/flickr30k/results_20130124.token") as f:
for line in f:
_, words = line.strip().split("\t")
for word in words.split(" "):
vocabulary.add(word)
print("Number of words", len(vocabulary))
def read_until(open_file):
while True:
char = open_file.read(1)
if char == b' ':
break
yield char
def read_word2vec():
with open("/array/data/word2vec/GoogleNews-vectors-negative300.bin", "rb") as f:
first_line = f.readline()
n_words, n_dim = map(int, first_line.split())
for n_word in range(n_words):
word = b''.join(read_until(f)).decode('utf-8')
vector = struct.unpack('f' * n_dim, f.read(n_dim * 4))
yield word, vector
def create_embedding_layer(vocab, randomized=False):
if randomized:
idx_to_word = ["</s>"]
word_to_idx = dict()
word_to_idx["</s>"] = 0
weights = []
for i, word in enumerate(vocab):
idx_to_word.append(word)
word_to_idx[word] = i + 1
weights.append(np.random.rand(1, 300))
else:
weights = []
idx_to_word = []
word_to_idx = dict()
idx = 0
for word, vector in read_word2vec():
if word in vocab or idx == 0:
idx_to_word.append(word)
word_to_idx[word] = idx
weights.append(np.array([vector]))
idx += 1
n_words = len(weights)
n_dim = len(weights[0][0])
weights = np.concat(weights)
emb_layer = nn.Embedding(n_words, n_dim)
emb_layer.load_state_dict({"weight": torch.Tensor(weights)})
if not randomized:
emb_layer.requires_grad = False
return emb_layer, idx_to_word, word_to_idx
def tokenize(sentence, word2idx, max_length=20):
res = [0]
sentence = sentence.lower()
for word in sentence.split():
if word in word2idx:
res.append(word2idx.get(word, 0))
if len(res) > max_length:
return res[:max_length]
return res + [0] * (max_length - len(res))
def untokenize(tokens, idx2word):
res = []
for x in tokens:
res.append(idx2word[x])
return res
print("Loading embeddings.")
pretrained_embeddings, idx2word, word2idx = create_embedding_layer(vocabulary, RANDOMIZED)
print("Vocabulary size", len(idx2word))
Recopiez ce code. Vous pourrez utiliser pretrained_embeddings plus tard dans votre code.
Nous voulons maintenant charger le modèle ResNet50 qui est déjà pré-entrainé.
Chargez un modèle ResNet50 pré-entraîné à l’aide de torchvision.models.
Affichez le modèle pour voir son architecture.
Modifiez le modèle pour conserver toutes les couches sauf les deux dernières couches pleinement connectées.
resnet_pretrained = models.resnet50()
cnn_features = nn.Sequential(*list(resnet_pretrained.children())[:-2])
Nous souhaitons maintenant geler les poids.
Pourquoi est-il important de geler les poids du ResNet dans ce TP ?
Implémentez cette opération pour que les poids ne soient pas modifiés pendant l'entraînement.
for param in resnet_pretrained.parameters():
param.requires_grad = False
Nous allons maintenant créer un module d’attention personnalisé. Voici les équations clés :
- attention_scores = W_{att} * concat(features, hidden_state)
- context_vector = sum(softmax(attention_scores) * features)
Implémentez le module d'attention. Il prendra en entrée la sortie de la dernière convolution de ResNet et le dernier
état caché du RNN.
Utilisez l'opération unsqueeze pour rajouter des dimensions. L'opération repeat
permet de répéter un tenseur. view permet de réorganiser les dimensions des tenseurs.
Il faut bien faire attention aux dimensions ici.
class AttentionModule(nn.Module):
def __init__(self, feature_dim, hidden_dim):
super(AttentionModule, self).__init__()
self.attention_layer = nn.Conv2d(feature_dim + hidden_dim, 1, 1)
def forward(self, features, hidden_state):
hidden_state_expanded = hidden_state.unsqueeze(2).unsqueeze(3).repeat(1, 1, features.size(2), features.size(2))
concat_features = torch.cat((features, hidden_state_expanded), dim=1)
attention_scores = self.attention_layer(concat_features).squeeze(-1)
attention_weights = F.softmax(attention_scores.view(attention_scores.size(0), -1), dim=1)\
.view(attention_scores.size(0), attention_scores.size(1), attention_scores.size(2), attention_scores.size(3))
context_vector = torch.sum(torch.sum(features * attention_weights, dim=2), dim=2)
return context_vector, attention_weights
Nous allons maintenant créer un LSTM personnalisé. Nous ne pouvons pas utiliser directement le module LSTM de
PyTorch, car nous voulons ajouter une partie attention. Voici les équations :
- i_t = σ(W_i * concat(x_t, h_{t-1}, att_t))
- f_t = σ(W_f * concat(x_t, h_{t-1}, att_t))
- c_t = f_t * c_{t-1} + i_t * tanh(W_c * concat(x_t, h_{t-1}, att_t))
- o_t = σ(W_o * concat(x_t, h_{t-1}, att_t))
- h_t = o_t * tanh(c_t)
Implémentez le module LSTMWithAttention. Le constructeur prendra en entrée input_size,
hidden_size, attention_module, attention_size et la méthode forward prendra la séquence d'entrée du
LSTM et la sortie de la dernière couche de convolution de ResNet50.
class LSTMWithAttention(nn.Module):
def init_weights(self):
stdv = 1.0 / math.sqrt(self.hidden_size)
for weight in self.parameters():
weight.data.uniform_(-stdv, stdv)
def __init__(self, input_size, hidden_size, attention_module, attention_size):
super().__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.attention_module = attention_module
# i_t
self.lin_i = nn.Linear(input_size + hidden_size + attention_size, hidden_size)
# f_t
self.lin_f = nn.Linear(input_size + hidden_size + attention_size, hidden_size)
# c_t
self.lin_c = nn.Linear(input_size + hidden_size + attention_size, hidden_size)
# o_t
self.lin_o = nn.Linear(input_size + hidden_size + attention_size, hidden_size)
self.init_weights()
def forward(self, x, features, init_states=None):
# Assumes x.shape represents (batch_size, sequence_size, input_size)
bs, seq_sz, _ = x.size()
hidden_seq = []
if init_states is None:
h_t, c_t = (
torch.zeros(bs, self.hidden_size).to(x.device),
torch.zeros(bs, self.hidden_size).to(x.device),
)
else:
h_t, c_t = init_states
for t in range(seq_sz):
x_t = x[:, t, :]
att_t, _ = self.attention_module(features, h_t)
concat = torch.cat((att_t, x_t, h_t), dim=1)
i_t = torch.sigmoid(self.lin_i(concat))
f_t = torch.sigmoid(self.lin_f(concat))
g_t = torch.tanh(self.lin_c(concat))
o_t = torch.sigmoid(self.lin_o(concat))
c_t = f_t * c_t + i_t * g_t
h_t = o_t * torch.tanh(c_t)
hidden_seq.append(h_t.unsqueeze(-1))
hidden_seq = torch.cat(hidden_seq, dim=-1)
return hidden_seq, (h_t, c_t)
Maintenant, nous voulons tout mettre dans un module unique qui combinera les différentes composantes.
Implémentez la classe ImageCaptioningModel combinant les différentes parties du modèle :
embeddings, ResNet, LSTM et attention. Le constructeur prend en argument cnn_model, embedding_model,
lstm_hidden_dim et la méthode forward les images et les captions.
class ImageCaptioningModel(nn.Module):
def __init__(self, cnn_model, embedding_model, lstm_hidden_dim):
super(ImageCaptioningModel, self).__init__()
# Utilisation de ResNet pré-entrainé sans la dernière couche
self.cnn = nn.Sequential(*list(cnn_model.children())[:-2])
self.attention = AttentionModule(feature_dim=2048, hidden_dim=lstm_hidden_dim)
self.embeddings = embedding_model
vocab_size = embedding_model.weight.shape[0]
self.lstm = LSTMWithAttention(embedding_model.weight.shape[1], lstm_hidden_dim, self.attention, 2048)
self.conv1d = nn.Conv1d(lstm_hidden_dim, vocab_size, 1)
def forward(self, images, captions):
# Extraire les caractéristiques visuelles
cnn_features = self.cnn(images)
emb = self.embeddings(captions)
lstm_outputs, (hidden_state, _) = self.lstm(emb, cnn_features)
output = self.conv1d(lstm_outputs)
return output
Nous voulons maintenant entrainer notre modèle. Voici les hyperparamètres suggérés :
- learning_rate = 0.001
- num_epochs = 1000
- Utilisation de l’optimiseur Adam
- Scheduler StepLR avec step_size=100 et gamma=0.1
Implémentez une boucle d'entraînement avec l'évaluation sur le test set qui :
- Enregistre la perte d'entraînement dans TensorBoard
- Affiche un exemple de texte généré après chaque epoch
- Fait une step du scheduler à chaque epoch.
On pourra logger la loss sur le train toutes les 10 itérations.
La méthode untokenize permet de transformer les identifiants des mots en textes.
for epoch in range(num_epochs):
model.train()
total_loss = 0
for i, (images, captions) in tqdm(enumerate(train_loader), total=len(train_loader)):
optimizer.zero_grad()
images = images.to(device)
captions = torch.LongTensor([tokenize(x, word2idx) for x in captions[0]]).to(device)
outputs = model(images, captions[:, :-1])
loss = criterion(outputs, captions[:, 1:])
loss.backward()
optimizer.step()
total_loss += loss.item()
if (i + 1) % 10 == 0:
writer.add_scalar('Training Loss', total_loss / 10, epoch * len(train_loader) + i)
total_loss = 0
model.eval()
val_loss = 0
outputs = []
captions = []
for i, (images, captions) in tqdm(enumerate(test_loader), total=len(test_loader)):
images = images.to(device)
captions = torch.LongTensor([tokenize(x, word2idx) for x in captions[0]]).to(device)
outputs = model(images, captions[:, :-1])
loss = criterion(outputs, captions[:, 1:])
val_loss += loss.item()
predictions = outputs[-1].argmax(dim=-2)
print(f"Epoch {epoch + 1} - Train Loss: {total_loss:.4f}, "
f"Validation Loss: {val_loss:.4f}, prediction: \"", " ".join(untokenize(predictions, idx2word)),
"\" instead of: \"", " ".join(untokenize(captions[-1], idx2word)), "\".")
writer.add_scalar('Validation Loss', val_loss / len(test_loader), epoch)
scheduler.step()
writer.add_scalar('Learning Rate', scheduler.get_last_lr()[0], epoch)