Skip to content

Avoid marking objects as both live and disposed #178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,24 +94,25 @@ public ThreadCachingPool( int targetSize, Allocator<T> allocator, ValidationStra

public T acquire( long timeout, TimeUnit unit ) throws InterruptedException
{
assert live.size() <= maxSize;
long deadline = clock.millis() + unit.toMillis( timeout );

// 1. Try and value an object from our local slot
Slot<T> slot = local.get();

if ( slot != null && slot.availableToClaimed() )
if ( slot != null && slot.availableToThreadLocalClaimed() )
{
if ( slot.isValid( validationStrategy ) )
{
allocator.onAcquire( slot.value );
return slot.value;
}
else
{
// We've acquired the slot, but the validation strategy says it's time for it to die. Dispose of it,
// and go to the global pool.
else {
dispose( slot );
}

//The slot was invalidated however we cannot put it to the
//disposed queue yet since it already exists in the live queue
}

// 2. If that fails, acquire from big pool
Expand All @@ -133,17 +134,18 @@ private T acquireFromGlobal( long deadline ) throws InterruptedException
if ( slot != null )
{
// Yay, got a slot - can we keep it?
if ( slot.availableToClaimed() )
if ( slot.isValid( validationStrategy ) )
{
if ( slot.isValid( validationStrategy ) )
if ( slot.availableToClaimed() )
{
break;
}
else
{
// We've acquired the slot, but the validation strategy says it's time for it to die.
dispose( slot );
}
}
// We've acquired the slot, but the validation strategy says it's time for it to die.
// Either the slot is already claimed or if it is available make it claimed
else if ( slot.isClaimedOrAvailableToClaimed() )
{
dispose( slot );
}
}
else
Expand Down Expand Up @@ -179,22 +181,38 @@ private T acquireFromGlobal( long deadline ) throws InterruptedException

// Keep this slot cached with our thread, so that we can grab this value quickly next time,
// assuming threads generally availableToClaimed one instance at a time
local.set( slot );
updateThreadLocal( slot );
allocator.onAcquire( slot.value );
return slot.value;
}

private void dispose( Slot<T> slot )
private void updateThreadLocal(Slot<T> slot)
{
if ( !slot.claimedToDisposed() )
Slot<T> localSlot = local.get();
if ( localSlot != null )
{
throw new IllegalStateException( "Cannot dispose unclaimed pool object: " + slot );
//The old slot is no longer in the tread local
localSlot.threadLocalClaimedToClaimed();
}
else
{
//There was nothing stored in thread local
//no we must also add this slot to the live queue
live.add( slot );
}
slot.claimByThreadLocal();
local.set( slot );
}

// Done before below, in case dispose call fails. This is safe since objects on the
// pool are used for read-only operations
disposed.add( slot );
allocator.onDispose( slot.value );
private void dispose( Slot<T> slot )
{
if ( slot.claimedToDisposed() || slot.threadLocalClaimedToDisposed() )
{
// Done before below, in case dispose call fails. This is safe since objects on the
// pool are used for read-only operations
disposed.add( slot );
allocator.onDispose( slot.value );
}
}

/**
Expand All @@ -217,7 +235,7 @@ private Slot<T> allocate( int slotIndex )
// Return it :)
return slot;
}
catch( Neo4jException e )
catch ( Neo4jException e )
{
// Failed to allocate slot, return it to the list of disposed slots, rethrow exception.
slot.claimedToDisposed();
Expand All @@ -236,33 +254,41 @@ public void accept( T t )
slot.updateUsageTimestamp();
if ( !slot.isValid( validationStrategy ) )
{
// The value has for some reason become invalid, dispose of it
dispose( slot );
return;
}

if ( !slot.claimedToAvailable() )
if ( slot.claimedToAvailable() )
{
throw new IllegalStateException( "Failed to release pooled object: " + slot );
// Make sure the pool isn't being stopped in the middle of all these shenanigans
if ( !stopped.get() )
{
// All good, as you were.
live.add( slot );
}
else
{
// Another thread concurrently closing the pool may have started closing before we
// set our slot to "available". In that case, the slot will not be disposed of by the closing thread
// We mitigate this by trying to claim the slot back - if we are able to, we dispose the slot.
// If we can't claim the slot back, that means another thread is dealing with it.
if ( slot.availableToClaimed() )
{
dispose( slot );
}
}
}

// Make sure the pool isn't being stopped in the middle of all these shenanigans
if ( !stopped.get() )
// If we are claimed by thread local we are already in the live queue
if ( slot.threadLocalClaimedToAvailable() && stopped.get() )
{
// All good, as you were.
live.add( slot );
}
else
{
// Another thread concurrently closing the pool may have started closing before we
// set our slot to "available". In that case, the slot will not be disposed of by the closing thread
// We mitigate this by trying to claim the slot back - if we are able to, we dispose the slot.
// If we can't claim the slot back, that means another thread is dealing with it.
// As above, try to claim the slot back and dispose
if ( slot.availableToClaimed() )
{
dispose( slot );
}
}

}
};
}
Expand Down Expand Up @@ -293,6 +319,7 @@ class Slot<T>
enum State
{
AVAILABLE,
THREAD_LOCAL_CLAIMED,
CLAIMED,
DISPOSED
}
Expand All @@ -304,13 +331,6 @@ enum State
long lastUsed;
T value;

public static <T> Slot<T> disposed( int index, Clock clock )
{
Slot<T> slot = new Slot<>( index, clock );
slot.claimedToDisposed();
return slot;
}

/**
* @param index the index into the {@link ThreadCachingPool#all all} array, used to re-use that slot when this is
* disposed
Expand All @@ -332,6 +352,11 @@ public boolean availableToClaimed()
return state.compareAndSet( State.AVAILABLE, State.CLAIMED );
}

public boolean availableToThreadLocalClaimed()
{
return state.compareAndSet( State.AVAILABLE, State.THREAD_LOCAL_CLAIMED );
}

public boolean claimedToAvailable()
{
updateUsageTimestamp();
Expand All @@ -343,6 +368,36 @@ public boolean claimedToDisposed()
return state.compareAndSet( State.CLAIMED, State.DISPOSED );
}

public boolean threadLocalClaimedToDisposed()
{
return state.compareAndSet( State.THREAD_LOCAL_CLAIMED, State.DISPOSED );
}

public boolean threadLocalClaimedToClaimed()
{
return state.compareAndSet( State.THREAD_LOCAL_CLAIMED, State.CLAIMED );
}

public boolean threadLocalClaimedToAvailable()
{
return state.compareAndSet( State.THREAD_LOCAL_CLAIMED, State.AVAILABLE );
}

public void claimByThreadLocal()
{
state.set( State.THREAD_LOCAL_CLAIMED );
}

public boolean isClaimedOrAvailableToClaimed()
{
return availableToClaimed() || state.get() == State.CLAIMED;
}

public boolean disposed()
{
return state.get() == State.DISPOSED;
}

public void updateUsageTimestamp()
{
lastUsed = clock.millis();
Expand Down
Loading