Skip to content

Commit cc48b6d

Browse files
committed
Handling errors in parse part of extended query flow
1 parent 0cb53c8 commit cc48b6d

File tree

5 files changed

+56
-3
lines changed

5 files changed

+56
-3
lines changed

src/main/java/io/r2dbc/postgresql/client/Client.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,14 @@ default Flux<BackendMessage> exchange(Publisher<FrontendMessage> requests) {
7171
*/
7272
Flux<BackendMessage> exchange(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests);
7373

74+
/**
75+
* Send one message without waiting for response.
76+
*
77+
* @param message outbound message
78+
* @throws IllegalArgumentException if {@code message} is {@code null}
79+
*/
80+
void send(FrontendMessage message);
81+
7482
/**
7583
* Returns the {@link ByteBufAllocator}.
7684
*

src/main/java/io/r2dbc/postgresql/client/ExtendedQueryMessageFlow.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import io.r2dbc.postgresql.message.Format;
2121
import io.r2dbc.postgresql.message.backend.BackendMessage;
2222
import io.r2dbc.postgresql.message.backend.CloseComplete;
23+
import io.r2dbc.postgresql.message.backend.ErrorResponse;
2324
import io.r2dbc.postgresql.message.backend.ParseComplete;
25+
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
2426
import io.r2dbc.postgresql.message.frontend.Bind;
2527
import io.r2dbc.postgresql.message.frontend.Close;
2628
import io.r2dbc.postgresql.message.frontend.Describe;
@@ -95,7 +97,20 @@ public static Flux<BackendMessage> parse(Client client, String name, String quer
9597
Assert.requireNonNull(query, "query must not be null");
9698
Assert.requireNonNull(types, "types must not be null");
9799

98-
return client.exchange(ParseComplete.class::isInstance, Flux.just(new Parse(name, types, query), Flush.INSTANCE));
100+
/*
101+
ParseComplete will be received if parse was successful
102+
ReadyForQuery will be received as a response to Sync, which was send in case of error in parsing
103+
*/
104+
return client.exchange(message -> message instanceof ParseComplete || message instanceof ReadyForQuery, Flux.just(new Parse(name, types, query), Flush.INSTANCE))
105+
.doOnNext(message -> {
106+
if (message instanceof ErrorResponse) {
107+
/*
108+
When an error is detected while processing any extended-query message, the backend issues ErrorResponse, then reads and discards messages until a Sync is reached.
109+
So we have to provide Sync message to continue.
110+
*/
111+
client.send(Sync.INSTANCE);
112+
}
113+
});
99114
}
100115

101116
/**

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,13 @@ public Flux<BackendMessage> exchange(Predicate<BackendMessage> takeUntil, Publis
209209
});
210210
}
211211

212+
@Override
213+
public void send(FrontendMessage message) {
214+
Assert.requireNonNull(message, "requests must not be null");
215+
216+
this.requests.next(message);
217+
}
218+
212219
private Mono<Void> resumeError(Throwable throwable) {
213220

214221
handleConnectionError(throwable);

src/test/java/io/r2dbc/postgresql/client/ExtendedQueryMessageFlowTest.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import io.r2dbc.postgresql.message.backend.BindComplete;
2020
import io.r2dbc.postgresql.message.backend.CloseComplete;
2121
import io.r2dbc.postgresql.message.backend.CommandComplete;
22+
import io.r2dbc.postgresql.message.backend.ErrorResponse;
2223
import io.r2dbc.postgresql.message.backend.NoData;
2324
import io.r2dbc.postgresql.message.backend.ParseComplete;
25+
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
2426
import io.r2dbc.postgresql.message.frontend.Bind;
2527
import io.r2dbc.postgresql.message.frontend.Close;
2628
import io.r2dbc.postgresql.message.frontend.Describe;
@@ -112,7 +114,22 @@ void parse() {
112114
ExtendedQueryMessageFlow
113115
.parse(client, "test-name", "test-query", Collections.singletonList(100))
114116
.as(StepVerifier::create)
115-
.expectNext(ParseComplete.INSTANCE)
117+
.verifyComplete();
118+
}
119+
120+
@Test
121+
void parseWithError() {
122+
Client client = TestClient.builder()
123+
.expectRequest(new Parse("test-name", Collections.singletonList(100), "test-query"), Flush.INSTANCE)
124+
.thenRespond(new ErrorResponse(Collections.emptyList()))
125+
.expectRequest(Sync.INSTANCE)
126+
.thenRespond(new ReadyForQuery(ReadyForQuery.TransactionStatus.IDLE))
127+
.build();
128+
129+
ExtendedQueryMessageFlow
130+
.parse(client, "test-name", "test-query", Collections.singletonList(100))
131+
.as(StepVerifier::create)
132+
.expectNext(new ErrorResponse(Collections.emptyList()))
116133
.verifyComplete();
117134
}
118135

src/test/java/io/r2dbc/postgresql/client/TestClient.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ public Flux<BackendMessage> exchange(Predicate<BackendMessage> takeUntil, Publis
119119
Flux.from(requests)
120120
.subscribe(this.requests::next, this.requests::error))
121121
.next()
122-
.flatMapMany(Function.identity());
122+
.flatMapMany(Function.identity())
123+
.takeWhile(takeUntil.negate());
123124
}
124125

125126
@Override
@@ -152,6 +153,11 @@ public boolean isConnected() {
152153
return this.connected;
153154
}
154155

156+
@Override
157+
public void send(FrontendMessage message) {
158+
this.requests.next(message);
159+
}
160+
155161
@Override
156162
public Disposable addNotificationListener(Consumer<NotificationResponse> consumer) {
157163
return this.notificationProcessor.subscribe(consumer);

0 commit comments

Comments
 (0)