25
25
import com .github .tomakehurst .wiremock .core .WireMockConfiguration ;
26
26
import java .net .URI ;
27
27
import java .time .Duration ;
28
+ import java .util .List ;
28
29
import java .util .Optional ;
29
30
import java .util .concurrent .ExecutorService ;
30
31
import java .util .concurrent .Executors ;
31
32
import java .util .concurrent .Future ;
32
33
import java .util .concurrent .atomic .AtomicLong ;
33
- import java .util .stream .Stream ;
34
+ import org .junit .jupiter .api .AfterAll ;
35
+ import org .junit .jupiter .api .AfterEach ;
34
36
import org .junit .jupiter .api .BeforeEach ;
35
37
import org .junit .jupiter .api .Test ;
36
- import org .junit .jupiter .params .ParameterizedTest ;
37
- import org .junit .jupiter .params .provider .Arguments ;
38
- import org .junit .jupiter .params .provider .MethodSource ;
39
38
import software .amazon .awssdk .auth .credentials .AwsBasicCredentials ;
40
39
import software .amazon .awssdk .auth .credentials .StaticCredentialsProvider ;
41
40
import software .amazon .awssdk .core .exception .AbortedException ;
@@ -57,26 +56,32 @@ class SyncClientConnectionInterruptionTest {
57
56
public static final String SAMPLE_BODY = "{\" StringMember"
58
57
+ "\" :\" resultString\" }" ;
59
58
private final WireMockServer mockServer = new WireMockServer (new WireMockConfiguration ()
60
- .bindAddress ("localhost" )
61
- .dynamicPort ());
59
+ .bindAddress ("localhost" ).dynamicPort ());
60
+
61
+ private static final ExecutorService executorService = Executors .newCachedThreadPool ();
62
+
62
63
@ BeforeEach
63
64
public void setup () {
64
65
mockServer .start ();
65
66
stubPostRequest (".*" , aResponse (), "{}" );
66
67
}
67
68
69
+ @ AfterAll
70
+ public static void cleanUp (){
71
+ executorService .shutdownNow ();
72
+ }
73
+
68
74
@ Test
69
75
void connectionPoolsGetsReusedWhenInterruptedWith_1_MaxConnection () throws Exception {
70
- Integer LONG_DELAY = 1500 ;
76
+ Integer responseDelay = 1500 ;
71
77
72
78
String urlRegex = "/2016-03-11/allTypes" ;
73
- stubPostRequest (urlRegex , aResponse ().withFixedDelay (LONG_DELAY ), SAMPLE_BODY );
79
+ stubPostRequest (urlRegex , aResponse ().withFixedDelay (responseDelay ), SAMPLE_BODY );
74
80
SdkHttpClient httpClient = ApacheHttpClient .builder ().maxConnections (1 ).build ();
75
- ProtocolRestJsonClient client = getClient (httpClient , Duration .ofMillis (2L * LONG_DELAY )).build ();
81
+ ProtocolRestJsonClient client = getClient (httpClient , Duration .ofMillis (2L * responseDelay )).build ();
76
82
77
- ExecutorService executorService = Executors .newFixedThreadPool (5 );
78
83
Future <?> toBeInterruptedFuture = executorService .submit (() -> client .allTypes ());
79
- unInterruptedSleep (LONG_DELAY - LONG_DELAY / 5 );
84
+ unInterruptedSleep (responseDelay - responseDelay / 5 );
80
85
toBeInterruptedFuture .cancel (true );
81
86
// Make sure thread start the Http connections
82
87
unInterruptedSleep (50 );
@@ -85,56 +90,27 @@ void connectionPoolsGetsReusedWhenInterruptedWith_1_MaxConnection() throws Excep
85
90
executorService .shutdownNow ();
86
91
}
87
92
88
- @ Test
89
- void connectionPoolsGetsReusedWhenInterruptedWith_Multiple_MaxConnection () throws Exception {
90
- Integer LONG_DELAY = 1000 ;
91
- Integer VERY_VERY_LONG_DELAY = LONG_DELAY * 5 ;
92
- stubPostRequest ("/2016-03-11/allTypes" , aResponse ().withFixedDelay (LONG_DELAY ), SAMPLE_BODY );
93
- stubPostRequest ("/2016-03-11/JsonValuesOperation" , aResponse ().withFixedDelay (VERY_VERY_LONG_DELAY ), SAMPLE_BODY );
94
- SdkHttpClient httpClient = ApacheHttpClient .builder ().maxConnections (3 ).build ();
95
- Duration timeOutDuration = Duration .ofMillis (2L * LONG_DELAY );
96
- ProtocolRestJsonClient client = getClient (httpClient , timeOutDuration ).build ();
97
-
98
- ExecutorService executorService = Executors .newFixedThreadPool (5 );
99
- Future <?> toBeInterruptedFuture0 = executorService .submit (() -> client .allTypes ());
100
- Future <?> toBeInterruptedFuture1 = executorService .submit (() -> client .allTypes ());
101
- Future <?> toBeInterruptedFuture2 = executorService .submit (() -> client .allTypes ());
102
- unInterruptedSleep (50 );
103
- executorService .submit (() -> client .jsonValuesOperation ());
104
- unInterruptedSleep (LONG_DELAY / 2 );
105
- toBeInterruptedFuture0 .cancel (true );
106
- toBeInterruptedFuture1 .cancel (true );
107
- toBeInterruptedFuture2 .cancel (true );
108
- unInterruptedSleep (LONG_DELAY / 2 );
109
- // Make sure thread start the Http connections
110
- AllTypesResponse allTypesResponse = client .allTypes ();
111
- assertThat (allTypesResponse .stringMember ()).isEqualTo ("resultString" );
112
- executorService .shutdownNow ();
113
- }
114
-
115
93
@ Test
116
94
void interruptionWhenWaitingForLease_AbortsImmediately () throws InterruptedException {
117
- Integer LONG_DELAY = 5000 ;
95
+ Integer responseDelay = 50000 ;
118
96
ExceptionInThreadRun exceptionInThreadRun = new ExceptionInThreadRun ();
119
- AtomicLong leaseWaitingTime = new AtomicLong (LONG_DELAY );
120
- stubPostRequest ("/2016-03-11/allTypes" , aResponse ().withFixedDelay (LONG_DELAY ), SAMPLE_BODY );
97
+ AtomicLong leaseWaitingTime = new AtomicLong (responseDelay );
98
+ stubPostRequest ("/2016-03-11/allTypes" , aResponse ().withFixedDelay (responseDelay ), SAMPLE_BODY );
121
99
SdkHttpClient httpClient = ApacheHttpClient .builder ().maxConnections (1 ).build ();
122
- ProtocolRestJsonClient client = getClient (httpClient , Duration .ofMillis (2L * LONG_DELAY )).build ();
123
- ExecutorService executorService = Executors .newFixedThreadPool (5 );
100
+ ProtocolRestJsonClient client = getClient (httpClient , Duration .ofMillis (2L * responseDelay )).build ();
124
101
executorService .submit (() -> client .allTypes ());
125
- unInterruptedSleep (100 );
102
+ // 1 Sec sleep to make sure Thread 1 is picked for executing Http connection
103
+ unInterruptedSleep (1000 );
126
104
Thread leaseWaitingThread = new Thread (() -> {
127
105
128
106
try {
129
107
client .allTypes (l -> l .overrideConfiguration (
130
108
b -> b
131
- .apiCallAttemptTimeout (Duration .ofSeconds (10 ))
132
109
.addMetricPublisher (new MetricPublisher () {
133
110
@ Override
134
111
public void publish (MetricCollection metricCollection ) {
135
- System .out .println (metricCollection );
136
112
Optional <MetricRecord <?>> apiCallDuration =
137
- metricCollection .stream ().filter (o -> "ApiCallDuration" .equals (o .metric ().name ())).findAny ();
113
+ metricCollection .stream ().filter (o -> "ApiCallDuration" .equals (o .metric ().name ())).findAny ();
138
114
leaseWaitingTime .set (Duration .parse (apiCallDuration .get ().value ().toString ()).toMillis ());
139
115
}
140
116
@@ -151,11 +127,13 @@ public void close() {
151
127
});
152
128
153
129
leaseWaitingThread .start ();
154
- unInterruptedSleep (100 );
130
+ // 1 sec sleep to make sure Http connection execution is initialized for Thread 2 , in this case it will wait for lease
131
+ // and immediately terminate on interrupt
132
+ unInterruptedSleep (1000 );
155
133
leaseWaitingThread .interrupt ();
156
134
leaseWaitingThread .join ();
157
- assertThat (leaseWaitingTime .get ()).isNotEqualTo (LONG_DELAY .longValue ());
158
- assertThat (leaseWaitingTime .get ()).isLessThan (LONG_DELAY .longValue ());
135
+ assertThat (leaseWaitingTime .get ()).isNotEqualTo (responseDelay .longValue ());
136
+ assertThat (leaseWaitingTime .get ()).isLessThan (responseDelay .longValue ());
159
137
assertThat (exceptionInThreadRun .getException ()).isInstanceOf (AbortedException .class );
160
138
client .close ();
161
139
}
@@ -169,54 +147,28 @@ public void close() {
169
147
@ Test
170
148
void interruptionDueToApiTimeOut_followed_byInterruptCausesOnlyTimeOutException () throws InterruptedException {
171
149
SdkHttpClient httpClient = ApacheHttpClient .create ();
172
- Integer SERVER_RESPONSE_DELAY = 3000 ;
173
- stubPostRequest ("/2016-03-11/allTypes" , aResponse ().withFixedDelay (SERVER_RESPONSE_DELAY ), SAMPLE_BODY );
150
+ Integer responseDelay = 3000 ;
151
+ stubPostRequest ("/2016-03-11/allTypes" , aResponse ().withFixedDelay (responseDelay ), SAMPLE_BODY );
174
152
ExceptionInThreadRun exception = new ExceptionInThreadRun ();
175
153
ProtocolRestJsonClient client =
176
154
getClient (httpClient , Duration .ofMillis (10 )).overrideConfiguration (o -> o .retryPolicy (RetryPolicy .none ())).build ();
177
155
unInterruptedSleep (100 );
178
156
// We need to creat a separate thread to interrupt it externally.
179
157
Thread leaseWaitingThread = new Thread (() -> {
180
158
try {
181
- client .allTypes (l -> l .overrideConfiguration (b -> b .apiCallAttemptTimeout (Duration .ofMillis (SERVER_RESPONSE_DELAY / 3 ))));
159
+ client .allTypes (l -> l .overrideConfiguration (b -> b .apiCallAttemptTimeout (Duration .ofMillis (responseDelay / 3 ))));
182
160
} catch (Exception e ) {
183
161
exception .setException (e );
184
162
}
185
163
});
186
164
leaseWaitingThread .start ();
187
- unInterruptedSleep (SERVER_RESPONSE_DELAY - SERVER_RESPONSE_DELAY / 10 );
165
+ unInterruptedSleep (responseDelay - responseDelay / 10 );
188
166
leaseWaitingThread .interrupt ();
189
167
leaseWaitingThread .join ();
190
168
assertThat (exception .getException ()).isInstanceOf (ApiCallAttemptTimeoutException .class );
191
169
client .close ();
192
170
}
193
171
194
-
195
- @ Test
196
- void sdkClientInterrupted_while_connectionIsInProgress () throws InterruptedException {
197
- SdkHttpClient httpClient = ApacheHttpClient .create ();
198
- Integer SERVER_RESPONSE_DELAY = 3000 ;
199
- stubPostRequest ("/2016-03-11/allTypes" , aResponse ().withFixedDelay (SERVER_RESPONSE_DELAY ), SAMPLE_BODY );
200
- ExceptionInThreadRun exception = new ExceptionInThreadRun ();
201
- ProtocolRestJsonClient client =
202
- getClient (httpClient , Duration .ofMillis (10 )).overrideConfiguration (o -> o .retryPolicy (RetryPolicy .none ())).build ();
203
- unInterruptedSleep (100 );
204
- // We need to creat a separate thread to interrupt it externally.
205
- Thread leaseWaitingThread = new Thread (() -> {
206
- try {
207
- client .allTypes (l -> l .overrideConfiguration (b -> b .apiCallAttemptTimeout (Duration .ofMillis (SERVER_RESPONSE_DELAY * 3 ))));
208
- } catch (Exception e ) {
209
- exception .setException (e );
210
- }
211
- });
212
- leaseWaitingThread .start ();
213
- unInterruptedSleep (SERVER_RESPONSE_DELAY - SERVER_RESPONSE_DELAY / 10 );
214
- leaseWaitingThread .interrupt ();
215
- leaseWaitingThread .join ();
216
- assertThat (exception .getException ()).isInstanceOf (AbortedException .class );
217
- client .close ();
218
- }
219
-
220
172
private class ExceptionInThreadRun {
221
173
private Exception exception ;
222
174
public Exception getException () {
0 commit comments