Skip to content

Commit 89d56b1

Browse files
committed
Streaming ReactorClientHttpResponse
Closes gh-33781
1 parent f1cfe7a commit 89d56b1

File tree

1 file changed

+13
-8
lines changed

1 file changed

+13
-8
lines changed

spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpResponse.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.io.InputStream;
2121
import java.time.Duration;
2222

23+
import io.netty.buffer.ByteBuf;
24+
import org.reactivestreams.FlowAdapters;
2325
import reactor.netty.Connection;
2426
import reactor.netty.http.client.HttpClientResponse;
2527

@@ -79,19 +81,22 @@ public InputStream getBody() throws IOException {
7981
if (body != null) {
8082
return body;
8183
}
82-
8384
try {
84-
body = this.connection.inbound().receive().aggregate().asInputStream().block(this.readTimeout);
85+
SubscriberInputStream<ByteBuf> is = new SubscriberInputStream<>(
86+
byteBuf -> {
87+
byte[] bytes = new byte[byteBuf.readableBytes()];
88+
byteBuf.readBytes(bytes);
89+
byteBuf.release();
90+
return bytes;
91+
},
92+
ByteBuf::release, 16);
93+
this.connection.inbound().receive().retain().subscribe(FlowAdapters.toSubscriber(is));
94+
this.body = is;
95+
return is;
8596
}
8697
catch (RuntimeException ex) {
8798
throw ReactorClientHttpRequest.convertException(ex);
8899
}
89-
90-
if (body == null) {
91-
body = InputStream.nullInputStream();
92-
}
93-
this.body = body;
94-
return body;
95100
}
96101

97102
@Override

0 commit comments

Comments
 (0)