Skip to content

Commit d2fa553

Browse files
committed
Use more efficient Reactor operators
Use handle/flatMapIterable instead of flatMap/flatMapMany when possible. Closes gh-22727
1 parent b1231de commit d2fa553

File tree

5 files changed

+37
-31
lines changed

5 files changed

+37
-31
lines changed

spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Decoder.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ private Flux<Object> decodeInternal(Flux<TokenBuffer> tokens, ResolvableType ele
115115
getObjectMapper().readerWithView(jsonView).forType(javaType) :
116116
getObjectMapper().readerFor(javaType));
117117

118-
return tokens.flatMap(tokenBuffer -> {
118+
return tokens.handle((tokenBuffer, sink) -> {
119119
try {
120120
Object value = reader.readValue(tokenBuffer.asParser(getObjectMapper()));
121121
if (!Hints.isLoggingSuppressed(hints)) {
@@ -124,16 +124,18 @@ private Flux<Object> decodeInternal(Flux<TokenBuffer> tokens, ResolvableType ele
124124
return Hints.getLogPrefix(hints) + "Decoded [" + formatted + "]";
125125
});
126126
}
127-
return Mono.justOrEmpty(value);
127+
if (value != null) {
128+
sink.next(value);
129+
}
128130
}
129131
catch (InvalidDefinitionException ex) {
130-
return Mono.error(new CodecException("Type definition error: " + ex.getType(), ex));
132+
sink.error(new CodecException("Type definition error: " + ex.getType(), ex));
131133
}
132134
catch (JsonProcessingException ex) {
133-
return Mono.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex));
135+
sink.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex));
134136
}
135137
catch (IOException ex) {
136-
return Mono.error(new DecodingException("I/O error while parsing input stream", ex));
138+
sink.error(new DecodingException("I/O error while parsing input stream", ex));
137139
}
138140
});
139141
}

spring-web/src/main/java/org/springframework/http/codec/xml/Jaxb2XmlDecoder.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import java.util.ArrayList;
2020
import java.util.List;
2121
import java.util.Map;
22+
import java.util.function.BiConsumer;
2223
import java.util.function.Function;
2324
import javax.xml.XMLConstants;
2425
import javax.xml.bind.JAXBElement;
@@ -35,6 +36,7 @@
3536
import org.reactivestreams.Publisher;
3637
import reactor.core.publisher.Flux;
3738
import reactor.core.publisher.Mono;
39+
import reactor.core.publisher.SynchronousSink;
3840

3941
import org.springframework.core.ResolvableType;
4042
import org.springframework.core.codec.AbstractDecoder;
@@ -224,11 +226,11 @@ else if (outputClass.isAnnotationPresent(XmlType.class)) {
224226
* </ol>
225227
*/
226228
Flux<List<XMLEvent>> split(Flux<XMLEvent> xmlEventFlux, QName desiredName) {
227-
return xmlEventFlux.flatMap(new SplitFunction(desiredName));
229+
return xmlEventFlux.handle(new SplitHandler(desiredName));
228230
}
229231

230232

231-
private static class SplitFunction implements Function<XMLEvent, Publisher<? extends List<XMLEvent>>> {
233+
private static class SplitHandler implements BiConsumer<XMLEvent, SynchronousSink<List<XMLEvent>>> {
232234

233235
private final QName desiredName;
234236

@@ -239,12 +241,12 @@ private static class SplitFunction implements Function<XMLEvent, Publisher<? ext
239241

240242
private int barrier = Integer.MAX_VALUE;
241243

242-
public SplitFunction(QName desiredName) {
244+
public SplitHandler(QName desiredName) {
243245
this.desiredName = desiredName;
244246
}
245247

246248
@Override
247-
public Publisher<? extends List<XMLEvent>> apply(XMLEvent event) {
249+
public void accept(XMLEvent event, SynchronousSink<List<XMLEvent>> sink) {
248250
if (event.isStartElement()) {
249251
if (this.barrier == Integer.MAX_VALUE) {
250252
QName startElementName = event.asStartElement().getName();
@@ -264,10 +266,9 @@ public Publisher<? extends List<XMLEvent>> apply(XMLEvent event) {
264266
if (this.elementDepth == this.barrier) {
265267
this.barrier = Integer.MAX_VALUE;
266268
Assert.state(this.events != null, "No XMLEvent List");
267-
return Mono.just(this.events);
269+
sink.next(this.events);
268270
}
269271
}
270-
return Mono.empty();
271272
}
272273
}
273274

spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import java.io.InputStream;
2020
import java.util.ArrayList;
21-
import java.util.Iterator;
2221
import java.util.List;
2322
import java.util.Map;
2423
import java.util.function.Function;
@@ -33,6 +32,7 @@
3332
import com.fasterxml.aalto.evt.EventAllocatorImpl;
3433
import com.fasterxml.aalto.stax.InputFactoryImpl;
3534
import org.reactivestreams.Publisher;
35+
import reactor.core.Exceptions;
3636
import reactor.core.publisher.Flux;
3737
import reactor.core.publisher.Mono;
3838

@@ -106,18 +106,17 @@ public Flux<XMLEvent> decode(Publisher<DataBuffer> inputStream, ResolvableType e
106106
}
107107
else {
108108
Mono<DataBuffer> singleBuffer = DataBufferUtils.join(flux);
109-
return singleBuffer.
110-
flatMapMany(dataBuffer -> {
111-
try {
112-
InputStream is = dataBuffer.asInputStream();
113-
Iterator eventReader = inputFactory.createXMLEventReader(is);
114-
return Flux.fromIterable((Iterable<XMLEvent>) () -> eventReader)
115-
.doFinally(t -> DataBufferUtils.release(dataBuffer));
116-
}
117-
catch (XMLStreamException ex) {
118-
return Mono.error(ex);
119-
}
120-
});
109+
return singleBuffer.flatMapIterable(dataBuffer -> {
110+
InputStream is = dataBuffer.asInputStream();
111+
return () -> {
112+
try {
113+
return inputFactory.createXMLEventReader(is);
114+
}
115+
catch (XMLStreamException ex) {
116+
throw Exceptions.propagate(ex);
117+
}
118+
};
119+
});
121120
}
122121
}
123122

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -441,13 +441,13 @@ public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> bodyType) {
441441
@Override
442442
public <T> Flux<T> bodyToFlux(Class<T> elementType) {
443443
return this.responseMono.flatMapMany(response ->
444-
handleBody(response, response.bodyToFlux(elementType), mono -> mono.flatMapMany(Flux::error)));
444+
handleBody(response, response.bodyToFlux(elementType), mono -> mono.handle((t, sink) -> sink.error(t))));
445445
}
446446

447447
@Override
448448
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> elementType) {
449449
return this.responseMono.flatMapMany(response ->
450-
handleBody(response, response.bodyToFlux(elementType), mono -> mono.flatMapMany(Flux::error)));
450+
handleBody(response, response.bodyToFlux(elementType), mono -> mono.handle((t, sink) -> sink.error(t))));
451451
}
452452

453453
private <T extends Publisher<?>> T handleBody(ClientResponse response,

spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/RequestPartMethodArgumentResolver.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.web.reactive.result.method.annotation;
1818

19+
import java.util.Collections;
1920
import java.util.List;
2021

2122
import reactor.core.publisher.Flux;
@@ -71,12 +72,15 @@ public Mono<Object> resolveArgument(
7172
String name = getPartName(parameter, requestPart);
7273

7374
Flux<Part> parts = exchange.getMultipartData()
74-
.flatMapMany(map -> {
75+
.flatMapIterable(map -> {
7576
List<Part> list = map.get(name);
7677
if (CollectionUtils.isEmpty(list)) {
77-
return (isRequired ? Flux.error(getMissingPartException(name, parameter)) : Flux.empty());
78+
if (isRequired) {
79+
throw getMissingPartException(name, parameter);
80+
}
81+
return Collections.emptyList();
7882
}
79-
return Flux.fromIterable(list);
83+
return list;
8084
});
8185

8286
if (Part.class.isAssignableFrom(parameter.getParameterType())) {

0 commit comments

Comments
 (0)