
Commit 2ad8b06

GH-3034: IQ API's for remote state stores
Fixes: #3034 * Kafka Streams interactive query API's for accessing remote state stores. * Addressing PR review
1 parent 697d3c4 commit 2ad8b06

3 files changed

+175
-13
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,8 +363,6 @@ public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> stor
363363

364364
When calling this method, the user can specifically ask for the proper state store type, as we have done in the above example.
365365

366-
NOTE: `KafkaStreamsInteractiveQueryService` API in Spring for Apache Kafka only supports providing access to local key-value stores at the moment.
367-
368366
=== Retrying State Store Retrieval
369367

370368
When trying to retrieve the state store using the `KafkaStreamsInteractiveQueryService`, there is a chance that the state store might not be found for various reasons.
@@ -388,6 +386,49 @@ public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(S
388386
}
389387
----
390388

389+
=== Querying Remote State Stores
390+
391+
The API shown above for retrieving the state store, `retrieveQueryableStore`, is intended for locally available key-value state stores.
392+
In production settings, Kafka Streams applications are most likely distributed based on the number of partitions.
393+
If a topic has four partitions and there are four instances of the same Kafka Streams processor running, then each instance may be responsible for processing a single partition from the topic.
394+
In this scenario, calling `retrieveQueryableStore` may not give the correct result that an instance is looking for, although it might return a valid store.
395+
Let's assume that the topic with four partitions has data about various keys and a single partition is always responsible for a specific key.
396+
If the instance that is calling `retrieveQueryableStore` is looking for information about a key that this instance does not host, then it will not receive any data.
397+
This is because the current Kafka Streams instance does not know anything about this key.
398+
To fix this, the calling instance first needs to make sure that it has the host information for the Kafka Streams processor instance where the particular key is hosted.
399+
This can be retrieved from any Kafka Streams instance under the same `application.id` as below.
400+
401+
[source, java]
402+
----
403+
@Autowired
404+
private KafkaStreamsInteractiveQueryService interactiveQueryService;
405+
406+
HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());
407+
----
408+
409+
In the example code above, the calling instance is querying for a particular key `12345` from the state-store named `app-store`.
410+
The API also needs a corresponding key serializer, which in this case is the `IntegerSerializer`.
411+
Kafka Streams looks through all its instances under the same `application.id` and tries to find which instance hosts this particular key.
412+
Once found, it returns that host information as a `HostInfo` object.
413+
414+
This is what the API looks like:
415+
416+
[source, java]
417+
----
418+
public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)
419+
----
420+
421+
When using multiple instances of the Kafka Streams processors of the same `application.id` in a distributed way like this, the application is supposed to provide an RPC layer where the state stores can be queried over an RPC endpoint such as a REST one.
422+
See this https://kafka.apache.org/36/documentation/streams/developer-guide/interactive-queries.html#querying-remote-state-stores-for-the-entire-app[article] for more details on this.
423+
When using Spring for Apache Kafka, it is very easy to add a Spring based REST endpoint by using the spring-web technologies.
424+
Once there is a REST endpoint, then that can be used to query the state stores from any Kafka Streams instance, given the `HostInfo` where the key is hosted is known to the instance.
425+
426+
If the instance hosting the key is the current instance, then the application does not need to call the RPC mechanism, but can rather make an in-JVM call.
427+
However, the trouble is that an application may not know that the instance that is making the call is where the key is hosted because a particular server may lose a partition due to a consumer rebalance.
428+
To fix this issue, `KafkaStreamsInteractiveQueryService` provides a convenient API for querying the current host information via an API method `getCurrentKafkaStreamsApplicationHostInfo()` that returns the current `HostInfo`.
429+
The idea is that the application can first acquire information about where the key is held, and then compare the `HostInfo` with the one about the current instance.
430+
If the `HostInfo` data matches, then it can proceed with a simple JVM call via the `retrieveQueryableStore`, otherwise go with the RPC option.
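The local-versus-remote decision described above can be sketched as follows. This is only an illustration under stated assumptions, not code from the commit: the `Host` record is a hypothetical stand-in for Kafka's `HostInfo` so that the example compiles without Kafka on the classpath, and `KeyQueryRouter`, `localLookup`, and `remoteLookup` are made-up names. In a real application, the two hosts would come from `getCurrentKafkaStreamsApplicationHostInfo()` and `getKafkaStreamsApplicationHostInfo(...)`, the local lookup would go through `retrieveQueryableStore`, and the remote lookup would call the application's own REST endpoint.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical stand-in for org.apache.kafka.streams.state.HostInfo (assumption for this sketch).
record Host(String host, int port) { }

final class KeyQueryRouter {

    private KeyQueryRouter() { }

    /**
     * Answer a key lookup in-JVM when the current instance hosts the key,
     * otherwise delegate to a remote (for example REST) lookup on the owning host.
     *
     * @param current  host of this instance; may be null if application.server is not set
     * @param keyOwner host that currently owns the key
     */
    static <K, V> V query(K key, Host current, Host keyOwner,
            Function<K, V> localLookup, BiFunction<Host, K, V> remoteLookup) {
        // Records compare by value, so equal host/port pairs match.
        if (keyOwner.equals(current)) {
            return localLookup.apply(key);
        }
        // Either another instance owns the key or the current host is unknown.
        return remoteLookup.apply(keyOwner, key);
    }
}
```

Because a rebalance can move a partition between the metadata lookup and the actual query, an application would likely still want to handle a failed or empty lookup on either path, for example by repeating the host lookup.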
431+
391432
[[kafka-streams-example]]
392433
== Kafka Streams Example
393434

spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,20 @@
1616

1717
package org.springframework.kafka.streams;
1818

19+
import java.util.Properties;
20+
21+
import org.apache.kafka.common.serialization.Serializer;
1922
import org.apache.kafka.streams.KafkaStreams;
23+
import org.apache.kafka.streams.KeyQueryMetadata;
2024
import org.apache.kafka.streams.StoreQueryParameters;
25+
import org.apache.kafka.streams.state.HostInfo;
2126
import org.apache.kafka.streams.state.QueryableStoreType;
2227

2328
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
29+
import org.springframework.lang.Nullable;
2430
import org.springframework.retry.support.RetryTemplate;
2531
import org.springframework.util.Assert;
32+
import org.springframework.util.StringUtils;
2633

2734
/**
2835
* Provide a wrapper API around the interactive query stores in Kafka Streams.
@@ -75,20 +82,80 @@ public void setRetryTemplate(RetryTemplate retryTemplate) {
7582
* @return queryable store.
7683
*/
7784
public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType) {
85+
populateKafkaStreams();
86+
StoreQueryParameters<T> storeQueryParams = StoreQueryParameters.fromNameAndType(storeName, storeType);
87+
88+
return this.retryTemplate.execute(context -> {
89+
try {
90+
return this.kafkaStreams.store(storeQueryParams);
91+
}
92+
catch (Exception e) {
93+
throw new IllegalStateException("Error retrieving state store: " + storeName, e);
94+
}
95+
});
96+
}
97+
98+
private void populateKafkaStreams() {
7899
if (this.kafkaStreams == null) {
79100
this.kafkaStreams = this.streamsBuilderFactoryBean.getKafkaStreams();
80101
}
81102
Assert.notNull(this.kafkaStreams, "KafkaStreams cannot be null. " +
82103
"Make sure that the corresponding StreamsBuilderFactoryBean has started properly.");
83-
StoreQueryParameters<T> storeQueryParams = StoreQueryParameters.fromNameAndType(storeName, storeType);
104+
}
105+
106+
/**
107+
* Retrieve the current {@link HostInfo} where this Kafka Streams application is running on.
108+
* This {@link HostInfo} is different from the Kafka `bootstrap.servers` property, and is based on
109+
* the Kafka Streams configuration property `application.server` where user-defined REST
110+
* endpoints can be invoked per each Kafka Streams application instance.
111+
* If this property - `application.server` - is not available from the end-user application, then null is returned.
112+
* @return the current {@link HostInfo}
113+
*/
114+
@Nullable
115+
public HostInfo getCurrentKafkaStreamsApplicationHostInfo() {
116+
Properties streamsConfiguration = this.streamsBuilderFactoryBean
117+
.getStreamsConfiguration();
118+
if (streamsConfiguration != null && streamsConfiguration.containsKey("application.server")) {
119+
String applicationServer = (String) streamsConfiguration.get("application.server");
120+
String[] appServerComponents = StringUtils.split(applicationServer, ":");
121+
if (appServerComponents != null) {
122+
return new HostInfo(appServerComponents[0], Integer.parseInt(appServerComponents[1]));
123+
}
124+
}
125+
return null;
126+
}
84127

128+
/**
129+
* Retrieve the {@link HostInfo} where the provided store and key are hosted on. This may
130+
* not be the current host that is running the application. Kafka Streams will look
131+
* through all the consumer instances under the same application id and retrieve the
132+
* proper host. Note that the end user applications must provide `application.server` as a
133+
* configuration property for all the application instances when calling this method.
134+
* If this is not available, then null may be returned.
135+
* @param <K> generic type for key
136+
* @param store store name
137+
* @param key key to look for
138+
* @param serializer {@link Serializer} for the key
139+
* @return the {@link HostInfo} where the key for the provided store is hosted currently
140+
*/
141+
public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer) {
142+
populateKafkaStreams();
85143
return this.retryTemplate.execute(context -> {
144+
Throwable throwable = null;
86145
try {
87-
return this.kafkaStreams.store(storeQueryParams);
146+
KeyQueryMetadata keyQueryMetadata = this.kafkaStreams.queryMetadataForKey(store, key, serializer);
147+
if (keyQueryMetadata != null) {
148+
return keyQueryMetadata.activeHost();
149+
}
88150
}
89151
catch (Exception e) {
90-
throw new IllegalStateException("Error retrieving state store: " + storeName, e);
152+
throwable = e;
91153
}
154+
// In addition to the obvious case of a valid exception above, if keyQueryMetadata was null for any
155+
// transient reasons, let the retry kick in by forcing an exception below.
156+
throw new IllegalStateException(
157+
"Error when retrieving state store.", throwable != null ? throwable :
158+
new Throwable("KeyQueryMetadata is not yet available."));
92159
});
93160
}
94161

spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,12 @@
2727
import java.util.HashMap;
2828
import java.util.Map;
2929
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.ExecutionException;
3031
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.TimeoutException;
3133

3234
import org.apache.kafka.clients.consumer.ConsumerRecord;
35+
import org.apache.kafka.common.serialization.IntegerSerializer;
3336
import org.apache.kafka.common.serialization.Serdes;
3437
import org.apache.kafka.streams.KafkaStreams;
3538
import org.apache.kafka.streams.KeyValue;
@@ -40,6 +43,7 @@
4043
import org.apache.kafka.streams.kstream.KStream;
4144
import org.apache.kafka.streams.kstream.Materialized;
4245
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
46+
import org.apache.kafka.streams.state.HostInfo;
4347
import org.apache.kafka.streams.state.QueryableStoreTypes;
4448
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
4549
import org.junit.jupiter.api.Test;
@@ -108,11 +112,7 @@ class KafkaStreamsInteractiveQueryServiceTests {
108112

109113
@Test
110114
void retrieveQueryableStore() throws Exception {
111-
this.kafkaTemplate.sendDefault(123, "123");
112-
this.kafkaTemplate.flush();
113-
114-
ConsumerRecord<?, String> result = resultFuture.get(600, TimeUnit.SECONDS);
115-
assertThat(result).isNotNull();
115+
ensureKafkaStreamsProcessorIsUpAndRunning();
116116

117117
ReadOnlyKeyValueStore<Object, Object> objectObjectReadOnlyKeyValueStore = this.interactiveQueryService
118118
.retrieveQueryableStore(STATE_STORE,
@@ -121,14 +121,18 @@ void retrieveQueryableStore() throws Exception {
121121
assertThat((Long) objectObjectReadOnlyKeyValueStore.get(123)).isGreaterThanOrEqualTo(1);
122122
}
123123

124-
@SuppressWarnings("unchecked")
125-
@Test
126-
void retrieveNonExistentStateStoreAndVerifyRetries() throws Exception {
124+
private void ensureKafkaStreamsProcessorIsUpAndRunning() throws InterruptedException, ExecutionException, TimeoutException {
127125
this.kafkaTemplate.sendDefault(123, "123");
128126
this.kafkaTemplate.flush();
129127

130128
ConsumerRecord<?, String> result = resultFuture.get(600, TimeUnit.SECONDS);
131129
assertThat(result).isNotNull();
130+
}
131+
132+
@SuppressWarnings("unchecked")
133+
@Test
134+
void retrieveNonExistentStateStoreAndVerifyRetries() throws Exception {
135+
ensureKafkaStreamsProcessorIsUpAndRunning();
132136

133137
assertThat(this.streamsBuilderFactoryBean.getKafkaStreams()).isNotNull();
134138
KafkaStreams kafkaStreams = spy(this.streamsBuilderFactoryBean.getKafkaStreams());
@@ -148,6 +152,55 @@ void retrieveNonExistentStateStoreAndVerifyRetries() throws Exception {
148152
verify(kafkaStreams, times(3)).store(any(StoreQueryParameters.class));
149153
}
150154

155+
@Test
156+
void currentHostInfo() {
157+
HostInfo currentKafkaStreamsApplicationHostInfo =
158+
this.interactiveQueryService.getCurrentKafkaStreamsApplicationHostInfo();
159+
assertThat(currentKafkaStreamsApplicationHostInfo.host()).isEqualTo("localhost");
160+
assertThat(currentKafkaStreamsApplicationHostInfo.port()).isEqualTo(8080);
161+
}
162+
163+
@Test
164+
void hostInfoForKeyAndStore() throws Exception {
165+
ensureKafkaStreamsProcessorIsUpAndRunning();
166+
167+
HostInfo kafkaStreamsApplicationHostInfo =
168+
this.interactiveQueryService.getKafkaStreamsApplicationHostInfo(STATE_STORE, 123,
169+
new IntegerSerializer());
170+
// In real applications, the above call may return a different server than what is configured
171+
// via application.server on the Kafka Streams where the call was invoked. However, in the case
172+
// of this test, we only have a single Kafka Streams instance and even there, we provide a mock
173+
// value for application.server (localhost:8080). Because of that, that is what we are verifying against.
174+
assertThat(kafkaStreamsApplicationHostInfo.host()).isEqualTo("localhost");
175+
assertThat(kafkaStreamsApplicationHostInfo.port()).isEqualTo(8080);
176+
}
177+
178+
@Test
179+
void hostInfoForNonExistentKeyAndStateStore() throws Exception {
180+
ensureKafkaStreamsProcessorIsUpAndRunning();
181+
182+
assertThat(this.streamsBuilderFactoryBean.getKafkaStreams()).isNotNull();
183+
KafkaStreams kafkaStreams = spy(this.streamsBuilderFactoryBean.getKafkaStreams());
184+
assertThat(kafkaStreams).isNotNull();
185+
186+
Field kafkaStreamsField = KafkaStreamsInteractiveQueryService.class.getDeclaredField("kafkaStreams");
187+
kafkaStreamsField.setAccessible(true);
188+
kafkaStreamsField.set(interactiveQueryService, kafkaStreams);
189+
190+
IntegerSerializer serializer = new IntegerSerializer();
191+
192+
assertThatExceptionOfType(IllegalStateException.class)
193+
.isThrownBy(() -> {
194+
this.interactiveQueryService.getKafkaStreamsApplicationHostInfo(NON_EXISTENT_STORE, 12345,
195+
serializer);
196+
})
197+
.withMessageContaining("Error when retrieving state store.");
198+
199+
verify(kafkaStreams, times(3)).queryMetadataForKey(NON_EXISTENT_STORE, 12345,
200+
serializer);
201+
}
202+
203+
151204
@Configuration
152205
@EnableKafka
153206
@EnableKafkaStreams
@@ -204,6 +257,7 @@ public KafkaStreamsConfiguration kStreamsConfigs() {
204257
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
205258
WallclockTimestampExtractor.class.getName());
206259
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100");
260+
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");
207261
return new KafkaStreamsConfiguration(props);
208262
}
209263
