@@ -7,15 +7,48 @@ import { HttpStatusCodes } from "./constants";
7
7
import * as request from "request" ;
8
8
9
9
export class HttpClient implements Server . IHttpClient {
10
- private defaultUserAgent : string ;
11
10
private static STATUS_CODE_REGEX = / s t a t u s c o d e = ( \d + ) / i;
11
+ private static STUCK_REQUEST_ERROR_MESSAGE = "The request can't receive any response." ;
12
+ private static STUCK_RESPONSE_ERROR_MESSAGE = "Can't receive all parts of the response." ;
13
+ private static STUCK_REQUEST_TIMEOUT = 60000 ;
14
+ // We receive multiple response packets every ms but we don't need to be very aggressive here.
15
+ private static STUCK_RESPONSE_CHECK_INTERVAL = 10000 ;
16
+
17
+ private defaultUserAgent : string ;
18
+ private cleanupData : ICleanupRequestData [ ] ;
12
19
13
20
constructor ( private $config : Config . IConfig ,
14
21
private $logger : ILogger ,
22
+ private $processService : IProcessService ,
15
23
private $proxyService : IProxyService ,
16
- private $staticConfig : Config . IStaticConfig ) { }
24
+ private $staticConfig : Config . IStaticConfig ) {
25
+ this . cleanupData = [ ] ;
26
+ this . $processService . attachToProcessExitSignals ( this , ( ) => {
27
+ this . cleanupData . forEach ( d => {
28
+ this . cleanupAfterRequest ( d ) ;
29
+ } ) ;
30
+ } ) ;
31
+ }
32
+
33
+ public async httpRequest ( options : any , proxySettings ?: IProxySettings ) : Promise < Server . IResponse > {
34
+ try {
35
+ const result = await this . httpRequestCore ( options , proxySettings ) ;
36
+ return result ;
37
+ } catch ( err ) {
38
+ if ( err . message === HttpClient . STUCK_REQUEST_ERROR_MESSAGE || err . message === HttpClient . STUCK_RESPONSE_ERROR_MESSAGE ) {
39
+ // Retry the request immediately because there are at least 10 seconds between the two requests.
40
+ // We have to retry only once the sporadically stuck requests/responses.
41
+ // We can add exponential backoff retry here if we decide that we need to workaround bigger network issues on the client side.
42
+ this . $logger . warn ( "%s Retrying request to %s..." , err . message , options . url || options ) ;
43
+ const retryResult = await this . httpRequestCore ( options , proxySettings ) ;
44
+ return retryResult ;
45
+ }
17
46
18
- async httpRequest ( options : any , proxySettings ?: IProxySettings ) : Promise < Server . IResponse > {
47
+ throw err ;
48
+ }
49
+ }
50
+
51
+ private async httpRequestCore ( options : any , proxySettings ?: IProxySettings ) : Promise < Server . IResponse > {
19
52
if ( _ . isString ( options ) ) {
20
53
options = {
21
54
url : options ,
@@ -73,6 +106,10 @@ export class HttpClient implements Server.IHttpClient {
73
106
74
107
const result = new Promise < Server . IResponse > ( ( resolve , reject ) => {
75
108
let timerId : number ;
109
+ let stuckRequestTimerId : number ;
110
+ let hasResponse = false ;
111
+ const cleanupRequestData : ICleanupRequestData = Object . create ( { timers : [ ] } ) ;
112
+ this . cleanupData . push ( cleanupRequestData ) ;
76
113
77
114
const promiseActions : IPromiseActions < Server . IResponse > = {
78
115
resolve,
@@ -82,8 +119,9 @@ export class HttpClient implements Server.IHttpClient {
82
119
83
120
if ( options . timeout ) {
84
121
timerId = setTimeout ( ( ) => {
85
- this . setResponseResult ( promiseActions , timerId , { err : new Error ( `Request to ${ unmodifiedOptions . url } timed out.` ) } , ) ;
122
+ this . setResponseResult ( promiseActions , cleanupRequestData , { err : new Error ( `Request to ${ unmodifiedOptions . url } timed out.` ) } ) ;
86
123
} , options . timeout ) ;
124
+ cleanupRequestData . timers . push ( timerId ) ;
87
125
88
126
delete options . timeout ;
89
127
}
@@ -94,6 +132,16 @@ export class HttpClient implements Server.IHttpClient {
94
132
95
133
this . $logger . trace ( "httpRequest: %s" , util . inspect ( options ) ) ;
96
134
const requestObj = request ( options ) ;
135
+ cleanupRequestData . req = requestObj ;
136
+
137
+ stuckRequestTimerId = setTimeout ( ( ) => {
138
+ clearTimeout ( stuckRequestTimerId ) ;
139
+ stuckRequestTimerId = null ;
140
+ if ( ! hasResponse ) {
141
+ this . setResponseResult ( promiseActions , cleanupRequestData , { err : new Error ( HttpClient . STUCK_REQUEST_ERROR_MESSAGE ) } ) ;
142
+ }
143
+ } , options . timeout || HttpClient . STUCK_REQUEST_TIMEOUT ) ;
144
+ cleanupRequestData . timers . push ( stuckRequestTimerId ) ;
97
145
98
146
requestObj
99
147
. on ( "error" , ( err : IHttpRequestError ) => {
@@ -107,15 +155,26 @@ export class HttpClient implements Server.IHttpClient {
107
155
const errorMessage = this . getErrorMessage ( errorMessageStatusCode , null ) ;
108
156
err . proxyAuthenticationRequired = errorMessageStatusCode === HttpStatusCodes . PROXY_AUTHENTICATION_REQUIRED ;
109
157
err . message = errorMessage || err . message ;
110
- this . setResponseResult ( promiseActions , timerId , { err } ) ;
158
+ this . setResponseResult ( promiseActions , cleanupRequestData , { err } ) ;
111
159
} )
112
160
. on ( "response" , ( response : Server . IRequestResponseData ) => {
161
+ cleanupRequestData . res = response ;
162
+ hasResponse = true ;
163
+ let lastChunkTimestamp = Date . now ( ) ;
164
+ cleanupRequestData . stuckResponseIntervalId = setInterval ( ( ) => {
165
+ if ( Date . now ( ) - lastChunkTimestamp > HttpClient . STUCK_RESPONSE_CHECK_INTERVAL ) {
166
+ this . setResponseResult ( promiseActions , cleanupRequestData , { err : new Error ( HttpClient . STUCK_RESPONSE_ERROR_MESSAGE ) } ) ;
167
+ }
168
+ } , HttpClient . STUCK_RESPONSE_CHECK_INTERVAL ) ;
113
169
const successful = helpers . isRequestSuccessful ( response ) ;
114
170
if ( ! successful ) {
115
171
pipeTo = undefined ;
116
172
}
117
173
118
174
let responseStream = response ;
175
+ responseStream . on ( "data" , ( chunk : string ) => {
176
+ lastChunkTimestamp = Date . now ( ) ;
177
+ } ) ;
119
178
switch ( response . headers [ "content-encoding" ] ) {
120
179
case "gzip" :
121
180
responseStream = responseStream . pipe ( zlib . createGunzip ( ) ) ;
@@ -128,7 +187,7 @@ export class HttpClient implements Server.IHttpClient {
128
187
if ( pipeTo ) {
129
188
pipeTo . on ( "finish" , ( ) => {
130
189
this . $logger . trace ( "httpRequest: Piping done. code = %d" , response . statusCode . toString ( ) ) ;
131
- this . setResponseResult ( promiseActions , timerId , { response } ) ;
190
+ this . setResponseResult ( promiseActions , cleanupRequestData , { response } ) ;
132
191
} ) ;
133
192
134
193
responseStream . pipe ( pipeTo ) ;
@@ -144,13 +203,13 @@ export class HttpClient implements Server.IHttpClient {
144
203
const responseBody = data . join ( "" ) ;
145
204
146
205
if ( successful ) {
147
- this . setResponseResult ( promiseActions , timerId , { body : responseBody , response } ) ;
206
+ this . setResponseResult ( promiseActions , cleanupRequestData , { body : responseBody , response } ) ;
148
207
} else {
149
208
const errorMessage = this . getErrorMessage ( response . statusCode , responseBody ) ;
150
209
const err : any = new Error ( errorMessage ) ;
151
210
err . response = response ;
152
211
err . body = responseBody ;
153
- this . setResponseResult ( promiseActions , timerId , { err } ) ;
212
+ this . setResponseResult ( promiseActions , cleanupRequestData , { err } ) ;
154
213
}
155
214
} ) ;
156
215
}
@@ -181,16 +240,12 @@ export class HttpClient implements Server.IHttpClient {
181
240
return response ;
182
241
}
183
242
184
- private setResponseResult ( result : IPromiseActions < Server . IResponse > , timerId : number , resultData : { response ?: Server . IRequestResponseData , body ?: string , err ?: Error } ) : void {
185
- if ( timerId ) {
186
- clearTimeout ( timerId ) ;
187
- timerId = null ;
188
- }
189
-
243
+ private setResponseResult ( result : IPromiseActions < Server . IResponse > , cleanupRequestData : ICleanupRequestData , resultData : { response ?: Server . IRequestResponseData , body ?: string , err ?: Error } ) : void {
244
+ this . cleanupAfterRequest ( cleanupRequestData ) ;
190
245
if ( ! result . isResolved ( ) ) {
191
246
result . isResolved = ( ) => true ;
192
- if ( resultData . err ) {
193
- return result . reject ( resultData . err ) ;
247
+ if ( resultData . err || ! resultData . response . complete ) {
248
+ return result . reject ( resultData . err || new Error ( "Request canceled" ) ) ;
194
249
}
195
250
196
251
const finalResult : any = resultData ;
@@ -258,5 +313,36 @@ export class HttpClient implements Server.IHttpClient {
258
313
this . $logger . trace ( "Using proxy: %s" , options . proxy ) ;
259
314
}
260
315
}
316
+
317
+ private cleanupAfterRequest ( data : ICleanupRequestData ) : void {
318
+ data . timers . forEach ( t => {
319
+ if ( t ) {
320
+ clearTimeout ( t ) ;
321
+ t = null ;
322
+ }
323
+ } ) ;
324
+
325
+ if ( data . stuckResponseIntervalId ) {
326
+ clearInterval ( data . stuckResponseIntervalId ) ;
327
+ data . stuckResponseIntervalId = null ;
328
+ }
329
+
330
+ if ( data . req ) {
331
+ data . req . abort ( ) ;
332
+ }
333
+
334
+ if ( data . res ) {
335
+ data . res . destroy ( ) ;
336
+ }
337
+ }
338
+
261
339
}
340
+
341
+ interface ICleanupRequestData {
342
+ timers : number [ ] ;
343
+ stuckResponseIntervalId : NodeJS . Timer ;
344
+ req : request . Request ;
345
+ res : Server . IRequestResponseData ;
346
+ }
347
+
262
348
$injector . register ( "httpClient" , HttpClient ) ;
0 commit comments