Skip to content

Commit 76dd71b

Browse files
committed
Avoid marking objects as both live and disposed
In the `ThreadCachingPool` whenever an object is picked from the `ThreadLocal` we cannot add it to the `disposed` queue directly since the object will already be present in the `live` queue (or will be on release). Doing that means that the object will be present both in the `live` and `disposed` queue and also means that on reallocation we can multiple copies of the same reference and can lead to that the `live` queue blows up in size.
1 parent bd0ad3a commit 76dd71b

File tree

2 files changed

+126
-34
lines changed

2 files changed

+126
-34
lines changed

driver/src/main/java/org/neo4j/driver/internal/pool/ThreadCachingPool.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,8 @@ public T acquire( long timeout, TimeUnit unit ) throws InterruptedException
106106
allocator.onAcquire( slot.value );
107107
return slot.value;
108108
}
109-
else
110-
{
111-
// We've acquired the slot, but the validation strategy says it's time for it to die. Dispose of it,
112-
// and go to the global pool.
113-
dispose( slot );
114-
}
109+
//The slot was invalidated however we cannot put it to the
110+
//disposed queue yet since it already exists in the live queue
115111
}
116112

117113
// 2. If that fails, acquire from big pool
@@ -133,17 +129,18 @@ private T acquireFromGlobal( long deadline ) throws InterruptedException
133129
if ( slot != null )
134130
{
135131
// Yay, got a slot - can we keep it?
136-
if ( slot.availableToClaimed() )
132+
if ( slot.isValid( validationStrategy ) )
137133
{
138-
if ( slot.isValid( validationStrategy ) )
134+
if ( slot.availableToClaimed() )
139135
{
140136
break;
141137
}
142-
else
143-
{
144-
// We've acquired the slot, but the validation strategy says it's time for it to die.
145-
dispose( slot );
146-
}
138+
}
139+
// We've acquired the slot, but the validation strategy says it's time for it to die.
140+
// Either the slot is already claimed or if it is available make it claimed
141+
else if ( slot.isClaimedOrAvailableToClaimed() )
142+
{
143+
dispose( slot );
147144
}
148145
}
149146
else
@@ -343,6 +340,11 @@ public boolean claimedToDisposed()
343340
return state.compareAndSet( State.CLAIMED, State.DISPOSED );
344341
}
345342

343+
public boolean isClaimedOrAvailableToClaimed()
344+
{
345+
return availableToClaimed() || state.get() == State.CLAIMED;
346+
}
347+
346348
public void updateUsageTimestamp()
347349
{
348350
lastUsed = clock.millis();

driver/src/test/java/org/neo4j/driver/internal/pool/ThreadCachingPoolTest.java

Lines changed: 111 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,33 +18,45 @@
1818
*/
1919
package org.neo4j.driver.internal.pool;
2020

21+
import org.junit.Before;
22+
import org.junit.Rule;
23+
import org.junit.Test;
24+
import org.junit.rules.ExpectedException;
25+
26+
import java.lang.invoke.MethodHandle;
27+
import java.lang.invoke.MethodHandles;
28+
import java.lang.reflect.Field;
2129
import java.util.Arrays;
2230
import java.util.Collections;
2331
import java.util.LinkedList;
2432
import java.util.List;
33+
import java.util.concurrent.BlockingQueue;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.concurrent.Executors;
2536
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicBoolean;
2638
import java.util.concurrent.atomic.AtomicInteger;
2739

28-
import org.junit.Before;
29-
import org.junit.Rule;
30-
import org.junit.Test;
31-
import org.junit.rules.ExpectedException;
32-
3340
import org.neo4j.driver.internal.util.Clock;
3441
import org.neo4j.driver.internal.util.Consumer;
3542
import org.neo4j.driver.v1.exceptions.ClientException;
3643

3744
import static junit.framework.TestCase.fail;
3845
import static org.hamcrest.MatcherAssert.assertThat;
46+
import static org.hamcrest.Matchers.empty;
3947
import static org.hamcrest.Matchers.equalTo;
48+
import static org.hamcrest.Matchers.hasSize;
4049
import static org.junit.Assert.assertNull;
50+
import static org.junit.Assert.assertTrue;
4151

4252
public class ThreadCachingPoolTest
4353
{
4454
private final List<PooledObject> inUse = new LinkedList<>();
4555
private final List<PooledObject> inPool = new LinkedList<>();
4656
private final List<PooledObject> disposed = new LinkedList<>();
47-
57+
private final MethodHandle liveQueueGet = queueGetter( "live" );
58+
private final MethodHandle disposedQueueGet = queueGetter( "disposed" );
59+
private final ExecutorService executor = Executors.newFixedThreadPool( 10 );
4860
private static AtomicInteger IDGEN = new AtomicInteger();
4961

5062
@Rule
@@ -127,7 +139,7 @@ public void shouldDisposeOfInvalidItems() throws Throwable
127139
{
128140
// Given
129141
ThreadCachingPool<PooledObject>
130-
pool = new ThreadCachingPool<>( 4, trackAllocator, invalidIfIdIs(0), Clock.SYSTEM );
142+
pool = new ThreadCachingPool<>( 4, trackAllocator, invalidIfIdIs( 0 ), Clock.SYSTEM );
131143

132144
// And given we've allocated/releasd object with id 0 once (no validation on first allocation)
133145
// TODO: Is that the right thing to do? I assume the allocator will allocate healthy objects..
@@ -137,8 +149,8 @@ public void shouldDisposeOfInvalidItems() throws Throwable
137149
pool.acquire( 10, TimeUnit.SECONDS );
138150

139151
// Then object with id 0 should've been disposed of, and we should have one live object with id 1
140-
assertThat( inPool, equalTo( none() ) );
141-
assertThat( inUse, equalTo( items( 1 ) ) );
152+
assertThat( inPool, equalTo( none() ) );
153+
assertThat( inUse, equalTo( items( 1 ) ) );
142154
assertThat( disposed, equalTo( items( 0 ) ) );
143155
}
144156

@@ -171,8 +183,8 @@ public void shouldDisposeOfObjectsThatBecomeInvalidWhileInUse() throws Throwable
171183
val.invalidate().release();
172184

173185
// Then
174-
assertThat( inPool, equalTo( none() ) );
175-
assertThat( inUse, equalTo( none() ) );
186+
assertThat( inPool, equalTo( none() ) );
187+
assertThat( inUse, equalTo( none() ) );
176188
assertThat( disposed, equalTo( items( val ) ) );
177189
}
178190

@@ -191,9 +203,9 @@ public void shouldRecoverFromItemCreationFailure() throws Throwable
191203
try
192204
{
193205
pool.acquire( 10, TimeUnit.SECONDS );
194-
fail("Should not succeed at allocating any item here.");
206+
fail( "Should not succeed at allocating any item here." );
195207
}
196-
catch( ClientException e )
208+
catch ( ClientException e )
197209
{
198210
// Expected
199211
}
@@ -207,8 +219,8 @@ public void shouldRecoverFromItemCreationFailure() throws Throwable
207219
{
208220
pool.acquire( 10, TimeUnit.SECONDS );
209221
}
210-
assertThat( inPool, equalTo( none() ) );
211-
assertThat( inUse, equalTo( items( 0, 1, 2, 3 ) ) );
222+
assertThat( inPool, equalTo( none() ) );
223+
assertThat( inUse, equalTo( items( 0, 1, 2, 3 ) ) );
212224
assertThat( disposed, equalTo( none() ) ); // because allocation fails, onDispose is not called
213225
}
214226

@@ -256,10 +268,88 @@ public void shouldRecovedDisposedItemReallocationFailing() throws Throwable
256268
assertThat( inPool, equalTo( none() ) );
257269
assertThat( inUse, equalTo( items( 2, 3 ) ) );
258270
// only the first two items value onDispose called, since allocation fails after that
259-
assertThat( disposed, equalTo( items( 0, 1) ) );
271+
assertThat( disposed, equalTo( items( 0, 1 ) ) );
272+
}
273+
274+
@SuppressWarnings( "unchecked" )
275+
@Test
276+
public void shouldNotHaveReferenceAsBothLiveAndDisposed() throws Throwable
277+
{
278+
// Given
279+
final ThreadCachingPool<PooledObject>
280+
pool = new ThreadCachingPool<>( 4, trackAllocator, checkInvalidateFlag, Clock.SYSTEM );
281+
282+
// This object will be cached in ThreadLocal
283+
final PooledObject obj1 = pool.acquire( 10, TimeUnit.SECONDS );
284+
285+
//This will add another object to the live queue
286+
assertTrue( acquireInSeparateThread( pool ) );
287+
288+
//Now we release the first object, meaning that it will be added
289+
//to the live queue (as well as being cached as ThreadLocal in this thread)
290+
obj1.release();
291+
//Now we invalidate the object
292+
obj1.invalidate();
293+
294+
// When
295+
//Now the cached object is invalidated, we should now pick the object
296+
//from the live objects created in the background thread
297+
PooledObject obj2 = pool.acquire( 10, TimeUnit.SECONDS );
298+
299+
//THEN
300+
assertThat( obj1.id, equalTo( 0 ) );
301+
assertThat( obj2.id, equalTo( 1 ) );
302+
BlockingQueue<Slot<PooledObject>> liveQueue = (BlockingQueue<Slot<PooledObject>>) liveQueueGet.invoke( pool );
303+
BlockingQueue<Slot<PooledObject>> disposedQueue =
304+
(BlockingQueue<Slot<PooledObject>>) disposedQueueGet.invoke( pool );
305+
306+
assertThat( disposedQueue, empty() );
307+
assertThat( liveQueue, hasSize( 1 ) );
308+
assertThat( liveQueue.poll( 10, TimeUnit.SECONDS ).value.id, equalTo( 0 ) );
309+
}
310+
311+
private boolean acquireInSeparateThread( final ThreadCachingPool<PooledObject> pool ) throws InterruptedException
312+
{
313+
final AtomicBoolean succeeded = new AtomicBoolean( true );
314+
executor.execute( new Runnable()
315+
{
316+
@Override
317+
public void run()
318+
{
319+
try
320+
{
321+
PooledObject obj = pool.acquire( 10, TimeUnit.MINUTES );
322+
obj.release();
323+
}
324+
catch ( InterruptedException e )
325+
{
326+
succeeded.set( false );
327+
}
328+
}
329+
} );
330+
executor.awaitTermination( 2, TimeUnit.SECONDS );
331+
return succeeded.get();
332+
}
333+
334+
335+
//This is terrible hack, but I really want to keep the queues private in
336+
//ThreadCachingPool
337+
private static MethodHandle queueGetter( String name )
338+
{
339+
try
340+
{
341+
MethodHandles.Lookup lookup = MethodHandles.lookup();
342+
Field value = ThreadCachingPool.class.getDeclaredField( name );
343+
value.setAccessible( true );
344+
return lookup.unreflectGetter( value );
345+
}
346+
catch ( NoSuchFieldException | IllegalAccessException e )
347+
{
348+
throw new AssertionError( e );
349+
}
260350
}
261351

262-
private List<PooledObject> items( int ... objects )
352+
private List<PooledObject> items( int... objects )
263353
{
264354
List<PooledObject> out = new LinkedList<>();
265355
for ( int id : objects )
@@ -269,9 +359,9 @@ private List<PooledObject> items( int ... objects )
269359
return out;
270360
}
271361

272-
private List<PooledObject> items( PooledObject ... objects )
362+
private List<PooledObject> items( PooledObject... objects )
273363
{
274-
return Arrays.asList(objects);
364+
return Arrays.asList( objects );
275365
}
276366

277367
private List<PooledObject> none()
@@ -305,7 +395,7 @@ private class PooledObject
305395

306396
public PooledObject( Consumer<PooledObject> release )
307397
{
308-
this(IDGEN.getAndIncrement(), release);
398+
this( IDGEN.getAndIncrement(), release );
309399
}
310400

311401
public PooledObject( int id, Consumer<PooledObject> release )
@@ -362,7 +452,7 @@ private class TestAllocator implements Allocator<PooledObject>
362452
@Override
363453
public PooledObject allocate( Consumer<PooledObject> release )
364454
{
365-
if( creationException != null )
455+
if ( creationException != null )
366456
{
367457
throw creationException;
368458
}

0 commit comments

Comments
 (0)