Skip to content

Commit fdcf09d

Browse files
committed
Copied buffering split from DataBufferUtils to StringDecoder
Due to the imminent removal of DataBufferUtils.split, this commit copies over the buffering split algortihm from DataBufferUtils, as it is still sutable for the StringDecoder
1 parent ef19a6b commit fdcf09d

File tree

1 file changed

+242
-2
lines changed

1 file changed

+242
-2
lines changed

spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java

Lines changed: 242 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package org.springframework.core.codec;
1818

19+
import java.io.InputStream;
20+
import java.io.OutputStream;
21+
import java.nio.ByteBuffer;
1922
import java.nio.CharBuffer;
2023
import java.nio.charset.Charset;
2124
import java.nio.charset.StandardCharsets;
@@ -25,13 +28,17 @@
2528
import java.util.Map;
2629
import java.util.concurrent.ConcurrentHashMap;
2730
import java.util.concurrent.ConcurrentMap;
31+
import java.util.function.IntPredicate;
2832

2933
import org.reactivestreams.Publisher;
3034
import reactor.core.publisher.Flux;
3135

3236
import org.springframework.core.ResolvableType;
3337
import org.springframework.core.io.buffer.DataBuffer;
38+
import org.springframework.core.io.buffer.DataBufferFactory;
3439
import org.springframework.core.io.buffer.DataBufferUtils;
40+
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
41+
import org.springframework.core.io.buffer.PooledDataBuffer;
3542
import org.springframework.core.log.LogFormatUtils;
3643
import org.springframework.lang.Nullable;
3744
import org.springframework.util.Assert;
@@ -88,8 +95,15 @@ public Flux<String> decode(Publisher<DataBuffer> input, ResolvableType elementTy
8895

8996
byte[][] delimiterBytes = getDelimiterBytes(mimeType);
9097

91-
Flux<DataBuffer> inputFlux =
92-
DataBufferUtils.split(input, delimiterBytes, this.stripDelimiter);
98+
Flux<DataBuffer> inputFlux = Flux.defer(() -> {
99+
DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes);
100+
return Flux.from(input)
101+
.concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher))
102+
.bufferUntil(buffer -> buffer instanceof EndFrameBuffer)
103+
.map(buffers -> joinAndStrip(buffers, this.stripDelimiter))
104+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
105+
106+
});
93107

94108
return super.decode(inputFlux, elementType, mimeType, hints);
95109
}
@@ -128,6 +142,69 @@ private static Charset getCharset(@Nullable MimeType mimeType) {
128142
}
129143
}
130144

145+
/**
146+
* Finds the first match and longest delimiter, {@link EndFrameBuffer} just after it.
147+
*
148+
* @param dataBuffer the buffer to find delimiters in
149+
* @param matcher used to find the first delimiters
150+
* @return a flux of buffers, containing {@link EndFrameBuffer} after each delimiter that was
151+
* found in {@code dataBuffer}. Returns Flux, because returning List (w/ flatMapIterable)
152+
* results in memory leaks due to pre-fetching.
153+
*/
154+
private static List<DataBuffer> endFrameAfterDelimiter(DataBuffer dataBuffer, DataBufferUtils.Matcher matcher) {
155+
List<DataBuffer> result = new ArrayList<>();
156+
do {
157+
int endIdx = matcher.match(dataBuffer);
158+
if (endIdx != -1) {
159+
int readPosition = dataBuffer.readPosition();
160+
int length = endIdx - readPosition + 1;
161+
result.add(dataBuffer.retainedSlice(readPosition, length));
162+
result.add(new EndFrameBuffer(matcher.delimiter()));
163+
dataBuffer.readPosition(endIdx + 1);
164+
}
165+
else {
166+
result.add(DataBufferUtils.retain(dataBuffer));
167+
break;
168+
}
169+
}
170+
while (dataBuffer.readableByteCount() > 0);
171+
172+
DataBufferUtils.release(dataBuffer);
173+
return result;
174+
}
175+
176+
/**
177+
* Joins the given list of buffers. If the list ends with a {@link EndFrameBuffer}, it is
178+
* removed. If {@code stripDelimiter} is {@code true} and the resulting buffer ends with
179+
* a delimiter, it is removed.
180+
* @param dataBuffers the data buffers to join
181+
* @param stripDelimiter whether to strip the delimiter
182+
* @return the joined buffer
183+
*/
184+
private static DataBuffer joinAndStrip(List<DataBuffer> dataBuffers,
185+
boolean stripDelimiter) {
186+
187+
Assert.state(!dataBuffers.isEmpty(), "DataBuffers should not be empty");
188+
189+
byte[] matchingDelimiter = null;
190+
191+
int lastIdx = dataBuffers.size() - 1;
192+
DataBuffer lastBuffer = dataBuffers.get(lastIdx);
193+
if (lastBuffer instanceof EndFrameBuffer) {
194+
matchingDelimiter = ((EndFrameBuffer) lastBuffer).delimiter();
195+
dataBuffers.remove(lastIdx);
196+
}
197+
198+
DataBuffer result = dataBuffers.get(0).factory().join(dataBuffers);
199+
200+
if (stripDelimiter && matchingDelimiter != null) {
201+
result.writePosition(result.writePosition() - matchingDelimiter.length);
202+
}
203+
return result;
204+
}
205+
206+
207+
131208

132209
/**
133210
* Create a {@code StringDecoder} for {@code "text/plain"}.
@@ -186,4 +263,167 @@ public static StringDecoder allMimeTypes(List<String> delimiters, boolean stripD
186263
new MimeType("text", "plain", DEFAULT_CHARSET), MimeTypeUtils.ALL);
187264
}
188265

266+
267+
private static class EndFrameBuffer implements DataBuffer {
268+
269+
private static final DataBuffer BUFFER = new DefaultDataBufferFactory().wrap(new byte[0]);
270+
271+
private byte[] delimiter;
272+
273+
274+
public EndFrameBuffer(byte[] delimiter) {
275+
this.delimiter = delimiter;
276+
}
277+
278+
public byte[] delimiter() {
279+
return this.delimiter;
280+
}
281+
282+
@Override
283+
public DataBufferFactory factory() {
284+
return BUFFER.factory();
285+
}
286+
287+
@Override
288+
public int indexOf(IntPredicate predicate, int fromIndex) {
289+
return BUFFER.indexOf(predicate, fromIndex);
290+
}
291+
292+
@Override
293+
public int lastIndexOf(IntPredicate predicate, int fromIndex) {
294+
return BUFFER.lastIndexOf(predicate, fromIndex);
295+
}
296+
297+
@Override
298+
public int readableByteCount() {
299+
return BUFFER.readableByteCount();
300+
}
301+
302+
@Override
303+
public int writableByteCount() {
304+
return BUFFER.writableByteCount();
305+
}
306+
307+
@Override
308+
public int capacity() {
309+
return BUFFER.capacity();
310+
}
311+
312+
@Override
313+
public DataBuffer capacity(int capacity) {
314+
return BUFFER.capacity(capacity);
315+
}
316+
317+
@Override
318+
public DataBuffer ensureCapacity(int capacity) {
319+
return BUFFER.ensureCapacity(capacity);
320+
}
321+
322+
@Override
323+
public int readPosition() {
324+
return BUFFER.readPosition();
325+
}
326+
327+
@Override
328+
public DataBuffer readPosition(int readPosition) {
329+
return BUFFER.readPosition(readPosition);
330+
}
331+
332+
@Override
333+
public int writePosition() {
334+
return BUFFER.writePosition();
335+
}
336+
337+
@Override
338+
public DataBuffer writePosition(int writePosition) {
339+
return BUFFER.writePosition(writePosition);
340+
}
341+
342+
@Override
343+
public byte getByte(int index) {
344+
return BUFFER.getByte(index);
345+
}
346+
347+
@Override
348+
public byte read() {
349+
return BUFFER.read();
350+
}
351+
352+
@Override
353+
public DataBuffer read(byte[] destination) {
354+
return BUFFER.read(destination);
355+
}
356+
357+
@Override
358+
public DataBuffer read(byte[] destination, int offset, int length) {
359+
return BUFFER.read(destination, offset, length);
360+
}
361+
362+
@Override
363+
public DataBuffer write(byte b) {
364+
return BUFFER.write(b);
365+
}
366+
367+
@Override
368+
public DataBuffer write(byte[] source) {
369+
return BUFFER.write(source);
370+
}
371+
372+
@Override
373+
public DataBuffer write(byte[] source, int offset, int length) {
374+
return BUFFER.write(source, offset, length);
375+
}
376+
377+
@Override
378+
public DataBuffer write(DataBuffer... buffers) {
379+
return BUFFER.write(buffers);
380+
}
381+
382+
@Override
383+
public DataBuffer write(ByteBuffer... buffers) {
384+
return BUFFER.write(buffers);
385+
}
386+
387+
@Override
388+
public DataBuffer write(CharSequence charSequence, Charset charset) {
389+
return BUFFER.write(charSequence, charset);
390+
}
391+
392+
@Override
393+
public DataBuffer slice(int index, int length) {
394+
return BUFFER.slice(index, length);
395+
}
396+
397+
@Override
398+
public DataBuffer retainedSlice(int index, int length) {
399+
return BUFFER.retainedSlice(index, length);
400+
}
401+
402+
@Override
403+
public ByteBuffer asByteBuffer() {
404+
return BUFFER.asByteBuffer();
405+
}
406+
407+
@Override
408+
public ByteBuffer asByteBuffer(int index, int length) {
409+
return BUFFER.asByteBuffer(index, length);
410+
}
411+
412+
@Override
413+
public InputStream asInputStream() {
414+
return BUFFER.asInputStream();
415+
}
416+
417+
@Override
418+
public InputStream asInputStream(boolean releaseOnClose) {
419+
return BUFFER.asInputStream(releaseOnClose);
420+
}
421+
422+
@Override
423+
public OutputStream asOutputStream() {
424+
return BUFFER.asOutputStream();
425+
}
426+
}
427+
428+
189429
}

0 commit comments

Comments
 (0)