Skip to content

Commit e370c15

Browse files
committed
Fix error handling regression for non-streaming Flux
Closes gh-29038
1 parent 3307630 commit e370c15

File tree

2 files changed

+34
-12
lines changed

2 files changed

+34
-12
lines changed

spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.lang.annotation.Annotation;
2121
import java.nio.charset.Charset;
2222
import java.util.ArrayList;
23+
import java.util.Arrays;
2324
import java.util.Collections;
2425
import java.util.List;
2526
import java.util.Map;
@@ -47,6 +48,7 @@
4748
import org.springframework.core.codec.Hints;
4849
import org.springframework.core.io.buffer.DataBuffer;
4950
import org.springframework.core.io.buffer.DataBufferFactory;
51+
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
5052
import org.springframework.core.log.LogFormatUtils;
5153
import org.springframework.http.MediaType;
5254
import org.springframework.http.codec.HttpMessageEncoder;
@@ -73,6 +75,8 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
7375

7476
private static final byte[] EMPTY_BYTES = new byte[0];
7577

78+
private static DataBuffer EMPTY_BUFFER = DefaultDataBufferFactory.sharedInstance.wrap(EMPTY_BYTES);
79+
7680
private static final Map<String, JsonEncoding> ENCODINGS;
7781

7882
static {
@@ -174,11 +178,21 @@ public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory buffe
174178
}
175179
else {
176180
JsonArrayJoinHelper helper = new JsonArrayJoinHelper();
177-
return Flux.concat(
178-
helper.getPrefix(bufferFactory, hints, logger),
179-
Flux.from(inputStream).map(value -> encodeStreamingValue(
180-
value, bufferFactory, hints, sequenceWriter, byteBuilder, helper.getDelimiter(), EMPTY_BYTES)),
181-
helper.getSuffix(bufferFactory, hints, logger));
181+
182+
// Do not prepend JSON array prefix until first signal is known, onNext vs onError
183+
// Keeps response not committed for error handling
184+
185+
Flux<DataBuffer> flux1 = helper.getPrefix(bufferFactory, hints, logger)
186+
.concatWith(Flux.just(EMPTY_BUFFER).repeat());
187+
188+
Flux<DataBuffer> flux2 = Flux.from(inputStream).map(value -> encodeStreamingValue(
189+
value, bufferFactory, hints, sequenceWriter, byteBuilder, helper.getDelimiter(), EMPTY_BYTES));
190+
191+
dataBufferFlux = Flux.zip(flux1, flux2, (buffer1, buffer2) ->
192+
(buffer1 != EMPTY_BUFFER ?
193+
bufferFactory.join(Arrays.asList(buffer1, buffer2)) :
194+
buffer2))
195+
.concatWith(helper.getSuffix(bufferFactory, hints, logger));
182196
}
183197

184198
return dataBufferFlux

spring-web/src/test/java/org/springframework/http/codec/json/Jackson2JsonEncoderTests.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,29 +137,38 @@ public void encodeNonStream() {
137137
);
138138

139139
testEncode(input, Pojo.class, step -> step
140-
.consumeNextWith(expectString("["))
141-
.consumeNextWith(expectString("{\"foo\":\"foo\",\"bar\":\"bar\"}"))
140+
.consumeNextWith(expectString("[{\"foo\":\"foo\",\"bar\":\"bar\"}"))
142141
.consumeNextWith(expectString(",{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"))
143142
.consumeNextWith(expectString(",{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}"))
144143
.consumeNextWith(expectString("]"))
145144
.verifyComplete());
146145
}
147146

147+
@Test // gh-29038
148+
void encodeNonStreamWithErrorAsFirstSignal() {
149+
String message = "I'm a teapot";
150+
Flux<Object> input = Flux.error(new IllegalStateException(message));
151+
152+
Flux<DataBuffer> output = this.encoder.encode(
153+
input, this.bufferFactory, ResolvableType.forClass(Pojo.class), null, null);
154+
155+
StepVerifier.create(output).expectErrorMessage(message).verify();
156+
}
157+
148158
@Test
149159
public void encodeWithType() {
150160
Flux<ParentClass> input = Flux.just(new Foo(), new Bar());
151161

152162
testEncode(input, ParentClass.class, step -> step
153-
.consumeNextWith(expectString("["))
154-
.consumeNextWith(expectString("{\"type\":\"foo\"}"))
163+
.consumeNextWith(expectString("[{\"type\":\"foo\"}"))
155164
.consumeNextWith(expectString(",{\"type\":\"bar\"}"))
156165
.consumeNextWith(expectString("]"))
157166
.verifyComplete());
158167
}
159168

160169

161170
@Test // SPR-15727
162-
public void encodeAsStreamWithCustomStreamingType() {
171+
public void encodeStreamWithCustomStreamingType() {
163172
MediaType fooMediaType = new MediaType("application", "foo");
164173
MediaType barMediaType = new MediaType("application", "bar");
165174
this.encoder.setStreamingMediaTypes(Arrays.asList(fooMediaType, barMediaType));
@@ -263,8 +272,7 @@ public void encodeWithFlushAfterWriteOff() {
263272
ResolvableType.forClass(Pojo.class), MimeTypeUtils.APPLICATION_JSON, Collections.emptyMap());
264273

265274
StepVerifier.create(result)
266-
.consumeNextWith(expectString("["))
267-
.consumeNextWith(expectString("{\"foo\":\"foo\",\"bar\":\"bar\"}"))
275+
.consumeNextWith(expectString("[{\"foo\":\"foo\",\"bar\":\"bar\"}"))
268276
.consumeNextWith(expectString("]"))
269277
.expectComplete()
270278
.verify(Duration.ofSeconds(5));

0 commit comments

Comments
 (0)