Skip to content

Commit ec2218c

Browse files
violetaggbclozel
authored andcommitted
AbstractListenerWriteFlushProcessor: Ensure the last flush will be performed
When writing Publisher<Publisher<T>>, a flush operation is performed onComplete for every Publisher. If the flush operation is not able to be performed immediately it will be retried before starting to process data provided by the next Publisher. For the last Publisher the implementation needs to ensure that the flush will be performed only then whole operation will complete. Issue: SPR-15949
1 parent 2510db0 commit ec2218c

File tree

3 files changed

+116
-28
lines changed

3 files changed

+116
-28
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,29 @@ protected void flushingFailed(Throwable t) {
124124
*/
125125
protected abstract void flush() throws IOException;
126126

127+
/**
128+
* Whether writing is possible.
129+
*/
130+
protected abstract boolean isWritePossible();
131+
132+
/**
133+
* Whether flushing is pending.
134+
*/
135+
protected abstract boolean isFlushPending();
136+
137+
/**
138+
* Listeners can call this to notify when flushing is possible.
139+
*/
140+
protected final void onFlushPossible() {
141+
this.state.get().onFlushPossible(this);
142+
}
143+
144+
private void flushIfPossible() {
145+
if (isWritePossible()) {
146+
onFlushPossible();
147+
}
148+
}
149+
127150

128151
private boolean changeState(State oldState, State newState) {
129152
return this.state.compareAndSet(oldState, newState);
@@ -181,7 +204,12 @@ public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor)
181204
return;
182205
}
183206
if (processor.subscriberCompleted) {
184-
if (processor.changeState(this, COMPLETED)) {
207+
if (processor.isFlushPending()) {
208+
// Ensure the final flush
209+
processor.changeState(this, FLUSHING);
210+
processor.flushIfPossible();
211+
}
212+
else if (processor.changeState(this, COMPLETED)) {
185213
processor.resultPublisher.publishComplete();
186214
}
187215
}
@@ -198,6 +226,28 @@ public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
198226
}
199227
},
200228

229+
FLUSHING {
230+
public <T> void onFlushPossible(AbstractListenerWriteFlushProcessor<T> processor) {
231+
try {
232+
processor.flush();
233+
}
234+
catch (IOException ex) {
235+
processor.flushingFailed(ex);
236+
return;
237+
}
238+
if (processor.changeState(this, COMPLETED)) {
239+
processor.resultPublisher.publishComplete();
240+
}
241+
}
242+
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> publisher) {
243+
// ignore
244+
}
245+
@Override
246+
public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
247+
// ignore
248+
}
249+
},
250+
201251
COMPLETED {
202252
@Override
203253
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> publisher) {
@@ -235,6 +285,10 @@ public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor)
235285
// ignore
236286
}
237287

288+
public <T> void onFlushPossible(AbstractListenerWriteFlushProcessor<T> processor) {
289+
// ignore
290+
}
291+
238292

239293
private static class WriteSubscriber implements Subscriber<Void> {
240294

spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import java.io.IOException;
2020
import java.io.InputStream;
21-
import java.io.UncheckedIOException;
2221
import java.nio.charset.Charset;
2322
import java.util.List;
2423
import java.util.Map;
@@ -52,6 +51,8 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
5251

5352
private final HttpServletResponse response;
5453

54+
private final ServletOutputStream outputStream;
55+
5556
private final int bufferSize;
5657

5758
@Nullable
@@ -73,6 +74,7 @@ public ServletServerHttpResponse(HttpServletResponse response, AsyncContext asyn
7374
Assert.isTrue(bufferSize > 0, "Buffer size must be greater than 0");
7475

7576
this.response = response;
77+
this.outputStream = response.getOutputStream();
7678
this.bufferSize = bufferSize;
7779

7880
asyncContext.addListener(new ResponseAsyncListener());
@@ -147,7 +149,7 @@ protected Processor<? super Publisher<? extends DataBuffer>, Void> createBodyFlu
147149
* @return the number of bytes written
148150
*/
149151
protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException {
150-
ServletOutputStream outputStream = response.getOutputStream();
152+
ServletOutputStream outputStream = this.outputStream;
151153
InputStream input = dataBuffer.asInputStream();
152154
int bytesWritten = 0;
153155
byte[] buffer = new byte[this.bufferSize];
@@ -160,7 +162,7 @@ protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException {
160162
}
161163

162164
private void flush() throws IOException {
163-
ServletOutputStream outputStream = this.response.getOutputStream();
165+
ServletOutputStream outputStream = this.outputStream;
164166
if (outputStream.isReady()) {
165167
try {
166168
outputStream.flush();
@@ -176,6 +178,10 @@ private void flush() throws IOException {
176178
}
177179
}
178180

181+
private boolean isWritePossible() {
182+
return this.outputStream.isReady();
183+
}
184+
179185

180186
private final class ResponseAsyncListener implements AsyncListener {
181187

@@ -233,6 +239,12 @@ public void onWritePossible() throws IOException {
233239
if (processor != null) {
234240
processor.onWritePossible();
235241
}
242+
else {
243+
ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor;
244+
if (flushProcessor != null) {
245+
flushProcessor.onFlushPossible();
246+
}
247+
}
236248
}
237249

238250
@Override
@@ -242,6 +254,13 @@ public void onError(Throwable ex) {
242254
processor.cancel();
243255
processor.onError(ex);
244256
}
257+
else {
258+
ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor;
259+
if (flushProcessor != null) {
260+
flushProcessor.cancel();
261+
flushProcessor.onError(ex);
262+
}
263+
}
245264
}
246265
}
247266

@@ -250,15 +269,9 @@ private class ResponseBodyFlushProcessor extends AbstractListenerWriteFlushProce
250269

251270
@Override
252271
protected Processor<? super DataBuffer, Void> createWriteProcessor() {
253-
try {
254-
ServletOutputStream outputStream = response.getOutputStream();
255-
ResponseBodyProcessor processor = new ResponseBodyProcessor(outputStream);
256-
bodyProcessor = processor;
257-
return processor;
258-
}
259-
catch (IOException ex) {
260-
throw new UncheckedIOException(ex);
261-
}
272+
ResponseBodyProcessor processor = new ResponseBodyProcessor();
273+
bodyProcessor = processor;
274+
return processor;
262275
}
263276

264277
@Override
@@ -268,20 +281,24 @@ protected void flush() throws IOException {
268281
}
269282
ServletServerHttpResponse.this.flush();
270283
}
271-
}
272284

285+
@Override
286+
protected boolean isWritePossible() {
287+
return ServletServerHttpResponse.this.isWritePossible();
288+
}
273289

274-
private class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {
290+
@Override
291+
protected boolean isFlushPending() {
292+
return flushOnNext;
293+
}
294+
}
275295

276-
private final ServletOutputStream outputStream;
277296

278-
public ResponseBodyProcessor(ServletOutputStream outputStream) {
279-
this.outputStream = outputStream;
280-
}
297+
private class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {
281298

282299
@Override
283300
protected boolean isWritePossible() {
284-
return this.outputStream.isReady();
301+
return ServletServerHttpResponse.this.isWritePossible();
285302
}
286303

287304
@Override
@@ -306,7 +323,7 @@ protected boolean write(DataBuffer dataBuffer) throws IOException {
306323
}
307324
flush();
308325
}
309-
boolean ready = this.outputStream.isReady();
326+
boolean ready = ServletServerHttpResponse.this.isWritePossible();
310327
if (this.logger.isTraceEnabled()) {
311328
this.logger.trace("write: " + dataBuffer + " ready: " + ready);
312329
}

spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,20 @@ private ResponseBodyProcessor createBodyProcessor() {
147147
return new ResponseBodyProcessor(this.responseChannel);
148148
}
149149

150+
private boolean isWritePossible() {
151+
if (this.responseChannel == null) {
152+
this.responseChannel = this.exchange.getResponseChannel();
153+
}
154+
if (this.responseChannel.isWriteResumed()) {
155+
return true;
156+
} else {
157+
this.responseChannel.resumeWrites();
158+
return false;
159+
}
160+
}
161+
150162

151-
private static class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {
163+
private class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {
152164

153165
private final StreamSinkChannel channel;
154166

@@ -164,12 +176,7 @@ public ResponseBodyProcessor(StreamSinkChannel channel) {
164176

165177
@Override
166178
protected boolean isWritePossible() {
167-
if (this.channel.isWriteResumed()) {
168-
return true;
169-
} else {
170-
this.channel.resumeWrites();
171-
return false;
172-
}
179+
return UndertowServerHttpResponse.this.isWritePossible();
173180
}
174181

175182
@Override
@@ -264,6 +271,16 @@ protected void flushingFailed(Throwable t) {
264271
cancel();
265272
onError(t);
266273
}
274+
275+
@Override
276+
protected boolean isWritePossible() {
277+
return UndertowServerHttpResponse.this.isWritePossible();
278+
}
279+
280+
@Override
281+
protected boolean isFlushPending() {
282+
return false;
283+
}
267284
}
268285

269286
}

0 commit comments

Comments
 (0)