Skip to content

Commit 9040d1a

Browse files
committed
Added support for individual commands
1 parent 33bee83 commit 9040d1a

File tree

5 files changed

+379
-240
lines changed

5 files changed

+379
-240
lines changed

lib/commands.js

Lines changed: 62 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,78 @@
11
'use strict';
22

3+
require('./individualCommands'); // We should define individual commands before adding standard commands
34
var commands = require('redis-commands');
45
var Multi = require('./multi');
56
var RedisClient = require('../').RedisClient;
67

78
var parseArgs_arr, parseArgs_callback;
89

10+
// Calls individual command with truthy this.cur_command_ret_buf
11+
RedisClient.prototype.call_individual_command_buf = Multi.prototype.call_individual_command_buf = function (command, args) {
12+
try {
13+
this.cur_command_ret_buf++;
14+
return this[command].apply(this, args);
15+
} finally {
16+
this.cur_command_ret_buf--;
17+
}
18+
};
19+
920
// TODO: Rewrite this including the invidual commands into a Commands class
1021
// that provided a functionality to add new commands to the client
1122

1223
commands.list.forEach(function (command) {
1324

25+
// Adding "b_" prefixed functions.
26+
// Multi and batch should not be prefixed because you should add prefixes directly to commands you calling, for example client.multi().b_get().exec();
27+
if (command !== 'multi' && command !== 'batch' && command !== 'exec') {
28+
if (!RedisClient.prototype['b_' + command]) {
29+
//When function already exists we can not call internal_send_command
30+
if (RedisClient.prototype[command]) {
31+
RedisClient.prototype['b_' + command] = function () {
32+
return this.call_individual_command_buf(command, arguments);
33+
};
34+
}else{
35+
RedisClient.prototype['b_' + command] = function () {
36+
parseArgs(arguments);
37+
return this.internal_send_command(command, parseArgs_arr, parseArgs_callback, undefined, true);
38+
};
39+
}
40+
}
41+
42+
if (!Multi.prototype['b_' + command]) {
43+
// When function already exists we can not just call this.queue.push only
44+
if (Multi.prototype[command]) {
45+
// Individual command should check this.cur_command_ret_buf when pushing into this.queue
46+
switch (command) {
47+
case 'select':
48+
case 'monitor':
49+
case 'quit':
50+
case 'info':
51+
case 'auth':
52+
case 'client':
53+
case 'hmset':
54+
case 'subscribe':
55+
case 'unsubscribe':
56+
case 'psubscribe':
57+
case 'punsubscribe':
58+
Multi.prototype['b_' + command] = function () {
59+
return this.call_individual_command_buf(command, arguments);
60+
};
61+
break;
62+
default:
63+
throw new TypeError('You have added new individual command for Multi: ' + command + '.' +
64+
'Before continue please check that your command checks this.cur_command_ret_buf when pushing into this.queue.');
65+
}
66+
}else{
67+
Multi.prototype['b_' + command] = function () {
68+
parseArgs(arguments);
69+
this.queue.push([command, parseArgs_arr, parseArgs_callback, undefined, true]);
70+
return this;
71+
};
72+
}
73+
}
74+
}
75+
1476
// Do not override existing functions
1577
if (!RedisClient.prototype[command]) {
1678
RedisClient.prototype[command.toUpperCase()] = RedisClient.prototype[command] = function () {
@@ -19,14 +81,6 @@ commands.list.forEach(function (command) {
1981
};
2082
}
2183

22-
// Do not override existing functions
23-
if (!RedisClient.prototype['b_' + command]) {
24-
RedisClient.prototype['b_' + command] = function () {
25-
parseArgs(arguments);
26-
return this.internal_send_command(command, parseArgs_arr, parseArgs_callback, undefined, true);
27-
};
28-
}
29-
3084
// Do not override existing functions
3185
if (!Multi.prototype[command]) {
3286
Multi.prototype[command.toUpperCase()] = Multi.prototype[command] = function () {
@@ -35,15 +89,6 @@ commands.list.forEach(function (command) {
3589
return this;
3690
};
3791
}
38-
39-
// Do not override existing functions
40-
if (!Multi.prototype['b_' + command]) {
41-
Multi.prototype['b_' + command] = function () {
42-
parseArgs(arguments);
43-
this.queue.push([command, parseArgs_arr, parseArgs_callback, undefined, true]);
44-
return this;
45-
};
46-
}
4792
});
4893

4994
function parseArgs(args) {

lib/extendedApi.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ RedisClient.prototype.send_command_buf = function (command, args, callback) {
1515
// Increment instead of setting to true is needed to return buffers in all subcommands even subcommands are calling several times in a row.
1616
// If some subcommands are calling in process.nextTick then they must check this.cur_command_ret_buf property before process.nextTick and decide to use "b_" prefix by themselves.
1717
this.cur_command_ret_buf++;
18-
this.send_command(command, args, callback);
18+
return this.send_command(command, args, callback);
1919
} finally {
20-
this.cur_command_ret_buf--; // Decrease even if internal_send_command has not been called or throw. An exception will be thrown up.
20+
this.cur_command_ret_buf--; // Decrease even if internal_send_command has not been called or throw. Decrement is before return. Exceptions will be thrown up.
2121
}
2222
};
2323

lib/individualCommands.js

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ RedisClient.prototype.select = RedisClient.prototype.SELECT = function select (d
4646
};
4747

4848
Multi.prototype.select = Multi.prototype.SELECT = function select (db, callback) {
49-
this.queue.push(['select', [db], select_callback(this._client, db, callback)]);
49+
this.queue.push(['select', [db], select_callback(this._client, db, callback), undefined, this.cur_command_ret_buf]);
5050
return this;
5151
};
5252

@@ -68,7 +68,7 @@ RedisClient.prototype.monitor = RedisClient.prototype.MONITOR = function monitor
6868
Multi.prototype.monitor = Multi.prototype.MONITOR = function monitor (callback) {
6969
// Use a individual command, as this is a special case that does not has to be checked for any other command
7070
if (this.exec !== this.exec_transaction) {
71-
this.queue.push(['monitor', [], monitor_callback(this._client, callback)]);
71+
this.queue.push(['monitor', [], monitor_callback(this._client, callback), undefined, this.cur_command_ret_buf]);
7272
return this;
7373
}
7474
// Set multi monitoring to indicate the exec that it should abort
@@ -115,7 +115,7 @@ Multi.prototype.QUIT = Multi.prototype.quit = function (callback) {
115115
self.closing = true;
116116
self.ready = false;
117117
};
118-
this.queue.push(['quit', [], quit_callback(self, callback), call_on_write]);
118+
this.queue.push(['quit', [], quit_callback(self, callback), call_on_write, this.cur_command_ret_buf]);
119119
return this;
120120
};
121121

@@ -174,7 +174,7 @@ Multi.prototype.info = Multi.prototype.INFO = function info (section, callback)
174174
} else if (section !== undefined) {
175175
args = Array.isArray(section) ? section : [section];
176176
}
177-
this.queue.push(['info', args, info_callback(this._client, callback)]);
177+
this.queue.push(['info', args, info_callback(this._client, callback), undefined, this.cur_command_ret_buf]);
178178
return this;
179179
};
180180

@@ -216,7 +216,7 @@ Multi.prototype.auth = Multi.prototype.AUTH = function auth (pass, callback) {
216216

217217
// Stash auth for connect and reconnect.
218218
this.auth_pass = pass;
219-
this.queue.push(['auth', [pass], auth_callback(this._client, callback)]);
219+
this.queue.push(['auth', [pass], auth_callback(this._client, callback), undefined, this.cur_command_ret_buf]);
220220
return this;
221221
};
222222

@@ -307,7 +307,7 @@ Multi.prototype.client = Multi.prototype.CLIENT = function client () {
307307
};
308308
}
309309
}
310-
this.queue.push(['client', arr, callback, call_on_write]);
310+
this.queue.push(['client', arr, callback, call_on_write, undefined, this.cur_command_ret_buf]);
311311
return this;
312312
};
313313

@@ -386,7 +386,7 @@ Multi.prototype.hmset = Multi.prototype.HMSET = function hmset () {
386386
arr[i] = arguments[i];
387387
}
388388
}
389-
this.queue.push(['hmset', arr, callback]);
389+
this.queue.push(['hmset', arr, callback], undefined, this.cur_command_ret_buf);
390390
return this;
391391
};
392392

@@ -441,7 +441,7 @@ Multi.prototype.subscribe = Multi.prototype.SUBSCRIBE = function subscribe () {
441441
var call_on_write = function () {
442442
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
443443
};
444-
this.queue.push(['subscribe', arr, callback, call_on_write]);
444+
this.queue.push(['subscribe', arr, callback, call_on_write, this.cur_command_ret_buf]);
445445
return this;
446446
};
447447

@@ -498,7 +498,7 @@ Multi.prototype.unsubscribe = Multi.prototype.UNSUBSCRIBE = function unsubscribe
498498
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
499499
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
500500
};
501-
this.queue.push(['unsubscribe', arr, callback, call_on_write]);
501+
this.queue.push(['unsubscribe', arr, callback, call_on_write, this.cur_command_ret_buf]);
502502
return this;
503503
};
504504

@@ -553,7 +553,7 @@ Multi.prototype.psubscribe = Multi.prototype.PSUBSCRIBE = function psubscribe ()
553553
var call_on_write = function () {
554554
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
555555
};
556-
this.queue.push(['psubscribe', arr, callback, call_on_write]);
556+
this.queue.push(['psubscribe', arr, callback, call_on_write, this.cur_command_ret_buf]);
557557
return this;
558558
};
559559

@@ -610,6 +610,6 @@ Multi.prototype.punsubscribe = Multi.prototype.PUNSUBSCRIBE = function punsubscr
610610
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
611611
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
612612
};
613-
this.queue.push(['punsubscribe', arr, callback, call_on_write]);
613+
this.queue.push(['punsubscribe', arr, callback, call_on_write, this.cur_command_ret_buf]);
614614
return this;
615615
};

lib/multi.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ var utils = require('./utils');
66
function Multi (client, args) {
77
this._client = client;
88
this.queue = new Queue();
9+
this.cur_command_ret_buf = 0;
910
var command, tmp_args;
1011
if (args) { // Either undefined or an array. Fail hard if it's not an array
1112
for (var i = 0; i < args.length; i++) {

0 commit comments

Comments
 (0)