@@ -122,6 +122,7 @@ function RedisClient (options, stream) {
122122 }
123123 this . command_queue = new Queue ( ) ; // Holds sent commands to de-pipeline them
124124 this . offline_queue = new Queue ( ) ; // Holds commands issued but not able to be sent
125+ this . pipeline_queue = new Queue ( ) ; // Holds all pipelined commands
125126 // ATTENTION: connect_timeout should change in v.3.0 so it does not count towards ending reconnection attempts after x seconds
126127 // This should be done by the retry_strategy. Instead it should only be the timeout for connecting to redis
127128 this . connect_timeout = + options . connect_timeout || 3600000 ; // 60 * 60 * 1000 ms
@@ -144,8 +145,8 @@ function RedisClient (options, stream) {
144145 this . auth_pass = options . auth_pass || options . password ;
145146 this . selected_db = options . db ; // Save the selected db here, used when reconnecting
146147 this . old_state = null ;
147- this . send_anyway = false ;
148- this . pipeline = 0 ;
148+ this . fire_strings = true ; // Determine if strings or buffers should be written to the stream
149+ this . pipeline = false ;
149150 this . times_connected = 0 ;
150151 this . options = options ;
151152 this . buffers = options . return_buffers || options . detect_buffers ;
@@ -374,23 +375,25 @@ RedisClient.prototype.on_ready = function () {
374375 debug ( 'on_ready called ' + this . address + ' id ' + this . connection_id ) ;
375376 this . ready = true ;
376377
377- var cork ;
378- if ( ! this . stream . cork ) {
379- cork = function ( len ) {
380- self . pipeline = len ;
381- self . pipeline_queue = new Queue ( len ) ;
382- } ;
383- } else {
384- cork = function ( len ) {
385- self . pipeline = len ;
386- self . pipeline_queue = new Queue ( len ) ;
378+ this . cork = function ( ) {
379+ self . pipeline = true ;
380+ if ( self . stream . cork ) {
387381 self . stream . cork ( ) ;
388- } ;
389- this . uncork = function ( ) {
382+ }
383+ } ;
384+ this . uncork = function ( ) {
385+ if ( self . fire_strings ) {
386+ self . write_strings ( ) ;
387+ } else {
388+ self . write_buffers ( ) ;
389+ }
390+ self . pipeline = false ;
391+ self . fire_strings = true ;
392+ if ( self . stream . uncork ) {
393+ // TODO: Consider using next tick here. See https://github.com/NodeRedis/node_redis/issues/1033
390394 self . stream . uncork ( ) ;
391- } ;
392- }
393- this . cork = cork ;
395+ }
396+ } ;
394397
395398 // Restore modal commands from previous connection. The order of the commands is important
396399 if ( this . selected_db !== undefined ) {
@@ -523,7 +526,8 @@ RedisClient.prototype.connection_gone = function (why, error) {
523526 this . ready = false ;
524527 // Deactivate cork to work with the offline queue
525528 this . cork = noop ;
526- this . pipeline = 0 ;
529+ this . uncork = noop ;
530+ this . pipeline = false ;
527531
528532 var state = {
529533 monitoring : this . monitoring ,
@@ -792,10 +796,6 @@ RedisClient.prototype.internal_send_command = function (command, args, callback)
792796 if ( args [ i ] . length > 30000 ) {
793797 big_data = true ;
794798 args_copy [ i ] = new Buffer ( args [ i ] , 'utf8' ) ;
795- if ( this . pipeline !== 0 ) {
796- this . pipeline += 2 ;
797- this . writeDefault = this . writeBuffers ;
798- }
799799 } else {
800800 args_copy [ i ] = args [ i ] ;
801801 }
@@ -813,10 +813,6 @@ RedisClient.prototype.internal_send_command = function (command, args, callback)
813813 args_copy [ i ] = args [ i ] ;
814814 buffer_args = true ;
815815 big_data = true ;
816- if ( this . pipeline !== 0 ) {
817- this . pipeline += 2 ;
818- this . writeDefault = this . writeBuffers ;
819- }
820816 } else {
821817 this . warn (
822818 'Deprecated: The ' + command . toUpperCase ( ) + ' command contains a argument of type ' + args [ i ] . constructor . name + '.\n' +
@@ -870,6 +866,7 @@ RedisClient.prototype.internal_send_command = function (command, args, callback)
870866 this . write ( command_str ) ;
871867 } else {
872868 debug ( 'Send command (' + command_str + ') has Buffer arguments' ) ;
869+ this . fire_strings = false ;
873870 this . write ( command_str ) ;
874871
875872 for ( i = 0 ; i < len ; i += 1 ) {
@@ -887,40 +884,33 @@ RedisClient.prototype.internal_send_command = function (command, args, callback)
887884 return ! this . should_buffer ;
888885} ;
889886
890- RedisClient . prototype . writeDefault = RedisClient . prototype . writeStrings = function ( data ) {
887+ RedisClient . prototype . write_strings = function ( ) {
891888 var str = '' ;
892889 for ( var command = this . pipeline_queue . shift ( ) ; command ; command = this . pipeline_queue . shift ( ) ) {
893890 // Write to stream if the string is bigger than 4mb. The biggest string may be Math.pow(2, 28) - 15 chars long
894891 if ( str . length + command . length > 4 * 1024 * 1024 ) {
895- this . stream . write ( str ) ;
892+ this . should_buffer = ! this . stream . write ( str ) ;
896893 str = '' ;
897894 }
898895 str += command ;
899896 }
900- this . should_buffer = ! this . stream . write ( str + data ) ;
897+ if ( str !== '' ) {
898+ this . should_buffer = ! this . stream . write ( str ) ;
899+ }
901900} ;
902901
903- RedisClient . prototype . writeBuffers = function ( data ) {
902+ RedisClient . prototype . write_buffers = function ( ) {
904903 for ( var command = this . pipeline_queue . shift ( ) ; command ; command = this . pipeline_queue . shift ( ) ) {
905- this . stream . write ( command ) ;
904+ this . should_buffer = ! this . stream . write ( command ) ;
906905 }
907- this . should_buffer = ! this . stream . write ( data ) ;
908906} ;
909907
910908RedisClient . prototype . write = function ( data ) {
911- if ( this . pipeline === 0 ) {
909+ if ( this . pipeline === false ) {
912910 this . should_buffer = ! this . stream . write ( data ) ;
913911 return ;
914912 }
915-
916- this . pipeline -- ;
917- if ( this . pipeline === 0 ) {
918- this . writeDefault ( data ) ;
919- return ;
920- }
921-
922913 this . pipeline_queue . push ( data ) ;
923- return ;
924914} ;
925915
926916Object . defineProperty ( exports , 'debugMode' , {
0 commit comments