Skip to content

Commit f2e77e6

Browse files
authored
GH-2942: Kafka Streams queryable stores
Fixes: #2942 This is an initial iteration for providing a basic API around interactive query service in Kafka Streams. In this iteration, we introduce a single API for retrieving the queryable state store from the Kafka Streams topology, namely, `retrieveQueryableStore`. * Adding docs
1 parent e6affce commit f2e77e6

File tree

4 files changed

+402
-1
lines changed

4 files changed

+402
-1
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,10 +315,61 @@ public DeadLetterPublishingRecoverer recoverer() {
315315

316316
Of course, the `recoverer()` bean can be your own implementation of `ConsumerRecordRecoverer`.
317317

318+
[[kafka-streams-iq-support]]
319+
== Interactive Query Support
320+
321+
Starting with version 3.2, Spring for Apache Kafka provides basic facilities required for interactive queries in Kafka Streams.
322+
Interactive queries are useful in stateful Kafka Streams applications since they provide a way to constantly query the stateful stores in the application.
323+
Thus, if an application wants to materialize the current view of the system under consideration, interactive queries provide a way to do that.
324+
To learn more about interacive queries, see this https://kafka.apache.org/36/documentation/streams/developer-guide/interactive-queries.html[article].
325+
The support in Spring for Apache Kafka is centered around an API called `KafkaStreamsInteractiveQueryService` which is a facade around interactive queries APIs in Kafka Streams library.
326+
An application can create an instance of this service as a bean and then later on use it to retrieve the state store by its name.
327+
328+
The following code snippet shows an example.
329+
330+
[source, java]
331+
----
332+
@Bean
333+
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
334+
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
335+
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
336+
return kafkaStreamsInteractiveQueryService;
337+
}
338+
----
339+
340+
Assuming that a Kafka Streams application has a state store called `app-store`, then that store can be retrieved via the `KafkStreamsInteractiveQuery` API as show below.
341+
342+
[source, java]
343+
----
344+
@Autowired
345+
private KafkaStreamsInteractiveQueryService interactiveQueryService;
346+
347+
ReadOnlyKeyValueStore<Object, Object> appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());
348+
----
349+
350+
Once an application gains access to the state store, then it can query from it for key-value information.
351+
352+
In this case, the state store that the application uses is a read-only key value store.
353+
There are other types of state stores that a Kafka Streams application can use.
354+
For instance, if an application prefers to query a window based store, it can build that store in the Kafka Streams application business logic and then later on retrieve it.
355+
Because of this reason, the API to retrieve the queryable store in `KafkaStreamsInteractiveQueryService` has a generic store type signature, so that the end-user can assign the proper type.
356+
357+
Here is the type signature from the API.
358+
359+
[source, java]
360+
----
361+
public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)
362+
----
363+
364+
When calling this method, the user can specifially ask for the proper state store type, as we have done in the above example.
365+
366+
NOTE: `KafkaStreamsInteractiveQueryService` API in Spring for Apache Kafka only supports providing access to local key-value stores at the moment.
367+
368+
318369
[[kafka-streams-example]]
319370
== Kafka Streams Example
320371

321-
The following example combines all the topics we have covered in this chapter:
372+
The following example combines the various topics we have covered in this chapter:
322373

323374
[source, java]
324375
----

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,14 @@
77
This section covers the changes made from version 3.1 to version 3.2.
88
For changes in earlier version, see xref:appendix/change-history.adoc[Change History].
99

10+
[[x32-kafka-streams-iqs-support]]
11+
=== Kafka Streams Interactive Query Support
12+
13+
A new API `KafkaStreamsInteractiveQuerySupport` for accessing queryable stores used in Kafka Streams interactive queries.
14+
See xref:streams.adoc#kafka-streams-iq-support[Kafka Streams Interactive Support] for more details.
15+
16+
17+
1018
[[x32-tiss]]
1119
=== TransactionIdSuffixStrategy
1220

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2024-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.streams;
18+
19+
import org.apache.kafka.streams.KafkaStreams;
20+
import org.apache.kafka.streams.StoreQueryParameters;
21+
import org.apache.kafka.streams.state.QueryableStoreType;
22+
23+
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
24+
import org.springframework.retry.support.RetryTemplate;
25+
import org.springframework.util.Assert;
26+
27+
/**
28+
* Provide a wrapper API around the interactive query stores in Kafka Streams.
29+
* Using this API, an application can gain access to a named state store in the
30+
* {@link KafkaStreams} under consideration.
31+
*
32+
* @author Soby Chacko
33+
* @since 3.2
34+
*/
35+
public class KafkaStreamsInteractiveQueryService {
36+
37+
/**
38+
* {@link StreamsBuilderFactoryBean} that provides {@link KafkaStreams} where the state store is retrieved from.
39+
*/
40+
private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;
41+
42+
/**
43+
* {@link RetryTemplate} to be used by the interative query service.
44+
*/
45+
private RetryTemplate retryTemplate = new RetryTemplate();
46+
47+
/**
48+
* Underlying {@link KafkaStreams} from {@link StreamsBuilderFactoryBean}.
49+
*/
50+
private volatile KafkaStreams kafkaStreams;
51+
52+
/**
53+
* Construct an instance for querying state stores from the KafkaStreams in the {@link StreamsBuilderFactoryBean}.
54+
* @param streamsBuilderFactoryBean {@link StreamsBuilderFactoryBean} for {@link KafkaStreams}.
55+
*/
56+
public KafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
57+
Assert.notNull(streamsBuilderFactoryBean, "StreamsBuildFactoryBean instance cannot be null.");
58+
this.streamsBuilderFactoryBean = streamsBuilderFactoryBean;
59+
}
60+
61+
/**
62+
* Custom {@link RetryTemplate} provided by the end users.
63+
* @param retryTemplate {@link RetryTemplate}
64+
*/
65+
public void setRetryTemplate(RetryTemplate retryTemplate) {
66+
Assert.notNull(retryTemplate, "The provided RetryTemplate instance must not be null");
67+
this.retryTemplate = retryTemplate;
68+
}
69+
70+
/**
71+
* Retrieve and return a queryable store by name created in the application.
72+
* @param storeName name of the queryable store
73+
* @param storeType type of the queryable store
74+
* @param <T> generic type for the queryable store
75+
* @return queryable store.
76+
*/
77+
public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType) {
78+
if (this.kafkaStreams == null) {
79+
this.kafkaStreams = this.streamsBuilderFactoryBean.getKafkaStreams();
80+
}
81+
Assert.notNull(this.kafkaStreams, "KafkaStreams cannot be null. " +
82+
"Make sure that the corresponding StreamsBuilderFactoryBean has started properly.");
83+
StoreQueryParameters<T> storeQueryParams = StoreQueryParameters.fromNameAndType(storeName, storeType);
84+
85+
return this.retryTemplate.execute(context -> {
86+
try {
87+
return this.kafkaStreams.store(storeQueryParams);
88+
}
89+
catch (Exception e) {
90+
throw new IllegalStateException("Error retrieving state store: " + storeName, e);
91+
}
92+
});
93+
}
94+
95+
}

0 commit comments

Comments
 (0)