From 8f7165bc779bdafd090feb9953aa138f41a80531 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 29 Feb 2024 16:34:15 -0500 Subject: [PATCH 1/2] GH-3034: IQ API's for remote state stores Fixes: #3034 https://github.com/spring-projects/spring-kafka/issues/3034 * Kafka Streams interactive query API's for accessing remote state stores. --- .../antora/modules/ROOT/pages/streams.adoc | 45 +++++++++++- .../KafkaStreamsInteractiveQueryService.java | 71 ++++++++++++++++++- ...kaStreamsInteractiveQueryServiceTests.java | 70 +++++++++++++++--- 3 files changed, 173 insertions(+), 13 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc index fd68c121d8..4fbd3f84a2 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc @@ -363,8 +363,6 @@ public T retrieveQueryableStore(String storeName, QueryableStoreType stor When calling this method, the user can specifially ask for the proper state store type, as we have done in the above example. -NOTE: `KafkaStreamsInteractiveQueryService` API in Spring for Apache Kafka only supports providing access to local key-value stores at the moment. - === Retrying State Store Retrieval When trying to retrieve the state store using the `KafkaStreamsInteractiveQueryService`, there is a chance that the state store might not be found for various reasons. @@ -388,6 +386,49 @@ public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(S } ---- +=== Querying Remote State Stores + +The API shown above for retrieving the state store - `retrieveQueryableStore` - is intended for locally available key-value state stores. +In production settings, Kafka Streams applications are most likely distributed based on the number of partitions. 
+If a topic has four partitions and there are four instances of the same Kafka Streams processor running, then each instance may be responsible for processing a single partition from the topic. +In this scenario, calling `retrieveQueryableStore` may not give the correct result that an instance is looking for, although it might return a valid store. +Let's assume that the topic with four partitions has data about various keys and a single partition is always responsible for a specific key. +If the instance that is calling `retrieveQueryableStore` is looking for information about a key that this instance does not host, then it will not receive any data. +This is because the current Kafka Streams instance does not know anything about this key. +To fix this, the calling instance first needs to make sure that it has the host information for the Kafka Streams processor instance where the particular key is hosted. +This can be retrieved from any Kafka Streams instance under the same `application.id` as below. + +[source, java] +---- +@Autowired +private KafkaStreamsInteractiveQueryService interactiveQueryService; + +HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer()); +---- + +In the example code above, the calling instance is querying for a particular key `12345` from the state-store named `app-store`. +The API also needs a corresponding key serializer, which in this case is the `IntegerSerializer`. +Kafka Streams looks through all its instances under the same `application.id` and tries to find which instance hosts this particular key. +Once found, it returns that host information as a `HostInfo` object. 
+ +This is what the API looks like: + +[source, java] +---- +public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer) +---- + +When using multiple instances of the Kafka Streams processors of the same `application.id` in a distributed way like this, the application is supposed to provide an RPC layer where the state stores can be queried over an RPC endpoint such as a REST one. +See this https://kafka.apache.org/36/documentation/streams/developer-guide/interactive-queries.html#querying-remote-state-stores-for-the-entire-app[article] for more details on this. +When using Spring for Apache Kafka, it is very easy to add a Spring-based REST endpoint by using the spring-web technologies. +Once there is a REST endpoint, then that can be used to query the state stores from any Kafka Streams instance, given the `HostInfo` where the key is hosted is known to the instance. + +If the instance hosting the key is the current instance, then the application does not need to call the RPC mechanism, and can instead make an in-JVM call. +However, the trouble is that an application may not know that the instance that is making the call is where the key is hosted because a particular server may lose a partition due to a consumer rebalance. +To fix this issue, `KafkaStreamsInteractiveQueryService` provides a convenient API for querying the current host information via an API method `getCurrentKafkaStreamsApplicationHostInfo()` that returns the current `HostInfo`. +The idea is that the application can first acquire information about where the key is held, and then compare the `HostInfo` with the one about the current instance. +If the `HostInfo` data matches, then it can proceed with a simple JVM call via the `retrieveQueryableStore`, otherwise go with the RPC option. 
+ [[kafka-streams-example]] == Kafka Streams Example diff --git a/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java b/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java index 05919ff952..0a9301ce02 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java @@ -16,13 +16,19 @@ package org.springframework.kafka.streams; +import java.util.Properties; + +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyQueryMetadata; import org.apache.kafka.streams.StoreQueryParameters; +import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.QueryableStoreType; import org.springframework.kafka.config.StreamsBuilderFactoryBean; import org.springframework.retry.support.RetryTemplate; import org.springframework.util.Assert; +import org.springframework.util.StringUtils; /** * Provide a wrapper API around the interactive query stores in Kafka Streams. @@ -75,20 +81,79 @@ public void setRetryTemplate(RetryTemplate retryTemplate) { * @return queryable store. */ public T retrieveQueryableStore(String storeName, QueryableStoreType storeType) { + primeKafkaStreamsObject(); + StoreQueryParameters storeQueryParams = StoreQueryParameters.fromNameAndType(storeName, storeType); + + return this.retryTemplate.execute(context -> { + try { + return this.kafkaStreams.store(storeQueryParams); + } + catch (Exception e) { + throw new IllegalStateException("Error retrieving state store: " + storeName, e); + } + }); + } + + private void primeKafkaStreamsObject() { if (this.kafkaStreams == null) { this.kafkaStreams = this.streamsBuilderFactoryBean.getKafkaStreams(); } Assert.notNull(this.kafkaStreams, "KafkaStreams cannot be null. 
" + "Make sure that the corresponding StreamsBuilderFactoryBean has started properly."); - StoreQueryParameters storeQueryParams = StoreQueryParameters.fromNameAndType(storeName, storeType); + } + + /** + * Gets the current {@link HostInfo} where this Kafka Streams application is running on. + * This {link @HostInfo} is different from the Kafka `bootstrap.server` property, and is based on + * the Kafka Streams configuration property `application.server` where user-defined REST + * endpoints can be invoked per each Kafka Streams application instance. + * If this property - `application.server` - is not available from the end-user application, then null is returned. + * @return the current {@link HostInfo} + */ + public HostInfo getCurrentKafkaStreamsApplicationHostInfo() { + Properties streamsConfiguration = this.streamsBuilderFactoryBean + .getStreamsConfiguration(); + if (streamsConfiguration != null && streamsConfiguration.containsKey("application.server")) { + String applicationServer = (String) streamsConfiguration.get("application.server"); + String[] appServerComponents = StringUtils.split(applicationServer, ":"); + if (appServerComponents != null) { + return new HostInfo(appServerComponents[0], Integer.parseInt(appServerComponents[1])); + } + } + return null; + } + /** + * Gets the {@link HostInfo} where the provided store and key are hosted on. This may + * not be the current host that is running the application. Kafka Streams will look + * through all the consumer instances under the same application id and retrieves the + * proper host. Note that the end user applications must provide `application.server` as a + * configuration property for all the application instances when calling this method. + * If this is not available, then null maybe returned. 
+ * @param generic type for key + * @param store store name + * @param key key to look for + * @param serializer {@link Serializer} for the key + * @return the {@link HostInfo} where the key for the provided store is hosted currently + */ + public HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer serializer) { + primeKafkaStreamsObject(); return this.retryTemplate.execute(context -> { + Throwable throwable = null; try { - return this.kafkaStreams.store(storeQueryParams); + KeyQueryMetadata keyQueryMetadata = this.kafkaStreams.queryMetadataForKey(store, key, serializer); + if (keyQueryMetadata != null) { + return keyQueryMetadata.activeHost(); + } } catch (Exception e) { - throw new IllegalStateException("Error retrieving state store: " + storeName, e); + throwable = e; } + // In addition to the obvious case of a valid exception above, if keyQueryMetadata was null for any + // transient reasons, let the retry kick in by forcing an exception below. + throw new IllegalStateException( + "Error when retrieving state store.", throwable != null ? 
throwable : + new Throwable("KeyQueryMetadata is not yet available.")); }); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java index e59ddcc89c..5865133e44 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java @@ -27,9 +27,12 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; @@ -40,6 +43,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.WallclockTimestampExtractor; +import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.junit.jupiter.api.Test; @@ -108,11 +112,7 @@ class KafkaStreamsInteractiveQueryServiceTests { @Test void retrieveQueryableStore() throws Exception { - this.kafkaTemplate.sendDefault(123, "123"); - this.kafkaTemplate.flush(); - - ConsumerRecord result = resultFuture.get(600, TimeUnit.SECONDS); - assertThat(result).isNotNull(); + ensureKafkaStreamsProcessorIsUpAndRunning(); ReadOnlyKeyValueStore objectObjectReadOnlyKeyValueStore = this.interactiveQueryService .retrieveQueryableStore(STATE_STORE, @@ -121,14 +121,18 @@ void 
retrieveQueryableStore() throws Exception { assertThat((Long) objectObjectReadOnlyKeyValueStore.get(123)).isGreaterThanOrEqualTo(1); } - @SuppressWarnings("unchecked") - @Test - void retrieveNonExistentStateStoreAndVerifyRetries() throws Exception { + private void ensureKafkaStreamsProcessorIsUpAndRunning() throws InterruptedException, ExecutionException, TimeoutException { this.kafkaTemplate.sendDefault(123, "123"); this.kafkaTemplate.flush(); ConsumerRecord result = resultFuture.get(600, TimeUnit.SECONDS); assertThat(result).isNotNull(); + } + + @SuppressWarnings("unchecked") + @Test + void retrieveNonExistentStateStoreAndVerifyRetries() throws Exception { + ensureKafkaStreamsProcessorIsUpAndRunning(); assertThat(this.streamsBuilderFactoryBean.getKafkaStreams()).isNotNull(); KafkaStreams kafkaStreams = spy(this.streamsBuilderFactoryBean.getKafkaStreams()); @@ -148,6 +152,55 @@ void retrieveNonExistentStateStoreAndVerifyRetries() throws Exception { verify(kafkaStreams, times(3)).store(any(StoreQueryParameters.class)); } + @Test + void currentHostInfo() { + HostInfo currentKafkaStreamsApplicationHostInfo = + this.interactiveQueryService.getCurrentKafkaStreamsApplicationHostInfo(); + assertThat(currentKafkaStreamsApplicationHostInfo.host()).isEqualTo("localhost"); + assertThat(currentKafkaStreamsApplicationHostInfo.port()).isEqualTo(8080); + } + + @Test + void hostInfoForKeyAndStore() throws Exception { + ensureKafkaStreamsProcessorIsUpAndRunning(); + + HostInfo kafkaStreamsApplicationHostInfo = + this.interactiveQueryService.getKafkaStreamsApplicationHostInfo(STATE_STORE, 123, + new IntegerSerializer()); + // In real applications, the above call may return a different server than what is configured + // via application.server on the Kafka Streams where the call was invoked. However, in the case + // of this test, we only have a single Kafka Streams instance and even there, we provide a mock + // value for application.server (localhost:8080). 
Because of that, that is what we are verifying against. + assertThat(kafkaStreamsApplicationHostInfo.host()).isEqualTo("localhost"); + assertThat(kafkaStreamsApplicationHostInfo.port()).isEqualTo(8080); + } + + @Test + void hostInfoForNonExistentKeyAndStateStore() throws Exception { + ensureKafkaStreamsProcessorIsUpAndRunning(); + + assertThat(this.streamsBuilderFactoryBean.getKafkaStreams()).isNotNull(); + KafkaStreams kafkaStreams = spy(this.streamsBuilderFactoryBean.getKafkaStreams()); + assertThat(kafkaStreams).isNotNull(); + + Field kafkaStreamsField = KafkaStreamsInteractiveQueryService.class.getDeclaredField("kafkaStreams"); + kafkaStreamsField.setAccessible(true); + kafkaStreamsField.set(interactiveQueryService, kafkaStreams); + + IntegerSerializer serializer = new IntegerSerializer(); + + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy(() -> { + this.interactiveQueryService.getKafkaStreamsApplicationHostInfo(NON_EXISTENT_STORE, 12345, + serializer); + }) + .withMessageContaining("Error when retrieving state store."); + + verify(kafkaStreams, times(3)).queryMetadataForKey(NON_EXISTENT_STORE, 12345, + serializer); + } + + @Configuration @EnableKafka @EnableKafkaStreams @@ -204,6 +257,7 @@ public KafkaStreamsConfiguration kStreamsConfigs() { props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName()); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100"); + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080"); return new KafkaStreamsConfiguration(props); } From fa7fd1dd7bc6cb7fd7e39055afe19a2eeecf1f6c Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 29 Feb 2024 20:30:20 -0500 Subject: [PATCH 2/2] Addressing PR review --- .../streams/KafkaStreamsInteractiveQueryService.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java 
b/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java index 0a9301ce02..2c011d6c24 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.state.QueryableStoreType; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.lang.Nullable; import org.springframework.retry.support.RetryTemplate; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -81,7 +82,7 @@ public void setRetryTemplate(RetryTemplate retryTemplate) { * @return queryable store. */ public T retrieveQueryableStore(String storeName, QueryableStoreType storeType) { - primeKafkaStreamsObject(); + populateKafkaStreams(); StoreQueryParameters storeQueryParams = StoreQueryParameters.fromNameAndType(storeName, storeType); return this.retryTemplate.execute(context -> { @@ -94,7 +95,7 @@ public T retrieveQueryableStore(String storeName, QueryableStoreType stor }); } - private void primeKafkaStreamsObject() { + private void populateKafkaStreams() { if (this.kafkaStreams == null) { this.kafkaStreams = this.streamsBuilderFactoryBean.getKafkaStreams(); } @@ -103,13 +104,14 @@ private void primeKafkaStreamsObject() { } /** - * Gets the current {@link HostInfo} where this Kafka Streams application is running on. + * Retrieve the current {@link HostInfo} where this Kafka Streams application is running on. * This {link @HostInfo} is different from the Kafka `bootstrap.server` property, and is based on * the Kafka Streams configuration property `application.server` where user-defined REST * endpoints can be invoked per each Kafka Streams application instance. * If this property - `application.server` - is not available from the end-user application, then null is returned. 
* @return the current {@link HostInfo} */ + @Nullable public HostInfo getCurrentKafkaStreamsApplicationHostInfo() { Properties streamsConfiguration = this.streamsBuilderFactoryBean .getStreamsConfiguration(); @@ -124,7 +126,7 @@ public HostInfo getCurrentKafkaStreamsApplicationHostInfo() { } /** - * Gets the {@link HostInfo} where the provided store and key are hosted on. This may + * Retrieve the {@link HostInfo} where the provided store and key are hosted on. This may * not be the current host that is running the application. Kafka Streams will look * through all the consumer instances under the same application id and retrieves the * proper host. Note that the end user applications must provide `application.server` as a @@ -137,7 +139,7 @@ public HostInfo getCurrentKafkaStreamsApplicationHostInfo() { * @return the {@link HostInfo} where the key for the provided store is hosted currently */ public HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer serializer) { - primeKafkaStreamsObject(); + populateKafkaStreams(); return this.retryTemplate.execute(context -> { Throwable throwable = null; try {