Skip to content

Commit 4c908f3

Browse files
committed
Use "filter."-like subscription properties
And deal with null filter values. Conflicts: pom.xml
1 parent 1cd3fc5 commit 4c908f3

13 files changed

+324
-176
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@
9292
<buildnumber.plugin.version>1.4</buildnumber.plugin.version>
9393
<jmh.version>1.36</jmh.version>
9494
<spotless.version>2.37.0</spotless.version>
95-
<google-java-format.version>1.15.0</google-java-format.version>
95+
<google-java-format.version>1.17.0</google-java-format.version>
9696
<jacoco.version>0.8.10</jacoco.version>
9797
<!-- for documentation -->
9898
<broker.version>3.11</broker.version>

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

+4
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,10 @@ interface FilterConfiguration {
219219

220220
FilterConfiguration filter(Predicate<Message> filter);
221221

222+
FilterConfiguration matchUnfiltered();
223+
224+
FilterConfiguration matchUnfiltered(boolean matchUnfiltered);
225+
222226
ConsumerBuilder builder();
223227
}
224228
}

src/main/java/com/rabbitmq/stream/impl/Client.java

+26-45
Original file line numberDiff line numberDiff line change
@@ -13,34 +13,7 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16-
import static com.rabbitmq.stream.Constants.COMMAND_CLOSE;
17-
import static com.rabbitmq.stream.Constants.COMMAND_CONSUMER_UPDATE;
18-
import static com.rabbitmq.stream.Constants.COMMAND_CREATE_STREAM;
19-
import static com.rabbitmq.stream.Constants.COMMAND_CREDIT;
20-
import static com.rabbitmq.stream.Constants.COMMAND_DECLARE_PUBLISHER;
21-
import static com.rabbitmq.stream.Constants.COMMAND_DELETE_PUBLISHER;
22-
import static com.rabbitmq.stream.Constants.COMMAND_DELETE_STREAM;
23-
import static com.rabbitmq.stream.Constants.COMMAND_EXCHANGE_COMMAND_VERSIONS;
24-
import static com.rabbitmq.stream.Constants.COMMAND_HEARTBEAT;
25-
import static com.rabbitmq.stream.Constants.COMMAND_METADATA;
26-
import static com.rabbitmq.stream.Constants.COMMAND_OPEN;
27-
import static com.rabbitmq.stream.Constants.COMMAND_PARTITIONS;
28-
import static com.rabbitmq.stream.Constants.COMMAND_PEER_PROPERTIES;
29-
import static com.rabbitmq.stream.Constants.COMMAND_PUBLISH;
30-
import static com.rabbitmq.stream.Constants.COMMAND_QUERY_OFFSET;
31-
import static com.rabbitmq.stream.Constants.COMMAND_QUERY_PUBLISHER_SEQUENCE;
32-
import static com.rabbitmq.stream.Constants.COMMAND_ROUTE;
33-
import static com.rabbitmq.stream.Constants.COMMAND_SASL_AUTHENTICATE;
34-
import static com.rabbitmq.stream.Constants.COMMAND_SASL_HANDSHAKE;
35-
import static com.rabbitmq.stream.Constants.COMMAND_STORE_OFFSET;
36-
import static com.rabbitmq.stream.Constants.COMMAND_STREAM_STATS;
37-
import static com.rabbitmq.stream.Constants.COMMAND_SUBSCRIBE;
38-
import static com.rabbitmq.stream.Constants.COMMAND_UNSUBSCRIBE;
39-
import static com.rabbitmq.stream.Constants.RESPONSE_CODE_AUTHENTICATION_FAILURE;
40-
import static com.rabbitmq.stream.Constants.RESPONSE_CODE_AUTHENTICATION_FAILURE_LOOPBACK;
41-
import static com.rabbitmq.stream.Constants.RESPONSE_CODE_OK;
42-
import static com.rabbitmq.stream.Constants.RESPONSE_CODE_SASL_CHALLENGE;
43-
import static com.rabbitmq.stream.Constants.VERSION_1;
16+
import static com.rabbitmq.stream.Constants.*;
4417
import static com.rabbitmq.stream.impl.Utils.encodeRequestCode;
4518
import static com.rabbitmq.stream.impl.Utils.encodeResponseCode;
4619
import static com.rabbitmq.stream.impl.Utils.extractResponseCode;
@@ -216,6 +189,7 @@ public long applyAsLong(Object value) {
216189
private final Duration rpcTimeout;
217190
private volatile ShutdownReason shutdownReason = null;
218191
private final Runnable exchangeCommandVersionsCheck;
192+
private final boolean filteringSupported;
219193

220194
public Client() {
221195
this(new ClientParameters());
@@ -398,18 +372,25 @@ public void initChannel(SocketChannel ch) {
398372
tuneState.getHeartbeat());
399373
this.connectionProperties = open(parameters.virtualHost);
400374
Set<FrameHandlerInfo> supportedCommands = maybeExchangeCommandVersions();
401-
if (supportedCommands.stream()
402-
.filter(i -> i.getKey() == COMMAND_STREAM_STATS)
403-
.findAny()
404-
.isPresent()) {
405-
this.exchangeCommandVersionsCheck = () -> {};
406-
} else {
407-
this.exchangeCommandVersionsCheck =
408-
() -> {
409-
throw new UnsupportedOperationException(
410-
"QueryStreamInfo is available only on RabbitMQ 3.11 or more.");
411-
};
412-
}
375+
AtomicReference<Runnable> exchangeCommandVersionsCheckReference = new AtomicReference<>();
376+
AtomicBoolean filteringSupportedReference = new AtomicBoolean(false);
377+
supportedCommands.forEach(
378+
c -> {
379+
if (c.getKey() == COMMAND_STREAM_STATS) {
380+
exchangeCommandVersionsCheckReference.set(() -> {});
381+
}
382+
if (c.getKey() == COMMAND_PUBLISH && c.getMaxVersion() >= VERSION_2) {
383+
filteringSupportedReference.set(true);
384+
}
385+
});
386+
this.exchangeCommandVersionsCheck =
387+
exchangeCommandVersionsCheckReference.get() == null
388+
? () -> {
389+
throw new UnsupportedOperationException(
390+
"QueryStreamInfo is available only on RabbitMQ 3.11 or more.");
391+
}
392+
: exchangeCommandVersionsCheckReference.get();
393+
this.filteringSupported = filteringSupportedReference.get();
413394
started.set(true);
414395
this.metricsCollector.openConnection();
415396
} catch (RuntimeException e) {
@@ -1414,6 +1395,10 @@ private String serverAddress() {
14141395
}
14151396
}
14161397

1398+
boolean filteringSupported() {
1399+
return this.filteringSupported;
1400+
}
1401+
14171402
public List<String> route(String routingKey, String superStream) {
14181403
if (routingKey == null || superStream == null) {
14191404
throw new IllegalArgumentException("routing key and stream must not be null");
@@ -1615,11 +1600,7 @@ private Set<FrameHandlerInfo> maybeExchangeCommandVersions() {
16151600
Set<FrameHandlerInfo> supported = new HashSet<>();
16161601
try {
16171602
if (Utils.is3_11_OrMore(brokerVersion())) {
1618-
for (FrameHandlerInfo info : exchangeCommandVersions()) {
1619-
if (info.getKey() == COMMAND_STREAM_STATS) {
1620-
supported.add(info);
1621-
}
1622-
}
1603+
supported.addAll(exchangeCommandVersions());
16231604
}
16241605
} catch (Exception e) {
16251606
LOGGER.info("Error while exchanging command versions: {}", e.getMessage());

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,9 @@ class StreamConsumer implements Consumer {
7676
ConsumerUpdateListener consumerUpdateListener,
7777
int initialCredits,
7878
int additionalCredits) {
79-
79+
if (Utils.filteringEnabled(subscriptionProperties) && !environment.filteringSupported()) {
80+
throw new IllegalArgumentException("Filtering is not supported by the broker");
81+
}
8082
this.id = ID_SEQUENCE.getAndIncrement();
8183
Runnable trackingClosingCallback;
8284
try {

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

+26-4
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.Utils.SUBSCRIPTION_PROPERTY_FILTER_PREFIX;
17+
import static com.rabbitmq.stream.impl.Utils.SUBSCRIPTION_PROPERTY_MATCH_UNFILTERED;
18+
1619
import com.rabbitmq.stream.*;
1720
import java.lang.reflect.Field;
1821
import java.lang.reflect.Modifier;
@@ -21,6 +24,7 @@
2124
import java.util.List;
2225
import java.util.Map;
2326
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.atomic.AtomicInteger;
2428
import java.util.function.Predicate;
2529

2630
class StreamConsumerBuilder implements ConsumerBuilder {
@@ -88,7 +92,7 @@ public ConsumerBuilder name(String name) {
8892

8993
@Override
9094
public ConsumerBuilder singleActiveConsumer() {
91-
this.subscriptionProperties.put("single-active-consumer", "true");
95+
this.subscriptionProperties.put(Utils.SUBSCRIPTION_PROPERTY_SAC, "true");
9296
return this;
9397
}
9498

@@ -200,8 +204,14 @@ public Consumer build() {
200204
handler = this.messageHandler;
201205
} else {
202206
this.filterConfiguration.validate();
207+
AtomicInteger i = new AtomicInteger(0);
208+
this.filterConfiguration.filterValues.forEach(
209+
v ->
210+
this.subscriptionProperties.put(
211+
SUBSCRIPTION_PROPERTY_FILTER_PREFIX + i.getAndIncrement(), v));
203212
this.subscriptionProperties.put(
204-
"filters", String.join(",", this.filterConfiguration.filterValues));
213+
SUBSCRIPTION_PROPERTY_MATCH_UNFILTERED,
214+
this.filterConfiguration.matchUnfiltered ? "true" : "false");
205215
final Predicate<Message> filter = this.filterConfiguration.filter;
206216
final MessageHandler delegate = this.messageHandler;
207217
handler =
@@ -231,7 +241,7 @@ public Consumer build() {
231241
environment.addConsumer((StreamConsumer) consumer);
232242
} else {
233243
if (Utils.isSac(this.subscriptionProperties)) {
234-
this.subscriptionProperties.put("super-stream", this.superStream);
244+
this.subscriptionProperties.put(Utils.SUBSCRIPTION_PROPERTY_SUPER_STREAM, this.superStream);
235245
}
236246
consumer =
237247
new SuperStreamConsumer(this, this.superStream, this.environment, trackingConfiguration);
@@ -371,14 +381,14 @@ private static final class DefaultFilterConfiguration implements FilterConfigura
371381
private final StreamConsumerBuilder builder;
372382
private List<String> filterValues;
373383
private Predicate<Message> filter;
384+
private boolean matchUnfiltered = false;
374385

375386
private DefaultFilterConfiguration(StreamConsumerBuilder builder) {
376387
this.builder = builder;
377388
}
378389

379390
@Override
380391
public FilterConfiguration values(String... filterValues) {
381-
// FIXME: check for ',' in values
382392
this.filterValues = Arrays.asList(filterValues);
383393
return this;
384394
}
@@ -389,6 +399,18 @@ public FilterConfiguration filter(Predicate<Message> filter) {
389399
return this;
390400
}
391401

402+
@Override
403+
public FilterConfiguration matchUnfiltered() {
404+
this.matchUnfiltered = true;
405+
return this;
406+
}
407+
408+
@Override
409+
public FilterConfiguration matchUnfiltered(boolean matchUnfiltered) {
410+
this.matchUnfiltered = matchUnfiltered;
411+
return this;
412+
}
413+
392414
@Override
393415
public ConsumerBuilder builder() {
394416
return this.builder;

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+4
Original file line numberDiff line numberDiff line change
@@ -744,6 +744,10 @@ static <T> T locatorOperation(
744744
return result;
745745
}
746746

747+
boolean filteringSupported() {
748+
return this.locatorOperation(Client::filteringSupported);
749+
}
750+
747751
Clock clock() {
748752
return this.clock;
749753
}

src/main/java/com/rabbitmq/stream/impl/StreamProducer.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ class StreamProducer implements Producer {
9494
Duration enqueueTimeout,
9595
Function<Message, String> filterValueExtractor,
9696
StreamEnvironment environment) {
97+
if (filterValueExtractor != null && !environment.filteringSupported()) {
98+
throw new IllegalArgumentException("Filtering is not supported by the broker");
99+
}
97100
this.id = ID_SEQUENCE.getAndIncrement();
98101
this.environment = environment;
99102
this.name = name;
@@ -606,8 +609,12 @@ private static final class OutboundMessageFilterValueWriterCallback
606609
public int write(ByteBuf bb, Object entity, long publishingId) {
607610
AccumulatedEntity accumulatedEntity = (AccumulatedEntity) entity;
608611
String filterValue = accumulatedEntity.filterValue();
609-
bb.writeShort(filterValue.length());
610-
bb.writeBytes(filterValue.getBytes(StandardCharsets.UTF_8));
612+
if (filterValue == null) {
613+
bb.writeShort(-1);
614+
} else {
615+
bb.writeShort(filterValue.length());
616+
bb.writeBytes(filterValue.getBytes(StandardCharsets.UTF_8));
617+
}
611618
Codec.EncodedMessage messageToPublish =
612619
(Codec.EncodedMessage) accumulatedEntity.encodedEntity();
613620
bb.writeInt(messageToPublish.getSize());
@@ -619,7 +626,12 @@ public int write(ByteBuf bb, Object entity, long publishingId) {
619626
public int fragmentLength(Object entity) {
620627
AccumulatedEntity accumulatedEntity = (AccumulatedEntity) entity;
621628
Codec.EncodedMessage message = (Codec.EncodedMessage) accumulatedEntity.encodedEntity();
622-
return 8 + 2 + accumulatedEntity.filterValue().length() + 4 + message.getSize();
629+
String filterValue = accumulatedEntity.filterValue();
630+
if (filterValue == null) {
631+
return 8 + 2 + 4 + message.getSize();
632+
} else {
633+
return 8 + 2 + accumulatedEntity.filterValue().length() + 4 + message.getSize();
634+
}
623635
}
624636
}
625637
}

src/main/java/com/rabbitmq/stream/impl/Utils.java

+14
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ final class Utils {
6565
private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class);
6666
private static final Map<Short, String> CONSTANT_LABELS;
6767

68+
static final String SUBSCRIPTION_PROPERTY_SAC = "single-active-consumer";
69+
static final String SUBSCRIPTION_PROPERTY_SUPER_STREAM = "super-stream";
70+
static final String SUBSCRIPTION_PROPERTY_FILTER_PREFIX = "filter.";
71+
static final String SUBSCRIPTION_PROPERTY_MATCH_UNFILTERED = "match-unfiltered";
72+
6873
static {
6974
Map<Short, String> labels = new HashMap<>();
7075
Arrays.stream(Constants.class.getDeclaredFields())
@@ -119,6 +124,15 @@ static boolean isSac(Map<String, String> properties) {
119124
}
120125
}
121126

127+
static boolean filteringEnabled(Map<String, String> properties) {
128+
if (properties == null || properties.isEmpty()) {
129+
return false;
130+
} else {
131+
return properties.keySet().stream()
132+
.anyMatch(k -> k.startsWith(SUBSCRIPTION_PROPERTY_FILTER_PREFIX));
133+
}
134+
}
135+
122136
static short encodeRequestCode(Short code) {
123137
return code;
124138
}

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ public class StreamPerfTest implements Callable<Integer> {
123123

124124
@CommandLine.Mixin
125125
private final CommandLine.HelpCommand helpCommand = new CommandLine.HelpCommand();
126+
126127
// for testing
127128
private final AddressResolver addressResolver;
128129
private final PrintWriter err, out;
@@ -477,6 +478,7 @@ static class InstanceSyncOptions {
477478
private List<Monitoring> monitorings;
478479
private volatile Environment environment;
479480
private volatile EventLoopGroup eventLoopGroup;
481+
480482
// constructor for completion script generation
481483
public StreamPerfTest() {
482484
this(null, null, null, null);

src/test/java/com/rabbitmq/stream/impl/ClientTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1093,7 +1093,7 @@ public int fragmentLength(Object obj) {
10931093
stream,
10941094
OffsetSpecification.first(),
10951095
1,
1096-
Collections.singletonMap("filters", newFilterValue));
1096+
Collections.singletonMap("filter.1", newFilterValue));
10971097

10981098
int expectedCount = filterValueCount.get(newFilterValue).get();
10991099
waitAtMost(() -> filteredConsumedMessageCount.get() == expectedCount);

0 commit comments

Comments
 (0)