Skip to content

Commit 96eed46

Browse files
authored
Merge branch 'spring-projects:main' into spring-projectsGH-3001
2 parents 1d9ddba + 22764cb commit 96eed46

File tree

85 files changed

+3354
-701
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

85 files changed

+3354
-701
lines changed

.github/dependabot.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,14 @@ updates:
2323
- "org.awaitility:awaitility"
2424
- "com.google.code.findbugs:jsr305"
2525
- "org.springframework.boot*"
26+
27+
- package-ecosystem: "github-actions"
28+
directory: "/"
29+
schedule:
30+
interval: "weekly"
31+
day: "saturday"
32+
labels: ["type: task"]
33+
groups:
34+
development-dependencies:
35+
patterns:
36+
- "*"
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
name: Auto Cherry-Pick
2+
3+
on:
4+
push:
5+
branches:
6+
- main
7+
- '*.x'
8+
9+
jobs:
10+
cherry-pick-commit:
11+
uses: spring-io/spring-github-workflows/.github/workflows/spring-cherry-pick.yml@main
12+
secrets:
13+
GH_ACTIONS_REPO_TOKEN: ${{ secrets.GH_ACTIONS_REPO_TOKEN }}

.github/workflows/backport-issue.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@ on:
77

88
jobs:
99
backport-issue:
10-
uses: spring-io/spring-github-workflows/.github/workflows/spring-backport-issue.yml@v2
10+
uses: spring-io/spring-github-workflows/.github/workflows/spring-backport-issue.yml@main
1111
secrets:
1212
GH_ACTIONS_REPO_TOKEN: ${{ secrets.GH_ACTIONS_REPO_TOKEN }}

.github/workflows/ci-snapshot.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@ concurrency:
1717

1818
jobs:
1919
build-snapshot:
20-
uses: spring-io/spring-github-workflows/.github/workflows/spring-artifactory-gradle-snapshot.yml@v2
20+
uses: spring-io/spring-github-workflows/.github/workflows/spring-artifactory-gradle-snapshot.yml@main
2121
with:
2222
gradleTasks: ${{ github.event_name == 'schedule' && '--rerun-tasks' || '' }}
2323
secrets:
2424
GRADLE_ENTERPRISE_CACHE_USER: ${{ secrets.GRADLE_ENTERPRISE_CACHE_USER }}
2525
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GRADLE_ENTERPRISE_CACHE_PASSWORD }}
2626
GRADLE_ENTERPRISE_SECRET_ACCESS_KEY: ${{ secrets.GRADLE_ENTERPRISE_SECRET_ACCESS_KEY }}
27-
JF_ARTIFACTORY_SPRING: ${{ secrets.JF_ARTIFACTORY_SPRING }}
27+
ARTIFACTORY_USERNAME: ${{ secrets.ARTIFACTORY_USERNAME }}
28+
ARTIFACTORY_PASSWORD: ${{ secrets.ARTIFACTORY_PASSWORD }}

.github/workflows/deploy-docs.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ permissions:
1616
jobs:
1717
dispatch-docs-build:
1818
if: github.repository_owner == 'spring-projects'
19-
uses: spring-io/spring-github-workflows/.github/workflows/spring-dispatch-docs-build.yml@v2
19+
uses: spring-io/spring-github-workflows/.github/workflows/spring-dispatch-docs-build.yml@main

.github/workflows/merge-dependabot-pr.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,6 @@ jobs:
1111
merge-dependabot-pr:
1212
permissions: write-all
1313

14-
uses: spring-io/spring-github-workflows/.github/workflows/spring-merge-dependabot-pr.yml@v2
14+
uses: spring-io/spring-github-workflows/.github/workflows/spring-merge-dependabot-pr.yml@main
15+
with:
16+
mergeArguments: --auto --squash

.github/workflows/pr-build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@ on:
77

88
jobs:
99
build-pull-request:
10-
uses: spring-io/spring-github-workflows/.github/workflows/spring-gradle-pull-request-build.yml@v2
10+
uses: spring-io/spring-github-workflows/.github/workflows/spring-gradle-pull-request-build.yml@main

.github/workflows/release.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@ jobs:
1212
contents: write
1313
issues: write
1414

15-
uses: spring-io/spring-github-workflows/.github/workflows/spring-artifactory-gradle-release.yml@v2
15+
uses: spring-io/spring-github-workflows/.github/workflows/spring-artifactory-gradle-release.yml@main
1616
secrets:
1717
GH_ACTIONS_REPO_TOKEN: ${{ secrets.GH_ACTIONS_REPO_TOKEN }}
1818
GRADLE_ENTERPRISE_CACHE_USER: ${{ secrets.GRADLE_ENTERPRISE_CACHE_USER }}
1919
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GRADLE_ENTERPRISE_CACHE_PASSWORD }}
2020
GRADLE_ENTERPRISE_SECRET_ACCESS_KEY: ${{ secrets.GRADLE_ENTERPRISE_SECRET_ACCESS_KEY }}
2121
JF_ARTIFACTORY_SPRING: ${{ secrets.JF_ARTIFACTORY_SPRING }}
22+
ARTIFACTORY_USERNAME: ${{ secrets.ARTIFACTORY_USERNAME }}
23+
ARTIFACTORY_PASSWORD: ${{ secrets.ARTIFACTORY_PASSWORD }}
2224
OSSRH_URL: ${{ secrets.OSSRH_URL }}
2325
OSSRH_S01_TOKEN_USERNAME: ${{ secrets.OSSRH_S01_TOKEN_USERNAME }}
2426
OSSRH_S01_TOKEN_PASSWORD: ${{ secrets.OSSRH_S01_TOKEN_PASSWORD }}

.github/workflows/verify-staged-artifacts.yml

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ on:
88
required: true
99
type: string
1010

11+
env:
12+
GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GRADLE_ENTERPRISE_CACHE_USER }}
13+
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GRADLE_ENTERPRISE_CACHE_PASSWORD }}
14+
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GRADLE_ENTERPRISE_SECRET_ACCESS_KEY }}
15+
ARTIFACTORY_USERNAME: ${{ secrets.ARTIFACTORY_USERNAME }}
16+
ARTIFACTORY_PASSWORD: ${{ secrets.ARTIFACTORY_PASSWORD }}
17+
1118
jobs:
1219
verify-staged-with-spring-integration:
1320
runs-on: ubuntu-latest
@@ -19,28 +26,31 @@ jobs:
1926
repository: spring-projects/spring-integration
2027
show-progress: false
2128

22-
- name: Set up JDK
23-
uses: actions/setup-java@v3
24-
with:
25-
distribution: temurin
26-
java-version: 17
27-
cache: 'gradle'
28-
29-
- uses: jfrog/setup-jfrog-cli@v3
30-
env:
31-
JF_ENV_SPRING: ${{ secrets.JF_ARTIFACTORY_SPRING }}
32-
33-
- name: Configure JFrog Cli
34-
run: jf gradlec --repo-resolve libs-release-staging
29+
- name: Set up Gradle
30+
uses: spring-io/spring-gradle-build-action@v2
3531

36-
- name: Verify Spring Integration Kafka against staged release
32+
- name: Prepare Spring Integration project against Staging
3733
run: |
38-
sed -i "1,/springKafkaVersion.*/s/springKafkaVersion.*/springKafkaVersion='${{ inputs.releaseVersion }}'/" build.gradle
39-
jf gradle :spring-integration-kafka:check
34+
printf "allprojects {
35+
repositories {
36+
maven {
37+
url 'https://repo.spring.io/libs-staging-local'
38+
credentials {
39+
username = '$ARTIFACTORY_USERNAME'
40+
password = '$ARTIFACTORY_PASSWORD'
41+
}
42+
}
43+
}
44+
}" > staging-repo-init.gradle
45+
46+
sed -i "1,/springKafkaVersion.*/s/springKafkaVersion.*/springKafkaVersion='${{ inputs.releaseVersion }}'/" build.gradle
47+
48+
- name: Verify Spring Integration Kafka module against staged release
49+
run: gradle :spring-integration-kafka:check --init-script staging-repo-init.gradle
4050

4151
- name: Capture Test Results
4252
if: failure()
43-
uses: actions/upload-artifact@v3
53+
uses: actions/upload-artifact@v4
4454
with:
4555
name: test-results
4656
path: '**/target/surefire-reports/**/*.*'

build.gradle

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,22 +55,23 @@ ext {
5555
awaitilityVersion = '4.2.0'
5656
hamcrestVersion = '2.2'
5757
hibernateValidationVersion = '8.0.1.Final'
58-
jacksonBomVersion = '2.15.3'
58+
jacksonBomVersion = '2.15.4'
5959
jaywayJsonPathVersion = '2.8.0'
6060
junit4Version = '4.13.2'
61-
junitJupiterVersion = '5.10.1'
61+
junitJupiterVersion = '5.10.2'
6262
kafkaVersion = '3.6.1'
63+
kotlinCoroutinesVersion = '1.7.3'
6364
log4jVersion = '2.22.1'
6465
micrometerDocsVersion = '1.0.2'
65-
micrometerVersion = '1.13.0-SNAPSHOT'
66-
micrometerTracingVersion = '1.3.0-SNAPSHOT'
66+
micrometerVersion = '1.13.0-M1'
67+
micrometerTracingVersion = '1.3.0-M1'
6768
mockitoVersion = '5.8.0'
68-
reactorVersion = '2023.0.2'
69+
reactorVersion = '2023.0.3'
6970
scalaVersion = '2.13'
7071
springBootVersion = '3.2.2' // docs module
71-
springDataVersion = '2023.2.0-SNAPSHOT'
72+
springDataVersion = '2024.0.0-M1'
7273
springRetryVersion = '2.0.5'
73-
springVersion = '6.1.3'
74+
springVersion = '6.1.4'
7475
zookeeperVersion = '3.8.3'
7576

7677
idPrefix = 'kafka'
@@ -278,6 +279,7 @@ project ('spring-kafka') {
278279
}
279280
api "org.apache.kafka:kafka-clients:$kafkaVersion"
280281
optionalApi "org.apache.kafka:kafka-streams:$kafkaVersion"
282+
optionalApi "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$kotlinCoroutinesVersion"
281283
optionalApi 'com.fasterxml.jackson.core:jackson-core'
282284
optionalApi 'com.fasterxml.jackson.core:jackson-databind'
283285
optionalApi 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'

spring-kafka-docs/src/main/antora/modules/ROOT/nav.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
**** xref:kafka/receiving-messages/message-listeners.adoc[]
1212
**** xref:kafka/receiving-messages/message-listener-container.adoc[]
1313
**** xref:kafka/receiving-messages/ooo-commits.adoc[]
14+
**** xref:kafka/receiving-messages/async-returns.adoc[]
1415
**** xref:kafka/receiving-messages/listener-annotation.adoc[]
1516
**** xref:kafka/receiving-messages/listener-group-id.adoc[]
1617
**** xref:kafka/receiving-messages/container-thread-naming.adoc[]
@@ -20,6 +21,7 @@
2021
**** xref:kafka/receiving-messages/kafkalistener-lifecycle.adoc[]
2122
**** xref:kafka/receiving-messages/validation.adoc[]
2223
**** xref:kafka/receiving-messages/rebalance-listeners.adoc[]
24+
**** xref:kafka/receiving-messages/enforced-rebalance.adoc[]
2325
**** xref:kafka/receiving-messages/annotation-send-to.adoc[]
2426
**** xref:kafka/receiving-messages/filtering.adoc[]
2527
**** xref:kafka/receiving-messages/retrying-deliveries.adoc[]

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,10 @@ AfterRollbackProcessor<String, String> processor =
451451
When you do not use transactions, you can achieve similar functionality by configuring a `DefaultErrorHandler`.
452452
See xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Handlers].
453453

454-
IMPORTANT: Recovery is not possible with a batch listener, since the framework has no knowledge about which record in the batch keeps failing.
454+
Starting with version 3.2, recovery can now recover (skip) an entire batch of records that keeps failing.
455+
Set `ContainerProperties.setBatchRecoverAfterRollback(true)` to enable this feature.
456+
457+
IMPORTANT: By default, recovery is not possible with a batch listener, since the framework has no knowledge about which record in the batch keeps failing.
455458
In such cases, the application listener must handle a record that keeps failing.
456459

457460
See also xref:kafka/annotation-error-handling.adoc#dead-letters[Publishing Dead-letter Records].
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
[[async-returns]]
2+
= Asynchronous `@KafkaListener` Return Types
3+
4+
Starting with version 3.2, `@KafkaListener` (and `@KafkaHandler`) methods can be specified with asynchronous return types, letting the reply be sent asynchronously.
5+
Supported return types include `CompletableFuture<?>`, `Mono<?>` and Kotlin `suspend` functions.
6+
7+
[source, java]
8+
----
9+
@KafkaListener(id = "myListener", topics = "myTopic")
10+
public CompletableFuture<String> listen(String data) {
11+
...
12+
CompletableFuture<String> future = new CompletableFuture<>();
13+
future.complete("done");
14+
return future;
15+
}
16+
----
17+
18+
[source, java]
19+
----
20+
@KafkaListener(id = "myListener", topics = "myTopic")
21+
public Mono<Void> listen(String data) {
22+
...
23+
return Mono.empty();
24+
}
25+
----
26+
27+
IMPORTANT: The `AckMode` will be automatically set to `MANUAL` and out-of-order commits will be enabled when async return types are detected; instead, the asynchronous completion will ack when the async operation completes.
28+
When the async result is completed with an error, whether the message is recovered or not depends on the container error handler.
29+
If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be acked or recovered.
30+
31+
If a `KafkaListenerErrorHandler` is configured on a listener with an async return type (including Kotlin suspend functions), the error handler is invoked after a failure.
32+
See xref:kafka/annotation-error-handling.adoc[Handling Exceptions] for more information about this error handler and its purpose.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
[[enforced-rebalance]]
2+
= Enforcing Consumer Rebalance
3+
4+
Kafka clients now support an option to trigger an https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer[enforced rebalance].
5+
Starting with version `3.1.2`, Spring for Apache Kafka provides an option to invoke this API on the Kafka consumer via the message listener container.
6+
When calling this API, it is simply alerting the Kafka consumer to trigger an enforced rebalance; the actual rebalance will only occur as part of the next `poll()` operation.
7+
If there is already a rebalance in progress, calling an enforced rebalance is a NO-OP.
8+
The caller must wait for the current rebalance to complete before invoking another one.
9+
See the javadocs for `enforceRebalance` for more details.
10+
11+
The following code snippet shows the essence of enforcing a rebalance using the message listener container.
12+
13+
[source, java]
14+
----
15+
@KafkaListener(id = "my.id", topics = "my-topic")
16+
void listen(ConsumerRecord<String, String> in) {
17+
System.out.println("From KafkaListener: " + in);
18+
}
19+
20+
@Bean
21+
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
22+
return args -> {
23+
final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
24+
System.out.println("Enforcing a rebalance");
25+
Thread.sleep(5_000);
26+
listenerContainer.enforceRebalance();
27+
Thread.sleep(5_000);
28+
};
29+
}
30+
----
31+
32+
As the code above shows, the application uses the `KafkaListenerEndpointRegistry` to gain access to the message listener container and then calls the `enforceRebalance` API on it.
33+
When calling the `enforceRebalance` on the listener container, it delegates the call to the underlying Kafka consumer.
34+
The Kafka consumer will trigger a rebalance as part of the next `poll()` operation.

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,8 @@ ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
397397
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
398398
----
399399

400+
Starting with version `3.1.2`, a `RecoveryCallback` can be set on the `RetryingDeserializer` optionally.
401+
400402
Refer to the https://github.com/spring-projects/spring-retry[spring-retry] project for configuration of the `RetryTemplate` with a retry policy, back off policy, etc.
401403

402404

spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/features.adoc

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,3 +261,41 @@ protected Consumer<DeadLetterPublishingRecovererFactory>
261261

262262
It is recommended that you use the provided resolvers when constructing the custom instance.
263263

264+
[[exc-based-custom-dlt-routing]]
265+
== Routing of messages to custom DLTs based on thrown exceptions
266+
267+
Starting with version 3.2.0, it's possible to route messages to custom DLTs based on the type of the exception, which has been thrown during their processing.
268+
In order to do that, there's a need to specify the routing.
269+
Routing customization consists of the specification of the additional destinations.
270+
Destinations in turn consist of two settings: the `suffix` and `exceptions`.
271+
When the exception type specified in `exceptions` has been thrown, the DLT containing the `suffix` will be considered as the target topic for the message before the general purpose DLT is considered.
272+
Examples of configuration using either annotations or `RetryTopicConfiguration` beans:
273+
274+
[source, java]
275+
----
276+
@RetryableTopic(exceptionBasedDltRouting = {
277+
@ExceptionBasedDltDestination(
278+
suffix = "-deserialization", exceptions = {DeserializationException.class}
279+
)}
280+
)
281+
@KafkaListener(topics = "my-annotated-topic")
282+
public void processMessage(MyPojo message) {
283+
// ... message processing
284+
}
285+
----
286+
287+
[source, java]
288+
----
289+
@Bean
290+
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
291+
return RetryTopicConfigurationBuilder
292+
.newInstance()
293+
.dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
294+
.create(kafkaOperations)
295+
.create(template);
296+
}
297+
----
298+
299+
`suffix` takes place before the general `dltTopicSuffix` in the custom DLT name.
300+
Considering presented examples, the message, which caused the `DeserializationException` will be routed to the `my-annotated-topic-deserialization-dlt` instead of the `my-annotated-topic-dlt`.
301+
Custom DLTs will be created following the same rules as stated in the xref:retrytopic/features.adoc#topics-autocreation[Topics AutoCreation].

0 commit comments

Comments
 (0)