Nous voudrions que lors de certaines requêtes d’update des données soient transmises à l’ensemble des clients connectés via websocket.
Le problème ? C’est un serveur Node.js scalable deployé sur un cluster Kubernetes.
Si l’on n'avait qu'une seule instance, le problème ne se poserait pas. Le serveur sait garder l’ensemble des connexions ouvertes pour leur envoyer un message.
Lorsqu'on veut déployer des websockets sur un cluster (par exemple Kubernetes), il se pose le problème du partage des données de celles-ci.
En effet, dans un système où l'on souhaite broadcaster des events au client (en connexion descendante du NodeJS vers le client uniquement), seule l'instance ayant enregistré la modification est informée du changement et peut donc envoyer la notification à l'ensemble des websockets qui lui sont connectés.
On doit donc trouver un moyen pour partager les notifications à l'ensemble des clients.
Pour nous, ce sera RabbitMQ.
Plusieurs solutions de queuing existent sur le marché, comme Redis ou RabbitMQ. Chacune avec ses particularités.
Nous utiliserons rabbitMQ pour des raisons d'architecture existante.
Ces systèmes de queuing permettent de partager des messages entre différentes parties / instances de l'application
Lors d'une action par un client l'ensemble des websockets doit être notifié. Un message est envoyé sur RabbitMQ qui le re-dispatche à l'ensemble des instances. Chaque instance, en recevant le message sait qu'elle doit à son tour broadcaster le message à toutes ces connexions websockets actives.
Regardons maintenant comment l'implémenter. Nous allons nous focaliser sur la partie Node.jsServeur JavaScript. L'installation et le paramétrage de RabbitMQ ainsi la mise en place des Websocket pour les clients seront mis de côté. On utilisera le package ws pour la partie websocket.
this.wss = new WebSocketServer({
port: process.env.PORT,
noServer: true,
path: '/ws'
});
On pourra broadcaster un message à tous les clients
public sendToAllSubscriber(msg) {
this.wss.clients.forEach(function each(client) {
if (client.readyState === 1) {
client.send(JSON.stringify(msg));
}
});
}
Une fois défini, il ne nous reste qu'à paramétrer le rabbitMQ :
Dans le contrôleur de la mise à jour des données, nous souhaitons envoyer le message aux autres instances de Node.
send(message) {
// Le message à envoyer
const buf = Buffer.from(JSON.stringify(message));
// Overture d'une connexion avec le rabbitMQ
return connect(Tools.RMQ_ADDR)
// Création d'un channel
.then(conn => conn.createChannel())
.then(chan => {
// Le fanout permet un broadcast à tout les subscribers
// qui seront pour nous toutes les instances de node clientes
chan.assertExchange('ws-exchange', 'fanout')
return chan;
// Envoi du message au autre instance de node.
}).then(chan => chan.publish('ws-exchange', 'ws', buf));
}
Ainsi que d'écouter les messages entrants afin de les broadcasters à toutes les websockets.
consume(message) {
// Overture d'une connexion avec le rabbitMQ
connect(rmqConnectionString).then(conn => {
// Pramètrage du channel
let channel = conn.createChannel();
// Le fanout permet un broadcast à tout les subscribers
// qui seront pour nous toutes les instances de node clientes
channel.assertExchange('ws-exchange','fanout');
return channel.assertQueue('' /* Le serveur créera son propre identifiant */ )
}
).then( (channel) => {
queueName = r.queue;
channel.bindQueue(queueName, 'ws-exchange', 'ws');
channel.consume(r.queue, (msg) => {
// Les messages des autres instance de node en cas de mise à jour
// arriverons ici
if ( msg ) {
route.sendToAllSubscriber(msg?.content);
channel.ack(msg); // Ack du message pour le supprimer de la queue
}
});
})
On publie la mise à jour des données, toutes les websockets vont devoir recevoir une notification.
this.publish({channel: 'ws-news', payload: 'update'});
Et c'est suffisant pour broadcaster le message websocket à toutes les instances de Node qui à leur tour vont broadcaster à tous les websockets ouvertes sur l'instance.
Mission accomplie ;)
Pour échanger ou aller plus loin, n'hésitez pas à nous contacter !
Découvrez la planche #9 !
Quel est le rôle d'un architecte système d'information ? Son apport concret dans les projets de développement d'applications.
Chez AXOPEN, nous souhaitions se brancher sur GitLab pour récupérer la liste des issues pour un certain projet. On s’est donc lancé dans la création d’un connector Gitlab pour Google Data Studio.