From 5637b2a4b640339bda66ec3501c7cdc36731e74b Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Sun, 9 Jul 2023 17:20:46 +0100 Subject: [PATCH] Add experimental BlockHound integration --- bundle/pom.xml | 5 ++ bundle/src/main/jpms/module-info.java | 1 + driver/pom.xml | 26 ++++++ driver/src/main/java/module-info.java | 1 + .../async/pool/NettyChannelTracker.java | 32 ++----- .../Neo4jDriverBlockHoundIntegration.java | 34 +++++++ .../neo4j/driver/internal/util/LockUtil.java | 34 +++++-- ...ockhound.integration.BlockHoundIntegration | 18 ++++ .../blockhound/LockUtilBlockHoundTest.java | 88 +++++++++++++++++++ .../Neo4jDriverBlockHoundIntegrationTest.java | 52 +++++++++++ .../neo4j/driver/testutil/BlockHoundTest.java | 30 +++++++ pom.xml | 7 ++ 12 files changed, 297 insertions(+), 31 deletions(-) create mode 100644 driver/src/main/java/org/neo4j/driver/internal/blockhound/Neo4jDriverBlockHoundIntegration.java create mode 100644 driver/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration create mode 100644 driver/src/test/java/org/neo4j/driver/internal/blockhound/LockUtilBlockHoundTest.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/blockhound/Neo4jDriverBlockHoundIntegrationTest.java create mode 100644 driver/src/test/java/org/neo4j/driver/testutil/BlockHoundTest.java diff --git a/bundle/pom.xml b/bundle/pom.xml index f7562a34d3..0b1fd6aa4e 100644 --- a/bundle/pom.xml +++ b/bundle/pom.xml @@ -48,6 +48,11 @@ org.graalvm.nativeimage svm + + io.projectreactor.tools + blockhound + true + diff --git a/bundle/src/main/jpms/module-info.java b/bundle/src/main/jpms/module-info.java index f883518346..3f5db07a40 100644 --- a/bundle/src/main/jpms/module-info.java +++ b/bundle/src/main/jpms/module-info.java @@ -37,4 +37,5 @@ requires static org.graalvm.sdk; requires static org.slf4j; requires static java.management; + requires static reactor.blockhound; } diff --git a/driver/pom.xml b/driver/pom.xml index 375b10e480..3f5cf61174 100644 --- a/driver/pom.xml +++ b/driver/pom.xml @@ -22,6 +22,7 @@ ,-try --add-opens org.neo4j.driver/org.neo4j.driver.internal.util.messaging=ALL-UNNAMED --add-opens org.neo4j.driver/org.neo4j.driver.internal.util=ALL-UNNAMED --add-opens org.neo4j.driver/org.neo4j.driver.internal.async=ALL-UNNAMED + blockHoundTest false @@ -59,6 +60,11 @@ org.graalvm.nativeimage svm + + io.projectreactor.tools + blockhound + true + @@ -127,6 +133,26 @@ + + org.apache.maven.plugins + maven-surefire-plugin + + ${blockhound.tag} + + + + blockhound-tests + + test + + + false + -XX:+AllowRedefinitionToAddDeleteMethods + ${blockhound.tag} + + + + org.apache.maven.plugins maven-javadoc-plugin diff --git a/driver/src/main/java/module-info.java b/driver/src/main/java/module-info.java index 84804e4657..64ab39a9ac 100644 --- a/driver/src/main/java/module-info.java +++ b/driver/src/main/java/module-info.java @@ -45,4 +45,5 @@ requires static org.graalvm.sdk; requires static org.slf4j; requires static java.management; + requires static reactor.blockhound; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java index 61e20303e1..b3d6f11b5d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java @@ -20,6 +20,7 @@ import static org.neo4j.driver.internal.async.connection.ChannelAttributes.poolId; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.serverAddress; +import static org.neo4j.driver.internal.util.LockUtil.executeWithLock; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; @@ -31,7 +32,6 @@ import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Supplier; import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; import org.neo4j.driver.internal.messaging.BoltProtocol; @@ -60,27 +60,9 @@ public NettyChannelTracker(MetricsListener metricsListener, ChannelGroup channel this.allChannels = channels; } - private void doInWriteLock(Runnable work) { - try { - write.lock(); - work.run(); - } finally { - write.unlock(); - } - } - - private T retrieveInReadLock(Supplier work) { - try { - read.lock(); - return work.get(); - } finally { - read.unlock(); - } - } - @Override public void channelReleased(Channel channel) { - doInWriteLock(() -> { + executeWithLock(write, () -> { decrementInUse(channel); incrementIdle(channel); channel.closeFuture().addListener(closeListener); @@ -91,7 +73,7 @@ public void channelReleased(Channel channel) { @Override public void channelAcquired(Channel channel) { - doInWriteLock(() -> { + executeWithLock(write, () -> { incrementInUse(channel); decrementIdle(channel); channel.closeFuture().removeListener(closeListener); @@ -109,7 +91,7 @@ public void channelCreated(Channel channel) { public void channelCreated(Channel channel, ListenerEvent creatingEvent) { // when it is created, we count it as idle as it has not been acquired out of the pool - doInWriteLock(() -> incrementIdle(channel)); + executeWithLock(write, () -> incrementIdle(channel)); metricsListener.afterCreated(poolId(channel), creatingEvent); allChannels.add(channel); @@ -129,16 +111,16 @@ public void channelFailedToCreate(String poolId) { } public void channelClosed(Channel channel) { - doInWriteLock(() -> decrementIdle(channel)); + executeWithLock(write, () -> decrementIdle(channel)); metricsListener.afterClosed(poolId(channel)); } public int inUseChannelCount(ServerAddress address) { - return retrieveInReadLock(() -> addressToInUseChannelCount.getOrDefault(address, 0)); + return executeWithLock(read, () -> addressToInUseChannelCount.getOrDefault(address, 0)); } public int idleChannelCount(ServerAddress address) { - return retrieveInReadLock(() -> addressToIdleChannelCount.getOrDefault(address, 0)); + return executeWithLock(read, () -> addressToIdleChannelCount.getOrDefault(address, 0)); } public void prepareToCloseChannels() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/blockhound/Neo4jDriverBlockHoundIntegration.java b/driver/src/main/java/org/neo4j/driver/internal/blockhound/Neo4jDriverBlockHoundIntegration.java new file mode 100644 index 0000000000..d8e25642d7 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/blockhound/Neo4jDriverBlockHoundIntegration.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.blockhound; + +import org.neo4j.driver.internal.util.LockUtil; +import org.neo4j.driver.util.Experimental; +import reactor.blockhound.BlockHound; +import reactor.blockhound.integration.BlockHoundIntegration; + +@Experimental +public class Neo4jDriverBlockHoundIntegration implements BlockHoundIntegration { + @Override + public void applyTo(BlockHound.Builder builder) { + var lockUtilName = LockUtil.class.getName(); + builder.allowBlockingCallsInside(lockUtilName, "lock"); + builder.allowBlockingCallsInside(lockUtilName, "unlock"); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/LockUtil.java b/driver/src/main/java/org/neo4j/driver/internal/util/LockUtil.java index 9d31f0ab63..d35678491d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/util/LockUtil.java +++ b/driver/src/main/java/org/neo4j/driver/internal/util/LockUtil.java @@ -25,27 +25,49 @@ public class LockUtil { public static void executeWithLock(Lock lock, Runnable runnable) { - lock.lock(); + lock(lock); try { runnable.run(); } finally { - lock.unlock(); + unlock(lock); } } public static T executeWithLock(Lock lock, Supplier supplier) { - lock.lock(); + lock(lock); try { return supplier.get(); } finally { - lock.unlock(); + unlock(lock); } } public static void executeWithLockAsync(Lock lock, Supplier> stageSupplier) { - lock.lock(); + lock(lock); CompletableFuture.completedFuture(lock) .thenCompose(ignored -> stageSupplier.get()) - .whenComplete((ignored, throwable) -> lock.unlock()); + .whenComplete((ignored, throwable) -> unlock(lock)); + } + + /** + * Invokes {@link Lock#lock()} on the supplied {@link Lock}. + *

+ * This method is marked as allowed in the {@link org.neo4j.driver.internal.blockhound.Neo4jDriverBlockHoundIntegration}. + * @param lock the lock + * @since 5.11 + */ + private static void lock(Lock lock) { + lock.lock(); + } + + /** + * Invokes {@link Lock#unlock()} on the supplied {@link Lock}. + *

+ * This method is marked as allowed in the {@link org.neo4j.driver.internal.blockhound.Neo4jDriverBlockHoundIntegration}. + * @param lock the lock + * @since 5.11 + */ + private static void unlock(Lock lock) { + lock.unlock(); } } diff --git a/driver/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration b/driver/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration new file mode 100644 index 0000000000..1c9a3247b3 --- /dev/null +++ b/driver/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration @@ -0,0 +1,18 @@ +# Copyright (c) "Neo4j" +# Neo4j Sweden AB [http://neo4j.com] +# +# This file is part of Neo4j. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.neo4j.driver.internal.blockhound.Neo4jDriverBlockHoundIntegration diff --git a/driver/src/test/java/org/neo4j/driver/internal/blockhound/LockUtilBlockHoundTest.java b/driver/src/test/java/org/neo4j/driver/internal/blockhound/LockUtilBlockHoundTest.java new file mode 100644 index 0000000000..2aaf2c3974 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/blockhound/LockUtilBlockHoundTest.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.blockhound; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.locks.ReentrantLock; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.neo4j.driver.internal.util.LockUtil; +import org.neo4j.driver.testutil.BlockHoundTest; +import reactor.blockhound.BlockHound; +import reactor.blockhound.BlockingOperationError; +import reactor.core.scheduler.Schedulers; + +@BlockHoundTest +class LockUtilBlockHoundTest { + @BeforeAll + static void setUp() { + BlockHound.install(); + } + + @Test + @SuppressWarnings("StatementWithEmptyBody") + void shouldAllowBlockingOnLock() { + var lock = new ReentrantLock(); + var lockUnlockFuture = new CompletableFuture(); + CompletableFuture.runAsync(() -> { + lock.lock(); + lockUnlockFuture.join(); + lock.unlock(); + }); + var testFuture = new CompletableFuture(); + Schedulers.parallel().schedule(() -> { + try { + LockUtil.executeWithLock(lock, () -> {}); + } catch (Throwable t) { + testFuture.completeExceptionally(t); + } + testFuture.complete(null); + }); + + while (lock.getQueueLength() != 1) {} + lockUnlockFuture.complete(null); + testFuture.join(); + } + + @Test + void shouldFailInternalBlockingCalls() { + var lock = new ReentrantLock(); + var testFuture = new CompletableFuture(); + Schedulers.parallel() + .schedule(() -> LockUtil.executeWithLock(lock, () -> { + try { + Thread.sleep(1); + } catch (Throwable e) { + testFuture.completeExceptionally(e); + return; + } + testFuture.complete(null); + })); + + var exception = + assertThrows(CompletionException.class, testFuture::join).getCause(); + assertEquals(BlockingOperationError.class, exception.getClass()); + assertTrue(exception.getMessage().contains("java.lang.Thread.sleep")); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/blockhound/Neo4jDriverBlockHoundIntegrationTest.java b/driver/src/test/java/org/neo4j/driver/internal/blockhound/Neo4jDriverBlockHoundIntegrationTest.java new file mode 100644 index 0000000000..28bbd60f0b --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/blockhound/Neo4jDriverBlockHoundIntegrationTest.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.blockhound; + +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; + +import java.lang.reflect.Method; +import java.util.Arrays; +import org.junit.jupiter.api.Test; +import reactor.blockhound.BlockHound; + +class Neo4jDriverBlockHoundIntegrationTest { + @Test + void shouldUseExistingMethodsInAllowBlockingCallsInside() { + var builder = mock(BlockHound.Builder.class); + given(builder.allowBlockingCallsInside(any(), any())).willAnswer(invocationOnMock -> { + var clsName = (String) invocationOnMock.getArgument(0); + var methodName = (String) invocationOnMock.getArgument(1); + var cls = Class.forName(clsName); + Arrays.stream(cls.getDeclaredMethods()) + .map(Method::getName) + .filter(name -> name.equals(methodName)) + .findAny() + .ifPresentOrElse( + ignored -> {}, + () -> fail(String.format("%s.%s method has not been found", clsName, methodName))); + return builder; + }); + var integration = new Neo4jDriverBlockHoundIntegration(); + + integration.applyTo(builder); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/testutil/BlockHoundTest.java b/driver/src/test/java/org/neo4j/driver/testutil/BlockHoundTest.java new file mode 100644 index 0000000000..e3322a5852 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/testutil/BlockHoundTest.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.testutil; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import org.junit.jupiter.api.Tag; + +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Tag("blockHoundTest") +public @interface BlockHoundTest {} diff --git a/pom.xml b/pom.xml index 0f697f4c87..bc1232d16e 100644 --- a/pom.xml +++ b/pom.xml @@ -53,6 +53,7 @@ 1.18.28 23.0.0 1.11.1 + 1.0.8.RELEASE 1.18.3 5.9.0 @@ -125,6 +126,12 @@ ${micrometer.version} provided + + io.projectreactor.tools + blockhound + ${blockhound.version} + provided +