16
16
17
17
package com .rabbitmq .stream ;
18
18
19
+ import static com .rabbitmq .stream .TestUtils .*;
19
20
import static com .rabbitmq .stream .TestUtils .ResponseConditions .ok ;
20
- import static com .rabbitmq .stream .TestUtils .waitAtMost ;
21
- import static com .rabbitmq .stream .TestUtils .waitUntil ;
22
21
import static org .assertj .core .api .Assertions .assertThat ;
23
22
import static org .assertj .core .api .Assertions .fail ;
24
23
@@ -164,7 +163,7 @@ void leaderFailureWhenPublisherConnectedToReplica() throws Exception {
164
163
new Client .ClientParameters ()
165
164
.port (TestUtils .streamPortNode1 ())
166
165
.messageListener (
167
- (subscriptionId , offset , chunkTimestamp , committedChunkId , msg ) -> {
166
+ (subscriptionId , offset , chunkTimestamp , committedChunkId , context , msg ) -> {
168
167
bodies .add (new String (msg .getBodyAsBinary (), StandardCharsets .UTF_8 ));
169
168
consumeLatch .countDown ();
170
169
}));
@@ -341,11 +340,14 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
341
340
cf .get (
342
341
new Client .ClientParameters ()
343
342
.port (m .getReplicas ().get (0 ).getPort ())
344
- .chunkListener (
345
- (client1 , subscriptionId , offset , messageCount , dataSize ) ->
346
- client1 .credit (subscriptionId , 1 ))
343
+ .chunkListener (credit ())
347
344
.messageListener (
348
- (subscriptionId , offset , chunkTimestamp , committedChunkId , message ) -> {
345
+ (subscriptionId ,
346
+ offset ,
347
+ chunkTimestamp ,
348
+ committedChunkId ,
349
+ context ,
350
+ message ) -> {
349
351
consumed .add (message );
350
352
generations .add ((Long ) message .getApplicationProperties ().get ("generation" ));
351
353
if (consumed .size () == confirmed .size ()) {
@@ -447,7 +449,7 @@ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception {
447
449
Set <Long > generations = ConcurrentHashMap .newKeySet ();
448
450
Set <Long > consumedIds = ConcurrentHashMap .newKeySet ();
449
451
Client .MessageListener messageListener =
450
- (subscriptionId , offset , chunkTimestamp , committedChunkId , message ) -> {
452
+ (subscriptionId , offset , chunkTimestamp , committedChunkId , context , message ) -> {
451
453
consumed .add (message );
452
454
generations .add ((Long ) message .getApplicationProperties ().get ("generation" ));
453
455
consumedIds .add (message .getProperties ().getMessageIdAsLong ());
@@ -471,9 +473,7 @@ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception {
471
473
new Client .ClientParameters ()
472
474
.port (newReplicaPort )
473
475
.shutdownListener (shutdownListenerReference .get ())
474
- .chunkListener (
475
- (client1 , subscriptionId , offset , messageCount , dataSize ) ->
476
- client1 .credit (subscriptionId , 1 ))
476
+ .chunkListener (credit ())
477
477
.messageListener (messageListener ));
478
478
479
479
newConsumer .subscribe (
@@ -494,9 +494,7 @@ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception {
494
494
new Client .ClientParameters ()
495
495
.port (replica .getPort ())
496
496
.shutdownListener (shutdownListener )
497
- .chunkListener (
498
- (client1 , subscriptionId , offset , messageCount , dataSize ) ->
499
- client1 .credit (subscriptionId , 1 ))
497
+ .chunkListener (credit ())
500
498
.messageListener (messageListener ));
501
499
502
500
Client .Response response =
0 commit comments