Skip to content

Add experimental BlockHound integration #1450

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@
<groupId>org.graalvm.nativeimage</groupId>
<artifactId>svm</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.tools</groupId>
<artifactId>blockhound</artifactId>
<optional>true</optional>
</dependency>

<!-- As we pretend the driver being provided only, we need be explicit about the one dependency we actually didn't shade. -->
<dependency>
Expand Down
1 change: 1 addition & 0 deletions bundle/src/main/jpms/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@
requires static org.graalvm.sdk;
requires static org.slf4j;
requires static java.management;
requires static reactor.blockhound;
}
26 changes: 26 additions & 0 deletions driver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<maven.compiler.xlint.extras>,-try</maven.compiler.xlint.extras>
<surefire.jpms.args>--add-opens org.neo4j.driver/org.neo4j.driver.internal.util.messaging=ALL-UNNAMED</surefire.jpms.args>
<failsafe.parallelizable.jpms.args>--add-opens org.neo4j.driver/org.neo4j.driver.internal.util=ALL-UNNAMED --add-opens org.neo4j.driver/org.neo4j.driver.internal.async=ALL-UNNAMED</failsafe.parallelizable.jpms.args>
<blockhound.tag>blockHoundTest</blockhound.tag>
<maven.deploy.skip>false</maven.deploy.skip>
</properties>

Expand Down Expand Up @@ -59,6 +60,11 @@
<groupId>org.graalvm.nativeimage</groupId>
<artifactId>svm</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.tools</groupId>
<artifactId>blockhound</artifactId>
<optional>true</optional>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down Expand Up @@ -127,6 +133,26 @@
</compilerArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludedGroups>${blockhound.tag}</excludedGroups>
</configuration>
<executions>
<execution>
<id>blockhound-tests</id>
<goals>
<goal>test</goal>
</goals>
<configuration combine.self="override">
<useModulePath>false</useModulePath>
<argLine>-XX:+AllowRedefinitionToAddDeleteMethods</argLine>
<groups>${blockhound.tag}</groups>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
Expand Down
1 change: 1 addition & 0 deletions driver/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@
requires static org.graalvm.sdk;
requires static org.slf4j;
requires static java.management;
requires static reactor.blockhound;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.neo4j.driver.internal.async.connection.ChannelAttributes.poolId;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.serverAddress;
import static org.neo4j.driver.internal.util.LockUtil.executeWithLock;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
Expand All @@ -31,7 +32,6 @@
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
import org.neo4j.driver.internal.messaging.BoltProtocol;
Expand Down Expand Up @@ -60,27 +60,9 @@ public NettyChannelTracker(MetricsListener metricsListener, ChannelGroup channel
this.allChannels = channels;
}

private void doInWriteLock(Runnable work) {
try {
write.lock();
work.run();
} finally {
write.unlock();
}
}

private <T> T retrieveInReadLock(Supplier<T> work) {
try {
read.lock();
return work.get();
} finally {
read.unlock();
}
}

@Override
public void channelReleased(Channel channel) {
doInWriteLock(() -> {
executeWithLock(write, () -> {
decrementInUse(channel);
incrementIdle(channel);
channel.closeFuture().addListener(closeListener);
Expand All @@ -91,7 +73,7 @@ public void channelReleased(Channel channel) {

@Override
public void channelAcquired(Channel channel) {
doInWriteLock(() -> {
executeWithLock(write, () -> {
incrementInUse(channel);
decrementIdle(channel);
channel.closeFuture().removeListener(closeListener);
Expand All @@ -109,7 +91,7 @@ public void channelCreated(Channel channel) {

public void channelCreated(Channel channel, ListenerEvent<?> creatingEvent) {
// when it is created, we count it as idle as it has not been acquired out of the pool
doInWriteLock(() -> incrementIdle(channel));
executeWithLock(write, () -> incrementIdle(channel));

metricsListener.afterCreated(poolId(channel), creatingEvent);
allChannels.add(channel);
Expand All @@ -129,16 +111,16 @@ public void channelFailedToCreate(String poolId) {
}

public void channelClosed(Channel channel) {
doInWriteLock(() -> decrementIdle(channel));
executeWithLock(write, () -> decrementIdle(channel));
metricsListener.afterClosed(poolId(channel));
}

public int inUseChannelCount(ServerAddress address) {
return retrieveInReadLock(() -> addressToInUseChannelCount.getOrDefault(address, 0));
return executeWithLock(read, () -> addressToInUseChannelCount.getOrDefault(address, 0));
}

public int idleChannelCount(ServerAddress address) {
return retrieveInReadLock(() -> addressToIdleChannelCount.getOrDefault(address, 0));
return executeWithLock(read, () -> addressToIdleChannelCount.getOrDefault(address, 0));
}

public void prepareToCloseChannels() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.blockhound;

import org.neo4j.driver.internal.util.LockUtil;
import org.neo4j.driver.util.Experimental;
import reactor.blockhound.BlockHound;
import reactor.blockhound.integration.BlockHoundIntegration;

@Experimental
public class Neo4jDriverBlockHoundIntegration implements BlockHoundIntegration {
@Override
public void applyTo(BlockHound.Builder builder) {
var lockUtilName = LockUtil.class.getName();
builder.allowBlockingCallsInside(lockUtilName, "lock");
builder.allowBlockingCallsInside(lockUtilName, "unlock");
}
}
34 changes: 28 additions & 6 deletions driver/src/main/java/org/neo4j/driver/internal/util/LockUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,49 @@

public class LockUtil {
public static void executeWithLock(Lock lock, Runnable runnable) {
lock.lock();
lock(lock);
try {
runnable.run();
} finally {
lock.unlock();
unlock(lock);
}
}

public static <T> T executeWithLock(Lock lock, Supplier<T> supplier) {
lock.lock();
lock(lock);
try {
return supplier.get();
} finally {
lock.unlock();
unlock(lock);
}
}

public static <T> void executeWithLockAsync(Lock lock, Supplier<CompletionStage<T>> stageSupplier) {
lock.lock();
lock(lock);
CompletableFuture.completedFuture(lock)
.thenCompose(ignored -> stageSupplier.get())
.whenComplete((ignored, throwable) -> lock.unlock());
.whenComplete((ignored, throwable) -> unlock(lock));
}

/**
* Invokes {@link Lock#lock()} on the supplied {@link Lock}.
* <p>
* This method is marked as allowed in the {@link org.neo4j.driver.internal.blockhound.Neo4jDriverBlockHoundIntegration}.
* @param lock the lock
* @since 5.11
*/
private static void lock(Lock lock) {
lock.lock();
}

/**
* Invokes {@link Lock#unlock()} on the supplied {@link Lock}.
* <p>
* This method is marked as allowed in the {@link org.neo4j.driver.internal.blockhound.Neo4jDriverBlockHoundIntegration}.
* @param lock the lock
* @since 5.11
*/
private static void unlock(Lock lock) {
lock.unlock();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright (c) "Neo4j"
# Neo4j Sweden AB [http://neo4j.com]
#
# This file is part of Neo4j.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.neo4j.driver.internal.blockhound.Neo4jDriverBlockHoundIntegration
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.blockhound;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.locks.ReentrantLock;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.neo4j.driver.internal.util.LockUtil;
import org.neo4j.driver.testutil.BlockHoundTest;
import reactor.blockhound.BlockHound;
import reactor.blockhound.BlockingOperationError;
import reactor.core.scheduler.Schedulers;

@BlockHoundTest
class LockUtilBlockHoundTest {
@BeforeAll
static void setUp() {
BlockHound.install();
}

@Test
@SuppressWarnings("StatementWithEmptyBody")
void shouldAllowBlockingOnLock() {
var lock = new ReentrantLock();
var lockUnlockFuture = new CompletableFuture<Void>();
CompletableFuture.runAsync(() -> {
lock.lock();
lockUnlockFuture.join();
lock.unlock();
});
var testFuture = new CompletableFuture<Void>();
Schedulers.parallel().schedule(() -> {
try {
LockUtil.executeWithLock(lock, () -> {});
} catch (Throwable t) {
testFuture.completeExceptionally(t);
}
testFuture.complete(null);
});

while (lock.getQueueLength() != 1) {}
lockUnlockFuture.complete(null);
testFuture.join();
}

@Test
void shouldFailInternalBlockingCalls() {
var lock = new ReentrantLock();
var testFuture = new CompletableFuture<Void>();
Schedulers.parallel()
.schedule(() -> LockUtil.executeWithLock(lock, () -> {
try {
Thread.sleep(1);
} catch (Throwable e) {
testFuture.completeExceptionally(e);
return;
}
testFuture.complete(null);
}));

var exception =
assertThrows(CompletionException.class, testFuture::join).getCause();
assertEquals(BlockingOperationError.class, exception.getClass());
assertTrue(exception.getMessage().contains("java.lang.Thread.sleep"));
}
}
Loading