/*****************************/ /* serveurUDP_corrige_Q3.3.c */ /*****************************/ #include #include #include #include #include #include #include #include #include #include #include #include "redblack.h" #include "initCommunication.h" #include "client-serveur.h" // // Constantes de fonctionnement du programme // // #define NB_MAX_REPONSES 10 // Nombre maximum de reponses que le serveur fait // a un client (avant de lui repondre REPONSE_KO) #define NB_THREADS_DANS_POOL 10 // Nombre de threads dans le pool // Structure permettant de passer un entier en parametre lors // d'un pthread_create union IntPtr { int entier; void *ptr; }; // // Structure pour les noeuds du red-black tree // typedef struct{ char key[NI_MAXHOST + sizeof(':') + NI_MAXSERV]; // Le "+ 1" est la pour le ":" qui sert de separateur int nbReponsesAuClient; int lastNbReponsesAuClient; } node_t; // // Mutex divers // pthread_mutex_t mutexRangClient; // Mutex evitant les race conditions sur rangClient pthread_mutex_t mutexRbtree; // Mutex pour l'arbre red-black // // Pour lecteurs-rédacteur entre gestionRequete() et ramasseNoeudsInutiles() // pthread_mutex_t fifo; // cf. cours lecteurs-redacteurs pthread_mutex_t mutexL; // cf. cours lecteurs-redacteurs pthread_mutex_t mutexG; // cf. cours lecteurs-redacteurs int nbLecteurs = 0; // cf. cours lecteurs-redacteurs // // Arbre red-black; // struct rbtree *rbtree = NULL; // // Macros ameliorant la lisibilite des fonctions utilisant des mutex // #define MUTEX_LOCK(m) \ { \ int rc=pthread_mutex_lock(&(m)); \ if (rc < 0) \ error_at_line(EXIT_FAILURE,rc,__FILE__,__LINE__,"pthread_mutex_lock"); \ } #define MUTEX_UNLOCK(m) \ { \ int rc=pthread_mutex_unlock(&(m)); \ if (rc < 0) \ error_at_line(EXIT_FAILURE,rc,__FILE__,__LINE__,"pthread_mutex_unlock"); \ } int compare(const void *pa, const void *pb, const void *config) { node_t *pnodeA = (node_t *)pa; node_t *pnodeB = (node_t *)pb; return strcmp(pnodeA->key, pnodeB->key); } void gestionRequete(int fdPointDAcces, struct sockaddr_storage addrEmetteur, socklen_t addrEmetteur_len, message_t requete) { static int rangClient = 0; message_t reponse; // On determine l'emetteur du message char host[NI_MAXHOST], service[NI_MAXSERV]; int rc = getnameinfo((struct sockaddr *) &addrEmetteur, addrEmetteur_len, host, NI_MAXHOST, service, NI_MAXSERV, NI_NUMERICSERV); if (rc != 0) error_at_line(EXIT_FAILURE, errno, __FILE__, __LINE__, "getnameinfo"); // On traite la requete char key[NI_MAXHOST + sizeof(':') + NI_MAXSERV]; // Le "+ 1" est la pour le ":" qui sert de separateur sprintf(key, "%s:%s", host, service); // Debut du role de lecteur dans paradigme synchro lecteurs-redacteurs MUTEX_LOCK(fifo); MUTEX_LOCK(mutexL); nbLecteurs++; if (nbLecteurs == 1) { MUTEX_LOCK(mutexG); } MUTEX_UNLOCK(mutexL); MUTEX_UNLOCK(fifo); // Recherche de key dans l'arbre MUTEX_LOCK(mutexRbtree); node_t *pnode = (node_t *)rbfind(key, rbtree); MUTEX_UNLOCK(mutexRbtree); if (pnode == NULL) { // C'est un nouveau client pnode = malloc(sizeof(node_t)); assert(pnode != NULL); strcpy(pnode->key, key); pnode->nbReponsesAuClient = 0; pnode->lastNbReponsesAuClient = 0; MUTEX_LOCK(mutexRbtree); (void)rbsearch(pnode, rbtree); MUTEX_UNLOCK(mutexRbtree); MUTEX_LOCK(mutexRangClient); rangClient++; printf("%6d-ieme client a traiter (%s:%s)\n", rangClient, host, service); MUTEX_UNLOCK(mutexRangClient); } if (pnode->nbReponsesAuClient < NB_MAX_REPONSES) { reponse.typ = REQUETE_OK; pnode->nbReponsesAuClient++; CALCUL_PAYLOAD(reponse.infoSup.payload, pnode->nbReponsesAuClient); //usleep(100000); } else { reponse.typ = REQUETE_KO; } // Fin du role de lecteur dans paradigme synchro lecteurs-redacteurs MUTEX_LOCK(mutexL); nbLecteurs--; if (nbLecteurs == 0) { MUTEX_UNLOCK(mutexG); } MUTEX_UNLOCK(mutexL); ssize_t nbWrite = sendto(fdPointDAcces, &reponse, sizeof(reponse), 0, (struct sockaddr *) &addrEmetteur, addrEmetteur_len); if (nbWrite != sizeof(reponse)) error_at_line(EXIT_FAILURE, errno, __FILE__, __LINE__, "sendto"); } void *gestionPointDAcces(void *arg) { union IntPtr ip; ip.ptr = arg; int fdPointDAcces = ip.entier; do { // Attente message struct sockaddr_storage addrEmetteur; socklen_t addrEmetteur_len = sizeof(addrEmetteur); message_t requete; ssize_t nbRead = recvfrom(fdPointDAcces, &requete, sizeof(requete), 0, (struct sockaddr *) &addrEmetteur, &addrEmetteur_len); if (nbRead != sizeof(requete)) error_at_line(EXIT_FAILURE, errno, __FILE__, __LINE__, "recvfrom"); // Gestion de cette requete gestionRequete(fdPointDAcces, addrEmetteur, addrEmetteur_len, requete); } while (1); } void compteNoeud(const void *nodep, const VISIT which, const int unused2, void *arg) { int *pCptr = (int *)arg; if ((which == endorder) || (which == leaf)) { *pCptr += 1; } } void compteNoeudASupprimer(const void *nodep, const VISIT which, const int unused2, void *arg) { node_t *pnode = (node_t *) nodep; int *pCptr = (int *)arg; if (((which == endorder) || (which == leaf)) && (pnode->nbReponsesAuClient == pnode->lastNbReponsesAuClient)) { *pCptr += 1; } } void traitementCtxt(const void *nodep, const VISIT which, const int unused2, void *unused3) { node_t *pnode = (node_t *) nodep; if ((which == endorder) || (which == leaf)) { // Le noeud sur lequel on est est une feuille ET c'est la derniere fois // qu'on passe par ce noeud. if (pnode->nbReponsesAuClient == pnode->lastNbReponsesAuClient) { // Par ailleurs, il n'y a plus de client qui utilise ce contexte : on // peut donc le supprimer (void)rbdelete(pnode->key, rbtree); free(pnode); } else { // Un client utilise encore ce client. On met à jour // lastNbReponsesAuClient pour eventuellement supprimer ce ctxt // a la prochaine execution de ramasseNoeudsInutiles pnode->lastNbReponsesAuClient = pnode->nbReponsesAuClient; } } } void ramasseNoeudsInutiles() { do { sleep(5); // Debut du role de redacteur dans paradigme synchro lecteurs-redacteurs MUTEX_LOCK(fifo); MUTEX_LOCK(mutexG); MUTEX_UNLOCK(fifo); // Travail en tant que redacteur dans paradigme synchro lecteurs-redacteurs // On compte le nombre de noeuds avant int nbNoeudsAvantRbwalk = 0; (void)rbwalk(rbtree, compteNoeud, &nbNoeudsAvantRbwalk); // On compte les noeuds qui sont supprimables (pour pouvoir ensuite // determiner combien de noeuds ont echappe a la suppression au moment du // rbwalk) int nbNoeudsASuppr = 0; (void)rbwalk(rbtree, compteNoeudASupprimer, &nbNoeudsASuppr); // On supprime les noeuds obsoletes (void)rbwalk(rbtree, traitementCtxt, NULL); // On compte le nombre de noeuds apres int nbNoeudsApresRbwalk = 0; (void)rbwalk(rbtree, compteNoeud, &nbNoeudsApresRbwalk); // Fin du role de redacteur dans paradigme synchro lecteurs-redacteurs MUTEX_UNLOCK(mutexG); printf("Appel de ramasseNoeudsInutiles() ==> nbNoeudsAvantRbwalk=%4d, nbNoeudsApresRbwalk=%4d ==> Noeuds suppr/ASuppr=%4d/%4d\n", nbNoeudsAvantRbwalk, nbNoeudsApresRbwalk, nbNoeudsAvantRbwalk - nbNoeudsApresRbwalk, nbNoeudsASuppr); } while (1); } int main() { int fdPointDAcces = creationPointDAcces(); int i; // Initialisation des mutex pthread_mutex_init(&mutexRangClient, NULL); pthread_mutex_init(&mutexRbtree, NULL); pthread_mutex_init(&fifo, NULL); pthread_mutex_init(&mutexL, NULL); pthread_mutex_init(&mutexG, NULL); // Initialisation red-black tree if ((rbtree=rbinit(compare, NULL))==NULL) { fprintf(stderr, "insufficient memory\n"); exit(EXIT_FAILURE); } // Creation du pool de threads for (i = 0 ; i < NB_THREADS_DANS_POOL ; i++) { pthread_t thread; union IntPtr ip; ip.entier = fdPointDAcces; int rc = pthread_create(&thread, NULL, gestionPointDAcces, ip.ptr); if (rc < 0) error_at_line(EXIT_FAILURE, rc, __FILE__, __LINE__, "pthread_create"); rc = pthread_detach(thread); if (rc < 0) error_at_line(EXIT_FAILURE, rc, __FILE__, __LINE__, "pthread_detach"); } // Le thread du main est le thread qui fait ramasseNoeudsInutiles ramasseNoeudsInutiles(); return EXIT_SUCCESS; }