Skip to content

Commit e424739

Browse files
committed
Fix correlation ID increment
Fixes #689
1 parent b0cc7af commit e424739

File tree

1 file changed

+49
-23
lines changed

1 file changed

+49
-23
lines changed

Diff for: src/main/java/com/rabbitmq/stream/impl/Client.java

+49-23
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import java.util.concurrent.atomic.AtomicLong;
105105
import java.util.concurrent.atomic.AtomicReference;
106106
import java.util.function.Consumer;
107+
import java.util.function.Supplier;
107108
import java.util.function.ToLongFunction;
108109
import javax.net.ssl.SSLEngine;
109110
import javax.net.ssl.SSLHandshakeException;
@@ -178,7 +179,7 @@ public long applyAsLong(Object value) {
178179
private final CredentialsProvider credentialsProvider;
179180
private final Runnable nettyClosing;
180181
private final int maxFrameSize;
181-
private final boolean frameSizeCopped;
182+
private final boolean frameSizeCapped;
182183
private final EventLoopGroup eventLoopGroup;
183184
private final Map<String, String> clientProperties;
184185
private final String NETTY_HANDLER_FLUSH_CONSOLIDATION =
@@ -373,12 +374,18 @@ public void initChannel(SocketChannel ch) {
373374
new TuneState(
374375
parameters.requestedMaxFrameSize, (int) parameters.requestedHeartbeat.getSeconds());
375376
this.clientProperties = clientProperties(parameters.clientProperties);
377+
debug(() -> "exchanging peer properties");
376378
this.serverProperties = peerProperties();
379+
debug(() -> "peer properties exchanged");
380+
debug(() -> "starting SASL handshake");
377381
this.saslMechanisms = getSaslMechanisms();
382+
debug(() -> "SASL mechanisms supported by server ({})", this.saslMechanisms);
383+
debug(() -> "starting authentication");
378384
authenticate(this.credentialsProvider);
385+
debug(() -> "authenticated");
379386
this.tuneState.await(Duration.ofSeconds(10));
380387
this.maxFrameSize = this.tuneState.getMaxFrameSize();
381-
this.frameSizeCopped = this.maxFrameSize() > 0;
388+
this.frameSizeCapped = this.maxFrameSize() > 0;
382389
LOGGER.debug(
383390
"Connection tuned with max frame size {} and heartbeat {}",
384391
this.maxFrameSize(),
@@ -418,6 +425,8 @@ public void initChannel(SocketChannel ch) {
418425
started.set(true);
419426
this.metricsCollector.openConnection();
420427
} catch (RuntimeException e) {
428+
LOGGER.debug(
429+
"Error while opening connection {}: {}", this.clientConnectionName, e.getMessage());
421430
this.closingSequence(null);
422431
throw e;
423432
}
@@ -462,10 +471,14 @@ int maxFrameSize() {
462471
return this.maxFrameSize;
463472
}
464473

474+
private int nextCorrelationId() {
475+
return this.correlationSequence.getAndIncrement();
476+
}
477+
465478
private Map<String, String> peerProperties() {
466479
int clientPropertiesSize = mapSize(this.clientProperties);
467480
int length = 2 + 2 + 4 + clientPropertiesSize;
468-
int correlationId = correlationSequence.incrementAndGet();
481+
int correlationId = nextCorrelationId();
469482
try {
470483
ByteBuf bb = allocateNoCheck(length + 4);
471484
bb.writeInt(length);
@@ -474,6 +487,7 @@ private Map<String, String> peerProperties() {
474487
bb.writeInt(correlationId);
475488
writeMap(bb, this.clientProperties);
476489
OutstandingRequest<Map<String, String>> request = outstandingRequest();
490+
LOGGER.debug("Peer properties request has correlation ID {}", correlationId);
477491
outstandingRequests.put(correlationId, request);
478492
channel.writeAndFlush(bb);
479493
request.block();
@@ -539,7 +553,7 @@ private SaslAuthenticateResponse sendSaslAuthenticate(
539553
+ saslMechanism.getName().length()
540554
+ 4
541555
+ (challengeResponse == null ? 0 : challengeResponse.length);
542-
int correlationId = correlationSequence.incrementAndGet();
556+
int correlationId = nextCorrelationId();
543557
try {
544558
ByteBuf bb = allocateNoCheck(length + 4);
545559
bb.writeInt(length);
@@ -569,7 +583,7 @@ private SaslAuthenticateResponse sendSaslAuthenticate(
569583

570584
private Map<String, String> open(String virtualHost) {
571585
int length = 2 + 2 + 4 + 2 + virtualHost.length();
572-
int correlationId = correlationSequence.incrementAndGet();
586+
int correlationId = nextCorrelationId();
573587
try {
574588
ByteBuf bb = allocate(length + 4);
575589
bb.writeInt(length);
@@ -610,7 +624,7 @@ void send(byte[] content) {
610624

611625
private void sendClose(short code, String reason) {
612626
int length = 2 + 2 + 4 + 2 + 2 + reason.length();
613-
int correlationId = correlationSequence.incrementAndGet();
627+
int correlationId = nextCorrelationId();
614628
try {
615629
ByteBuf bb = allocate(length + 4);
616630
bb.writeInt(length);
@@ -643,7 +657,7 @@ private void sendClose(short code, String reason) {
643657

644658
private List<String> getSaslMechanisms() {
645659
int length = 2 + 2 + 4;
646-
int correlationId = correlationSequence.incrementAndGet();
660+
int correlationId = nextCorrelationId();
647661
try {
648662
ByteBuf bb = allocateNoCheck(length + 4);
649663
bb.writeInt(length);
@@ -670,7 +684,7 @@ public Response create(String stream) {
670684

671685
public Response create(String stream, Map<String, String> arguments) {
672686
int length = 2 + 2 + 4 + 2 + stream.length() + mapSize(arguments);
673-
int correlationId = correlationSequence.incrementAndGet();
687+
int correlationId = nextCorrelationId();
674688
try {
675689
ByteBuf bb = allocate(length + 4);
676690
bb.writeInt(length);
@@ -719,7 +733,7 @@ Response createSuperStream(
719733
+ collectionSize(partitions)
720734
+ collectionSize(bindingKeys)
721735
+ mapSize(arguments);
722-
int correlationId = correlationSequence.incrementAndGet();
736+
int correlationId = nextCorrelationId();
723737
try {
724738
ByteBuf bb = allocate(length + 4);
725739
bb.writeInt(length);
@@ -748,7 +762,7 @@ Response createSuperStream(
748762
Response deleteSuperStream(String superStream) {
749763
this.superStreamManagementCommandVersionsCheck.run();
750764
int length = 2 + 2 + 4 + 2 + superStream.length();
751-
int correlationId = correlationSequence.incrementAndGet();
765+
int correlationId = nextCorrelationId();
752766
try {
753767
ByteBuf bb = allocate(length + 4);
754768
bb.writeInt(length);
@@ -808,7 +822,7 @@ private static ByteBuf writeMap(ByteBuf bb, Map<String, String> elements) {
808822
}
809823

810824
ByteBuf allocate(ByteBufAllocator allocator, int capacity) {
811-
if (frameSizeCopped && capacity > this.maxFrameSize()) {
825+
if (frameSizeCapped && capacity > this.maxFrameSize()) {
812826
throw new IllegalArgumentException(
813827
"Cannot allocate "
814828
+ capacity
@@ -832,7 +846,7 @@ private ByteBuf allocateNoCheck(int capacity) {
832846

833847
public Response delete(String stream) {
834848
int length = 2 + 2 + 4 + 2 + stream.length();
835-
int correlationId = correlationSequence.incrementAndGet();
849+
int correlationId = nextCorrelationId();
836850
try {
837851
ByteBuf bb = allocate(length + 4);
838852
bb.writeInt(length);
@@ -864,7 +878,7 @@ public Map<String, StreamMetadata> metadata(String... streams) {
864878
throw new IllegalArgumentException("At least one stream must be specified");
865879
}
866880
int length = 2 + 2 + 4 + arraySize(streams); // API code, version, correlation ID, array size
867-
int correlationId = correlationSequence.incrementAndGet();
881+
int correlationId = nextCorrelationId();
868882
try {
869883
ByteBuf bb = allocate(length + 4);
870884
bb.writeInt(length);
@@ -897,7 +911,7 @@ public Response declarePublisher(byte publisherId, String publisherReference, St
897911
"If specified, publisher reference must less than 256 characters");
898912
}
899913
int length = 2 + 2 + 4 + 1 + 2 + publisherReferenceSize + 2 + stream.length();
900-
int correlationId = correlationSequence.getAndIncrement();
914+
int correlationId = nextCorrelationId();
901915
try {
902916
ByteBuf bb = allocate(length + 4);
903917
bb.writeInt(length);
@@ -928,7 +942,7 @@ public Response declarePublisher(byte publisherId, String publisherReference, St
928942

929943
public Response deletePublisher(byte publisherId) {
930944
int length = 2 + 2 + 4 + 1;
931-
int correlationId = correlationSequence.getAndIncrement();
945+
int correlationId = nextCorrelationId();
932946
try {
933947
ByteBuf bb = allocate(length + 4);
934948
bb.writeInt(length);
@@ -1252,7 +1266,7 @@ public Response subscribe(
12521266
propertiesSize = mapSize(properties);
12531267
}
12541268
length += propertiesSize;
1255-
int correlationId = correlationSequence.getAndIncrement();
1269+
int correlationId = nextCorrelationId();
12561270
try {
12571271
ByteBuf bb = allocate(length + 4);
12581272
bb.writeInt(length);
@@ -1320,7 +1334,7 @@ public QueryOffsetResponse queryOffset(String reference, String stream) {
13201334
}
13211335

13221336
int length = 2 + 2 + 4 + 2 + reference.length() + 2 + stream.length();
1323-
int correlationId = correlationSequence.getAndIncrement();
1337+
int correlationId = nextCorrelationId();
13241338
try {
13251339
ByteBuf bb = allocate(length + 4);
13261340
bb.writeInt(length);
@@ -1361,7 +1375,7 @@ public long queryPublisherSequence(String publisherReference, String stream) {
13611375
}
13621376

13631377
int length = 2 + 2 + 4 + 2 + publisherReference.length() + 2 + stream.length();
1364-
int correlationId = correlationSequence.getAndIncrement();
1378+
int correlationId = nextCorrelationId();
13651379
try {
13661380
ByteBuf bb = allocate(length + 4);
13671381
bb.writeInt(length);
@@ -1398,7 +1412,7 @@ public long queryPublisherSequence(String publisherReference, String stream) {
13981412

13991413
public Response unsubscribe(byte subscriptionId) {
14001414
int length = 2 + 2 + 4 + 1;
1401-
int correlationId = correlationSequence.getAndIncrement();
1415+
int correlationId = nextCorrelationId();
14021416
try {
14031417
ByteBuf bb = allocate(length + 4);
14041418
bb.writeInt(length);
@@ -1445,6 +1459,8 @@ private void closeNetty() {
14451459
if (this.channel != null && this.channel.isOpen()) {
14461460
LOGGER.debug("Closing Netty channel");
14471461
this.channel.close().get(10, TimeUnit.SECONDS);
1462+
} else {
1463+
LOGGER.debug("No Netty channel to close");
14481464
}
14491465
} catch (InterruptedException e) {
14501466
LOGGER.info("Channel closing has been interrupted");
@@ -1530,7 +1546,7 @@ public List<String> route(String routingKey, String superStream) {
15301546
+ routingKey.length()
15311547
+ 2
15321548
+ superStream.length(); // API code, version, correlation ID, 2 strings
1533-
int correlationId = correlationSequence.incrementAndGet();
1549+
int correlationId = nextCorrelationId();
15341550
try {
15351551
ByteBuf bb = allocate(length + 4);
15361552
bb.writeInt(length);
@@ -1565,7 +1581,7 @@ public List<String> partitions(String superStream) {
15651581
}
15661582
int length =
15671583
2 + 2 + 4 + 2 + superStream.length(); // API code, version, correlation ID, 1 string
1568-
int correlationId = correlationSequence.incrementAndGet();
1584+
int correlationId = nextCorrelationId();
15691585
try {
15701586
ByteBuf bb = allocate(length + 4);
15711587
bb.writeInt(length);
@@ -1593,7 +1609,7 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
15931609
List<FrameHandlerInfo> commandVersions = ServerFrameHandler.commandVersions();
15941610
int length = 2 + 2 + 4 + 4; // API code, version, correlation ID, array size
15951611
length += commandVersions.size() * (2 + 2 + 2);
1596-
int correlationId = correlationSequence.incrementAndGet();
1612+
int correlationId = nextCorrelationId();
15971613
try {
15981614
ByteBuf bb = allocate(length + 4);
15991615
bb.writeInt(length);
@@ -1626,7 +1642,7 @@ StreamStatsResponse streamStats(String stream) {
16261642
throw new IllegalArgumentException("stream must not be null");
16271643
}
16281644
int length = 2 + 2 + 4 + 2 + stream.length(); // API code, version, correlation ID, 1 string
1629-
int correlationId = correlationSequence.incrementAndGet();
1645+
int correlationId = nextCorrelationId();
16301646
try {
16311647
ByteBuf bb = allocate(length + 4);
16321648
bb.writeInt(length);
@@ -2583,6 +2599,10 @@ public ClientParameters bootstrapCustomizer(Consumer<Bootstrap> bootstrapCustomi
25832599
return this;
25842600
}
25852601

2602+
Duration rpcTimeout() {
2603+
return this.rpcTimeout;
2604+
}
2605+
25862606
ClientParameters duplicate() {
25872607
ClientParameters duplicate = new ClientParameters();
25882608
for (Field field : ClientParameters.class.getDeclaredFields()) {
@@ -2926,4 +2946,10 @@ private <T> OutstandingRequest<T> outstandingRequest() {
29262946
public String toString() {
29272947
return "Client{connectionName='" + connectionName() + "'}";
29282948
}
2949+
2950+
private void debug(Supplier<String> format, Object... args) {
2951+
if (LOGGER.isDebugEnabled()) {
2952+
LOGGER.debug("Connection '" + this.clientConnectionName + "': " + format.get(), args);
2953+
}
2954+
}
29292955
}

0 commit comments

Comments
 (0)