CI2 : Ingestion mensuelle, validation et snapshots
Ce TP prolonge le travail initié au TP1. Vous allez commencer à construire un pipeline d’ingestion de données pour un système de machine learning de bout en bout.
- Le rapport du TP2 doit être rédigé en Markdown dans le fichier reports/rapport_tp2.md.
- Vous devez réutiliser le même dépôt Git que pour le TP1 (ne créez pas un nouveau dépôt).
- Avant de commencer ce TP, ajouter un tag tp1 sur votre dépôt correspondant à la fin du TP1.
Objectif global du TP :
Mettre en place une ingestion mensuelle pour les données
month_000 et month_001, en utilisant :
- PostgreSQL pour stocker les données structurées,
- Prefect pour orchestrer le pipeline d’ingestion,
- Great Expectations pour valider la qualité des données,
- des snapshots temporels pour capturer l’état des données à la fin de chaque mois.
Le TP est à réaliser en environ 1h30 en séance. Vous pourrez compléter et nettoyer le rapport reports/rapport_tp2.md chez vous.
Mise en place du projet et du rapport
Reprendre le dépôt du TP1
Vous devez réutiliser le même dépôt Git que pour le TP1. Placez-vous à la racine de ce dépôt (là où se trouvent api/, docker-compose.yml, reports/).
Vérifiez d’abord la structure minimale suivante :
Puis affichez l’état du dépôt Git et copiez-coller la sortie dans votre rapport reports/rapport_tp2.md (dans une section que vous pourrez appeler par exemple État initial du dépôt).
Créer la structure minimale pour le TP2
Pour ce TP, nous allons progressivement nous rapprocher de l’architecture complète du projet. Commencez par créer les répertoires suivants :
Créez ces répertoires depuis la racine du dépôt. Vérifiez ensuite qu’ils existent bien avec ls ou tree (si disponible), et ajoutez une courte capture (commande + sortie) dans votre rapport.
Télécharger et extraire les données month_000 et month_001
Les données de ce TP sont fournies sous forme d’archives ZIP contenant des fichiers CSV. Téléchargez les deux archives depuis les liens suivants (depuis votre navigateur, en dehors des conteneurs Docker) :
- Données month_000 : Télécharger month_000.zip
- Données month_001 : Télécharger month_001.zip
Une fois les fichiers téléchargés, extrayez-les dans les répertoires prévus :
Vérifiez la liste des fichiers (par exemple avec ls data/seeds/month_000 et ls data/seeds/month_001) et copiez la liste obtenue dans votre rapport reports/rapport_tp2.md (par exemple dans une section Structure des données).
Base de données et docker-compose
Créer le schéma de base de données dans db/init/001_schema.sql
Nous allons définir le schéma PostgreSQL qui servira de base à tout le système. Créez le fichier db/init/001_schema.sql et copiez-y le contenu suivant sans le modifier (aucun TODO ici) :
Sauvegardez le fichier, puis ajoutez une courte note dans votre rapport reports/rapport_tp2.md indiquant que le schéma a été créé.
Créer et comprendre le fichier .env
Le fichier .env contient des variables d’environnement qui seront automatiquement injectées dans les conteneurs Docker. Cela permet de séparer la configuration (mots de passe, noms de base, etc.) du code.
À la racine du dépôt, créez un fichier nommé .env avec le contenu minimal suivant :
Ces variables seront utilisées par le conteneur PostgreSQL (et plus tard par Prefect) pour se connecter à la base de données.
Ajoutez dans votre rapport une courte phrase expliquant à quoi sert un fichier .env dans un projet Docker.
Mettre à jour docker-compose.yml
Pour ce TP, nous allons utiliser uniquement deux services : un service postgres pour la base de données et un service prefect (que nous utiliserons dans l’exercice suivant).
Adaptez votre fichier docker-compose.yml à la racine du projet pour qu’il contienne au minimum la configuration suivante :
Vérifiez que votre fichier docker-compose.yml se trouve bien à la racine du dépôt (même niveau que le dossier db/). Vous pouvez montrer son contenu (ou un extrait) dans le rapport.
Démarrer Postgres et vérifier les tables créées
Vous pouvez maintenant démarrer uniquement le service postgres :
Une fois le conteneur postgres en état Up, connectez-vous à la base de données depuis le conteneur :
Listez les tables existantes avec la commande \dt dans psql :
Copiez la sortie de \dt dans votre rapport reports/rapport_tp2.md et commentez brièvement, en une phrase par table, ce que représente chaque table du schéma.
Upsert des CSV avec Prefect (month_000)
Créer le service Prefect : services/prefect/Dockerfile et services/prefect/requirements.txt
Nous allons maintenant créer un service prefect dédié à l’orchestration de notre pipeline d’ingestion. Créez le dossier services/prefect s’il n’existe pas déjà, puis ajoutez-y les deux fichiers suivants.
Fichier services/prefect/Dockerfile :
Fichier services/prefect/requirements.txt :
Copiez ces contenus tels quels. Nous n’allons pas utiliser toutes ces dépendances immédiatement, mais cela prépare les TP suivants.
Ajoutez dans votre rapport une courte note expliquant le rôle du conteneur prefect dans l’architecture (orchestration du pipeline d’ingestion).
Créer le fichier services/prefect/ingest_flow.py (version TP)
Nous allons maintenant créer un premier flow Prefect qui lit les CSV de month_000 et les insère dans PostgreSQL en utilisant un upsert (INSERT ... ON CONFLICT DO UPDATE).
Créez le fichier services/prefect/ingest_flow.py avec le contenu suivant. Deux petites parties du code sont à compléter (marquées TODO) pour travailler la logique d’upsert.
Complétez les deux blocs TODO :
- Conversion en booléen des colonnes de type booléen (`plan_stream_tv`, `plan_stream_movies`, `paperless_billing`) si elles sont présentes.
- Construction de la chaîne updates utilisée dans le ON CONFLICT pour mettre à jour toutes les colonnes non clés primaires.
Décrivez brièvement dans votre rapport la logique de cette fonction upsert_csv (en quelques phrases).
Lancer Prefect et l’ingestion de month_000
Vous pouvez maintenant construire l’image du service prefect et lancer le flow d’ingestion pour month_000.
Assurez-vous d’abord que le service postgres est démarré (voir exercice précédent), puis lancez également le service prefect :
Une fois le conteneur prefect en état Up, lancez le flow d’ingestion en lui passant les variables d’environnement adaptées pour le mois 000 :
Après l’exécution, connectez-vous à PostgreSQL pour vérifier que les données ont bien été insérées :
Puis exécutez les requêtes SQL suivantes :
Copiez les résultats dans votre rapport et commentez en une phrase : Combien de clients avons-nous après month_000 ?.
- Que le conteneur prefect est bien démarré.
- Que les volumes ./services/prefect et ./data sont correctement montés.
- Que les chemins /data/seeds/month_000/*.csv existent réellement dans votre machine hôte.
Validation des données avec Great Expectations
Compléter la fonction validate_with_ge
Dans cet exercice, nous allons ajouter une étape de validation avec Great Expectations dans le flow d’ingestion. L’idée est de récupérer une table depuis PostgreSQL, d’appliquer quelques expectations, puis de faire échouer le flow si la validation ne passe pas.
Ouvrez le fichier services/prefect/ingest_flow.py et ajoutez-y la fonction suivante (par exemple après upsert_csv). Une partie est déjà écrite pour les tables users et subscriptions. Vous devez compléter les expectations pour la table usage_agg_30d (marquées par des commentaires).
Complétez la partie elif table == "usage_agg_30d": :
- Définir les colonnes attendues avec expect_table_columns_to_match_set.
- Ajouter au moins une ou deux expectations de type expect_column_values_to_be_between pour vérifier que vos agrégats sont bien non négatifs (par exemple watch_hours_30d et avg_session_mins_7d doivent être ≥ 0).
Enfin, mettez à jour le flow pour appeler validate_with_ge après les upserts, par exemple :
Décrivez dans votre rapport, en quelques lignes, le rôle de validate_with_ge dans le pipeline.
Relancer l’ingestion pour month_000 avec validation
Une fois la fonction validate_with_ge complétée et le flow mis à jour, relancez le flow d’ingestion pour month_000 comme précédemment :
Si vos expectations sont correctes et cohérentes avec les données, la pipeline doit se terminer sans erreur. Si une erreur Great Expectations apparaît, lisez attentivement le message pour comprendre quelle règle a été violée.
Compléter le rapport : pourquoi ces bornes et comment protègent-elles le modèle ?
Dans votre fichier reports/rapport_tp2.md, ajoutez une section (par exemple Validation des données) dans laquelle vous :
- copiez quelques lignes de vos expectations pour usage_agg_30d (par exemple les appels à expect_column_values_to_be_between) ;
- expliquez, en quelques phrases, pourquoi vous avez choisi ces bornes, par exemple watch_hours_30d >= 0 ;
- expliquez comment ces règles protègent votre futur modèle (exclusion de valeurs impossibles, détection d’exports corrompus, etc.).
Snapshots et ingestion month_001
Compléter la fonction snapshot_month(as_of)
Nous allons maintenant ajouter une étape de création de snapshots temporels pour figer l’état des données à la fin de chaque mois. L’idée est de copier les données des tables live vers des tables *_snapshots avec un champ as_of.
Dans services/prefect/ingest_flow.py, ajoutez la fonction suivante. Elle crée d’abord les tables de snapshots si nécessaire, puis insère les données pour un mois donné. Un des blocs INSERT est à compléter par vous.
Complétez le bloc d’insertion dans payments_agg_90d_snapshots en vous inspirant des autres blocs. Le but est d’avoir une ligne par utilisateur et par date as_of.
Ensuite, mettez à jour votre flow ingest_month_flow pour appeler snapshot_month(as_of) après la validation GE, par exemple :
Ajoutez dans votre rapport une phrase expliquant ce que fait snapshot_month (en particulier le rôle de as_of).
Ingestion de month_001 avec snapshots
Vous allez maintenant lancer l’ingestion pour le mois suivant month_001, avec une nouvelle valeur de AS_OF. Assurez-vous que postgres et prefect sont démarrés, puis exécutez :
Une fois le flow terminé sans erreur, connectez-vous à PostgreSQL et vérifiez qu’il existe bien des snapshots pour les deux dates 2024-01-31 et 2024-02-29, par exemple avec :
Copiez ces résultats dans votre rapport et commentez brièvement : avez-vous le même nombre de lignes ? davantage ? Pourquoi ?
Compléter le rapport : schéma, explications et réflexion
Dans votre fichier reports/rapport_tp2.md, ajoutez une section de synthèse pour ce TP :
- Un petit schéma (ASCII ou capture) montrant le pipeline complet
-
Une explication en quelques phrases :
- Pourquoi on ne travaille pas directement sur les tables live pour entraîner un modèle ?
- Pourquoi les snapshots sont importants pour éviter la data leakage et garantir la reproductibilité temporelle ?
-
Un court paragraphe de réflexion personnelle :
- Qu’avez-vous trouvé le plus difficile dans la mise en place de l’ingestion ?
- Quelles erreurs avez-vous rencontrées et comment les avez-vous corrigées ?