30
30
import java .util .concurrent .Executors ;
31
31
import java .util .concurrent .Future ;
32
32
import java .util .concurrent .atomic .AtomicLong ;
33
- import java .util .stream .Stream ;
34
33
import org .junit .jupiter .api .BeforeEach ;
35
34
import org .junit .jupiter .api .Test ;
36
- import org .junit .jupiter .params .ParameterizedTest ;
37
- import org .junit .jupiter .params .provider .Arguments ;
38
- import org .junit .jupiter .params .provider .MethodSource ;
39
35
import software .amazon .awssdk .auth .credentials .AwsBasicCredentials ;
40
36
import software .amazon .awssdk .auth .credentials .StaticCredentialsProvider ;
41
37
import software .amazon .awssdk .core .exception .AbortedException ;
@@ -57,8 +53,7 @@ class SyncClientConnectionInterruptionTest {
57
53
public static final String SAMPLE_BODY = "{\" StringMember"
58
54
+ "\" :\" resultString\" }" ;
59
55
private final WireMockServer mockServer = new WireMockServer (new WireMockConfiguration ()
60
- .bindAddress ("localhost" )
61
- .dynamicPort ());
56
+ .bindAddress ("localhost" ).dynamicPort ());
62
57
@ BeforeEach
63
58
public void setup () {
64
59
mockServer .start ();
@@ -67,16 +62,16 @@ public void setup() {
67
62
68
63
@ Test
69
64
void connectionPoolsGetsReusedWhenInterruptedWith_1_MaxConnection () throws Exception {
70
- Integer LONG_DELAY = 1500 ;
65
+ Integer responseDelay = 1500 ;
71
66
72
67
String urlRegex = "/2016-03-11/allTypes" ;
73
- stubPostRequest (urlRegex , aResponse ().withFixedDelay (LONG_DELAY ), SAMPLE_BODY );
68
+ stubPostRequest (urlRegex , aResponse ().withFixedDelay (responseDelay ), SAMPLE_BODY );
74
69
SdkHttpClient httpClient = ApacheHttpClient .builder ().maxConnections (1 ).build ();
75
- ProtocolRestJsonClient client = getClient (httpClient , Duration .ofMillis (2L * LONG_DELAY )).build ();
70
+ ProtocolRestJsonClient client = getClient (httpClient , Duration .ofMillis (2L * responseDelay )).build ();
76
71
77
72
ExecutorService executorService = Executors .newFixedThreadPool (5 );
78
73
Future <?> toBeInterruptedFuture = executorService .submit (() -> client .allTypes ());
79
- unInterruptedSleep (LONG_DELAY - LONG_DELAY / 5 );
74
+ unInterruptedSleep (responseDelay - responseDelay / 5 );
80
75
toBeInterruptedFuture .cancel (true );
81
76
// Make sure thread start the Http connections
82
77
unInterruptedSleep (50 );
@@ -85,56 +80,28 @@ void connectionPoolsGetsReusedWhenInterruptedWith_1_MaxConnection() throws Excep
85
80
executorService .shutdownNow ();
86
81
}
87
82
88
- @ Test
89
- void connectionPoolsGetsReusedWhenInterruptedWith_Multiple_MaxConnection () throws Exception {
90
- Integer LONG_DELAY = 1000 ;
91
- Integer VERY_VERY_LONG_DELAY = LONG_DELAY * 5 ;
92
- stubPostRequest ("/2016-03-11/allTypes" , aResponse ().withFixedDelay (LONG_DELAY ), SAMPLE_BODY );
93
- stubPostRequest ("/2016-03-11/JsonValuesOperation" , aResponse ().withFixedDelay (VERY_VERY_LONG_DELAY ), SAMPLE_BODY );
94
- SdkHttpClient httpClient = ApacheHttpClient .builder ().maxConnections (3 ).build ();
95
- Duration timeOutDuration = Duration .ofMillis (2L * LONG_DELAY );
96
- ProtocolRestJsonClient client = getClient (httpClient , timeOutDuration ).build ();
97
-
98
- ExecutorService executorService = Executors .newFixedThreadPool (5 );
99
- Future <?> toBeInterruptedFuture0 = executorService .submit (() -> client .allTypes ());
100
- Future <?> toBeInterruptedFuture1 = executorService .submit (() -> client .allTypes ());
101
- Future <?> toBeInterruptedFuture2 = executorService .submit (() -> client .allTypes ());
102
- unInterruptedSleep (50 );
103
- executorService .submit (() -> client .jsonValuesOperation ());
104
- unInterruptedSleep (LONG_DELAY / 2 );
105
- toBeInterruptedFuture0 .cancel (true );
106
- toBeInterruptedFuture1 .cancel (true );
107
- toBeInterruptedFuture2 .cancel (true );
108
- unInterruptedSleep (LONG_DELAY / 2 );
109
- // Make sure thread start the Http connections
110
- AllTypesResponse allTypesResponse = client .allTypes ();
111
- assertThat (allTypesResponse .stringMember ()).isEqualTo ("resultString" );
112
- executorService .shutdownNow ();
113
- }
114
-
115
83
@ Test
116
84
void interruptionWhenWaitingForLease_AbortsImmediately () throws InterruptedException {
117
- Integer LONG_DELAY = 5000 ;
85
+ Integer responseDelay = 50000 ;
118
86
ExceptionInThreadRun exceptionInThreadRun = new ExceptionInThreadRun ();
119
- AtomicLong leaseWaitingTime = new AtomicLong (LONG_DELAY );
120
- stubPostRequest ("/2016-03-11/allTypes" , aResponse ().withFixedDelay (LONG_DELAY ), SAMPLE_BODY );
87
+ AtomicLong leaseWaitingTime = new AtomicLong (responseDelay );
88
+ stubPostRequest ("/2016-03-11/allTypes" , aResponse ().withFixedDelay (responseDelay ), SAMPLE_BODY );
121
89
SdkHttpClient httpClient = ApacheHttpClient .builder ().maxConnections (1 ).build ();
122
- ProtocolRestJsonClient client = getClient (httpClient , Duration .ofMillis (2L * LONG_DELAY )).build ();
90
+ ProtocolRestJsonClient client = getClient (httpClient , Duration .ofMillis (2L * responseDelay )).build ();
123
91
ExecutorService executorService = Executors .newFixedThreadPool (5 );
124
92
executorService .submit (() -> client .allTypes ());
125
- unInterruptedSleep (100 );
93
+ // 1 Sec sleep to make sure Thread 1 is picked for executing Http connection
94
+ unInterruptedSleep (1000 );
126
95
Thread leaseWaitingThread = new Thread (() -> {
127
96
128
97
try {
129
98
client .allTypes (l -> l .overrideConfiguration (
130
99
b -> b
131
- .apiCallAttemptTimeout (Duration .ofSeconds (10 ))
132
100
.addMetricPublisher (new MetricPublisher () {
133
101
@ Override
134
102
public void publish (MetricCollection metricCollection ) {
135
- System .out .println (metricCollection );
136
103
Optional <MetricRecord <?>> apiCallDuration =
137
- metricCollection .stream ().filter (o -> "ApiCallDuration" .equals (o .metric ().name ())).findAny ();
104
+ metricCollection .stream ().filter (o -> "ApiCallDuration" .equals (o .metric ().name ())).findAny ();
138
105
leaseWaitingTime .set (Duration .parse (apiCallDuration .get ().value ().toString ()).toMillis ());
139
106
}
140
107
@@ -151,11 +118,13 @@ public void close() {
151
118
});
152
119
153
120
leaseWaitingThread .start ();
154
- unInterruptedSleep (100 );
121
+ // 1 sec sleep to make sure Http connection execution is initialized for Thread 2 , in this case it will wait for lease
122
+ // and immediately terminate on interrupt
123
+ unInterruptedSleep (1000 );
155
124
leaseWaitingThread .interrupt ();
156
125
leaseWaitingThread .join ();
157
- assertThat (leaseWaitingTime .get ()).isNotEqualTo (LONG_DELAY .longValue ());
158
- assertThat (leaseWaitingTime .get ()).isLessThan (LONG_DELAY .longValue ());
126
+ assertThat (leaseWaitingTime .get ()).isNotEqualTo (responseDelay .longValue ());
127
+ assertThat (leaseWaitingTime .get ()).isLessThan (responseDelay .longValue ());
159
128
assertThat (exceptionInThreadRun .getException ()).isInstanceOf (AbortedException .class );
160
129
client .close ();
161
130
}
@@ -169,54 +138,28 @@ public void close() {
169
138
@ Test
170
139
void interruptionDueToApiTimeOut_followed_byInterruptCausesOnlyTimeOutException () throws InterruptedException {
171
140
SdkHttpClient httpClient = ApacheHttpClient .create ();
172
- Integer SERVER_RESPONSE_DELAY = 3000 ;
173
- stubPostRequest ("/2016-03-11/allTypes" , aResponse ().withFixedDelay (SERVER_RESPONSE_DELAY ), SAMPLE_BODY );
141
+ Integer responseDelay = 3000 ;
142
+ stubPostRequest ("/2016-03-11/allTypes" , aResponse ().withFixedDelay (responseDelay ), SAMPLE_BODY );
174
143
ExceptionInThreadRun exception = new ExceptionInThreadRun ();
175
144
ProtocolRestJsonClient client =
176
145
getClient (httpClient , Duration .ofMillis (10 )).overrideConfiguration (o -> o .retryPolicy (RetryPolicy .none ())).build ();
177
146
unInterruptedSleep (100 );
178
147
// We need to creat a separate thread to interrupt it externally.
179
148
Thread leaseWaitingThread = new Thread (() -> {
180
149
try {
181
- client .allTypes (l -> l .overrideConfiguration (b -> b .apiCallAttemptTimeout (Duration .ofMillis (SERVER_RESPONSE_DELAY / 3 ))));
150
+ client .allTypes (l -> l .overrideConfiguration (b -> b .apiCallAttemptTimeout (Duration .ofMillis (responseDelay / 3 ))));
182
151
} catch (Exception e ) {
183
152
exception .setException (e );
184
153
}
185
154
});
186
155
leaseWaitingThread .start ();
187
- unInterruptedSleep (SERVER_RESPONSE_DELAY - SERVER_RESPONSE_DELAY / 10 );
156
+ unInterruptedSleep (responseDelay - responseDelay / 10 );
188
157
leaseWaitingThread .interrupt ();
189
158
leaseWaitingThread .join ();
190
159
assertThat (exception .getException ()).isInstanceOf (ApiCallAttemptTimeoutException .class );
191
160
client .close ();
192
161
}
193
162
194
-
195
- @ Test
196
- void sdkClientInterrupted_while_connectionIsInProgress () throws InterruptedException {
197
- SdkHttpClient httpClient = ApacheHttpClient .create ();
198
- Integer SERVER_RESPONSE_DELAY = 3000 ;
199
- stubPostRequest ("/2016-03-11/allTypes" , aResponse ().withFixedDelay (SERVER_RESPONSE_DELAY ), SAMPLE_BODY );
200
- ExceptionInThreadRun exception = new ExceptionInThreadRun ();
201
- ProtocolRestJsonClient client =
202
- getClient (httpClient , Duration .ofMillis (10 )).overrideConfiguration (o -> o .retryPolicy (RetryPolicy .none ())).build ();
203
- unInterruptedSleep (100 );
204
- // We need to creat a separate thread to interrupt it externally.
205
- Thread leaseWaitingThread = new Thread (() -> {
206
- try {
207
- client .allTypes (l -> l .overrideConfiguration (b -> b .apiCallAttemptTimeout (Duration .ofMillis (SERVER_RESPONSE_DELAY * 3 ))));
208
- } catch (Exception e ) {
209
- exception .setException (e );
210
- }
211
- });
212
- leaseWaitingThread .start ();
213
- unInterruptedSleep (SERVER_RESPONSE_DELAY - SERVER_RESPONSE_DELAY / 10 );
214
- leaseWaitingThread .interrupt ();
215
- leaseWaitingThread .join ();
216
- assertThat (exception .getException ()).isInstanceOf (AbortedException .class );
217
- client .close ();
218
- }
219
-
220
163
private class ExceptionInThreadRun {
221
164
private Exception exception ;
222
165
public Exception getException () {
0 commit comments