1
1
/*
2
- * Copyright 2002-2019 the original author or authors.
2
+ * Copyright 2002-2020 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
@@ -95,17 +95,44 @@ public Flux<String> decode(Publisher<DataBuffer> input, ResolvableType elementTy
95
95
96
96
List <byte []> delimiterBytes = getDelimiterBytes (mimeType );
97
97
98
- // TODO: Drop Consumer and use bufferUntil with Supplier<LimistedDataBufferList> (reactor-core#1925)
99
- // TODO: Drop doOnDiscard(LimitedDataBufferList.class, ...) (reactor-core#1924)
100
- LimitedDataBufferConsumer limiter = new LimitedDataBufferConsumer (getMaxInMemorySize ());
98
+ Flux <DataBuffer > inputFlux = Flux .defer (() -> {
99
+ if (getMaxInMemorySize () != -1 ) {
101
100
102
- Flux <DataBuffer > inputFlux = Flux .from (input )
103
- .flatMapIterable (buffer -> splitOnDelimiter (buffer , delimiterBytes ))
104
- .doOnNext (limiter )
105
- .bufferUntil (buffer -> buffer == END_FRAME )
106
- .map (StringDecoder ::joinUntilEndFrame )
107
- .doOnDiscard (LimitedDataBufferList .class , LimitedDataBufferList ::releaseAndClear )
108
- .doOnDiscard (PooledDataBuffer .class , DataBufferUtils ::release );
101
+ // Passing limiter into endFrameAfterDelimiter helps to ensure that in case of one DataBuffer
102
+ // containing multiple lines, the limit is checked and raised immediately without accumulating
103
+ // subsequent lines. This is necessary because concatMapIterable doesn't respect doOnDiscard.
104
+ // When reactor-core#1925 is resolved, we could replace bufferUntil with:
105
+
106
+ // .windowUntil(buffer -> buffer instanceof EndFrameBuffer)
107
+ // .concatMap(fluxes -> fluxes.collect(() -> new LimitedDataBufferList(getMaxInMemorySize()), LimitedDataBufferList::add))
108
+
109
+ LimitedDataBufferList limiter = new LimitedDataBufferList (getMaxInMemorySize ());
110
+
111
+ return Flux .from (input )
112
+ .concatMapIterable (buffer -> splitOnDelimiter (buffer , delimiterBytes , limiter ))
113
+ .bufferUntil (buffer -> buffer == END_FRAME )
114
+ .map (StringDecoder ::joinUntilEndFrame )
115
+ .doOnDiscard (PooledDataBuffer .class , DataBufferUtils ::release );
116
+ }
117
+ else {
118
+
119
+ // When the decoder is unlimited (-1), concatMapIterable will cache buffers that may not
120
+ // be released if cancel is signalled before they are turned into String lines
121
+ // (see test maxInMemoryLimitReleasesUnprocessedLinesWhenUnlimited).
122
+ // When reactor-core#1925 is resolved, the workaround can be removed and the entire
123
+ // else clause possibly dropped.
124
+
125
+ ConcatMapIterableDiscardWorkaroundCache cache = new ConcatMapIterableDiscardWorkaroundCache ();
126
+
127
+ return Flux .from (input )
128
+ .concatMapIterable (buffer -> cache .addAll (splitOnDelimiter (buffer , delimiterBytes , null )))
129
+ .doOnNext (cache )
130
+ .doOnCancel (cache )
131
+ .bufferUntil (buffer -> buffer == END_FRAME )
132
+ .map (StringDecoder ::joinUntilEndFrame )
133
+ .doOnDiscard (PooledDataBuffer .class , DataBufferUtils ::release );
134
+ }
135
+ });
109
136
110
137
return super .decode (inputFlux , elementType , mimeType , hints );
111
138
}
@@ -125,7 +152,9 @@ private List<byte[]> getDelimiterBytes(@Nullable MimeType mimeType) {
125
152
* Split the given data buffer on delimiter boundaries.
126
153
* The returned Flux contains an {@link #END_FRAME} buffer after each delimiter.
127
154
*/
128
- private List <DataBuffer > splitOnDelimiter (DataBuffer buffer , List <byte []> delimiterBytes ) {
155
+ private List <DataBuffer > splitOnDelimiter (
156
+ DataBuffer buffer , List <byte []> delimiterBytes , @ Nullable LimitedDataBufferList limiter ) {
157
+
129
158
List <DataBuffer > frames = new ArrayList <>();
130
159
try {
131
160
do {
@@ -147,15 +176,28 @@ private List<DataBuffer> splitOnDelimiter(DataBuffer buffer, List<byte[]> delimi
147
176
buffer .readPosition (readPosition + length + matchingDelimiter .length );
148
177
frames .add (DataBufferUtils .retain (frame ));
149
178
frames .add (END_FRAME );
179
+ if (limiter != null ) {
180
+ limiter .add (frame ); // enforce the limit
181
+ limiter .clear ();
182
+ }
150
183
}
151
184
else {
152
185
frame = buffer .slice (readPosition , buffer .readableByteCount ());
153
186
buffer .readPosition (readPosition + buffer .readableByteCount ());
154
187
frames .add (DataBufferUtils .retain (frame ));
188
+ if (limiter != null ) {
189
+ limiter .add (frame );
190
+ }
155
191
}
156
192
}
157
193
while (buffer .readableByteCount () > 0 );
158
194
}
195
+ catch (DataBufferLimitException ex ) {
196
+ if (limiter != null ) {
197
+ limiter .releaseAndClear ();
198
+ }
199
+ throw ex ;
200
+ }
159
201
catch (Throwable ex ) {
160
202
for (DataBuffer frame : frames ) {
161
203
DataBufferUtils .release (frame );
@@ -293,34 +335,32 @@ public static StringDecoder allMimeTypes(List<String> delimiters, boolean stripD
293
335
}
294
336
295
337
296
- /**
297
- * Temporary measure for reactor-core#1925.
298
- * Consumer that adds to a {@link LimitedDataBufferList} to enforce limits.
299
- */
300
- private static class LimitedDataBufferConsumer implements Consumer <DataBuffer > {
338
+ private class ConcatMapIterableDiscardWorkaroundCache implements Consumer <DataBuffer >, Runnable {
301
339
302
- private final LimitedDataBufferList bufferList ;
340
+ private final List < DataBuffer > buffers = new ArrayList <>() ;
303
341
304
342
305
- public LimitedDataBufferConsumer (int maxInMemorySize ) {
306
- this .bufferList = new LimitedDataBufferList (maxInMemorySize );
343
+ public List <DataBuffer > addAll (List <DataBuffer > buffersToAdd ) {
344
+ this .buffers .addAll (buffersToAdd );
345
+ return buffersToAdd ;
307
346
}
308
347
348
+ @ Override
349
+ public void accept (DataBuffer dataBuffer ) {
350
+ this .buffers .remove (dataBuffer );
351
+ }
309
352
310
353
@ Override
311
- public void accept (DataBuffer buffer ) {
312
- if (buffer == END_FRAME ) {
313
- this .bufferList .clear ();
314
- }
315
- else {
354
+ public void run () {
355
+ this .buffers .forEach (buffer -> {
316
356
try {
317
- this .bufferList .add (buffer );
318
- }
319
- catch (DataBufferLimitException ex ) {
320
357
DataBufferUtils .release (buffer );
321
- throw ex ;
322
358
}
323
- }
359
+ catch (Throwable ex ) {
360
+ // Keep going..
361
+ }
362
+ });
324
363
}
325
364
}
365
+
326
366
}
0 commit comments