1
1
/*
2
- * Copyright 2002-2024 the original author or authors.
2
+ * Copyright 2002-2025 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
21
21
import java .util .LinkedHashSet ;
22
22
import java .util .List ;
23
23
import java .util .Set ;
24
- import java .util .concurrent .atomic .AtomicBoolean ;
24
+ import java .util .concurrent .atomic .AtomicReference ;
25
25
import java .util .function .Consumer ;
26
26
27
27
import org .jspecify .annotations .Nullable ;
@@ -72,20 +72,19 @@ public class ResponseBodyEmitter {
72
72
73
73
private @ Nullable Handler handler ;
74
74
75
+ private final AtomicReference <State > state = new AtomicReference <>(State .START );
76
+
75
77
/** Store send data before handler is initialized. */
76
78
private final Set <DataWithMediaType > earlySendAttempts = new LinkedHashSet <>(8 );
77
79
78
- /** Store successful completion before the handler is initialized. */
79
- private final AtomicBoolean complete = new AtomicBoolean ();
80
-
81
80
/** Store an error before the handler is initialized. */
82
81
private @ Nullable Throwable failure ;
83
82
84
- private final DefaultCallback timeoutCallback = new DefaultCallback ();
83
+ private final TimeoutCallback timeoutCallback = new TimeoutCallback ();
85
84
86
85
private final ErrorCallback errorCallback = new ErrorCallback ();
87
86
88
- private final DefaultCallback completionCallback = new DefaultCallback ();
87
+ private final CompletionCallback completionCallback = new CompletionCallback ();
89
88
90
89
91
90
/**
@@ -125,7 +124,7 @@ synchronized void initialize(Handler handler) throws IOException {
125
124
this .earlySendAttempts .clear ();
126
125
}
127
126
128
- if (this .complete .get ()) {
127
+ if (this .state .get () == State . COMPLETE ) {
129
128
if (this .failure != null ) {
130
129
this .handler .completeWithError (this .failure );
131
130
}
@@ -141,7 +140,7 @@ synchronized void initialize(Handler handler) throws IOException {
141
140
}
142
141
143
142
void initializeWithError (Throwable ex ) {
144
- if (this .complete .compareAndSet (false , true )) {
143
+ if (this .state .compareAndSet (State . START , State . COMPLETE )) {
145
144
this .failure = ex ;
146
145
this .earlySendAttempts .clear ();
147
146
this .errorCallback .accept (ex );
@@ -183,8 +182,7 @@ public void send(Object object) throws IOException {
183
182
* @throws java.lang.IllegalStateException wraps any other errors
184
183
*/
185
184
public synchronized void send (Object object , @ Nullable MediaType mediaType ) throws IOException {
186
- Assert .state (!this .complete .get (), () -> "ResponseBodyEmitter has already completed" +
187
- (this .failure != null ? " with error: " + this .failure : "" ));
185
+ assertNotComplete ();
188
186
if (this .handler != null ) {
189
187
try {
190
188
this .handler .send (object , mediaType );
@@ -211,11 +209,15 @@ public synchronized void send(Object object, @Nullable MediaType mediaType) thro
211
209
* @since 6.0.12
212
210
*/
213
211
public synchronized void send (Set <DataWithMediaType > items ) throws IOException {
214
- Assert .state (!this .complete .get (), () -> "ResponseBodyEmitter has already completed" +
215
- (this .failure != null ? " with error: " + this .failure : "" ));
212
+ assertNotComplete ();
216
213
sendInternal (items );
217
214
}
218
215
216
+ private void assertNotComplete () {
217
+ Assert .state (this .state .get () == State .START , () -> "ResponseBodyEmitter has already completed" +
218
+ (this .failure != null ? " with error: " + this .failure : "" ));
219
+ }
220
+
219
221
private void sendInternal (Set <DataWithMediaType > items ) throws IOException {
220
222
if (items .isEmpty ()) {
221
223
return ;
@@ -245,7 +247,7 @@ private void sendInternal(Set<DataWithMediaType> items) throws IOException {
245
247
* related events such as an error while {@link #send(Object) sending}.
246
248
*/
247
249
public void complete () {
248
- if (this . complete . compareAndSet ( false , true ) && this .handler != null ) {
250
+ if (trySetComplete ( ) && this .handler != null ) {
249
251
this .handler .complete ();
250
252
}
251
253
}
@@ -262,14 +264,19 @@ public void complete() {
262
264
* {@link #send(Object) sending}.
263
265
*/
264
266
public void completeWithError (Throwable ex ) {
265
- if (this . complete . compareAndSet ( false , true )) {
267
+ if (trySetComplete ( )) {
266
268
this .failure = ex ;
267
269
if (this .handler != null ) {
268
270
this .handler .completeWithError (ex );
269
271
}
270
272
}
271
273
}
272
274
275
+ private boolean trySetComplete () {
276
+ return (this .state .compareAndSet (State .START , State .COMPLETE ) ||
277
+ (this .state .compareAndSet (State .TIMEOUT , State .COMPLETE )));
278
+ }
279
+
273
280
/**
274
281
* Register code to invoke when the async request times out. This method is
275
282
* called from a container thread when an async request times out.
@@ -364,7 +371,7 @@ public Object getData() {
364
371
}
365
372
366
373
367
- private class DefaultCallback implements Runnable {
374
+ private class TimeoutCallback implements Runnable {
368
375
369
376
private final List <Runnable > delegates = new ArrayList <>(1 );
370
377
@@ -374,9 +381,10 @@ public synchronized void addDelegate(Runnable delegate) {
374
381
375
382
@ Override
376
383
public void run () {
377
- ResponseBodyEmitter .this .complete .compareAndSet (false , true );
378
- for (Runnable delegate : this .delegates ) {
379
- delegate .run ();
384
+ if (ResponseBodyEmitter .this .state .compareAndSet (State .START , State .TIMEOUT )) {
385
+ for (Runnable delegate : this .delegates ) {
386
+ delegate .run ();
387
+ }
380
388
}
381
389
}
382
390
}
@@ -392,11 +400,51 @@ public synchronized void addDelegate(Consumer<Throwable> callback) {
392
400
393
401
@ Override
394
402
public void accept (Throwable t ) {
395
- ResponseBodyEmitter .this .complete .compareAndSet (false , true );
396
- for (Consumer <Throwable > delegate : this .delegates ) {
397
- delegate .accept (t );
403
+ if (ResponseBodyEmitter .this .state .compareAndSet (State .START , State .COMPLETE )) {
404
+ for (Consumer <Throwable > delegate : this .delegates ) {
405
+ delegate .accept (t );
406
+ }
407
+ }
408
+ }
409
+ }
410
+
411
+
412
+ private class CompletionCallback implements Runnable {
413
+
414
+ private final List <Runnable > delegates = new ArrayList <>(1 );
415
+
416
+ public synchronized void addDelegate (Runnable delegate ) {
417
+ this .delegates .add (delegate );
418
+ }
419
+
420
+ @ Override
421
+ public void run () {
422
+ if (ResponseBodyEmitter .this .state .compareAndSet (State .START , State .COMPLETE )) {
423
+ for (Runnable delegate : this .delegates ) {
424
+ delegate .run ();
425
+ }
398
426
}
399
427
}
400
428
}
401
429
430
+
431
+ /**
432
+ * Represents a state for {@link ResponseBodyEmitter}.
433
+ * <p><pre>
434
+ * START ----+
435
+ * | |
436
+ * v |
437
+ * TIMEOUT |
438
+ * | |
439
+ * v |
440
+ * COMPLETE <--+
441
+ * </pre>
442
+ * @since 6.2.4
443
+ */
444
+ private enum State {
445
+ START ,
446
+ TIMEOUT , // handling a timeout
447
+ COMPLETE
448
+ }
449
+
402
450
}
0 commit comments