Skip to content

Commit 389238f

Browse files
committed
Add registerReactiveTypeOverride method to ReactiveAdapterRegistry
Closes gh-31047
1 parent 6f2a13f commit 389238f

File tree

2 files changed

+66
-6
lines changed

2 files changed

+66
-6
lines changed

spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,45 @@ public ReactiveAdapterRegistry() {
105105
* Register a reactive type along with functions to adapt to and from a
106106
* Reactive Streams {@link Publisher}. The function arguments assume that
107107
* their input is neither {@code null} nor {@link Optional}.
108+
* <p>This variant registers the new adapter after existing adapters.
109+
* It will be matched for the exact reactive type if no earlier adapter was
110+
* registered for the specific type, and it will be matched for assignability
111+
* in a second pass if no earlier adapter had an assignable type before.
112+
* @see #registerReactiveTypeOverride
113+
* @see #getAdapter
108114
*/
109115
public void registerReactiveType(ReactiveTypeDescriptor descriptor,
110116
Function<Object, Publisher<?>> toAdapter, Function<Publisher<?>, Object> fromAdapter) {
111117

112-
if (reactorPresent) {
113-
this.adapters.add(new ReactorAdapter(descriptor, toAdapter, fromAdapter));
114-
}
115-
else {
116-
this.adapters.add(new ReactiveAdapter(descriptor, toAdapter, fromAdapter));
117-
}
118+
this.adapters.add(buildAdapter(descriptor, toAdapter, fromAdapter));
119+
}
120+
121+
/**
122+
* Register a reactive type along with functions to adapt to and from a
123+
* Reactive Streams {@link Publisher}. The function arguments assume that
124+
* their input is neither {@code null} nor {@link Optional}.
125+
* <p>This variant registers the new adapter first, effectively overriding
126+
* any previously registered adapters for the same reactive type. This allows
127+
* for overriding existing adapters, in particular default adapters.
128+
* <p>Note that existing adapters for specific types will still match before
129+
* an assignability match with the new adapter. In order to override all
130+
* existing matches, a new reactive type adapter needs to be registered
131+
* for every specific type, not relying on subtype assignability matches.
132+
* @since 5.3.30
133+
* @see #registerReactiveType
134+
* @see #getAdapter
135+
*/
136+
public void registerReactiveTypeOverride(ReactiveTypeDescriptor descriptor,
137+
Function<Object, Publisher<?>> toAdapter, Function<Publisher<?>, Object> fromAdapter) {
138+
139+
this.adapters.add(0, buildAdapter(descriptor, toAdapter, fromAdapter));
140+
}
141+
142+
private ReactiveAdapter buildAdapter(ReactiveTypeDescriptor descriptor,
143+
Function<Object, Publisher<?>> toAdapter, Function<Publisher<?>, Object> fromAdapter) {
144+
145+
return (reactorPresent ? new ReactorAdapter(descriptor, toAdapter, fromAdapter) :
146+
new ReactiveAdapter(descriptor, toAdapter, fromAdapter));
118147
}
119148

120149
/**

spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
* Unit tests for {@link ReactiveAdapterRegistry}.
4040
*
4141
* @author Rossen Stoyanchev
42+
* @author Juergen Hoeller
4243
*/
4344
@SuppressWarnings("unchecked")
4445
class ReactiveAdapterRegistryTests {
@@ -54,14 +55,40 @@ void getAdapterForReactiveSubType() {
5455
ReactiveAdapter adapter2 = getAdapter(ExtendedFlux.class);
5556
assertThat(adapter2).isSameAs(adapter1);
5657

58+
// Register regular reactive type (after existing adapters)
5759
this.registry.registerReactiveType(
5860
ReactiveTypeDescriptor.multiValue(ExtendedFlux.class, ExtendedFlux::empty),
5961
o -> (ExtendedFlux<?>) o,
6062
ExtendedFlux::from);
6163

64+
// Matches for ExtendedFlux itself
6265
ReactiveAdapter adapter3 = getAdapter(ExtendedFlux.class);
6366
assertThat(adapter3).isNotNull();
6467
assertThat(adapter3).isNotSameAs(adapter1);
68+
69+
// Does not match for ExtendedFlux subclass since the default Flux adapter
70+
// is being assignability-checked first when no specific match was found
71+
ReactiveAdapter adapter4 = getAdapter(ExtendedExtendedFlux.class);
72+
assertThat(adapter4).isSameAs(adapter1);
73+
74+
// Register reactive type override (before existing adapters)
75+
this.registry.registerReactiveTypeOverride(
76+
ReactiveTypeDescriptor.multiValue(Flux.class, ExtendedFlux::empty),
77+
o -> (ExtendedFlux<?>) o,
78+
ExtendedFlux::from);
79+
80+
// Override match for Flux
81+
ReactiveAdapter adapter5 = getAdapter(Flux.class);
82+
assertThat(adapter5).isNotNull();
83+
assertThat(adapter5).isNotSameAs(adapter1);
84+
85+
// Initially registered adapter specifically matches for ExtendedFlux
86+
ReactiveAdapter adapter6 = getAdapter(ExtendedFlux.class);
87+
assertThat(adapter6).isSameAs(adapter3);
88+
89+
// Override match for ExtendedFlux subclass
90+
ReactiveAdapter adapter7 = getAdapter(ExtendedExtendedFlux.class);
91+
assertThat(adapter7).isSameAs(adapter5);
6592
}
6693

6794

@@ -81,6 +108,10 @@ public void subscribe(CoreSubscriber<? super T> actual) {
81108
}
82109

83110

111+
private static class ExtendedExtendedFlux<T> extends ExtendedFlux<T> {
112+
}
113+
114+
84115
@Nested
85116
class Reactor {
86117

0 commit comments

Comments
 (0)