Skip to content

Commit 4617f8c

Browse files
committed
Refactor BackendMessageSubscriber and Conversation.
Encapsulate access to nested sink and demand fields with methods. Replace Atomic* fields with volatile ones that are updated through AtomicLongFieldUpdater. Use Operators.addCap(…) to accumulate demand and to prevent overflow. Introduce buffer for received Conversation that do not have demand. Introduce drainloop to drain the buffer on request(n). Extract constants. Use Operators.on…Dropped(…) to propagate dropped signals. [resolves #231]
1 parent 0b04fc3 commit 4617f8c

File tree

2 files changed

+191
-66
lines changed

2 files changed

+191
-66
lines changed

src/main/java/io/r2dbc/postgresql/client/Client.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ public interface Client {
6666
Mono<Void> close();
6767

6868
/**
69-
* Perform an exchange of messages.
69+
* Perform an exchange of messages. Note that the {@link ReadyForQuery} frame is not emitted through the resulting {@link Flux}.
7070
*
7171
* @param requests the publisher of outbound messages
72-
* @return a {@link Flux} of incoming messages that ends with the end of the frame (i.e. reception of a {@link ReadyForQuery} message.
72+
* @return a {@link Flux} of incoming messages that ends with the end of conversation (i.e. reception of a {@link ReadyForQuery} message. Th
7373
* @throws IllegalArgumentException if {@code requests} is {@code null}
7474
*/
7575
default Flux<BackendMessage> exchange(Publisher<FrontendMessage> requests) {
@@ -79,8 +79,10 @@ default Flux<BackendMessage> exchange(Publisher<FrontendMessage> requests) {
7979
/**
8080
* Perform an exchange of messages.
8181
*
82-
* @param requests the publisher of outbound messages
83-
* @return a {@link Flux} of incoming messages that ends with the end of the frame (i.e. reception of a {@link ReadyForQuery} message.
82+
* @param takeUntil the predicate that signals the resulting {@link Flux} to terminate. Typically a check if the {@link BackendMessage} is the last frame of a conversation. Note that the
83+
* {@link BackendMessage} that matches the predicate is not emitted through the resulting {@link Flux}.
84+
* @param requests the publisher of outbound messages
85+
* @return a {@link Flux} of incoming messages that ends with the end of conversation matching {@code takeUntil}. (i.e. reception of a {@link ReadyForQuery} message.
8486
* @throws IllegalArgumentException if {@code requests} is {@code null}
8587
* @since 0.9
8688
*/

0 commit comments

Comments
 (0)