CSC 4509 – Algorithmique et communications des applications réparties

Portail informatique

Étude de l'application de tchat proposée comme ossature de départ

  • Avant de compléter l'application répartie avec des algorithmes, présenter l'ossature de départ.
  • Profiter de cette présentation pour approfondir des concepts et des idiomes JAVA.
cette page est le manuel de référence de l'ossature de départ de l'application de tchat multiclient multiserveur. Elle est donc très longue. Nous vous proposons de la parcourir une première fois en en comprenant chaque élément et d'y revenir autant que nécessaire ensuite. Nous estimons à environ 4 heures le premier parcours de ce manuel, la plus grande partie étant supposée faite en hors présentiel (peut-être de manière collective et avec la préparation de questions pour le présentiel qui suit).

Sommaire :

  • 1. Test de l'application de tchat
    • 1.a. Dans des consoles
    • 1.b. Dans un scénario de test d'intégration
  • 2. Présentation de l'utilisation de concepts et d'idiomes JAVA dans la conception du distributeur de messages du client
    • 2.a. Indications de lecture, et motivations et objectifs
    • 2.b. Types énumérés, lambda expression et default method
    • 2.c. Objets non modifiables, blocs de code static, et collection et Stream
    • 2.d. Parcours de collection et Stream
    • 2.e. Patron de conception Intercepteur et méthodes « par défaut » (default method) des interfaces
    • 2.f. Idiome/Patron d'implémentation JAVA pour la terminaison de Thread et de processus
  • 3. Architecture répartie et diagrammes de séquence de l'application de tchat
    • 3.a. Architecture de l'exemple d'exécution
    • 3.b. Diagrammes de classes des parties client et serveur
    • 3.c. Diagramme de séquence de l'émission d'un message de tchat — partie de la séquence depuis la saisie à la console par l'utilisateur jusqu'à l'émission du message vers le serveur auquel le client est connecté
    • 3.d. Diagramme de séquence de la réception d'un message de tchat par le serveur — partie de la séquence depuis la réception du message jusqu'à la transmission vers les serveurs voisins et les clients locaux
    • 3.e. Diagramme de séquence de la réception d'un message de tchat par le client — partie de la séquence depuis la réception du message jusqu'à l'affichage dans la console
  • 4. Programmation réseau dans l'application de tchat
    • 4.a. Connexions JAVA NIO
    • 4.b. Structure des messages et gestion des cycles dans la topologie des serveurs
    • 4.c. Primitives de communication disponibles dans le serveur
    • 4.d. Calcul des identités des processus
    • 4.e. Algorithme de découverte de la topologie

la page a été rédigée en supposant par défaut une lecture linéaire. Mais, selon vos affinités, vous pouvez lire une ou plusieurs sections dans un autre ordre, par exemple si vous préférez commencer par l'architecture répartie pour avoir une vue d'ensemble du code.

avant de démarrer l'étude du code, si vous ne connaissez pas, prenez connaissance des fonctionnalités « code mining » de Eclipse, ou de votre IDE favori : cf. question correspondante dans la page « Trucs & astuces — JAVA et Eclipse ».

Test de l'application de tchat

Dans des consoles

 

En guise d'ossature, nous fournissons une réalisation complète de l'application de tchat multiclient et multiserveur.

Pour tester l'application fournie, nous vous proposons d'utiliser l'architecture qui suit. Notez que les serveurs sont multiclients et que les cycles sont autorisés dans le graphe formé par les serveurs.

          Client0  Client1
           ||     //
           ||    //
           ||   //
          Serveur1 ======= Serveur2
               \\          //
                \\        //
                 \\      //
                  \\    //
                  Serveur3 ======= Serveur4
                     ||             ||   \\
                     ||             ||    \\
                     ||             ||     \\
                  Client2          Client3  Client4

Auparavant, compilez le projet avec Maven (mvn clean install -Dmaven.test.skip=true -DskipTests). Nous vous proposons d'utiliser neuf consoles et d'y lancer les commandes suivantes :

  • quatre serveurs :
    1. ./serveur.sh 1
    2. ./serveur.sh 2 localhost 1
    3. ./serveur.sh 3 localhost 1 localhost 2
    4. ./serveur.sh 4 localhost 3
  • cinq clients :
    1. ./client.sh localhost 1 # => serveur 1
    2. ./client.sh localhost 1 # => serveur 1
    3. ./client.sh localhost 3 # => serveur 3
    4. ./client.sh localhost 4 # => serveur 4
    5. ./client.sh localhost 4 # => serveur 4

Une fois les serveurs et les clients démarrés, vous pouvez entrer des messages dans les consoles des clients et les voir s'afficher dans les consoles des autres clients. Pour terminer un client, entrez la commande « quit » dans la console du client. Faites de même pour terminer un serveur.

Dans un scénario de test d'intégration

 

Vérifiez que vous avez terminé les exécutions des serveurs et des clients de l'étape précédente avant d'exécuter le test dans Eclipse. Sinon, vous aurez un problème de réutilisation de ports TCP.

$ ps aux | grep java xxxx 58786 2.7 0.5 3735888 42604 pts/1 Sl 18:59 0:00 java xxxx chat.server.Main 1 xxxx 58805 0.0 0.0 6172 828 pts/1 S+ 18:59 0:00 grep java ... $ kill -9 58786 $ ./serveur.sh : ligne 21 : 58786 Processus arrêté $CMD

Comme il est quelque peu fastidieux de procéder avec un nombre important de consoles, nous proposons le concept de « scénario » avec la classe chat.common.Scenario et l'utilisons pour écrire des tests de validation : par exemple, la classe chat.startingframework.TestScenarioStartingFramework réalise le scénario précédent en quelques lignes.

Exécutez le test d'intégration de la classe chat.startingframework.TestScenarioStartingFramework : sélectionnez la classe dans l'explorateur de paquetages, puis utilisez le menu contextuel Run as > JUnit Test. Vous obtenez la même exécution que celle obtenue précédemment, mais sans affichage dans la console de Eclipse. Pour voir des affichages, essayez différents niveaux de journalisation : par exemple Log.setLevel(Log.TEST, Level.INFO) et Log.setLevel(Log.CHAT, Level.DEBUG).

dans Eclipse, ne demandez pas l'exécution de tous les tests unitaires d'un paquetage car cela revient à demander à Eclipse d'exécuter dans une même machine virtuelle et en parallèle tous les scénarios de toutes les classes de test JUnit du paquetage. Vous tombez alors dans le problème de réutilisation des ports TCP.

Par défaut, les tests JUnit sont exécutés avec la commande mvn install, comme cela est fait dans l'intégration continue avec GitLab CI (cf. fichier .gitlab-ci.yml).

Typiquement, un scénario comme celui de la classe de tests TestScenarioStartingFramework est structuré comme suit :

  1. diverses initialisations : par exemple, la configuration de la journalisation,
  2. démarrage/instanciation des serveurs de tchat, avec des temporisations pour leur laisser le temps de terminer leur configuration,
  3. démarrage/instanciation des clients de tchat avec connexion à leur serveur,
  4. émulation des frappes au clavier de commandes pour les clients et/ou les serveurs,
  5. attente de la fin du scénario pour laisser le temps de terminer les exécutions des algorithmes,
  6. tests divers sur les états des clients et/ou des serveurs.

Pour permettre la construction de scénarios pour les tests, nous faisons la distinction entre les classes possédant les mains (chat.server.Main et chat.client.Main) et les classes contenant la logique des processus de l'application répartie (chat.server.Server et chat.client.Client). C'est un idiome JAVA communément utilisé.

les méthodes instanciateAServer et instanciateAClient retournent une référence vers le serveur ou le client créé (s0, s1, etc., et c0, c1, etc.). Ces références sont utilisées ensuite pour vérifier la correction de l'exécution avec les méthodes de la classe org.junit.Assert.
pour le test d'applications asynchrones, nous utilisons le canevas logiciel Awaitality. Pour lire les appels Awaitility.await().until(...), veuillez vous reporter à la documentation en ligne Usage. Par ailleurs, une configuration par défaut du canevas logiciel est proposée dans la méthode de classe setUp annotée @BeforeAll du test JUnit.

Présentation de l'utilisation de concepts et d'idiomes JAVA dans la conception du distributeur de messages du client

Indications de lecture, et motivations et objectifs

 

Dans cette étape, nous expliquons pourquoi nous avons besoin de ces concepts et idiomes JAVA, et nous montrons où et comment nous les utilisons. Pour cela, nous quittons la vue globale avec des serveurs et des clients en cours d'exécution pour détailler des morceaux de code. Cette étape est la plus coûteuse en temps de lecture. Lors de votre première lecture, nous vous proposons de vous concentrer sur la compréhension des aspects JAVA, notamment en parcourant les liens sur la documentation Javadoc qui sont proposés ; passez ensuite à l'étape qui suit et qui présente l'architecture répartie avec des diagrammes de classes et de séquence UML ; puis, peut-être sera-t-il intéressant de reparcourir cette étape avant le démarrage de la programmation.

Les extraits de code insérés dans cette page le sont sans les commentaires ; donc, lisez le code source des classes. Par ailleurs, vous pouvez aussi générer le site Web Maven du projet avec la commande « mvn site » pour parcourir la documentation Javadoc (par exemple avec firefox target/site/apidocs/index.html).

L'application de tchat est conçue et mise en œuvre en utilisant l'orientation événement (cf. section 1.2 à la page 14 du cours) : aussi bien le client que le serveur attendent la réception d'un message ou l'écriture de texte écrit dans une console par un utilisateur, et réagissent à ces événements, les réactions pouvant comprendre l'émission de messages. Afin de faciliter l'insertion d'algorithmes répartis dans l'architecture initiale, nous avons ajouté un mécanisme de distribution des messages.

Un casse-tête habituel de la mise en œuvre d'application répartie est l'initialisation des structures de données pour les algorithmes : en l'occurrence, la définition de la liste des algorithmes et des listes des types de messages des algorithmes avec le « branchement » des actions correspondantes. Nous souhaitons éviter que le programmeur oublie tout ou partie de cette phase d'initialisation lors de la construction des clients et des serveurs, et aussi faciliter la mise en place de ces branchements. Nous faisons le choix de structurer les données correspondantes dans des types énumérés (algorithmes et actions, qui correspondent aux différents types de messages utilisés dans les algorithmes), de définir les actions comme des lambda expressions, et d'initialiser les listes dans des blocs de code static.

Une seconde exigence est de mettre en œuvre un mécanisme d'interception permettant de retarder la réception d'un message et son traitement lorsque certaines conditions sont satisfaites. Cela est nécessaire pour programmer les tests des algorithmes répartis : par exemple, dans l'algorithme d'élection par vague Écho de Segall, nous souhaitons retarder les messages d'une vague pour laisser le temps de démarrer une seconde vague et ainsi obtenir la concurrence entre deux vagues.

Types énumérés, lambda expression et default method

 

Du côté client, le premier type énuméré, chat.client.algorithms.ClientAlgorithm définit la liste des algorithmes du distributeur de messages du client. (Au besoin, prenez quelques instants pour parcourir la page sur les types énumérés, en cherchant par exemple la méthode values() dans la page, puis pour parcourir la documentation Javadoc, avec par exemple la méthode ordinal().)

Le premier algorithme, c'est-à-dire le premier énumérateur, est l'algorithme de base du client appelé ALGORITHM_CHAT, qui est lui-même défini dans l'énumération chat.client.algorithms.chat.ChatAction. L'objet ALGORITHM_CHAT est construit avec la liste des actions (méthodes values()) définie dans le type énuméré chat.client.algorithms.chat.ChatAction.

NB : les explications sur le code du type énuméré chat.client.algorithms.ClientAlgorithm continuent dans les sections qui suivent : par exemple, le bloc de code static est expliqué en 2.c.

package chat.client.algorithms; public enum ClientAlgorithm { ALGORITHM_CHAT(chat.client.algorithms.chat.ChatAction.values()); ... private final List<Enum<?>> actions; ... ClientAlgorithm(final Enum<?>[] actions) { this.actions = Collections.unmodifiableList(Arrays.asList(actions)); } ... }

Le second type énuméré du côté client, chat.client.algorithms.chat.ChatAction, consiste en la déclaration des différentes actions à exécuter selon les types de messages reçus. Le premier énumérateur CHAT_MESSAGE définit l'action exécutée suite à la réception d'un message de type ChatMsgContent. La définition de l'action est une lambda expression. Si nous regardons ci-dessous le constructeur du type énuméré, nous remarquons que le type du second argument est une fonction BiConsumer, qui selon la documentation Javadoc pointée ci-avant prend deux arguments (le premier de type Client, le second de type MsgContent) et ne retourne pas de valeur. Ainsi, la définition de l'action de l'énumérateur CHAT_MESSAGE utilise la forme « (Entity client, MsgContent content) -> {...} » et le corps de l'action est simplement l'appel d'une méthode du client : ici, l'instruction « client.receiveChatMessageContent((ChatMsgContent) content) ».

package chat.client.algorithms.chat; public enum ChatAction implements Action { CHAT_MESSAGE(ChatMsgContent.class, (Entity client, MsgContent content) -> ((Client) client).receiveChatMessageContent((ChatMsgContent) content)); ... private final Class<? extends MsgContent> contentClass; private final BiConsumer<Entity, MsgContent> actionFunction; ... ChatAction(final Class<? extends MsgContent> contentClass, final BiConsumer<Entity, MsgContent> actionFunction) { Objects.requireNonNull(contentClass, "argument contentClass cannot be null"); Objects.requireNonNull(actionFunction, "argument actionFunction cannot be null"); this.contentClass = contentClass; this.actionFunction = actionFunction; }

Notez que des transtypages vers le bas sont effectués dans l'instruction « ((Client) client).receiveChatMessageContent((ChatMsgContent) content) » car c'est une méthode de la classe chat.client.Client, sous classe de Entity, qui est appelée, et la méthode chat.client.Client::receiveChatMessageContent requiert un message de type ChatMsgContent, sous-classe de MsgContent. Nous devons donc faire en sorte qu'à l'exécution les transtypages ne lèvent pas une exception ClassCastException : cela est fait dans la méthode chat.client.algorithms.ClientAlgorithm::execute, qui est décrite maintenant.

La méthode chat.client.algorithms.ClientAlgorithm::execute appelle la méthode « par défaut » (en JAVA default method) executeOrIntercept() de l'interface chat.common.Action, qui elle-même appelle la méthode par défaut chat.common.Action::execute(). Comme montré ci-après, la méthode par défaut chat.common.Action::execute appelle l'action définie dans la lambda expression : instruction actionFunction().accept(entity, msg).

package chat.common; ... public interface Action { ... default void execute(Entity entity, MsgContent msg) { Objects.requireNonNull(entity, "argument entity cannot be null"); Objects.requireNonNull(msg, "argument content cannot be null"); if (!contentClass().isInstance(msg)) { throw new IllegalArgumentException("msg of type " + msg.getClass().getCanonicalName() + "is not an instance of " + contentClass().getCanonicalName()); } actionFunction().accept(entity, msg); } ... }

Du côté de la conception du serveur, les mêmes types énumérés existent ; ils s'appellent :

  • chat.server.algorithms.ServerAlgorithm,
  • chat.server.algorithms.topology.TopologyAction et
  • chat.server.algorithms.election.ElectionAction.

Objets non modifiables, blocs de code static, et collection et Stream

 

Dans l'étape précédente, nous avons vu comment sont déclarés les algorithmes avec le branchement des actions. Détaillons maintenant la création et la manipulation des collections d'énumérateurs (algorithmes et actions). Ces collections sont créées dans des blocs de code static. Un bloc de code dit static est exécuté lors du chargement de la classe. Dans notre application, cela signifie que ces blocs de code sont exécutés par la machine virtuelle JAVA lors du chargement de l'application, donc avant l'exécution de la première instruction de la méthode main appelée. Nous faisons cela dans l'application de tchat pour rassembler dans un seul endroit du code l'initialisation des structures de données pour les algorithmes avant même que la première instruction d'une méthode main soit exécutée. En outre, ces collections sont construites comme des dictionnaires non modifiables (unmodifiable maps) ; la raison est d'éviter de « casser » par inadvertance les structures de données des algorithmes dans les classes et les méthodes des algorithmes.

Voici ci-dessous l'extrait de code du type énuméré chat.server.algorithms.ServerAlgorithm qui déclare pour l'instant deux énumérateurs, c'est-à-dire deux algorithmes : celui de la découverte de la topologie (cf. la dernière section de cette page qui le décrit), et celui de l'élection (le premier que vous aurez à mettre en œuvre, c'est-à-dire à intégrer, dans l'application répartie de tchat).

chat.server.algorithms; ... public enum ServerAlgorithm { ALGORITHM_TOPOLOGY(TopologyAction.values()), ALGORITHM_ELECTION(ElectionAction.values()); private final List<Enum<?>> actions; ServerAlgorithm(final Enum<?>[] actions) { this.actions = Collections.unmodifiableList(Arrays.asList(actions)); } private static final Map<Integer, Action> MAP_OF_ACTION_NUMBER_ACTIONS; private static final Map<Enum<?>, Integer> MAP_OF_ACTION_ACTION_NUMBERS; static { Map<Integer, Action> mBis = new HashMap<>(); Map<Enum<? extends Action>, Integer> mTer = new HashMap<>(); for (ServerAlgorithm algorithm : ServerAlgorithm.values()) { for (Enum<? extends Action> action : algorithm.actions) { final int actionNumber = OFFSET_SERVER_ALGORITHMS + algorithm.ordinal() * NB_MAX_ACTIONS_PER_ALGORITHM + action.ordinal(); mBis.put(actionNumber, (Action) action); mTer.put(action, actionNumber); } } MAP_OF_ACTION_NUMBER_ACTIONS = Collections.unmodifiableMap(mBis); MAP_OF_ACTION_ACTION_NUMBERS = Collections.unmodifiableMap(mTer); } public static int getActionNumber(final Enum<? extends Action> action) { return MAP_OF_ACTION_ACTION_NUMBERS.getOrDefault(action, -1); } public static void execute(final Server server, final int actionNumber, final Object content) { MAP_OF_ACTION_NUMBER_ACTIONS.entrySet().stream().filter(e -> e.getKey() == actionNumber).map(Entry::getValue) .filter(Objects::nonNull).filter(action -> action.contentClass().isInstance(content)) .forEach(action -> action.executeOrIntercept(server, action.contentClass().cast(content))); } }

Comme défini dans l'attribut ServerAlgorithm::actions, un algorithme contient une liste d'actions, c'est-à-dire une liste d'énumérateurs réalisant (au sens implements) l'interface Action. Le constructeur du type énuméré ServerAlgorithm prend donc en argument une liste d'actions, et la déclaration de l'énumérateur Agorithm.ALGORITHM_ELECTION montre que la liste d'actions est obtenue par la méthode du type énuméré values(), qui est implicitement déclarée et construite par le compilateur (en anglais, implicitly declared methods synthesized by the compiler).

Les collections (listes) d'actions des algorithmes sont rassemblées/fusionnées de deux manières dans deux collections (dictionnaires) comme suit :

  • ServerAlgorithm::MAP_OF_ACTION_ACTION_NUMBERS : de type Map<Enum<? extends Action>, Integer>, la clé du dictionnaire est l'énumérateur d'une action (d'un algorithme) et la valeur est l'entier du type du message correspondant. Ce dictionnaire est utilisé dans les méthodes des algorithmes via la méthode ServerAlgorithm::getActionNumber pour « typer » les messages à envoyer, c'est-à-dire pour trouver l'entier du type du message à envoyer. En effet, chaque message commence par un entier, qui est son type.
  • ServerAlgorithm::MAP_OF_ACTION_NUMBER_ACTIONS : de type Map<Integer, Action>, la clé du dictionnaire est l'entier du type du message et la valeur est la référence de l'action à exécuter, c'est-à-dire de l'énumérateur contenant la lambda expression à exécuter. Ce dictionnaire est utilisé dans la méthode ServerAlgorithm::execute, c'est-à-dire lors de la réception d'un message, pour « trouver » l'action à exécuter ;

Les deux collections sont construites dans le bloc de code static comme suit :

  • tous les algorithmes sont parcourus, c'est-à-dire tous les énumérateurs du type énuméré ServerAlgorithm sont parcourus ;
  • toutes les actions d'un algorithme sont parcourues ;
  • l'entier correspondant au type d'un message est calculé comme suit :
    • la constante chat.common.Action::OFFSET_SERVER_ALGORITHMS indique la valeur du début de la plage des entiers des types des messages des algorithmes des serveurs, c'est-à-dire ici la valeur 0 (la plage des entiers pour les types des messages des algorithmes des clients commence à 1000) ;
    • on y ajoute le début de la plage des entiers de l'algorithme donné, qui est calculé par la multiplication de l'ordinal de l'énumérateur de l'algorithme avec la taille de la plage des entiers des types des messages d'un algorithme (qui est donnée par la constante chat.common.Action::NB_MAX_ACTIONS_PER_ALGORITHM [ici fixée à 20 messages au maximum par algorithme]) ;
    • enfin, on ajoute l'ordinal de l'énumérateur de l'action.

Vous avez déjà dû observer que les attributs des collections ServerAlgorithm::MAP_OF_ACTION_NUMBER_ACTIONS et ServerAlgorithm::MAP_OF_ACTION_ACTION_NUMBERS sont des attributs de classe (static) et qu'ils sont constants (final). Leur valeur est calculée dans le bloc de code static, c'est-à-dire lors du chargement par la machine virtuelle JAVA du bytecode du type énuméré. Les collections sont construites comme étant des dictionnaires non modifiables (méthode Collections.unmodifiableMap()). Donc, non seulement les attributs sont final et ne peuvent pas référencer un autre dictionnaire une fois qu'ils ont été initialisés, mais le contenu des collections n'est pas modifiable non plus. C'est la compilation « à la volée » lors du chargement de la classe qui calcule l'ordre de construction des objets : les énumérateurs sont créés avant d'être insérés dans la collection.

Parcours de collection et Stream

 

Comme indiqué ci-avant, c'est le dictionnaire ServerAlgorithm::MAP_OF_ACTION_NUMBER_ACTIONS qui est utilisé lors de la réception d'un message pour trouver et exécuter l'action correspondant au type du message reçu. La méthode chat.server.algorithms.ServerAlgorithm::execute() utilise les Streams. À titre d'exercice, vous pouvez traduire le pipeline avec des boucles foreach et des structures if. Notez que, si aucune action n'est trouvée ou si le contenu du message ne correspond pas à ce qui est attendu, alors le message est ignoré silencieusement ; donc, pensez à utiliser les logs de type « COMM » pour tracer un éventuel problème.

Patron de conception Intercepteur et méthodes « par défaut » (default method) des interfaces

 

Afin de tester les algorithmes répartis dans des conditions variées, nous introduisons un mécanisme d'interception lors de la réception des messages.

Comme vous avez pu le voir dans les morceaux de code précédents, la méthode qui est appelée sur les actions est la méthode définie par défaut chat.common.Action::executeOrIntercept(), et non la méthode chat.common.Action::execute(). Voici la définition de la méthode executeOrIntercept().

package chat.common; public interface Action { ... default void executeOrIntercept(final Entity entity, final MsgContent content) { Objects.requireNonNull(entity, "argument entity cannot be null"); Objects.requireNonNull(content, "argument content cannot be null"); Optional<MsgContent> msg = Optional.of(content); if (Interceptors.isInterceptionEnabled()) { msg = Interceptors.intercept(entity, msg); } msg.ifPresent(m -> execute(entity, m)); } ... }

Tout d'abord, notez que nous utilisons la classe Optional pour indiquer que la variable msg peut être null. La raison est la suivante : avant d'exécuter la méthode Action::execute « sur » le message reçu, et si le mécanisme d'interception est activé (cf. méthode chat.common.Interceptors::isInterceptionEnabled), le message reçu passe dans la méthode chat.common.Interceptors::intercept. Cette dernière méthode parcourt les collections des intercepteurs préalablement enregistrés (cf. les explications ci-après) pour appliquer la méthode chat.common.Interceptor::doIntercept. Dans l'un des intercepteurs, le message peut disparaître, c'est-à-dire être retiré pour un traitement différé : par exemple « return Optional.empty() » dans la méthode Interceptor::doIntercept. Cela explique pourquoi l'argument est de type Optional dans la méthode chat.common.Interceptors::intercept.

Lors de l'interception, la méthode de classe chat.common.Interceptors::intercept parcourt la liste des intercepteurs enregistrés pour appeler la méthode chat.common.Interceptor::doIntercept sur chacun d'eux. Nous décrivons maintenant ce qu'est un intercepteur (dans notre application de tchat) en utilisant un exemple extrait de la classe de test JUnit chat.startingframework.TestScenarioStartingFrameworkWithInterceptorsOnClientSide. Voici le code, et les explications suivent après.

package chat; public class TestScenarioStartingFrameworkWithInterceptorsOnClientSide extends Scenario { @Test @Override public void constructAndRun() throws Exception { ... Client c1 = instanciateAClient(s1.identity()); ... ClientInterceptors.setInterceptionEnabled(true); Predicate<ChatMsgContent> conditionForInterceptingI1OnC1 = msg -> msg.getSender() == c0.identity(); Predicate<ChatMsgContent> conditionForExecutingI1OnC1 = msg -> true; Consumer<ChatMsgContent> treatmentI1OnC1 = msg -> chat.client.algorithms.chat.ChatAction.CHAT_MESSAGE .execute(c1, new ChatMsgContent(msg.getSender(), msg.getSeqNumber(), msg.getContent() + ", intercepted at client c1 by i1")); Interceptors.addAnInterceptor("i1", c1, conditionForInterceptingI1OnC1, conditionForExecutingI1OnC1, treatmentI1OnC1); } }
Avant d'exécuter la classe de test, vérifiez le niveau de log de l'instruction Log.setLevel(Log.CHAT, Level.INFO) pour voir les messages de journalisation de l'algorithme de tchat. Une exécution de cette classe de test donnera l'affichage suivant :
Client 1 (client 1 of server 0) receives message 0 from c0 (Client 0), intercepted at client c1 by i1

Un intercepteur, de type chat.common.Interceptor, est défini par les trois composantes suivantes :

  1. le prédicat de l'attribut Interceptor::conditionForIntercepting : qui est appelé lors de la réception d'un message et qui intercepte le message si, appliqué sur le message, il retourne true : c'est un objet de type Predicate<C extends MsgContent> prenant en argument le message reçu et retournant un booléen. Dans l'exemple, un message est intercepté s'il provient du client 0 : « msg.getSender() == c0.identity() ». Pratiquement, le message intercepté est géré par un Thread (classe TreatDelayedMessageToAClient), qui réessaie périodiquement (TreatDelayedMessageToAClient.DELAY = 100 ms) de consommer le message (par la méthode Interceptor::doTreatDelayedMessage) ;
  2. le prédicat de l'attribut Interceptor::conditionForExecuting  : qui est appelé par la méthode Interceptor::doTreatDelayedMessage et qui teste le contenu du message qui a été intercepté pour savoir si le message peut être consommé (maintenant). Dans l'exemple, à la fin de la première temporisation de 100ms, le message est consommé inconditionnellement car « conditionForExecutingI1OnC1 = msg -> true » 
  3. la fonction de l'attribut Interceptor::treatmentOfADelayedMsg : qui est appelée par la méthode Interceptor::doTreatDelayedMessage et qui consomme le message intercepté si la fonction conditionForExecuting a retourné true. Dans l'exemple, le message, qui est une chaîne de caractères (c'est un message de tchat), est complété avec la chaîne de caractères « ", intercepted at client c1 by i1" » avant d'être passé au distributeur de messages.

À titre d'exemple de fonctionnement, vous pouvez exécuter dans Eclipse la classe de test TestScenarioStartingFrameworkWithInterceptorsOnClientSide, de laquelle a été extrait la définition de l'exemple d'intercepteur.

dans Eclipse, ne demandez pas l'exécution de tous les tests unitaires d'un paquetage car cela revient à demander à Eclipse d'exécuter dans une même machine virtuelle et en parallèle tous les scénarios de toutes les classes de test JUnit du paquetage. Vous tombez alors dans le même problème de réutilisation des ports TCP.

Dans la construction des tests des algorithmes que vous mettez en œuvre dans cette infrastructure, les intercepteurs permettant de tester les cas intéressants sont à écrire.

Idiome/Patron d'implémentation JAVA pour la terminaison de Thread et de processus

 

Dans le client, et de la même façon dans le serveur, lorsque l'utilisateur entre à la console la chaîne de caractères « quit », tous les threads terminent leur exécution et le processus est alors terminé (il n'y a plus de threads). Après la phase de démarrage, les deux threads du client, et de la même façon ceux du serveur, exécutent une boucle (potentiellement infinie) : le premier thread en lecture des lignes de caractères saisies à la console, et le second thread en lecture des messages dans l'appel select sur les sockets.

À la lecture de la chaîne de caractères « quit », c'est une mauvaise pratique que de forcer la terminaison de (ou de suspendre) l'autre thread avec la méthode Thread::stop() (respectivement, Thread::suspend()) : ces deux méthodes sont d'ailleurs marquées deprecated. Une autre mauvaise pratique, encore plus radicale, est de forcer la terminaison du processus avec un appel à System::exit() : imaginez par exemple qu'une écriture sur disque ou sur réseau soit en cours.

Une manière classique d'organiser de tels threads est de remplacer les boucles « while (true) {...} » par des boucles « while (condition d'arrêt) {...} », et dans notre cas, par des boucles « while (!Thread.interrupted()) {...} ». Par ailleurs, à la lecture de la chaîne de caractères « quit », le thread de lecture au clavier interrompt le thread (signale une interruption au thread) qui lit les messages en provenance du réseau : instruction threadToRcvMsgs.interrupt(). Il s'envoie aussi le même « signal » à lui-même : instruction Thread.currentThread().interrupt().

Par exemple, pour le serveur :

  • le thread qui lit les messages sur le réseau de la classe chat.server.ReadMessagesFromNetwork :
    package chat.server; public class ReadMessagesFromNetwork implements Runnable { ... @Override public void run() { ... while (!Thread.interrupted()) { try { selector.select(); } catch (IOException e) { COMM.fatal(e.getLocalizedMessage()); return; } ... } } }
  • et le thread main qui les lignes de caractères saisies à la console de la classe chat.server.Main :
    package chat.server; public final class Main { ... public static void main(final String[] args) throws IOException { while (!Thread.interrupted()) { String consoleMsg = null; consoleMsg = bufin.readLine(); if (consoleMsg == null) { break; } GEN.debug("{}", () -> Log.computeServerLogMessage(server, ", new command line for server: " + consoleMsg)); server.treatConsoleInput(consoleMsg); } } }
  • avec la méthode chat.server.Server::treatConsoleInput qui analyse la chaîne de caractères et envoie un signal aux deux threads :
    package chat.server; public final class Server { ... public void treatConsoleInput(final String line) { Objects.requireNonNull(line, "argument line cannot be null"); GEN.debug("{}", () -> Log.computeServerLogMessage(this, ", new command line on console")); if (line.equals("quit")) { threadToRcvMsgs.interrupt(); // do not interrupt the main thread during the execution of a Scenario because // all the clients and all the servers are controlled by the same "main" thread if (!Scenario.isJUnitScenario()) { Thread.currentThread().interrupt(); } } assert invariant(); }

Enfin, les threads pouvant être interrompus, il faut en tenir compte dans le traitement des exceptions IOException lors des lectures de messages. Par exemple, dans la méthode chat.common.FullDuplexMsgWorker::readMessage, lors du traitement des exceptions de type IOException, si le thread a été interrompu, alors le canal est considéré comme fermé :

package chat.common; public class FullDuplexMsgWorker { public ReadMessageStatus readMessage() { ... if (readState == ReadMessageStatus.READHEADERSTARTED) { if (inBuffers[0].position() < inBuffers[0].capacity()) { try { recvSize = rwChan.read(inBuffers[0]); } catch (IOException e) { if (Thread.interrupted()) { return ReadMessageStatus.CHANNELCLOSED; } ... } } ... } ... if (readState == ReadMessageStatus.READDATASTARTED) { if (inBuffers[1].position() < inBuffers[1].capacity()) { try { recvSize = rwChan.read(inBuffers[1]); ... } catch (IOException e) { if (Thread.interrupted()) { return ReadMessageStatus.CHANNELCLOSED; } ... } ... } return readState; } }

Architecture répartie et diagrammes de séquence de l'application de tchat

L'étape précédente à étudier des parties spécifiques de l'application pour présenter quelques concepts, patrons de conception et idiomes qui sont insérés dans le code. Dans cette section, nous revenons à l'architecture répartie avec des serveurs et des clients en action. Pour ce faire, nous utilisons entre autres la modélisation avec UML.

Architecture de l'exemple d'exécution

 

L'architecture que vous avez utilisée ci-avant pour les tests peut être visualisée de la manière suivant. Dans la figure qui suit, les cercles sont des processus, les restangles dans les cercles sont les espaces mémoire partagés par les threads du processus, et autour des rectangles, ce sont les threads. La figure montre que chaque client et chaque serveurs possède deux threads, le premier pour lire les commandes à la console et le second pour communiquer avec les autres entités.

Diagrammes de classes des parties client et serveur

 

Voici le diagramme de classes de la « partie » client :

Voici le diagramme de classes de la « partie » serveur :

Diagramme de séquence de l'émission d'un message de tchat — partie de la séquence depuis la saisie à la console par l'utilisateur jusqu'à l'émission du message vers le serveur auquel le client est connecté

dans les diagrammes de séquence, nous sommes surtout intéressés par la séquence des appels. Donc, nous ne visualisons pas les barres d'activation des objets. En outre, nous sommes essentiellement intéressés par l'utilisation des classes que nous avons écrites. C'est pourquoi nous ignorons les classes qui ne sont pas dans le code de l'ossature (SelectionKey, Optional, etc.). Par ailleurs, nous sommes principalement intéressés par les noms des méthodes exécutées. Par conséquent, nous n'indiquons que les principaux arguments et mettons par convention la chaîne de caractères « ... » pour remplacer les arguments qui nous intéressent moins. Enfin, nous sommes avant tout intéressés par les appels de méthodes. Ainsi, nous ne modélisons pas les retours d'appel.

dans l'ensemble des diagrammes de séquence, nous considérons que le mécanisme d'interception n'est pas activé, ou qu'aucun intercepteur client ou serveur n'est enregistré.

Voici le diagramme de séquence de l'émission d'un message de tchat — partie de la séquence depuis la saisie à la console par l'utilisateur jusqu'à l'émission du message vers le serveur auquel le client est connecté :

Diagramme de séquence de la réception d'un message de tchat par le serveur — partie de la séquence depuis la réception du message jusqu'à la transmission vers les serveurs voisins et les clients locaux

Tout message, c'est-à-dire tout objet dont la classe a pour classe parente MsgContent, contient l'idendité de l'émetteur ainsi qu'un chemin des serveurs que le message a visités. Lorsque le message provient d'un client, le chemin est par définition de longueur -1. Lorsque le chemin provient d'un serveur voisin, le chemin est par construction de longueur 1. Et, lorsque le message est passé par plusieurs serveurs, le chemin est par construction de longueur > 1.

Par ailleurs, un message de tchat, c'est-à-dire de type ChatMsgContent, contient un numéro de séquence.

Voici le diagramme de séquence de la réception d'un message de tchat par le serveur — partie de la réception du message jusqu'à la transmission vers les serveurs voisins et les clients locaux :

Diagramme de séquence de la réception d'un message de tchat par le client — partie de la séquence depuis la réception du message jusqu'à l'affichage dans la console

nous simplifions ce diagramme de séquence en ne modélisant ni les fragments loop des parcours des collections ni les fragments opt des filtrages.

Voici le diagramme de séquence de la réception d'un message de tchat par le client — partie de la séquence depuis la réception du message jusqu'à l'affichage dans la console :

Programmation réseau dans l'application de tchat

Pour terminer cette étude de l'ossature de départ, nous parcourons les parties de code concernant la programmation réseau avec JAVA NIO.

Connexions JAVA NIO

 

Toutes les communications entre les processus clients ou serveurs sont réalisées avec JAVA NIO dans des objets de la classe chat.common.FullDuplexMsgWorker, que vous avez étudiée dans les précédents TP pour gérer un lien bidirectionnel TCP.

Du côté du client, la classe chat.client.ReadMessagesFromNetwork étend la classe FullDuplexMsgWorker pour la gestion des communications du client avec le serveur auquel il est attaché. Le client chat.client.Client utilise une délégation vers la classe ReadMessagesFromNetwork via l'attribut Client::runnableToRcvMsgs pour les communications (émission et réception de messages).

Du côté du serveur, la classe chat.server.Server utilise aussi une délégation pour la réception des messages : via la classe chat.server.ReadMessagesFromNetwork avec l'appel à Selector::select dans la méthode ReadMessagesFromNetwork::run.

Le serveur rassemble les informations sur les clients (locaux) et les serveurs (voisins ou non) dans l'attribut Server::reachableEntities de type Map<Integer, RoutingInformation>, avec la classe RoutingInformation qui contient l'attribut worker de type FullDuplexMsgWorker :

  • pour les clients locaux, le SelectionKey correspond au worker vers le client local, et la longueur du chemin est égale à -1 ;
  • pour les serveurs voisins, le SelectionKey correspond au worker vers le serveur voisin, et la longueur du chemin est égale à 1 ;
  • pour les serveurs qui ne sont pas des voisins, le worker est le worker du serveur voisin qui permet de faire le premier saut dans le chemin le plus court vers le serveur destination, et la longueur du chemin est strictement supérieure à 1.

Structure des messages et gestion des cycles dans la topologie des serveurs

 

L'approche choisie pour gérer les cycles dans la topologie des serveurs est d'ajouter à tout message la séquence des serveurs déjà « visités ». Ainsi, l'attribut chat.common.MsgContent::path est une liste d'entiers, c'est-à-dire une liste d'identités de serveurs. Dans la méthode chat.server.Server::sendToAllNeighbouringServersExceptOne, un message à « diffuser aux serveurs voisins » n'est pas transmis vers les serveurs dont l'identité est déjà dans la valeur de path : condition !msg.getPath().contains(e.getKey()) dans le filtre du Stream. Rappelons que les serveurs voisins sont ceux qui vérifient la condition e.getValue().getLengthOfThePath() == 1 du même filtre.

Lorsque les serveurs sont créés, un algorithme réparti est exécuté qui construit sur chaque serveur la connaissance complète de la topologie du réseau. Cette connaissance est mémorisée dans la structure de données Server::reachableEntities de type Map<Integer, RoutingInformation>. La clef du dictionnaire est une identité (serveur ou client), et la valeur est un objet de type RoutingInformation, qui décrit la route vers cette entité.

Primitives de communication disponibles dans le serveur

 

La classe chat.server.Server contient les méthodes d'envoi de message suivantes :

  • sendToAServer : elle permet d'envoyer un message à un serveur donné qui n'a pas déjà « vu passer » le message. La condition « qui n'a pas déjà "vu passer" le message » exprime le traitement des cycles dans le réseau des serveurs : il faut éviter les boucles infinies. Lorsque le destinataire n'est pas un serveur voisin, l'attribut RoutingInformation::lengthOfThePath possède une valeur supérieure à 1. La méthode utilise les informations de routage pour identifier le voisin du prochain saut : l'identité de ce serveur est contenu dans l'attribut RoutingInformation::identityNeighbouringServer ;
  • sendToAllNeighbouringServersExceptOne : comme déjà rencontré dans les explications ci-avant, cette méthode transmet le message en argument à tous les serveurs voisins qui n'ont pas déjà « vu passer » le message. Les serveurs voisins sont ceux dont l'attribut RoutingInformation::lengthOfThePath est égal à 1. Par ailleurs, le premier argument exceptId sert à exclure un voisin pour exprimer par exemple « envoyer le message à tous les voisins, excepté celui duquel le message a été reçu (en plus de ceux qui ont déjà "vu passer" le message) » ;
  • sendToAllLocalClientsExceptOne : elle permet d'envoyer un message à tous les clients locaux qui n'ont pas déjà « vu passer » le message. Les clients locaux sont ceux dont l'attribut RoutingInformation::lengthOfThePath est égal à -1. L'argument exceptId sert à exclure un client local pour exprimer par exemple « envoyer le message à tous les clients locaux, excepté celui duquel le message a été reçu (en plus de ceux qui ont déjà "vu passer" le message) ».

Calcul des identités des processus

 

L'identité des serveurs est fourni en argument du main ou dans le constructeur de la classe Server.

L'identité des clients est construite par les serveurs auxquels ils se connectent. Dans la méthode Server::acceptNewClient, dans l'expression « identity * OFFSET_ID_CLIENT + numberOfClientsSinceBeginning », l'attribut identity est l'identité du serveur d'accès auquel le client est connecté, la constante OFFSET_ID_CLIENT vaut 100, et l'attribut numberOfClientsSinceBeginning sert à compter les ouvertures de connexion des clients. Par exemple, pour le serveur d'identité 1, les premier et deuxième clients auront comme identité les valeurs 1 * 100 + 0 = 100 et 1 * 100 + 1 = 101, respectivement; et pour le serveur d'identité 2, les premier et deuxième clients auront comme identité les valeurs 2 * 100 + 0 = 200 et 2 * 100 + 1 = 201, respectivement. Vous pouvez retrouver ces valeurs dans les affichages des messages de tchat lors de l'exécution de la classe de test TestScenarioStartingFramework.

À titre d'exemple, dans la classe de test TestScenarioStartingFramework, l'instruction Log.setLevel(Log.CHAT, Level.INFO) permet de voir le journal de l'affichage des émissions et des réceptions avec les identités des clients et des serveurs :

0 [main] INFO chat - Client 100 (client 0 of server 1) sending chat message: message 0 from c0 4 [main] INFO chat - Client 101 (client 1 of server 1) sending chat message: message 1 from c1 4 [main] INFO chat - Client 300 (client 0 of server 3) sending chat message: message 2 from c2 10 [main] INFO chat - Client 400 (client 0 of server 4) sending chat message: message 3 from c3 10 [main] INFO chat - Client 401 (client 1 of server 4) sending chat message: message 4 from c4 ...62 [Thread-12] INFO chat - Client 600 (client 0 of server 6) receives message 5 from c5 62 [Thread-12] INFO chat - Client 600 (client 0 of server 6) receives message 3 from c3 63 [Thread-12] INFO chat - Client 600 (client 0 of server 6) receives message 1 from c1 63 [Thread-12] INFO chat - Client 600 (client 0 of server 6) receives message 2 from c2 65 [Thread-10] INFO chat - Client 401 (client 1 of server 4) receives message 6 from c6 70 [Thread-9] INFO chat - Client 400 (client 0 of server 4) receives message 6 from c6

Algorithme de découverte de la topologie

 

Dans les scénarios de test que nous avons écrits et dans ceux que vous écrirez, la phase de création et de connexion des serveurs est suivie d'une petite attente. Cette petite attente est programmée en utilisant le canevas logiciel Awaitility. Par exemple, dans la classe de test TestScenarioStartingFramework :

Awaitility.await().until(() -> s1.isStarted() && s1.isThreadToRcvMsgsAlive() && s2.isStarted() && s3.isStarted() && s4.isStarted() && s5.isStarted() && s6.isStarted());
Ainsi, l'attente se termine dès que les six serveurs sont démarrés, et est au plus de 5 secondes est déclanchée (cf. la configuration dans la méthode de classe setUp()).

Algorithmiquement, pendant cette attente, un algorithme réparti de découverte de la topologie est exécuté de telle façon que chacun des serveurs connaîsse le chemin le plus court vers chacun des autres serveurs du système. Cet algorithme est déclaré dans le type énuméré chat.server.algorithms.topology.TopologyAction.

Concrètement, à la fin du constructeur Server::Server, le serveur émet un message IDENTITY_MESSAGE vers tous ses voisins ; une nouvelle instance de l'algorithme de découverte de la topologie débute. Lorsqu'un serveur s reçoit un message en provenance d'un serveur voisin, quelque soit le type du message, s analyse le chemin de ce message dans la méthode Server::parsePathOfMsgToUpdateRoutingInformation. Lorsque l'identité d'un nouveau serveur r est découverte, c'est-à-dire qu'aucune information de routage n'est disponible pour émettre vers r, ou lorsqu'un chemin plus court est découvert vers un serveur r déjà connu, alors le serveur s émet un message IDENTITY_MESSAGE vers r, donc en utilisant un nouveau chemin vers r.

Le traitement de la réception d'un message IDENTITY_MESSAGE est contenu dans la méthode Server::receiveIdentityContent. Notez que la méthode Server::parsePathOfMsgToUpdateRoutingInformation est exécutée avant l'entrée dans la méthode Server::receiveIdentityContent. Par conséquent, la table de routage est déjà mise à jour lors de l'entrée dans la méthode Server::receiveIdentityContent.

Enfin, la méthode Server::receiveIdentityContent consiste à annoncer une nouvelle information de routage (nouveau serveur ou nouveau chemin) aux serveurs voisins.

Nous laissons à votre sagacité l'écriture en pseudo-code de l'algorithme ainsi que ses propriétés et sa preuve.