@@ -52,6 +52,7 @@ function CryptoStream(pair) {
52
52
this . readable = this . writable = true ;
53
53
54
54
this . _paused = false ;
55
+ this . _needDrain = false ;
55
56
this . _pending = [ ] ;
56
57
this . _pendingCallbacks = [ ] ;
57
58
this . _pendingBytes = 0 ;
@@ -86,7 +87,7 @@ CryptoStream.prototype.write = function(data /* , encoding, cb */) {
86
87
data = new Buffer ( data , encoding ) ;
87
88
}
88
89
89
- debug ( 'clearIn data') ;
90
+ debug ( ( this === this . pair . cleartext ? 'clear' : 'encrypted' ) + 'In data') ;
90
91
91
92
this . _pending . push ( data ) ;
92
93
this . _pendingCallbacks . push ( cb ) ;
@@ -95,7 +96,26 @@ CryptoStream.prototype.write = function(data /* , encoding, cb */) {
95
96
this . pair . _writeCalled = true ;
96
97
this . pair . cycle ( ) ;
97
98
98
- return this . _pendingBytes < 128 * 1024 ;
99
+ // In the following cases, write() should return a false,
100
+ // then this stream should eventually emit 'drain' event.
101
+ //
102
+ // 1. There are pending data more than 128k bytes.
103
+ // 2. A forward stream shown below is paused.
104
+ // A) EncryptedStream for CleartextStream.write().
105
+ // B) CleartextStream for EncryptedStream.write().
106
+ //
107
+ if ( ! this . _needDrain ) {
108
+ if ( this . _pendingBytes >= 128 * 1024 ) {
109
+ this . _needDrain = true ;
110
+ } else {
111
+ if ( this === this . pair . cleartext ) {
112
+ this . _needDrain = this . pair . encrypted . _paused ;
113
+ } else {
114
+ this . _needDrain = this . pair . cleartext . _paused ;
115
+ }
116
+ }
117
+ }
118
+ return ! this . _needDrain ;
99
119
} ;
100
120
101
121
@@ -380,11 +400,25 @@ CryptoStream.prototype._pull = function() {
380
400
assert ( rv === tmp . length ) ;
381
401
}
382
402
383
- // If we've cleared all of incoming encrypted data, emit drain.
384
- if ( havePending && this . _pending . length === 0 ) {
385
- debug ( 'drain' ) ;
386
- this . emit ( 'drain' ) ;
387
- if ( this . __destroyOnDrain ) this . end ( ) ;
403
+ // If pending data has cleared, 'drain' event should be emitted
404
+ // after write() returns a false.
405
+ // Except when a forward stream shown below is paused.
406
+ // A) EncryptedStream for CleartextStream._pull().
407
+ // B) CleartextStream for EncryptedStream._pull().
408
+ //
409
+ if ( this . _needDrain && this . _pending . length === 0 ) {
410
+ var paused ;
411
+ if ( this === this . pair . cleartext ) {
412
+ paused = this . pair . encrypted . _paused ;
413
+ } else {
414
+ paused = this . pair . cleartext . _paused ;
415
+ }
416
+ if ( ! paused ) {
417
+ debug ( 'drain' ) ;
418
+ process . nextTick ( this . emit . bind ( this , 'drain' ) ) ;
419
+ this . _needDrain = false ;
420
+ if ( this . __destroyOnDrain ) this . end ( ) ;
421
+ }
388
422
}
389
423
} ;
390
424
0 commit comments