Skip to content

Commit 62f1d5f

Browse files
committed
SseEmitter supports custom MimeType
- Supports using `PRODUCIBLE_MEDIA_TYPES_ATTRIBUTE` to modify the default `text/event-stream` Signed-off-by: YunKui Lu <[email protected]>
1 parent 5e9f90f commit 62f1d5f

File tree

2 files changed

+106
-35
lines changed

2 files changed

+106
-35
lines changed

spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,6 @@
1616

1717
package org.springframework.web.servlet.mvc.method.annotation;
1818

19-
import java.io.IOException;
20-
import java.time.Duration;
21-
import java.util.ArrayList;
22-
import java.util.Collection;
23-
import java.util.List;
24-
import java.util.Optional;
25-
import java.util.concurrent.atomic.AtomicLong;
26-
import java.util.concurrent.atomic.AtomicReference;
27-
2819
import io.micrometer.context.ContextSnapshot;
2920
import io.micrometer.context.ContextSnapshotFactory;
3021
import org.apache.commons.logging.Log;
@@ -33,9 +24,6 @@
3324
import org.reactivestreams.Publisher;
3425
import org.reactivestreams.Subscriber;
3526
import org.reactivestreams.Subscription;
36-
import reactor.core.publisher.Flux;
37-
import reactor.core.publisher.Mono;
38-
3927
import org.springframework.core.MethodParameter;
4028
import org.springframework.core.ReactiveAdapter;
4129
import org.springframework.core.ReactiveAdapterRegistry;
@@ -59,6 +47,17 @@
5947
import org.springframework.web.context.request.async.WebAsyncUtils;
6048
import org.springframework.web.method.support.ModelAndViewContainer;
6149
import org.springframework.web.servlet.HandlerMapping;
50+
import reactor.core.publisher.Flux;
51+
import reactor.core.publisher.Mono;
52+
53+
import java.io.IOException;
54+
import java.time.Duration;
55+
import java.util.ArrayList;
56+
import java.util.Collection;
57+
import java.util.List;
58+
import java.util.Optional;
59+
import java.util.concurrent.atomic.AtomicLong;
60+
import java.util.concurrent.atomic.AtomicReference;
6261

6362
/**
6463
* Private helper class to assist with handling "reactive" return values types
@@ -160,7 +159,11 @@ public boolean isReactiveType(Class<?> type) {
160159
if (adapter.isMultiValue()) {
161160
if (mediaTypes.stream().anyMatch(MediaType.TEXT_EVENT_STREAM::includes) ||
162161
ServerSentEvent.class.isAssignableFrom(elementClass)) {
163-
SseEmitter emitter = new SseEmitter(STREAMING_TIMEOUT_VALUE);
162+
SseEmitter emitter = getSseEmitter(mediaType
163+
// When `mediaType` isn't TEXT_EVENT_STREAM, but elementClass is ServerSentEvent,
164+
// the `mediaType` will return null.
165+
.filter(MediaType.TEXT_EVENT_STREAM::includes)
166+
.orElse(null));
164167
new SseEmitterSubscriber(emitter, this.taskExecutor, taskDecorator).connect(adapter, returnValue);
165168
return emitter;
166169
}
@@ -239,6 +242,20 @@ protected void extendResponse(ServerHttpResponse outputMessage) {
239242
};
240243
}
241244

245+
private SseEmitter getSseEmitter(@Nullable MediaType mediaType) {
246+
return new SseEmitter(STREAMING_TIMEOUT_VALUE) {
247+
@Override
248+
protected void extendResponse(ServerHttpResponse outputMessage) {
249+
super.extendResponse(outputMessage);
250+
// The super class sets the content type to TEXT_EVENT_STREAM.
251+
// If we don't want to override it, we set `mediaType` to null.
252+
if (mediaType != null) {
253+
outputMessage.getHeaders().setContentType(mediaType);
254+
}
255+
}
256+
};
257+
}
258+
242259

243260
private abstract static class AbstractEmitterSubscriber implements Subscriber<Object>, Runnable {
244261

spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java

Lines changed: 76 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,6 @@
1616

1717
package org.springframework.web.servlet.mvc.method.annotation;
1818

19-
import java.io.IOException;
20-
import java.util.ArrayList;
21-
import java.util.Arrays;
22-
import java.util.Collections;
23-
import java.util.List;
24-
import java.util.Set;
25-
import java.util.concurrent.atomic.AtomicInteger;
26-
import java.util.concurrent.atomic.AtomicReference;
27-
import java.util.function.Consumer;
28-
import java.util.stream.Collectors;
29-
3019
import io.micrometer.context.ContextRegistry;
3120
import io.micrometer.context.ContextSnapshotFactory;
3221
import io.reactivex.rxjava3.core.Single;
@@ -35,11 +24,6 @@
3524
import org.jspecify.annotations.Nullable;
3625
import org.junit.jupiter.api.BeforeEach;
3726
import org.junit.jupiter.api.Test;
38-
import reactor.core.publisher.Flux;
39-
import reactor.core.publisher.Mono;
40-
import reactor.core.publisher.Sinks;
41-
import reactor.util.context.ReactorContextAccessor;
42-
4327
import org.springframework.core.MethodParameter;
4428
import org.springframework.core.ReactiveAdapterRegistry;
4529
import org.springframework.core.ResolvableType;
@@ -51,19 +35,26 @@
5135
import org.springframework.http.server.ServletServerHttpResponse;
5236
import org.springframework.web.accept.ContentNegotiationManager;
5337
import org.springframework.web.accept.ContentNegotiationManagerFactoryBean;
54-
import org.springframework.web.context.request.NativeWebRequest;
55-
import org.springframework.web.context.request.RequestAttributes;
56-
import org.springframework.web.context.request.RequestAttributesThreadLocalAccessor;
57-
import org.springframework.web.context.request.RequestContextHolder;
58-
import org.springframework.web.context.request.ServletRequestAttributes;
59-
import org.springframework.web.context.request.ServletWebRequest;
38+
import org.springframework.web.context.request.*;
6039
import org.springframework.web.context.request.async.AsyncWebRequest;
6140
import org.springframework.web.context.request.async.StandardServletAsyncWebRequest;
6241
import org.springframework.web.context.request.async.WebAsyncUtils;
6342
import org.springframework.web.method.support.ModelAndViewContainer;
6443
import org.springframework.web.servlet.HandlerMapping;
6544
import org.springframework.web.testfixture.servlet.MockHttpServletRequest;
6645
import org.springframework.web.testfixture.servlet.MockHttpServletResponse;
46+
import reactor.core.publisher.Flux;
47+
import reactor.core.publisher.Mono;
48+
import reactor.core.publisher.Sinks;
49+
import reactor.util.context.ReactorContextAccessor;
50+
51+
import java.io.IOException;
52+
import java.nio.charset.StandardCharsets;
53+
import java.util.*;
54+
import java.util.concurrent.atomic.AtomicInteger;
55+
import java.util.concurrent.atomic.AtomicReference;
56+
import java.util.function.Consumer;
57+
import java.util.stream.Collectors;
6758

6859
import static org.assertj.core.api.Assertions.assertThat;
6960
import static org.springframework.core.ResolvableType.forClass;
@@ -240,6 +231,22 @@ void mediaTypes() throws Exception {
240231
testSseResponse(false);
241232
}
242233

234+
@Test
235+
void mediaTypesWithCharset() throws Exception {
236+
237+
// Media type from request
238+
this.servletRequest.addHeader("Accept", "text/event-stream;charset=UTF-8");
239+
testSseResponse(true);
240+
241+
// Media type from "produces" attribute
242+
Set<MediaType> types = Collections.singleton(new MediaType("text", "event-stream", StandardCharsets.UTF_8));
243+
this.servletRequest.setAttribute(HandlerMapping.PRODUCIBLE_MEDIA_TYPES_ATTRIBUTE, types);
244+
testSseResponse(true);
245+
246+
// No media type preferences
247+
testSseResponse(false);
248+
}
249+
243250
private void testSseResponse(boolean expectSseEmitter) throws Exception {
244251
ResponseBodyEmitter emitter = handleValue(Flux.empty(), Flux.class, forClass(String.class));
245252
Object actual = emitter instanceof SseEmitter;
@@ -265,6 +272,53 @@ void writeServerSentEvents() throws Exception {
265272
assertThat(emitterHandler.getValuesAsText()).isEqualTo("data:foo\n\ndata:bar\n\ndata:baz\n\n");
266273
}
267274

275+
@Test
276+
void writeServerSentEventsWhenAcceptIsNotSseMediaType() throws Exception {
277+
278+
// Media type from request
279+
this.servletRequest.addHeader("Accept", "application/json;charset=UTF-8");
280+
281+
// When `Accept` isn't `text/event-stream`, but elementClass is ServerSentEvent.
282+
// We want to get `SseEmitter`.
283+
testSseResponseForServerSentEvent(true);
284+
285+
// No media type preferences
286+
testSseResponse(false);
287+
}
288+
289+
private void testSseResponseForServerSentEvent(boolean expectSseEmitter) throws Exception {
290+
ResolvableType type = ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class);
291+
ResponseBodyEmitter emitter = handleValue(Flux.empty(), Flux.class, type);
292+
Object actual = emitter instanceof SseEmitter;
293+
assertThat(actual).isEqualTo(expectSseEmitter);
294+
resetRequest();
295+
}
296+
297+
@Test
298+
void writeServerSentEventsWithCharset() throws Exception {
299+
300+
this.servletRequest.addHeader("Accept", "text/event-stream");
301+
Set<MediaType> types = Collections.singleton(new MediaType("text", "event-stream", StandardCharsets.UTF_8));
302+
this.servletRequest.setAttribute(HandlerMapping.PRODUCIBLE_MEDIA_TYPES_ATTRIBUTE, types);
303+
304+
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
305+
SseEmitter sseEmitter = (SseEmitter) handleValue(sink.asFlux(), Flux.class, forClass(String.class));
306+
307+
EmitterHandler emitterHandler = new EmitterHandler();
308+
sseEmitter.initialize(emitterHandler);
309+
310+
ServletServerHttpResponse message = new ServletServerHttpResponse(this.servletResponse);
311+
sseEmitter.extendResponse(message);
312+
313+
sink.tryEmitNext("foo");
314+
sink.tryEmitNext("bar");
315+
sink.tryEmitNext("baz");
316+
sink.tryEmitComplete();
317+
318+
assertThat(message.getHeaders().getContentType()).hasToString("text/event-stream;charset=UTF-8");
319+
assertThat(emitterHandler.getValuesAsText()).isEqualTo("data:foo\n\ndata:bar\n\ndata:baz\n\n");
320+
}
321+
268322
@Test
269323
void writeServerSentEventsWithBuilder() throws Exception {
270324

0 commit comments

Comments
 (0)