CSC 8613 – Systèmes pour le machine learning

Portail informatique

CI4 : Entraînement end-to-end : MLflow Registry → API de prédiction<

Dans ce TP, vous allez compléter une chaîne MLOps minimale “de bout en bout” : à partir de features déjà disponibles (et déjà matérialisées en ligne), vous construisez un dataset d’entraînement, entraînez un modèle, tracez l’exécution dans MLflow, publiez une version dans le Model Registry, puis servez le modèle via une API FastAPI. L’objectif est de manipuler un flux réaliste et reproductible, centré sur la traçabilité et le déploiement contrôlé.

  • Relier un dataset d’entraînement (point-in-time correct) à un entraînement reproductible.
  • Journaliser une exécution dans MLflow (paramètres, métriques, artefacts) et enregistrer un modèle.
  • Promouvoir une version de modèle en Production via l’interface MLflow.
  • Étendre une API FastAPI pour exposer un endpoint /predict s’appuyant sur le modèle Production.
  • Mettre en place des sanity checks simples côté serving (features manquantes, erreurs contrôlées).
  • Documenter les étapes et résultats dans reports/rapport_tp4.md (commandes, extraits, captures, réflexion).

Mise en route + rappel de contexte (sanity checks + où on en est dans la pipeline)

Nous reprenons exactement là où vous vous êtes arrêtés au TP3 : les snapshots existent déjà, le repo Feast est déjà en place, et l’online store est déjà matérialisé. Dans ce TP, on ajoute MLflow (tracking + registry) et on commence le “end-to-end” jusqu’à une API de prédiction.

Démarrez la stack Docker Compose et vérifiez que les conteneurs principaux démarrent sans erreur.
Utilisez docker compose ps puis docker compose logs -f <service> si un service ne démarre pas.

Ajoutez le service MLflow dans docker-compose.yml, puis redémarrez la stack.
mlflow: image: ghcr.io/mlflow/mlflow:v2.16.0 command: mlflow server --backend-store-uri sqlite:///mlartifacts/mlflow.db --default-artifact-root mlflow-artifacts:/ --host 0.0.0.0 --port 5000 --serve-artifacts --artifacts-destination /mlartifacts/artifacts volumes: - "./mlartifacts:/mlartifacts" ports: ["5000:5000"] volumes: pgdata: mlartifacts:
Pensez à redémarrer la stack.
Objectif : obtenir une UI MLflow accessible localement. On utilise ici une configuration simple et locale (SQLite + volume), suffisante pour le TP.

Vérifiez l’accessibilité des interfaces et endpoints suivants : MLflow UI (localhost:5000), API /health (localhost:8000).

Faites un smoke check : vérifiez que la récupération de features online fonctionne toujours via l’endpoint existant /features/{user_id}.
Choisissez un user_id dont vous savez qu’il existe (par exemple en regardant la table ou un CSV du TP3). Si vous obtenez des null, c’est souvent un signe que l’online store n’est pas matérialisé, ou que l’ID n’existe pas.

Dans votre rapport reports/rapport_tp4.md, listez :
  1. les commandes utilisées,
  2. une preuve que chaque service est accessible (captures ou sortie terminal),
  3. un court paragraphe : “quels composants tournent et pourquoi”.

Créer un script d’entraînement + tracking MLflow (baseline RandomForest)

Dans cet exercice, vous allez compléter un script train_baseline.py qui :
  1. construit un dataset d’entraînement via des features déjà disponibles,
  2. entraîne un modèle baseline,
  3. trace l’exécution dans MLflow (params, métriques, artefacts),
  4. enregistre le modèle dans le Model Registry.

L’utilisation de l’interface MLflow (promotion en Production, etc.) sera traitée dans l’exercice suivant.

Créez le fichier services/prefect/train_baseline.py et copiez le squelette ci-dessous. Complétez ensuite les zones marquées par _______ (ce sont vos TODO). Gardez le script exécutable “as-is” dans le conteneur prefect. Le but est d’avoir un entraînement déterministe et traçable (mêmes entrées → mêmes sorties).
import os import time import warnings warnings.filterwarnings("ignore") import pandas as pd import numpy as np from sqlalchemy import create_engine from feast import FeatureStore from sklearn.model_selection import train_test_split from sklearn.metrics import f1_score, roc_auc_score, accuracy_score from sklearn.ensemble import RandomForestClassifier from sklearn.compose import ColumnTransformer from sklearn.preprocessing import OneHotEncoder from sklearn.pipeline import Pipeline import mlflow import mlflow.sklearn from mlflow.models import ModelSignature from mlflow.types.schema import Schema, ColSpec # -------------------- # Config # -------------------- FEAST_REPO = "/repo" MODEL_NAME = "streamflow_churn" AS_OF = os.environ.get("TRAIN_AS_OF", "2024-01-31") PG_USER = os.environ.get("POSTGRES_USER", "streamflow") PG_PWD = os.environ.get("POSTGRES_PASSWORD", "streamflow") PG_DB = os.environ.get("POSTGRES_DB", "streamflow") PG_HOST = os.environ.get("POSTGRES_HOST", "postgres") PG_PORT = int(os.environ.get("POSTGRES_PORT", "5432")) MLFLOW_TRACKING_URI = os.environ.get("MLFLOW_TRACKING_URI", "http://mlflow:5000") MLFLOW_EXPERIMENT = os.environ.get("MLFLOW_EXPERIMENT", "streamflow") # -------------------- # Helpers # -------------------- def get_sql_engine(): uri = f"postgresql+psycopg2://{PG_USER}:{PG_PWD}@{PG_HOST}:{PG_PORT}/{PG_DB}" return create_engine(uri) def fetch_entity_df(engine, as_of): q = """ SELECT user_id, as_of FROM subscriptions_profile_snapshots WHERE as_of = %(as_of)s """ df = pd.read_sql(q, engine, params={"as_of": as_of}) if df.empty: raise RuntimeError(f"No snapshot rows found at as_of={as_of}") df = df.rename(columns={"as_of": "event_timestamp"}) df["event_timestamp"] = pd.to_datetime(df["event_timestamp"]) return df[["user_id", "event_timestamp"]] def fetch_labels(engine, as_of): try: q = """ SELECT user_id, period_start, churn_label FROM labels WHERE period_start = %(as_of)s """ labels = pd.read_sql(q, engine, params={"as_of": as_of}) if not labels.empty: labels = labels.rename(columns={"period_start": "event_timestamp"}) return labels[["user_id", "event_timestamp", "churn_label"]] except Exception: pass q2 = "SELECT user_id, churn_label FROM labels" labels = pd.read_sql(q2, engine) if labels.empty: raise RuntimeError("Labels table is empty.") labels["event_timestamp"] = pd.to_datetime(AS_OF) return labels[["user_id", "event_timestamp", "churn_label"]] def build_training_set(store, entity_df, features): hf = store.get_historical_features( entity_df=entity_df, features=features, ) return hf.to_df() def prep_xy(df, label_col="churn_label"): y = df[label_col].astype(int).values X = df.drop(columns=[label_col, "user_id", "event_timestamp"], errors="ignore") return X, y # -------------------- # Main # -------------------- def main(): # TODO 1: configurer MLflow (tracking URI + experiment) en écrivant les bonnes constantes mlflow.set_tracking_uri(____________) mlflow.set_experiment(____________) engine = get_sql_engine() entity_df = fetch_entity_df(engine, AS_OF) labels = fetch_labels(engine, AS_OF) # TODO 2: définir la liste des features Feast à récupérer (liste de strings). Voir lab précédent # pour le nom des features features = [ _______, _______, _______, _______, _______, _______, _______, _______, _______, _______, _______, _______, _______, _______, ] store = FeatureStore(repo_path=FEAST_REPO) feat_df = build_training_set(store, entity_df, features) # TODO 3: fusionner features + labels avec une jointure sur (user_id, event_timestamp) # Inspirez-vous du TP précédent df = _______ if df.empty: raise RuntimeError("Training set is empty after merge. Check AS_OF and labels.") # Feature engineering minimal cat_cols = [c for c in df.columns if df[c].dtype == "object" and c not in ["user_id", "event_timestamp"]] num_cols = [c for c in df.columns if c not in cat_cols + ["user_id", "event_timestamp", "churn_label"]] X, y = prep_xy(df) # TODO 4: construire le préprocessing (OneHot sur cat + passthrough sur num) preproc = ColumnTransformer( transformers=[ ("cat", _______, cat_cols), ("num", _______, num_cols), ], remainder="drop" ) # TODO 5: définir le modèle RandomForest (avec un random_state fixé de manière arbitraire) clf = RandomForestClassifier( n_estimators=300, n_jobs=-1, random_state=_______, class_weight="balanced", max_features="sqrt", ) # TODO 6: Finissez de définir la pipeline pipe = Pipeline(steps=[("prep", ________), ("clf", _________)]) X_train, X_val, y_train, y_val = train_test_split( X, y, test_size=0.25, random_state=42, stratify=y ) # TODO 7: démarrer un run MLflow, entraîner, calculer métriques, logger (params + metrics) # Cherchez la fonction permettant de démarrer un run with mlflow._________(run_name=f"rf_baseline_{AS_OF}") as run: start = time.time() pipe.fit(X_train, y_train) train_time = time.time() - start if hasattr(pipe, "predict_proba"): y_val_proba = pipe.predict_proba(X_val)[:, 1] auc = roc_auc_score(y_val, y_val_proba) else: auc = float("nan") y_val_pred = pipe.predict(X_val) f1 = f1_score(y_val, y_val_pred) acc = accuracy_score(y_val, y_val_pred) # TODO7.1 log params + metrics # Loggez le type de modèle, AUC, F1, ACC (sur val) et le train time # de la même manière que la ligne suivante : mlflow.log_param("as_of", AS_OF) _______ _______ _______ _______ _______ _______ # TODO 7: logger un artefact JSON qui décrit cat_cols et num_cols mlflow.log_dict( {"categorical_cols": ________, "numeric_cols": ________}, "feature_schema.json" ) # TODO 8: créer une signature MLflow (inputs + outputs) puis enregistrer le modèle dans le Registry # À adapter avec vos features input_schema = Schema( [ ColSpec("long", "months_active"), ColSpec("double", "monthly_fee"), ColSpec("boolean", "paperless_billing"), ColSpec("boolean", "plan_stream_tv"), ColSpec("boolean", "plan_stream_movies"), ColSpec("string", "net_service"), ColSpec("double", "watch_hours_30d"), ColSpec("double", "avg_session_mins_7d"), ColSpec("long", "unique_devices_30d"), ColSpec("long", "skips_7d"), ColSpec("long", "rebuffer_events_7d"), ColSpec("long", "failed_payments_90d"), ColSpec("long", "support_tickets_90d"), ColSpec("double", "ticket_avg_resolution_hrs_90d"), ] ) output_schema = Schema([ColSpec("long", "prediction")]) signature = ModelSignature(inputs=_________, outputs=__________) mlflow.sklearn.log_model( sk_model=_______, # TODO 9 : faut-il mettre pipe ou clf ? Expliquez pourquoi dans le rapport artifact_path="model", registered_model_name=MODEL_NAME, signature=signature ) print(f"[OK] Trained baseline RF. AUC={auc:.4f} F1={f1:.4f} ACC={acc:.4f} (run_id={run.info.run_id})") if __name__ == "__main__": main()

Exécutez votre script train_baseline.py dans le conteneur prefect sur month_000 avec
docker compose exec -e TRAIN_AS_OF=_________ prefect \ python /opt/prefect/flows/train_baseline.py
Vérifiez qu’il se termine avec un message [OK] et un run_id.
Le script doit pouvoir accéder à Postgres, Feast, et MLflow via le réseau Docker Compose. Assurez-vous que la variable MLFLOW_TRACKING_URI pointe vers http://mlflow:5000 côté conteneur.

Dans votre rapport reports/rapport_tp4.md, indiquez :
  1. la valeur de AS_OF utilisée,
  2. le nombre de lignes de votre dataset d’entraînement (après merge),
  3. les colonnes catégorielles détectées (cat_cols),
  4. les trois métriques calculées (AUC, F1, ACC) et le temps d’entraînement.

Toujours dans le rapport, expliquez en 5–8 lignes pourquoi on fixe :
  • AS_OF et
  • random_state,
dans un pipeline MLOps orienté reproductibilité.

Explorer l’interface MLflow et promouvoir un modèle

Dans cet exercice, vous n’écrivez pas de code. L’objectif est de comprendre comment MLflow structure l’information liée à un entraînement (runs, métriques, artefacts) et comment le Model Registry permet de piloter le cycle de vie d’un modèle via des stages.

Ouvrez l’interface graphique de MLflow et identifiez l’expérience associée à votre entraînement (MLFLOW_EXPERIMENT, par défaut streamflow).
L’UI MLflow est accessible sur http://localhost:5000. Si aucune expérience n’apparaît, vérifiez que votre script s’est bien exécuté et que MLFLOW_TRACKING_URI est correctement configurée.

Sélectionnez le run correspondant à votre exécution de train_baseline.py et explorez : les paramètres, les métriques et les artefacts associés.

Accédez au Model Registry dans l’UI MLflow et localisez le modèle streamflow_churn qui a été enregistré par votre script.
Le Model Registry est accessible via l’onglet “Models”. Chaque version correspond à un run ayant enregistré un modèle.

Depuis l’interface MLflow, promouvez la dernière version du modèle streamflow_churn vers le stage Production. Par simplicité, utiliser l'ancien interface ("New model registry UI" sur off sur la page de la version) et modifié le Stage.

Vérifiez dans le Model Registry qu’une (et une seule) version du modèle est désormais en Production, et notez son numéro de version.

Dans votre rapport reports/rapport_tp4.md, incluez :
  • une capture de l’UI MLflow montrant le run (métriques + artefacts),
  • une capture du Model Registry avec le modèle en Production,
  • le numéro de version promu.

Expliquez en 5–8 lignes pourquoi la promotion via une interface (stages None, Staging, Production) est préférable à un déploiement manuel basé sur des fichiers ou des chemins locaux.

Étendre l’API pour exposer /predict (serving minimal end-to-end)

Objectif : passer d’un endpoint “features” à un vrai endpoint de prédiction. On garde volontairement l’API simple :
  • récupérer les features online déjà matérialisées,
  • construire un DataFrame 1 ligne,
  • charger le modèle MLflow en Production,
  • retourner une prédiction (et optionnellement une probabilité).

Modifiez les requirements.txt de l'API par :
fastapi cloudpickle==3.0.0 uvicorn[standard]==0.30.6 pydantic scikit-learn==1.7.2 mlflow==2.16.0 feast==0.56.0 pandas==2.3.3 prometheus-client==0.23.1 psycopg2-binary==2.9.11 psycopg-binary==3.2.12 psycopg-pool==3.2.7 psycopg==3.2.12 structlog==24.4.0 python-json-logger==2.0.7

Ajoutez MLFLOW_TRACKING_URI=http://mlflow:5000 dans .env.

Modifiez api/app.py pour ajouter un endpoint POST /predict. Vous devez aussi charger le Feature Store et le modèle MLflow “Production”. Contrainte importante : l’API s’exécute dans Docker. Dans le conteneur, l’URI MLflow n’est pas http://localhost:5000 mais http://mlflow:5000 (nom du service Docker).
Ci-dessous, un squelette minimal basé sur votre api/app.py actuel. Complétez uniquement les zones _______. Le reste est fourni.
from fastapi import FastAPI from pydantic import BaseModel from feast import FeatureStore import mlflow.pyfunc import pandas as pd import os app = FastAPI(title="StreamFlow Churn Prediction API") # --- Config --- REPO_PATH = "/repo" # TODO 1: complétez avec le nom de votre modèle MODEL_URI = "models:/__________/Production" try: store = FeatureStore(repo_path=REPO_PATH) model = mlflow.pyfunc.load_model(MODEL_URI) except Exception as e: print(f"Warning: init failed: {e}") store = None model = None class UserPayload(BaseModel): user_id: str @app.get("/health") def health(): return {"status": "ok"} # TODO 2: Mettre une requête POST @app._______("/predict") def predict(payload: UserPayload): if store is None or model is None: return {"error": "Model or feature store not initialized"} # TODO (optionel) à adapter si besoin features_request = [ "subs_profile_fv:months_active", "subs_profile_fv:monthly_fee", "subs_profile_fv:paperless_billing", "subs_profile_fv:plan_stream_tv", "subs_profile_fv:plan_stream_movies", "subs_profile_fv:net_service", "usage_agg_30d_fv:watch_hours_30d", "usage_agg_30d_fv:avg_session_mins_7d", "usage_agg_30d_fv:unique_devices_30d", "usage_agg_30d_fv:skips_7d", "usage_agg_30d_fv:rebuffer_events_7d", "payments_agg_90d_fv:failed_payments_90d", "support_agg_90d_fv:support_tickets_90d", "support_agg_90d_fv:ticket_avg_resolution_hrs_90d", ] # TODO 3 : Récupérer les features online feature_dict = store.__________________( features=features_request, entity_rows=[{"user_id": payload.user_id}], ).to_dict() X = pd.DataFrame({k: [v[0]] for k, v in feature_dict.items()}) # Gestion des features manquantes if X.isnull().any().any(): missing = X.columns[X.isnull().any()].tolist() return { "error": f"Missing features for user_id={payload.user_id}", "missing_features": missing, } # Nettoyage minimal (évite bugs de types) X = X.drop(columns=["user_id"], errors="ignore") # TODO 4: appeler le modèle et produire la réponse JSON (prediction + proba optionnelle) # Astuce : la plupart des modèles MLflow “pyfunc” utilisent model.predict(X) # (on ne suppose pas predict_proba ici) y_pred = _______ # TODO 5 : Retourner la prédiction return { "user_id": payload.user_id, "prediction": int(_________), "features_used": X.to_dict(orient="records")[0], }

Redémarrez uniquement le service api (pas toute la stack) en regénérant l'image à partir du Dockerfile de l'API si nécessaire, puis vérifiez que l’API est accessible.

Testez POST /predict avec un user_id valide via Swagger UI ou via curl.
Swagger UI est accessible sur http://localhost:8000/docs. Pour curl, pensez au header Content-Type: application/json.

Dans votre rapport reports/rapport_tp4.md, incluez :
  • une requête réussie (capture Swagger ou commande curl),
  • la réponse JSON obtenue.

Dans votre rapport, expliquez en 5–8 lignes pourquoi le modèle chargé par l’API doit pointer vers models:/streamflow_churn/Production et pas vers un fichier local (.pkl) ou un artifact de run.

Robustesse du serving : cas d’échec réalistes (sans monitoring)

Objectif : confronter l’API à des cas d’échec réalistes et mettre en place des garde-fous minimaux, sans introduire de monitoring (drift, alerting, dashboards, etc.).
En production, beaucoup de pannes “modèle” sont en réalité des pannes “features” (entités absentes, online store incomplet, valeurs nulles, types inattendus).

Testez POST /predict avec un user_id dont vous savez qu’il existe. Conservez la commande (ou capture Swagger) et la réponse JSON.

Testez POST /predict avec un user_id qui a de fortes chances de ne pas exister dans l’online store (ex : 999999). Observez le comportement actuel de votre API.
En pratique, un user_id absent peut produire :
  • un dictionnaire de features avec des valeurs null,
  • ou une réponse incohérente si l’on ne contrôle pas les valeurs manquantes.

Dans votre rapport reports/rapport_tp4.md, fournissez :
  • un exemple de requête qui réussit (commande/capture) + la réponse JSON,
  • un exemple de requête qui échoue (commande/capture) + la réponse JSON d’erreur (avec missing_features),
  • un court paragraphe “ce qui peut mal tourner en serving et comment on le détecte tôt”. Dans ce paragraphe, discutez au minimum ces deux causes (sans refaire Feast) :
    • Entité absente : le user_id demandé n’est pas présent dans l’online store.
    • Online store incomplet / obsolète : la matérialisation est manquante ou n’est pas à jour (stale), ce qui se traduit par des valeurs manquantes côté API.
i

Réflexion de synthèse (ingénierie MLOps)

Cet exercice est volontairement court et sans code. L’objectif est de prendre du recul et de relier toutes les briques manipulées dans ce TP : données, features, entraînement, tracking, registry et serving.
À ce stade, on ne traite pas encore le monitoring, le drift ou le retraining automatique.
Dans votre rapport reports/rapport_tp4.md, répondez aux questions suivantes.
Attendu : des réponses concrètes et techniques, pas des définitions génériques. Appuyez-vous sur ce que vous avez réellement mis en place dans ce TP.

Expliquez ce que MLflow garantit dans cette pipeline :
  • au niveau de la traçabilité des entraînements,
  • au niveau de l’identification des modèles servis.

Expliquez ce que signifie concrètement le stage Production pour l’API :
  • comment le modèle est sélectionné au démarrage,
  • ce que cela permet (ou empêche) côté déploiement.

Identifiez au moins trois points où la reproductibilité peut encore casser dans ce système, même avec MLflow (exemples : données, code, configuration, environnement).
Vous pouvez structurer votre réponse en courts paragraphes ou en liste, tant que le total reste dans une limite raisonnable (10–15 lignes).