@@ -31,88 +31,101 @@ RedisClient.prototype.batch = RedisClient.prototype.BATCH = function batch (args
3131 return new Multi ( this , args ) ;
3232} ;
3333
34- // Store db in this.select_db to restore it on reconnect
35- RedisClient . prototype . select = RedisClient . prototype . SELECT = function select ( db , callback ) {
36- var self = this ;
37- return this . internal_send_command ( 'select' , [ db ] , function ( err , res ) {
34+ function select_callback ( self , db , callback ) {
35+ return function ( err , res ) {
3836 if ( err === null ) {
37+ // Store db in this.select_db to restore it on reconnect
3938 self . selected_db = db ;
4039 }
4140 utils . callback_or_emit ( self , callback , err , res ) ;
42- } ) ;
41+ } ;
42+ }
43+
44+ RedisClient . prototype . select = RedisClient . prototype . SELECT = function select ( db , callback ) {
45+ return this . internal_send_command ( 'select' , [ db ] , select_callback ( this , db , callback ) ) ;
4346} ;
4447
45- RedisClient . prototype . monitor = RedisClient . prototype . MONITOR = function ( callback ) {
46- // Use a individual command, as this is a special case that does not has to be checked for any other command
47- var self = this ;
48- return this . internal_send_command ( 'monitor' , [ ] , function ( err , res ) {
48+ Multi . prototype . select = Multi . prototype . SELECT = function select ( db , callback ) {
49+ this . queue . push ( [ 'select' , [ db ] , select_callback ( this . _client , db , callback ) ] ) ;
50+ return this ;
51+ } ;
52+
53+ function monitor_callback ( self , callback ) {
54+ return function ( err , res ) {
4955 if ( err === null ) {
50- self . reply_parser . returnReply = function ( reply ) {
51- // If in monitor mode, all normal commands are still working and we only want to emit the streamlined commands
52- // As this is not the average use case and monitor is expensive anyway, let's change the code here, to improve
53- // the average performance of all other commands in case of no monitor mode
54- if ( self . monitoring ) {
55- var replyStr ;
56- if ( self . buffers && Buffer . isBuffer ( reply ) ) {
57- replyStr = reply . toString ( ) ;
58- } else {
59- replyStr = reply ;
60- }
61- // While reconnecting the redis server does not recognize the client as in monitor mode anymore
62- // Therefor the monitor command has to finish before it catches further commands
63- if ( typeof replyStr === 'string' && utils . monitor_regex . test ( replyStr ) ) {
64- var timestamp = replyStr . slice ( 0 , replyStr . indexOf ( ' ' ) ) ;
65- var args = replyStr . slice ( replyStr . indexOf ( '"' ) + 1 , - 1 ) . split ( '" "' ) . map ( function ( elem ) {
66- return elem . replace ( / \\ " / g, '"' ) ;
67- } ) ;
68- self . emit ( 'monitor' , timestamp , args , replyStr ) ;
69- return ;
70- }
71- }
72- self . return_reply ( reply ) ;
73- } ;
7456 self . monitoring = true ;
7557 }
7658 utils . callback_or_emit ( self , callback , err , res ) ;
77- } ) ;
59+ } ;
60+ }
61+
62+ RedisClient . prototype . monitor = RedisClient . prototype . MONITOR = function monitor ( callback ) {
63+ // Use a individual command, as this is a special case that does not has to be checked for any other command
64+ return this . internal_send_command ( 'monitor' , [ ] , monitor_callback ( this , callback ) ) ;
7865} ;
7966
80- RedisClient . prototype . quit = RedisClient . prototype . QUIT = function ( callback ) {
81- var self = this ;
82- var callback_hook = function ( err , res ) {
83- // TODO: Improve this by handling everything with coherend error codes and find out if there's anything missing
84- if ( err && ( err . code === 'NR_OFFLINE' ||
85- err . message === 'Redis connection gone from close event.' ||
86- err . message === 'The command can\'t be processed. The connection has already been closed.'
87- ) ) {
67+ // Only works with batch, not in a transaction
68+ Multi . prototype . monitor = Multi . prototype . MONITOR = function monitor ( callback ) {
69+ // Use a individual command, as this is a special case that does not has to be checked for any other command
70+ if ( this . exec !== this . exec_transaction ) {
71+ this . queue . push ( [ 'monitor' , [ ] , monitor_callback ( this . _client , callback ) ] ) ;
72+ return this ;
73+ }
74+ var err = new Error (
75+ 'You used the monitor command in combination with a transaction. Due to faulty return values of ' +
76+ 'Redis in this context, the monitor command is now executed without transaction instead and ignored ' +
77+ 'in the multi statement.'
78+ ) ;
79+ err . command = 'MONITOR' ;
80+ utils . reply_in_order ( this . _client , callback , err ) ;
81+ this . _client . monitor ( 'monitor' , callback ) ;
82+ return this ;
83+ } ;
84+
85+ function quit_callback ( self , callback ) {
86+ return function ( err , res ) {
87+ if ( err && err . code === 'NR_OFFLINE' ) {
8888 // Pretent the quit command worked properly in this case.
8989 // Either the quit landed in the offline queue and was flushed at the reconnect
9090 // or the offline queue is deactivated and the command was rejected right away
9191 // or the stream is not writable
92- // or while sending the quit, the connection dropped
92+ // or while sending the quit, the connection ended / closed
9393 err = null ;
9494 res = 'OK' ;
9595 }
9696 utils . callback_or_emit ( self , callback , err , res ) ;
97+ if ( self . stream . writable ) {
98+ // If the socket is still alive, kill it. This could happen if quit got a NR_OFFLINE error code
99+ self . stream . destroy ( ) ;
100+ }
97101 } ;
98- var backpressure_indicator = this . internal_send_command ( 'quit' , [ ] , callback_hook ) ;
102+ }
103+
104+ RedisClient . prototype . QUIT = RedisClient . prototype . quit = function ( callback ) {
105+ // TODO: Consider this for v.3
106+ // Allow the quit command to be fired as soon as possible to prevent it landing in the offline queue.
107+ // this.ready = this.offline_queue.length === 0;
108+ var backpressure_indicator = this . internal_send_command ( 'quit' , [ ] , quit_callback ( this , callback ) ) ;
99109 // Calling quit should always end the connection, no matter if there's a connection or not
100110 this . closing = true ;
111+ this . ready = false ;
101112 return backpressure_indicator ;
102113} ;
103114
104- // Store info in this.server_info after each call
105- RedisClient . prototype . info = RedisClient . prototype . INFO = function info ( section , callback ) {
106- var self = this ;
107- var ready = this . ready ;
108- var args = [ ] ;
109- if ( typeof section === 'function' ) {
110- callback = section ;
111- } else if ( section !== undefined ) {
112- args = Array . isArray ( section ) ? section : [ section ] ;
113- }
114- this . ready = ready || this . offline_queue . length === 0 ; // keep the execution order intakt
115- var tmp = this . internal_send_command ( 'info' , args , function ( err , res ) {
115+ // Only works with batch, not in a transaction
116+ Multi . prototype . QUIT = Multi . prototype . quit = function ( callback ) {
117+ var self = this . _client ;
118+ var call_on_write = function ( ) {
119+ // If called in a multi context, we expect redis is available
120+ self . closing = true ;
121+ self . ready = false ;
122+ } ;
123+ this . queue . push ( [ 'quit' , [ ] , quit_callback ( self , callback ) , call_on_write ] ) ;
124+ return this ;
125+ } ;
126+
127+ function info_callback ( self , callback ) {
128+ return function ( err , res ) {
116129 if ( res ) {
117130 var obj = { } ;
118131 var lines = res . toString ( ) . split ( '\r\n' ) ;
@@ -146,20 +159,33 @@ RedisClient.prototype.info = RedisClient.prototype.INFO = function info (section
146159 self . server_info = { } ;
147160 }
148161 utils . callback_or_emit ( self , callback , err , res ) ;
149- } ) ;
150- this . ready = ready ;
151- return tmp ;
162+ } ;
163+ }
164+
165+ // Store info in this.server_info after each call
166+ RedisClient . prototype . info = RedisClient . prototype . INFO = function info ( section , callback ) {
167+ var args = [ ] ;
168+ if ( typeof section === 'function' ) {
169+ callback = section ;
170+ } else if ( section !== undefined ) {
171+ args = Array . isArray ( section ) ? section : [ section ] ;
172+ }
173+ return this . internal_send_command ( 'info' , args , info_callback ( this , callback ) ) ;
152174} ;
153175
154- RedisClient . prototype . auth = RedisClient . prototype . AUTH = function auth ( pass , callback ) {
155- var self = this ;
156- var ready = this . ready ;
157- debug ( 'Sending auth to ' + self . address + ' id ' + self . connection_id ) ;
176+ Multi . prototype . info = Multi . prototype . INFO = function info ( section , callback ) {
177+ var args = [ ] ;
178+ if ( typeof section === 'function' ) {
179+ callback = section ;
180+ } else if ( section !== undefined ) {
181+ args = Array . isArray ( section ) ? section : [ section ] ;
182+ }
183+ this . queue . push ( [ 'info' , args , info_callback ( this . _client , callback ) ] ) ;
184+ return this ;
185+ } ;
158186
159- // Stash auth for connect and reconnect.
160- this . auth_pass = pass ;
161- this . ready = ready || this . offline_queue . length === 0 ; // keep the execution order intakt
162- var tmp = this . internal_send_command ( 'auth' , [ pass ] , function ( err , res ) {
187+ function auth_callback ( self , pass , callback ) {
188+ return function ( err , res ) {
163189 if ( err ) {
164190 if ( no_password_is_set . test ( err . message ) ) {
165191 self . warn ( 'Warning: Redis server does not require a password, but a password was supplied.' ) ;
@@ -175,11 +201,31 @@ RedisClient.prototype.auth = RedisClient.prototype.AUTH = function auth (pass, c
175201 }
176202 }
177203 utils . callback_or_emit ( self , callback , err , res ) ;
178- } ) ;
204+ } ;
205+ }
206+
207+ RedisClient . prototype . auth = RedisClient . prototype . AUTH = function auth ( pass , callback ) {
208+ debug ( 'Sending auth to ' + this . address + ' id ' + this . connection_id ) ;
209+
210+ // Stash auth for connect and reconnect.
211+ this . auth_pass = pass ;
212+ var ready = this . ready ;
213+ this . ready = ready || this . offline_queue . length === 0 ;
214+ var tmp = this . internal_send_command ( 'auth' , [ pass ] , auth_callback ( this , pass , callback ) ) ;
179215 this . ready = ready ;
180216 return tmp ;
181217} ;
182218
219+ // Only works with batch, not in a transaction
220+ Multi . prototype . auth = Multi . prototype . AUTH = function auth ( pass , callback ) {
221+ debug ( 'Sending auth to ' + this . address + ' id ' + this . connection_id ) ;
222+
223+ // Stash auth for connect and reconnect.
224+ this . auth_pass = pass ;
225+ this . queue . push ( [ 'auth' , [ pass ] , auth_callback ( this . _client , callback ) ] ) ;
226+ return this ;
227+ } ;
228+
183229RedisClient . prototype . hmset = RedisClient . prototype . HMSET = function hmset ( ) {
184230 var arr ,
185231 len = arguments . length ,
@@ -198,7 +244,7 @@ RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function hmset () {
198244 for ( ; i < len ; i += 1 ) {
199245 arr [ i + 1 ] = arguments [ 1 ] [ i ] ;
200246 }
201- } else if ( typeof arguments [ 1 ] === 'object' && ( arguments . length === 2 || arguments . length === 3 && typeof arguments [ 2 ] === 'function' || typeof arguments [ 2 ] === 'undefined' ) ) {
247+ } else if ( typeof arguments [ 1 ] === 'object' && ( arguments . length === 2 || arguments . length === 3 && ( typeof arguments [ 2 ] === 'function' || typeof arguments [ 2 ] === 'undefined' ) ) ) {
202248 arr = [ arguments [ 0 ] ] ;
203249 for ( var field in arguments [ 1 ] ) { // jshint ignore: line
204250 arr . push ( field , arguments [ 1 ] [ field ] ) ;
@@ -219,6 +265,46 @@ RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function hmset () {
219265 return this . internal_send_command ( 'hmset' , arr , callback ) ;
220266} ;
221267
268+ Multi . prototype . hmset = Multi . prototype . HMSET = function hmset ( ) {
269+ var arr ,
270+ len = arguments . length ,
271+ callback ,
272+ i = 0 ;
273+ if ( Array . isArray ( arguments [ 0 ] ) ) {
274+ arr = arguments [ 0 ] ;
275+ callback = arguments [ 1 ] ;
276+ } else if ( Array . isArray ( arguments [ 1 ] ) ) {
277+ if ( len === 3 ) {
278+ callback = arguments [ 2 ] ;
279+ }
280+ len = arguments [ 1 ] . length ;
281+ arr = new Array ( len + 1 ) ;
282+ arr [ 0 ] = arguments [ 0 ] ;
283+ for ( ; i < len ; i += 1 ) {
284+ arr [ i + 1 ] = arguments [ 1 ] [ i ] ;
285+ }
286+ } else if ( typeof arguments [ 1 ] === 'object' && ( arguments . length === 2 || arguments . length === 3 && ( typeof arguments [ 2 ] === 'function' || typeof arguments [ 2 ] === 'undefined' ) ) ) {
287+ arr = [ arguments [ 0 ] ] ;
288+ for ( var field in arguments [ 1 ] ) { // jshint ignore: line
289+ arr . push ( field , arguments [ 1 ] [ field ] ) ;
290+ }
291+ callback = arguments [ 2 ] ;
292+ } else {
293+ len = arguments . length ;
294+ // The later should not be the average use case
295+ if ( len !== 0 && ( typeof arguments [ len - 1 ] === 'function' || typeof arguments [ len - 1 ] === 'undefined' ) ) {
296+ len -- ;
297+ callback = arguments [ len ] ;
298+ }
299+ arr = new Array ( len ) ;
300+ for ( ; i < len ; i += 1 ) {
301+ arr [ i ] = arguments [ i ] ;
302+ }
303+ }
304+ this . queue . push ( [ 'hmset' , arr , callback ] ) ;
305+ return this ;
306+ } ;
307+
222308RedisClient . prototype . subscribe = RedisClient . prototype . SUBSCRIBE = function subscribe ( ) {
223309 var arr ,
224310 len = arguments . length ,
@@ -378,7 +464,7 @@ Multi.prototype.psubscribe = Multi.prototype.PSUBSCRIBE = function psubscribe ()
378464 arr [ i ] = arguments [ i ] ;
379465 }
380466 }
381- var self = this ;
467+ var self = this . _client ;
382468 var call_on_write = function ( ) {
383469 self . pub_sub_mode = self . pub_sub_mode || self . command_queue . length + 1 ;
384470 } ;
@@ -434,7 +520,7 @@ Multi.prototype.punsubscribe = Multi.prototype.PUNSUBSCRIBE = function punsubscr
434520 arr [ i ] = arguments [ i ] ;
435521 }
436522 }
437- var self = this ;
523+ var self = this . _client ;
438524 var call_on_write = function ( ) {
439525 // Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
440526 self . pub_sub_mode = self . pub_sub_mode || self . command_queue . length + 1 ;
0 commit comments