13
13
14
14
package com .rabbitmq .stream .impl ;
15
15
16
+ import static com .rabbitmq .stream .impl .AsyncRetry .asyncRetry ;
16
17
import static com .rabbitmq .stream .impl .Utils .convertCodeToException ;
17
18
import static com .rabbitmq .stream .impl .Utils .exceptionMessage ;
18
19
import static com .rabbitmq .stream .impl .Utils .formatConstant ;
@@ -242,10 +243,7 @@ class StreamEnvironment implements Environment {
242
243
: clientParametersPrototype .codec ();
243
244
this .clockRefreshFuture =
244
245
this .scheduledExecutorService .scheduleAtFixedRate (
245
- Utils .namedRunnable (() -> this .clock .refresh (), "Background clock refresh" ),
246
- 1 ,
247
- 1 ,
248
- SECONDS );
246
+ namedRunnable (this .clock ::refresh , "Background clock refresh" ), 1 , 1 , SECONDS );
249
247
}
250
248
251
249
private ShutdownListener shutdownListener (
@@ -261,7 +259,7 @@ private ShutdownListener shutdownListener(
261
259
try {
262
260
Client .ClientParameters newLocatorParameters =
263
261
this .locatorParametersCopy ().shutdownListener (shutdownListenerReference .get ());
264
- AsyncRetry . asyncRetry (
262
+ asyncRetry (
265
263
() -> {
266
264
LOGGER .debug ("Locator reconnection..." );
267
265
Address resolvedAddress = addressResolver .resolve (locator .address ());
@@ -285,7 +283,7 @@ private ShutdownListener shutdownListener(
285
283
.scheduler (this .scheduledExecutorService )
286
284
.delayPolicy (recoveryBackOffDelayPolicy )
287
285
.build ()
288
- .thenAccept (newClient -> locator . client ( newClient ) )
286
+ .thenAccept (locator :: client )
289
287
.exceptionally (
290
288
ex -> {
291
289
LOGGER .debug ("Locator recovery failed" , ex );
@@ -309,7 +307,7 @@ private void scheduleLocatorConnection(
309
307
try {
310
308
Client .ClientParameters newLocatorParameters =
311
309
this .locatorParametersCopy ().shutdownListener (shutdownListener );
312
- AsyncRetry . asyncRetry (
310
+ asyncRetry (
313
311
() -> {
314
312
LOGGER .debug ("Locator reconnection..." );
315
313
Address resolvedAddress = addressResolver .resolve (locator .address ());
@@ -333,7 +331,7 @@ private void scheduleLocatorConnection(
333
331
.scheduler (this .scheduledExecutorService )
334
332
.delayPolicy (recoveryBackOffDelayPolicy )
335
333
.build ()
336
- .thenAccept (newClient -> locator . client ( newClient ) )
334
+ .thenAccept (locator :: client )
337
335
.exceptionally (
338
336
ex -> {
339
337
LOGGER .debug ("Locator recovery failed" , ex );
@@ -652,18 +650,16 @@ Runnable registerConsumer(
652
650
MessageHandler messageHandler ,
653
651
Map <String , String > subscriptionProperties ,
654
652
ConsumerFlowStrategy flowStrategy ) {
655
- Runnable closingCallback =
656
- this .consumersCoordinator .subscribe (
657
- consumer ,
658
- stream ,
659
- offsetSpecification ,
660
- trackingReference ,
661
- subscriptionListener ,
662
- trackingClosingCallback ,
663
- messageHandler ,
664
- subscriptionProperties ,
665
- flowStrategy );
666
- return closingCallback ;
653
+ return this .consumersCoordinator .subscribe (
654
+ consumer ,
655
+ stream ,
656
+ offsetSpecification ,
657
+ trackingReference ,
658
+ subscriptionListener ,
659
+ trackingClosingCallback ,
660
+ messageHandler ,
661
+ subscriptionProperties ,
662
+ flowStrategy );
667
663
}
668
664
669
665
Runnable registerProducer (StreamProducer producer , String reference , String stream ) {
@@ -674,12 +670,12 @@ Client locator() {
674
670
return this .locators .stream ()
675
671
.filter (Locator ::isSet )
676
672
.findAny ()
677
- .orElseThrow (() -> new LocatorNotAvailableException () )
673
+ .orElseThrow (LocatorNotAvailableException :: new )
678
674
.client ();
679
675
}
680
676
681
677
<T > T locatorOperation (Function <Client , T > operation ) {
682
- return locatorOperation (operation , () -> locator () , this .recoveryBackOffDelayPolicy );
678
+ return locatorOperation (operation , this :: locator , this .recoveryBackOffDelayPolicy );
683
679
}
684
680
685
681
static <T > T locatorOperation (
@@ -801,7 +797,7 @@ TrackingConsumerRegistration registerTrackingConsumer(
801
797
: offsetTrackingRegistration .trackingCallback (),
802
798
offsetTrackingRegistration == null
803
799
? Utils .NO_OP_LONG_SUPPLIER
804
- : () -> offsetTrackingRegistration . flush () );
800
+ : offsetTrackingRegistration :: flush );
805
801
}
806
802
807
803
@ Override
@@ -900,7 +896,7 @@ private boolean isSet() {
900
896
}
901
897
902
898
private Client client () {
903
- return this .client .orElseThrow (() -> new LocatorNotAvailableException () );
899
+ return this .client .orElseThrow (LocatorNotAvailableException :: new );
904
900
}
905
901
906
902
private Client nullableClient () {
0 commit comments