|
25 | 25 | import io.r2dbc.postgresql.message.backend.ErrorResponse;
|
26 | 26 | import io.r2dbc.postgresql.util.Assert;
|
27 | 27 | import io.r2dbc.postgresql.util.GeneratedValuesUtils;
|
| 28 | +import io.r2dbc.postgresql.util.Operators; |
28 | 29 | import io.r2dbc.spi.Statement;
|
29 | 30 | import reactor.core.publisher.Flux;
|
30 | 31 | import reactor.util.annotation.Nullable;
|
@@ -140,12 +141,16 @@ private Flux<io.r2dbc.postgresql.api.PostgresqlResult> execute(String sql) {
|
140 | 141 |
|
141 | 142 | if (this.fetchSize != NO_LIMIT) {
|
142 | 143 |
|
143 |
| - Flux<BackendMessage> messages = ExtendedFlowDelegate.runQuery(this.resources, factory, sql, Binding.EMPTY, Collections.emptyList(), this.fetchSize); |
144 |
| - return Flux.just(new PostgresqlResult(this.resources, messages, factory)); |
| 144 | + return ExtendedFlowDelegate.runQuery(this.resources, factory, sql, Binding.EMPTY, Collections.emptyList(), this.fetchSize) |
| 145 | + .windowUntil(WINDOW_UNTIL) |
| 146 | + .map(messages -> PostgresqlResult.toResult(this.resources, messages, factory)) |
| 147 | + .as(Operators::discardOnCancel); |
145 | 148 | }
|
146 | 149 |
|
147 |
| - Flux<BackendMessage> messages = SimpleQueryMessageFlow.exchange(this.resources.getClient(), sql); |
148 |
| - return Flux.just(PostgresqlResult.toResult(this.resources, messages, factory)); |
| 150 | + return SimpleQueryMessageFlow.exchange(this.resources.getClient(), sql) |
| 151 | + .windowUntil(WINDOW_UNTIL) |
| 152 | + .map(messages -> PostgresqlResult.toResult(this.resources, messages, factory)) |
| 153 | + .as(Operators::discardOnCancel); |
149 | 154 | }
|
150 | 155 |
|
151 | 156 | }
|
0 commit comments