Skip to content

Commit 59e6a0c

Browse files
committed
Support consumer update command
References rabbitmq/rabbitmq-server#3753 Conflicts: src/main/java/com/rabbitmq/stream/Constants.java src/main/java/com/rabbitmq/stream/impl/Client.java src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java
1 parent b40739d commit 59e6a0c

File tree

6 files changed

+156
-6
lines changed

6 files changed

+156
-6
lines changed

src/main/java/com/rabbitmq/stream/Constants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public final class Constants {
6767
public static final short COMMAND_HEARTBEAT = 23;
6868
public static final short COMMAND_ROUTE = 24;
6969
public static final short COMMAND_PARTITIONS = 25;
70+
public static final short COMMAND_CONSUMER_UPDATE = 26;
7071
public static final short COMMAND_EXCHANGE_COMMAND_VERSIONS = 27;
7172
public static final short COMMAND_STREAM_STATS = 28;
7273

src/main/java/com/rabbitmq/stream/OffsetSpecification.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
/** API to specify an offset in a stream. */
1919
public class OffsetSpecification {
2020

21+
private static final short TYPE_NONE = 0;
2122
private static final short TYPE_FIRST = 1;
2223
private static final short TYPE_LAST = 2;
2324
private static final short TYPE_NEXT = 3;
@@ -26,6 +27,7 @@ public class OffsetSpecification {
2627

2728
private static final long UNUSED_OFFSET = -1;
2829

30+
private static final OffsetSpecification NONE = new OffsetSpecification(TYPE_NONE, UNUSED_OFFSET);
2931
private static final OffsetSpecification FIRST =
3032
new OffsetSpecification(TYPE_FIRST, UNUSED_OFFSET);
3133
private static final OffsetSpecification LAST = new OffsetSpecification(TYPE_LAST, UNUSED_OFFSET);
@@ -39,6 +41,15 @@ private OffsetSpecification(short type, long offset) {
3941
this.offset = offset;
4042
}
4143

44+
/**
45+
* When the offset specification is not relevant.
46+
*
47+
* @return none offset specification
48+
*/
49+
public static OffsetSpecification none() {
50+
return NONE;
51+
}
52+
4253
/**
4354
* The first available offset in the stream.
4455
*

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import static com.rabbitmq.stream.Constants.COMMAND_CLOSE;
17+
import static com.rabbitmq.stream.Constants.COMMAND_CONSUMER_UPDATE;
1718
import static com.rabbitmq.stream.Constants.COMMAND_CREATE_STREAM;
1819
import static com.rabbitmq.stream.Constants.COMMAND_CREDIT;
1920
import static com.rabbitmq.stream.Constants.COMMAND_DECLARE_PUBLISHER;
@@ -41,6 +42,7 @@
4142
import static com.rabbitmq.stream.Constants.RESPONSE_CODE_SASL_CHALLENGE;
4243
import static com.rabbitmq.stream.Constants.VERSION_1;
4344
import static com.rabbitmq.stream.impl.Utils.encodeRequestCode;
45+
import static com.rabbitmq.stream.impl.Utils.encodeResponseCode;
4446
import static com.rabbitmq.stream.impl.Utils.extractResponseCode;
4547
import static com.rabbitmq.stream.impl.Utils.formatConstant;
4648
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -163,6 +165,7 @@ public class Client implements AutoCloseable {
163165
final ChunkListener chunkListener;
164166
final MessageListener messageListener;
165167
final CreditNotification creditNotification;
168+
final ConsumerUpdateListener consumerUpdateListener;
166169
final MetadataListener metadataListener;
167170
final Codec codec;
168171
final Channel channel;
@@ -220,6 +223,7 @@ public Client(ClientParameters parameters) {
220223
this.chunkChecksum = parameters.chunkChecksum;
221224
this.metricsCollector = parameters.metricsCollector;
222225
this.metadataListener = parameters.metadataListener;
226+
this.consumerUpdateListener = parameters.consumerUpdateListener;
223227
this.compressionCodecFactory =
224228
parameters.compressionCodecFactory == null
225229
? compression -> null
@@ -1380,6 +1384,29 @@ StreamInfoResponse streamStats(String stream) {
13801384
}
13811385
}
13821386

1387+
public void consumerUpdateResponse(
1388+
int correlationId, short responseCode, OffsetSpecification offsetSpecification) {
1389+
offsetSpecification =
1390+
offsetSpecification == null ? OffsetSpecification.none() : offsetSpecification;
1391+
int length = 2 + 2 + 4 + 2 + 2; // API code, version, correlation ID, response code, offset type
1392+
1393+
if (offsetSpecification.isOffset() || offsetSpecification.isTimestamp()) {
1394+
length += 8;
1395+
}
1396+
1397+
ByteBuf bb = allocate(length + 4);
1398+
bb.writeInt(length);
1399+
bb.writeShort(encodeResponseCode(COMMAND_CONSUMER_UPDATE));
1400+
bb.writeShort(VERSION_1);
1401+
bb.writeInt(correlationId);
1402+
bb.writeShort(responseCode);
1403+
bb.writeShort(offsetSpecification.getType());
1404+
if (offsetSpecification.isOffset() || offsetSpecification.isTimestamp()) {
1405+
bb.writeLong(offsetSpecification.getOffset());
1406+
}
1407+
channel.writeAndFlush(bb);
1408+
}
1409+
13831410
void shutdownReason(ShutdownReason reason) {
13841411
this.shutdownReason = reason;
13851412
}
@@ -1482,6 +1509,11 @@ public interface CreditNotification {
14821509
void handle(byte subscriptionId, short responseCode);
14831510
}
14841511

1512+
public interface ConsumerUpdateListener {
1513+
1514+
OffsetSpecification handle(Client client, byte subscriptionId, boolean active);
1515+
}
1516+
14851517
public interface ShutdownListener {
14861518

14871519
void handle(ShutdownContext shutdownContext);
@@ -2017,6 +2049,8 @@ public static class ClientParameters {
20172049
"Received notification for subscription {}: {}",
20182050
subscriptionId,
20192051
Utils.formatConstant(responseCode));
2052+
private ConsumerUpdateListener consumerUpdateListener =
2053+
(client, subscriptionId, active) -> null;
20202054
private ShutdownListener shutdownListener = shutdownContext -> {};
20212055
private SaslConfiguration saslConfiguration = DefaultSaslConfiguration.PLAIN;
20222056
private CredentialsProvider credentialsProvider =
@@ -2064,6 +2098,11 @@ public ClientParameters creditNotification(CreditNotification creditNotification
20642098
return this;
20652099
}
20662100

2101+
public ClientParameters consumerUpdateListener(ConsumerUpdateListener consumerUpdateListener) {
2102+
this.consumerUpdateListener = consumerUpdateListener;
2103+
return this;
2104+
}
2105+
20672106
public ClientParameters codec(Codec codec) {
20682107
this.codec = codec;
20692108
return this;

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import static com.rabbitmq.stream.Constants.COMMAND_CLOSE;
17+
import static com.rabbitmq.stream.Constants.COMMAND_CONSUMER_UPDATE;
1718
import static com.rabbitmq.stream.Constants.COMMAND_CREATE_STREAM;
1819
import static com.rabbitmq.stream.Constants.COMMAND_CREDIT;
1920
import static com.rabbitmq.stream.Constants.COMMAND_DECLARE_PUBLISHER;
@@ -50,6 +51,7 @@
5051
import com.rabbitmq.stream.Codec;
5152
import com.rabbitmq.stream.Constants;
5253
import com.rabbitmq.stream.Message;
54+
import com.rabbitmq.stream.OffsetSpecification;
5355
import com.rabbitmq.stream.StreamException;
5456
import com.rabbitmq.stream.compression.Compression;
5557
import com.rabbitmq.stream.compression.CompressionCodec;
@@ -133,6 +135,7 @@ class ServerFrameHandler {
133135
handlers.put(COMMAND_QUERY_PUBLISHER_SEQUENCE, new QueryPublisherSequenceFrameHandler());
134136
handlers.put(COMMAND_ROUTE, new RouteFrameHandler());
135137
handlers.put(COMMAND_PARTITIONS, new PartitionsFrameHandler());
138+
handlers.put(COMMAND_CONSUMER_UPDATE, new ConsumerUpdateFrameHandler());
136139
handlers.put(COMMAND_EXCHANGE_COMMAND_VERSIONS, new ExchangeCommandVersionsFrameHandler());
137140
handlers.put(COMMAND_STREAM_STATS, new StreamStatsFrameHandler());
138141
HANDLERS = new FrameHandler[maxCommandKey + 1][];
@@ -723,6 +726,27 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
723726
}
724727
}
725728

729+
private static class ConsumerUpdateFrameHandler extends BaseFrameHandler {
730+
731+
@Override
732+
int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
733+
int correlationId = message.readInt();
734+
int read = 4;
735+
byte subscriptionId = message.readByte();
736+
read += 1;
737+
byte activeByte = message.readByte();
738+
read += 1;
739+
740+
OffsetSpecification offsetSpecification =
741+
client.consumerUpdateListener.handle(
742+
client, subscriptionId, activeByte == 1 ? true : false);
743+
744+
client.consumerUpdateResponse(correlationId, RESPONSE_CODE_OK, offsetSpecification);
745+
746+
return read;
747+
}
748+
}
749+
726750
private static class PeerPropertiesFrameHandler extends BaseFrameHandler {
727751

728752
@Override
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.impl;
15+
16+
import static com.rabbitmq.stream.impl.TestUtils.b;
17+
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
18+
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import com.rabbitmq.stream.OffsetSpecification;
22+
import com.rabbitmq.stream.impl.Client.ClientParameters;
23+
import com.rabbitmq.stream.impl.Client.Response;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
import org.junit.jupiter.api.Test;
30+
import org.junit.jupiter.api.extension.ExtendWith;
31+
32+
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
33+
public class SingleActiveConsumerTest {
34+
35+
String stream;
36+
TestUtils.ClientFactory cf;
37+
38+
@Test
39+
void subscribe() throws Exception {
40+
int messageCount = 10000;
41+
Map<Byte, Boolean> consumerStates = new ConcurrentHashMap<>();
42+
Map<Byte, AtomicInteger> receivedMessages = new ConcurrentHashMap<>();
43+
receivedMessages.put(b(0), new AtomicInteger(0));
44+
receivedMessages.put(b(1), new AtomicInteger(0));
45+
CountDownLatch consumerUpdateLatch = new CountDownLatch(2);
46+
ClientParameters clientParameters =
47+
new ClientParameters()
48+
.chunkListener(
49+
(client, subscriptionId, offset, msgCount, dataSize) ->
50+
client.credit(subscriptionId, 1))
51+
.messageListener(
52+
(subscriptionId, offset, chunkTimestamp, committedChunkId, message) ->
53+
receivedMessages.get(subscriptionId).incrementAndGet())
54+
.consumerUpdateListener(
55+
(client, subscriptionId, active) -> {
56+
consumerStates.put(subscriptionId, active);
57+
consumerUpdateLatch.countDown();
58+
return null;
59+
});
60+
Client client = cf.get(clientParameters);
61+
62+
TestUtils.publishAndWaitForConfirms(cf, messageCount, stream);
63+
64+
Map<String, String> parameters = new HashMap<>();
65+
parameters.put("single-active-consumer", "true");
66+
parameters.put("name", "foo");
67+
Response response = client.subscribe(b(0), stream, OffsetSpecification.first(), 2, parameters);
68+
assertThat(response.isOk()).isTrue();
69+
response = client.subscribe(b(1), stream, OffsetSpecification.first(), 2, parameters);
70+
assertThat(response.isOk()).isTrue();
71+
latchAssert(consumerUpdateLatch).completes();
72+
assertThat(consumerStates)
73+
.hasSize(2)
74+
.containsEntry(b(0), Boolean.TRUE)
75+
.containsEntry(b(1), Boolean.FALSE);
76+
77+
waitAtMost(
78+
() -> receivedMessages.getOrDefault(b(0), new AtomicInteger(0)).get() == messageCount);
79+
}
80+
}

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -195,12 +195,7 @@ static void publishAndWaitForConfirms(
195195
client.publish(b(1), Collections.singletonList(message));
196196
}
197197

198-
try {
199-
assertThat(latchConfirm.await(60, SECONDS)).isTrue();
200-
} catch (InterruptedException e) {
201-
Thread.currentThread().interrupt();
202-
throw new RuntimeException(e);
203-
}
198+
latchAssert(latchConfirm).completes(Duration.ofSeconds(60));
204199
}
205200

206201
static Consumer<Object> namedTask(TaskWithException task, String description) {

0 commit comments

Comments
 (0)