CSC 8608 – Concepts avancés et applications du deep learning

Portail informatique

CI2 : Monitoring

Ce travail pratique a pour objectif de familiariser les étudiants avec la mise en production d'un système de Machine Learning en continu. Ils mettront en place un pipeline complet, de la prédiction en ligne à la surveillance des performances du modèle et à la détection des changements de distribution des données.

Objectifs du TP :

  • Déployer une API de prédiction en ligne avec FastAPI
  • Mettre en place un système de streaming de données avec Kafka
  • Effectuer des transformations de features en temps réel
  • Détecter des changements dans la distribution des données
  • Mettre en place un système de monitoring avec Prometheus
  • Visualiser les métriques et configurer des alertes avec Grafana

Prédiction en ligne avec un modèle de Machine Learning

Dans cet exercice, vous allez mettre en place une API de prédiction en ligne en utilisant FastAPI. Cette API recevra des données en entrée, effectuera des prédictions à l'aide d'un modèle de machine learning pré-entrainé, et renverra les résultats au client. Vous allez également envoyer ces données dans un système Kafka pour permettre un traitement en flux (streaming).

L'objectif est de comprendre comment créer une API simple qui intègre un modèle de prédiction et comment déployer cette API pour un usage en production.

Créez un projet Python et installez les dépendances nécessaires. Utilisez les bibliothèques suivantes :
  • fastapi
  • uvicorn
  • numpy
  • kafka-python
pip install fastapi uvicorn numpy kafka-python

Implémentez une API FastAPI avec un endpoint /predict qui reçoit des données de features sous forme de liste, utilise un modèle aléatoire (qui retourne la classe 0 95% du temps et la classe 1 5% du temps), et renvoie la prédiction sous forme de JSON.
from fastapi import FastAPI from pydantic import BaseModel import numpy as np from kafka import KafkaProducer import json app = FastAPI() class PredictionRequest(BaseModel): features: list @app.post("/predict") async def predict(request: PredictionRequest): # Simuler une prédiction aléatoire prediction = np.random.choice([0, 1], p=[0.95, 0.05]) return {"prediction": prediction.tolist()}

Ajoutez un second endpoint /send qui envoie les données reçues dans un topic Kafka appelé prediction_requests. Assurez-vous que le message envoyé contient bien les features.
Il va vous falloir démarrer Kafka comme dans le TP précédent.
producer = KafkaProducer( bootstrap_servers='localhost:9092', value_serializer=lambda x: json.dumps(x).encode('utf-8') ) @app.post("/send") async def send_to_kafka(request: PredictionRequest): producer.send('prediction_requests', {"features": request.features}) return {"status": "Message sent"}

Démarrez le serveur FastAPI en utilisant Uvicorn pour rendre l'API accessible. Ajouter le main suivant à votre programme :
if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000)
Ensuite, testez que le serveur fonctionne en accédant à http://localhost:8000/docs dans votre navigateur.

Vous pouvez tester votre API directement depuis la page docs en cliquant sur Try it out. Essayez de faire une prédiction et observez le résultat.

Transformation de features avec Kafka Streams

Dans cet exercice, vous allez mettre en place un pipeline de transformation de features en utilisant Kafka Streams. L'objectif est de traiter les messages entrants, appliquer des transformations en temps réel, et envoyer les résultats vers un nouveau topic Kafka. Cela vous permettra de comprendre comment enrichir ou transformer les données en streaming avant qu'elles ne soient utilisées pour la prédiction.

Créez un nouveau consommateur Kafka pour lire les messages du topic prediction_requests. Assurez-vous que les messages soient correctement reçus et affichés dans la console.
from kafka import KafkaConsumer import json consumer = KafkaConsumer( 'prediction_requests', bootstrap_servers='localhost:9092', value_deserializer=lambda x: json.loads(x.decode('utf-8')) ) if __name__ == '__main__': for message in consumer: print("Reçu :", message.value)

Implémentez un producteur Kafka qui envoie les données transformées vers un nouveau topic transformed_features. Par exemple, ajoutez de nouvelles features calculées à partir des données initiales, comme la somme ou la moyenne.
from kafka import KafkaProducer producer = KafkaProducer( bootstrap_servers='localhost:9092', value_serializer=lambda x: json.dumps(x).encode('utf-8') ) def transform_features(features): # Exemple : ajout de nouvelles features features_sum = sum(features) features_mean = features_sum / len(features) features.append(features_sum) features.append(features_mean) return features if __name__ == '__main__': for message in consumer: transformed_features = transform_features(message.value['features']) producer.send('transformed_features', {"features": transformed_features})

Testez l'ensemble du pipeline en envoyant des messages au topic prediction_requests et en vérifiant que les messages transformés apparaissent bien dans le topic transformed_features.
import json from kafka import KafkaConsumer transformed_consumer = KafkaConsumer( 'transformed_features', bootstrap_servers='localhost:9092', value_deserializer=lambda x: json.loads(x.decode('utf-8')) ) if __name__ == '__main__': for message in transformed_consumer: print("Transformé :", message.value)

Détection des changements de distribution des données

Dans cet exercice, vous allez mettre en place un système pour détecter les changements de distribution dans les données entrantes. L'objectif est d'analyser le flux de données en continu, de comparer les distributions passées et présentes, et d'alerter en cas de dérive significative.

Créez un consommateur Kafka pour lire les messages du topic transformed_features. Calculez la moyenne et l'écart-type des features sur une fenêtre glissante de 100 messages. Logger les résultats avec un print.
from kafka import KafkaConsumer import json import numpy as np consumer = KafkaConsumer( 'transformed_features', bootstrap_servers='localhost:9092', value_deserializer=lambda x: json.loads(x.decode('utf-8')) ) window_size = 100 feature_window = [] if __name__ == '__main__': for message in consumer: features = message.value['features'] feature_window.append(features) if len(feature_window) > window_size: feature_window.pop(0) window_array = np.array(feature_window) mean = np.mean(window_array, axis=0) std_dev = np.std(window_array, axis=0) print(f"Moyenne: {mean}, Écart-type: {std_dev}")

Implémentez un nouveau consommateur qui effectuera un test statistique, le test de Kolmogorov-Smirnov, pour détecter un changement significatif (une p-value en dessous de 0.05) dans la distribution des features entre les premières et les dernières observations de la fenêtre (on coupera la fenêtre en deux). On utilisera la fonction ks_2samp dans scipy.stats.
from kafka import KafkaConsumer import json from scipy.stats import ks_2samp consumer = KafkaConsumer( 'transformed_features', bootstrap_servers='localhost:9092', value_deserializer=lambda x: json.loads(x.decode('utf-8')) ) if __name__ == '__main__': window_size = 100 feature_window = [] window_half = window_size // 2 for message in consumer: features = message.value['features'] feature_window.append(features) if len(feature_window) > window_size: feature_window.pop(0) if len(feature_window) == window_size: old_window = feature_window[:window_half] new_window = feature_window[window_half:] for i in range(len(features)): stat, p_value = ks_2samp([x[i] for x in old_window], [x[i] for x in new_window]) if p_value

Utilisez le code suivant pour générer un flux de données normal avec des changements de distribution aléatoires et testez votre implémentation de détection.
import json import time import numpy as np from kafka import KafkaProducer KAFKA_BROKER = 'localhost:9092' KAFKA_TOPIC = 'transformed_features' producer = KafkaProducer( bootstrap_servers=KAFKA_BROKER, value_serializer=lambda x: json.dumps(x).encode('utf-8') ) def generate_normal_features(mean=0.5, std=0.1, dim=3): return np.random.normal(mean, std, dim).tolist() def send_data_with_shifts(num_messages=1000, delay=0.1): current_mean = 0.5 for i in range(num_messages): if i % 200 == 0 and i > 0: current_mean += np.random.choice([-0.3, 0.3]) # Shift aléatoire print(f"Changement effectué ! Nouvelle moyenne: {current_mean}") features = generate_normal_features(mean=current_mean) producer.send(KAFKA_TOPIC, {"features": features}) print(f"Envoyé : {features}") time.sleep(delay) if __name__ == "__main__": send_data_with_shifts()

Surveillance des prédictions et des métriques avec Prometheus

Dans cet exercice, vous allez mettre en place un système de surveillance pour suivre les performances des prédictions et des données en utilisant Prometheus. Vous apprendrez à exposer des métriques via une API FastAPI et à configurer Prometheus pour collecter ces informations.

Installez la bibliothèque prometheus_client.

Modifiez votre API FastAPI pour inclure des métriques Prometheus. Ajoutez un compteur (Counter dans prometheus_client) pour suivre le nombre total de prédictions effectuées et un histogramme (Histogram dans prometheus_client) pour mesurer la latence des prédictions.
from fastapi import FastAPI from prometheus_client import Counter, Histogram, generate_latest import time app = FastAPI() PREDICTION_COUNT = Counter('prediction_count', 'Nombre total de prédictions') PREDICTION_LATENCY = Histogram('prediction_latency_seconds', 'Latence des prédictions en secondes') @app.post("/predict") async def predict(request: PredictionRequest): # Simuler une prédiction aléatoire prediction = np.random.choice([0, 1], p=[0.95, 0.05]) start_time = time.time() PREDICTION_COUNT.inc() time.sleep(random.random() / 10) # Simuler un délai de traitement PREDICTION_LATENCY.observe(time.time() - start_time) return {"prediction": prediction.tolist()}

Ajoutez le point d'entrée suivant :
@app.get("/metrics") async def metrics(): return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

Configurez Prometheus pour collecter les métriques de votre API. Ajoutez une configuration qui scrape l'endpoint de votre API toutes les 5 secondes. Pour cela, créez le fichier de configuration prometheus.yml suivant :
global: scrape_interval: 5s scrape_configs: - job_name: 'fastapi' static_configs: - targets: ['localhost:8000']

Lancez Prometheus à l'aide de la commande Docker :
            docker run \
    -p 9090:9090 \
    -v /home/julien/Documents/programmation/datashift/prometheus.yml:/etc/prometheus/prometheus.yml --network host \
    prom/prometheus
        
Ensuite, accédez à http://localhost:9090 pour voir les métriques collectées.

Exécutez les requêtes suivantes dans l'interface de Prometheus pour afficher des graphes des métriques collectées :
  • Affichez l'évolution du nombre de prédictions :
    prediction_count
  • Affichez un histogramme de la latence des prédictions :
    histogram_quantile(0.95, rate(prediction_latency_seconds_bucket[5m]))

Visualisation des métriques avec Grafana

Dans cet exercice, vous allez configurer Grafana pour visualiser les métriques collectées par Prometheus. Vous allez apprendre à ajouter une source de données Prometheus et à créer des tableaux de bord pour surveiller l'évolution des prédictions et de la latence.

Démarrez Grafana avec la commande docker suivant :
            docker run -d -p 3000:3000 --name=grafana --network host grafana/grafana
        
Ensuite, ouvrez Grafana dans votre navigateur à l’adresse http://localhost:3000. Les identifiants par défaut sont admin, admin.

Ajoutez Prometheus comme source de données dans Grafana.
  • Dans l’interface Grafana, allez dans Connections > Data sources.
  • Cliquez sur Add data source, choisissez Prometheus.
  • Dans le champ URL, entrez http://localhost:9090.
  • Enregistrez la configuration et testez la connexion.

On veut créer un tableau de bord affichant l’évolution du nombre total de prédictions.
  • Créez un nouveau dashboard
  • Ajoutez une visualisation
  • Sélectionnez Prometheus comme source de données
  • En bas, sélectionnez la métrique prediction_count_total
  • Ajoutez une opération rate (vous pouvez jouer sur le range)
  • Appuyez sur Run queries pour obtenir la visualisation
  • Une fois satisfait, cliquez sur Back to dashboard

Ajoutez un graphique montrant l'évolution de la latence des prédictions. Utilisez la visualisation suggérée (hint) et réglez les paramètres.

Configurez un panneau d’alerte dans Grafana pour détecter une latence élevée des prédictions.