CI3 : Introduction à Feast et au Feature Store pour StreamFlow
Dans ce TP, vous allez connecter le pipeline de données existant (ingestion + snapshots) à un Feature Store (Feast) pour préparer l’entraînement futur d’un modèle de churn et commencer à exposer les features en production. Vous allez :
- Ajouter un service Feast à l’architecture Docker existante.
- Définir l’Entity principale (user), les DataSources PostgreSQL et les FeatureViews.
- Appliquer la configuration Feast et vérifier la création du registre (registry.db).
- Réaliser une première récupération offline de features pour construire un jeu de données d’entraînement (training_df.csv).
- Matérialiser les features dans le Online Store et tester une récupération online pour un utilisateur.
- Intégrer un endpoint FastAPI minimal qui interroge Feast pour récupérer les features d’un utilisateur.
- Documenter la démarche et une courte réflexion dans reports/rapport_tp3.md.
Setup initial, création du rapport et balisage Git
Avant de commencer le TP3, créez un tag Git afin de conserver une version propre de votre dépôt correspondant à la fin du TP2.
Si cela n’a pas encore été fait, exécutez :
Cela permettra de revenir facilement à un état stable si nécessaire, et de comparer l’évolution du code entre les TP.
git tag -a tp2 -m "Fin du TP2"
git push origin tp2
Créez le fichier de rapport du TP3 : reports/rapport_tp3.md.
Ce fichier accompagnera toutes vos réponses, extraits de commandes, schémas, et commentaires écrits.
Copiez-y les sections suivantes, qui guideront votre rédaction :
# Contexte
# Mise en place de Feast
# Définition du Feature Store
# Récupération offline & online
# Réflexion
Dans la section # Contexte de votre rapport, écrivez un court paragraphe expliquant ou décrivant les points suivants :
- Les données dont vous disposez déjà (snapshots mensuels pour deux périodes, tables utilisateurs, usage, abonnements, paiements, support…).
- L’objectif du TP3 : brancher ces données au Feature Store Feast, récupérer des features en mode offline et online, et exposer un endpoint API simple utilisant ces features.
Ajout de Feast à l’architecture Docker
Nous allons d’abord préparer le service Feast côté code, puis l’ajouter à la composition Docker.
- Créez l’arborescence suivante (si ce n’est pas déjà fait) :
services/ feast_repo/ Dockerfile requirements.txt repo/ feature_store.yaml entities.py # (seront remplis dans l’exercice suivant) data_sources.py feature_views.py __init__.py
- Dans services/feast_repo/Dockerfile, copiez le contenu suivant, qui prépare un conteneur Python minimal pour exécuter Feast :
FROM python:3.11-slim WORKDIR /repo RUN apt-get update && \ apt-get install -y --no-install-recommends build-essential libpq-dev && \ rm -rf /var/lib/apt/lists/* COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # On garde le conteneur "vivant" pour pouvoir exécuter feast via docker compose exec CMD ["bash", "-lc", "tail -f /dev/null"]
- Dans services/feast_repo/requirements.txt, ajoutez les dépendances minimales pour Feast :
feast==0.56.0 pandas==2.3.3 psycopg2-binary==2.9.11 SQLAlchemy==2.0.36 psycopg==3.2.12 psycopg-pool==3.2.7
- Dans services/feast_repo/repo/feature_store.yaml, définissez la configuration minimale du Feature Store pour utiliser PostgreSQL en offline et online store :
project: streamflow provider: local registry: registry.db offline_store: type: postgres host: postgres port: 5432 database: streamflow db_schema: public user: streamflow password: streamflow online_store: type: postgres host: postgres port: 5432 database: streamflow db_schema: public user: streamflow password: streamflow entity_key_serialization_version: 2
Modifiez maintenant votre docker-compose.yml pour ajouter un service feast.
Dans le bloc services:, ajoutez un service en complétant les champs marqués # TODO ci-dessous :
Assurez-vous que :
services:
postgres:
image: postgres:16
env_file: .env
volumes:
- ./db/init:/docker-entrypoint-initdb.d
- pgdata:/var/lib/postgresql/data
ports:
- "5432:5432"
prefect:
build: ./services/prefect
depends_on:
- postgres
env_file: .env
environment:
PREFECT_API_URL: http://0.0.0.0:4200/api
PREFECT_UI_URL: http://0.0.0.0:4200
PREFECT_LOGGING_LEVEL: INFO
POSTGRES_HOST: postgres
POSTGRES_PORT: 5432
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_DB: ${POSTGRES_DB}
volumes:
- ./services/prefect:/opt/prefect/flows
- ./data:/data:ro
feast:
build: ______________ # TODO: donnez le chemin de build
depends_on:
- postgres
environment:
FEAST_USAGE: "False"
volumes:
- ______________________ # TODO: monter le dossier ./services/feast_repo/repo dans /repo
volumes:
pgdata:
- le service feast construit l’image à partir de ./services/feast_repo ;
- le volume ./services/feast_repo/repo est monté sur /repo à l’intérieur du conteneur.
Construisez les images et démarrez les services en arrière-plan :
docker compose up -d --build
- Vérifiez que le conteneur feast est bien démarré à l’aide de :
docker compose ps
- Si le conteneur ne démarre pas, consultez ses logs :
docker compose logs feast
Dans la section # Mise en place de Feast de votre reports/rapport_tp3.md :
- Collez la commande exacte que vous avez utilisée pour démarrer les services.
- Écrivez 2–3 lignes expliquant le rôle du conteneur feast :
- où se trouve la configuration du Feature Store dans le conteneur ;
- comment vous allez l’utiliser (via docker compose exec feast ... pour lancer feast apply et feast materialize).
docker compose exec feast ls -R /repo
Définition de l’Entity, des DataSources et des FeatureViews (Feast)
Définition de l’Entity user
L’Entity est la manière dont Feast identifie les entités métier pour lesquelles les features sont définies.
Dans notre cas, l’entité centrale est l’utilisateur (client StreamFlow), identifié par user_id.
- Ouvrez le fichier services/feast_repo/repo/entities.py. Copiez-y le squelette:
from feast import Entity # TODO: définir l'entité principale "user" user = Entity( name=..., # TODO join_keys=[...], # TODO description=..., # TODO (en français) )
- Complétez les champs:
- name : nom logique de l’entité ;
- join_keys : liste de colonnes utilisées pour relier les features ;
- description : courte description en français du rôle de cette entité.
- Dans votre rapport (# Définition du Feature Store), ajoutez un court paragraphe expliquant :
- ce qu’est une Entity dans Feast ;
- pourquoi user_id est un bon choix de clé de jointure pour StreamFlow.
Définition des DataSources PostgreSQL pour les snapshots
Dans le TP2, vous avez construit des tables de snapshots mensuels dans Postgres :
- subscriptions_profile_snapshots
- usage_agg_30d_snapshots
- payments_agg_90d_snapshots
- support_agg_90d_snapshots
- user_id
- as_of (date du snapshot)
- quelques colonnes de features (par ex. months_active, watch_hours_30d, etc.)
- Ouvrez services/feast_repo/repo/data_sources.py. Copiez-y le squelette :
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import PostgreSQLSource # TODO: source pour subscriptions_profile_snapshots subs_profile_source = PostgreSQLSource( name="subs_profile_source", query=""" SELECT user_id, as_of, -- TODO: colonnes de features FROM ... """, timestamp_field=..., # TODO ) # TODO: source pour usage_agg_30d_snapshots usage_agg_30d_source = PostgreSQLSource( name="usage_agg_30d_source", query=""" SELECT user_id, as_of, -- TODO: colonnes de features FROM ... """, timestamp_field=..., # TODO ) # TODO: source pour payments_agg_90d_snapshots payments_agg_90d_source = PostgreSQLSource( name="payments_agg_90d_source", query=""" SELECT user_id, as_of, -- TODO: colonnes de features FROM ... """, timestamp_field=..., # TODO ) # TODO: source pour support_agg_90d_snapshots support_agg_90d_source = PostgreSQLSource( name="support_agg_90d_source", query=""" SELECT user_id, as_of, -- TODO: colonnes de features FROM ... """, timestamp_field=..., # TODO )
-
Complétez pour chaque PostgreSQLSource :
- FROM ... avec le nom de la table snapshot correspondante ;
- la liste des colonnes de features (voir les schémas dans snapshot_month dans ingest_flow.py) ;
- timestamp_field avec la colonne servant de référence temporelle (as_of).
- Vérifiez que chaque requête sélectionne uniquement :
- user_id,
- as_of,
- les colonnes de features pertinentes.
- Dans votre rapport, section # Définition du Feature Store, indiquez :
- le nom d’une table de snapshot (par ex. usage_agg_30d_snapshots) ;
- 3–4 colonnes de features qu’elle contient.
Définition des FeatureViews
Les FeatureViews regroupent les features par entité et par source. Nous allons créer quatre vues :
- subs_profile_fv : profil d’abonnement ;
- usage_agg_30d_fv : usage de la plateforme ;
- payments_agg_90d_fv : paiements récents ;
- support_agg_90d_fv : interactions avec le support.
- Ouvrez services/feast_repo/repo/feature_views.py. Vous devriez voir un squelette proche de :
from feast import Field, FeatureView from feast.types import Float32, Int64, Bool, String from entities import user from data_sources import ( subs_profile_source, usage_agg_30d_source, payments_agg_90d_source, support_agg_90d_source, ) # TODO: FeatureView pour le profil d'abonnement subs_profile_fv = FeatureView( name="subs_profile_fv", entities=[user], ttl=None, schema=[ # TODO: compléter les Field(...) ], source=subs_profile_source, online=True, tags={"owner": "mlops-course"}, ) # TODO: FeatureView pour l'usage 30j usage_agg_30d_fv = FeatureView( name="usage_agg_30d_fv", entities=[user], ttl=None, schema=[ # TODO ], source=usage_agg_30d_source, online=True, tags={"owner": "mlops-course"}, ) # TODO: FeatureView pour les paiements 90j payments_agg_90d_fv = FeatureView( name="payments_agg_90d_fv", entities=[user], ttl=None, schema=[ # TODO ], source=payments_agg_90d_source, online=True, tags={"owner": "mlops-course"}, ) # TODO: FeatureView pour le support 90j support_agg_90d_fv = FeatureView( name="support_agg_90d_fv", entities=[user], ttl=None, schema=[ # TODO ], source=support_agg_90d_source, online=True, tags={"owner": "mlops-course"}, )
- Complétez la liste des Field(...) pour chaque FeatureView avec les colonnes suivantes :
- subs_profile_fv :
- months_active (Int64)
- monthly_fee (Float32)
- paperless_billing (Bool)
- plan_stream_tv (Bool)
- plan_stream_movies (Bool)
- net_service (String)
- usage_agg_30d_fv :
- watch_hours_30d (Float32)
- avg_session_mins_7d (Float32)
- unique_devices_30d (Int64)
- skips_7d (Int64)
- rebuffer_events_7d (Int64)
- payments_agg_90d_fv :
- failed_payments_90d (Int64)
- support_agg_90d_fv :
- support_tickets_90d (Int64)
- ticket_avg_resolution_hrs_90d (Float32)
- Une fois les FeatureViews complétées, exécutez dans le conteneur Feast :
docker compose exec feast feast apply
- Vérifiez que :
- la commande se termine sans erreur ;
- le fichier registry.db est apparu dans services/feast_repo/repo/.
- Dans votre rapport (# Définition du Feature Store), expliquez en 2–3 phrases à quoi sert feast apply.
Utilisation offline et online des features (Feast + API)
Récupération offline & création de training_df.csv
Dans cette partie, vous allez :
Vérifiez également que votre service prefect peut écrire dans /data à l’intérieur du conteneur.
Si nécessaire, adaptez le volume dans docker-compose.yml :
Redémarrez ensuite les services :
- construire un entity_df pointant vers les utilisateurs présents dans les snapshots à la date AS_OF = 2024-01-31 ;
- utiliser Feast pour récupérer les features correspondantes via get_historical_features ;
- joindre ces features avec les labels de churn ;
- sauvegarder le jeu de données final dans data/processed/training_df.csv.
mkdir -p data/processed
prefect:
build: ./services/prefect
depends_on:
- postgres
env_file: .env
environment:
...
volumes:
- ./services/prefect:/opt/prefect/flows
- ./data:/data # enlever :ro pour rendre le volume en écriture
- ./services/feast_repo/repo:/repo # Accès à Feast
docker compose up -d --build
Créez un nouveau script Python services/prefect/build_training_dataset.py.
Ce script doit :
- se connecter à PostgreSQL (nous réutilisons la logique de connexion de ingest_flow.py) ;
- construire un entity_df à partir de la table subscriptions_profile_snapshots à la date as_of = '2024-01-31' :
- colonnes : user_id, event_timestamp (dérivée de as_of) ;
- récupérer les labels depuis la table labels (schéma simple : user_id, churn_label) ;
- utiliser Feast (FeatureStore) pour faire un get_historical_features sur une liste de features, par exemple :
- subs_profile_fv:months_active,
- subs_profile_fv:monthly_fee,
- subs_profile_fv:paperless_billing,
- usage_agg_30d_fv:watch_hours_30d,
- usage_agg_30d_fv:avg_session_mins_7d,
- payments_agg_90d_fv:failed_payments_90d ;
- joindre les features avec les labels sur (user_id, event_timestamp) ;
- sauvegarder le résultat final dans /data/processed/training_df.csv.
import os
import pandas as pd
from sqlalchemy import create_engine
from feast import FeatureStore
AS_OF = "2024-01-31"
FEAST_REPO = "/repo"
def get_engine():
uri = (
f"postgresql+psycopg2://{os.getenv('POSTGRES_USER','streamflow')}:"
f"{os.getenv('POSTGRES_PASSWORD','streamflow')}@"
f"{os.getenv('POSTGRES_HOST','postgres')}:5432/"
f"{os.getenv('POSTGRES_DB','streamflow')}"
)
return create_engine(uri)
def build_entity_df(engine, as_of: str) -> pd.DataFrame:
q = """
SELECT user_id, as_of
FROM subscriptions_profile_snapshots
WHERE as_of = %(as_of)s
"""
df = pd.read_sql(q, engine, params={"as_of": as_of})
if df.empty:
raise RuntimeError(f"No snapshot rows found at as_of={as_of}")
df = df.rename(columns={"as_of": "event_timestamp"})
df["event_timestamp"] = pd.to_datetime(df["event_timestamp"])
return df[["user_id", "event_timestamp"]]
def fetch_labels(engine, as_of: str) -> pd.DataFrame:
# Version simple : table labels(user_id, churn_label)
q = "SELECT user_id, churn_label FROM labels"
labels = pd.read_sql(q, engine)
if labels.empty:
raise RuntimeError("Labels table is empty.")
labels["event_timestamp"] = pd.to_datetime(as_of)
return labels[["user_id", "event_timestamp", "churn_label"]]
def main():
engine = get_engine()
entity_df = build_entity_df(engine, AS_OF)
labels = fetch_labels(engine, AS_OF)
store = FeatureStore(repo_path=FEAST_REPO)
# TODO: définir la liste de features à récupérer
features = [
# "subs_profile_fv:months_active",
# ...
]
hf = store.get_historical_features(
entity_df=entity_df,
features=features,
).to_df()
# TODO: fusionner avec les labels
df = _____.merge(________, on=[__________, ___________], how="inner")
if df.empty:
raise RuntimeError("Training set is empty after merge. Check AS_OF and labels.")
os.makedirs("/data/processed", exist_ok=True)
df.to_csv("/data/processed/training_df.csv", index=False)
print(f"[OK] Wrote /data/processed/training_df.csv with {len(df)} rows")
if __name__ == "__main__":
main()
Exécutez ce script dans le conteneur prefect :
Vérifiez sur votre machine hôte que le fichier data/processed/training_df.csv a bien été créé.
Dans votre rapport (# Récupération offline & online) :
(copiez la sortie dans le rapport ou insérez une capture d’écran).
docker compose exec prefect python build_training_dataset.py
- Ajoutez la commande que vous avez utilisée ;
- Montrez les 5 premières lignes du fichier à l’aide de :
head -5 data/processed/training_df.csv
Toujours dans votre rapport, expliquez en 2–3 phrases comment Feast garantit la temporal correctness (point-in-time correctness) lors de cette récupération offline.
Appuyez-vous sur :
- le champ timestamp_field = "as_of" dans vos DataSources ;
- la structure de entity_df (user_id + event_timestamp).
Matérialisation & récupération online
Nous allons maintenant :
Cette commande :
- matérialiser les features dans le Online Store ;
- tester une récupération online pour un utilisateur donné.
docker compose exec feast feast materialize 2024-01-01T00:00:00 2024-02-01T00:00:00
- lit les données historiques dans l’Offline Store ;
- remplit le Online Store avec les features des FeatureViews, pour les timestamps compris entre le 1er janvier et le 1er février 2024.
Toujours dans le conteneur feast, lancez un shell Python interactif ou créez un petit script, par exemple services/feast_repo/repo/debug_online_features.py, pour tester get_online_features.
Le script (avec TODO) :
Exécutez-le dans le conteneur feast :
Choisissez un user_id existant en regardant par exemple le contenu de data/seeds/month_000/users.csv sur votre machine hôte.
from feast import FeatureStore
store = FeatureStore(repo_path="/repo")
# TODO: choisir un user_id existant (par ex. depuis data/seeds/month_000/users.csv)
user_id = "0001" # à adapter
features = [
"subs_profile_fv:months_active",
"subs_profile_fv:monthly_fee",
"subs_profile_fv:paperless_billing",
]
feature_dict = store.get_online_features(
features=features,
entity_rows=[{"user_id": user_id}],
).to_dict()
print("Online features for user:", user_id)
print(feature_dict)
docker compose exec feast python /repo/debug_online_features.py
Dans votre rapport (# Récupération offline & online) :
- copiez le dictionnaire retourné par get_online_features pour un utilisateur (sortie du script) ;
- ajoutez une phrase pour expliquer ce qui se passe si vous interrogez un user_id qui n’a pas de features matérialisées (par exemple : utilisateur inexistant ou en dehors de la fenêtre de matérialisation).
Intégration minimale de Feast dans l’API
Nous allons maintenant connecter l’API au Feature Store pour exposer un endpoint simple qui renvoie les features d’un utilisateur.
Modifiez votre docker-compose.yml pour ajouter un service api minimal (si ce n’est pas déjà fait).
Ajoutez ce bloc sous les autres services :
Mettez à jour le api/Dockerfile :
Et créez le fichier api/requirements.txt :
Ensuite, reconstruisez et redémarrez l’architecture :
api:
build: ./api
env_file: .env
depends_on:
- postgres
- feast
ports:
- "8000:8000"
volumes:
- ./api:/app
- ./services/feast_repo/repo:/repo # pour que l'API voie le repo Feast
FROM python:3.11-slim
WORKDIR /app
RUN apt-get update && \
apt-get install -y --no-install-recommends build-essential libpq-dev && \
rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app.py .
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
fastapi
uvicorn
feast==0.56.0
pandas==2.3.3
psycopg2-binary==2.9.11
SQLAlchemy==2.0.36
psycopg==3.2.12
psycopg-pool==3.2.7
Ensuite, reconstruisez et redémarrez l’architecture :
docker compose up -d --build
Modifiez maintenant le fichier api/app.py (vous aviez un endpoint /health dans les TP précédents).
Vous allez :
Un exemple d’implémentation possible :
- conserver un endpoint /health simple ;
- initialiser un FeatureStore global avec repo_path="/repo" ;
- ajouter un endpoint GET /features/{user_id} qui :
- appelle get_online_features avec un petit sous-ensemble de features, par exemple :
- subs_profile_fv:months_active,
- subs_profile_fv:monthly_fee,
- subs_profile_fv:paperless_billing ;
- retourne un JSON de la forme :
{
"user_id": "...",
"features": {
"months_active": ...,
"monthly_fee": ...,
"paperless_billing": ...
}
}
from fastapi import FastAPI
from feast import FeatureStore
app = FastAPI()
# Initialisation du Feature Store (le repo est monté dans /repo)
store = FeatureStore(repo_path="/repo")
@app.get("/health")
def health():
return {"status": "ok"}
@app.get("/features/{user_id}")
def get_features(user_id: str):
features = [
"subs_profile_fv:months_active",
"subs_profile_fv:monthly_fee",
"subs_profile_fv:paperless_billing",
]
feature_dict = store.get_online_features(
features=features,
entity_rows=[{"user_id": user_id}],
).to_dict()
# On convertit en format plus simple (clé -> valeur scalaires)
simple = {name: values[0] for name, values in feature_dict.items()}
return {
"user_id": user_id,
"features": simple,
}
Vérifiez que l’API fonctionne :
- Assurez-vous que le service api est bien démarré :
docker compose ps
- Testez l’endpoint /health depuis votre machine :
curl http://localhost:8000/health
- Choisissez un user_id pour lequel vous savez que des features existent (par ex. un user du CSV data/seeds/month_000/users.csv).
Interrogez l’endpoint /features/{user_id} :
Copiez la réponse JSON dans votre rapport, section # Récupération offline & online.curl http://localhost:8000/features/7590-VHVEG
Dans la section # Réflexion de votre rapport, répondez brièvement (3–5 lignes) à la question suivante :
« En quoi ce endpoint /features/{user_id}, basé sur Feast, nous aide-t-il à réduire le training-serving skew dans un système de ML en production ? »
Pour terminer le TP, créez un tag Git marquant l’état de votre dépôt à la fin du TP3 :
Notez dans votre rapport que le dépôt a été tagué avec tp3, afin de pouvoir revenir facilement à cet état dans les TP suivants.
git tag -a tp3 -m "Fin du TP3 - Feature Store et API"
git push origin tp3