@@ -763,6 +763,25 @@ util.inherits(ClientRequest, OutgoingMessage);
763
763
exports . ClientRequest = ClientRequest ;
764
764
765
765
766
+ ClientRequest . prototype . abort = function ( ) {
767
+ if ( this . _queue ) {
768
+ // queued for dispatch
769
+ assert ( ! this . connection ) ;
770
+ var i = this . _queue . indexOf ( this ) ;
771
+ this . _queue . splice ( i , 1 ) ;
772
+
773
+ } else if ( this . connection ) {
774
+ // in-progress
775
+ var c = this . connection ;
776
+ this . detachSocket ( c ) ;
777
+ c . destroy ( ) ;
778
+
779
+ } else {
780
+ // already complete.
781
+ }
782
+ } ;
783
+
784
+
766
785
function httpSocketSetup ( socket ) {
767
786
// NOTE: be sure not to use ondrain elsewhere in this file!
768
787
socket . ondrain = function ( ) {
@@ -781,6 +800,11 @@ function Server(requestListener) {
781
800
this . addListener ( 'request' , requestListener ) ;
782
801
}
783
802
803
+ // Similar option to this. Too lazy to write my own docs.
804
+ // http://www.squid-cache.org/Doc/config/half_closed_clients/
805
+ // http://wiki.squid-cache.org/SquidFaq/InnerWorkings#What_is_a_half-closed_filedescriptor.3F
806
+ this . httpAllowHalfOpen = false ;
807
+
784
808
this . addListener ( 'connection' , connectionListener ) ;
785
809
}
786
810
util . inherits ( Server , net . Server ) ;
@@ -797,6 +821,15 @@ exports.createServer = function(requestListener) {
797
821
function connectionListener ( socket ) {
798
822
var self = this ;
799
823
var outgoing = [ ] ;
824
+ var incoming = [ ] ;
825
+
826
+ function abortIncoming ( ) {
827
+ while ( incoming . length ) {
828
+ var req = incoming . shift ( ) ;
829
+ req . emit ( 'aborted' ) ;
830
+ }
831
+ // abort socket._httpMessage ?
832
+ }
800
833
801
834
debug ( 'SERVER new http connection' ) ;
802
835
@@ -842,9 +875,18 @@ function connectionListener(socket) {
842
875
} ;
843
876
844
877
socket . onend = function ( ) {
845
- parser . finish ( ) ;
878
+ var ret = parser . finish ( ) ;
846
879
847
- if ( outgoing . length ) {
880
+ if ( ret instanceof Error ) {
881
+ debug ( 'parse error' ) ;
882
+ socket . destroy ( ret ) ;
883
+ return ;
884
+ }
885
+
886
+ if ( ! self . httpAllowHalfOpen ) {
887
+ abortIncoming ( ) ;
888
+ socket . end ( ) ;
889
+ } else if ( outgoing . length ) {
848
890
outgoing [ outgoing . length - 1 ] . _last = true ;
849
891
} else if ( socket . _httpMessage ) {
850
892
socket . _httpMessage . _last = true ;
@@ -854,14 +896,19 @@ function connectionListener(socket) {
854
896
} ;
855
897
856
898
socket . addListener ( 'close' , function ( ) {
899
+ debug ( 'server socket close' ) ;
857
900
// unref the parser for easy gc
858
901
parsers . free ( parser ) ;
902
+
903
+ abortIncoming ( ) ;
859
904
} ) ;
860
905
861
906
// The following callback is issued after the headers have been read on a
862
907
// new message. In this callback we setup the response object and pass it
863
908
// to the user.
864
909
parser . onIncoming = function ( req , shouldKeepAlive ) {
910
+ incoming . push ( req ) ;
911
+
865
912
var res = new ServerResponse ( req ) ;
866
913
debug ( 'server response shouldKeepAlive: ' + shouldKeepAlive ) ;
867
914
res . shouldKeepAlive = shouldKeepAlive ;
@@ -877,6 +924,9 @@ function connectionListener(socket) {
877
924
// When we're finished writing the response, check if this is the last
878
925
// respose, if so destroy the socket.
879
926
res . on ( 'finish' , function ( ) {
927
+ assert ( incoming [ 0 ] === req ) ;
928
+ incoming . shift ( ) ;
929
+
880
930
res . detachSocket ( socket ) ;
881
931
882
932
if ( res . _last ) {
@@ -909,23 +959,28 @@ function connectionListener(socket) {
909
959
exports . _connectionListener = connectionListener ;
910
960
911
961
962
+
912
963
function Agent ( host , port ) {
913
964
this . host = host ;
914
965
this . port = port ;
915
966
916
967
this . queue = [ ] ;
917
968
this . sockets = [ ] ;
918
- this . maxSockets = 5 ;
969
+ this . maxSockets = Agent . defaultMaxSockets ;
919
970
}
920
971
util . inherits ( Agent , EventEmitter ) ;
921
972
exports . Agent = Agent ;
922
973
923
974
975
+ Agent . defaultMaxSockets = 5 ;
976
+
977
+
924
978
Agent . prototype . appendMessage = function ( options ) {
925
979
var self = this ;
926
980
927
981
var req = new ClientRequest ( options ) ;
928
982
this . queue . push ( req ) ;
983
+ req . _queue = this . queue ;
929
984
930
985
/*
931
986
req.on('finish', function () {
@@ -978,6 +1033,8 @@ Agent.prototype._establishNewConnection = function() {
978
1033
req = socket . _httpMessage ;
979
1034
} else if ( self . queue . length ) {
980
1035
req = self . queue . shift ( ) ;
1036
+ assert ( req . _queue === self . queue ) ;
1037
+ req . _queue = null ;
981
1038
} else {
982
1039
// No requests on queue? Where is the request
983
1040
assert ( 0 ) ;
@@ -1135,6 +1192,9 @@ Agent.prototype._cycle = function() {
1135
1192
debug ( 'Agent found socket, shift' ) ;
1136
1193
// We found an available connection!
1137
1194
this . queue . shift ( ) ; // remove first from queue.
1195
+ assert ( first . _queue === this . queue ) ;
1196
+ first . _queue = null ;
1197
+
1138
1198
first . assignSocket ( socket ) ;
1139
1199
self . _cycle ( ) ; // try to dispatch another
1140
1200
return ;
0 commit comments