Skip to content

Notify SAC when it is no longer active #612

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,7 @@ public void initChannel(SocketChannel ch) {
});

ChannelFuture f;
String clientConnectionName =
parameters.clientProperties == null
? ""
: (parameters.clientProperties.containsKey("connection_name")
? parameters.clientProperties.get("connection_name")
: "");
String clientConnectionName = parameters.clientProperties.getOrDefault("connection_name", "");
try {
LOGGER.debug(
"Trying to create stream connection to {}:{}, with client connection name '{}'",
Expand Down Expand Up @@ -1505,6 +1500,10 @@ String connectionName() {
return builder.append(serverAddress()).toString();
}

String clientConnectionName() {
return this.clientConnectionName;
}

private String serverAddress() {
SocketAddress remoteAddress = remoteAddress();
if (remoteAddress instanceof InetSocketAddress) {
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ public void close() {
}

void closeFromEnvironment() {
this.maybeNotifyActiveToInactiveSac();
LOGGER.debug("Calling consumer {} closing callback (stream {})", this.id, this.stream);
this.closingCallback.run();
closed.set(true);
Expand All @@ -490,6 +491,7 @@ void closeFromEnvironment() {

void closeAfterStreamDeletion() {
if (closed.compareAndSet(false, true)) {
this.maybeNotifyActiveToInactiveSac();
this.environment.removeConsumer(this);
this.status = Status.CLOSED;
}
Expand All @@ -506,11 +508,23 @@ void setTrackingClient(Client client) {
void setSubscriptionClient(Client client) {
this.subscriptionClient = client;
if (client == null && this.isSac()) {
maybeNotifyActiveToInactiveSac();
// we lost the connection
this.sacActive = false;
}
}

private void maybeNotifyActiveToInactiveSac() {
if (this.isSac() && this.sacActive) {
LOGGER.debug(
"Single active consumer {} from stream {} with name {} is unavailable, calling consumer update listener",
this.id,
this.stream,
this.name);
this.consumerUpdate(false);
}
}

synchronized void unavailable() {
this.status = Status.NOT_AVAILABLE;
this.trackingClient = null;
Expand Down Expand Up @@ -623,4 +637,13 @@ private void checkNotClosed() {
long id() {
return this.id;
}

String subscriptionConnectionName() {
Client client = this.subscriptionClient;
if (client == null) {
return "<no-connection>";
} else {
return client.clientConnectionName();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ public void store(long offset) {
"Consumer#store(long) does not work for super streams, use MessageHandler.Context#storeOffset() instead");
}

Consumer consumer(String partition) {
return this.consumers.get(partition);
}

@Override
public long storedOffset() {
throw new UnsupportedOperationException(
Expand Down
77 changes: 51 additions & 26 deletions src/test/java/com/rabbitmq/stream/Host.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,30 +47,34 @@ public static String capture(InputStream is) throws IOException {
return buff.toString();
}

private static Process executeCommand(String command) throws IOException {
private static Process executeCommand(String command) {
return executeCommand(command, false);
}

private static Process executeCommand(String command, boolean ignoreError) throws IOException {
Process pr = executeCommandProcess(command);

int ev = waitForExitValue(pr);
if (ev != 0 && !ignoreError) {
String stdout = capture(pr.getInputStream());
String stderr = capture(pr.getErrorStream());
throw new IOException(
"unexpected command exit value: "
+ ev
+ "\ncommand: "
+ command
+ "\n"
+ "\nstdout:\n"
+ stdout
+ "\nstderr:\n"
+ stderr
+ "\n");
private static Process executeCommand(String command, boolean ignoreError) {
try {
Process pr = executeCommandProcess(command);

int ev = waitForExitValue(pr);
if (ev != 0 && !ignoreError) {
String stdout = capture(pr.getInputStream());
String stderr = capture(pr.getErrorStream());
throw new IOException(
"unexpected command exit value: "
+ ev
+ "\ncommand: "
+ command
+ "\n"
+ "\nstdout:\n"
+ stdout
+ "\nstderr:\n"
+ stderr
+ "\n");
}
return pr;
} catch (IOException e) {
throw new RuntimeException(e);
}
return pr;
}

public static String hostname() throws IOException {
Expand Down Expand Up @@ -110,6 +114,10 @@ public static Process rabbitmqctl(String command) throws IOException {
return executeCommand(rabbitmqctlCommand() + " " + command);
}

static Process rabbitmqStreams(String command) {
return executeCommand(rabbitmqStreamsCommand() + " " + command);
}

public static Process rabbitmqctlIgnoreError(String command) throws IOException {
return executeCommand(rabbitmqctlCommand() + " " + command, true);
}
Expand Down Expand Up @@ -189,11 +197,19 @@ static List<ConnectionInfo> toConnectionInfoList(String json) {
return GSON.fromJson(json, new TypeToken<List<ConnectionInfo>>() {}.getType());
}

public static Process killStreamLeaderProcess(String stream) throws IOException {
return rabbitmqctl(
"eval 'case rabbit_stream_manager:lookup_leader(<<\"/\">>, <<\""
+ stream
+ "\">>) of {ok, Pid} -> exit(Pid, kill); Pid -> exit(Pid, kill) end.'");
public static void restartStream(String stream) {
rabbitmqStreams(" restart_stream " + stream);
}

public static Process killStreamLeaderProcess(String stream) {
try {
return rabbitmqctl(
"eval 'case rabbit_stream_manager:lookup_leader(<<\"/\">>, <<\""
+ stream
+ "\">>) of {ok, Pid} -> exit(Pid, kill); Pid -> exit(Pid, kill) end.'");
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public static void addUser(String username, String password) throws IOException {
Expand Down Expand Up @@ -243,7 +259,7 @@ public static void setEnv(String parameter, String value) throws IOException {
public static String rabbitmqctlCommand() {
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
if (rabbitmqCtl == null) {
throw new IllegalStateException("Please define the rabbitmqctl.bin system property");
rabbitmqCtl = DOCKER_PREFIX + "rabbitmq";
}
if (rabbitmqCtl.startsWith(DOCKER_PREFIX)) {
String containerId = rabbitmqCtl.split(":")[1];
Expand All @@ -253,6 +269,15 @@ public static String rabbitmqctlCommand() {
}
}

private static String rabbitmqStreamsCommand() {
String rabbitmqctl = rabbitmqctlCommand();
int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl");
if (lastIndex == -1) {
throw new IllegalArgumentException("Not a valid rabbitqmctl command: " + rabbitmqctl);
}
return rabbitmqctl.substring(0, lastIndex) + "rabbitmq-streams";
}

public static AutoCloseable diskAlarm() throws Exception {
return new CallableAutoCloseable(
() -> {
Expand Down
62 changes: 62 additions & 0 deletions src/test/java/com/rabbitmq/stream/impl/Assertions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].
package com.rabbitmq.stream.impl;

import static org.assertj.core.api.Assertions.fail;

import java.time.Duration;
import org.assertj.core.api.AbstractObjectAssert;

final class Assertions {

private Assertions() {}

static SyncAssert assertThat(TestUtils.Sync sync) {
return new SyncAssert(sync);
}

static class SyncAssert extends AbstractObjectAssert<SyncAssert, TestUtils.Sync> {

private SyncAssert(TestUtils.Sync sync) {
super(sync, SyncAssert.class);
}

SyncAssert completes() {
return this.completes(TestUtils.DEFAULT_CONDITION_TIMEOUT);
}

SyncAssert completes(Duration timeout) {
boolean completed = actual.await(timeout);
if (!completed) {
fail("Sync timed out after %d ms", timeout.toMillis());
}
return this;
}

SyncAssert hasCompleted() {
if (!this.actual.hasCompleted()) {
fail("Sync should have completed but has not");
}
return this;
}

SyncAssert hasNotCompleted() {
if (this.actual.hasCompleted()) {
fail("Sync should have not completed");
}
return this;
}
}
}
80 changes: 74 additions & 6 deletions src/test/java/com/rabbitmq/stream/impl/SacStreamConsumerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@
// [email protected].
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.impl.TestUtils.publishAndWaitForConfirms;
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
import static com.rabbitmq.stream.impl.Assertions.assertThat;
import static com.rabbitmq.stream.impl.TestUtils.*;
import static org.assertj.core.api.Assertions.assertThat;

import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.EnvironmentBuilder;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.*;
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast311Condition;
import io.netty.channel.EventLoopGroup;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

@ExtendWith({
TestUtils.StreamTestInfrastructureExtension.class,
Expand Down Expand Up @@ -237,4 +237,72 @@ void externalTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throw
// nothing stored on the server side
assertThat(cf.get().queryOffset(consumerName, stream).getOffset()).isZero();
}

public static Stream<java.util.function.Consumer<Consumer>>
activeConsumerShouldGetUpdateNotificationAfterDisruption() {
return Stream.of(
namedConsumer(consumer -> Host.killConnection(connectionName(consumer)), "kill connection"),
namedConsumer(consumer -> Host.restartStream(stream(consumer)), "restart stream"),
namedConsumer(Consumer::close, "close consumer"));
}

@ParameterizedTest
@MethodSource
@TestUtils.DisabledIfRabbitMqCtlNotSet
void activeConsumerShouldGetUpdateNotificationAfterDisruption(
java.util.function.Consumer<Consumer> disruption) {
String consumerName = "foo";
Sync consumer1Active = sync();
Sync consumer1Inactive = sync();
Consumer consumer1 =
environment.consumerBuilder().stream(stream)
.name(consumerName)
.noTrackingStrategy()
.singleActiveConsumer()
.consumerUpdateListener(
context -> {
if (context.isActive()) {
consumer1Active.down();
} else {
consumer1Inactive.down();
}
return OffsetSpecification.next();
})
.messageHandler((context, message) -> {})
.build();

Sync consumer2Active = sync();
Sync consumer2Inactive = sync();
environment.consumerBuilder().stream(stream)
.name(consumerName)
.noTrackingStrategy()
.singleActiveConsumer()
.consumerUpdateListener(
context -> {
if (!context.isActive()) {
consumer2Inactive.down();
}
return OffsetSpecification.next();
})
.messageHandler((context, message) -> {})
.build();

assertThat(consumer1Active).completes();
assertThat(consumer2Inactive).hasNotCompleted();
assertThat(consumer1Inactive).hasNotCompleted();
assertThat(consumer2Active).hasNotCompleted();

disruption.accept(consumer1);

assertThat(consumer2Inactive).hasNotCompleted();
assertThat(consumer1Inactive).completes();
}

private static String connectionName(Consumer consumer) {
return ((StreamConsumer) consumer).subscriptionConnectionName();
}

private static String stream(Consumer consumer) {
return ((StreamConsumer) consumer).stream();
}
}
Loading