Skip to content

Commit bdbf958

Browse files
authored
Add experimental BlockHound integration (#1450)
1 parent 8810ab5 commit bdbf958

File tree

12 files changed

+297
-31
lines changed

12 files changed

+297
-31
lines changed

bundle/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@
4848
<groupId>org.graalvm.nativeimage</groupId>
4949
<artifactId>svm</artifactId>
5050
</dependency>
51+
<dependency>
52+
<groupId>io.projectreactor.tools</groupId>
53+
<artifactId>blockhound</artifactId>
54+
<optional>true</optional>
55+
</dependency>
5156

5257
<!-- As we pretend the driver being provided only, we need be explicit about the one dependency we actually didn't shade. -->
5358
<dependency>

bundle/src/main/jpms/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,5 @@
3737
requires static org.graalvm.sdk;
3838
requires static org.slf4j;
3939
requires static java.management;
40+
requires static reactor.blockhound;
4041
}

driver/pom.xml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
<maven.compiler.xlint.extras>,-try</maven.compiler.xlint.extras>
2323
<surefire.jpms.args>--add-opens org.neo4j.driver/org.neo4j.driver.internal.util.messaging=ALL-UNNAMED</surefire.jpms.args>
2424
<failsafe.parallelizable.jpms.args>--add-opens org.neo4j.driver/org.neo4j.driver.internal.util=ALL-UNNAMED --add-opens org.neo4j.driver/org.neo4j.driver.internal.async=ALL-UNNAMED</failsafe.parallelizable.jpms.args>
25+
<blockhound.tag>blockHoundTest</blockhound.tag>
2526
<maven.deploy.skip>false</maven.deploy.skip>
2627
</properties>
2728

@@ -59,6 +60,11 @@
5960
<groupId>org.graalvm.nativeimage</groupId>
6061
<artifactId>svm</artifactId>
6162
</dependency>
63+
<dependency>
64+
<groupId>io.projectreactor.tools</groupId>
65+
<artifactId>blockhound</artifactId>
66+
<optional>true</optional>
67+
</dependency>
6268

6369
<!-- Test dependencies -->
6470
<dependency>
@@ -127,6 +133,26 @@
127133
</compilerArgs>
128134
</configuration>
129135
</plugin>
136+
<plugin>
137+
<groupId>org.apache.maven.plugins</groupId>
138+
<artifactId>maven-surefire-plugin</artifactId>
139+
<configuration>
140+
<excludedGroups>${blockhound.tag}</excludedGroups>
141+
</configuration>
142+
<executions>
143+
<execution>
144+
<id>blockhound-tests</id>
145+
<goals>
146+
<goal>test</goal>
147+
</goals>
148+
<configuration combine.self="override">
149+
<useModulePath>false</useModulePath>
150+
<argLine>-XX:+AllowRedefinitionToAddDeleteMethods</argLine>
151+
<groups>${blockhound.tag}</groups>
152+
</configuration>
153+
</execution>
154+
</executions>
155+
</plugin>
130156
<plugin>
131157
<groupId>org.apache.maven.plugins</groupId>
132158
<artifactId>maven-javadoc-plugin</artifactId>

driver/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,5 @@
4545
requires static org.graalvm.sdk;
4646
requires static org.slf4j;
4747
requires static java.management;
48+
requires static reactor.blockhound;
4849
}

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.poolId;
2222
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.serverAddress;
23+
import static org.neo4j.driver.internal.util.LockUtil.executeWithLock;
2324

2425
import io.netty.channel.Channel;
2526
import io.netty.channel.ChannelFutureListener;
@@ -31,7 +32,6 @@
3132
import java.util.Map;
3233
import java.util.concurrent.locks.Lock;
3334
import java.util.concurrent.locks.ReentrantReadWriteLock;
34-
import java.util.function.Supplier;
3535
import org.neo4j.driver.Logger;
3636
import org.neo4j.driver.Logging;
3737
import org.neo4j.driver.internal.messaging.BoltProtocol;
@@ -60,27 +60,9 @@ public NettyChannelTracker(MetricsListener metricsListener, ChannelGroup channel
6060
this.allChannels = channels;
6161
}
6262

63-
private void doInWriteLock(Runnable work) {
64-
try {
65-
write.lock();
66-
work.run();
67-
} finally {
68-
write.unlock();
69-
}
70-
}
71-
72-
private <T> T retrieveInReadLock(Supplier<T> work) {
73-
try {
74-
read.lock();
75-
return work.get();
76-
} finally {
77-
read.unlock();
78-
}
79-
}
80-
8163
@Override
8264
public void channelReleased(Channel channel) {
83-
doInWriteLock(() -> {
65+
executeWithLock(write, () -> {
8466
decrementInUse(channel);
8567
incrementIdle(channel);
8668
channel.closeFuture().addListener(closeListener);
@@ -91,7 +73,7 @@ public void channelReleased(Channel channel) {
9173

9274
@Override
9375
public void channelAcquired(Channel channel) {
94-
doInWriteLock(() -> {
76+
executeWithLock(write, () -> {
9577
incrementInUse(channel);
9678
decrementIdle(channel);
9779
channel.closeFuture().removeListener(closeListener);
@@ -109,7 +91,7 @@ public void channelCreated(Channel channel) {
10991

11092
public void channelCreated(Channel channel, ListenerEvent<?> creatingEvent) {
11193
// when it is created, we count it as idle as it has not been acquired out of the pool
112-
doInWriteLock(() -> incrementIdle(channel));
94+
executeWithLock(write, () -> incrementIdle(channel));
11395

11496
metricsListener.afterCreated(poolId(channel), creatingEvent);
11597
allChannels.add(channel);
@@ -129,16 +111,16 @@ public void channelFailedToCreate(String poolId) {
129111
}
130112

131113
public void channelClosed(Channel channel) {
132-
doInWriteLock(() -> decrementIdle(channel));
114+
executeWithLock(write, () -> decrementIdle(channel));
133115
metricsListener.afterClosed(poolId(channel));
134116
}
135117

136118
public int inUseChannelCount(ServerAddress address) {
137-
return retrieveInReadLock(() -> addressToInUseChannelCount.getOrDefault(address, 0));
119+
return executeWithLock(read, () -> addressToInUseChannelCount.getOrDefault(address, 0));
138120
}
139121

140122
public int idleChannelCount(ServerAddress address) {
141-
return retrieveInReadLock(() -> addressToIdleChannelCount.getOrDefault(address, 0));
123+
return executeWithLock(read, () -> addressToIdleChannelCount.getOrDefault(address, 0));
142124
}
143125

144126
public void prepareToCloseChannels() {
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.blockhound;
20+
21+
import org.neo4j.driver.internal.util.LockUtil;
22+
import org.neo4j.driver.util.Experimental;
23+
import reactor.blockhound.BlockHound;
24+
import reactor.blockhound.integration.BlockHoundIntegration;
25+
26+
@Experimental
27+
public class Neo4jDriverBlockHoundIntegration implements BlockHoundIntegration {
28+
@Override
29+
public void applyTo(BlockHound.Builder builder) {
30+
var lockUtilName = LockUtil.class.getName();
31+
builder.allowBlockingCallsInside(lockUtilName, "lock");
32+
builder.allowBlockingCallsInside(lockUtilName, "unlock");
33+
}
34+
}

driver/src/main/java/org/neo4j/driver/internal/util/LockUtil.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,27 +25,49 @@
2525

2626
public class LockUtil {
2727
public static void executeWithLock(Lock lock, Runnable runnable) {
28-
lock.lock();
28+
lock(lock);
2929
try {
3030
runnable.run();
3131
} finally {
32-
lock.unlock();
32+
unlock(lock);
3333
}
3434
}
3535

3636
public static <T> T executeWithLock(Lock lock, Supplier<T> supplier) {
37-
lock.lock();
37+
lock(lock);
3838
try {
3939
return supplier.get();
4040
} finally {
41-
lock.unlock();
41+
unlock(lock);
4242
}
4343
}
4444

4545
public static <T> void executeWithLockAsync(Lock lock, Supplier<CompletionStage<T>> stageSupplier) {
46-
lock.lock();
46+
lock(lock);
4747
CompletableFuture.completedFuture(lock)
4848
.thenCompose(ignored -> stageSupplier.get())
49-
.whenComplete((ignored, throwable) -> lock.unlock());
49+
.whenComplete((ignored, throwable) -> unlock(lock));
50+
}
51+
52+
/**
53+
* Invokes {@link Lock#lock()} on the supplied {@link Lock}.
54+
* <p>
55+
* This method is marked as allowed in the {@link org.neo4j.driver.internal.blockhound.Neo4jDriverBlockHoundIntegration}.
56+
* @param lock the lock
57+
* @since 5.11
58+
*/
59+
private static void lock(Lock lock) {
60+
lock.lock();
61+
}
62+
63+
/**
64+
* Invokes {@link Lock#unlock()} on the supplied {@link Lock}.
65+
* <p>
66+
* This method is marked as allowed in the {@link org.neo4j.driver.internal.blockhound.Neo4jDriverBlockHoundIntegration}.
67+
* @param lock the lock
68+
* @since 5.11
69+
*/
70+
private static void unlock(Lock lock) {
71+
lock.unlock();
5072
}
5173
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Copyright (c) "Neo4j"
2+
# Neo4j Sweden AB [http://neo4j.com]
3+
#
4+
# This file is part of Neo4j.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
org.neo4j.driver.internal.blockhound.Neo4jDriverBlockHoundIntegration
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.blockhound;
20+
21+
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
import static org.junit.jupiter.api.Assertions.assertThrows;
23+
import static org.junit.jupiter.api.Assertions.assertTrue;
24+
25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.CompletionException;
27+
import java.util.concurrent.locks.ReentrantLock;
28+
import org.junit.jupiter.api.BeforeAll;
29+
import org.junit.jupiter.api.Test;
30+
import org.neo4j.driver.internal.util.LockUtil;
31+
import org.neo4j.driver.testutil.BlockHoundTest;
32+
import reactor.blockhound.BlockHound;
33+
import reactor.blockhound.BlockingOperationError;
34+
import reactor.core.scheduler.Schedulers;
35+
36+
@BlockHoundTest
37+
class LockUtilBlockHoundTest {
38+
@BeforeAll
39+
static void setUp() {
40+
BlockHound.install();
41+
}
42+
43+
@Test
44+
@SuppressWarnings("StatementWithEmptyBody")
45+
void shouldAllowBlockingOnLock() {
46+
var lock = new ReentrantLock();
47+
var lockUnlockFuture = new CompletableFuture<Void>();
48+
CompletableFuture.runAsync(() -> {
49+
lock.lock();
50+
lockUnlockFuture.join();
51+
lock.unlock();
52+
});
53+
var testFuture = new CompletableFuture<Void>();
54+
Schedulers.parallel().schedule(() -> {
55+
try {
56+
LockUtil.executeWithLock(lock, () -> {});
57+
} catch (Throwable t) {
58+
testFuture.completeExceptionally(t);
59+
}
60+
testFuture.complete(null);
61+
});
62+
63+
while (lock.getQueueLength() != 1) {}
64+
lockUnlockFuture.complete(null);
65+
testFuture.join();
66+
}
67+
68+
@Test
69+
void shouldFailInternalBlockingCalls() {
70+
var lock = new ReentrantLock();
71+
var testFuture = new CompletableFuture<Void>();
72+
Schedulers.parallel()
73+
.schedule(() -> LockUtil.executeWithLock(lock, () -> {
74+
try {
75+
Thread.sleep(1);
76+
} catch (Throwable e) {
77+
testFuture.completeExceptionally(e);
78+
return;
79+
}
80+
testFuture.complete(null);
81+
}));
82+
83+
var exception =
84+
assertThrows(CompletionException.class, testFuture::join).getCause();
85+
assertEquals(BlockingOperationError.class, exception.getClass());
86+
assertTrue(exception.getMessage().contains("java.lang.Thread.sleep"));
87+
}
88+
}

0 commit comments

Comments
 (0)