CI4 : Entraînement end-to-end : MLflow Registry → API de prédiction<
Dans ce TP, vous allez compléter une chaîne MLOps minimale “de bout en bout” : à partir de features déjà disponibles (et déjà matérialisées en ligne), vous construisez un dataset d’entraînement, entraînez un modèle, tracez l’exécution dans MLflow, publiez une version dans le Model Registry, puis servez le modèle via une API FastAPI. L’objectif est de manipuler un flux réaliste et reproductible, centré sur la traçabilité et le déploiement contrôlé.
- Relier un dataset d’entraînement (point-in-time correct) à un entraînement reproductible.
- Journaliser une exécution dans MLflow (paramètres, métriques, artefacts) et enregistrer un modèle.
- Promouvoir une version de modèle en Production via l’interface MLflow.
- Étendre une API FastAPI pour exposer un endpoint /predict s’appuyant sur le modèle Production.
- Mettre en place des sanity checks simples côté serving (features manquantes, erreurs contrôlées).
- Documenter les étapes et résultats dans reports/rapport_tp4.md (commandes, extraits, captures, réflexion).
Mise en route + rappel de contexte (sanity checks + où on en est dans la pipeline)
Nous reprenons exactement là où vous vous êtes arrêtés au TP3 : les snapshots existent déjà, le repo Feast est déjà en place,
et l’online store est déjà matérialisé. Dans ce TP, on ajoute MLflow (tracking + registry) et on commence le “end-to-end”
jusqu’à une API de prédiction.
Démarrez la stack Docker Compose et vérifiez que les conteneurs principaux démarrent sans erreur.
Ajoutez le service MLflow dans docker-compose.yml, puis redémarrez la stack.
Pensez à redémarrer la stack.
mlflow:
image: ghcr.io/mlflow/mlflow:v2.16.0
command: mlflow server --backend-store-uri sqlite:///mlartifacts/mlflow.db --default-artifact-root mlflow-artifacts:/ --host 0.0.0.0 --port 5000 --serve-artifacts --artifacts-destination /mlartifacts/artifacts
volumes:
- "./mlartifacts:/mlartifacts"
ports: ["5000:5000"]
volumes:
pgdata:
mlartifacts:
Vérifiez l’accessibilité des interfaces et endpoints suivants :
MLflow UI (localhost:5000), API /health (localhost:8000).
Faites un smoke check : vérifiez que la récupération de features online fonctionne toujours via l’endpoint existant
/features/{user_id}.
Dans votre rapport reports/rapport_tp4.md, listez :
- les commandes utilisées,
- une preuve que chaque service est accessible (captures ou sortie terminal),
- un court paragraphe : “quels composants tournent et pourquoi”.
Créer un script d’entraînement + tracking MLflow (baseline RandomForest)
Dans cet exercice, vous allez compléter un script train_baseline.py qui :
L’utilisation de l’interface MLflow (promotion en Production, etc.) sera traitée dans l’exercice suivant.
- construit un dataset d’entraînement via des features déjà disponibles,
- entraîne un modèle baseline,
- trace l’exécution dans MLflow (params, métriques, artefacts),
- enregistre le modèle dans le Model Registry.
L’utilisation de l’interface MLflow (promotion en Production, etc.) sera traitée dans l’exercice suivant.
Créez le fichier services/prefect/train_baseline.py et copiez le squelette ci-dessous. Complétez ensuite les zones
marquées par _______ (ce sont vos TODO).
Gardez le script exécutable “as-is” dans le conteneur prefect. Le but est d’avoir un entraînement déterministe
et traçable (mêmes entrées → mêmes sorties).
import os
import time
import warnings
warnings.filterwarnings("ignore")
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from feast import FeatureStore
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.pipeline import Pipeline
import mlflow
import mlflow.sklearn
from mlflow.models import ModelSignature
from mlflow.types.schema import Schema, ColSpec
# --------------------
# Config
# --------------------
FEAST_REPO = "/repo"
MODEL_NAME = "streamflow_churn"
AS_OF = os.environ.get("TRAIN_AS_OF", "2024-01-31")
PG_USER = os.environ.get("POSTGRES_USER", "streamflow")
PG_PWD = os.environ.get("POSTGRES_PASSWORD", "streamflow")
PG_DB = os.environ.get("POSTGRES_DB", "streamflow")
PG_HOST = os.environ.get("POSTGRES_HOST", "postgres")
PG_PORT = int(os.environ.get("POSTGRES_PORT", "5432"))
MLFLOW_TRACKING_URI = os.environ.get("MLFLOW_TRACKING_URI", "http://mlflow:5000")
MLFLOW_EXPERIMENT = os.environ.get("MLFLOW_EXPERIMENT", "streamflow")
# --------------------
# Helpers
# --------------------
def get_sql_engine():
uri = f"postgresql+psycopg2://{PG_USER}:{PG_PWD}@{PG_HOST}:{PG_PORT}/{PG_DB}"
return create_engine(uri)
def fetch_entity_df(engine, as_of):
q = """
SELECT user_id, as_of
FROM subscriptions_profile_snapshots
WHERE as_of = %(as_of)s
"""
df = pd.read_sql(q, engine, params={"as_of": as_of})
if df.empty:
raise RuntimeError(f"No snapshot rows found at as_of={as_of}")
df = df.rename(columns={"as_of": "event_timestamp"})
df["event_timestamp"] = pd.to_datetime(df["event_timestamp"])
return df[["user_id", "event_timestamp"]]
def fetch_labels(engine, as_of):
try:
q = """
SELECT user_id, period_start, churn_label
FROM labels
WHERE period_start = %(as_of)s
"""
labels = pd.read_sql(q, engine, params={"as_of": as_of})
if not labels.empty:
labels = labels.rename(columns={"period_start": "event_timestamp"})
return labels[["user_id", "event_timestamp", "churn_label"]]
except Exception:
pass
q2 = "SELECT user_id, churn_label FROM labels"
labels = pd.read_sql(q2, engine)
if labels.empty:
raise RuntimeError("Labels table is empty.")
labels["event_timestamp"] = pd.to_datetime(AS_OF)
return labels[["user_id", "event_timestamp", "churn_label"]]
def build_training_set(store, entity_df, features):
hf = store.get_historical_features(
entity_df=entity_df,
features=features,
)
return hf.to_df()
def prep_xy(df, label_col="churn_label"):
y = df[label_col].astype(int).values
X = df.drop(columns=[label_col, "user_id", "event_timestamp"], errors="ignore")
return X, y
# --------------------
# Main
# --------------------
def main():
# TODO 1: configurer MLflow (tracking URI + experiment) en écrivant les bonnes constantes
mlflow.set_tracking_uri(____________)
mlflow.set_experiment(____________)
engine = get_sql_engine()
entity_df = fetch_entity_df(engine, AS_OF)
labels = fetch_labels(engine, AS_OF)
# TODO 2: définir la liste des features Feast à récupérer (liste de strings). Voir lab précédent
# pour le nom des features
features = [
_______,
_______,
_______,
_______,
_______,
_______,
_______,
_______,
_______,
_______,
_______,
_______,
_______,
_______,
]
store = FeatureStore(repo_path=FEAST_REPO)
feat_df = build_training_set(store, entity_df, features)
# TODO 3: fusionner features + labels avec une jointure sur (user_id, event_timestamp)
# Inspirez-vous du TP précédent
df = _______
if df.empty:
raise RuntimeError("Training set is empty after merge. Check AS_OF and labels.")
# Feature engineering minimal
cat_cols = [c for c in df.columns if df[c].dtype == "object" and c not in ["user_id", "event_timestamp"]]
num_cols = [c for c in df.columns if c not in cat_cols + ["user_id", "event_timestamp", "churn_label"]]
X, y = prep_xy(df)
# TODO 4: construire le préprocessing (OneHot sur cat + passthrough sur num)
preproc = ColumnTransformer(
transformers=[
("cat", _______, cat_cols),
("num", _______, num_cols),
],
remainder="drop"
)
# TODO 5: définir le modèle RandomForest (avec un random_state fixé de manière arbitraire)
clf = RandomForestClassifier(
n_estimators=300,
n_jobs=-1,
random_state=_______,
class_weight="balanced",
max_features="sqrt",
)
# TODO 6: Finissez de définir la pipeline
pipe = Pipeline(steps=[("prep", ________), ("clf", _________)])
X_train, X_val, y_train, y_val = train_test_split(
X, y, test_size=0.25, random_state=42, stratify=y
)
# TODO 7: démarrer un run MLflow, entraîner, calculer métriques, logger (params + metrics)
# Cherchez la fonction permettant de démarrer un run
with mlflow._________(run_name=f"rf_baseline_{AS_OF}") as run:
start = time.time()
pipe.fit(X_train, y_train)
train_time = time.time() - start
if hasattr(pipe, "predict_proba"):
y_val_proba = pipe.predict_proba(X_val)[:, 1]
auc = roc_auc_score(y_val, y_val_proba)
else:
auc = float("nan")
y_val_pred = pipe.predict(X_val)
f1 = f1_score(y_val, y_val_pred)
acc = accuracy_score(y_val, y_val_pred)
# TODO7.1 log params + metrics
# Loggez le type de modèle, AUC, F1, ACC (sur val) et le train time
# de la même manière que la ligne suivante :
mlflow.log_param("as_of", AS_OF)
_______
_______
_______
_______
_______
_______
# TODO 7: logger un artefact JSON qui décrit cat_cols et num_cols
mlflow.log_dict(
{"categorical_cols": ________,
"numeric_cols": ________},
"feature_schema.json"
)
# TODO 8: créer une signature MLflow (inputs + outputs) puis enregistrer le modèle dans le Registry
# À adapter avec vos features
input_schema = Schema(
[
ColSpec("long", "months_active"),
ColSpec("double", "monthly_fee"),
ColSpec("boolean", "paperless_billing"),
ColSpec("boolean", "plan_stream_tv"),
ColSpec("boolean", "plan_stream_movies"),
ColSpec("string", "net_service"),
ColSpec("double", "watch_hours_30d"),
ColSpec("double", "avg_session_mins_7d"),
ColSpec("long", "unique_devices_30d"),
ColSpec("long", "skips_7d"),
ColSpec("long", "rebuffer_events_7d"),
ColSpec("long", "failed_payments_90d"),
ColSpec("long", "support_tickets_90d"),
ColSpec("double", "ticket_avg_resolution_hrs_90d"),
]
)
output_schema = Schema([ColSpec("long", "prediction")])
signature = ModelSignature(inputs=_________, outputs=__________)
mlflow.sklearn.log_model(
sk_model=_______, # TODO 9 : faut-il mettre pipe ou clf ? Expliquez pourquoi dans le rapport
artifact_path="model",
registered_model_name=MODEL_NAME,
signature=signature
)
print(f"[OK] Trained baseline RF. AUC={auc:.4f} F1={f1:.4f} ACC={acc:.4f} (run_id={run.info.run_id})")
if __name__ == "__main__":
main()
Exécutez votre script train_baseline.py dans le conteneur prefect sur month_000 avec
Vérifiez qu’il se termine avec un message [OK] et un run_id.
docker compose exec -e TRAIN_AS_OF=_________ prefect \
python /opt/prefect/flows/train_baseline.py
Dans votre rapport reports/rapport_tp4.md, indiquez :
- la valeur de AS_OF utilisée,
- le nombre de lignes de votre dataset d’entraînement (après merge),
- les colonnes catégorielles détectées (cat_cols),
- les trois métriques calculées (AUC, F1, ACC) et le temps d’entraînement.
Toujours dans le rapport, expliquez en 5–8 lignes pourquoi on fixe :
- AS_OF et
- random_state,
Explorer l’interface MLflow et promouvoir un modèle
Dans cet exercice, vous n’écrivez pas de code. L’objectif est de comprendre comment MLflow structure
l’information liée à un entraînement (runs, métriques, artefacts) et comment le Model Registry permet
de piloter le cycle de vie d’un modèle via des stages.
Ouvrez l’interface graphique de MLflow et identifiez l’expérience associée à votre entraînement
(MLFLOW_EXPERIMENT, par défaut streamflow).
Sélectionnez le run correspondant à votre exécution de train_baseline.py et explorez :
les paramètres, les métriques et les artefacts associés.
Accédez au Model Registry dans l’UI MLflow et localisez le modèle
streamflow_churn qui a été enregistré par votre script.
Depuis l’interface MLflow, promouvez la dernière version du modèle
streamflow_churn vers le stage Production.
Par simplicité, utiliser l'ancien interface ("New model registry UI" sur off sur
la page de la version) et modifié le Stage.
Vérifiez dans le Model Registry qu’une (et une seule) version du modèle est désormais en
Production, et notez son numéro de version.
Dans votre rapport reports/rapport_tp4.md, incluez :
- une capture de l’UI MLflow montrant le run (métriques + artefacts),
- une capture du Model Registry avec le modèle en Production,
- le numéro de version promu.
Expliquez en 5–8 lignes pourquoi la promotion via une interface (stages None, Staging,
Production) est préférable à un déploiement manuel basé sur des fichiers ou des chemins locaux.
Étendre l’API pour exposer /predict (serving minimal end-to-end)
Objectif : passer d’un endpoint “features” à un vrai endpoint de prédiction. On garde volontairement l’API simple :
- récupérer les features online déjà matérialisées,
- construire un DataFrame 1 ligne,
- charger le modèle MLflow en Production,
- retourner une prédiction (et optionnellement une probabilité).
Modifiez les requirements.txt de l'API par :
fastapi
cloudpickle==3.0.0
uvicorn[standard]==0.30.6
pydantic
scikit-learn==1.7.2
mlflow==2.16.0
feast==0.56.0
pandas==2.3.3
prometheus-client==0.23.1
psycopg2-binary==2.9.11
psycopg-binary==3.2.12
psycopg-pool==3.2.7
psycopg==3.2.12
structlog==24.4.0
python-json-logger==2.0.7
Ajoutez MLFLOW_TRACKING_URI=http://mlflow:5000 dans .env.
Modifiez api/app.py pour ajouter un endpoint POST /predict.
Vous devez aussi charger le Feature Store et le modèle MLflow “Production”.
Contrainte importante : l’API s’exécute dans Docker. Dans le conteneur, l’URI MLflow n’est pas
http://localhost:5000 mais http://mlflow:5000 (nom du service Docker).
Ci-dessous, un squelette minimal basé sur votre api/app.py actuel. Complétez uniquement les zones
_______. Le reste est fourni.
from fastapi import FastAPI
from pydantic import BaseModel
from feast import FeatureStore
import mlflow.pyfunc
import pandas as pd
import os
app = FastAPI(title="StreamFlow Churn Prediction API")
# --- Config ---
REPO_PATH = "/repo"
# TODO 1: complétez avec le nom de votre modèle
MODEL_URI = "models:/__________/Production"
try:
store = FeatureStore(repo_path=REPO_PATH)
model = mlflow.pyfunc.load_model(MODEL_URI)
except Exception as e:
print(f"Warning: init failed: {e}")
store = None
model = None
class UserPayload(BaseModel):
user_id: str
@app.get("/health")
def health():
return {"status": "ok"}
# TODO 2: Mettre une requête POST
@app._______("/predict")
def predict(payload: UserPayload):
if store is None or model is None:
return {"error": "Model or feature store not initialized"}
# TODO (optionel) à adapter si besoin
features_request = [
"subs_profile_fv:months_active",
"subs_profile_fv:monthly_fee",
"subs_profile_fv:paperless_billing",
"subs_profile_fv:plan_stream_tv",
"subs_profile_fv:plan_stream_movies",
"subs_profile_fv:net_service",
"usage_agg_30d_fv:watch_hours_30d",
"usage_agg_30d_fv:avg_session_mins_7d",
"usage_agg_30d_fv:unique_devices_30d",
"usage_agg_30d_fv:skips_7d",
"usage_agg_30d_fv:rebuffer_events_7d",
"payments_agg_90d_fv:failed_payments_90d",
"support_agg_90d_fv:support_tickets_90d",
"support_agg_90d_fv:ticket_avg_resolution_hrs_90d",
]
# TODO 3 : Récupérer les features online
feature_dict = store.__________________(
features=features_request,
entity_rows=[{"user_id": payload.user_id}],
).to_dict()
X = pd.DataFrame({k: [v[0]] for k, v in feature_dict.items()})
# Gestion des features manquantes
if X.isnull().any().any():
missing = X.columns[X.isnull().any()].tolist()
return {
"error": f"Missing features for user_id={payload.user_id}",
"missing_features": missing,
}
# Nettoyage minimal (évite bugs de types)
X = X.drop(columns=["user_id"], errors="ignore")
# TODO 4: appeler le modèle et produire la réponse JSON (prediction + proba optionnelle)
# Astuce : la plupart des modèles MLflow “pyfunc” utilisent model.predict(X)
# (on ne suppose pas predict_proba ici)
y_pred = _______
# TODO 5 : Retourner la prédiction
return {
"user_id": payload.user_id,
"prediction": int(_________),
"features_used": X.to_dict(orient="records")[0],
}
Redémarrez uniquement le service api (pas toute la stack) en regénérant l'image à partir du Dockerfile de l'API si nécessaire, puis vérifiez que l’API est accessible.
Testez POST /predict avec un user_id valide via Swagger UI ou via curl.
Dans votre rapport reports/rapport_tp4.md, incluez :
- une requête réussie (capture Swagger ou commande curl),
- la réponse JSON obtenue.
Dans votre rapport, expliquez en 5–8 lignes pourquoi le modèle chargé par l’API doit pointer vers
models:/streamflow_churn/Production et pas vers un fichier local (.pkl) ou un artifact de run.
Robustesse du serving : cas d’échec réalistes (sans monitoring)
Objectif : confronter l’API à des cas d’échec réalistes et mettre en place des garde-fous minimaux,
sans introduire de monitoring (drift, alerting, dashboards, etc.).
En production, beaucoup de pannes “modèle” sont en réalité des pannes “features” (entités absentes, online store incomplet, valeurs nulles, types inattendus).
En production, beaucoup de pannes “modèle” sont en réalité des pannes “features” (entités absentes, online store incomplet, valeurs nulles, types inattendus).
Testez POST /predict avec un user_id dont vous savez qu’il existe.
Conservez la commande (ou capture Swagger) et la réponse JSON.
Testez POST /predict avec un user_id qui a de fortes chances de ne pas exister
dans l’online store (ex : 999999). Observez le comportement actuel de votre API.
En pratique, un user_id absent peut produire :
- un dictionnaire de features avec des valeurs null,
- ou une réponse incohérente si l’on ne contrôle pas les valeurs manquantes.
Dans votre rapport reports/rapport_tp4.md, fournissez :
- un exemple de requête qui réussit (commande/capture) + la réponse JSON,
- un exemple de requête qui échoue (commande/capture) + la réponse JSON d’erreur (avec missing_features),
- un court paragraphe “ce qui peut mal tourner en serving et comment on le détecte tôt”. Dans ce paragraphe, discutez au minimum ces deux causes (sans refaire Feast) :
- Entité absente : le user_id demandé n’est pas présent dans l’online store.
- Online store incomplet / obsolète : la matérialisation est manquante ou n’est pas à jour (stale), ce qui se traduit par des valeurs manquantes côté API.
Réflexion de synthèse (ingénierie MLOps)
Cet exercice est volontairement court et sans code. L’objectif est de prendre du recul et de relier
toutes les briques manipulées dans ce TP : données, features, entraînement, tracking, registry et serving.
À ce stade, on ne traite pas encore le monitoring, le drift ou le retraining automatique.
Dans votre rapport reports/rapport_tp4.md, répondez aux questions suivantes.
Attendu : des réponses concrètes et techniques, pas des définitions génériques.
Appuyez-vous sur ce que vous avez réellement mis en place dans ce TP.
À ce stade, on ne traite pas encore le monitoring, le drift ou le retraining automatique.
Dans votre rapport reports/rapport_tp4.md, répondez aux questions suivantes.
Expliquez ce que MLflow garantit dans cette pipeline :
- au niveau de la traçabilité des entraînements,
- au niveau de l’identification des modèles servis.
Expliquez ce que signifie concrètement le stage Production pour l’API :
- comment le modèle est sélectionné au démarrage,
- ce que cela permet (ou empêche) côté déploiement.
Identifiez au moins trois points où la reproductibilité peut encore casser dans ce système,
même avec MLflow (exemples : données, code, configuration, environnement).
Vous pouvez structurer votre réponse en courts paragraphes ou en liste, tant que le total reste
dans une limite raisonnable (10–15 lignes).