Skip to content

Commit d331dfb

Browse files
committed
Support StreamInfo command
References rabbitmq/rabbitmq-server#5412
1 parent c6168ae commit d331dfb

27 files changed

+373
-24
lines changed

src/main/java/com/rabbitmq/stream/Constants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public final class Constants {
6868
public static final short COMMAND_ROUTE = 24;
6969
public static final short COMMAND_PARTITIONS = 25;
7070
public static final short COMMAND_EXCHANGE_COMMAND_VERSIONS = 27;
71+
public static final short COMMAND_STREAM_INFO = 28;
7172

7273
public static final short VERSION_1 = 1;
7374
public static final short VERSION_2 = 2;

src/main/java/com/rabbitmq/stream/Environment.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -61,6 +61,8 @@ static EnvironmentBuilder builder() {
6161
*/
6262
void deleteStream(String stream);
6363

64+
StreamInfo queryStreamInfo(String stream);
65+
6466
/**
6567
* Create a {@link ProducerBuilder} to configure and create a {@link Producer}.
6668
*

src/main/java/com/rabbitmq/stream/MessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream;
15+
16+
public class NoOffsetException extends StreamException {
17+
18+
public NoOffsetException(String message) {
19+
super(message, Constants.RESPONSE_CODE_NO_OFFSET);
20+
}
21+
}

src/main/java/com/rabbitmq/stream/StreamDoesNotExistException.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -19,7 +19,7 @@ public class StreamDoesNotExistException extends StreamException {
1919
private final String stream;
2020

2121
public StreamDoesNotExistException(String stream) {
22-
super("Stream " + stream + " does not exist");
22+
super("Stream " + stream + " does not exist", Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST);
2323
this.stream = stream;
2424
}
2525

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream;
15+
16+
public interface StreamInfo {
17+
18+
long firstOffset();
19+
20+
long committedOffset();
21+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream;
15+
16+
public class StreamNotAvailableException extends StreamException {
17+
18+
private final String stream;
19+
20+
public StreamNotAvailableException(String stream) {
21+
super("Stream " + stream + " is not available", Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE);
22+
this.stream = stream;
23+
}
24+
}

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import static com.rabbitmq.stream.Constants.COMMAND_SASL_AUTHENTICATE;
3333
import static com.rabbitmq.stream.Constants.COMMAND_SASL_HANDSHAKE;
3434
import static com.rabbitmq.stream.Constants.COMMAND_STORE_OFFSET;
35+
import static com.rabbitmq.stream.Constants.COMMAND_STREAM_INFO;
3536
import static com.rabbitmq.stream.Constants.COMMAND_SUBSCRIBE;
3637
import static com.rabbitmq.stream.Constants.COMMAND_UNSUBSCRIBE;
3738
import static com.rabbitmq.stream.Constants.RESPONSE_CODE_AUTHENTICATION_FAILURE;
@@ -1354,6 +1355,31 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
13541355
}
13551356
}
13561357

1358+
StreamInfoResponse streamInfo(String stream) {
1359+
if (stream == null) {
1360+
throw new IllegalArgumentException("stream must not be null");
1361+
}
1362+
int length = 2 + 2 + 4 + 2 + stream.length(); // API code, version, correlation ID, 1 string
1363+
int correlationId = correlationSequence.incrementAndGet();
1364+
try {
1365+
ByteBuf bb = allocate(length + 4);
1366+
bb.writeInt(length);
1367+
bb.writeShort(encodeRequestCode(COMMAND_STREAM_INFO));
1368+
bb.writeShort(VERSION_1);
1369+
bb.writeInt(correlationId);
1370+
bb.writeShort(stream.length());
1371+
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
1372+
OutstandingRequest<StreamInfoResponse> request = new OutstandingRequest<>(this.rpcTimeout);
1373+
outstandingRequests.put(correlationId, request);
1374+
channel.writeAndFlush(bb);
1375+
request.block();
1376+
return request.response.get();
1377+
} catch (RuntimeException e) {
1378+
outstandingRequests.remove(correlationId);
1379+
throw new StreamException(e);
1380+
}
1381+
}
1382+
13571383
void shutdownReason(ShutdownReason reason) {
13581384
this.shutdownReason = reason;
13591385
}
@@ -1852,7 +1878,7 @@ static class QueryPublisherSequenceResponse extends Response {
18521878

18531879
private final long sequence;
18541880

1855-
public QueryPublisherSequenceResponse(short responseCode, long sequence) {
1881+
QueryPublisherSequenceResponse(short responseCode, long sequence) {
18561882
super(responseCode);
18571883
this.sequence = sequence;
18581884
}
@@ -1862,6 +1888,20 @@ public long getSequence() {
18621888
}
18631889
}
18641890

1891+
static class StreamInfoResponse extends Response {
1892+
1893+
private final Map<String, String> info;
1894+
1895+
StreamInfoResponse(short responseCode, Map<String, String> info) {
1896+
super(responseCode);
1897+
this.info = Collections.unmodifiableMap(new HashMap<>(info));
1898+
}
1899+
1900+
public Map<String, String> getInfo() {
1901+
return info;
1902+
}
1903+
}
1904+
18651905
public static class StreamMetadata {
18661906

18671907
private final String stream;

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import static com.rabbitmq.stream.Constants.COMMAND_ROUTE;
3535
import static com.rabbitmq.stream.Constants.COMMAND_SASL_AUTHENTICATE;
3636
import static com.rabbitmq.stream.Constants.COMMAND_SASL_HANDSHAKE;
37+
import static com.rabbitmq.stream.Constants.COMMAND_STREAM_INFO;
3738
import static com.rabbitmq.stream.Constants.COMMAND_SUBSCRIBE;
3839
import static com.rabbitmq.stream.Constants.COMMAND_TUNE;
3940
import static com.rabbitmq.stream.Constants.COMMAND_UNSUBSCRIBE;
@@ -62,6 +63,7 @@
6263
import com.rabbitmq.stream.impl.Client.Response;
6364
import com.rabbitmq.stream.impl.Client.SaslAuthenticateResponse;
6465
import com.rabbitmq.stream.impl.Client.ShutdownContext.ShutdownReason;
66+
import com.rabbitmq.stream.impl.Client.StreamInfoResponse;
6567
import com.rabbitmq.stream.impl.Client.StreamMetadata;
6668
import com.rabbitmq.stream.impl.Client.SubscriptionOffset;
6769
import com.rabbitmq.stream.metrics.MetricsCollector;
@@ -132,6 +134,7 @@ class ServerFrameHandler {
132134
handlers.put(COMMAND_ROUTE, new RouteFrameHandler());
133135
handlers.put(COMMAND_PARTITIONS, new PartitionsFrameHandler());
134136
handlers.put(COMMAND_EXCHANGE_COMMAND_VERSIONS, new ExchangeCommandVersionsFrameHandler());
137+
handlers.put(COMMAND_STREAM_INFO, new StreamInfoFrameHandler());
135138
HANDLERS = new FrameHandler[maxCommandKey + 1][];
136139
handlers
137140
.entrySet()
@@ -1146,4 +1149,38 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
11461149
return read;
11471150
}
11481151
}
1152+
1153+
private static class StreamInfoFrameHandler extends BaseFrameHandler {
1154+
1155+
@Override
1156+
int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
1157+
int correlationId = message.readInt();
1158+
int read = 4;
1159+
1160+
short responseCode = message.readShort();
1161+
read += 2;
1162+
1163+
int infoCount = message.readInt();
1164+
read += 4;
1165+
Map<String, String> info = new LinkedHashMap<>(infoCount);
1166+
1167+
for (int i = 0; i < infoCount; i++) {
1168+
String key = readString(message);
1169+
read += 2 + key.length();
1170+
String value = readString(message);
1171+
read += 2 + value.length();
1172+
info.put(key, value);
1173+
}
1174+
1175+
OutstandingRequest<StreamInfoResponse> outstandingRequest =
1176+
remove(client.outstandingRequests, correlationId, StreamInfoResponse.class);
1177+
if (outstandingRequest == null) {
1178+
LOGGER.warn("Could not find outstanding request with correlation ID {}", correlationId);
1179+
} else {
1180+
outstandingRequest.response().set(new StreamInfoResponse(responseCode, info));
1181+
outstandingRequest.countDown();
1182+
}
1183+
return read;
1184+
}
1185+
}
11491186
}

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import static com.rabbitmq.stream.impl.Utils.formatConstant;
17+
import static com.rabbitmq.stream.impl.Utils.propagateException;
1718
import static java.util.concurrent.TimeUnit.SECONDS;
1819

1920
import com.rabbitmq.stream.Address;
@@ -24,13 +25,16 @@
2425
import com.rabbitmq.stream.Environment;
2526
import com.rabbitmq.stream.MessageHandler;
2627
import com.rabbitmq.stream.MessageHandler.Context;
28+
import com.rabbitmq.stream.NoOffsetException;
2729
import com.rabbitmq.stream.OffsetSpecification;
2830
import com.rabbitmq.stream.ProducerBuilder;
2931
import com.rabbitmq.stream.StreamCreator;
3032
import com.rabbitmq.stream.StreamException;
33+
import com.rabbitmq.stream.StreamInfo;
3134
import com.rabbitmq.stream.SubscriptionListener;
3235
import com.rabbitmq.stream.compression.CompressionCodecFactory;
3336
import com.rabbitmq.stream.impl.Client.ClientParameters;
37+
import com.rabbitmq.stream.impl.Client.StreamInfoResponse;
3438
import com.rabbitmq.stream.impl.OffsetTrackingCoordinator.Registration;
3539
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
3640
import com.rabbitmq.stream.impl.StreamEnvironmentBuilder.DefaultTlsConfiguration;
@@ -45,6 +49,7 @@
4549
import java.net.URLDecoder;
4650
import java.util.Collections;
4751
import java.util.List;
52+
import java.util.Map;
4853
import java.util.Random;
4954
import java.util.concurrent.CopyOnWriteArrayList;
5055
import java.util.concurrent.ExecutionException;
@@ -54,9 +59,11 @@
5459
import java.util.concurrent.TimeoutException;
5560
import java.util.concurrent.atomic.AtomicBoolean;
5661
import java.util.concurrent.atomic.AtomicReference;
62+
import java.util.function.BiFunction;
5763
import java.util.function.Consumer;
5864
import java.util.function.Function;
5965
import java.util.function.LongConsumer;
66+
import java.util.function.LongSupplier;
6067
import java.util.function.Supplier;
6168
import java.util.stream.Collectors;
6269
import javax.net.ssl.SSLException;
@@ -382,6 +389,59 @@ public void deleteStream(String stream) {
382389
}
383390
}
384391

392+
@Override
393+
public StreamInfo queryStreamInfo(String stream) {
394+
StreamInfoResponse response = locatorOperation(client -> client.streamInfo(stream));
395+
if (response.isOk()) {
396+
Map<String, String> info = response.getInfo();
397+
BiFunction<String, String, LongSupplier> offsetSupplierLogic =
398+
(key, message) -> {
399+
if (!info.containsKey(key) || "-1".equals(info.get(key))) {
400+
return () -> {
401+
throw new NoOffsetException(message);
402+
};
403+
} else {
404+
try {
405+
long firstOffset = Long.parseUnsignedLong(info.get(key));
406+
return () -> firstOffset;
407+
} catch (NumberFormatException e) {
408+
return () -> {
409+
throw new NoOffsetException(message);
410+
};
411+
}
412+
}
413+
};
414+
LongSupplier firstOffsetSupplier =
415+
offsetSupplierLogic.apply("first_offset", "No first offset for stream " + stream);
416+
LongSupplier committedOffsetSupplier =
417+
offsetSupplierLogic.apply("committed_offset", "No committed offset for stream " + stream);
418+
return new DefaultStreamInfo(firstOffsetSupplier, committedOffsetSupplier);
419+
} else {
420+
throw propagateException(response.getResponseCode(), stream);
421+
}
422+
}
423+
424+
private static class DefaultStreamInfo implements StreamInfo {
425+
426+
private final LongSupplier firstOffsetSupplier, committedOffsetSupplier;
427+
428+
private DefaultStreamInfo(
429+
LongSupplier firstOffsetSupplier, LongSupplier committedOffsetSupplier) {
430+
this.firstOffsetSupplier = firstOffsetSupplier;
431+
this.committedOffsetSupplier = committedOffsetSupplier;
432+
}
433+
434+
@Override
435+
public long firstOffset() {
436+
return firstOffsetSupplier.getAsLong();
437+
}
438+
439+
@Override
440+
public long committedOffset() {
441+
return committedOffsetSupplier.getAsLong();
442+
}
443+
}
444+
385445
@Override
386446
public ProducerBuilder producerBuilder() {
387447
return new StreamProducerBuilder(this);

src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").

src/main/java/com/rabbitmq/stream/impl/Utils.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515

1616
import com.rabbitmq.stream.Address;
1717
import com.rabbitmq.stream.Constants;
18+
import com.rabbitmq.stream.StreamDoesNotExistException;
19+
import com.rabbitmq.stream.StreamException;
20+
import com.rabbitmq.stream.StreamNotAvailableException;
1821
import com.rabbitmq.stream.impl.Client.ClientParameters;
1922
import java.security.cert.X509Certificate;
2023
import java.time.Duration;
@@ -281,4 +284,15 @@ static int versionCompare(String str1, String str2) {
281284
static boolean is3_11_OrMore(String brokerVersion) {
282285
return versionCompare(currentVersion(brokerVersion), "3.11.0") >= 0;
283286
}
287+
288+
static StreamException propagateException(short responseCode, String stream) {
289+
if (responseCode == Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST) {
290+
return new StreamDoesNotExistException(stream);
291+
} else if (responseCode == Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE) {
292+
return new StreamNotAvailableException(stream);
293+
} else {
294+
String message = "Error while querying stream info: " + formatConstant(responseCode) + ".";
295+
return new StreamException(message, responseCode);
296+
}
297+
}
284298
}

src/test/java/com/rabbitmq/stream/impl/AmqpInteroperabilityTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").

0 commit comments

Comments
 (0)