Skip to content

Commit

Permalink
💥 Bump redis to v4
Browse files Browse the repository at this point in the history
Fixes #19

This is a **BREAKING** change that upgrades the underlying `redis`
client to v4, which had so many [breaking changes][1], that it's
infeasible to maintain backwards-compatibility with v3. If consumers
need to use v3, they are advised to pin their version of
`sharedb-redis-pubsub` to v4.

This change adapts to the breakages:

 - wrap the returned `Promise`s in callbacks
 - actively connect the `client`, which no longer auto-connects
 - move the `message` event handler into the subscription callback
 - adapt to the new call signature of `client.eval()`

[1]: https://github.com/redis/node-redis/blob/HEAD/docs/v3-to-v4.md
  • Loading branch information
alecgibson committed Feb 7, 2024
1 parent fa5921e commit 13c46af
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 18 deletions.
61 changes: 47 additions & 14 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@ function RedisPubSub(options) {
// options if not provided
this.observer = options.observer || redis.createClient(this.client.options);

var pubsub = this;
this.observer.on('message', function(channel, message) {
var data = JSON.parse(message);
pubsub._emit(channel, data);
});
this._connect();
}
module.exports = RedisPubSub;

Expand All @@ -37,27 +33,64 @@ RedisPubSub.prototype.close = function(callback) {
var pubsub = this;
PubSub.prototype.close.call(this, function(err) {
if (err) return callback(err);
pubsub.client.quit(function(err) {
if (err) return callback(err);
pubsub.observer.quit(callback);
});
pubsub._close().then(function() {
callback();
}, callback);
});
};

RedisPubSub.prototype._close = function() {
return this._closing = this._closing || this._connect().then(Promise.all([
this.client.quit(),
this.observer.quit()
]));
};

RedisPubSub.prototype._subscribe = function(channel, callback) {
this.observer.subscribe(channel, callback);
var pubsub = this;
this._observerConnection.then(function() {
pubsub.observer
.subscribe(channel, function(message) {
var data = JSON.parse(message);
pubsub._emit(channel, data);
})
.then(function() {
callback();
}, callback);
});
};

RedisPubSub.prototype._unsubscribe = function(channel, callback) {
this.observer.unsubscribe(channel, callback);
this.observer.unsubscribe(channel)
.then(function() {
callback();
}, callback);
};

RedisPubSub.prototype._publish = function(channels, data, callback) {
var message = JSON.stringify(data);
var args = [PUBLISH_SCRIPT, 0, message].concat(channels);
this.client.eval(args, callback);
var client = this.client;
this._clientConnection.then(function() {
var message = JSON.stringify(data);
var arguments = [message].concat(channels);
client.eval(PUBLISH_SCRIPT, {arguments: arguments}).then(function() {
callback();
}, callback);
});
};

RedisPubSub.prototype._connect = function() {
this._clientConnection = this._clientConnection || connect(this.client);
this._observerConnection = this._observerConnection || connect(this.observer);
return Promise.all([
this._clientConnection,
this._observerConnection
]);
};

function connect(client) {
return client.isOpen ? Promise.resolve() : client.connect();
}

var PUBLISH_SCRIPT =
'for i = 2, #ARGV do ' +
'redis.call("publish", ARGV[i], ARGV[1]) ' +
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"name": "sharedb-redis-pubsub",
"version": "4.0.0",
"version": "5.0.0",
"description": "Redis pub/sub adapter adapter for ShareDB",
"main": "index.js",
"dependencies": {
"redis": "^2.6.0 || ^3.0.0",
"redis": "^4.0.0",
"sharedb": "^1.0.0 || ^2.0.0 || ^3.0.0 || ^4.0.0"
},
"devDependencies": {
Expand Down
43 changes: 41 additions & 2 deletions test/test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,44 @@
var redisPubSub = require('../index');
var redis = require('redis');
var runTestSuite = require('sharedb/test/pubsub');

require('sharedb/test/pubsub')(function(callback) {
callback(null, redisPubSub());
describe('default options', function() {
runTestSuite(function(callback) {
callback(null, redisPubSub());
});
});

describe('unconnected client', function() {
runTestSuite(function(callback) {
callback(null, redisPubSub({
client: redis.createClient()
}));
});
});

describe('connected client', function() {
var client;

beforeEach(function(done) {
client = redis.createClient();
client.connect().then(function() {
done();
}, done);
});

runTestSuite(function(callback) {
callback(null, redisPubSub({
client: client
}));
});
});

describe('connecting client', function() {
runTestSuite(function(callback) {
var client = redis.createClient();
client.connect();
callback(null, redisPubSub({
client: client
}));
});
});

0 comments on commit 13c46af

Please sign in to comment.