@@ -69,28 +69,29 @@ public abstract class DataBufferUtils {
69
69
//---------------------------------------------------------------------
70
70
71
71
/**
72
- * Obtain a {@link InputStream} from the given supplier, and read it into a {@code Flux}
73
- * of {@code DataBuffer}s. Closes the input stream when the flux is terminated.
72
+ * Obtain a {@link InputStream} from the given supplier, and read it into a
73
+ * {@code Flux} of {@code DataBuffer}s. Closes the input stream when the
74
+ * Flux is terminated.
74
75
* @param inputStreamSupplier the supplier for the input stream to read from
75
- * @param dataBufferFactory the factory to create data buffers with
76
+ * @param bufferFactory the factory to create data buffers with
76
77
* @param bufferSize the maximum size of the data buffers
77
- * @return a flux of data buffers read from the given channel
78
+ * @return a Flux of data buffers read from the given channel
78
79
*/
79
80
public static Flux <DataBuffer > readInputStream (
80
- Callable <InputStream > inputStreamSupplier , DataBufferFactory dataBufferFactory , int bufferSize ) {
81
+ Callable <InputStream > inputStreamSupplier , DataBufferFactory bufferFactory , int bufferSize ) {
81
82
82
83
Assert .notNull (inputStreamSupplier , "'inputStreamSupplier' must not be null" );
83
-
84
- return readByteChannel (() -> Channels .newChannel (inputStreamSupplier .call ()), dataBufferFactory , bufferSize );
84
+ return readByteChannel (() -> Channels .newChannel (inputStreamSupplier .call ()), bufferFactory , bufferSize );
85
85
}
86
86
87
87
/**
88
- * Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a
89
- * {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
88
+ * Obtain a {@link ReadableByteChannel} from the given supplier, and read
89
+ * it into a {@code Flux} of {@code DataBuffer}s. Closes the channel when
90
+ * the Flux is terminated.
90
91
* @param channelSupplier the supplier for the channel to read from
91
92
* @param bufferFactory the factory to create data buffers with
92
93
* @param bufferSize the maximum size of the data buffers
93
- * @return a flux of data buffers read from the given channel
94
+ * @return a Flux of data buffers read from the given channel
94
95
*/
95
96
public static Flux <DataBuffer > readByteChannel (
96
97
Callable <ReadableByteChannel > channelSupplier , DataBufferFactory bufferFactory , int bufferSize ) {
@@ -107,12 +108,13 @@ public static Flux<DataBuffer> readByteChannel(
107
108
}
108
109
109
110
/**
110
- * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
111
- * {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
111
+ * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read
112
+ * it into a {@code Flux} of {@code DataBuffer}s. Closes the channel when
113
+ * the Flux is terminated.
112
114
* @param channelSupplier the supplier for the channel to read from
113
115
* @param bufferFactory the factory to create data buffers with
114
116
* @param bufferSize the maximum size of the data buffers
115
- * @return a flux of data buffers read from the given channel
117
+ * @return a Flux of data buffers read from the given channel
116
118
*/
117
119
public static Flux <DataBuffer > readAsynchronousFileChannel (
118
120
Callable <AsynchronousFileChannel > channelSupplier , DataBufferFactory bufferFactory , int bufferSize ) {
@@ -121,17 +123,18 @@ public static Flux<DataBuffer> readAsynchronousFileChannel(
121
123
}
122
124
123
125
/**
124
- * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
125
- * {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the
126
- * channel when the flux is terminated.
126
+ * Obtain a {@code AsynchronousFileChannel} from the given supplier, and
127
+ * read it into a {@code Flux} of {@code DataBuffer}s, starting at the given
128
+ * position. Closes the channel when the Flux is terminated.
127
129
* @param channelSupplier the supplier for the channel to read from
128
130
* @param position the position to start reading from
129
131
* @param bufferFactory the factory to create data buffers with
130
132
* @param bufferSize the maximum size of the data buffers
131
- * @return a flux of data buffers read from the given channel
133
+ * @return a Flux of data buffers read from the given channel
132
134
*/
133
- public static Flux <DataBuffer > readAsynchronousFileChannel (Callable <AsynchronousFileChannel > channelSupplier ,
134
- long position , DataBufferFactory bufferFactory , int bufferSize ) {
135
+ public static Flux <DataBuffer > readAsynchronousFileChannel (
136
+ Callable <AsynchronousFileChannel > channelSupplier , long position ,
137
+ DataBufferFactory bufferFactory , int bufferSize ) {
135
138
136
139
Assert .notNull (channelSupplier , "'channelSupplier' must not be null" );
137
140
Assert .notNull (bufferFactory , "'dataBufferFactory' must not be null" );
@@ -165,14 +168,12 @@ public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<Asynchronous
165
168
* fall back to {@link #readByteChannel(Callable, DataBufferFactory, int)}.
166
169
* Closes the channel when the flux is terminated.
167
170
* @param resource the resource to read from
168
- * @param dataBufferFactory the factory to create data buffers with
171
+ * @param bufferFactory the factory to create data buffers with
169
172
* @param bufferSize the maximum size of the data buffers
170
- * @return a flux of data buffers read from the given channel
173
+ * @return a Flux of data buffers read from the given channel
171
174
*/
172
- public static Flux <DataBuffer > read (
173
- Resource resource , DataBufferFactory dataBufferFactory , int bufferSize ) {
174
-
175
- return read (resource , 0 , dataBufferFactory , bufferSize );
175
+ public static Flux <DataBuffer > read (Resource resource , DataBufferFactory bufferFactory , int bufferSize ) {
176
+ return read (resource , 0 , bufferFactory , bufferSize );
176
177
}
177
178
178
179
/**
@@ -185,26 +186,25 @@ public static Flux<DataBuffer> read(
185
186
* Closes the channel when the flux is terminated.
186
187
* @param resource the resource to read from
187
188
* @param position the position to start reading from
188
- * @param dataBufferFactory the factory to create data buffers with
189
+ * @param bufferFactory the factory to create data buffers with
189
190
* @param bufferSize the maximum size of the data buffers
190
- * @return a flux of data buffers read from the given channel
191
+ * @return a Flux of data buffers read from the given channel
191
192
*/
192
193
public static Flux <DataBuffer > read (
193
- Resource resource , long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
194
+ Resource resource , long position , DataBufferFactory bufferFactory , int bufferSize ) {
194
195
195
196
try {
196
197
if (resource .isFile ()) {
197
198
File file = resource .getFile ();
198
199
return readAsynchronousFileChannel (
199
200
() -> AsynchronousFileChannel .open (file .toPath (), StandardOpenOption .READ ),
200
- position , dataBufferFactory , bufferSize );
201
+ position , bufferFactory , bufferSize );
201
202
}
202
203
}
203
204
catch (IOException ignore ) {
204
205
// fallback to resource.readableChannel(), below
205
206
}
206
-
207
- Flux <DataBuffer > result = readByteChannel (resource ::readableChannel , dataBufferFactory , bufferSize );
207
+ Flux <DataBuffer > result = readByteChannel (resource ::readableChannel , bufferFactory , bufferSize );
208
208
return position == 0 ? result : skipUntilByteCount (result , position );
209
209
}
210
210
@@ -214,16 +214,19 @@ public static Flux<DataBuffer> read(
214
214
//---------------------------------------------------------------------
215
215
216
216
/**
217
- * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code OutputStream}. Does
218
- * <strong>not</strong> close the output stream when the flux is terminated, and does
219
- * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
220
- * source. If releasing is required, then subscribe to the returned {@code Flux} with a
221
- * {@link #releaseConsumer()}.
222
- * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
217
+ * Write the given stream of {@link DataBuffer DataBuffers} to the given
218
+ * {@code OutputStream}. Does <strong>not</strong> close the output stream
219
+ * when the flux is terminated, and does <strong>not</strong>
220
+ * {@linkplain #release(DataBuffer) release} the data buffers in the source.
221
+ * If releasing is required, then subscribe to the returned {@code Flux}
222
+ * with a {@link #releaseConsumer()}.
223
+ * <p>Note that the writing process does not start until the returned
224
+ * {@code Flux} is subscribed to.
223
225
* @param source the stream of data buffers to be written
224
226
* @param outputStream the output stream to write to
225
- * @return a flux containing the same buffers as in {@code source}, that starts the writing
226
- * process when subscribed to, and that publishes any writing errors and the completion signal
227
+ * @return a Flux containing the same buffers as in {@code source}, that
228
+ * starts the writing process when subscribed to, and that publishes any
229
+ * writing errors and the completion signal
227
230
*/
228
231
public static Flux <DataBuffer > write (Publisher <DataBuffer > source , OutputStream outputStream ) {
229
232
Assert .notNull (source , "'source' must not be null" );
@@ -234,16 +237,19 @@ public static Flux<DataBuffer> write(Publisher<DataBuffer> source, OutputStream
234
237
}
235
238
236
239
/**
237
- * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code WritableByteChannel}. Does
238
- * <strong>not</strong> close the channel when the flux is terminated, and does
239
- * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
240
- * source. If releasing is required, then subscribe to the returned {@code Flux} with a
241
- * {@link #releaseConsumer()}.
242
- * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
240
+ * Write the given stream of {@link DataBuffer DataBuffers} to the given
241
+ * {@code WritableByteChannel}. Does <strong>not</strong> close the channel
242
+ * when the flux is terminated, and does <strong>not</strong>
243
+ * {@linkplain #release(DataBuffer) release} the data buffers in the source.
244
+ * If releasing is required, then subscribe to the returned {@code Flux}
245
+ * with a {@link #releaseConsumer()}.
246
+ * <p>Note that the writing process does not start until the returned
247
+ * {@code Flux} is subscribed to.
243
248
* @param source the stream of data buffers to be written
244
249
* @param channel the channel to write to
245
- * @return a flux containing the same buffers as in {@code source}, that starts the writing
246
- * process when subscribed to, and that publishes any writing errors and the completion signal
250
+ * @return a Flux containing the same buffers as in {@code source}, that
251
+ * starts the writing process when subscribed to, and that publishes any
252
+ * writing errors and the completion signal
247
253
*/
248
254
public static Flux <DataBuffer > write (Publisher <DataBuffer > source , WritableByteChannel channel ) {
249
255
Assert .notNull (source , "'source' must not be null" );
@@ -258,34 +264,40 @@ public static Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteC
258
264
}
259
265
260
266
/**
261
- * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code AsynchronousFileChannel}.
262
- * Does <strong>not</strong> close the channel when the flux is terminated, and does
263
- * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
264
- * source. If releasing is required, then subscribe to the returned {@code Flux} with a
265
- * {@link #releaseConsumer()}.
266
- * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
267
+ * Write the given stream of {@link DataBuffer DataBuffers} to the given
268
+ * {@code AsynchronousFileChannel}. Does <strong>not</strong> close the
269
+ * channel when the flux is terminated, and does <strong>not</strong>
270
+ * {@linkplain #release(DataBuffer) release} the data buffers in the source.
271
+ * If releasing is required, then subscribe to the returned {@code Flux}
272
+ * with a {@link #releaseConsumer()}.
273
+ * <p>Note that the writing process does not start until the returned
274
+ * {@code Flux} is subscribed to.
267
275
* @param source the stream of data buffers to be written
268
276
* @param channel the channel to write to
269
- * @return a flux containing the same buffers as in {@code source}, that starts the writing
270
- * process when subscribed to, and that publishes any writing errors and the completion signal
277
+ * @return a Flux containing the same buffers as in {@code source}, that
278
+ * starts the writing process when subscribed to, and that publishes any
279
+ * writing errors and the completion signal
271
280
* @since 5.0.10
272
281
*/
273
282
public static Flux <DataBuffer > write (Publisher <DataBuffer > source , AsynchronousFileChannel channel ) {
274
283
return write (source , channel , 0 );
275
284
}
276
285
277
286
/**
278
- * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code AsynchronousFileChannel}.
279
- * Does <strong>not</strong> close the channel when the flux is terminated, and does
280
- * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
281
- * source. If releasing is required, then subscribe to the returned {@code Flux} with a
287
+ * Write the given stream of {@link DataBuffer DataBuffers} to the given
288
+ * {@code AsynchronousFileChannel}. Does <strong>not</strong> close the channel
289
+ * when the flux is terminated, and does <strong>not</strong>
290
+ * {@linkplain #release(DataBuffer) release} the data buffers in the source.
291
+ * If releasing is required, then subscribe to the returned {@code Flux} with a
282
292
* {@link #releaseConsumer()}.
283
- * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
293
+ * <p>Note that the writing process does not start until the returned
294
+ * {@code Flux} is subscribed to.
284
295
* @param source the stream of data buffers to be written
285
296
* @param channel the channel to write to
286
- * @param position the file position at which the write is to begin; must be non-negative
287
- * @return a flux containing the same buffers as in {@code source}, that starts the writing
288
- * process when subscribed to, and that publishes any writing errors and the completion signal
297
+ * @param position file position write write is to begin; must be non-negative
298
+ * @return a flux containing the same buffers as in {@code source}, that
299
+ * starts the writing process when subscribed to, and that publishes any
300
+ * writing errors and the completion signal
289
301
*/
290
302
public static Flux <DataBuffer > write (
291
303
Publisher <DataBuffer > source , AsynchronousFileChannel channel , long position ) {
0 commit comments