30
30
import java .util .concurrent .Executors ;
31
31
import java .util .concurrent .Future ;
32
32
import java .util .concurrent .atomic .AtomicLong ;
33
+ import java .util .stream .Stream ;
33
34
import org .junit .jupiter .api .BeforeEach ;
34
35
import org .junit .jupiter .api .Test ;
36
+ import org .junit .jupiter .params .ParameterizedTest ;
37
+ import org .junit .jupiter .params .provider .Arguments ;
38
+ import org .junit .jupiter .params .provider .MethodSource ;
35
39
import software .amazon .awssdk .auth .credentials .AwsBasicCredentials ;
36
40
import software .amazon .awssdk .auth .credentials .StaticCredentialsProvider ;
41
+ import software .amazon .awssdk .core .exception .AbortedException ;
37
42
import software .amazon .awssdk .core .exception .ApiCallAttemptTimeoutException ;
38
43
import software .amazon .awssdk .core .retry .RetryPolicy ;
39
44
import software .amazon .awssdk .http .SdkHttpClient ;
40
45
import software .amazon .awssdk .http .apache .ApacheHttpClient ;
46
+ import software .amazon .awssdk .http .urlconnection .UrlConnectionHttpClient ;
41
47
import software .amazon .awssdk .metrics .MetricCollection ;
42
48
import software .amazon .awssdk .metrics .MetricPublisher ;
43
49
import software .amazon .awssdk .metrics .MetricRecord ;
@@ -53,6 +59,7 @@ class SyncClientConnectionInterruptionTest {
53
59
+ "\" :\" resultString\" }" ;
54
60
private final WireMockServer mockServer = new WireMockServer (new WireMockConfiguration ()
55
61
.bindAddress ("localhost" )
62
+ .dynamicPort ());
56
63
@ BeforeEach
57
64
public void setup () {
58
65
mockServer .start ();
@@ -109,51 +116,68 @@ void connectionPoolsGetsReusedWhenInterruptedWith_Multiple_MaxConnection() throw
109
116
@ Test
110
117
void interruptionWhenWaitingForLease_AbortsImmediately () throws InterruptedException {
111
118
Integer LONG_DELAY = 5000 ;
119
+ ExceptionInThreadRun exceptionInThreadRun = new ExceptionInThreadRun ();
112
120
AtomicLong leaseWaitingTime = new AtomicLong (LONG_DELAY );
113
121
stubPostRequest ("/2016-03-11/allTypes" , aResponse ().withFixedDelay (LONG_DELAY ), SAMPLE_BODY );
114
122
SdkHttpClient httpClient = ApacheHttpClient .builder ().maxConnections (1 ).build ();
115
123
ProtocolRestJsonClient client = getClient (httpClient , Duration .ofMillis (2L * LONG_DELAY )).build ();
116
124
ExecutorService executorService = Executors .newFixedThreadPool (5 );
117
125
executorService .submit (() -> client .allTypes ());
118
126
unInterruptedSleep (100 );
119
- Thread leaseWaitingThread = new Thread (() ->
120
- client .allTypes (l -> l .overrideConfiguration (
121
- b -> b
122
- .apiCallAttemptTimeout (Duration .ofSeconds (10 ))
123
- .addMetricPublisher (new MetricPublisher () {
124
- @ Override
125
- public void publish (MetricCollection metricCollection ) {
126
- System .out .println (metricCollection );
127
- Optional <MetricRecord <?>> apiCallDuration =
128
- metricCollection .stream ().filter (o -> "ApiCallDuration" .equals (o .metric ().name ())).findAny ();
129
- leaseWaitingTime .set (Duration .parse (apiCallDuration .get ().value ().toString ()).toMillis ());
130
- }
131
- @ Override
132
- public void close () {
133
- }
134
- })
135
- )));
127
+ Thread leaseWaitingThread = new Thread (() -> {
128
+
129
+ try {
130
+ client .allTypes (l -> l .overrideConfiguration (
131
+ b -> b
132
+ .apiCallAttemptTimeout (Duration .ofSeconds (10 ))
133
+ .addMetricPublisher (new MetricPublisher () {
134
+ @ Override
135
+ public void publish (MetricCollection metricCollection ) {
136
+ System .out .println (metricCollection );
137
+ Optional <MetricRecord <?>> apiCallDuration =
138
+ metricCollection .stream ().filter (o -> "ApiCallDuration" .equals (o .metric ().name ())).findAny ();
139
+ leaseWaitingTime .set (Duration .parse (apiCallDuration .get ().value ().toString ()).toMillis ());
140
+ }
141
+
142
+ @ Override
143
+ public void close () {
144
+ }
145
+ })
146
+ ));
147
+
148
+ } catch (Exception exception ) {
149
+ exceptionInThreadRun .setException (exception );
150
+
151
+ }
152
+ });
153
+
136
154
leaseWaitingThread .start ();
137
155
unInterruptedSleep (100 );
138
156
leaseWaitingThread .interrupt ();
139
157
leaseWaitingThread .join ();
140
158
assertThat (leaseWaitingTime .get ()).isNotEqualTo (LONG_DELAY .longValue ());
141
159
assertThat (leaseWaitingTime .get ()).isLessThan (LONG_DELAY .longValue ());
160
+ assertThat (exceptionInThreadRun .getException ()).isInstanceOf (AbortedException .class );
142
161
client .close ();
143
162
}
144
163
164
+ private static Stream <Arguments > httpClientImplementation () {
165
+ return Stream .of (Arguments .of (ApacheHttpClient .create ()),
166
+ Arguments .of (UrlConnectionHttpClient .create ()));
167
+ }
168
+
145
169
/**
146
170
* Service Latency is set to high value say X.
147
171
* Api timeout value id set to 1/3 of X.
148
172
* And we interrupt the thread at 90% of X.
149
173
* In this case since the ApiTimeOut first happened we should get ApiTimeOut Exception and not the interrupt.
150
174
*/
151
- @ Test
152
- void interruptionDueToApiTimeOut_followed_byInterruptCausesOnlyTimeOutException () throws InterruptedException {
175
+ @ ParameterizedTest
176
+ @ MethodSource ("httpClientImplementation" )
177
+ void interruptionDueToApiTimeOut_followed_byInterruptCausesOnlyTimeOutException (SdkHttpClient httpClient ) throws InterruptedException {
153
178
Integer SERVER_RESPONSE_DELAY = 3000 ;
154
179
stubPostRequest ("/2016-03-11/allTypes" , aResponse ().withFixedDelay (SERVER_RESPONSE_DELAY ), SAMPLE_BODY );
155
180
ExceptionInThreadRun exception = new ExceptionInThreadRun ();
156
- SdkHttpClient httpClient = ApacheHttpClient .builder ().build ();
157
181
ProtocolRestJsonClient client =
158
182
getClient (httpClient , Duration .ofMillis (10 )).overrideConfiguration (o -> o .retryPolicy (RetryPolicy .none ())).build ();
159
183
unInterruptedSleep (100 );
@@ -172,6 +196,32 @@ void interruptionDueToApiTimeOut_followed_byInterruptCausesOnlyTimeOutException(
172
196
assertThat (exception .getException ()).isInstanceOf (ApiCallAttemptTimeoutException .class );
173
197
client .close ();
174
198
}
199
+
200
+ @ ParameterizedTest
201
+ @ MethodSource ("httpClientImplementation" )
202
+ void sdkClientInterrupted_while_connectionIsInProgress (SdkHttpClient httpClient ) throws InterruptedException {
203
+ Integer SERVER_RESPONSE_DELAY = 3000 ;
204
+ stubPostRequest ("/2016-03-11/allTypes" , aResponse ().withFixedDelay (SERVER_RESPONSE_DELAY ), SAMPLE_BODY );
205
+ ExceptionInThreadRun exception = new ExceptionInThreadRun ();
206
+ ProtocolRestJsonClient client =
207
+ getClient (httpClient , Duration .ofMillis (10 )).overrideConfiguration (o -> o .retryPolicy (RetryPolicy .none ())).build ();
208
+ unInterruptedSleep (100 );
209
+ // We need to creat a separate thread to interrupt it externally.
210
+ Thread leaseWaitingThread = new Thread (() -> {
211
+ try {
212
+ client .allTypes (l -> l .overrideConfiguration (b -> b .apiCallAttemptTimeout (Duration .ofMillis (SERVER_RESPONSE_DELAY * 3 ))));
213
+ } catch (Exception e ) {
214
+ exception .setException (e );
215
+ }
216
+ });
217
+ leaseWaitingThread .start ();
218
+ unInterruptedSleep (SERVER_RESPONSE_DELAY - SERVER_RESPONSE_DELAY / 10 );
219
+ leaseWaitingThread .interrupt ();
220
+ leaseWaitingThread .join ();
221
+ assertThat (exception .getException ()).isInstanceOf (AbortedException .class );
222
+ client .close ();
223
+ }
224
+
175
225
private class ExceptionInThreadRun {
176
226
private Exception exception ;
177
227
public Exception getException () {
0 commit comments