CSC 8608 – Concepts avancés et applications du deep learning

Portail informatique

CI1 : Kafka

Ce TP a pour objectif de vous familiariser avec Apache Kafka en mettant en place un pipeline de traitement de données en temps réel. Vous utiliserez Kafka pour produire, transformer et stocker des données structurées.

Objectifs du TP :

  • Installer et configurer Apache Kafka.
  • Créer et gérer des topics Kafka.
  • Implémenter un producer Kafka générant des données JSON réalistes.
  • Mettre en place un consumer appliquant un traitement de transformation des données.
  • Filtrer, normaliser et sélectionner des features sur les données reçues.
  • Stocker les données transformées dans un fichier CSV.
  • Observer le comportement des consumers et des partitions.

À la fin de ce TP, vous aurez une meilleure compréhension de la gestion des flux de données en temps réel et de l’intégration de Kafka dans un système de traitement de données.

Installation et communication avec Kafka

Dans cet exercice, vous allez installer Apache Kafka, créer un topic, et interagir avec lui en Python à l'aide de la bibliothèque kafka-python.

Exécutez les commandes suivantes pour installer Kafka et le démarrer :

 
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 # Changez l'emplacement selon votre configuration. wget https://dlcdn.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz tar -xvf kafka_2.13-3.7.2.tgz cd kafka_2.13-3.7.2 bin/zookeeper-server-start.sh config/zookeeper.properties # Ouvrez un nouveau terminal ! bin/kafka-server-start.sh config/server.properties

Écrivez un script Python pour créer une connexion à Kafka en utilisant KafkaAdminClient.

from kafka.admin import KafkaAdminClient admin_client = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

Écrivez un script pour créer un topic nommé test-topic. Gérez le cas où le topic existe déjà.

from kafka.admin import NewTopic from kafka.errors import TopicAlreadyExistsError try: topic = NewTopic(name="test-topic", num_partitions=1, replication_factor=1) admin_client.create_topics([topic]) print("Topic créé avec succès.") except TopicAlreadyExistsError: print("Le topic existe déjà.")

Écrivez un script Python pour afficher la liste des topics existants.

print("Topics existants :", admin_client.list_topics())

Affichez la configuration des partitions d'un topic.

print(admin_client.describe_topics(["new-topic"]))

Ajoutez une partition supplémentaire à votre topic test-topic.

admin_client.create_partitions({"test-topic": NewPartitions(total_count=2)}) print("Partition ajoutée avec succès.")

Écrivez un script Python pour supprimer le topic test-topic.

admin_client.delete_topics(["test-topic"]) print("Topic supprimé avec succès.")

Création d'un Producer et d'un Consumer

Dans cet exercice, vous allez implémenter un Producer Kafka qui envoie continuellement des messages et un Consumer qui les lit.

Écrivez un script Python qui envoie en continu (un message par seconde) un numéro incrémenté à un topic Kafka.

from kafka import KafkaProducer import time producer = KafkaProducer(bootstrap_servers='localhost:9092') i = 0 while True: message = f"Message {i}" producer.send("test-topic", message.encode('utf-8')) print(f"Envoyé: {message}") i += 1 time.sleep(1)

Écrivez un script qui consomme les messages du topic.

consumer = KafkaConsumer("test-topic", bootstrap_servers='localhost:9092', auto_offset_reset='earliest') for message in consumer: print(f"Reçu: {message.value.decode('utf-8')}")

Tester de redémarrer le consumer. Observez le comportement selon la valeur de auto_offset_reset.

Ajouter le consumer à un groupe. Utilisez le paramètre group_id pour assigner un groupe.

Visualiser les consumers groups grâce à l'admin

Observer le comportement après redémarrage du consumer

Lancer un deuxième consumer identique.

Qui lit les messages ?

Afficher le numéro de la partition des messages lus.

Ajouter deux partitions au topic et observer.

Fermer un des consumers et observer.

Pipeline de traitement de données

Dans cet exercice, vous allez mettre en place un pipeline de traitement de données en temps réel en utilisant Apache Kafka.

Commencez par implémenter un producer Kafka qui génère des données JSON aléatoires de manière continue.

Voici la structure des données :

  • id : entier unique
  • temperature : nombre flottant
  • humidity : nombre flottant
  • status : chaîne de caractères (« normal », « warning », « critical »)
  • tags : liste contenant 2 éléments aléatoires parmi plusieurs valeurs possibles

Voici le code de la génération :

def generate_data(): return { "id": random.randint(1, 10000), "temperature": round(random.uniform(20.0, 35.0), 2), "humidity": round(random.uniform(30.0, 90.0), 2), "status": random.choice(["normal", "warning", "critical"]), "tags": random.sample(["sensor1", "sensor2", "sensor3", "outdoor", "indoor"], 2) }
import json import random import time from kafka import KafkaProducer producer = KafkaProducer( bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8') ) while True: data = generate_data() producer.send("raw-data", value=data) producer.flush() print("Envoyé:", data) time.sleep(1)

Implémentez un consumer qui lit ces données et applique un traitement spécifique :

  • Filtrer les données dont le statut est "critical" (elles doivent être ignorées).
  • Ne conserver que les colonnes "id", "temperature" et "humidity".
  • Normaliser la température et l'humidité entre 0 et 1.
Ensuite, vous devez réécrire les données transformées dans un nouveau topic.
import json from kafka import KafkaConsumer, KafkaProducer consumer = KafkaConsumer( "raw-data", bootstrap_servers='localhost:9092', value_deserializer=lambda v: json.loads(v.decode('utf-8')) ) producer = KafkaProducer( bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8') ) for message in consumer: data = message.value if data["status"] == "critical": continue normalized_data = { "id": data["id"], "temperature": (data["temperature"] - 20) / (35 - 20), "humidity": (data["humidity"] - 30) / (90 - 30) } producer.send("processed-data", value=normalized_data) print("Envoyé:", normalized_data)

Implémentez un consumer qui lit les données transformées et les stocke dans un fichier CSV en ajoutant les données reçues à la fin d'un fichier.

kafka import KafkaConsumer consumer = KafkaConsumer( "processed-data", bootstrap_servers='localhost:9092', value_deserializer=lambda v: json.loads(v.decode('utf-8')) ) file = open("dataset.csv", "a", newline='') for message in consumer: data = message.value file.write(str(data["id"]) + "," + str(data["temperature"]) + "," + str(data["humidity"]) + "\n") file.close()