Skip to content

Commit 8688fe9

Browse files
committed
Support consumer update command
References rabbitmq/rabbitmq-server#3753
1 parent e4612bf commit 8688fe9

File tree

6 files changed

+157
-7
lines changed

6 files changed

+157
-7
lines changed

src/main/java/com/rabbitmq/stream/Constants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public final class Constants {
6666
public static final short COMMAND_HEARTBEAT = 23;
6767
public static final short COMMAND_ROUTE = 24;
6868
public static final short COMMAND_PARTITIONS = 25;
69+
public static final short COMMAND_CONSUMER_UPDATE = 26;
6970

7071
public static final short VERSION_1 = 1;
7172

src/main/java/com/rabbitmq/stream/OffsetSpecification.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
/** API to specify an offset in a stream. */
1919
public class OffsetSpecification {
2020

21+
private static final short TYPE_NONE = 0;
2122
private static final short TYPE_FIRST = 1;
2223
private static final short TYPE_LAST = 2;
2324
private static final short TYPE_NEXT = 3;
@@ -26,6 +27,7 @@ public class OffsetSpecification {
2627

2728
private static final long UNUSED_OFFSET = -1;
2829

30+
private static final OffsetSpecification NONE = new OffsetSpecification(TYPE_NONE, UNUSED_OFFSET);
2931
private static final OffsetSpecification FIRST =
3032
new OffsetSpecification(TYPE_FIRST, UNUSED_OFFSET);
3133
private static final OffsetSpecification LAST = new OffsetSpecification(TYPE_LAST, UNUSED_OFFSET);
@@ -39,6 +41,15 @@ private OffsetSpecification(short type, long offset) {
3941
this.offset = offset;
4042
}
4143

44+
/**
45+
* When the offset specification is not relevant.
46+
*
47+
* @return none offset specification
48+
*/
49+
public static OffsetSpecification none() {
50+
return NONE;
51+
}
52+
4253
/**
4354
* The first available offset in the stream.
4455
*

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import static com.rabbitmq.stream.Constants.COMMAND_CLOSE;
17+
import static com.rabbitmq.stream.Constants.COMMAND_CONSUMER_UPDATE;
1718
import static com.rabbitmq.stream.Constants.COMMAND_CREATE_STREAM;
1819
import static com.rabbitmq.stream.Constants.COMMAND_CREDIT;
1920
import static com.rabbitmq.stream.Constants.COMMAND_DECLARE_PUBLISHER;
@@ -39,6 +40,7 @@
3940
import static com.rabbitmq.stream.Constants.RESPONSE_CODE_SASL_CHALLENGE;
4041
import static com.rabbitmq.stream.Constants.VERSION_1;
4142
import static com.rabbitmq.stream.impl.Utils.encodeRequestCode;
43+
import static com.rabbitmq.stream.impl.Utils.encodeResponseCode;
4244
import static com.rabbitmq.stream.impl.Utils.extractResponseCode;
4345
import static com.rabbitmq.stream.impl.Utils.formatConstant;
4446
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -159,6 +161,7 @@ public class Client implements AutoCloseable {
159161
final ChunkListener chunkListener;
160162
final MessageListener messageListener;
161163
final CreditNotification creditNotification;
164+
final ConsumerUpdateListener consumerUpdateListener;
162165
final MetadataListener metadataListener;
163166
final Codec codec;
164167
final Channel channel;
@@ -216,6 +219,7 @@ public Client(ClientParameters parameters) {
216219
this.chunkChecksum = parameters.chunkChecksum;
217220
this.metricsCollector = parameters.metricsCollector;
218221
this.metadataListener = parameters.metadataListener;
222+
this.consumerUpdateListener = parameters.consumerUpdateListener;
219223
this.compressionCodecFactory =
220224
parameters.compressionCodecFactory == null
221225
? compression -> null
@@ -1279,7 +1283,7 @@ public List<String> partitions(String superStream) {
12791283
throw new IllegalArgumentException("stream must not be null");
12801284
}
12811285
int length =
1282-
2 + 2 + 4 + +2 + superStream.length(); // API code, version, correlation ID, 1 string
1286+
2 + 2 + 4 + 2 + superStream.length(); // API code, version, correlation ID, 1 string
12831287
int correlationId = correlationSequence.incrementAndGet();
12841288
try {
12851289
ByteBuf bb = allocate(length + 4);
@@ -1300,6 +1304,29 @@ public List<String> partitions(String superStream) {
13001304
}
13011305
}
13021306

1307+
public void consumerUpdateResponse(
1308+
int correlationId, short responseCode, OffsetSpecification offsetSpecification) {
1309+
offsetSpecification =
1310+
offsetSpecification == null ? OffsetSpecification.none() : offsetSpecification;
1311+
int length = 2 + 2 + 4 + 2 + 2; // API code, version, correlation ID, response code, offset type
1312+
1313+
if (offsetSpecification.isOffset() || offsetSpecification.isTimestamp()) {
1314+
length += 8;
1315+
}
1316+
1317+
ByteBuf bb = allocate(length + 4);
1318+
bb.writeInt(length);
1319+
bb.writeShort(encodeResponseCode(COMMAND_CONSUMER_UPDATE));
1320+
bb.writeShort(VERSION_1);
1321+
bb.writeInt(correlationId);
1322+
bb.writeShort(responseCode);
1323+
bb.writeShort(offsetSpecification.getType());
1324+
if (offsetSpecification.isOffset() || offsetSpecification.isTimestamp()) {
1325+
bb.writeLong(offsetSpecification.getOffset());
1326+
}
1327+
channel.writeAndFlush(bb);
1328+
}
1329+
13031330
void shutdownReason(ShutdownReason reason) {
13041331
this.shutdownReason = reason;
13051332
}
@@ -1397,6 +1424,11 @@ public interface CreditNotification {
13971424
void handle(byte subscriptionId, short responseCode);
13981425
}
13991426

1427+
public interface ConsumerUpdateListener {
1428+
1429+
OffsetSpecification handle(Client client, byte subscriptionId, boolean active);
1430+
}
1431+
14001432
public interface ShutdownListener {
14011433

14021434
void handle(ShutdownContext shutdownContext);
@@ -1918,6 +1950,8 @@ public static class ClientParameters {
19181950
"Received notification for subscription {}: {}",
19191951
subscriptionId,
19201952
Utils.formatConstant(responseCode));
1953+
private ConsumerUpdateListener consumerUpdateListener =
1954+
(client, subscriptionId, active) -> null;
19211955
private ShutdownListener shutdownListener = shutdownContext -> {};
19221956
private SaslConfiguration saslConfiguration = DefaultSaslConfiguration.PLAIN;
19231957
private CredentialsProvider credentialsProvider =
@@ -1965,6 +1999,11 @@ public ClientParameters creditNotification(CreditNotification creditNotification
19651999
return this;
19662000
}
19672001

2002+
public ClientParameters consumerUpdateListener(ConsumerUpdateListener consumerUpdateListener) {
2003+
this.consumerUpdateListener = consumerUpdateListener;
2004+
return this;
2005+
}
2006+
19682007
public ClientParameters codec(Codec codec) {
19692008
this.codec = codec;
19702009
return this;

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import static com.rabbitmq.stream.Constants.COMMAND_CLOSE;
17+
import static com.rabbitmq.stream.Constants.COMMAND_CONSUMER_UPDATE;
1718
import static com.rabbitmq.stream.Constants.COMMAND_CREATE_STREAM;
1819
import static com.rabbitmq.stream.Constants.COMMAND_CREDIT;
1920
import static com.rabbitmq.stream.Constants.COMMAND_DECLARE_PUBLISHER;
@@ -46,6 +47,7 @@
4647
import com.rabbitmq.stream.ChunkChecksumValidationException;
4748
import com.rabbitmq.stream.Codec;
4849
import com.rabbitmq.stream.Message;
50+
import com.rabbitmq.stream.OffsetSpecification;
4951
import com.rabbitmq.stream.StreamException;
5052
import com.rabbitmq.stream.compression.Compression;
5153
import com.rabbitmq.stream.compression.CompressionCodec;
@@ -112,6 +114,7 @@ class ServerFrameHandler {
112114
handlers.put(COMMAND_QUERY_PUBLISHER_SEQUENCE, new QueryPublisherSequenceFrameHandler());
113115
handlers.put(COMMAND_ROUTE, new RouteFrameHandler());
114116
handlers.put(COMMAND_PARTITIONS, new PartitionsFrameHandler());
117+
handlers.put(COMMAND_CONSUMER_UPDATE, new ConsumerUpdateFrameHandler());
115118
HANDLERS = new FrameHandler[handlers.size() + 10];
116119
handlers.entrySet().forEach(entry -> HANDLERS[entry.getKey()] = entry.getValue());
117120
}
@@ -575,6 +578,27 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
575578
}
576579
}
577580

581+
private static class ConsumerUpdateFrameHandler extends BaseFrameHandler {
582+
583+
@Override
584+
int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
585+
int correlationId = message.readInt();
586+
int read = 4;
587+
byte subscriptionId = message.readByte();
588+
read += 1;
589+
byte activeByte = message.readByte();
590+
read += 1;
591+
592+
OffsetSpecification offsetSpecification =
593+
client.consumerUpdateListener.handle(
594+
client, subscriptionId, activeByte == 1 ? true : false);
595+
596+
client.consumerUpdateResponse(correlationId, RESPONSE_CODE_OK, offsetSpecification);
597+
598+
return read;
599+
}
600+
}
601+
578602
private static class PeerPropertiesFrameHandler extends BaseFrameHandler {
579603

580604
@Override
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.impl;
15+
16+
import static com.rabbitmq.stream.impl.TestUtils.b;
17+
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
18+
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import com.rabbitmq.stream.OffsetSpecification;
22+
import com.rabbitmq.stream.impl.Client.ClientParameters;
23+
import com.rabbitmq.stream.impl.Client.Response;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
import org.junit.jupiter.api.Test;
30+
import org.junit.jupiter.api.extension.ExtendWith;
31+
32+
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
33+
public class SingleActiveConsumerTest {
34+
35+
String stream;
36+
TestUtils.ClientFactory cf;
37+
38+
@Test
39+
void subscribe() throws Exception {
40+
int messageCount = 10000;
41+
Map<Byte, Boolean> consumerStates = new ConcurrentHashMap<>();
42+
Map<Byte, AtomicInteger> receivedMessages = new ConcurrentHashMap<>();
43+
receivedMessages.put(b(0), new AtomicInteger(0));
44+
receivedMessages.put(b(1), new AtomicInteger(0));
45+
CountDownLatch consumerUpdateLatch = new CountDownLatch(2);
46+
ClientParameters clientParameters =
47+
new ClientParameters()
48+
.chunkListener(
49+
(client, subscriptionId, offset, msgCount, dataSize) ->
50+
client.credit(subscriptionId, 1))
51+
.messageListener(
52+
(subscriptionId, offset, chunkTimestamp, message) ->
53+
receivedMessages.get(subscriptionId).incrementAndGet())
54+
.consumerUpdateListener(
55+
(client, subscriptionId, active) -> {
56+
consumerStates.put(subscriptionId, active);
57+
consumerUpdateLatch.countDown();
58+
return null;
59+
});
60+
Client client = cf.get(clientParameters);
61+
62+
TestUtils.publishAndWaitForConfirms(cf, messageCount, stream);
63+
64+
Map<String, String> parameters = new HashMap<>();
65+
parameters.put("single-active-consumer", "true");
66+
parameters.put("name", "foo");
67+
Response response = client.subscribe(b(0), stream, OffsetSpecification.first(), 2, parameters);
68+
assertThat(response.isOk()).isTrue();
69+
response = client.subscribe(b(1), stream, OffsetSpecification.first(), 2, parameters);
70+
assertThat(response.isOk()).isTrue();
71+
latchAssert(consumerUpdateLatch).completes();
72+
assertThat(consumerStates)
73+
.hasSize(2)
74+
.containsEntry(b(0), Boolean.TRUE)
75+
.containsEntry(b(1), Boolean.FALSE);
76+
77+
waitAtMost(
78+
() -> receivedMessages.getOrDefault(b(0), new AtomicInteger(0)).get() == messageCount);
79+
}
80+
}

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,7 @@ static void publishAndWaitForConfirms(
170170
client.publish(b(1), Collections.singletonList(message));
171171
}
172172

173-
try {
174-
assertThat(latchConfirm.await(60, SECONDS)).isTrue();
175-
} catch (InterruptedException e) {
176-
Thread.currentThread().interrupt();
177-
throw new RuntimeException(e);
178-
}
173+
latchAssert(latchConfirm).completes(Duration.ofSeconds(60));
179174
}
180175

181176
static Consumer<Object> namedTask(TaskWithException task, String description) {

0 commit comments

Comments
 (0)