CSC 8608 – Concepts avancés et applications du deep learning

Portail informatique

CI3 : Infrastructures

Ce TP permet aux étudiants de maîtriser les concepts fondamentaux du Machine Learning opérationnel (MLOps) à travers la création d'un pipeline automatisé. Les étudiants utiliseront Apache Airflow pour orchestrer des tâches sous forme de graphes acycliques dirigés (DAGs), Docker pour la conteneurisation et MLflow pour la gestion du cycle de vie des modèles. Enfin, les modèles entraînés seront déployés et servis via une API REST.

Objectifs du TP :

  • Mettre en place et utiliser Apache Airflow pour automatiser des workflows complexes de Machine Learning.
  • Construire et entraîner plusieurs modèles ML en parallèle.
  • Conteneuriser les environnements ML à l'aide de Docker.
  • Suivre, gérer et déployer des modèles de ML avec MLflow.
  • Déployer des modèles en production en les servant via une API REST.

Mise en place de MLflow et Apache Airflow

Dans cet exercice, nous allons configurer notre environnement de travail afin de pouvoir suivre des expérimentations avec MLflow et orchestrer des flux de travail automatisés grâce à Apache Airflow.

MLflow est un outil open-source très utilisé dans le domaine du Machine Learning pour suivre les expérimentations, gérer les modèles et faciliter leur déploiement. Apache Airflow, quant à lui, permet de définir, planifier et surveiller des flux de travail complexes à l'aide de graphes acycliques orientés (DAG).

Vous pouvez aussi faire ce TP en vous connectant à une machine distance à travers votre IDE. Il vous faudra cependant faire du port forwarding pour pouvoir afficher les interfaces graphique, par exemple en exécutant sur votre machine locale :
ssh -g -L 6007:localhost:6007 -f -N tsp-client
Ici, nous forwardons le port 6007. Ouvrez localhost:6007 pour vérifier que vous accédez bien à l'interface

Installer MLflow et Airflow dans votre environnement Python. Vérifiez ensuite les versions installées en affichant les informations depuis votre terminal.

Pour installer MLflow et Apache Airflow, utilisez les commandes suivantes :

pip install mlflow apache-airflow

Vérifiez ensuite l'installation en tapant :

mlflow --version airflow version

Lancez un serveur MLflow en local et vérifiez qu'il fonctionne correctement en accédant à son interface web.

Pour lancer un serveur MLflow en local :

mlflow ui

Accédez ensuite à l'adresse http://localhost:5000 depuis votre navigateur web. Vous devriez voir l'interface utilisateur MLflow apparaître correctement.

Initialisez une base de données SQLite avec :
airflow db init

Créez ensuite un utilisateur administrateur :

airflow users create \ --username admin \ --firstname FIRST_NAME \ --lastname LAST_NAME \ --role Admin \ --email admin@example.com

Afin qu'Airflow trouve vos DAGs, nous allons modifier les paramètres de configuration d'Airflow ~/airflow/airflow.cfg. Changer les paramètres suivants (à trouver dans le fichier) :

dags_folder = /chemin/vers/le/dossier/dags load_examples = False
(c.f. exercice suivant pour le dossier contenant les DAGs). Ensuite, exécutez :
airflow standalone
si vous ne voyez pas les DAGs s'afficher plus tard. Vérifiez que le serveur Airflow fonctionne en accédant à son interface web.

Création d'un pipeline de Machine Learning avec Apache Airflow

Dans cet exercice, vous allez créer un pipeline complexe de Machine Learning comprenant l'ingestion de données, le prétraitement, l'entraînement parallèle de deux modèles différents (Random Forest et Logistic Regression), la validation, et l'enregistrement des résultats avec MLflow. Le pipeline sera orchestré sous forme de DAG avec Apache Airflow.

Pour simplifier, nous utiliserons le jeu de données Iris disponible dans scikit-learn.

Créez une structure de projet Python organisée avec les dossiers nécessaires pour votre pipeline.
projet_ml/ ├── airflow_dags/ │ └── pipeline_ml_dag.py ├── data/ ├── src/ │ ├── data_ingestion.py │ ├── preprocessing.py │ ├── train_rf.py │ ├── train_lr.py │ └── validate.py └── models/

Définissez un DAG Airflow nommé pipeline_ml avec les tâches suivantes : ingestion, prétraitement, entraînement parallèle des modèles Random Forest et Logistic Regression, puis validation. Pour l'instant, on demande juste la création du DAG dans pipeline_ml_dag.py en faisant appels aux scripts définis (mais non écris) définis à la question précédente.
from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime default_args = {'start_date': datetime(2025, 3, 10)} with DAG('pipeline_ml', default_args=default_args, schedule_interval=None) as dag: ingestion = BashOperator(task_id='ingestion', bash_command='python ' + os.path.dirname(__file__) + '/../src/data_ingestion.py') preprocessing = BashOperator(task_id='preprocessing', bash_command='python ' + os.path.dirname(__file__) + '/../src/preprocessing.py') training_rf = BashOperator(task_id='training_rf', bash_command='python ' + os.path.dirname(__file__) + '/../src/train_rf.py') training_lr = BashOperator(task_id='training_lr', bash_command='python ' + os.path.dirname(__file__) + '/../src/train_lr.py') validation = BashOperator(task_id='validation', bash_command='python ' + os.path.dirname(__file__) + '/../src/validate.py') ingestion >> preprocessing >> [training_rf, training_lr] >> validation

Implémentez le script Python data_ingestion.py pour télécharger et sauvegarder localement le jeu de données Iris.
from sklearn.datasets import load_iris df = load_iris(as_frame=True) df.frame.to_csv(os.path.dirname(__file__) + "/../data/iris.csv", index=False)

Implémentez le script preprocessing.py pour charger le jeu de données Iris, faire un train/test split (80/20), puis sauvegarder les données prétraitées.
import pandas as pd from sklearn.model_selection import train_test_split df = pd.read_csv(os.path.dirname(__file__) + "/../data/iris.csv") train, test = train_test_split(df, test_size=0.2, random_state=42) train.to_csv(os.path.dirname(__file__) + "/../data/train.csv", index=False) test.to_csv(os.path.dirname(__file__) + "/../data/test.csv", index=False)

Modifiez les scripts d'entraînement afin de créer deux scripts séparés : train_rf.py pour Random Forest et train_lr.py pour Logistic Regression.

train_rf.py

import pandas as pd from sklearn.ensemble import RandomForestClassifier import joblib data = pd.read_csv(os.path.dirname(__file__) + "/../data/train.csv") X = data.drop('target', axis=1) y = data['target'] rf = RandomForestClassifier().fit(X, y) joblib.dump(rf, os.path.dirname(__file__) + "/../models/random_forest.pkl")

train_lr.py

import pandas as pd from sklearn.linear_model import LogisticRegression import joblib data = pd.read_csv(os.path.dirname(__file__) + "/../data/train.csv") X = data.drop('target', axis=1) y = data['target'] lr = LogisticRegression().fit(X, y) joblib.dump(lr, os.path.dirname(__file__) + "/../models/logistic_regression.pkl")

Modifiez le script de validation pour évaluer les modèles entraînés sur les données de test (test.csv) et sauvegarder les résultats avec MLflow. On veut trouver le modèle avec la meilleure accuracy. On loggera pour chaque modèle son accuracy dans MLflow. Ensuite, on loggera dans MLflow le nom du meilleur modèle et les paramètres du meilleur modèle (c.f. mlflow.sklearn.log_model).
import pandas as pd from sklearn.metrics import accuracy_score import joblib import mlflow test_data = pd.read_csv(os.path.dirname(__file__) + "/../data/test.csv") X_test = test_data.drop('target', axis=1) y_test = test_data['target'] models = { 'RandomForest': joblib.load(os.path.dirname(__file__) + "/../models/random_forest.pkl"), 'LogisticRegression': joblib.load(os.path.dirname(__file__) + "/../models/logistic_regression.pkl") } mlflow.set_tracking_uri('http://localhost:5000') mlflow.start_run() best_score, best_model = 0, None for name, model in models.items(): predictions = model.predict(X_test) acc = accuracy_score(y_test, predictions) mlflow.log_metric(f'{name}_accuracy', acc) if acc > best_score: best_score, best_model = acc, model mlflow.log_param('best_model', best_model.__class__.__name__) mlflow.sklearn.log_model(best_model, "best_model") mlflow.end_run()

Exécutez votre DAG Airflow complet pour générer, entraîner, valider et sélectionner le meilleur modèle. Vérifiez dans MLflow que les résultats sont correctement enregistrés.

Depuis l'interface web Airflow (http://localhost:8080), déclenchez manuellement l'exécution du DAG. Après son exécution complète, consultez l'interface MLflow (http://localhost:5000) pour vérifier que les paramètres, métriques et modèles ont été correctement enregistrés.

Conteneurisation du pipeline ML avec Docker

Dans cet exercice, vous allez conteneuriser le pipeline ML que vous avez créé dans l'exercice précédent. Vous utiliserez Docker pour isoler et simplifier la gestion de votre environnement de travail.

Cette section s'adresse surtout aux personnes peu familières avec Docker.

Docker est un outil open-source permettant de créer des environnements reproductibles et isolés sous forme de conteneurs.

Créez un fichier requirements.txt listant toutes les dépendances Python nécessaires pour votre projet.
pandas scikit-learn joblib mlflow apache-airflow

Créez un fichier Dockerfile à la racine de votre projet. Dans ce Dockerfile, utilisez python:3.10-slim comme image de base. Définissez /app comme étant le dossier de travail. Ensuite, copiez le fichier requirements.txt, installer les requirements et git. Finalement, copiez votre projet dans le conteneur et lancez la commande bash.
FROM python:3.10-slim WORKDIR /app COPY requirements.txt ./ RUN apt-get update && apt-get install -y git && pip install --upgrade pip && pip install -r requirements.txt COPY . . CMD ["bash"]

Construisez votre image Docker en utilisant le fichier Dockerfile créé précédemment.
docker build -t pipeline_ml_image .

Lancez un conteneur basé sur cette image Docker et vérifiez que vous pouvez exécuter correctement les différents scripts de votre pipeline ML.
docker run -it --name pipeline_ml_container pipeline_ml_image

À l'intérieur du conteneur, exécutez :

python src/data_ingestion.py python src/preprocessing.py python src/train_rf.py python src/train_lr.py

Vérifiez que chaque étape s'exécute correctement sans erreur.

Déploiement et gestion des modèles avec MLflow

Dans cet exercice final, vous apprendrez à déployer et gérer les modèles de Machine Learning en utilisant MLflow. Vous utiliserez les modèles précédemment entraînés avec votre DAG Apache Airflow et enregistrés avec MLflow. Vous servirez ensuite ces modèles via une API REST afin d'effectuer des prédictions.

Enregistrez votre meilleur modèle dans le registre des modèles de MLflow depuis les résultats de la dernière exécution de votre DAG. Pour cela, vous pouvez vous baser sur le code suivant :
import mlflow mlflow.set_tracking_uri("http://localhost:5000") run_id = "<votre_run_id>" # à remplacer par l'ID du run MLflow depuis l'interface model_uri = f"runs:/{run_id}/best_model" model_name = "iris_best_model" mlflow.register_model(model_uri, model_name)

Remplacez <votre_run_id> par l'ID récupéré depuis l'interface utilisateur MLflow.

Lancez un serveur MLflow pour servir le modèle enregistré via une API REST avec :
export MLFLOW_TRACKING_URI=http://localhost:5000 mlflow models serve -m "models:/iris_best_model/1" --port 1234 --no-conda

Cette commande lance un serveur web servant votre modèle sur le port 1234.

Testez l'API REST en envoyant une requête HTTP à http://localhost:1234/invocations avec des données de test (au format JSON) pour obtenir des prédictions du modèle déployé.

Exemple de requête avec curl :

curl -X POST http://localhost:1234/invocations -H 'Content-Type: application/json' -d '{ "inputs": [[5.1, 3.5, 1.4, 0.2]] }'

Vous devriez recevoir la prédiction correspondante en réponse à cette requête.

Consultez l'interface utilisateur MLflow pour vérifier l'état de votre modèle déployé dans le registre.

Accédez à l'interface MLflow (http://localhost:5000) et rendez-vous dans l'onglet « Models ». Vous devriez voir le modèle iris_best_model avec son état actuel et les détails associés.