Skip to content

Commit 972adfa

Browse files
committed
Polishing #2594
Update tests to reflect new behavior. Use negotiated protocol version and fall back to the configured one of no negotiated version is available. Original pull request: #2778
1 parent a470433 commit 972adfa

File tree

5 files changed

+69
-23
lines changed

5 files changed

+69
-23
lines changed

src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.CopyOnWriteArrayList;
2626

2727
import io.lettuce.core.ClientOptions;
28+
import io.lettuce.core.ConnectionState;
2829
import io.lettuce.core.RedisException;
2930
import io.lettuce.core.protocol.CommandType;
3031
import io.lettuce.core.protocol.DefaultEndpoint;
@@ -55,6 +56,8 @@ public class PubSubEndpoint<K, V> extends DefaultEndpoint {
5556

5657
private volatile boolean subscribeWritten = false;
5758

59+
private ConnectionState connectionState;
60+
5861
static {
5962

6063
ALLOWED_COMMANDS_SUBSCRIBED = new HashSet<>(6, 1);
@@ -195,13 +198,24 @@ protected boolean containsViolatingCommands(Collection<? extends RedisCommand<?,
195198
}
196199

197200
private boolean isAllowed(RedisCommand<?, ?, ?> command) {
198-
return getProtocolVersion() == ProtocolVersion.RESP3 || ALLOWED_COMMANDS_SUBSCRIBED.contains(command.getType().name());
201+
202+
ProtocolVersion protocolVersion = connectionState != null ? connectionState.getNegotiatedProtocolVersion() : null;
203+
204+
if (protocolVersion == null) {
205+
protocolVersion = getProtocolVersion();
206+
}
207+
208+
return protocolVersion == ProtocolVersion.RESP3 || ALLOWED_COMMANDS_SUBSCRIBED.contains(command.getType().name());
199209
}
200210

201211
public boolean isSubscribed() {
202212
return subscribeWritten && (hasChannelSubscriptions() || hasPatternSubscriptions());
203213
}
204214

215+
void setConnectionState(ConnectionState connectionState) {
216+
this.connectionState = connectionState;
217+
}
218+
205219
void notifyMessage(PubSubMessage<K, V> message) {
206220

207221
// drop empty messages

src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ public StatefulRedisPubSubConnectionImpl(PubSubEndpoint<K, V> endpoint, RedisCha
5959
Duration timeout) {
6060

6161
super(writer, endpoint, codec, timeout);
62-
6362
this.endpoint = endpoint;
63+
endpoint.setConnectionState(getConnectionState());
6464
}
6565

6666
/**

src/test/java/io/lettuce/core/pubsub/PubSubCommandResp2Test.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,16 @@
1515
*/
1616
package io.lettuce.core.pubsub;
1717

18+
import static org.assertj.core.api.Assertions.*;
19+
1820
import org.junit.jupiter.api.Disabled;
1921
import org.junit.jupiter.api.Test;
2022

2123
import io.lettuce.core.ClientOptions;
24+
import io.lettuce.core.RedisException;
2225
import io.lettuce.core.protocol.ProtocolVersion;
26+
import io.lettuce.test.TestFutures;
27+
import io.lettuce.test.Wait;
2328

2429
/**
2530
* Pub/Sub Command tests using RESP2.
@@ -28,13 +33,35 @@
2833
*/
2934
class PubSubCommandResp2Test extends PubSubCommandTest {
3035

36+
@Override
3137
protected ClientOptions getOptions() {
3238
return ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).build();
3339
}
3440

35-
@Override
3641
@Test
3742
@Disabled("Push messages are not available with RESP2")
43+
@Override
3844
void messageAsPushMessage() {
3945
}
46+
47+
@Test
48+
@Disabled("Does not apply with RESP2")
49+
@Override
50+
void echoAllowedInSubscriptionState() {
51+
}
52+
53+
@Test
54+
void echoNotAllowedInSubscriptionState() {
55+
56+
TestFutures.awaitOrTimeout(pubsub.subscribe(channel));
57+
58+
assertThatThrownBy(() -> TestFutures.getOrTimeout(pubsub.echo("ping"))).isInstanceOf(RedisException.class)
59+
.hasMessageContaining("not allowed");
60+
pubsub.unsubscribe(channel);
61+
62+
Wait.untilTrue(() -> channels.size() == 2).waitOrTimeout();
63+
64+
assertThat(TestFutures.getOrTimeout(pubsub.echo("ping"))).isEqualTo("ping");
65+
}
66+
4067
}

src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515
*/
1616
package io.lettuce.core.pubsub;
1717

18-
import static org.assertj.core.api.Assertions.assertThat;
19-
import static org.assertj.core.api.Assertions.assertThatThrownBy;
18+
import static org.assertj.core.api.Assertions.*;
2019

2120
import java.nio.ByteBuffer;
2221
import java.time.Duration;
@@ -33,7 +32,12 @@
3332
import org.junit.jupiter.api.BeforeEach;
3433
import org.junit.jupiter.api.Test;
3534

36-
import io.lettuce.core.*;
35+
import io.lettuce.core.AbstractRedisClientTest;
36+
import io.lettuce.core.ClientOptions;
37+
import io.lettuce.core.KillArgs;
38+
import io.lettuce.core.RedisClient;
39+
import io.lettuce.core.RedisFuture;
40+
import io.lettuce.core.RedisURI;
3741
import io.lettuce.core.api.async.RedisAsyncCommands;
3842
import io.lettuce.core.api.push.PushMessage;
3943
import io.lettuce.core.internal.LettuceFactories;
@@ -57,16 +61,21 @@
5761
*/
5862
class PubSubCommandTest extends AbstractRedisClientTest implements RedisPubSubListener<String, String> {
5963

60-
private RedisPubSubAsyncCommands<String, String> pubsub;
64+
RedisPubSubAsyncCommands<String, String> pubsub;
6165

62-
private BlockingQueue<String> channels;
63-
private BlockingQueue<String> patterns;
64-
private BlockingQueue<String> messages;
65-
private BlockingQueue<Long> counts;
66+
BlockingQueue<String> channels;
6667

67-
private String channel = "channel0";
68-
private String pattern = "channel*";
69-
private String message = "msg!";
68+
BlockingQueue<String> patterns;
69+
70+
BlockingQueue<String> messages;
71+
72+
BlockingQueue<Long> counts;
73+
74+
String channel = "channel0";
75+
76+
String pattern = "channel*";
77+
78+
String message = "msg!";
7079

7180
@BeforeEach
7281
void openPubSubConnection() {
@@ -446,6 +455,7 @@ void adapter() throws Exception {
446455
final BlockingQueue<Long> localCounts = LettuceFactories.newBlockingQueue();
447456

448457
RedisPubSubAdapter<String, String> adapter = new RedisPubSubAdapter<String, String>() {
458+
449459
@Override
450460
public void subscribed(String channel, long count) {
451461
super.subscribed(channel, count);
@@ -457,6 +467,7 @@ public void unsubscribed(String channel, long count) {
457467
super.unsubscribed(channel, count);
458468
localCounts.add(count);
459469
}
470+
460471
};
461472

462473
pubsub.getStatefulConnection().addListener(adapter);
@@ -489,17 +500,12 @@ void removeListener() throws Exception {
489500
}
490501

491502
@Test
492-
void pingNotAllowedInSubscriptionState() {
503+
void echoAllowedInSubscriptionState() {
493504

494505
TestFutures.awaitOrTimeout(pubsub.subscribe(channel));
495506

496-
assertThatThrownBy(() -> TestFutures.getOrTimeout(pubsub.echo("ping"))).isInstanceOf(RedisException.class)
497-
.hasMessageContaining("not allowed");
498-
pubsub.unsubscribe(channel);
499-
500-
Wait.untilTrue(() -> channels.size() == 2).waitOrTimeout();
501-
502507
assertThat(TestFutures.getOrTimeout(pubsub.echo("ping"))).isEqualTo("ping");
508+
pubsub.unsubscribe(channel);
503509
}
504510

505511
// RedisPubSubListener implementation
@@ -540,4 +546,5 @@ public void punsubscribed(String pattern, long count) {
540546
patterns.add(pattern);
541547
counts.add(count);
542548
}
549+
543550
}

src/test/java/io/lettuce/core/tracing/SynchronousIntegrationTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import io.lettuce.test.resource.FastShutdown;
2929
import io.lettuce.test.settings.TestSettings;
3030
import io.micrometer.core.instrument.MeterRegistry;
31-
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
3231
import io.micrometer.observation.ObservationRegistry;
3332
import io.micrometer.tracing.exporter.FinishedSpan;
3433
import io.micrometer.tracing.test.SampleTestRunner;
@@ -83,7 +82,6 @@ public SampleTestRunnerConsumer yourCode() {
8382
FastShutdown.shutdown(clientResources);
8483

8584
assertThat(tracer.getFinishedSpans()).isNotEmpty();
86-
System.out.println(((SimpleMeterRegistry) meterRegistry).getMetersAsString());
8785

8886
assertThat(tracer.getFinishedSpans()).isNotEmpty();
8987

0 commit comments

Comments
 (0)