20
20
21
21
import java .io .ByteArrayInputStream ;
22
22
import java .io .ByteArrayOutputStream ;
23
+ import java .io .File ;
23
24
import java .io .FileInputStream ;
24
25
import java .io .IOException ;
25
26
import java .nio .ByteBuffer ;
26
27
import java .nio .charset .Charset ;
28
+ import java .nio .file .Files ;
27
29
import java .util .ArrayList ;
28
30
import java .util .List ;
29
31
import java .util .Optional ;
32
+ import java .util .UUID ;
30
33
import java .util .concurrent .CompletableFuture ;
34
+ import java .util .concurrent .ExecutionException ;
35
+ import java .util .concurrent .ThreadLocalRandom ;
31
36
import java .util .concurrent .TimeUnit ;
37
+ import java .util .concurrent .TimeoutException ;
32
38
import java .util .concurrent .atomic .AtomicInteger ;
33
39
import org .apache .commons .lang3 .RandomStringUtils ;
34
40
import org .junit .jupiter .api .AfterAll ;
@@ -51,11 +57,12 @@ public class SplittingPublisherTest {
51
57
52
58
private static final int NUM_OF_CHUNK = (int ) Math .ceil (CONTENT_SIZE / (double ) CHUNK_SIZE );
53
59
54
- private static RandomTempFile testFile ;
60
+ private static File testFile ;
55
61
56
62
@ BeforeAll
57
63
public static void beforeAll () throws IOException {
58
- testFile = new RandomTempFile ("testfile.dat" , CONTENT_SIZE );
64
+ testFile = File .createTempFile ("SplittingPublisherTest" , UUID .randomUUID ().toString ());
65
+ Files .write (testFile .toPath (), CONTENT );
59
66
}
60
67
61
68
@ AfterAll
@@ -65,46 +72,19 @@ public static void afterAll() throws Exception {
65
72
66
73
@ ParameterizedTest
67
74
@ ValueSource (ints = {CHUNK_SIZE , CHUNK_SIZE * 2 - 1 , CHUNK_SIZE * 2 })
68
- void differentChunkSize_shouldSplitAsyncRequestBodyCorrectly (int upstreamByteBufferSize ) throws Exception {
69
- CompletableFuture <Void > future = new CompletableFuture <>();
70
- SplittingPublisher splittingPublisher = SplittingPublisher .builder ()
71
- .resultFuture (future )
72
- .asyncRequestBody (FileAsyncRequestBody .builder ()
73
- .path (testFile .toPath ())
74
- .chunkSizeInBytes (upstreamByteBufferSize )
75
- .build ())
76
-
77
- .resultFuture (future )
78
- .chunkSizeInBytes ((long ) CHUNK_SIZE )
79
- .maxMemoryUsageInBytes ((long ) CHUNK_SIZE * 4 )
80
- .build ();
81
-
82
- List <CompletableFuture <byte []>> futures = new ArrayList <>();
75
+ void differentChunkSize_shouldSplitAsyncRequestBodyCorrectly (int chunkSize ) throws Exception {
83
76
84
- splittingPublisher .subscribe (requestBody -> {
85
- CompletableFuture <byte []> baosFuture = new CompletableFuture <>();
86
- BaosSubscriber subscriber = new BaosSubscriber (baosFuture );
87
- futures .add (baosFuture );
88
- requestBody .subscribe (subscriber );
89
- }).get (5 , TimeUnit .SECONDS );
90
-
91
- assertThat (futures .size ()).isEqualTo (NUM_OF_CHUNK );
77
+ FileAsyncRequestBody fileAsyncRequestBody = FileAsyncRequestBody .builder ()
78
+ .path (testFile .toPath ())
79
+ .chunkSizeInBytes (chunkSize )
80
+ .build ();
81
+ verifySplitContent (fileAsyncRequestBody , chunkSize );
82
+ }
92
83
93
- for (int i = 0 ; i < futures .size (); i ++) {
94
- try (FileInputStream fileInputStream = new FileInputStream (testFile )) {
95
- byte [] expected ;
96
- if (i == futures .size () - 1 ) {
97
- expected = new byte [1 ];
98
- } else {
99
- expected = new byte [5 ];
100
- }
101
- fileInputStream .skip (i * 5 );
102
- fileInputStream .read (expected );
103
- byte [] actualBytes = futures .get (i ).join ();
104
- assertThat (actualBytes ).isEqualTo (expected );
105
- };
106
- }
107
- assertThat (future ).isCompleted ();
84
+ @ ParameterizedTest
85
+ @ ValueSource (ints = {CHUNK_SIZE , CHUNK_SIZE * 2 - 1 , CHUNK_SIZE * 2 })
86
+ void differentChunkSize_byteArrayShouldSplitAsyncRequestBodyCorrectly (int chunkSize ) throws Exception {
87
+ verifySplitContent (AsyncRequestBody .fromBytes (CONTENT ), chunkSize );
108
88
}
109
89
110
90
@@ -115,7 +95,7 @@ void cancelFuture_shouldCancelUpstream() throws IOException {
115
95
SplittingPublisher splittingPublisher = SplittingPublisher .builder ()
116
96
.resultFuture (future )
117
97
.asyncRequestBody (asyncRequestBody )
118
- .chunkSizeInBytes (( long ) CHUNK_SIZE )
98
+ .chunkSizeInBytes (CHUNK_SIZE )
119
99
.maxMemoryUsageInBytes (10L )
120
100
.build ();
121
101
@@ -139,7 +119,7 @@ public Optional<Long> contentLength() {
139
119
SplittingPublisher splittingPublisher = SplittingPublisher .builder ()
140
120
.resultFuture (future )
141
121
.asyncRequestBody (asyncRequestBody )
142
- .chunkSizeInBytes (( long ) CHUNK_SIZE )
122
+ .chunkSizeInBytes (CHUNK_SIZE )
143
123
.maxMemoryUsageInBytes (10L )
144
124
.build ();
145
125
@@ -177,6 +157,46 @@ public Optional<Long> contentLength() {
177
157
178
158
}
179
159
160
+
161
+ private static void verifySplitContent (AsyncRequestBody asyncRequestBody , int chunkSize ) throws Exception {
162
+ CompletableFuture <Void > future = new CompletableFuture <>();
163
+ SplittingPublisher splittingPublisher = SplittingPublisher .builder ()
164
+ .resultFuture (future )
165
+ .asyncRequestBody (asyncRequestBody )
166
+ .resultFuture (future )
167
+ .chunkSizeInBytes (chunkSize )
168
+ .maxMemoryUsageInBytes ((long ) chunkSize * 4 )
169
+ .build ();
170
+
171
+ List <CompletableFuture <byte []>> futures = new ArrayList <>();
172
+
173
+ splittingPublisher .subscribe (requestBody -> {
174
+ CompletableFuture <byte []> baosFuture = new CompletableFuture <>();
175
+ BaosSubscriber subscriber = new BaosSubscriber (baosFuture );
176
+ futures .add (baosFuture );
177
+ requestBody .subscribe (subscriber );
178
+ }).get (5 , TimeUnit .SECONDS );
179
+
180
+ assertThat (futures .size ()).isEqualTo ((int ) Math .ceil (CONTENT_SIZE / (double ) chunkSize ));
181
+
182
+ for (int i = 0 ; i < futures .size (); i ++) {
183
+ try (FileInputStream fileInputStream = new FileInputStream (testFile )) {
184
+ byte [] expected ;
185
+ if (i == futures .size () - 1 ) {
186
+ int lastChunk = CONTENT_SIZE % chunkSize == 0 ? chunkSize : (CONTENT_SIZE % chunkSize );
187
+ expected = new byte [lastChunk ];
188
+ } else {
189
+ expected = new byte [chunkSize ];
190
+ }
191
+ fileInputStream .skip (i * chunkSize );
192
+ fileInputStream .read (expected );
193
+ byte [] actualBytes = futures .get (i ).join ();
194
+ assertThat (actualBytes ).isEqualTo (expected );
195
+ };
196
+ }
197
+ assertThat (future ).isCompleted ();
198
+ }
199
+
180
200
private static class TestAsyncRequestBody implements AsyncRequestBody {
181
201
private volatile boolean cancelled ;
182
202
private volatile boolean isDone ;
0 commit comments