Déployer des websockets sur un cluster de Node.js scalable

Tuto AXOPEN - On vous explique pas à pas le déploiement de websockets sur un cluster de Node.js
RomainV.jpg
Romain VIDALMis à jour le 28 Juil 2022
Tuto  Déployer des websockets sur un cluster de Node.js scalable

Pourquoi ce tuto ?

Nous voudrions que lors de certaines requêtes d’update des données soient transmises à l’ensemble des clients connectés via websocket.

Le problème ? C’est un serveur Node.js scalable deployé sur un cluster Kubernetes.

Architecture

Si l’on n'avait qu'une seule instance, le problème ne se poserait pas. Le serveur sait garder l’ensemble des connexions ouvertes pour leur envoyer un message. 

Websocket simple server

Lorsqu'on veut déployer des websockets sur un cluster (par exemple Kubernetes), il se pose le problème du partage des données de celles-ci.

En effet, dans un système où l'on souhaite broadcaster des events au client (en connexion descendante du NodeJS vers le client uniquement), seule l'instance ayant enregistré la modification est informée du changement et peut donc envoyer la notification à l'ensemble des websockets qui lui sont connectés.

On doit donc trouver un moyen pour partager les notifications à l'ensemble des clients.

Pour nous, ce sera RabbitMQ. 

websocket multi instance fleche

L'entrée en scène de RabbitMQ

Plusieurs solutions de queuing existent sur le marché, comme Redis ou RabbitMQ. Chacune avec ses particularités.

Nous utiliserons rabbitMQ pour des raisons d'architecture existante.

Ces systèmes de queuing permettent de partager des messages entre différentes parties / instances de l'application

Lors d'une action par un client l'ensemble des websockets doit être notifié. Un message est envoyé sur RabbitMQ qui le re-dispatche à l'ensemble des instances. Chaque instance, en recevant le message sait qu'elle doit à son tour broadcaster le message à toutes ces connexions websockets actives. 

websocket multi instance fleche

L'implémentation

Regardons maintenant comment l'implémenter. Nous allons nous focaliser sur la partie Node.jsServeur JavaScript. L'installation et le paramétrage de RabbitMQ ainsi la mise en place des Websocket pour les clients seront mis de côté. On utilisera le package ws pour la partie websocket.

   this.wss = new WebSocketServer({
     port: process.env.PORT,
     noServer: true,
     path: '/ws'
   });

On pourra broadcaster un message à tous les clients

  public sendToAllSubscriber(msg) {
    this.wss.clients.forEach(function each(client) {
        if (client.readyState === 1) {
            client.send(JSON.stringify(msg));
        }
    });
  }

Une fois défini, il ne nous reste qu'à paramétrer le rabbitMQ :

Dans le contrôleur de la mise à jour des données, nous souhaitons envoyer le message aux autres instances de Node.

send(message) {
 // Le message à envoyer
 const buf = Buffer.from(JSON.stringify(message));
 // Overture d'une connexion avec le rabbitMQ
 return connect(Tools.RMQ_ADDR)
    // Création d'un channel
    .then(conn => conn.createChannel())
    .then(chan => {
        // Le fanout permet un broadcast à tout les subscribers
        // qui seront pour nous toutes les instances de node clientes
        chan.assertExchange('ws-exchange', 'fanout')
        return chan;
    // Envoi du message au autre instance de node.
    }).then(chan => chan.publish('ws-exchange', 'ws', buf));
}

Ainsi que d'écouter les messages entrants afin de les broadcasters à toutes les websockets.

consume(message) {
 // Overture d'une connexion avec le rabbitMQ
 connect(rmqConnectionString).then(conn => {
    // Pramètrage du channel
    let channel = conn.createChannel();
    // Le fanout permet un broadcast à tout les subscribers
    // qui seront pour nous toutes les instances de node clientes
    channel.assertExchange('ws-exchange','fanout');
    return channel.assertQueue('' /* Le serveur créera son propre identifiant */ )
    }
 ).then( (channel) => {
           queueName = r.queue;
           channel.bindQueue(queueName, 'ws-exchange', 'ws');
           channel.consume(r.queue, (msg) => {
               // Les messages des autres instance de node en cas de mise à jour
               // arriverons ici
               if ( msg ) {
                   route.sendToAllSubscriber(msg?.content);
                   channel.ack(msg); // Ack du message pour le supprimer de la queue
               }
          });
     })

On publie la mise à jour des données, toutes les websockets vont devoir recevoir une notification.

this.publish({channel: 'ws-news', payload: 'update'});

Et c'est suffisant pour broadcaster le message websocket à toutes les instances de Node qui à leur tour vont broadcaster à tous les websockets ouvertes sur l'instance.

Mission accomplie ;)

Pour échanger ou aller plus loin, n'hésitez pas à nous contacter !