From f29458da1d6b270d44dc0e6a5643a735ecb73aee Mon Sep 17 00:00:00 2001 From: Claire Date: Mon, 21 Mar 2022 19:08:29 +0100 Subject: [PATCH] Fix streaming server sometimes silently dropping subscriptions (#17841) --- streaming/index.js | 47 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/streaming/index.js b/streaming/index.js index 3db94b160..d6b445a91 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -167,6 +167,11 @@ const startWorker = async (workerId) => { const redisPrefix = redisNamespace ? `${redisNamespace}:` : ''; + /** + * @type {Object.>} + */ + const subs = {}; + const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL); const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL); @@ -191,23 +196,55 @@ const startWorker = async (workerId) => { }; /** + * @param {string} message * @param {string} channel - * @param {function(string): void} callback */ - const subscribe = (channel, callback) => { - log.silly(`Adding listener for ${channel}`); + const onRedisMessage = (message, channel) => { + const callbacks = subs[channel]; - redisSubscribeClient.subscribe(channel, callback); + log.silly(`New message on channel ${channel}`); + + if (!callbacks) { + return; + } + + callbacks.forEach(callback => callback(message)); }; /** * @param {string} channel * @param {function(string): void} callback */ + const subscribe = (channel, callback) => { + log.silly(`Adding listener for ${channel}`); + + subs[channel] = subs[channel] || []; + + if (subs[channel].length === 0) { + log.verbose(`Subscribe ${channel}`); + redisSubscribeClient.subscribe(channel, onRedisMessage); + } + + subs[channel].push(callback); + }; + + /** + * @param {string} channel + */ const unsubscribe = (channel, callback) => { log.silly(`Removing listener for ${channel}`); - redisSubscribeClient.unsubscribe(channel, callback); + if (!subs[channel]) { + return; + } + + subs[channel] = subs[channel].filter(item => item !== callback); + + if (subs[channel].length === 0) { + log.verbose(`Unsubscribe ${channel}`); + redisSubscribeClient.unsubscribe(channel); + delete subs[channel]; + } }; const FALSE_VALUES = [