|
36 | 36 | import com.fasterxml.jackson.databind.SequenceWriter;
|
37 | 37 | import com.fasterxml.jackson.databind.exc.InvalidDefinitionException;
|
38 | 38 | import com.fasterxml.jackson.databind.ser.FilterProvider;
|
39 |
| -import org.apache.commons.logging.Log; |
40 | 39 | import org.reactivestreams.Publisher;
|
41 | 40 | import reactor.core.publisher.Flux;
|
42 | 41 | import reactor.core.publisher.Mono;
|
@@ -182,20 +181,23 @@ public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory buffe
|
182 | 181 | // Do not prepend JSON array prefix until first signal is known, onNext vs onError
|
183 | 182 | // Keeps response not committed for error handling
|
184 | 183 |
|
185 |
| - Flux<DataBuffer> flux1 = helper.getPrefix(bufferFactory, hints, logger) |
186 |
| - .concatWith(Flux.just(EMPTY_BUFFER).repeat()); |
| 184 | + dataBufferFlux = Flux.from(inputStream) |
| 185 | + .map(value -> { |
| 186 | + byte[] prefix = helper.getPrefix(); |
| 187 | + byte[] delimiter = helper.getDelimiter(); |
187 | 188 |
|
188 |
| - Flux<DataBuffer> flux2 = Flux.from(inputStream).map(value -> encodeStreamingValue( |
189 |
| - value, bufferFactory, hints, sequenceWriter, byteBuilder, helper.getDelimiter(), EMPTY_BYTES)); |
| 189 | + DataBuffer dataBuffer = encodeStreamingValue( |
| 190 | + value, bufferFactory, hints, sequenceWriter, byteBuilder, delimiter, EMPTY_BYTES); |
190 | 191 |
|
191 |
| - dataBufferFlux = Flux.zip(flux1, flux2, (buffer1, buffer2) -> |
192 |
| - (buffer1 != EMPTY_BUFFER ? |
193 |
| - bufferFactory.join(Arrays.asList(buffer1, buffer2)) : |
194 |
| - buffer2)) |
195 |
| - .concatWith(helper.getSuffix(bufferFactory, hints, logger)); |
| 192 | + return (prefix.length > 0 ? |
| 193 | + bufferFactory.join(Arrays.asList(bufferFactory.wrap(prefix), dataBuffer)) : |
| 194 | + dataBuffer); |
| 195 | + }) |
| 196 | + .concatWith(Mono.fromCallable(() -> bufferFactory.wrap(helper.getSuffix()))); |
196 | 197 | }
|
197 | 198 |
|
198 | 199 | return dataBufferFlux
|
| 200 | + .doOnNext(dataBuffer -> Hints.touchDataBuffer(dataBuffer, hints, logger)) |
199 | 201 | .doAfterTerminate(() -> {
|
200 | 202 | try {
|
201 | 203 | byteBuilder.release();
|
@@ -420,33 +422,22 @@ private static class JsonArrayJoinHelper {
|
420 | 422 |
|
421 | 423 | private static final byte[] CLOSE_BRACKET = {']'};
|
422 | 424 |
|
423 |
| - |
424 |
| - private boolean afterFirstItem = false; |
| 425 | + private boolean firstItemEmitted; |
425 | 426 |
|
426 | 427 | public byte[] getDelimiter() {
|
427 |
| - if (this.afterFirstItem) { |
| 428 | + if (this.firstItemEmitted) { |
428 | 429 | return COMMA_SEPARATOR;
|
429 | 430 | }
|
430 |
| - this.afterFirstItem = true; |
| 431 | + this.firstItemEmitted = true; |
431 | 432 | return EMPTY_BYTES;
|
432 | 433 | }
|
433 | 434 |
|
434 |
| - public Mono<DataBuffer> getPrefix(DataBufferFactory factory, @Nullable Map<String, Object> hints, Log logger) { |
435 |
| - return wrapBytes(OPEN_BRACKET, factory, hints, logger); |
436 |
| - } |
437 |
| - |
438 |
| - public Mono<DataBuffer> getSuffix(DataBufferFactory factory, @Nullable Map<String, Object> hints, Log logger) { |
439 |
| - return wrapBytes(CLOSE_BRACKET, factory, hints, logger); |
| 435 | + public byte[] getPrefix() { |
| 436 | + return (this.firstItemEmitted ? EMPTY_BYTES : OPEN_BRACKET); |
440 | 437 | }
|
441 | 438 |
|
442 |
| - private Mono<DataBuffer> wrapBytes( |
443 |
| - byte[] bytes, DataBufferFactory bufferFactory, @Nullable Map<String, Object> hints, Log logger) { |
444 |
| - |
445 |
| - return Mono.fromCallable(() -> { |
446 |
| - DataBuffer buffer = bufferFactory.wrap(bytes); |
447 |
| - Hints.touchDataBuffer(buffer, hints, logger); |
448 |
| - return buffer; |
449 |
| - }); |
| 439 | + public byte[] getSuffix() { |
| 440 | + return CLOSE_BRACKET; |
450 | 441 | }
|
451 | 442 | }
|
452 | 443 |
|
|
0 commit comments