Skip to content

GH-3034: IQ API's for remote state stores #3089

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,6 @@ public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> stor

When calling this method, the user can specifially ask for the proper state store type, as we have done in the above example.

NOTE: `KafkaStreamsInteractiveQueryService` API in Spring for Apache Kafka only supports providing access to local key-value stores at the moment.

=== Retrying State Store Retrieval

When trying to retrieve the state store using the `KafkaStreamsInteractiveQueryService`, there is a chance that the state store might not be found for various reasons.
Expand All @@ -388,6 +386,49 @@ public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(S
}
----

=== Querying Remote State Stores

The API shown above for retrieving the state store - `retrieveQueryableStore` is intended for locally available key-value state stores.
In productions settings, Kafka Streams applications are most likely distributed based on the number of partitions.
If a topic has four partitions and there are four instances of the same Kafka Streams processor running, then each instance maybe responsible for processing a single partition from the topic.
In this scenario, calling `retrieveQueryableStore` may not give the correct result that an instance is looking for, although it might return a valid store.
Let's assume that the topic with four partitions has data about various keys and a single partition is always responsible for a specific key.
If the instance that is calling `retrieveQueryableStore` is looking for information about a key that this instance does not host, then it will not receive any data.
This is because the current Kafka Streams instance does not know anything about this key.
To fix this, the calling instance first needs to make sure that they have the host information for the Kafka Streams processor instance where the particular key is hosted.
This can be retrieved from any Kafka Streams instance under the same `application.id` as below.

[source, java]
----
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());
----

In the example code above, the calling instance is querying for a particular key `12345` from the state-store named `app-store`.
The API also needs a corresponding key serializer, which in this case is the `IntegerSerializer`.
Kafka Streams looks through all it's instances under the same `application.id` and tries to find which instance hosts this particular key,
Once found, it returns that host information as a `HostInfo` object.

This is how the API looks like:

[source, java]
----
public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)
----

When using multiple instances of the Kafka Streams processors of the same `application.id` in a distributed way like this, the application is supposed to provide an RPC layer where the state stores can be queried over an RPC endpoint such as a REST one.
See this https://kafka.apache.org/36/documentation/streams/developer-guide/interactive-queries.html#querying-remote-state-stores-for-the-entire-app[article] for more details on this.
When using Spring for Apache Kafka, it is very easy to add a Spring based REST endpoint by using the spring-web technologies.
Once there is a REST endpoint, then that can be used to query the state stores from any Kafka Streams instance, given the `HostInfo` where the key is hosted is known to the instance.

If the key hosting the instance is the current instance, then the application does not need to call the RPC mechanism, but rather make an in-JVM call.
However, the trouble is that an application may not know that the instance that is making the call is where the key is hosted because a particular server may lose a partition due to a consumer rebalance.
To fix this issue, `KafkaStreamsInteractiveQueryService` provides a convenient API for querying the current host information via an API method `getCurrentKafkaStreamsApplicationHostInfo()` that returns the current `HostInfo`.
The idea is that the application can first acquire information about where the key is held, and then compare the `HostInfo` with the one about the current instance.
If the `HostInfo` data matches, then it can proceed with a simple JVM call via the `retrieveQueryableStore`, otherwise go with the RPC option.

[[kafka-streams-example]]
== Kafka Streams Example

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,20 @@

package org.springframework.kafka.streams;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreType;

import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.lang.Nullable;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
* Provide a wrapper API around the interactive query stores in Kafka Streams.
Expand Down Expand Up @@ -75,20 +82,80 @@ public void setRetryTemplate(RetryTemplate retryTemplate) {
* @return queryable store.
*/
public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType) {
populateKafkaStreams();
StoreQueryParameters<T> storeQueryParams = StoreQueryParameters.fromNameAndType(storeName, storeType);

return this.retryTemplate.execute(context -> {
try {
return this.kafkaStreams.store(storeQueryParams);
}
catch (Exception e) {
throw new IllegalStateException("Error retrieving state store: " + storeName, e);
}
});
}

private void populateKafkaStreams() {
if (this.kafkaStreams == null) {
this.kafkaStreams = this.streamsBuilderFactoryBean.getKafkaStreams();
}
Assert.notNull(this.kafkaStreams, "KafkaStreams cannot be null. " +
"Make sure that the corresponding StreamsBuilderFactoryBean has started properly.");
StoreQueryParameters<T> storeQueryParams = StoreQueryParameters.fromNameAndType(storeName, storeType);
}

/**
* Retrieve the current {@link HostInfo} where this Kafka Streams application is running on.
* This {link @HostInfo} is different from the Kafka `bootstrap.server` property, and is based on
* the Kafka Streams configuration property `application.server` where user-defined REST
* endpoints can be invoked per each Kafka Streams application instance.
* If this property - `application.server` - is not available from the end-user application, then null is returned.
* @return the current {@link HostInfo}
*/
@Nullable
public HostInfo getCurrentKafkaStreamsApplicationHostInfo() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Nullable

Properties streamsConfiguration = this.streamsBuilderFactoryBean
.getStreamsConfiguration();
if (streamsConfiguration != null && streamsConfiguration.containsKey("application.server")) {
String applicationServer = (String) streamsConfiguration.get("application.server");
String[] appServerComponents = StringUtils.split(applicationServer, ":");
if (appServerComponents != null) {
return new HostInfo(appServerComponents[0], Integer.parseInt(appServerComponents[1]));
}
}
return null;
}

/**
* Retrieve the {@link HostInfo} where the provided store and key are hosted on. This may
* not be the current host that is running the application. Kafka Streams will look
* through all the consumer instances under the same application id and retrieves the
* proper host. Note that the end user applications must provide `application.server` as a
* configuration property for all the application instances when calling this method.
* If this is not available, then null maybe returned.
* @param <K> generic type for key
* @param store store name
* @param key key to look for
* @param serializer {@link Serializer} for the key
* @return the {@link HostInfo} where the key for the provided store is hosted currently
*/
public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer) {
populateKafkaStreams();
return this.retryTemplate.execute(context -> {
Throwable throwable = null;
try {
return this.kafkaStreams.store(storeQueryParams);
KeyQueryMetadata keyQueryMetadata = this.kafkaStreams.queryMetadataForKey(store, key, serializer);
if (keyQueryMetadata != null) {
return keyQueryMetadata.activeHost();
}
}
catch (Exception e) {
throw new IllegalStateException("Error retrieving state store: " + storeName, e);
throwable = e;
}
// In addition to the obvious case of a valid exception above, if keyQueryMetadata was null for any
// transient reasons, let the retry kick in by forcing an exception below.
throw new IllegalStateException(
"Error when retrieving state store.", throwable != null ? throwable :
new Throwable("KeyQueryMetadata is not yet available."));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
Expand All @@ -40,6 +43,7 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -108,11 +112,7 @@ class KafkaStreamsInteractiveQueryServiceTests {

@Test
void retrieveQueryableStore() throws Exception {
this.kafkaTemplate.sendDefault(123, "123");
this.kafkaTemplate.flush();

ConsumerRecord<?, String> result = resultFuture.get(600, TimeUnit.SECONDS);
assertThat(result).isNotNull();
ensureKafkaStreamsProcessorIsUpAndRunning();

ReadOnlyKeyValueStore<Object, Object> objectObjectReadOnlyKeyValueStore = this.interactiveQueryService
.retrieveQueryableStore(STATE_STORE,
Expand All @@ -121,14 +121,18 @@ void retrieveQueryableStore() throws Exception {
assertThat((Long) objectObjectReadOnlyKeyValueStore.get(123)).isGreaterThanOrEqualTo(1);
}

@SuppressWarnings("unchecked")
@Test
void retrieveNonExistentStateStoreAndVerifyRetries() throws Exception {
private void ensureKafkaStreamsProcessorIsUpAndRunning() throws InterruptedException, ExecutionException, TimeoutException {
this.kafkaTemplate.sendDefault(123, "123");
this.kafkaTemplate.flush();

ConsumerRecord<?, String> result = resultFuture.get(600, TimeUnit.SECONDS);
assertThat(result).isNotNull();
}

@SuppressWarnings("unchecked")
@Test
void retrieveNonExistentStateStoreAndVerifyRetries() throws Exception {
ensureKafkaStreamsProcessorIsUpAndRunning();

assertThat(this.streamsBuilderFactoryBean.getKafkaStreams()).isNotNull();
KafkaStreams kafkaStreams = spy(this.streamsBuilderFactoryBean.getKafkaStreams());
Expand All @@ -148,6 +152,55 @@ void retrieveNonExistentStateStoreAndVerifyRetries() throws Exception {
verify(kafkaStreams, times(3)).store(any(StoreQueryParameters.class));
}

@Test
void currentHostInfo() {
HostInfo currentKafkaStreamsApplicationHostInfo =
this.interactiveQueryService.getCurrentKafkaStreamsApplicationHostInfo();
assertThat(currentKafkaStreamsApplicationHostInfo.host()).isEqualTo("localhost");
assertThat(currentKafkaStreamsApplicationHostInfo.port()).isEqualTo(8080);
}

@Test
void hostInfoForKeyAndStore() throws Exception {
ensureKafkaStreamsProcessorIsUpAndRunning();

HostInfo kafkaStreamsApplicationHostInfo =
this.interactiveQueryService.getKafkaStreamsApplicationHostInfo(STATE_STORE, 123,
new IntegerSerializer());
// In real applications, the above call may return a different server than what is configured
// via application.server on the Kafka Streams where the call was invoked. However, in the case
// of this test, we only have a single Kafka Streams instance and even there, we provide a mock
// value for application.server (localhost:8080). Because of that, that is what we are verifying against.
assertThat(kafkaStreamsApplicationHostInfo.host()).isEqualTo("localhost");
assertThat(kafkaStreamsApplicationHostInfo.port()).isEqualTo(8080);
}

@Test
void hostInfoForNonExistentKeyAndStateStore() throws Exception {
ensureKafkaStreamsProcessorIsUpAndRunning();

assertThat(this.streamsBuilderFactoryBean.getKafkaStreams()).isNotNull();
KafkaStreams kafkaStreams = spy(this.streamsBuilderFactoryBean.getKafkaStreams());
assertThat(kafkaStreams).isNotNull();

Field kafkaStreamsField = KafkaStreamsInteractiveQueryService.class.getDeclaredField("kafkaStreams");
kafkaStreamsField.setAccessible(true);
kafkaStreamsField.set(interactiveQueryService, kafkaStreams);

IntegerSerializer serializer = new IntegerSerializer();

assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(() -> {
this.interactiveQueryService.getKafkaStreamsApplicationHostInfo(NON_EXISTENT_STORE, 12345,
serializer);
})
.withMessageContaining("Error when retrieving state store.");

verify(kafkaStreams, times(3)).queryMetadataForKey(NON_EXISTENT_STORE, 12345,
serializer);
}


@Configuration
@EnableKafka
@EnableKafkaStreams
Expand Down Expand Up @@ -204,6 +257,7 @@ public KafkaStreamsConfiguration kStreamsConfigs() {
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");
return new KafkaStreamsConfiguration(props);
}

Expand Down