Skip to content

Commit 429e7f5

Browse files
committed
Don't add everything back to live queue
Previously whenever an object was picked off the thread local cache it was added back to the live queue even thought it was never removed from there. This lead to indefinite growth of the live queue.
1 parent 246ebaa commit 429e7f5

File tree

2 files changed

+106
-9
lines changed

2 files changed

+106
-9
lines changed

driver/src/main/java/org/neo4j/driver/internal/pool/ThreadCachingPool.java

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,18 +94,22 @@ public ThreadCachingPool( int targetSize, Allocator<T> allocator, ValidationStra
9494

9595
public T acquire( long timeout, TimeUnit unit ) throws InterruptedException
9696
{
97+
assert live.size() <= maxSize;
9798
long deadline = clock.millis() + unit.toMillis( timeout );
9899

99100
// 1. Try and value an object from our local slot
100101
Slot<T> slot = local.get();
101102

102-
if ( slot != null && slot.availableToClaimed() )
103+
if ( slot != null && slot.availableToThreadLocalClaimed() )
103104
{
104105
if ( slot.isValid( validationStrategy ) )
105106
{
106107
allocator.onAcquire( slot.value );
107108
return slot.value;
108109
}
110+
else {
111+
dispose( slot );
112+
}
109113

110114
//The slot was invalidated however we cannot put it to the
111115
//disposed queue yet since it already exists in the live queue
@@ -177,14 +181,32 @@ else if ( slot.isClaimedOrAvailableToClaimed() )
177181

178182
// Keep this slot cached with our thread, so that we can grab this value quickly next time,
179183
// assuming threads generally availableToClaimed one instance at a time
180-
local.set( slot );
184+
updateThreadLocal( slot );
181185
allocator.onAcquire( slot.value );
182186
return slot.value;
183187
}
184188

189+
private void updateThreadLocal(Slot<T> slot)
190+
{
191+
Slot<T> localSlot = local.get();
192+
if ( localSlot != null )
193+
{
194+
//The old slot is no longer in the tread local
195+
localSlot.threadLocalClaimedToClaimed();
196+
}
197+
else
198+
{
199+
//There was nothing stored in thread local
200+
//no we must also add this slot to the live queue
201+
live.add( slot );
202+
}
203+
slot.claimByThreadLocal();
204+
local.set( slot );
205+
}
206+
185207
private void dispose( Slot<T> slot )
186208
{
187-
if ( slot.claimedToDisposed() )
209+
if ( slot.claimedToDisposed() || slot.threadLocalClaimedToDisposed() )
188210
{
189211
// Done before below, in case dispose call fails. This is safe since objects on the
190212
// pool are used for read-only operations
@@ -213,7 +235,7 @@ private Slot<T> allocate( int slotIndex )
213235
// Return it :)
214236
return slot;
215237
}
216-
catch( Neo4jException e )
238+
catch ( Neo4jException e )
217239
{
218240
// Failed to allocate slot, return it to the list of disposed slots, rethrow exception.
219241
slot.claimedToDisposed();
@@ -230,7 +252,7 @@ private Consumer<T> createDisposeCallback( final Slot<T> slot )
230252
public void accept( T t )
231253
{
232254
slot.updateUsageTimestamp();
233-
if ( !slot.isValid( validationStrategy) )
255+
if ( !slot.isValid( validationStrategy ) )
234256
{
235257
dispose( slot );
236258
return;
@@ -256,6 +278,17 @@ public void accept( T t )
256278
}
257279
}
258280
}
281+
282+
// If we are claimed by thread local we are already in the live queue
283+
if ( slot.threadLocalClaimedToAvailable() && stopped.get() )
284+
{
285+
// As above, try to claim the slot back and dispose
286+
if ( slot.availableToClaimed() )
287+
{
288+
dispose( slot );
289+
}
290+
}
291+
259292
}
260293
};
261294
}
@@ -286,6 +319,7 @@ class Slot<T>
286319
enum State
287320
{
288321
AVAILABLE,
322+
THREAD_LOCAL_CLAIMED,
289323
CLAIMED,
290324
DISPOSED
291325
}
@@ -318,6 +352,11 @@ public boolean availableToClaimed()
318352
return state.compareAndSet( State.AVAILABLE, State.CLAIMED );
319353
}
320354

355+
public boolean availableToThreadLocalClaimed()
356+
{
357+
return state.compareAndSet( State.AVAILABLE, State.THREAD_LOCAL_CLAIMED );
358+
}
359+
321360
public boolean claimedToAvailable()
322361
{
323362
updateUsageTimestamp();
@@ -329,6 +368,26 @@ public boolean claimedToDisposed()
329368
return state.compareAndSet( State.CLAIMED, State.DISPOSED );
330369
}
331370

371+
public boolean threadLocalClaimedToDisposed()
372+
{
373+
return state.compareAndSet( State.THREAD_LOCAL_CLAIMED, State.DISPOSED );
374+
}
375+
376+
public boolean threadLocalClaimedToClaimed()
377+
{
378+
return state.compareAndSet( State.THREAD_LOCAL_CLAIMED, State.CLAIMED );
379+
}
380+
381+
public boolean threadLocalClaimedToAvailable()
382+
{
383+
return state.compareAndSet( State.THREAD_LOCAL_CLAIMED, State.AVAILABLE );
384+
}
385+
386+
public void claimByThreadLocal()
387+
{
388+
state.set( State.THREAD_LOCAL_CLAIMED );
389+
}
390+
332391
public boolean isClaimedOrAvailableToClaimed()
333392
{
334393
return availableToClaimed() || state.get() == State.CLAIMED;

driver/src/test/java/org/neo4j/driver/internal/pool/ThreadCachingPoolTest.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -303,9 +303,47 @@ public void shouldNotHaveReferenceAsBothLiveAndDisposed() throws Throwable
303303
BlockingQueue<Slot<PooledObject>> disposedQueue =
304304
(BlockingQueue<Slot<PooledObject>>) disposedQueueGet.invoke( pool );
305305

306-
assertThat( disposedQueue, empty() );
307-
assertThat( liveQueue, hasSize( 1 ) );
308-
assertThat( liveQueue.poll( 10, TimeUnit.SECONDS ).value.id, equalTo( 0 ) );
306+
assertThat( disposedQueue, hasSize( 1 ) );
307+
assertThat( disposedQueue.poll( 10, TimeUnit.SECONDS ).value.id, equalTo( 0 ) );
308+
assertThat( liveQueue, empty() );
309+
}
310+
311+
@SuppressWarnings( "unchecked" )
312+
@Test
313+
public void shouldNotAddToLiveQueueTwice() throws Throwable
314+
{
315+
// Given
316+
ThreadCachingPool<PooledObject>
317+
pool = new ThreadCachingPool<>( 4, trackAllocator, checkInvalidateFlag, Clock.SYSTEM );
318+
319+
// When
320+
PooledObject o1 = pool.acquire( 10, TimeUnit.SECONDS );
321+
o1.release();
322+
PooledObject o2 = pool.acquire( 10, TimeUnit.SECONDS );
323+
o2.release();
324+
325+
// Then
326+
BlockingQueue<Slot<PooledObject>> liveQueue = (BlockingQueue<Slot<PooledObject>>) liveQueueGet.invoke( pool );
327+
assertThat(liveQueue, hasSize( 1 ));
328+
}
329+
330+
@SuppressWarnings( "unchecked" )
331+
@Test
332+
public void shouldNotAddToLiveQueueTwice2() throws Throwable
333+
{
334+
// Given
335+
ThreadCachingPool<PooledObject>
336+
pool = new ThreadCachingPool<>( 4, trackAllocator, checkInvalidateFlag, Clock.SYSTEM );
337+
PooledObject o1 = pool.acquire( 10, TimeUnit.SECONDS );
338+
PooledObject o2 = pool.acquire( 10, TimeUnit.SECONDS );
339+
340+
// When
341+
o1.release();
342+
o2.release();
343+
344+
// Then
345+
BlockingQueue<Slot<PooledObject>> liveQueue = (BlockingQueue<Slot<PooledObject>>) liveQueueGet.invoke( pool );
346+
assertThat(liveQueue, hasSize( 1 ));
309347
}
310348

311349
private boolean acquireInSeparateThread( final ThreadCachingPool<PooledObject> pool ) throws InterruptedException
@@ -318,7 +356,7 @@ public void run()
318356
{
319357
try
320358
{
321-
PooledObject obj = pool.acquire( 10, TimeUnit.MINUTES );
359+
PooledObject obj = pool.acquire( 10, TimeUnit.SECONDS );
322360
obj.release();
323361
}
324362
catch ( InterruptedException e )

0 commit comments

Comments
 (0)