@@ -171,18 +171,8 @@ public String getPassword() {
171
171
@ Test public void thatShutdownHooksOnConnectionFireBeforeRecoveryStarts () throws IOException , InterruptedException {
172
172
final List <String > events = new CopyOnWriteArrayList <String >();
173
173
final CountDownLatch latch = new CountDownLatch (3 ); // one when started, another when complete
174
- connection .addShutdownListener (new ShutdownListener () {
175
- @ Override
176
- public void shutdownCompleted (ShutdownSignalException cause ) {
177
- events .add ("shutdown hook 1" );
178
- }
179
- });
180
- connection .addShutdownListener (new ShutdownListener () {
181
- @ Override
182
- public void shutdownCompleted (ShutdownSignalException cause ) {
183
- events .add ("shutdown hook 2" );
184
- }
185
- });
174
+ connection .addShutdownListener (cause -> events .add ("shutdown hook 1" ));
175
+ connection .addShutdownListener (cause -> events .add ("shutdown hook 2" ));
186
176
// note: we do not want to expose RecoveryCanBeginListener so this
187
177
// test does not use it
188
178
final CountDownLatch recoveryCanBeginLatch = new CountDownLatch (1 );
@@ -220,12 +210,7 @@ public void handleTopologyRecoveryStarted(Recoverable recoverable) {
220
210
221
211
@ Test public void shutdownHooksRecoveryOnConnection () throws IOException , InterruptedException {
222
212
final CountDownLatch latch = new CountDownLatch (2 );
223
- connection .addShutdownListener (new ShutdownListener () {
224
- @ Override
225
- public void shutdownCompleted (ShutdownSignalException cause ) {
226
- latch .countDown ();
227
- }
228
- });
213
+ connection .addShutdownListener (cause -> latch .countDown ());
229
214
assertThat (connection .isOpen ()).isTrue ();
230
215
closeAndWaitForRecovery ();
231
216
assertThat (connection .isOpen ()).isTrue ();
@@ -235,12 +220,7 @@ public void shutdownCompleted(ShutdownSignalException cause) {
235
220
236
221
@ Test public void shutdownHooksRecoveryOnChannel () throws IOException , InterruptedException {
237
222
final CountDownLatch latch = new CountDownLatch (3 );
238
- channel .addShutdownListener (new ShutdownListener () {
239
- @ Override
240
- public void shutdownCompleted (ShutdownSignalException cause ) {
241
- latch .countDown ();
242
- }
243
- });
223
+ channel .addShutdownListener (cause -> latch .countDown ());
244
224
assertThat (connection .isOpen ()).isTrue ();
245
225
closeAndWaitForRecovery ();
246
226
assertThat (connection .isOpen ()).isTrue ();
@@ -254,12 +234,12 @@ public void shutdownCompleted(ShutdownSignalException cause) {
254
234
final CountDownLatch latch = new CountDownLatch (2 );
255
235
connection .addBlockedListener (new BlockedListener () {
256
236
@ Override
257
- public void handleBlocked (String reason ) throws IOException {
237
+ public void handleBlocked (String reason ) {
258
238
latch .countDown ();
259
239
}
260
240
261
241
@ Override
262
- public void handleUnblocked () throws IOException {
242
+ public void handleUnblocked () {
263
243
latch .countDown ();
264
244
}
265
245
});
@@ -299,14 +279,8 @@ public void handleUnblocked() throws IOException {
299
279
300
280
@ Test public void returnListenerRecovery () throws IOException , InterruptedException {
301
281
final CountDownLatch latch = new CountDownLatch (1 );
302
- channel .addReturnListener (new ReturnListener () {
303
- @ Override
304
- public void handleReturn (int replyCode , String replyText , String exchange ,
305
- String routingKey , AMQP .BasicProperties properties ,
306
- byte [] body ) throws IOException {
307
- latch .countDown ();
308
- }
309
- });
282
+ channel .addReturnListener (
283
+ (replyCode , replyText , exchange , routingKey , properties , body ) -> latch .countDown ());
310
284
closeAndWaitForRecovery ();
311
285
expectChannelRecovery (channel );
312
286
channel .basicPublish ("" , "unknown" , true , false , null , "mandatory1" .getBytes ());
@@ -317,12 +291,12 @@ public void handleReturn(int replyCode, String replyText, String exchange,
317
291
final CountDownLatch latch = new CountDownLatch (1 );
318
292
channel .addConfirmListener (new ConfirmListener () {
319
293
@ Override
320
- public void handleAck (long deliveryTag , boolean multiple ) throws IOException {
294
+ public void handleAck (long deliveryTag , boolean multiple ) {
321
295
latch .countDown ();
322
296
}
323
297
324
298
@ Override
325
- public void handleNack (long deliveryTag , boolean multiple ) throws IOException {
299
+ public void handleNack (long deliveryTag , boolean multiple ) {
326
300
latch .countDown ();
327
301
}
328
302
});
@@ -425,13 +399,10 @@ private void testClientNamedQueueRecoveryWith(String q, boolean noWait) throws I
425
399
final AtomicReference <String > nameBefore = new AtomicReference <String >(q );
426
400
final AtomicReference <String > nameAfter = new AtomicReference <String >();
427
401
final CountDownLatch listenerLatch = new CountDownLatch (1 );
428
- ((AutorecoveringConnection )connection ).addQueueRecoveryListener (new QueueRecoveryListener () {
429
- @ Override
430
- public void queueRecovered (String oldName , String newName ) {
431
- nameBefore .set (oldName );
432
- nameAfter .set (newName );
433
- listenerLatch .countDown ();
434
- }
402
+ ((AutorecoveringConnection )connection ).addQueueRecoveryListener ((oldName , newName ) -> {
403
+ nameBefore .set (oldName );
404
+ nameAfter .set (newName );
405
+ listenerLatch .countDown ();
435
406
});
436
407
ch .queueBind (nameBefore .get (), x , "" );
437
408
restartPrimaryAndWaitForRecovery ();
@@ -673,14 +644,12 @@ public void queueRecovered(String oldName, String newName) {
673
644
final AtomicReference <String > tagA = new AtomicReference <String >();
674
645
final AtomicReference <String > tagB = new AtomicReference <String >();
675
646
final CountDownLatch listenerLatch = new CountDownLatch (n );
676
- ((AutorecoveringConnection )connection ).addConsumerRecoveryListener (new ConsumerRecoveryListener () {
677
- @ Override
678
- public void consumerRecovered (String oldConsumerTag , String newConsumerTag ) {
647
+ ((AutorecoveringConnection )connection ).addConsumerRecoveryListener (
648
+ (oldConsumerTag , newConsumerTag ) -> {
679
649
tagA .set (oldConsumerTag );
680
650
tagB .set (newConsumerTag );
681
651
listenerLatch .countDown ();
682
- }
683
- });
652
+ });
684
653
685
654
assertConsumerCount (n , q );
686
655
closeAndWaitForRecovery ();
@@ -830,7 +799,8 @@ public void handleDelivery(String consumerTag,
830
799
831
800
@ Test public void recoveryWithMultipleThreads () throws Exception {
832
801
// test with 8 recovery threads
833
- final ThreadPoolExecutor executor = new ThreadPoolExecutor (8 , 8 , 30 , TimeUnit .SECONDS , new LinkedBlockingQueue <Runnable >());
802
+ final ThreadPoolExecutor executor = new ThreadPoolExecutor (8 , 8 , 30 , TimeUnit .SECONDS ,
803
+ new LinkedBlockingQueue <>());
834
804
executor .allowCoreThreadTimeOut (true );
835
805
ConnectionFactory connectionFactory = buildConnectionFactoryWithRecoveryEnabled (false );
836
806
assertThat (connectionFactory .getTopologyRecoveryExecutor ()).isNull ();
@@ -956,12 +926,7 @@ private static void expectExchangeRecovery(Channel ch, String x) throws IOExcept
956
926
957
927
private static CountDownLatch prepareForShutdown (Connection conn ) {
958
928
final CountDownLatch latch = new CountDownLatch (1 );
959
- conn .addShutdownListener (new ShutdownListener () {
960
- @ Override
961
- public void shutdownCompleted (ShutdownSignalException cause ) {
962
- latch .countDown ();
963
- }
964
- });
929
+ conn .addShutdownListener (cause -> latch .countDown ());
965
930
return latch ;
966
931
}
967
932
0 commit comments