Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions src/controllers/realtime/index.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
const Room = require('./room');

class RealTimeController {
const expirationThrottleDelay = 1000;

class RealTimeController {
/**
* @param {Kuzzle} kuzzle
*/
constructor (kuzzle) {
this._kuzzle = kuzzle;
this.lastExpirationTimestamp = 0;

this.subscriptions = {
filters: {},
channels: {}
};
}

Expand Down Expand Up @@ -68,7 +68,7 @@ class RealTimeController {
throw new Error('Kuzzle.realtime.subscribe: a callback function is required');
}

const room = new Room(this.kuzzle, index, collection, filters, callback, options);
const room = new Room(this, index, collection, filters, callback, options);

return room.subscribe()
.then(() => {
Expand Down Expand Up @@ -105,6 +105,25 @@ class RealTimeController {
return response.result;
});
}

/**
* Removes all subscriptions, and emit a "tokenExpired" event
* (tries to prevent event duplication by throttling it)
*/
tokenExpired() {
for (const roomId of Object.keys(this.subscriptions)) {
this.subscriptions[roomId].forEach(room => room.removeListeners());
}

this.subscriptions = {};
this.kuzzle.jwt = undefined;

const now = Date.now();
if ((now - this.lastExpirationTimestamp) > expirationThrottleDelay) {
this.lastExpirationTimestamp = now;
this.kuzzle.emit('tokenExpired');
}
}
}

module.exports = RealTimeController;
19 changes: 14 additions & 5 deletions src/controllers/realtime/room.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
class Room {

/**
* @param {Kuzzle} kuzzle
* @param {RealTimeController} controller
* @param {string} index
* @param {string} collection
* @param {object} body
* @param {function} callback
* @param {object} options
*/
constructor (kuzzle, index, collection, body, callback, options = {}) {
this.kuzzle = kuzzle;
constructor (controller, index, collection, body, callback, options = {}) {
this.controller = controller;
this.kuzzle = controller.kuzzle;
this.index = index;
this.collection = collection;
this.callback = callback;
Expand All @@ -33,7 +34,7 @@ class Room {

this.autoResubscribe = typeof options.autoResubscribe === 'boolean'
? options.autoResubscribe
: kuzzle.autoResubscribe;
: this.kuzzle.autoResubscribe;
this.subscribeToSelf = typeof options.subscribeToSelf === 'boolean'
? options.subscribeToSelf
: true;
Expand Down Expand Up @@ -71,7 +72,15 @@ class Room {
}

_channelListener (data) {
const fromSelf = data.volatile && data.volatile.sdkInstanceId === this.kuzzle.protocol.id;
// intercept token expiration messages and relay them to the parent
// controller
if (data.type === 'TokenExpired') {
return this.controller.tokenExpired();
}

const fromSelf =
data.volatile && data.volatile.sdkInstanceId === this.kuzzle.protocol.id;

if (this.subscribeToSelf || !fromSelf) {
this.callback(data);
}
Expand Down
76 changes: 58 additions & 18 deletions test/controllers/realtime.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ describe('Realtime Controller', () => {

beforeEach(() => {
kuzzle = {
query: sinon.stub().resolves()
query: sinon.stub().resolves(),
emit: sinon.stub()
};
kuzzle.realtime = new RealtimeController(kuzzle);
});
Expand Down Expand Up @@ -96,22 +97,27 @@ describe('Realtime Controller', () => {
beforeEach(() => {
room = null;

mockrequire('../../src/controllers/realtime/room', function (kuz, index, collection, body, callback, opts = {}) {
room = {
kuzzle: kuz,
index,
collection,
body,
callback,
options: opts,
id: roomId,
subscribe: sinon.stub().resolves(subscribeResponse)
};

return room;
});

kuzzle.realtime = new (mockrequire.reRequire('../../src/controllers/realtime/index'))(kuzzle);
mockrequire(
'../../src/controllers/realtime/room',
function (controller, index, collection, body, callback, opts = {}) {
room = {
controller,
index,
collection,
body,
callback,
kuzzle: controller.kuzzle,
options: opts,
id: roomId,
subscribe: sinon.stub().resolves(subscribeResponse)
};

return room;
});

const MockRealtimeController =
mockrequire.reRequire('../../src/controllers/realtime/index');
kuzzle.realtime = new MockRealtimeController(kuzzle);
});

it('should throw an error if the "index" argument is not provided', () => {
Expand Down Expand Up @@ -245,7 +251,7 @@ describe('Realtime Controller', () => {
});
});

it('should call realtime/unsubiscribe query with the roomId and return a Promise which resolves the roomId', () => {
it('should call realtime/unsubscribe query with the roomId and return a Promise which resolves the roomId', () => {
return kuzzle.realtime.unsubscribe(roomId, options)
.then(res => {
should(kuzzle.query)
Expand All @@ -260,4 +266,38 @@ describe('Realtime Controller', () => {
});
});
});

describe('#tokenExpired', () => {
it('should clear all subscriptions and emit a "tokenExpired" event', () => {
const stub = sinon.stub();

kuzzle.jwt = 'foobar';

for (let i = 0; i < 10; i++) {
kuzzle.realtime.subscriptions[uuidv4()] = [{removeListeners: stub}];
}

kuzzle.realtime.tokenExpired();

should(kuzzle.realtime.subscriptions).be.empty();
should(stub.callCount).be.eql(10);
should(kuzzle.emit).calledOnce().calledWith('tokenExpired');
should(kuzzle.jwt).be.undefined();
});

it('should throttle to prevent emitting duplicate occurrences of the same event', () => {
const stub = sinon.stub();

for (let i = 0; i < 10; i++) {
kuzzle.realtime.subscriptions[uuidv4()] = [{removeListeners: stub}];

kuzzle.realtime.tokenExpired();

should(kuzzle.realtime.subscriptions).be.empty();
should(stub.callCount).be.eql(i+1);
}

should(kuzzle.emit).calledOnce().calledWith('tokenExpired');
});
});
});
Loading