|
1 | 1 | /*
|
2 |
| - * Copyright 2002-2021 the original author or authors. |
| 2 | + * Copyright 2002-2022 the original author or authors. |
3 | 3 | *
|
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License");
|
5 | 5 | * you may not use this file except in compliance with the License.
|
|
26 | 26 | import org.reactivestreams.Subscription;
|
27 | 27 | import reactor.core.publisher.Operators;
|
28 | 28 |
|
| 29 | +import org.springframework.core.io.buffer.DataBuffer; |
| 30 | +import org.springframework.core.io.buffer.DefaultDataBufferFactory; |
29 | 31 | import org.springframework.core.log.LogDelegateFactory;
|
30 | 32 | import org.springframework.lang.Nullable;
|
31 | 33 | import org.springframework.util.Assert;
|
@@ -56,6 +58,8 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
56 | 58 | */
|
57 | 59 | protected static Log rsReadLogger = LogDelegateFactory.getHiddenLog(AbstractListenerReadPublisher.class);
|
58 | 60 |
|
| 61 | + final static DataBuffer EMPTY_BUFFER = DefaultDataBufferFactory.sharedInstance.allocateBuffer(0); |
| 62 | + |
59 | 63 |
|
60 | 64 | private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
|
61 | 65 |
|
@@ -180,15 +184,20 @@ public final void onError(Throwable ex) {
|
180 | 184 |
|
181 | 185 | /**
|
182 | 186 | * Read and publish data one at a time until there is no more data, no more
|
183 |
| - * demand, or perhaps we completed in the mean time. |
| 187 | + * demand, or perhaps we completed meanwhile. |
184 | 188 | * @return {@code true} if there is more demand; {@code false} if there is
|
185 | 189 | * no more demand or we have completed.
|
186 | 190 | */
|
187 | 191 | private boolean readAndPublish() throws IOException {
|
188 | 192 | long r;
|
189 | 193 | while ((r = this.demand) > 0 && (this.state.get() != State.COMPLETED)) {
|
190 | 194 | T data = read();
|
191 |
| - if (data != null) { |
| 195 | + if (data == EMPTY_BUFFER) { |
| 196 | + if (rsReadLogger.isTraceEnabled()) { |
| 197 | + rsReadLogger.trace(getLogPrefix() + "0 bytes read, trying again"); |
| 198 | + } |
| 199 | + } |
| 200 | + else if (data != null) { |
192 | 201 | if (r != Long.MAX_VALUE) {
|
193 | 202 | DEMAND_FIELD_UPDATER.addAndGet(this, -1L);
|
194 | 203 | }
|
|
0 commit comments