Skip to content

Commit cb86a0d

Browse files
committed
KAFKA-5876: KIP-216 Part 4, Apply InvalidStateStorePartitionException for Interactive Queries (#10657)
KIP-216, part 4 - apply InvalidStateStorePartitionException Reviewers: Anna Sophie Blee-Goldman <[email protected]>
1 parent d892501 commit cb86a0d

File tree

5 files changed

+22
-3
lines changed

5 files changed

+22
-3
lines changed

docs/streams/upgrade-guide.html

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ <h3><a id="streams_api_changes_300" href="#streams_api_changes_300">Streams API
101101
<ul>
102102
<li> <code>UnknownStateStoreException</code>: If the specified store name does not exist in the topology, an <code>UnknownStateStoreException</code> will be thrown instead of the former <code>InvalidStateStoreException</code>.</li>
103103
<li> <code>StreamsNotStartedException</code>: If Streams state is <code>CREATED</code>, a <code>StreamsNotStartedException</code> will be thrown.</li>
104+
<li> <code>InvalidStateStorePartitionException</code>: If the specified partition does not exist, a <code>InvalidStateStorePartitionException</code> will be thrown.</li>
104105
</ul>
105106
<p>
106107
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors">KIP-216</a> for more information.

streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
4747
import org.apache.kafka.streams.errors.TopologyException;
4848
import org.apache.kafka.streams.errors.UnknownStateStoreException;
49+
import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
4950
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
5051
import org.apache.kafka.streams.processor.Processor;
5152
import org.apache.kafka.streams.processor.StateRestoreListener;
@@ -1526,6 +1527,7 @@ public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
15261527
* @throws StreamsNotStartedException If Streams state is {@link KafkaStreams.State#CREATED CREATED}. Just
15271528
* retry and wait until to {@link KafkaStreams.State#RUNNING RUNNING}.
15281529
* @throws UnknownStateStoreException If the specified store name does not exist in the topology.
1530+
* @throws InvalidStateStorePartitionException If the specified partition does not exist.
15291531
* @throws InvalidStateStoreException If the Streams instance isn't in a queryable state.
15301532
* If the store's type does not match the QueryableStoreType,
15311533
* the Streams instance is not in a queryable state with respect

streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.streams.StoreQueryParameters;
2020
import org.apache.kafka.streams.errors.InvalidStateStoreException;
21+
import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
2122
import org.apache.kafka.streams.state.QueryableStoreType;
2223

2324
import java.util.ArrayList;
@@ -57,7 +58,7 @@ public <T> List<T> stores(final String storeName,
5758
}
5859
if (allStores.isEmpty()) {
5960
if (storeQueryParameters.partition() != null) {
60-
throw new InvalidStateStoreException(
61+
throw new InvalidStateStorePartitionException(
6162
String.format("The specified partition %d for store %s does not exist.",
6263
storeQueryParameters.partition(),
6364
storeName));

streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import static org.easymock.EasyMock.mock;
7575
import static org.easymock.EasyMock.notNull;
7676
import static org.easymock.EasyMock.reset;
77+
import static org.hamcrest.CoreMatchers.either;
7778
import static org.hamcrest.CoreMatchers.equalTo;
7879
import static org.hamcrest.CoreMatchers.is;
7980
import static org.hamcrest.CoreMatchers.notNullValue;
@@ -439,6 +440,8 @@ public void shouldReturnUUIDsWithStringPrefix() {
439440
final UUID uuid1 = UUID.randomUUID();
440441
final UUID uuid2 = UUID.randomUUID();
441442
final String prefix = uuid1.toString().substring(0, 4);
443+
final int numMatches = uuid2.toString().substring(0, 4).equals(prefix) ? 2 : 1;
444+
442445
entries.add(new KeyValue<>(
443446
new Bytes(uuidSerializer.serialize(null, uuid1)),
444447
stringSerializer.serialize(null, "a")));
@@ -460,8 +463,12 @@ public void shouldReturnUUIDsWithStringPrefix() {
460463
numberOfKeysReturned++;
461464
}
462465

463-
assertThat(numberOfKeysReturned, is(1));
464-
assertThat(valuesWithPrefix.get(0), is("a"));
466+
assertThat(numberOfKeysReturned, is(numMatches));
467+
if (numMatches == 2) {
468+
assertThat(valuesWithPrefix.get(0), either(is("a")).or(is("b")));
469+
} else {
470+
assertThat(valuesWithPrefix.get(0), is("a"));
471+
}
465472
}
466473

467474
@Test

streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.kafka.common.serialization.Serdes;
2121
import org.apache.kafka.streams.StoreQueryParameters;
2222
import org.apache.kafka.streams.errors.InvalidStateStoreException;
23+
import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
2324
import org.apache.kafka.streams.state.NoOpWindowStore;
2425
import org.apache.kafka.streams.state.QueryableStoreTypes;
2526
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@@ -82,6 +83,13 @@ public void shouldThrowInvalidStoreExceptionIfNoStoreOfTypeFound() {
8283
assertThrows(InvalidStateStoreException.class, () -> wrappingStoreProvider.stores("doesn't exist", QueryableStoreTypes.<String, String>keyValueStore()));
8384
}
8485

86+
@Test
87+
public void shouldThrowInvalidStoreExceptionIfNoPartitionFound() {
88+
final int invalidPartition = numStateStorePartitions + 1;
89+
wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()).withPartition(invalidPartition));
90+
assertThrows(InvalidStateStorePartitionException.class, () -> wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore()));
91+
}
92+
8593
@Test
8694
public void shouldReturnAllStoreWhenQueryWithoutPartition() {
8795
wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()));

0 commit comments

Comments
 (0)