Skip to content

Commit 64b4051

Browse files
authored
Merge pull request #612 from rabbitmq/notify-sac-it-is-no-longer-active
Notify SAC when it is no longer active
2 parents bd3cec6 + 16ffff6 commit 64b4051

File tree

8 files changed

+374
-52
lines changed

8 files changed

+374
-52
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -303,12 +303,7 @@ public void initChannel(SocketChannel ch) {
303303
});
304304

305305
ChannelFuture f;
306-
String clientConnectionName =
307-
parameters.clientProperties == null
308-
? ""
309-
: (parameters.clientProperties.containsKey("connection_name")
310-
? parameters.clientProperties.get("connection_name")
311-
: "");
306+
String clientConnectionName = parameters.clientProperties.getOrDefault("connection_name", "");
312307
try {
313308
LOGGER.debug(
314309
"Trying to create stream connection to {}:{}, with client connection name '{}'",
@@ -1505,6 +1500,10 @@ String connectionName() {
15051500
return builder.append(serverAddress()).toString();
15061501
}
15071502

1503+
String clientConnectionName() {
1504+
return this.clientConnectionName;
1505+
}
1506+
15081507
private String serverAddress() {
15091508
SocketAddress remoteAddress = remoteAddress();
15101509
if (remoteAddress instanceof InetSocketAddress) {

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

+23
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,7 @@ public void close() {
481481
}
482482

483483
void closeFromEnvironment() {
484+
this.maybeNotifyActiveToInactiveSac();
484485
LOGGER.debug("Calling consumer {} closing callback (stream {})", this.id, this.stream);
485486
this.closingCallback.run();
486487
closed.set(true);
@@ -490,6 +491,7 @@ void closeFromEnvironment() {
490491

491492
void closeAfterStreamDeletion() {
492493
if (closed.compareAndSet(false, true)) {
494+
this.maybeNotifyActiveToInactiveSac();
493495
this.environment.removeConsumer(this);
494496
this.status = Status.CLOSED;
495497
}
@@ -506,11 +508,23 @@ void setTrackingClient(Client client) {
506508
void setSubscriptionClient(Client client) {
507509
this.subscriptionClient = client;
508510
if (client == null && this.isSac()) {
511+
maybeNotifyActiveToInactiveSac();
509512
// we lost the connection
510513
this.sacActive = false;
511514
}
512515
}
513516

517+
private void maybeNotifyActiveToInactiveSac() {
518+
if (this.isSac() && this.sacActive) {
519+
LOGGER.debug(
520+
"Single active consumer {} from stream {} with name {} is unavailable, calling consumer update listener",
521+
this.id,
522+
this.stream,
523+
this.name);
524+
this.consumerUpdate(false);
525+
}
526+
}
527+
514528
synchronized void unavailable() {
515529
this.status = Status.NOT_AVAILABLE;
516530
this.trackingClient = null;
@@ -623,4 +637,13 @@ private void checkNotClosed() {
623637
long id() {
624638
return this.id;
625639
}
640+
641+
String subscriptionConnectionName() {
642+
Client client = this.subscriptionClient;
643+
if (client == null) {
644+
return "<no-connection>";
645+
} else {
646+
return client.clientConnectionName();
647+
}
648+
}
626649
}

src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java

+4
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,10 @@ public void store(long offset) {
187187
"Consumer#store(long) does not work for super streams, use MessageHandler.Context#storeOffset() instead");
188188
}
189189

190+
Consumer consumer(String partition) {
191+
return this.consumers.get(partition);
192+
}
193+
190194
@Override
191195
public long storedOffset() {
192196
throw new UnsupportedOperationException(

src/test/java/com/rabbitmq/stream/Host.java

+51-26
Original file line numberDiff line numberDiff line change
@@ -47,30 +47,34 @@ public static String capture(InputStream is) throws IOException {
4747
return buff.toString();
4848
}
4949

50-
private static Process executeCommand(String command) throws IOException {
50+
private static Process executeCommand(String command) {
5151
return executeCommand(command, false);
5252
}
5353

54-
private static Process executeCommand(String command, boolean ignoreError) throws IOException {
55-
Process pr = executeCommandProcess(command);
56-
57-
int ev = waitForExitValue(pr);
58-
if (ev != 0 && !ignoreError) {
59-
String stdout = capture(pr.getInputStream());
60-
String stderr = capture(pr.getErrorStream());
61-
throw new IOException(
62-
"unexpected command exit value: "
63-
+ ev
64-
+ "\ncommand: "
65-
+ command
66-
+ "\n"
67-
+ "\nstdout:\n"
68-
+ stdout
69-
+ "\nstderr:\n"
70-
+ stderr
71-
+ "\n");
54+
private static Process executeCommand(String command, boolean ignoreError) {
55+
try {
56+
Process pr = executeCommandProcess(command);
57+
58+
int ev = waitForExitValue(pr);
59+
if (ev != 0 && !ignoreError) {
60+
String stdout = capture(pr.getInputStream());
61+
String stderr = capture(pr.getErrorStream());
62+
throw new IOException(
63+
"unexpected command exit value: "
64+
+ ev
65+
+ "\ncommand: "
66+
+ command
67+
+ "\n"
68+
+ "\nstdout:\n"
69+
+ stdout
70+
+ "\nstderr:\n"
71+
+ stderr
72+
+ "\n");
73+
}
74+
return pr;
75+
} catch (IOException e) {
76+
throw new RuntimeException(e);
7277
}
73-
return pr;
7478
}
7579

7680
public static String hostname() throws IOException {
@@ -110,6 +114,10 @@ public static Process rabbitmqctl(String command) throws IOException {
110114
return executeCommand(rabbitmqctlCommand() + " " + command);
111115
}
112116

117+
static Process rabbitmqStreams(String command) {
118+
return executeCommand(rabbitmqStreamsCommand() + " " + command);
119+
}
120+
113121
public static Process rabbitmqctlIgnoreError(String command) throws IOException {
114122
return executeCommand(rabbitmqctlCommand() + " " + command, true);
115123
}
@@ -189,11 +197,19 @@ static List<ConnectionInfo> toConnectionInfoList(String json) {
189197
return GSON.fromJson(json, new TypeToken<List<ConnectionInfo>>() {}.getType());
190198
}
191199

192-
public static Process killStreamLeaderProcess(String stream) throws IOException {
193-
return rabbitmqctl(
194-
"eval 'case rabbit_stream_manager:lookup_leader(<<\"/\">>, <<\""
195-
+ stream
196-
+ "\">>) of {ok, Pid} -> exit(Pid, kill); Pid -> exit(Pid, kill) end.'");
200+
public static void restartStream(String stream) {
201+
rabbitmqStreams(" restart_stream " + stream);
202+
}
203+
204+
public static Process killStreamLeaderProcess(String stream) {
205+
try {
206+
return rabbitmqctl(
207+
"eval 'case rabbit_stream_manager:lookup_leader(<<\"/\">>, <<\""
208+
+ stream
209+
+ "\">>) of {ok, Pid} -> exit(Pid, kill); Pid -> exit(Pid, kill) end.'");
210+
} catch (IOException e) {
211+
throw new RuntimeException(e);
212+
}
197213
}
198214

199215
public static void addUser(String username, String password) throws IOException {
@@ -243,7 +259,7 @@ public static void setEnv(String parameter, String value) throws IOException {
243259
public static String rabbitmqctlCommand() {
244260
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
245261
if (rabbitmqCtl == null) {
246-
throw new IllegalStateException("Please define the rabbitmqctl.bin system property");
262+
rabbitmqCtl = DOCKER_PREFIX + "rabbitmq";
247263
}
248264
if (rabbitmqCtl.startsWith(DOCKER_PREFIX)) {
249265
String containerId = rabbitmqCtl.split(":")[1];
@@ -253,6 +269,15 @@ public static String rabbitmqctlCommand() {
253269
}
254270
}
255271

272+
private static String rabbitmqStreamsCommand() {
273+
String rabbitmqctl = rabbitmqctlCommand();
274+
int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl");
275+
if (lastIndex == -1) {
276+
throw new IllegalArgumentException("Not a valid rabbitqmctl command: " + rabbitmqctl);
277+
}
278+
return rabbitmqctl.substring(0, lastIndex) + "rabbitmq-streams";
279+
}
280+
256281
public static AutoCloseable diskAlarm() throws Exception {
257282
return new CallableAutoCloseable(
258283
() -> {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream.impl;
16+
17+
import static org.assertj.core.api.Assertions.fail;
18+
19+
import java.time.Duration;
20+
import org.assertj.core.api.AbstractObjectAssert;
21+
22+
final class Assertions {
23+
24+
private Assertions() {}
25+
26+
static SyncAssert assertThat(TestUtils.Sync sync) {
27+
return new SyncAssert(sync);
28+
}
29+
30+
static class SyncAssert extends AbstractObjectAssert<SyncAssert, TestUtils.Sync> {
31+
32+
private SyncAssert(TestUtils.Sync sync) {
33+
super(sync, SyncAssert.class);
34+
}
35+
36+
SyncAssert completes() {
37+
return this.completes(TestUtils.DEFAULT_CONDITION_TIMEOUT);
38+
}
39+
40+
SyncAssert completes(Duration timeout) {
41+
boolean completed = actual.await(timeout);
42+
if (!completed) {
43+
fail("Sync timed out after %d ms", timeout.toMillis());
44+
}
45+
return this;
46+
}
47+
48+
SyncAssert hasCompleted() {
49+
if (!this.actual.hasCompleted()) {
50+
fail("Sync should have completed but has not");
51+
}
52+
return this;
53+
}
54+
55+
SyncAssert hasNotCompleted() {
56+
if (this.actual.hasCompleted()) {
57+
fail("Sync should have not completed");
58+
}
59+
return this;
60+
}
61+
}
62+
}

src/test/java/com/rabbitmq/stream/impl/SacStreamConsumerTest.java

+74-6
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,24 @@
1414
1515
package com.rabbitmq.stream.impl;
1616

17-
import static com.rabbitmq.stream.impl.TestUtils.publishAndWaitForConfirms;
18-
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
17+
import static com.rabbitmq.stream.impl.Assertions.assertThat;
18+
import static com.rabbitmq.stream.impl.TestUtils.*;
1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21-
import com.rabbitmq.stream.Consumer;
22-
import com.rabbitmq.stream.Environment;
23-
import com.rabbitmq.stream.EnvironmentBuilder;
24-
import com.rabbitmq.stream.OffsetSpecification;
21+
import com.rabbitmq.stream.*;
2522
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast311Condition;
2623
import io.netty.channel.EventLoopGroup;
2724
import java.util.Map;
2825
import java.util.concurrent.ConcurrentHashMap;
2926
import java.util.concurrent.atomic.AtomicInteger;
3027
import java.util.concurrent.atomic.AtomicLong;
28+
import java.util.stream.Stream;
3129
import org.junit.jupiter.api.AfterEach;
3230
import org.junit.jupiter.api.BeforeEach;
3331
import org.junit.jupiter.api.Test;
3432
import org.junit.jupiter.api.extension.ExtendWith;
33+
import org.junit.jupiter.params.ParameterizedTest;
34+
import org.junit.jupiter.params.provider.MethodSource;
3535

3636
@ExtendWith({
3737
TestUtils.StreamTestInfrastructureExtension.class,
@@ -237,4 +237,72 @@ void externalTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throw
237237
// nothing stored on the server side
238238
assertThat(cf.get().queryOffset(consumerName, stream).getOffset()).isZero();
239239
}
240+
241+
public static Stream<java.util.function.Consumer<Consumer>>
242+
activeConsumerShouldGetUpdateNotificationAfterDisruption() {
243+
return Stream.of(
244+
namedConsumer(consumer -> Host.killConnection(connectionName(consumer)), "kill connection"),
245+
namedConsumer(consumer -> Host.restartStream(stream(consumer)), "restart stream"),
246+
namedConsumer(Consumer::close, "close consumer"));
247+
}
248+
249+
@ParameterizedTest
250+
@MethodSource
251+
@TestUtils.DisabledIfRabbitMqCtlNotSet
252+
void activeConsumerShouldGetUpdateNotificationAfterDisruption(
253+
java.util.function.Consumer<Consumer> disruption) {
254+
String consumerName = "foo";
255+
Sync consumer1Active = sync();
256+
Sync consumer1Inactive = sync();
257+
Consumer consumer1 =
258+
environment.consumerBuilder().stream(stream)
259+
.name(consumerName)
260+
.noTrackingStrategy()
261+
.singleActiveConsumer()
262+
.consumerUpdateListener(
263+
context -> {
264+
if (context.isActive()) {
265+
consumer1Active.down();
266+
} else {
267+
consumer1Inactive.down();
268+
}
269+
return OffsetSpecification.next();
270+
})
271+
.messageHandler((context, message) -> {})
272+
.build();
273+
274+
Sync consumer2Active = sync();
275+
Sync consumer2Inactive = sync();
276+
environment.consumerBuilder().stream(stream)
277+
.name(consumerName)
278+
.noTrackingStrategy()
279+
.singleActiveConsumer()
280+
.consumerUpdateListener(
281+
context -> {
282+
if (!context.isActive()) {
283+
consumer2Inactive.down();
284+
}
285+
return OffsetSpecification.next();
286+
})
287+
.messageHandler((context, message) -> {})
288+
.build();
289+
290+
assertThat(consumer1Active).completes();
291+
assertThat(consumer2Inactive).hasNotCompleted();
292+
assertThat(consumer1Inactive).hasNotCompleted();
293+
assertThat(consumer2Active).hasNotCompleted();
294+
295+
disruption.accept(consumer1);
296+
297+
assertThat(consumer2Inactive).hasNotCompleted();
298+
assertThat(consumer1Inactive).completes();
299+
}
300+
301+
private static String connectionName(Consumer consumer) {
302+
return ((StreamConsumer) consumer).subscriptionConnectionName();
303+
}
304+
305+
private static String stream(Consumer consumer) {
306+
return ((StreamConsumer) consumer).stream();
307+
}
240308
}

0 commit comments

Comments
 (0)