Skip to content

Commit 246ebaa

Browse files
committed
Avoid marking objects as both live and disposed
In the `ThreadCachingPool` whenever an object is picked from the `ThreadLocal` we cannot add it to the `disposed` queue directly since the object will already be present in the `live` queue (or will be on release). Doing that means that the object will be present both in the `live` and `disposed` queue and also means that on reallocation we can multiple copies of the same reference and can lead to that the `live` queue blows up in size.
1 parent bd0ad3a commit 246ebaa

File tree

3 files changed

+328
-68
lines changed

3 files changed

+328
-68
lines changed

driver/src/main/java/org/neo4j/driver/internal/pool/ThreadCachingPool.java

Lines changed: 43 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,9 @@ public T acquire( long timeout, TimeUnit unit ) throws InterruptedException
106106
allocator.onAcquire( slot.value );
107107
return slot.value;
108108
}
109-
else
110-
{
111-
// We've acquired the slot, but the validation strategy says it's time for it to die. Dispose of it,
112-
// and go to the global pool.
113-
dispose( slot );
114-
}
109+
110+
//The slot was invalidated however we cannot put it to the
111+
//disposed queue yet since it already exists in the live queue
115112
}
116113

117114
// 2. If that fails, acquire from big pool
@@ -133,17 +130,18 @@ private T acquireFromGlobal( long deadline ) throws InterruptedException
133130
if ( slot != null )
134131
{
135132
// Yay, got a slot - can we keep it?
136-
if ( slot.availableToClaimed() )
133+
if ( slot.isValid( validationStrategy ) )
137134
{
138-
if ( slot.isValid( validationStrategy ) )
135+
if ( slot.availableToClaimed() )
139136
{
140137
break;
141138
}
142-
else
143-
{
144-
// We've acquired the slot, but the validation strategy says it's time for it to die.
145-
dispose( slot );
146-
}
139+
}
140+
// We've acquired the slot, but the validation strategy says it's time for it to die.
141+
// Either the slot is already claimed or if it is available make it claimed
142+
else if ( slot.isClaimedOrAvailableToClaimed() )
143+
{
144+
dispose( slot );
147145
}
148146
}
149147
else
@@ -186,15 +184,13 @@ private T acquireFromGlobal( long deadline ) throws InterruptedException
186184

187185
private void dispose( Slot<T> slot )
188186
{
189-
if ( !slot.claimedToDisposed() )
187+
if ( slot.claimedToDisposed() )
190188
{
191-
throw new IllegalStateException( "Cannot dispose unclaimed pool object: " + slot );
189+
// Done before below, in case dispose call fails. This is safe since objects on the
190+
// pool are used for read-only operations
191+
disposed.add( slot );
192+
allocator.onDispose( slot.value );
192193
}
193-
194-
// Done before below, in case dispose call fails. This is safe since objects on the
195-
// pool are used for read-only operations
196-
disposed.add( slot );
197-
allocator.onDispose( slot.value );
198194
}
199195

200196
/**
@@ -234,33 +230,30 @@ private Consumer<T> createDisposeCallback( final Slot<T> slot )
234230
public void accept( T t )
235231
{
236232
slot.updateUsageTimestamp();
237-
if ( !slot.isValid( validationStrategy ) )
233+
if ( !slot.isValid( validationStrategy) )
238234
{
239-
// The value has for some reason become invalid, dispose of it
240235
dispose( slot );
241236
return;
242237
}
243238

244-
if ( !slot.claimedToAvailable() )
245-
{
246-
throw new IllegalStateException( "Failed to release pooled object: " + slot );
247-
}
248-
249-
// Make sure the pool isn't being stopped in the middle of all these shenanigans
250-
if ( !stopped.get() )
251-
{
252-
// All good, as you were.
253-
live.add( slot );
254-
}
255-
else
239+
if ( slot.claimedToAvailable() )
256240
{
257-
// Another thread concurrently closing the pool may have started closing before we
258-
// set our slot to "available". In that case, the slot will not be disposed of by the closing thread
259-
// We mitigate this by trying to claim the slot back - if we are able to, we dispose the slot.
260-
// If we can't claim the slot back, that means another thread is dealing with it.
261-
if ( slot.availableToClaimed() )
241+
// Make sure the pool isn't being stopped in the middle of all these shenanigans
242+
if ( !stopped.get() )
262243
{
263-
dispose( slot );
244+
// All good, as you were.
245+
live.add( slot );
246+
}
247+
else
248+
{
249+
// Another thread concurrently closing the pool may have started closing before we
250+
// set our slot to "available". In that case, the slot will not be disposed of by the closing thread
251+
// We mitigate this by trying to claim the slot back - if we are able to, we dispose the slot.
252+
// If we can't claim the slot back, that means another thread is dealing with it.
253+
if ( slot.availableToClaimed() )
254+
{
255+
dispose( slot );
256+
}
264257
}
265258
}
266259
}
@@ -304,13 +297,6 @@ enum State
304297
long lastUsed;
305298
T value;
306299

307-
public static <T> Slot<T> disposed( int index, Clock clock )
308-
{
309-
Slot<T> slot = new Slot<>( index, clock );
310-
slot.claimedToDisposed();
311-
return slot;
312-
}
313-
314300
/**
315301
* @param index the index into the {@link ThreadCachingPool#all all} array, used to re-use that slot when this is
316302
* disposed
@@ -343,6 +329,16 @@ public boolean claimedToDisposed()
343329
return state.compareAndSet( State.CLAIMED, State.DISPOSED );
344330
}
345331

332+
public boolean isClaimedOrAvailableToClaimed()
333+
{
334+
return availableToClaimed() || state.get() == State.CLAIMED;
335+
}
336+
337+
public boolean disposed()
338+
{
339+
return state.get() == State.DISPOSED;
340+
}
341+
346342
public void updateUsageTimestamp()
347343
{
348344
lastUsed = clock.millis();

driver/src/test/java/org/neo4j/driver/internal/pool/ThreadCachingPoolTest.java

Lines changed: 111 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,33 +18,45 @@
1818
*/
1919
package org.neo4j.driver.internal.pool;
2020

21+
import org.junit.Before;
22+
import org.junit.Rule;
23+
import org.junit.Test;
24+
import org.junit.rules.ExpectedException;
25+
26+
import java.lang.invoke.MethodHandle;
27+
import java.lang.invoke.MethodHandles;
28+
import java.lang.reflect.Field;
2129
import java.util.Arrays;
2230
import java.util.Collections;
2331
import java.util.LinkedList;
2432
import java.util.List;
33+
import java.util.concurrent.BlockingQueue;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.concurrent.Executors;
2536
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicBoolean;
2638
import java.util.concurrent.atomic.AtomicInteger;
2739

28-
import org.junit.Before;
29-
import org.junit.Rule;
30-
import org.junit.Test;
31-
import org.junit.rules.ExpectedException;
32-
3340
import org.neo4j.driver.internal.util.Clock;
3441
import org.neo4j.driver.internal.util.Consumer;
3542
import org.neo4j.driver.v1.exceptions.ClientException;
3643

3744
import static junit.framework.TestCase.fail;
3845
import static org.hamcrest.MatcherAssert.assertThat;
46+
import static org.hamcrest.Matchers.empty;
3947
import static org.hamcrest.Matchers.equalTo;
48+
import static org.hamcrest.Matchers.hasSize;
4049
import static org.junit.Assert.assertNull;
50+
import static org.junit.Assert.assertTrue;
4151

4252
public class ThreadCachingPoolTest
4353
{
4454
private final List<PooledObject> inUse = new LinkedList<>();
4555
private final List<PooledObject> inPool = new LinkedList<>();
4656
private final List<PooledObject> disposed = new LinkedList<>();
47-
57+
private final MethodHandle liveQueueGet = queueGetter( "live" );
58+
private final MethodHandle disposedQueueGet = queueGetter( "disposed" );
59+
private final ExecutorService executor = Executors.newFixedThreadPool( 10 );
4860
private static AtomicInteger IDGEN = new AtomicInteger();
4961

5062
@Rule
@@ -127,7 +139,7 @@ public void shouldDisposeOfInvalidItems() throws Throwable
127139
{
128140
// Given
129141
ThreadCachingPool<PooledObject>
130-
pool = new ThreadCachingPool<>( 4, trackAllocator, invalidIfIdIs(0), Clock.SYSTEM );
142+
pool = new ThreadCachingPool<>( 4, trackAllocator, invalidIfIdIs( 0 ), Clock.SYSTEM );
131143

132144
// And given we've allocated/releasd object with id 0 once (no validation on first allocation)
133145
// TODO: Is that the right thing to do? I assume the allocator will allocate healthy objects..
@@ -137,8 +149,8 @@ public void shouldDisposeOfInvalidItems() throws Throwable
137149
pool.acquire( 10, TimeUnit.SECONDS );
138150

139151
// Then object with id 0 should've been disposed of, and we should have one live object with id 1
140-
assertThat( inPool, equalTo( none() ) );
141-
assertThat( inUse, equalTo( items( 1 ) ) );
152+
assertThat( inPool, equalTo( none() ) );
153+
assertThat( inUse, equalTo( items( 1 ) ) );
142154
assertThat( disposed, equalTo( items( 0 ) ) );
143155
}
144156

@@ -171,8 +183,8 @@ public void shouldDisposeOfObjectsThatBecomeInvalidWhileInUse() throws Throwable
171183
val.invalidate().release();
172184

173185
// Then
174-
assertThat( inPool, equalTo( none() ) );
175-
assertThat( inUse, equalTo( none() ) );
186+
assertThat( inPool, equalTo( none() ) );
187+
assertThat( inUse, equalTo( none() ) );
176188
assertThat( disposed, equalTo( items( val ) ) );
177189
}
178190

@@ -191,9 +203,9 @@ public void shouldRecoverFromItemCreationFailure() throws Throwable
191203
try
192204
{
193205
pool.acquire( 10, TimeUnit.SECONDS );
194-
fail("Should not succeed at allocating any item here.");
206+
fail( "Should not succeed at allocating any item here." );
195207
}
196-
catch( ClientException e )
208+
catch ( ClientException e )
197209
{
198210
// Expected
199211
}
@@ -207,8 +219,8 @@ public void shouldRecoverFromItemCreationFailure() throws Throwable
207219
{
208220
pool.acquire( 10, TimeUnit.SECONDS );
209221
}
210-
assertThat( inPool, equalTo( none() ) );
211-
assertThat( inUse, equalTo( items( 0, 1, 2, 3 ) ) );
222+
assertThat( inPool, equalTo( none() ) );
223+
assertThat( inUse, equalTo( items( 0, 1, 2, 3 ) ) );
212224
assertThat( disposed, equalTo( none() ) ); // because allocation fails, onDispose is not called
213225
}
214226

@@ -256,10 +268,88 @@ public void shouldRecovedDisposedItemReallocationFailing() throws Throwable
256268
assertThat( inPool, equalTo( none() ) );
257269
assertThat( inUse, equalTo( items( 2, 3 ) ) );
258270
// only the first two items value onDispose called, since allocation fails after that
259-
assertThat( disposed, equalTo( items( 0, 1) ) );
271+
assertThat( disposed, equalTo( items( 0, 1 ) ) );
272+
}
273+
274+
@SuppressWarnings( "unchecked" )
275+
@Test
276+
public void shouldNotHaveReferenceAsBothLiveAndDisposed() throws Throwable
277+
{
278+
// Given
279+
final ThreadCachingPool<PooledObject>
280+
pool = new ThreadCachingPool<>( 4, trackAllocator, checkInvalidateFlag, Clock.SYSTEM );
281+
282+
// This object will be cached in ThreadLocal
283+
final PooledObject obj1 = pool.acquire( 10, TimeUnit.SECONDS );
284+
285+
//This will add another object to the live queue
286+
assertTrue( acquireInSeparateThread( pool ) );
287+
288+
//Now we release the first object, meaning that it will be added
289+
//to the live queue (as well as being cached as ThreadLocal in this thread)
290+
obj1.release();
291+
//Now we invalidate the object
292+
obj1.invalidate();
293+
294+
// When
295+
//Now the cached object is invalidated, we should now pick the object
296+
//from the live objects created in the background thread
297+
PooledObject obj2 = pool.acquire( 10, TimeUnit.SECONDS );
298+
299+
//THEN
300+
assertThat( obj1.id, equalTo( 0 ) );
301+
assertThat( obj2.id, equalTo( 1 ) );
302+
BlockingQueue<Slot<PooledObject>> liveQueue = (BlockingQueue<Slot<PooledObject>>) liveQueueGet.invoke( pool );
303+
BlockingQueue<Slot<PooledObject>> disposedQueue =
304+
(BlockingQueue<Slot<PooledObject>>) disposedQueueGet.invoke( pool );
305+
306+
assertThat( disposedQueue, empty() );
307+
assertThat( liveQueue, hasSize( 1 ) );
308+
assertThat( liveQueue.poll( 10, TimeUnit.SECONDS ).value.id, equalTo( 0 ) );
309+
}
310+
311+
private boolean acquireInSeparateThread( final ThreadCachingPool<PooledObject> pool ) throws InterruptedException
312+
{
313+
final AtomicBoolean succeeded = new AtomicBoolean( true );
314+
executor.execute( new Runnable()
315+
{
316+
@Override
317+
public void run()
318+
{
319+
try
320+
{
321+
PooledObject obj = pool.acquire( 10, TimeUnit.MINUTES );
322+
obj.release();
323+
}
324+
catch ( InterruptedException e )
325+
{
326+
succeeded.set( false );
327+
}
328+
}
329+
} );
330+
executor.awaitTermination( 2, TimeUnit.SECONDS );
331+
return succeeded.get();
332+
}
333+
334+
335+
//This is terrible hack, but I really want to keep the queues private in
336+
//ThreadCachingPool
337+
private static MethodHandle queueGetter( String name )
338+
{
339+
try
340+
{
341+
MethodHandles.Lookup lookup = MethodHandles.lookup();
342+
Field value = ThreadCachingPool.class.getDeclaredField( name );
343+
value.setAccessible( true );
344+
return lookup.unreflectGetter( value );
345+
}
346+
catch ( NoSuchFieldException | IllegalAccessException e )
347+
{
348+
throw new AssertionError( e );
349+
}
260350
}
261351

262-
private List<PooledObject> items( int ... objects )
352+
private List<PooledObject> items( int... objects )
263353
{
264354
List<PooledObject> out = new LinkedList<>();
265355
for ( int id : objects )
@@ -269,9 +359,9 @@ private List<PooledObject> items( int ... objects )
269359
return out;
270360
}
271361

272-
private List<PooledObject> items( PooledObject ... objects )
362+
private List<PooledObject> items( PooledObject... objects )
273363
{
274-
return Arrays.asList(objects);
364+
return Arrays.asList( objects );
275365
}
276366

277367
private List<PooledObject> none()
@@ -305,7 +395,7 @@ private class PooledObject
305395

306396
public PooledObject( Consumer<PooledObject> release )
307397
{
308-
this(IDGEN.getAndIncrement(), release);
398+
this( IDGEN.getAndIncrement(), release );
309399
}
310400

311401
public PooledObject( int id, Consumer<PooledObject> release )
@@ -362,7 +452,7 @@ private class TestAllocator implements Allocator<PooledObject>
362452
@Override
363453
public PooledObject allocate( Consumer<PooledObject> release )
364454
{
365-
if( creationException != null )
455+
if ( creationException != null )
366456
{
367457
throw creationException;
368458
}

0 commit comments

Comments
 (0)