Skip to content

Add test for permission updates after re-authentication #491

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,12 @@ private Address address() {
}

private String label() {
return address.host() + ":" + address.port();
Client c = this.nullableClient();
if (c == null) {
return address.host() + ":" + address.port();
} else {
return c.getHost() + ":" + c.getPort();
}
}

@Override
Expand Down
195 changes: 174 additions & 21 deletions src/test/java/com/rabbitmq/stream/impl/AuthorisationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,32 @@
// [email protected].
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.Constants.*;
import static com.rabbitmq.stream.Host.*;
import static com.rabbitmq.stream.impl.TestUtils.b;
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
import static com.rabbitmq.stream.OffsetSpecification.first;
import static com.rabbitmq.stream.impl.TestUtils.*;
import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ok;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;

import com.rabbitmq.stream.Constants;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.sasl.CredentialsProvider;
import com.rabbitmq.stream.sasl.DefaultUsernamePasswordCredentialsProvider;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
Expand Down Expand Up @@ -74,11 +83,11 @@ void createStreamWithAuthorisedNameShouldSucceed() {
String stream = "stream-authorized" + i;
Client.Response response = client.create(stream);
assertThat(response.isOk()).isTrue();
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);

response = deletionClient.delete(stream);
assertThat(response.isOk()).isTrue();
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
});
}

Expand All @@ -105,11 +114,11 @@ void deleteStreamWithAuthorisedNameShouldSucceed() {
String stream = "stream-authorized" + i;
Client.Response response = creationClient.create(stream);
assertThat(response.isOk()).isTrue();
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);

response = client.delete(stream);
assertThat(response.isOk()).isTrue();
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
});
}

Expand All @@ -123,7 +132,7 @@ void deleteStreamWithUnauthorisedNameShouldFail() {
String stream = "not-authorized" + i;
Client.Response response = creationClient.create(stream);
assertThat(response.isOk()).isTrue();
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);

response = client.delete(stream);
assertThat(response.isOk()).isFalse();
Expand All @@ -132,7 +141,7 @@ void deleteStreamWithUnauthorisedNameShouldFail() {

response = creationClient.delete(stream);
assertThat(response.isOk()).isTrue();
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
});
}

Expand All @@ -146,15 +155,15 @@ void subscribeToAuthorisedStreamShouldSucceed() {
String stream = "stream-authorized" + i;
Client.Response response = configurationClient.create(stream);
assertThat(response.isOk()).isTrue();
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);

response = client.subscribe(b(1), stream, OffsetSpecification.first(), 10);
response = client.subscribe(b(1), stream, first(), 10);
assertThat(response.isOk()).isTrue();
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);

response = configurationClient.delete(stream);
assertThat(response.isOk()).isTrue();
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
});
}

Expand All @@ -168,16 +177,16 @@ void subscribeToUnauthorisedStreamShouldFail() {
String stream = "not-authorized" + i;
Client.Response response = configurationClient.create(stream);
assertThat(response.isOk()).isTrue();
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);

response = client.subscribe(b(1), stream, OffsetSpecification.first(), 10);
response = client.subscribe(b(1), stream, first(), 10);
assertThat(response.isOk()).isFalse();
assertThat(response.getResponseCode())
.isEqualTo(Constants.RESPONSE_CODE_ACCESS_REFUSED);

response = configurationClient.delete(stream);
assertThat(response.isOk()).isTrue();
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
});
}

Expand All @@ -191,7 +200,7 @@ void publishToAuthorisedStreamShouldSucceed() {
String stream = "stream-authorized" + i;
Client.Response response = configurationClient.create(stream);
assertThat(response.isOk()).isTrue();
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);

int messageCount = 1000;
CountDownLatch publishConfirmLatch = new CountDownLatch(messageCount);
Expand Down Expand Up @@ -223,7 +232,7 @@ void publishToAuthorisedStreamShouldSucceed() {

response = configurationClient.delete(stream);
assertThat(response.isOk()).isTrue();
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
});
}

Expand All @@ -237,7 +246,7 @@ void publishToUnauthorisedStreamShouldFail() {
String stream = "not-authorized" + i;
Client.Response response = configurationClient.create(stream);
assertThat(response.isOk()).isTrue();
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);

int messageCount = 1000;
CountDownLatch publishErrorLatch = new CountDownLatch(messageCount);
Expand Down Expand Up @@ -270,7 +279,7 @@ void publishToUnauthorisedStreamShouldFail() {

response = configurationClient.delete(stream);
assertThat(response.isOk()).isTrue();
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(response.getResponseCode()).isEqualTo(RESPONSE_CODE_OK);
});
}

Expand Down Expand Up @@ -304,6 +313,142 @@ void storeQueryOffsetShouldSucceedOnAuthorisedStreamShouldFailOnUnauthorisedStre
}
}

@Test
@TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0)
void shouldReceiveMetadataUpdateAfterUpdateSecret(TestInfo info) throws Exception {
try {
String newPassword = "new-password";
String prefix = "passthrough-";
String pubSub = TestUtils.streamName(info);
String authorizedPubSub = prefix + TestUtils.streamName(info);
String pub = TestUtils.streamName(info);
String authorizedPub = prefix + TestUtils.streamName(info);
String sub = TestUtils.streamName(info);
String authorizedSub = prefix + TestUtils.streamName(info);
setPermissions(USERNAME, VH, ".*");
Set<String> metadataUpdates = ConcurrentHashMap.newKeySet();
ConcurrentMap<Byte, Short> publishConfirms = new ConcurrentHashMap<>();
ConcurrentMap<Byte, Short> creditNotifications = new ConcurrentHashMap<>();
Set<Byte> receivedMessages = ConcurrentHashMap.newKeySet();
Client client =
cf.get(
parameters()
.virtualHost(VH)
.username(USERNAME)
.password(USERNAME)
.publishConfirmListener(
(publisherId, publishingId) ->
publishConfirms.put(publisherId, RESPONSE_CODE_OK))
.publishErrorListener(
(publisherId, publishingId, errorCode) ->
publishConfirms.put(publisherId, errorCode))
.creditNotification(
(subscriptionId, responseCode) ->
creditNotifications.put(subscriptionId, responseCode))
.messageListener(
(subscriptionId,
offset,
chunkTimestamp,
committedChunkId,
chunkContext,
message) -> receivedMessages.add(subscriptionId))
.metadataListener((stream, code) -> metadataUpdates.add(stream)));
assertThat(client.create(pubSub)).is(ok());
assertThat(client.create(authorizedPubSub)).is(ok());
assertThat(client.create(pub)).is(ok());
assertThat(client.create(authorizedPub)).is(ok());
assertThat(client.create(sub)).is(ok());
assertThat(client.create(authorizedSub)).is(ok());

Map<String, Byte> publishers = new HashMap<>();
publishers.put(pubSub, b(0));
publishers.put(authorizedPubSub, b(1));
publishers.put(pub, b(2));
publishers.put(authorizedPub, b(3));
publishers.forEach((s, id) -> assertThat(client.declarePublisher(id, null, s)).is(ok()));
Map<String, Byte> subscriptions = new HashMap<>();
subscriptions.put(pubSub, b(0));
subscriptions.put(authorizedPubSub, b(1));
subscriptions.put(sub, b(2));
subscriptions.put(authorizedSub, b(3));
subscriptions.forEach((s, id) -> assertThat(client.subscribe(id, s, first(), 1)).is(ok()));

Function<String, Byte> toPub = publishers::get;
Function<String, Byte> toSub = subscriptions::get;

// change password and permissions and re-authenticate
changePassword(USERNAME, newPassword);
setPermissions(USERNAME, VH, "^passthrough.*$");
client.authenticate(credentialsProvider(USERNAME, newPassword));

waitAtMost(() -> metadataUpdates.containsAll(asList(pubSub, pub, sub)));

List<Message> message = Collections.singletonList(client.messageBuilder().build());

// publishers for unauthorized streams should be gone
asList(toPub.apply(pubSub), toPub.apply(pub))
.forEach(
wrap(
pubId -> {
assertThat(publishConfirms).doesNotContainKey(pubId);
client.publish(pubId, message);
waitAtMost(() -> publishConfirms.containsKey(pubId));
assertThat(publishConfirms)
.containsEntry(pubId, RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST);
}));

// subscriptions for unauthorized streams should be gone
asList(toSub.apply(pubSub), toSub.apply(sub))
.forEach(
wrap(
subId -> {
assertThat(creditNotifications).doesNotContainKey(subId);
client.credit(subId, 1);
waitAtMost(() -> creditNotifications.containsKey(subId));
assertThat(creditNotifications)
.containsEntry(subId, RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST);
}));

// subscriptions for authorized streams still work
asList(toSub.apply(authorizedPubSub), toSub.apply(authorizedSub))
.forEach(subId -> client.credit(subId, 1));

assertThat(receivedMessages).isEmpty();
// publishers for authorized streams should still work
asList(toPub.apply(authorizedPubSub), toPub.apply(authorizedPub))
.forEach(
wrap(
pubId -> {
client.publish(pubId, message);
waitAtMost(() -> publishConfirms.containsKey(pubId));
assertThat(publishConfirms).containsEntry(pubId, RESPONSE_CODE_OK);
}));

waitAtMost(() -> receivedMessages.contains(b(1)));

// send message to authorized subscription stream
assertThat(client.declarePublisher(b(5), null, authorizedSub)).is(ok());
client.publish(b(5), message);
waitAtMost(() -> receivedMessages.contains(toSub.apply(authorizedSub)));

// last checks to make sure nothing unexpected arrived late
assertThat(metadataUpdates).hasSize(3);
assertThat(creditNotifications).containsOnlyKeys(b(0), b(2));
assertThat(publishConfirms)
.hasSize(4 + 1)
.containsEntry(toPub.apply(pubSub), RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST)
.containsEntry(toPub.apply(authorizedPubSub), RESPONSE_CODE_OK)
.containsEntry(toPub.apply(pub), RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST)
.containsEntry(toPub.apply(authorizedPub), RESPONSE_CODE_OK);
assertThat(receivedMessages).hasSize(2);

client.close();
} finally {
changePassword(USERNAME, PASSWORD);
setPermissions(USERNAME, VH, "^stream.*$");
}
}

Client configurationClient() {
return cf.get(new Client.ClientParameters().virtualHost(VH));
}
Expand All @@ -315,4 +460,12 @@ Client client() {
Client client(Client.ClientParameters parameters) {
return cf.get(parameters.virtualHost(VH).username(USERNAME).password(PASSWORD));
}

private static Client.ClientParameters parameters() {
return new Client.ClientParameters();
}

private static CredentialsProvider credentialsProvider(String username, String password) {
return new DefaultUsernamePasswordCredentialsProvider(username, password);
}
}
1 change: 1 addition & 0 deletions src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
</appender>

<logger name="com.rabbitmq.stream" level="warn" />
<!-- <logger name="com.rabbitmq.stream.impl.DefaultExecutorServiceFactory" level="warn" />-->

<logger name="com.rabbitmq.stream.perf.Version" level="error" />

Expand Down