Skip to content

Commit 4604b0e

Browse files
committed
Exchange command versions if possible
For consumer connections, on RabbitMQ 3.11+. This is to benefit from deliver v2 if possible. References #158
1 parent 32fa40b commit 4604b0e

File tree

6 files changed

+105
-24
lines changed

6 files changed

+105
-24
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,8 @@ private MessageHandlerContext(
290290
long offset, long timestamp, long committedOffset, Consumer consumer) {
291291
this.offset = offset;
292292
this.timestamp = timestamp;
293-
this.consumer = consumer;
294293
this.committedOffset = committedOffset;
294+
this.consumer = consumer;
295295
}
296296

297297
@Override
@@ -438,7 +438,7 @@ private ClientSubscriptionsManager(
438438
subscriptionId & 0xFF,
439439
Utils.formatConstant(responseCode));
440440
MessageListener messageListener =
441-
(subscriptionId, offset, committedOffset, chunkTimestamp, message) -> {
441+
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
442442
SubscriptionTracker subscriptionTracker =
443443
subscriptionTrackers.get(subscriptionId & 0xFF);
444444
if (subscriptionTracker != null) {
@@ -559,6 +559,7 @@ private ClientSubscriptionsManager(
559559
.metadataListener(metadataListener))
560560
.key(owner.name);
561561
this.client = clientFactory.client(clientFactoryContext);
562+
maybeExchangeCommandVersions(client);
562563
clientInitializedInManager.set(true);
563564
}
564565

@@ -848,4 +849,14 @@ public String toString() {
848849
return "SubscriptionContext{" + "offsetSpecification=" + offsetSpecification + '}';
849850
}
850851
}
852+
853+
private static void maybeExchangeCommandVersions(Client client) {
854+
try {
855+
if (Utils.is3_11_OrMore(client.brokerVersion())) {
856+
client.exchangeCommandVersions();
857+
}
858+
} catch (Exception e) {
859+
LOGGER.info("Error while exchanging command versions: {}", e.getMessage());
860+
}
861+
}
851862
}

src/main/java/com/rabbitmq/stream/impl/Utils.java

+41
Original file line numberDiff line numberDiff line change
@@ -240,4 +240,45 @@ static Function<ClientConnectionType, String> defaultConnectionNamingStrategy(St
240240
static boolean offsetBefore(long x, long y) {
241241
return Long.compareUnsigned(x, y) < 0;
242242
}
243+
244+
private static String currentVersion(String currentVersion) {
245+
// versions built from source: 3.7.0+rc.1.4.gedc5d96
246+
if (currentVersion.contains("+")) {
247+
currentVersion = currentVersion.substring(0, currentVersion.indexOf("+"));
248+
}
249+
// alpha (snapshot) versions: 3.7.0~alpha.449-1
250+
if (currentVersion.contains("~")) {
251+
currentVersion = currentVersion.substring(0, currentVersion.indexOf("~"));
252+
}
253+
// alpha (snapshot) versions: 3.7.1-alpha.40
254+
if (currentVersion.contains("-")) {
255+
currentVersion = currentVersion.substring(0, currentVersion.indexOf("-"));
256+
}
257+
return currentVersion;
258+
}
259+
260+
/**
261+
* https://stackoverflow.com/questions/6701948/efficient-way-to-compare-version-strings-in-java
262+
*/
263+
static int versionCompare(String str1, String str2) {
264+
String[] vals1 = str1.split("\\.");
265+
String[] vals2 = str2.split("\\.");
266+
int i = 0;
267+
// set index to first non-equal ordinal or length of shortest version string
268+
while (i < vals1.length && i < vals2.length && vals1[i].equals(vals2[i])) {
269+
i++;
270+
}
271+
// compare first non-equal ordinal number
272+
if (i < vals1.length && i < vals2.length) {
273+
int diff = Integer.valueOf(vals1[i]).compareTo(Integer.valueOf(vals2[i]));
274+
return Integer.signum(diff);
275+
}
276+
// the strings are equal or one string is a substring of the other
277+
// e.g. "1.2.3" = "1.2.3" or "1.2.3" < "1.2.3.4"
278+
return Integer.signum(vals1.length - vals2.length);
279+
}
280+
281+
static boolean is3_11_OrMore(String brokerVersion) {
282+
return versionCompare(currentVersion(brokerVersion), "3.11.0") >= 0;
283+
}
243284
}

src/test/java/com/rabbitmq/stream/impl/ClientTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.rabbitmq.stream.impl.Client.Response;
3838
import com.rabbitmq.stream.impl.Client.StreamParametersBuilder;
3939
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo;
40+
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
4041
import io.netty.buffer.ByteBufAllocator;
4142
import io.netty.buffer.UnpooledByteBufAllocator;
4243
import java.io.ByteArrayOutputStream;
@@ -830,7 +831,7 @@ void closingPublisherWhilePublishingShouldNotCloseConnection(String publisherRef
830831
}
831832

832833
@Test
833-
@Disabled
834+
@BrokerVersionAtLeast("3.11.0")
834835
void exchangeCommandVersions() {
835836
Client client = cf.get();
836837
List<FrameHandlerInfo> infos = client.exchangeCommandVersions();

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

+25
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.rabbitmq.stream.StreamDoesNotExistException;
3636
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
3737
import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumerInfo;
38+
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
3839
import com.rabbitmq.stream.impl.TestUtils.DisabledIfRabbitMqCtlNotSet;
3940
import io.netty.channel.EventLoopGroup;
4041
import java.time.Duration;
@@ -146,6 +147,30 @@ void nameShouldBeSetIfTrackingStrategyIsSet() {
146147
});
147148
}
148149

150+
@Test
151+
@BrokerVersionAtLeast("3.11.0")
152+
void committedOffsetShouldBeSet() throws Exception {
153+
int messageCount = 20_000;
154+
TestUtils.publishAndWaitForConfirms(cf, messageCount, this.stream);
155+
156+
CountDownLatch consumeLatch = new CountDownLatch(messageCount);
157+
AtomicLong committedOffset = new AtomicLong();
158+
Consumer consumer =
159+
environment.consumerBuilder().stream(stream)
160+
.offset(OffsetSpecification.first())
161+
.messageHandler(
162+
(context, message) -> {
163+
committedOffset.set(context.committedOffset());
164+
consumeLatch.countDown();
165+
})
166+
.build();
167+
168+
assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
169+
assertThat(committedOffset.get()).isNotZero();
170+
171+
consumer.close();
172+
}
173+
149174
@Test
150175
void consume() throws Exception {
151176
int messageCount = 100_000;

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

+2-21
Original file line numberDiff line numberDiff line change
@@ -414,34 +414,15 @@ static boolean atLeastVersion(String expectedVersion, String currentVersion) {
414414
}
415415
try {
416416
currentVersion = currentVersion(currentVersion);
417-
return "0.0.0".equals(currentVersion) || versionCompare(currentVersion, expectedVersion) >= 0;
417+
return "0.0.0".equals(currentVersion) || Utils.versionCompare(currentVersion, expectedVersion) >= 0;
418418
} catch (RuntimeException e) {
419419
LoggerFactory.getLogger(TestUtils.class)
420420
.warn("Unable to parse broker version {}", currentVersion, e);
421421
throw e;
422422
}
423423
}
424424

425-
/**
426-
* https://stackoverflow.com/questions/6701948/efficient-way-to-compare-version-strings-in-java
427-
*/
428-
static int versionCompare(String str1, String str2) {
429-
String[] vals1 = str1.split("\\.");
430-
String[] vals2 = str2.split("\\.");
431-
int i = 0;
432-
// set index to first non-equal ordinal or length of shortest version string
433-
while (i < vals1.length && i < vals2.length && vals1[i].equals(vals2[i])) {
434-
i++;
435-
}
436-
// compare first non-equal ordinal number
437-
if (i < vals1.length && i < vals2.length) {
438-
int diff = Integer.valueOf(vals1[i]).compareTo(Integer.valueOf(vals2[i]));
439-
return Integer.signum(diff);
440-
}
441-
// the strings are equal or one string is a substring of the other
442-
// e.g. "1.2.3" = "1.2.3" or "1.2.3" < "1.2.3.4"
443-
return Integer.signum(vals1.length - vals2.length);
444-
}
425+
445426

446427
@Target({ElementType.TYPE, ElementType.METHOD})
447428
@Retention(RetentionPolicy.RUNTIME)

src/test/java/com/rabbitmq/stream/impl/UtilsTest.java

+22
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import java.util.function.Predicate;
3838
import java.util.stream.IntStream;
3939
import org.junit.jupiter.api.Test;
40+
import org.junit.jupiter.params.ParameterizedTest;
41+
import org.junit.jupiter.params.provider.CsvSource;
4042

4143
public class UtilsTest {
4244

@@ -102,4 +104,24 @@ void testOffsetBefore() {
102104
assertThat(offsetBefore(Long.MAX_VALUE + 10, Long.MAX_VALUE + 10)).isFalse();
103105
assertThat(offsetBefore(Long.MAX_VALUE + 10, Long.MAX_VALUE + 20)).isTrue();
104106
}
107+
108+
@ParameterizedTest
109+
@CsvSource({
110+
"3.8.0+rc.1.2186.g95f3fde,false",
111+
"3.9.21,false",
112+
"3.9.22-alpha.13,false",
113+
"3.10.6,false",
114+
"3.11.0-alpha.15,true",
115+
"3.11.0,true",
116+
"3.11.1,true",
117+
"4.0.0-alpha.15,true",
118+
"4.0.0,true",
119+
"4.0.1,true",
120+
"4.1.0-alpha.15,true",
121+
"4.1.0,true",
122+
"4.1.1,true",
123+
})
124+
void is_3_11_OrMore(String input, boolean expected) {
125+
assertThat(Utils.is3_11_OrMore(input)).isEqualTo(expected);
126+
}
105127
}

0 commit comments

Comments
 (0)