Skip to content

Commit dfaf7a0

Browse files
committed
Rename InputStreamSubscriber to SubscriberInputStream
It is both, but InputStream is what's exposed for public use, in effect an InputStream backed by a Subscriber source. See gh-31677
1 parent 86a42db commit dfaf7a0

File tree

5 files changed

+34
-35
lines changed

5 files changed

+34
-35
lines changed

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -472,16 +472,16 @@ public static Publisher<DataBuffer> outputStreamPublisher(
472472
* Note: {@link Subscription#request(long)} happens eagerly for the first time upon subscription
473473
* and then repeats every time {@code bufferSize - (bufferSize >> 2)} consumed.
474474
* @param publisher the source of {@link DataBuffer} which should be represented as an {@link InputStream}
475-
* @param bufferSize the maximum amount of {@link DataBuffer} prefetched in advance and stored inside {@link InputStream}
475+
* @param demand the maximum number of buffers to request from the Publisher and buffer on an ongoing basis
476476
* @return an {@link InputStream} instance representing given {@link Publisher} messages
477477
*/
478-
public static <T extends DataBuffer> InputStream subscribeAsInputStream(Publisher<T> publisher, int bufferSize) {
478+
public static <T extends DataBuffer> InputStream subscriberInputStream(Publisher<T> publisher, int demand) {
479479
Assert.notNull(publisher, "Publisher must not be null");
480-
Assert.isTrue(bufferSize > 0, "Buffer size must be > 0");
480+
Assert.isTrue(demand > 0, "maxBufferCount must be > 0");
481481

482-
InputStreamSubscriber inputStreamSubscriber = new InputStreamSubscriber(bufferSize);
483-
publisher.subscribe(inputStreamSubscriber);
484-
return inputStreamSubscriber;
482+
SubscriberInputStream subscriber = new SubscriberInputStream(demand);
483+
publisher.subscribe(subscriber);
484+
return subscriber;
485485
}
486486

487487

spring-core/src/main/java/org/springframework/core/io/buffer/InputStreamSubscriber.java renamed to spring-core/src/main/java/org/springframework/core/io/buffer/SubscriberInputStream.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@
3939
* Bridges between {@link Publisher Publisher&lt;DataBuffer&gt;} and {@link InputStream}.
4040
*
4141
* <p>Note that this class has a near duplicate in
42-
* {@link org.springframework.http.client.InputStreamSubscriber}.
42+
* {@link org.springframework.http.client.SubscriberInputStream}.
4343
*
4444
* @author Oleh Dokuka
4545
* @author Rossen Stoyanchev
4646
* @since 6.2
4747
*/
48-
final class InputStreamSubscriber extends InputStream implements Subscriber<DataBuffer> {
48+
final class SubscriberInputStream extends InputStream implements Subscriber<DataBuffer> {
4949

5050
private static final Object READY = new Object();
5151

@@ -82,10 +82,10 @@ final class InputStreamSubscriber extends InputStream implements Subscriber<Data
8282
private Throwable error;
8383

8484

85-
InputStreamSubscriber(int prefetch) {
86-
this.prefetch = prefetch;
87-
this.limit = (prefetch == Integer.MAX_VALUE ? Integer.MAX_VALUE : prefetch - (prefetch >> 2));
88-
this.queue = new ArrayBlockingQueue<>(prefetch);
85+
SubscriberInputStream(int demand) {
86+
this.prefetch = demand;
87+
this.limit = (demand == Integer.MAX_VALUE ? Integer.MAX_VALUE : demand - (demand >> 2));
88+
this.queue = new ArrayBlockingQueue<>(demand);
8989
this.lock = new ReentrantLock(false);
9090
}
9191

spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -754,7 +754,7 @@ void genericInputStreamSubscriberTest(
754754
byte[] chunk = new byte[readChunkSize];
755755
List<String> words = new ArrayList<>();
756756

757-
try (InputStream in = DataBufferUtils.subscribeAsInputStream(publisher, bufferSize)) {
757+
try (InputStream in = DataBufferUtils.subscriberInputStream(publisher, bufferSize)) {
758758
int read;
759759
while ((read = in.read(chunk)) > -1) {
760760
words.add(new String(chunk, 0, read, StandardCharsets.UTF_8));
@@ -791,7 +791,7 @@ void inputStreamSubscriberError(DataBufferFactory factory) {
791791
byte[] chunk = new byte[4];
792792
List<String> words = new ArrayList<>();
793793

794-
try (InputStream in = DataBufferUtils.subscribeAsInputStream(publisher, 1)) {
794+
try (InputStream in = DataBufferUtils.subscriberInputStream(publisher, 1)) {
795795
int read;
796796
while ((read = in.read(chunk)) > -1) {
797797
words.add(new String(chunk, 0, read, StandardCharsets.UTF_8));
@@ -830,7 +830,7 @@ void inputStreamSubscriberMixedReadMode(DataBufferFactory factory) {
830830
byte[] chunk = new byte[3];
831831
ArrayList<String> words = new ArrayList<>();
832832

833-
try (InputStream inputStream = DataBufferUtils.subscribeAsInputStream(publisher, 1)) {
833+
try (InputStream inputStream = DataBufferUtils.subscriberInputStream(publisher, 1)) {
834834
words.add(new String(chunk,0, inputStream.read(chunk), StandardCharsets.UTF_8));
835835
assertThat(inputStream.read()).isEqualTo(' ' & 0xFF);
836836
words.add(new String(chunk,0, inputStream.read(chunk), StandardCharsets.UTF_8));
@@ -874,7 +874,7 @@ void inputStreamSubscriberClose(DataBufferFactory bufferFactory) throws Interrup
874874
byte[] chunk = new byte[3];
875875
ArrayList<String> words = new ArrayList<>();
876876

877-
try (InputStream in = DataBufferUtils.subscribeAsInputStream(publisher, ThreadLocalRandom.current().nextInt(1, 4))) {
877+
try (InputStream in = DataBufferUtils.subscriberInputStream(publisher, ThreadLocalRandom.current().nextInt(1, 4))) {
878878
in.read(chunk);
879879
String word = new String(chunk, StandardCharsets.UTF_8);
880880
words.add(word);

spring-web/src/main/java/org/springframework/http/client/InputStreamSubscriber.java renamed to spring-web/src/main/java/org/springframework/http/client/SubscriberInputStream.java

+11-12
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,16 @@
4242
* Bridges between {@link Flow.Publisher Flow.Publisher&lt;T&gt;} and {@link InputStream}.
4343
*
4444
* <p>Note that this class has a near duplicate in
45-
* {@link org.springframework.core.io.buffer.InputStreamSubscriber}.
45+
* {@link org.springframework.core.io.buffer.SubscriberInputStream}.
4646
*
4747
* @author Oleh Dokuka
4848
* @author Rossen Stoyanchev
4949
* @since 6.2
5050
* @param <T> the publisher byte buffer type
5151
*/
52-
final class InputStreamSubscriber<T> extends InputStream implements Flow.Subscriber<T> {
52+
final class SubscriberInputStream<T> extends InputStream implements Flow.Subscriber<T> {
5353

54-
private static final Log logger = LogFactory.getLog(InputStreamSubscriber.class);
54+
private static final Log logger = LogFactory.getLog(SubscriberInputStream.class);
5555

5656
private static final Object READY = new Object();
5757

@@ -94,12 +94,12 @@ final class InputStreamSubscriber<T> extends InputStream implements Flow.Subscri
9494
private Throwable error;
9595

9696

97-
private InputStreamSubscriber(Function<T, byte[]> mapper, Consumer<T> onDiscardHandler, int prefetch) {
97+
private SubscriberInputStream(Function<T, byte[]> mapper, Consumer<T> onDiscardHandler, int demand) {
9898
this.mapper = mapper;
9999
this.onDiscardHandler = onDiscardHandler;
100-
this.prefetch = prefetch;
101-
this.limit = (prefetch == Integer.MAX_VALUE ? Integer.MAX_VALUE : prefetch - (prefetch >> 2));
102-
this.queue = new ArrayBlockingQueue<>(prefetch);
100+
this.prefetch = demand;
101+
this.limit = (demand == Integer.MAX_VALUE ? Integer.MAX_VALUE : demand - (demand >> 2));
102+
this.queue = new ArrayBlockingQueue<>(demand);
103103
this.lock = new ReentrantLock(false);
104104
}
105105

@@ -122,17 +122,16 @@ private InputStreamSubscriber(Function<T, byte[]> mapper, Consumer<T> onDiscardH
122122
* @param publisher the source of {@link DataBuffer} which should be represented as an {@link InputStream}
123123
* @param mapper function to transform &lt;T&gt; element to {@code byte[]}. Note, &lt;T&gt; should be released during the mapping if needed.
124124
* @param onDiscardHandler &lt;T&gt; element consumer if returned {@link InputStream} is closed prematurely.
125-
* @param bufferSize the maximum amount of &lt;T&gt; elements prefetched in advance and stored inside {@link InputStream}
125+
* @param demand the maximum number of buffers to request from the Publisher and buffer on an ongoing basis
126126
* @return an {@link InputStream} instance representing given {@link Flow.Publisher} messages
127127
*/
128-
public static <T> InputStream subscribeTo(Flow.Publisher<T> publisher, Function<T, byte[]> mapper, Consumer<T> onDiscardHandler, int bufferSize) {
129-
128+
public static <T> InputStream subscribeTo(Flow.Publisher<T> publisher, Function<T, byte[]> mapper, Consumer<T> onDiscardHandler, int demand) {
130129
Assert.notNull(publisher, "Flow.Publisher must not be null");
131130
Assert.notNull(mapper, "mapper must not be null");
132131
Assert.notNull(onDiscardHandler, "onDiscardHandler must not be null");
133-
Assert.isTrue(bufferSize > 0, "bufferSize must be greater than 0");
132+
Assert.isTrue(demand > 0, "demand must be greater than 0");
134133

135-
InputStreamSubscriber<T> iss = new InputStreamSubscriber<>(mapper, onDiscardHandler, bufferSize);
134+
SubscriberInputStream<T> iss = new SubscriberInputStream<>(mapper, onDiscardHandler, demand);
136135
publisher.subscribe(iss);
137136
return iss;
138137
}

spring-web/src/test/java/org/springframework/http/client/InputStreamSubscriberTests.java renamed to spring-web/src/test/java/org/springframework/http/client/SubscriberInputStreamTests.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,12 @@
3636
import static org.assertj.core.api.Assertions.assertThatIOException;
3737

3838
/**
39-
* Unit tests for {@link InputStreamSubscriber}.
39+
* Unit tests for {@link SubscriberInputStream}.
4040
*
4141
* @author Arjen Poutsma
4242
* @author Oleh Dokuka
4343
*/
44-
class InputStreamSubscriberTests {
44+
class SubscriberInputStreamTests {
4545

4646
private static final byte[] FOO = "foo".getBytes(UTF_8);
4747

@@ -98,7 +98,7 @@ void flush() throws IOException {
9898
this.byteMapper, this.executor, null);
9999

100100

101-
try (InputStream is = InputStreamSubscriber.subscribeTo(
101+
try (InputStream is = SubscriberInputStream.subscribeTo(
102102
toStringPublisher(publisher), s -> s.getBytes(UTF_8), s -> {}, 1)) {
103103

104104
byte[] chunk = new byte[3];
@@ -123,7 +123,7 @@ void chunkSize() {
123123
},
124124
this.byteMapper, this.executor, 2);
125125

126-
try (InputStream is = InputStreamSubscriber.subscribeTo(
126+
try (InputStream is = SubscriberInputStream.subscribeTo(
127127
toStringPublisher(publisher), s -> s.getBytes(UTF_8), s -> {}, 1)) {
128128

129129
StringBuilder stringBuilder = new StringBuilder();
@@ -167,7 +167,7 @@ void cancel() throws InterruptedException {
167167

168168
List<String> discarded = new ArrayList<>();
169169

170-
try (InputStream is = InputStreamSubscriber.subscribeTo(
170+
try (InputStream is = SubscriberInputStream.subscribeTo(
171171
toStringPublisher(publisher), s -> s.getBytes(UTF_8), discarded::add, 1)) {
172172

173173
byte[] chunk = new byte[3];
@@ -198,7 +198,7 @@ void closed() throws InterruptedException {
198198
},
199199
this.byteMapper, this.executor, null);
200200

201-
try (InputStream is = InputStreamSubscriber.subscribeTo(
201+
try (InputStream is = SubscriberInputStream.subscribeTo(
202202
toStringPublisher(publisher), s -> s.getBytes(UTF_8), s -> {}, 1)) {
203203

204204
byte[] chunk = new byte[3];
@@ -234,7 +234,7 @@ void mapperThrowsException() throws InterruptedException {
234234
Throwable savedEx = null;
235235

236236
StringBuilder sb = new StringBuilder();
237-
try (InputStream is = InputStreamSubscriber.subscribeTo(
237+
try (InputStream is = SubscriberInputStream.subscribeTo(
238238
publisher, s -> { throw new NullPointerException("boom"); }, s -> {}, 1)) {
239239

240240
byte[] chunk = new byte[3];

0 commit comments

Comments
 (0)