Skip to content

Commit 5695637

Browse files
committed
Conditionally use windowUntil(…) in SimpleQueryPostgresqlStatement.
We now conditionally check if Reactor 3.4 is present and if so, we use windowUntil(…) as Reactor 3.4 is known to not suffer from the demand/backpressure counting issue. [#44] Signed-off-by: Mark Paluch <[email protected]>
1 parent 0d18586 commit 5695637

File tree

1 file changed

+49
-2
lines changed

1 file changed

+49
-2
lines changed

src/main/java/io/r2dbc/postgresql/SimpleQueryPostgresqlStatement.java

+49-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.r2dbc.postgresql.message.backend.ErrorResponse;
2626
import io.r2dbc.postgresql.util.Assert;
2727
import io.r2dbc.postgresql.util.GeneratedValuesUtils;
28+
import io.r2dbc.postgresql.util.Operators;
2829
import io.r2dbc.spi.Statement;
2930
import reactor.core.publisher.Flux;
3031
import reactor.util.annotation.Nullable;
@@ -41,6 +42,8 @@
4142
*/
4243
final class SimpleQueryPostgresqlStatement implements PostgresqlStatement {
4344

45+
private static final boolean REACTOR_3_4_AVAILABLE = isPresent("reactor.util.context.ContextView", SimpleQueryPostgresqlStatement.class.getClassLoader());
46+
4447
private static final Predicate<BackendMessage> WINDOW_UNTIL = or(CommandComplete.class::isInstance, EmptyQueryResponse.class::isInstance, ErrorResponse.class::isInstance);
4548

4649
private final ConnectionResources resources;
@@ -141,11 +144,55 @@ private Flux<io.r2dbc.postgresql.api.PostgresqlResult> execute(String sql) {
141144
if (this.fetchSize != NO_LIMIT) {
142145

143146
Flux<BackendMessage> messages = ExtendedFlowDelegate.runQuery(this.resources, factory, sql, Binding.EMPTY, Collections.emptyList(), this.fetchSize);
144-
return Flux.just(new PostgresqlResult(this.resources, messages, factory));
147+
return REACTOR_3_4_AVAILABLE ? messages.windowUntil(WINDOW_UNTIL).map(msg -> new PostgresqlResult(this.resources, messages, factory)).as(Operators::discardOnCancel) :
148+
Flux.just(new PostgresqlResult(this.resources, messages, factory));
145149
}
146150

147151
Flux<BackendMessage> messages = SimpleQueryMessageFlow.exchange(this.resources.getClient(), sql);
148-
return Flux.just(PostgresqlResult.toResult(this.resources, messages, factory));
152+
return REACTOR_3_4_AVAILABLE ? messages.windowUntil(WINDOW_UNTIL).map(msg -> new PostgresqlResult(this.resources, messages, factory)).as(Operators::discardOnCancel) :
153+
Flux.just(PostgresqlResult.toResult(this.resources, messages, factory));
154+
}
155+
156+
/**
157+
* Determine whether the {@link Class} identified by the supplied name is present
158+
* and can be loaded. Will return {@code false} if either the class or
159+
* one of its dependencies is not present or cannot be loaded.
160+
*
161+
* @param className the name of the class to check
162+
* @param classLoader the class loader to use
163+
* (may be {@code null} which indicates the default class loader)
164+
* @return whether the specified class is present (including all of its
165+
* superclasses and interfaces)
166+
* @throws IllegalStateException if the corresponding class is resolvable but
167+
* there was a readability mismatch in the inheritance hierarchy of the class
168+
* (typically a missing dependency declaration in a Jigsaw module definition
169+
* for a superclass or interface implemented by the class to be checked here)
170+
*/
171+
private static boolean isPresent(String className, @Nullable ClassLoader classLoader) {
172+
try {
173+
try {
174+
Class.forName(className, false, classLoader);
175+
} catch (ClassNotFoundException ex) {
176+
int lastDotIndex = className.lastIndexOf(".");
177+
if (lastDotIndex != -1) {
178+
String innerClassName =
179+
className.substring(0, lastDotIndex) + "$" + className.substring(lastDotIndex + 1);
180+
try {
181+
Class.forName(innerClassName, false, classLoader);
182+
} catch (ClassNotFoundException ex2) {
183+
// Swallow - let original exception get through
184+
}
185+
}
186+
throw ex;
187+
}
188+
return true;
189+
} catch (IllegalAccessError err) {
190+
throw new IllegalStateException("Readability mismatch in inheritance hierarchy of class [" +
191+
className + "]: " + err.getMessage(), err);
192+
} catch (Throwable ex) {
193+
// Typically ClassNotFoundException or NoClassDefFoundError...
194+
return false;
195+
}
149196
}
150197

151198
}

0 commit comments

Comments
 (0)