Skip to content

Commit 6b9b023

Browse files
committed
Introduce JSON streaming support
This commit introduces JSON streaming support which consists of serializing HTTP request with application/stream+json media type as line delimited JSON. It also optimize Flux serialization for application/json by using flux.collectList() and a single Jackson invocation instead of one call per element previous strategy. This change result in a x4 throughput improvement for collection with a lot of small elements. Issues: SPR-15095, SPR-15104
1 parent f128feb commit 6b9b023

File tree

4 files changed

+194
-33
lines changed

4 files changed

+194
-33
lines changed

spring-web/src/main/java/org/springframework/http/MediaType.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,18 @@ public class MediaType extends MimeType implements Serializable {
137137
*/
138138
public final static String APPLICATION_RSS_XML_VALUE = "application/rss+xml";
139139

140+
/**
141+
* Public constant media type for {@code application/stream+json}.
142+
* @since 5.0
143+
*/
144+
public final static MediaType APPLICATION_STREAM_JSON;
145+
146+
/**
147+
* A String equivalent of {@link MediaType#APPLICATION_STREAM_JSON}.
148+
* @since 5.0
149+
*/
150+
public final static String APPLICATION_STREAM_JSON_VALUE = "application/stream+json";
151+
140152
/**
141153
* Public constant media type for {@code application/xhtml+xml}.
142154
*/
@@ -292,6 +304,7 @@ public class MediaType extends MimeType implements Serializable {
292304
APPLICATION_PROBLEM_JSON = valueOf(APPLICATION_PROBLEM_JSON_VALUE);
293305
APPLICATION_PROBLEM_XML = valueOf(APPLICATION_PROBLEM_XML_VALUE);
294306
APPLICATION_RSS_XML = valueOf(APPLICATION_RSS_XML_VALUE);
307+
APPLICATION_STREAM_JSON = valueOf(APPLICATION_STREAM_JSON_VALUE);
295308
APPLICATION_XHTML_XML = valueOf(APPLICATION_XHTML_XML_VALUE);
296309
APPLICATION_XML = valueOf(APPLICATION_XML_VALUE);
297310
IMAGE_GIF = valueOf(IMAGE_GIF_VALUE);

spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonEncoder.java

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,7 +18,6 @@
1818

1919
import java.io.IOException;
2020
import java.io.OutputStream;
21-
import java.nio.ByteBuffer;
2221
import java.util.List;
2322
import java.util.Map;
2423

@@ -32,6 +31,7 @@
3231
import com.fasterxml.jackson.databind.SerializationFeature;
3332
import com.fasterxml.jackson.databind.type.TypeFactory;
3433
import org.reactivestreams.Publisher;
34+
import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON;
3535
import reactor.core.publisher.Flux;
3636
import reactor.core.publisher.Mono;
3737

@@ -56,13 +56,6 @@
5656
*/
5757
public class Jackson2JsonEncoder extends AbstractJackson2Codec implements Encoder<Object> {
5858

59-
private static final ByteBuffer START_ARRAY_BUFFER = ByteBuffer.wrap(new byte[]{'['});
60-
61-
private static final ByteBuffer SEPARATOR_BUFFER = ByteBuffer.wrap(new byte[]{','});
62-
63-
private static final ByteBuffer END_ARRAY_BUFFER = ByteBuffer.wrap(new byte[]{']'});
64-
65-
6659
private final PrettyPrinter ssePrettyPrinter;
6760

6861

@@ -100,17 +93,15 @@ public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory buffe
10093
if (inputStream instanceof Mono) {
10194
return Flux.from(inputStream).map(value -> encodeValue(value, bufferFactory, elementType, hints));
10295
}
103-
104-
Mono<DataBuffer> startArray = Mono.just(bufferFactory.wrap(START_ARRAY_BUFFER));
105-
Mono<DataBuffer> endArray = Mono.just(bufferFactory.wrap(END_ARRAY_BUFFER));
106-
107-
Flux<DataBuffer> array = Flux.from(inputStream)
108-
.concatMap(value -> {
109-
DataBuffer arraySeparator = bufferFactory.wrap(SEPARATOR_BUFFER);
110-
return Flux.just(encodeValue(value, bufferFactory, elementType, hints), arraySeparator);
111-
});
112-
113-
return Flux.concat(startArray, array.skipLast(1), endArray);
96+
else if (APPLICATION_STREAM_JSON.isCompatibleWith(mimeType)) {
97+
return Flux.from(inputStream).map(value -> {
98+
DataBuffer buffer = encodeValue(value, bufferFactory, elementType, hints);
99+
buffer.write(new byte[]{'\n'});
100+
return buffer;
101+
});
102+
}
103+
ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType);
104+
return Flux.from(inputStream).collectList().map(list -> encodeValue(list, bufferFactory, listType, hints)).flux();
114105
}
115106

116107
private DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory,

spring-web/src/test/java/org/springframework/http/codec/json/Jackson2JsonEncoderTests.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323
import com.fasterxml.jackson.annotation.JsonTypeName;
2424
import com.fasterxml.jackson.annotation.JsonView;
2525
import org.junit.Test;
26+
import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON;
2627
import reactor.core.publisher.Flux;
2728
import reactor.core.publisher.Mono;
2829
import reactor.test.StepVerifier;
@@ -66,13 +67,7 @@ public void encode() throws Exception {
6667
Flux<DataBuffer> output = this.encoder.encode(source, this.bufferFactory, type, null, Collections.emptyMap());
6768

6869
StepVerifier.create(output)
69-
.consumeNextWith(stringConsumer("["))
70-
.consumeNextWith(stringConsumer("{\"foo\":\"foo\",\"bar\":\"bar\"}"))
71-
.consumeNextWith(stringConsumer(","))
72-
.consumeNextWith(stringConsumer("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"))
73-
.consumeNextWith(stringConsumer(","))
74-
.consumeNextWith(stringConsumer("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}"))
75-
.consumeNextWith(stringConsumer("]"))
70+
.consumeNextWith(stringConsumer("[{\"foo\":\"foo\",\"bar\":\"bar\"},{\"foo\":\"foofoo\",\"bar\":\"barbar\"},{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}]"))
7671
.expectComplete()
7772
.verify();
7873
}
@@ -84,11 +79,25 @@ public void encodeWithType() throws Exception {
8479
Flux<DataBuffer> output = this.encoder.encode(source, this.bufferFactory, type, null, Collections.emptyMap());
8580

8681
StepVerifier.create(output)
87-
.consumeNextWith(stringConsumer("["))
88-
.consumeNextWith(stringConsumer("{\"type\":\"foo\"}"))
89-
.consumeNextWith(stringConsumer(","))
90-
.consumeNextWith(stringConsumer("{\"type\":\"bar\"}"))
91-
.consumeNextWith(stringConsumer("]"))
82+
.consumeNextWith(stringConsumer("[{\"type\":\"foo\"},{\"type\":\"bar\"}]"))
83+
.expectComplete()
84+
.verify();
85+
}
86+
87+
@Test
88+
public void encodeAsStream() throws Exception {
89+
Flux<Pojo> source = Flux.just(
90+
new Pojo("foo", "bar"),
91+
new Pojo("foofoo", "barbar"),
92+
new Pojo("foofoofoo", "barbarbar")
93+
);
94+
ResolvableType type = ResolvableType.forClass(Pojo.class);
95+
Flux<DataBuffer> output = this.encoder.encode(source, this.bufferFactory, type, APPLICATION_STREAM_JSON, Collections.emptyMap());
96+
97+
StepVerifier.create(output)
98+
.consumeNextWith(stringConsumer("{\"foo\":\"foo\",\"bar\":\"bar\"}\n"))
99+
.consumeNextWith(stringConsumer("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}\n"))
100+
.consumeNextWith(stringConsumer("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}\n"))
92101
.expectComplete()
93102
.verify();
94103
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright 2002-2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.web.reactive.result.method.annotation;
18+
19+
import java.time.Duration;
20+
21+
import org.junit.Before;
22+
import org.junit.Test;
23+
import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON;
24+
import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON_VALUE;
25+
import reactor.core.publisher.Flux;
26+
import reactor.test.StepVerifier;
27+
28+
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
29+
import org.springframework.context.annotation.Bean;
30+
import org.springframework.context.annotation.Configuration;
31+
import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests;
32+
import org.springframework.http.server.reactive.HttpHandler;
33+
import org.springframework.web.bind.annotation.RequestMapping;
34+
import org.springframework.web.bind.annotation.RestController;
35+
import org.springframework.web.reactive.DispatcherHandler;
36+
import org.springframework.web.reactive.config.EnableWebFlux;
37+
import org.springframework.web.reactive.function.client.WebClient;
38+
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
39+
40+
/**
41+
* @author Sebastien Deleuze
42+
*/
43+
public class JsonStreamingIntegrationTests extends AbstractHttpHandlerIntegrationTests {
44+
45+
private AnnotationConfigApplicationContext wac;
46+
47+
private WebClient webClient;
48+
49+
50+
@Override
51+
@Before
52+
public void setup() throws Exception {
53+
super.setup();
54+
this.webClient = WebClient.create("http://localhost:" + this.port);
55+
}
56+
57+
58+
@Override
59+
protected HttpHandler createHttpHandler() {
60+
this.wac = new AnnotationConfigApplicationContext();
61+
this.wac.register(TestConfiguration.class);
62+
this.wac.refresh();
63+
64+
return WebHttpHandlerBuilder.webHandler(new DispatcherHandler(this.wac)).build();
65+
}
66+
67+
@Test
68+
public void jsonStreaming() throws Exception {
69+
Flux<Person> result = this.webClient.get()
70+
.uri("/stream")
71+
.accept(APPLICATION_STREAM_JSON)
72+
.exchange()
73+
.flatMap(response -> response.bodyToFlux(Person.class));
74+
75+
StepVerifier.create(result)
76+
.expectNext(new Person("foo 0"))
77+
.expectNext(new Person("foo 1"))
78+
.verifyComplete();
79+
}
80+
81+
@RestController
82+
@SuppressWarnings("unused")
83+
static class JsonStreamingController {
84+
85+
@RequestMapping(value = "/stream", produces = APPLICATION_STREAM_JSON_VALUE)
86+
Flux<Person> person() {
87+
return Flux.interval(Duration.ofMillis(100)).map(l -> new Person("foo " + l)).take(2);
88+
}
89+
90+
}
91+
92+
@Configuration
93+
@EnableWebFlux
94+
@SuppressWarnings("unused")
95+
static class TestConfiguration {
96+
97+
@Bean
98+
public JsonStreamingController jsonStreamingController() {
99+
return new JsonStreamingController();
100+
}
101+
}
102+
103+
private static class Person {
104+
105+
private String name;
106+
107+
@SuppressWarnings("unused")
108+
public Person() {
109+
}
110+
111+
public Person(String name) {
112+
this.name = name;
113+
}
114+
115+
public String getName() {
116+
return name;
117+
}
118+
119+
public void setName(String name) {
120+
this.name = name;
121+
}
122+
123+
@Override
124+
public boolean equals(Object o) {
125+
if (this == o) {
126+
return true;
127+
}
128+
if (o == null || getClass() != o.getClass()) {
129+
return false;
130+
}
131+
Person person = (Person) o;
132+
return !(this.name != null ? !this.name.equals(person.name) : person.name != null);
133+
}
134+
135+
@Override
136+
public int hashCode() {
137+
return this.name != null ? this.name.hashCode() : 0;
138+
}
139+
140+
@Override
141+
public String toString() {
142+
return "Person{" +
143+
"name='" + name + '\'' +
144+
'}';
145+
}
146+
}
147+
148+
}

0 commit comments

Comments
 (0)