Skip to content

Commit 6c33af0

Browse files
committed
Merge branch '5.1.x'
2 parents 57de2e0 + 85332c7 commit 6c33af0

File tree

2 files changed

+77
-64
lines changed

2 files changed

+77
-64
lines changed

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

Lines changed: 76 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -69,28 +69,29 @@ public abstract class DataBufferUtils {
6969
//---------------------------------------------------------------------
7070

7171
/**
72-
* Obtain a {@link InputStream} from the given supplier, and read it into a {@code Flux}
73-
* of {@code DataBuffer}s. Closes the input stream when the flux is terminated.
72+
* Obtain a {@link InputStream} from the given supplier, and read it into a
73+
* {@code Flux} of {@code DataBuffer}s. Closes the input stream when the
74+
* Flux is terminated.
7475
* @param inputStreamSupplier the supplier for the input stream to read from
75-
* @param dataBufferFactory the factory to create data buffers with
76+
* @param bufferFactory the factory to create data buffers with
7677
* @param bufferSize the maximum size of the data buffers
77-
* @return a flux of data buffers read from the given channel
78+
* @return a Flux of data buffers read from the given channel
7879
*/
7980
public static Flux<DataBuffer> readInputStream(
80-
Callable<InputStream> inputStreamSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
81+
Callable<InputStream> inputStreamSupplier, DataBufferFactory bufferFactory, int bufferSize) {
8182

8283
Assert.notNull(inputStreamSupplier, "'inputStreamSupplier' must not be null");
83-
84-
return readByteChannel(() -> Channels.newChannel(inputStreamSupplier.call()), dataBufferFactory, bufferSize);
84+
return readByteChannel(() -> Channels.newChannel(inputStreamSupplier.call()), bufferFactory, bufferSize);
8585
}
8686

8787
/**
88-
* Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a
89-
* {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
88+
* Obtain a {@link ReadableByteChannel} from the given supplier, and read
89+
* it into a {@code Flux} of {@code DataBuffer}s. Closes the channel when
90+
* the Flux is terminated.
9091
* @param channelSupplier the supplier for the channel to read from
9192
* @param bufferFactory the factory to create data buffers with
9293
* @param bufferSize the maximum size of the data buffers
93-
* @return a flux of data buffers read from the given channel
94+
* @return a Flux of data buffers read from the given channel
9495
*/
9596
public static Flux<DataBuffer> readByteChannel(
9697
Callable<ReadableByteChannel> channelSupplier, DataBufferFactory bufferFactory, int bufferSize) {
@@ -107,12 +108,13 @@ public static Flux<DataBuffer> readByteChannel(
107108
}
108109

109110
/**
110-
* Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
111-
* {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
111+
* Obtain a {@code AsynchronousFileChannel} from the given supplier, and read
112+
* it into a {@code Flux} of {@code DataBuffer}s. Closes the channel when
113+
* the Flux is terminated.
112114
* @param channelSupplier the supplier for the channel to read from
113115
* @param bufferFactory the factory to create data buffers with
114116
* @param bufferSize the maximum size of the data buffers
115-
* @return a flux of data buffers read from the given channel
117+
* @return a Flux of data buffers read from the given channel
116118
*/
117119
public static Flux<DataBuffer> readAsynchronousFileChannel(
118120
Callable<AsynchronousFileChannel> channelSupplier, DataBufferFactory bufferFactory, int bufferSize) {
@@ -121,17 +123,18 @@ public static Flux<DataBuffer> readAsynchronousFileChannel(
121123
}
122124

123125
/**
124-
* Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
125-
* {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the
126-
* channel when the flux is terminated.
126+
* Obtain a {@code AsynchronousFileChannel} from the given supplier, and
127+
* read it into a {@code Flux} of {@code DataBuffer}s, starting at the given
128+
* position. Closes the channel when the Flux is terminated.
127129
* @param channelSupplier the supplier for the channel to read from
128130
* @param position the position to start reading from
129131
* @param bufferFactory the factory to create data buffers with
130132
* @param bufferSize the maximum size of the data buffers
131-
* @return a flux of data buffers read from the given channel
133+
* @return a Flux of data buffers read from the given channel
132134
*/
133-
public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier,
134-
long position, DataBufferFactory bufferFactory, int bufferSize) {
135+
public static Flux<DataBuffer> readAsynchronousFileChannel(
136+
Callable<AsynchronousFileChannel> channelSupplier, long position,
137+
DataBufferFactory bufferFactory, int bufferSize) {
135138

136139
Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
137140
Assert.notNull(bufferFactory, "'dataBufferFactory' must not be null");
@@ -144,10 +147,10 @@ public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<Asynchronous
144147
channel -> Flux.create(sink -> {
145148
ReadCompletionHandler handler =
146149
new ReadCompletionHandler(channel, sink, position, bufferFactoryToUse, bufferSize);
150+
sink.onDispose(handler::dispose);
147151
DataBuffer dataBuffer = bufferFactoryToUse.allocateBuffer(bufferSize);
148152
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
149153
channel.read(byteBuffer, position, dataBuffer, handler);
150-
sink.onDispose(handler::dispose);
151154
}),
152155
channel -> {
153156
// Do not close channel from here, rather wait for the current read callback
@@ -165,14 +168,12 @@ public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<Asynchronous
165168
* fall back to {@link #readByteChannel(Callable, DataBufferFactory, int)}.
166169
* Closes the channel when the flux is terminated.
167170
* @param resource the resource to read from
168-
* @param dataBufferFactory the factory to create data buffers with
171+
* @param bufferFactory the factory to create data buffers with
169172
* @param bufferSize the maximum size of the data buffers
170-
* @return a flux of data buffers read from the given channel
173+
* @return a Flux of data buffers read from the given channel
171174
*/
172-
public static Flux<DataBuffer> read(
173-
Resource resource, DataBufferFactory dataBufferFactory, int bufferSize) {
174-
175-
return read(resource, 0, dataBufferFactory, bufferSize);
175+
public static Flux<DataBuffer> read(Resource resource, DataBufferFactory bufferFactory, int bufferSize) {
176+
return read(resource, 0, bufferFactory, bufferSize);
176177
}
177178

178179
/**
@@ -185,26 +186,25 @@ public static Flux<DataBuffer> read(
185186
* Closes the channel when the flux is terminated.
186187
* @param resource the resource to read from
187188
* @param position the position to start reading from
188-
* @param dataBufferFactory the factory to create data buffers with
189+
* @param bufferFactory the factory to create data buffers with
189190
* @param bufferSize the maximum size of the data buffers
190-
* @return a flux of data buffers read from the given channel
191+
* @return a Flux of data buffers read from the given channel
191192
*/
192193
public static Flux<DataBuffer> read(
193-
Resource resource, long position, DataBufferFactory dataBufferFactory, int bufferSize) {
194+
Resource resource, long position, DataBufferFactory bufferFactory, int bufferSize) {
194195

195196
try {
196197
if (resource.isFile()) {
197198
File file = resource.getFile();
198199
return readAsynchronousFileChannel(
199200
() -> AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ),
200-
position, dataBufferFactory, bufferSize);
201+
position, bufferFactory, bufferSize);
201202
}
202203
}
203204
catch (IOException ignore) {
204205
// fallback to resource.readableChannel(), below
205206
}
206-
207-
Flux<DataBuffer> result = readByteChannel(resource::readableChannel, dataBufferFactory, bufferSize);
207+
Flux<DataBuffer> result = readByteChannel(resource::readableChannel, bufferFactory, bufferSize);
208208
return position == 0 ? result : skipUntilByteCount(result, position);
209209
}
210210

@@ -214,16 +214,19 @@ public static Flux<DataBuffer> read(
214214
//---------------------------------------------------------------------
215215

216216
/**
217-
* Write the given stream of {@link DataBuffer DataBuffers} to the given {@code OutputStream}. Does
218-
* <strong>not</strong> close the output stream when the flux is terminated, and does
219-
* <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
220-
* source. If releasing is required, then subscribe to the returned {@code Flux} with a
221-
* {@link #releaseConsumer()}.
222-
* <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
217+
* Write the given stream of {@link DataBuffer DataBuffers} to the given
218+
* {@code OutputStream}. Does <strong>not</strong> close the output stream
219+
* when the flux is terminated, and does <strong>not</strong>
220+
* {@linkplain #release(DataBuffer) release} the data buffers in the source.
221+
* If releasing is required, then subscribe to the returned {@code Flux}
222+
* with a {@link #releaseConsumer()}.
223+
* <p>Note that the writing process does not start until the returned
224+
* {@code Flux} is subscribed to.
223225
* @param source the stream of data buffers to be written
224226
* @param outputStream the output stream to write to
225-
* @return a flux containing the same buffers as in {@code source}, that starts the writing
226-
* process when subscribed to, and that publishes any writing errors and the completion signal
227+
* @return a Flux containing the same buffers as in {@code source}, that
228+
* starts the writing process when subscribed to, and that publishes any
229+
* writing errors and the completion signal
227230
*/
228231
public static Flux<DataBuffer> write(Publisher<DataBuffer> source, OutputStream outputStream) {
229232
Assert.notNull(source, "'source' must not be null");
@@ -234,16 +237,19 @@ public static Flux<DataBuffer> write(Publisher<DataBuffer> source, OutputStream
234237
}
235238

236239
/**
237-
* Write the given stream of {@link DataBuffer DataBuffers} to the given {@code WritableByteChannel}. Does
238-
* <strong>not</strong> close the channel when the flux is terminated, and does
239-
* <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
240-
* source. If releasing is required, then subscribe to the returned {@code Flux} with a
241-
* {@link #releaseConsumer()}.
242-
* <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
240+
* Write the given stream of {@link DataBuffer DataBuffers} to the given
241+
* {@code WritableByteChannel}. Does <strong>not</strong> close the channel
242+
* when the flux is terminated, and does <strong>not</strong>
243+
* {@linkplain #release(DataBuffer) release} the data buffers in the source.
244+
* If releasing is required, then subscribe to the returned {@code Flux}
245+
* with a {@link #releaseConsumer()}.
246+
* <p>Note that the writing process does not start until the returned
247+
* {@code Flux} is subscribed to.
243248
* @param source the stream of data buffers to be written
244249
* @param channel the channel to write to
245-
* @return a flux containing the same buffers as in {@code source}, that starts the writing
246-
* process when subscribed to, and that publishes any writing errors and the completion signal
250+
* @return a Flux containing the same buffers as in {@code source}, that
251+
* starts the writing process when subscribed to, and that publishes any
252+
* writing errors and the completion signal
247253
*/
248254
public static Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteChannel channel) {
249255
Assert.notNull(source, "'source' must not be null");
@@ -258,34 +264,40 @@ public static Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteC
258264
}
259265

260266
/**
261-
* Write the given stream of {@link DataBuffer DataBuffers} to the given {@code AsynchronousFileChannel}.
262-
* Does <strong>not</strong> close the channel when the flux is terminated, and does
263-
* <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
264-
* source. If releasing is required, then subscribe to the returned {@code Flux} with a
265-
* {@link #releaseConsumer()}.
266-
* <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
267+
* Write the given stream of {@link DataBuffer DataBuffers} to the given
268+
* {@code AsynchronousFileChannel}. Does <strong>not</strong> close the
269+
* channel when the flux is terminated, and does <strong>not</strong>
270+
* {@linkplain #release(DataBuffer) release} the data buffers in the source.
271+
* If releasing is required, then subscribe to the returned {@code Flux}
272+
* with a {@link #releaseConsumer()}.
273+
* <p>Note that the writing process does not start until the returned
274+
* {@code Flux} is subscribed to.
267275
* @param source the stream of data buffers to be written
268276
* @param channel the channel to write to
269-
* @return a flux containing the same buffers as in {@code source}, that starts the writing
270-
* process when subscribed to, and that publishes any writing errors and the completion signal
277+
* @return a Flux containing the same buffers as in {@code source}, that
278+
* starts the writing process when subscribed to, and that publishes any
279+
* writing errors and the completion signal
271280
* @since 5.0.10
272281
*/
273282
public static Flux<DataBuffer> write(Publisher<DataBuffer> source, AsynchronousFileChannel channel) {
274283
return write(source, channel, 0);
275284
}
276285

277286
/**
278-
* Write the given stream of {@link DataBuffer DataBuffers} to the given {@code AsynchronousFileChannel}.
279-
* Does <strong>not</strong> close the channel when the flux is terminated, and does
280-
* <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
281-
* source. If releasing is required, then subscribe to the returned {@code Flux} with a
287+
* Write the given stream of {@link DataBuffer DataBuffers} to the given
288+
* {@code AsynchronousFileChannel}. Does <strong>not</strong> close the channel
289+
* when the flux is terminated, and does <strong>not</strong>
290+
* {@linkplain #release(DataBuffer) release} the data buffers in the source.
291+
* If releasing is required, then subscribe to the returned {@code Flux} with a
282292
* {@link #releaseConsumer()}.
283-
* <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
293+
* <p>Note that the writing process does not start until the returned
294+
* {@code Flux} is subscribed to.
284295
* @param source the stream of data buffers to be written
285296
* @param channel the channel to write to
286-
* @param position the file position at which the write is to begin; must be non-negative
287-
* @return a flux containing the same buffers as in {@code source}, that starts the writing
288-
* process when subscribed to, and that publishes any writing errors and the completion signal
297+
* @param position file position write write is to begin; must be non-negative
298+
* @return a flux containing the same buffers as in {@code source}, that
299+
* starts the writing process when subscribed to, and that publishes any
300+
* writing errors and the completion signal
289301
*/
290302
public static Flux<DataBuffer> write(
291303
Publisher<DataBuffer> source, AsynchronousFileChannel channel, long position) {

spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public DataBuffer wrap(byte[] bytes) {
9090
* @return the wrapped buffer
9191
*/
9292
public NettyDataBuffer wrap(ByteBuf byteBuf) {
93+
byteBuf.touch();
9394
return new NettyDataBuffer(byteBuf, this);
9495
}
9596

0 commit comments

Comments
 (0)