1
1
/*
2
- * Copyright 2002-2018 the original author or authors.
2
+ * Copyright 2002-2019 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
@@ -82,40 +82,36 @@ public static Flux<DataBuffer> readInputStream(
82
82
* Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a
83
83
* {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
84
84
* @param channelSupplier the supplier for the channel to read from
85
- * @param dataBufferFactory the factory to create data buffers with
85
+ * @param bufferFactory the factory to create data buffers with
86
86
* @param bufferSize the maximum size of the data buffers
87
87
* @return a flux of data buffers read from the given channel
88
88
*/
89
89
public static Flux <DataBuffer > readByteChannel (
90
- Callable <ReadableByteChannel > channelSupplier , DataBufferFactory dataBufferFactory , int bufferSize ) {
90
+ Callable <ReadableByteChannel > channelSupplier , DataBufferFactory bufferFactory , int bufferSize ) {
91
91
92
92
Assert .notNull (channelSupplier , "'channelSupplier' must not be null" );
93
- Assert .notNull (dataBufferFactory , "'dataBufferFactory' must not be null" );
93
+ Assert .notNull (bufferFactory , "'dataBufferFactory' must not be null" );
94
94
Assert .isTrue (bufferSize > 0 , "'bufferSize' must be > 0" );
95
95
96
96
return Flux .using (channelSupplier ,
97
- channel -> {
98
- ReadableByteChannelGenerator generator =
99
- new ReadableByteChannelGenerator (channel , dataBufferFactory ,
100
- bufferSize );
101
- return Flux .generate (generator );
102
- },
103
- DataBufferUtils ::closeChannel )
104
- .doOnDiscard (PooledDataBuffer .class , DataBufferUtils ::release );
97
+ channel -> Flux .generate (new ReadableByteChannelGenerator (channel , bufferFactory , bufferSize )),
98
+ DataBufferUtils ::closeChannel );
99
+
100
+ // No doOnDiscard as operators used do not cache
105
101
}
106
102
107
103
/**
108
104
* Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
109
105
* {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
110
106
* @param channelSupplier the supplier for the channel to read from
111
- * @param dataBufferFactory the factory to create data buffers with
107
+ * @param bufferFactory the factory to create data buffers with
112
108
* @param bufferSize the maximum size of the data buffers
113
109
* @return a flux of data buffers read from the given channel
114
110
*/
115
111
public static Flux <DataBuffer > readAsynchronousFileChannel (
116
- Callable <AsynchronousFileChannel > channelSupplier , DataBufferFactory dataBufferFactory , int bufferSize ) {
112
+ Callable <AsynchronousFileChannel > channelSupplier , DataBufferFactory bufferFactory , int bufferSize ) {
117
113
118
- return readAsynchronousFileChannel (channelSupplier , 0 , dataBufferFactory , bufferSize );
114
+ return readAsynchronousFileChannel (channelSupplier , 0 , bufferFactory , bufferSize );
119
115
}
120
116
121
117
/**
@@ -124,32 +120,30 @@ public static Flux<DataBuffer> readAsynchronousFileChannel(
124
120
* channel when the flux is terminated.
125
121
* @param channelSupplier the supplier for the channel to read from
126
122
* @param position the position to start reading from
127
- * @param dataBufferFactory the factory to create data buffers with
123
+ * @param bufferFactory the factory to create data buffers with
128
124
* @param bufferSize the maximum size of the data buffers
129
125
* @return a flux of data buffers read from the given channel
130
126
*/
131
127
public static Flux <DataBuffer > readAsynchronousFileChannel (Callable <AsynchronousFileChannel > channelSupplier ,
132
- long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
128
+ long position , DataBufferFactory bufferFactory , int bufferSize ) {
133
129
134
130
Assert .notNull (channelSupplier , "'channelSupplier' must not be null" );
135
- Assert .notNull (dataBufferFactory , "'dataBufferFactory' must not be null" );
131
+ Assert .notNull (bufferFactory , "'dataBufferFactory' must not be null" );
136
132
Assert .isTrue (position >= 0 , "'position' must be >= 0" );
137
133
Assert .isTrue (bufferSize > 0 , "'bufferSize' must be > 0" );
138
134
139
- DataBuffer dataBuffer = dataBufferFactory .allocateBuffer (bufferSize );
140
- ByteBuffer byteBuffer = dataBuffer .asByteBuffer (0 , bufferSize );
141
-
142
- Flux <DataBuffer > result = Flux .using (channelSupplier ,
135
+ Flux <DataBuffer > flux = Flux .using (channelSupplier ,
143
136
channel -> Flux .create (sink -> {
144
- AsynchronousFileChannelReadCompletionHandler completionHandler =
145
- new AsynchronousFileChannelReadCompletionHandler (channel ,
146
- sink , position , dataBufferFactory , bufferSize );
147
- channel .read (byteBuffer , position , dataBuffer , completionHandler );
148
- sink .onDispose (completionHandler ::dispose );
137
+ ReadCompletionHandler handler =
138
+ new ReadCompletionHandler (channel , sink , position , bufferFactory , bufferSize );
139
+ DataBuffer dataBuffer = bufferFactory .allocateBuffer (bufferSize );
140
+ ByteBuffer byteBuffer = dataBuffer .asByteBuffer (0 , bufferSize );
141
+ channel .read (byteBuffer , position , dataBuffer , handler );
142
+ sink .onDispose (handler ::dispose );
149
143
}),
150
144
DataBufferUtils ::closeChannel );
151
145
152
- return result .doOnDiscard (PooledDataBuffer .class , DataBufferUtils ::release );
146
+ return flux .doOnDiscard (PooledDataBuffer .class , DataBufferUtils ::release );
153
147
}
154
148
155
149
/**
@@ -246,8 +240,7 @@ public static Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteC
246
240
247
241
Flux <DataBuffer > flux = Flux .from (source );
248
242
return Flux .create (sink -> {
249
- WritableByteChannelSubscriber subscriber =
250
- new WritableByteChannelSubscriber (sink , channel );
243
+ WritableByteChannelSubscriber subscriber = new WritableByteChannelSubscriber (sink , channel );
251
244
sink .onDispose (subscriber );
252
245
flux .subscribe (subscriber );
253
246
});
@@ -292,10 +285,9 @@ public static Flux<DataBuffer> write(
292
285
293
286
Flux <DataBuffer > flux = Flux .from (source );
294
287
return Flux .create (sink -> {
295
- AsynchronousFileChannelWriteCompletionHandler completionHandler =
296
- new AsynchronousFileChannelWriteCompletionHandler (sink , channel , position );
297
- sink .onDispose (completionHandler );
298
- flux .subscribe (completionHandler );
288
+ WriteCompletionHandler handler = new WriteCompletionHandler (sink , channel , position );
289
+ sink .onDispose (handler );
290
+ flux .subscribe (handler );
299
291
});
300
292
}
301
293
@@ -326,21 +318,21 @@ public static Flux<DataBuffer> takeUntilByteCount(Publisher<DataBuffer> publishe
326
318
Assert .notNull (publisher , "Publisher must not be null" );
327
319
Assert .isTrue (maxByteCount >= 0 , "'maxByteCount' must be a positive number" );
328
320
329
- return Flux . defer (() -> {
330
- AtomicLong countDown = new AtomicLong ( maxByteCount );
331
- return Flux . from ( publisher )
332
- . map ( buffer -> {
333
- long remainder = countDown . addAndGet (- buffer . readableByteCount ());
334
- if ( remainder < 0 ) {
335
- int length = buffer .readableByteCount () + ( int ) remainder ;
336
- return buffer . slice ( 0 , length );
337
- }
338
- else {
339
- return buffer ;
340
- }
341
- })
342
- . takeUntil ( buffer -> countDown . get () <= 0 );
343
- }); // no doOnDiscard necessary, as this method does not drop buffers
321
+ AtomicLong countDown = new AtomicLong ( maxByteCount );
322
+ return Flux . from ( publisher )
323
+ . map ( buffer -> {
324
+ long remainder = countDown . addAndGet (- buffer . readableByteCount ());
325
+ if ( remainder < 0 ) {
326
+ int length = buffer . readableByteCount () + ( int ) remainder ;
327
+ return buffer .slice ( 0 , length ) ;
328
+ }
329
+ else {
330
+ return buffer ;
331
+ }
332
+ })
333
+ . takeUntil ( buffer -> countDown . get () <= 0 );
334
+
335
+ // No doOnDiscard as operators used do not cache (and drop) buffers
344
336
}
345
337
346
338
/**
@@ -487,8 +479,7 @@ public void accept(SynchronousSink<DataBuffer> sink) {
487
479
}
488
480
489
481
490
- private static class AsynchronousFileChannelReadCompletionHandler
491
- implements CompletionHandler <Integer , DataBuffer > {
482
+ private static class ReadCompletionHandler implements CompletionHandler <Integer , DataBuffer > {
492
483
493
484
private final AsynchronousFileChannel channel ;
494
485
@@ -502,7 +493,7 @@ private static class AsynchronousFileChannelReadCompletionHandler
502
493
503
494
private final AtomicBoolean disposed = new AtomicBoolean ();
504
495
505
- public AsynchronousFileChannelReadCompletionHandler (AsynchronousFileChannel channel ,
496
+ public ReadCompletionHandler (AsynchronousFileChannel channel ,
506
497
FluxSink <DataBuffer > sink , long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
507
498
508
499
this .channel = channel ;
@@ -586,7 +577,7 @@ protected void hookOnComplete() {
586
577
}
587
578
588
579
589
- private static class AsynchronousFileChannelWriteCompletionHandler extends BaseSubscriber <DataBuffer >
580
+ private static class WriteCompletionHandler extends BaseSubscriber <DataBuffer >
590
581
implements CompletionHandler <Integer , ByteBuffer > {
591
582
592
583
private final FluxSink <DataBuffer > sink ;
@@ -601,7 +592,7 @@ private static class AsynchronousFileChannelWriteCompletionHandler extends BaseS
601
592
602
593
private final AtomicReference <DataBuffer > dataBuffer = new AtomicReference <>();
603
594
604
- public AsynchronousFileChannelWriteCompletionHandler (
595
+ public WriteCompletionHandler (
605
596
FluxSink <DataBuffer > sink , AsynchronousFileChannel channel , long position ) {
606
597
607
598
this .sink = sink ;
0 commit comments