Skip to content

Commit bd16b41

Browse files
committed
Merge pull request neo4j#178 from pontusmelke/1.0-thread-caching-bug
Avoid marking objects as both live and disposed
2 parents 9f3876a + 4851937 commit bd16b41

File tree

4 files changed

+540
-63
lines changed

4 files changed

+540
-63
lines changed

driver/src/main/java/org/neo4j/driver/internal/pool/ThreadCachingPool.java

Lines changed: 97 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -94,24 +94,25 @@ public ThreadCachingPool( int targetSize, Allocator<T> allocator, ValidationStra
9494

9595
public T acquire( long timeout, TimeUnit unit ) throws InterruptedException
9696
{
97+
assert live.size() <= maxSize;
9798
long deadline = clock.millis() + unit.toMillis( timeout );
9899

99100
// 1. Try and value an object from our local slot
100101
Slot<T> slot = local.get();
101102

102-
if ( slot != null && slot.availableToClaimed() )
103+
if ( slot != null && slot.availableToThreadLocalClaimed() )
103104
{
104105
if ( slot.isValid( validationStrategy ) )
105106
{
106107
allocator.onAcquire( slot.value );
107108
return slot.value;
108109
}
109-
else
110-
{
111-
// We've acquired the slot, but the validation strategy says it's time for it to die. Dispose of it,
112-
// and go to the global pool.
110+
else {
113111
dispose( slot );
114112
}
113+
114+
//The slot was invalidated however we cannot put it to the
115+
//disposed queue yet since it already exists in the live queue
115116
}
116117

117118
// 2. If that fails, acquire from big pool
@@ -133,17 +134,18 @@ private T acquireFromGlobal( long deadline ) throws InterruptedException
133134
if ( slot != null )
134135
{
135136
// Yay, got a slot - can we keep it?
136-
if ( slot.availableToClaimed() )
137+
if ( slot.isValid( validationStrategy ) )
137138
{
138-
if ( slot.isValid( validationStrategy ) )
139+
if ( slot.availableToClaimed() )
139140
{
140141
break;
141142
}
142-
else
143-
{
144-
// We've acquired the slot, but the validation strategy says it's time for it to die.
145-
dispose( slot );
146-
}
143+
}
144+
// We've acquired the slot, but the validation strategy says it's time for it to die.
145+
// Either the slot is already claimed or if it is available make it claimed
146+
else if ( slot.isClaimedOrAvailableToClaimed() )
147+
{
148+
dispose( slot );
147149
}
148150
}
149151
else
@@ -179,22 +181,38 @@ private T acquireFromGlobal( long deadline ) throws InterruptedException
179181

180182
// Keep this slot cached with our thread, so that we can grab this value quickly next time,
181183
// assuming threads generally availableToClaimed one instance at a time
182-
local.set( slot );
184+
updateThreadLocal( slot );
183185
allocator.onAcquire( slot.value );
184186
return slot.value;
185187
}
186188

187-
private void dispose( Slot<T> slot )
189+
private void updateThreadLocal(Slot<T> slot)
188190
{
189-
if ( !slot.claimedToDisposed() )
191+
Slot<T> localSlot = local.get();
192+
if ( localSlot != null )
190193
{
191-
throw new IllegalStateException( "Cannot dispose unclaimed pool object: " + slot );
194+
//The old slot is no longer in the tread local
195+
localSlot.threadLocalClaimedToClaimed();
192196
}
197+
else
198+
{
199+
//There was nothing stored in thread local
200+
//no we must also add this slot to the live queue
201+
live.add( slot );
202+
}
203+
slot.claimByThreadLocal();
204+
local.set( slot );
205+
}
193206

194-
// Done before below, in case dispose call fails. This is safe since objects on the
195-
// pool are used for read-only operations
196-
disposed.add( slot );
197-
allocator.onDispose( slot.value );
207+
private void dispose( Slot<T> slot )
208+
{
209+
if ( slot.claimedToDisposed() || slot.threadLocalClaimedToDisposed() )
210+
{
211+
// Done before below, in case dispose call fails. This is safe since objects on the
212+
// pool are used for read-only operations
213+
disposed.add( slot );
214+
allocator.onDispose( slot.value );
215+
}
198216
}
199217

200218
/**
@@ -217,7 +235,7 @@ private Slot<T> allocate( int slotIndex )
217235
// Return it :)
218236
return slot;
219237
}
220-
catch( Neo4jException e )
238+
catch ( Neo4jException e )
221239
{
222240
// Failed to allocate slot, return it to the list of disposed slots, rethrow exception.
223241
slot.claimedToDisposed();
@@ -236,33 +254,41 @@ public void accept( T t )
236254
slot.updateUsageTimestamp();
237255
if ( !slot.isValid( validationStrategy ) )
238256
{
239-
// The value has for some reason become invalid, dispose of it
240257
dispose( slot );
241258
return;
242259
}
243260

244-
if ( !slot.claimedToAvailable() )
261+
if ( slot.claimedToAvailable() )
245262
{
246-
throw new IllegalStateException( "Failed to release pooled object: " + slot );
263+
// Make sure the pool isn't being stopped in the middle of all these shenanigans
264+
if ( !stopped.get() )
265+
{
266+
// All good, as you were.
267+
live.add( slot );
268+
}
269+
else
270+
{
271+
// Another thread concurrently closing the pool may have started closing before we
272+
// set our slot to "available". In that case, the slot will not be disposed of by the closing thread
273+
// We mitigate this by trying to claim the slot back - if we are able to, we dispose the slot.
274+
// If we can't claim the slot back, that means another thread is dealing with it.
275+
if ( slot.availableToClaimed() )
276+
{
277+
dispose( slot );
278+
}
279+
}
247280
}
248281

249-
// Make sure the pool isn't being stopped in the middle of all these shenanigans
250-
if ( !stopped.get() )
282+
// If we are claimed by thread local we are already in the live queue
283+
if ( slot.threadLocalClaimedToAvailable() && stopped.get() )
251284
{
252-
// All good, as you were.
253-
live.add( slot );
254-
}
255-
else
256-
{
257-
// Another thread concurrently closing the pool may have started closing before we
258-
// set our slot to "available". In that case, the slot will not be disposed of by the closing thread
259-
// We mitigate this by trying to claim the slot back - if we are able to, we dispose the slot.
260-
// If we can't claim the slot back, that means another thread is dealing with it.
285+
// As above, try to claim the slot back and dispose
261286
if ( slot.availableToClaimed() )
262287
{
263288
dispose( slot );
264289
}
265290
}
291+
266292
}
267293
};
268294
}
@@ -293,6 +319,7 @@ class Slot<T>
293319
enum State
294320
{
295321
AVAILABLE,
322+
THREAD_LOCAL_CLAIMED,
296323
CLAIMED,
297324
DISPOSED
298325
}
@@ -304,13 +331,6 @@ enum State
304331
long lastUsed;
305332
T value;
306333

307-
public static <T> Slot<T> disposed( int index, Clock clock )
308-
{
309-
Slot<T> slot = new Slot<>( index, clock );
310-
slot.claimedToDisposed();
311-
return slot;
312-
}
313-
314334
/**
315335
* @param index the index into the {@link ThreadCachingPool#all all} array, used to re-use that slot when this is
316336
* disposed
@@ -332,6 +352,11 @@ public boolean availableToClaimed()
332352
return state.compareAndSet( State.AVAILABLE, State.CLAIMED );
333353
}
334354

355+
public boolean availableToThreadLocalClaimed()
356+
{
357+
return state.compareAndSet( State.AVAILABLE, State.THREAD_LOCAL_CLAIMED );
358+
}
359+
335360
public boolean claimedToAvailable()
336361
{
337362
updateUsageTimestamp();
@@ -343,6 +368,36 @@ public boolean claimedToDisposed()
343368
return state.compareAndSet( State.CLAIMED, State.DISPOSED );
344369
}
345370

371+
public boolean threadLocalClaimedToDisposed()
372+
{
373+
return state.compareAndSet( State.THREAD_LOCAL_CLAIMED, State.DISPOSED );
374+
}
375+
376+
public boolean threadLocalClaimedToClaimed()
377+
{
378+
return state.compareAndSet( State.THREAD_LOCAL_CLAIMED, State.CLAIMED );
379+
}
380+
381+
public boolean threadLocalClaimedToAvailable()
382+
{
383+
return state.compareAndSet( State.THREAD_LOCAL_CLAIMED, State.AVAILABLE );
384+
}
385+
386+
public void claimByThreadLocal()
387+
{
388+
state.set( State.THREAD_LOCAL_CLAIMED );
389+
}
390+
391+
public boolean isClaimedOrAvailableToClaimed()
392+
{
393+
return availableToClaimed() || state.get() == State.CLAIMED;
394+
}
395+
396+
public boolean disposed()
397+
{
398+
return state.get() == State.DISPOSED;
399+
}
400+
346401
public void updateUsageTimestamp()
347402
{
348403
lastUsed = clock.millis();

0 commit comments

Comments
 (0)