16
16
17
17
package org .springframework .http .client .reactive ;
18
18
19
+ import java .util .Objects ;
19
20
import java .util .concurrent .atomic .AtomicBoolean ;
20
21
22
+ import org .reactivestreams .Publisher ;
23
+ import org .reactivestreams .Subscriber ;
24
+ import org .reactivestreams .Subscription ;
21
25
import reactor .core .publisher .Flux ;
22
26
23
27
import org .springframework .core .io .buffer .DataBuffer ;
@@ -55,16 +59,7 @@ protected AbstractClientHttpResponse(HttpStatusCode statusCode, HttpHeaders head
55
59
this .statusCode = statusCode ;
56
60
this .headers = headers ;
57
61
this .cookies = cookies ;
58
- this .body = singleSubscription (body );
59
- }
60
-
61
- private static Flux <DataBuffer > singleSubscription (Flux <DataBuffer > body ) {
62
- AtomicBoolean subscribed = new AtomicBoolean ();
63
- return body .doOnSubscribe (s -> {
64
- if (!subscribed .compareAndSet (false , true )) {
65
- throw new IllegalStateException ("The client response body can only be consumed once" );
66
- }
67
- });
62
+ this .body = Flux .from (new SingleSubscriberPublisher <>(body ));
68
63
}
69
64
70
65
@@ -87,4 +82,39 @@ public MultiValueMap<String, ResponseCookie> getCookies() {
87
82
public Flux <DataBuffer > getBody () {
88
83
return this .body ;
89
84
}
85
+
86
+
87
+ private static final class SingleSubscriberPublisher <T > implements Publisher <T > {
88
+
89
+ private static final Subscription NO_OP_SUBSCRIPTION = new Subscription () {
90
+ @ Override
91
+ public void request (long l ) {
92
+ }
93
+
94
+ @ Override
95
+ public void cancel () {
96
+ }
97
+ };
98
+
99
+ private final Publisher <T > delegate ;
100
+
101
+ private final AtomicBoolean subscribed = new AtomicBoolean ();
102
+
103
+
104
+ public SingleSubscriberPublisher (Publisher <T > delegate ) {
105
+ this .delegate = delegate ;
106
+ }
107
+
108
+ @ Override
109
+ public void subscribe (Subscriber <? super T > subscriber ) {
110
+ Objects .requireNonNull (subscriber , "Subscriber must not be null" );
111
+ if (this .subscribed .compareAndSet (false , true )) {
112
+ this .delegate .subscribe (subscriber );
113
+ }
114
+ else {
115
+ subscriber .onSubscribe (NO_OP_SUBSCRIPTION );
116
+ subscriber .onError (new IllegalStateException ("The client response body can only be consumed once" ));
117
+ }
118
+ }
119
+ }
90
120
}
0 commit comments