diff --git a/driver/src/main/java/org/neo4j/driver/internal/pool/ThreadCachingPool.java b/driver/src/main/java/org/neo4j/driver/internal/pool/ThreadCachingPool.java index 03e786e6e0..cfdc2a8c22 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/pool/ThreadCachingPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/pool/ThreadCachingPool.java @@ -94,24 +94,25 @@ public ThreadCachingPool( int targetSize, Allocator allocator, ValidationStra public T acquire( long timeout, TimeUnit unit ) throws InterruptedException { + assert live.size() <= maxSize; long deadline = clock.millis() + unit.toMillis( timeout ); // 1. Try and value an object from our local slot Slot slot = local.get(); - if ( slot != null && slot.availableToClaimed() ) + if ( slot != null && slot.availableToThreadLocalClaimed() ) { if ( slot.isValid( validationStrategy ) ) { allocator.onAcquire( slot.value ); return slot.value; } - else - { - // We've acquired the slot, but the validation strategy says it's time for it to die. Dispose of it, - // and go to the global pool. + else { dispose( slot ); } + + //The slot was invalidated however we cannot put it to the + //disposed queue yet since it already exists in the live queue } // 2. If that fails, acquire from big pool @@ -133,17 +134,18 @@ private T acquireFromGlobal( long deadline ) throws InterruptedException if ( slot != null ) { // Yay, got a slot - can we keep it? - if ( slot.availableToClaimed() ) + if ( slot.isValid( validationStrategy ) ) { - if ( slot.isValid( validationStrategy ) ) + if ( slot.availableToClaimed() ) { break; } - else - { - // We've acquired the slot, but the validation strategy says it's time for it to die. - dispose( slot ); - } + } + // We've acquired the slot, but the validation strategy says it's time for it to die. + // Either the slot is already claimed or if it is available make it claimed + else if ( slot.isClaimedOrAvailableToClaimed() ) + { + dispose( slot ); } } else @@ -179,22 +181,38 @@ private T acquireFromGlobal( long deadline ) throws InterruptedException // Keep this slot cached with our thread, so that we can grab this value quickly next time, // assuming threads generally availableToClaimed one instance at a time - local.set( slot ); + updateThreadLocal( slot ); allocator.onAcquire( slot.value ); return slot.value; } - private void dispose( Slot slot ) + private void updateThreadLocal(Slot slot) { - if ( !slot.claimedToDisposed() ) + Slot localSlot = local.get(); + if ( localSlot != null ) { - throw new IllegalStateException( "Cannot dispose unclaimed pool object: " + slot ); + //The old slot is no longer in the tread local + localSlot.threadLocalClaimedToClaimed(); } + else + { + //There was nothing stored in thread local + //no we must also add this slot to the live queue + live.add( slot ); + } + slot.claimByThreadLocal(); + local.set( slot ); + } - // Done before below, in case dispose call fails. This is safe since objects on the - // pool are used for read-only operations - disposed.add( slot ); - allocator.onDispose( slot.value ); + private void dispose( Slot slot ) + { + if ( slot.claimedToDisposed() || slot.threadLocalClaimedToDisposed() ) + { + // Done before below, in case dispose call fails. This is safe since objects on the + // pool are used for read-only operations + disposed.add( slot ); + allocator.onDispose( slot.value ); + } } /** @@ -217,7 +235,7 @@ private Slot allocate( int slotIndex ) // Return it :) return slot; } - catch( Neo4jException e ) + catch ( Neo4jException e ) { // Failed to allocate slot, return it to the list of disposed slots, rethrow exception. slot.claimedToDisposed(); @@ -236,33 +254,41 @@ public void accept( T t ) slot.updateUsageTimestamp(); if ( !slot.isValid( validationStrategy ) ) { - // The value has for some reason become invalid, dispose of it dispose( slot ); return; } - if ( !slot.claimedToAvailable() ) + if ( slot.claimedToAvailable() ) { - throw new IllegalStateException( "Failed to release pooled object: " + slot ); + // Make sure the pool isn't being stopped in the middle of all these shenanigans + if ( !stopped.get() ) + { + // All good, as you were. + live.add( slot ); + } + else + { + // Another thread concurrently closing the pool may have started closing before we + // set our slot to "available". In that case, the slot will not be disposed of by the closing thread + // We mitigate this by trying to claim the slot back - if we are able to, we dispose the slot. + // If we can't claim the slot back, that means another thread is dealing with it. + if ( slot.availableToClaimed() ) + { + dispose( slot ); + } + } } - // Make sure the pool isn't being stopped in the middle of all these shenanigans - if ( !stopped.get() ) + // If we are claimed by thread local we are already in the live queue + if ( slot.threadLocalClaimedToAvailable() && stopped.get() ) { - // All good, as you were. - live.add( slot ); - } - else - { - // Another thread concurrently closing the pool may have started closing before we - // set our slot to "available". In that case, the slot will not be disposed of by the closing thread - // We mitigate this by trying to claim the slot back - if we are able to, we dispose the slot. - // If we can't claim the slot back, that means another thread is dealing with it. + // As above, try to claim the slot back and dispose if ( slot.availableToClaimed() ) { dispose( slot ); } } + } }; } @@ -293,6 +319,7 @@ class Slot enum State { AVAILABLE, + THREAD_LOCAL_CLAIMED, CLAIMED, DISPOSED } @@ -304,13 +331,6 @@ enum State long lastUsed; T value; - public static Slot disposed( int index, Clock clock ) - { - Slot slot = new Slot<>( index, clock ); - slot.claimedToDisposed(); - return slot; - } - /** * @param index the index into the {@link ThreadCachingPool#all all} array, used to re-use that slot when this is * disposed @@ -332,6 +352,11 @@ public boolean availableToClaimed() return state.compareAndSet( State.AVAILABLE, State.CLAIMED ); } + public boolean availableToThreadLocalClaimed() + { + return state.compareAndSet( State.AVAILABLE, State.THREAD_LOCAL_CLAIMED ); + } + public boolean claimedToAvailable() { updateUsageTimestamp(); @@ -343,6 +368,36 @@ public boolean claimedToDisposed() return state.compareAndSet( State.CLAIMED, State.DISPOSED ); } + public boolean threadLocalClaimedToDisposed() + { + return state.compareAndSet( State.THREAD_LOCAL_CLAIMED, State.DISPOSED ); + } + + public boolean threadLocalClaimedToClaimed() + { + return state.compareAndSet( State.THREAD_LOCAL_CLAIMED, State.CLAIMED ); + } + + public boolean threadLocalClaimedToAvailable() + { + return state.compareAndSet( State.THREAD_LOCAL_CLAIMED, State.AVAILABLE ); + } + + public void claimByThreadLocal() + { + state.set( State.THREAD_LOCAL_CLAIMED ); + } + + public boolean isClaimedOrAvailableToClaimed() + { + return availableToClaimed() || state.get() == State.CLAIMED; + } + + public boolean disposed() + { + return state.get() == State.DISPOSED; + } + public void updateUsageTimestamp() { lastUsed = clock.millis(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/pool/ThreadCachingPoolTest.java b/driver/src/test/java/org/neo4j/driver/internal/pool/ThreadCachingPoolTest.java index 9405b8058a..bbfcb06309 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/pool/ThreadCachingPoolTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/pool/ThreadCachingPoolTest.java @@ -18,33 +18,45 @@ */ package org.neo4j.driver.internal.pool; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Field; import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Consumer; import org.neo4j.driver.v1.exceptions.ClientException; import static junit.framework.TestCase.fail; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class ThreadCachingPoolTest { private final List inUse = new LinkedList<>(); private final List inPool = new LinkedList<>(); private final List disposed = new LinkedList<>(); - + private final MethodHandle liveQueueGet = queueGetter( "live" ); + private final MethodHandle disposedQueueGet = queueGetter( "disposed" ); + private final ExecutorService executor = Executors.newFixedThreadPool( 10 ); private static AtomicInteger IDGEN = new AtomicInteger(); @Rule @@ -127,7 +139,7 @@ public void shouldDisposeOfInvalidItems() throws Throwable { // Given ThreadCachingPool - pool = new ThreadCachingPool<>( 4, trackAllocator, invalidIfIdIs(0), Clock.SYSTEM ); + pool = new ThreadCachingPool<>( 4, trackAllocator, invalidIfIdIs( 0 ), Clock.SYSTEM ); // And given we've allocated/releasd object with id 0 once (no validation on first allocation) // TODO: Is that the right thing to do? I assume the allocator will allocate healthy objects.. @@ -137,8 +149,8 @@ public void shouldDisposeOfInvalidItems() throws Throwable pool.acquire( 10, TimeUnit.SECONDS ); // Then object with id 0 should've been disposed of, and we should have one live object with id 1 - assertThat( inPool, equalTo( none() ) ); - assertThat( inUse, equalTo( items( 1 ) ) ); + assertThat( inPool, equalTo( none() ) ); + assertThat( inUse, equalTo( items( 1 ) ) ); assertThat( disposed, equalTo( items( 0 ) ) ); } @@ -171,8 +183,8 @@ public void shouldDisposeOfObjectsThatBecomeInvalidWhileInUse() throws Throwable val.invalidate().release(); // Then - assertThat( inPool, equalTo( none() ) ); - assertThat( inUse, equalTo( none() ) ); + assertThat( inPool, equalTo( none() ) ); + assertThat( inUse, equalTo( none() ) ); assertThat( disposed, equalTo( items( val ) ) ); } @@ -191,9 +203,9 @@ public void shouldRecoverFromItemCreationFailure() throws Throwable try { pool.acquire( 10, TimeUnit.SECONDS ); - fail("Should not succeed at allocating any item here."); + fail( "Should not succeed at allocating any item here." ); } - catch( ClientException e ) + catch ( ClientException e ) { // Expected } @@ -207,8 +219,8 @@ public void shouldRecoverFromItemCreationFailure() throws Throwable { pool.acquire( 10, TimeUnit.SECONDS ); } - assertThat( inPool, equalTo( none() ) ); - assertThat( inUse, equalTo( items( 0, 1, 2, 3 ) ) ); + assertThat( inPool, equalTo( none() ) ); + assertThat( inUse, equalTo( items( 0, 1, 2, 3 ) ) ); assertThat( disposed, equalTo( none() ) ); // because allocation fails, onDispose is not called } @@ -256,10 +268,126 @@ public void shouldRecovedDisposedItemReallocationFailing() throws Throwable assertThat( inPool, equalTo( none() ) ); assertThat( inUse, equalTo( items( 2, 3 ) ) ); // only the first two items value onDispose called, since allocation fails after that - assertThat( disposed, equalTo( items( 0, 1) ) ); + assertThat( disposed, equalTo( items( 0, 1 ) ) ); + } + + @SuppressWarnings( "unchecked" ) + @Test + public void shouldNotHaveReferenceAsBothLiveAndDisposed() throws Throwable + { + // Given + final ThreadCachingPool + pool = new ThreadCachingPool<>( 4, trackAllocator, checkInvalidateFlag, Clock.SYSTEM ); + + // This object will be cached in ThreadLocal + final PooledObject obj1 = pool.acquire( 10, TimeUnit.SECONDS ); + + //This will add another object to the live queue + assertTrue( acquireInSeparateThread( pool ) ); + + //Now we release the first object, meaning that it will be added + //to the live queue (as well as being cached as ThreadLocal in this thread) + obj1.release(); + //Now we invalidate the object + obj1.invalidate(); + + // When + //Now the cached object is invalidated, we should now pick the object + //from the live objects created in the background thread + PooledObject obj2 = pool.acquire( 10, TimeUnit.SECONDS ); + + //THEN + assertThat( obj1.id, equalTo( 0 ) ); + assertThat( obj2.id, equalTo( 1 ) ); + BlockingQueue> liveQueue = (BlockingQueue>) liveQueueGet.invoke( pool ); + BlockingQueue> disposedQueue = + (BlockingQueue>) disposedQueueGet.invoke( pool ); + + assertThat( disposedQueue, hasSize( 1 ) ); + assertThat( disposedQueue.poll( 10, TimeUnit.SECONDS ).value.id, equalTo( 0 ) ); + assertThat( liveQueue, empty() ); + } + + @SuppressWarnings( "unchecked" ) + @Test + public void shouldNotAddToLiveQueueTwice() throws Throwable + { + // Given + ThreadCachingPool + pool = new ThreadCachingPool<>( 4, trackAllocator, checkInvalidateFlag, Clock.SYSTEM ); + + // When + PooledObject o1 = pool.acquire( 10, TimeUnit.SECONDS ); + o1.release(); + PooledObject o2 = pool.acquire( 10, TimeUnit.SECONDS ); + o2.release(); + + // Then + BlockingQueue> liveQueue = (BlockingQueue>) liveQueueGet.invoke( pool ); + assertThat(liveQueue, hasSize( 1 )); + } + + @SuppressWarnings( "unchecked" ) + @Test + public void shouldNotAddToLiveQueueTwice2() throws Throwable + { + // Given + ThreadCachingPool + pool = new ThreadCachingPool<>( 4, trackAllocator, checkInvalidateFlag, Clock.SYSTEM ); + PooledObject o1 = pool.acquire( 10, TimeUnit.SECONDS ); + PooledObject o2 = pool.acquire( 10, TimeUnit.SECONDS ); + + // When + o1.release(); + o2.release(); + + // Then + BlockingQueue> liveQueue = (BlockingQueue>) liveQueueGet.invoke( pool ); + assertThat(liveQueue, hasSize( 1 )); + } + + private boolean acquireInSeparateThread( final ThreadCachingPool pool ) throws InterruptedException + { + final AtomicBoolean succeeded = new AtomicBoolean( true ); + executor.execute( new Runnable() + { + @Override + public void run() + { + try + { + PooledObject obj = pool.acquire( 10, TimeUnit.SECONDS ); + obj.release(); + } + catch ( InterruptedException e ) + { + succeeded.set( false ); + } + } + } ); + executor.awaitTermination( 2, TimeUnit.SECONDS ); + return succeeded.get(); + } + + + //This is terrible hack, but I really want to keep the queues private in + //ThreadCachingPool + private static MethodHandle queueGetter( String name ) + { + try + { + MethodHandles.Lookup lookup = MethodHandles.lookup(); + Field value = ThreadCachingPool.class.getDeclaredField( name ); + value.setAccessible( true ); + return lookup.unreflectGetter( value ); + } + catch ( NoSuchFieldException | IllegalAccessException e ) + { + throw new AssertionError( e ); + } } - private List items( int ... objects ) + private List items( int... objects ) { List out = new LinkedList<>(); for ( int id : objects ) @@ -269,9 +397,9 @@ private List items( int ... objects ) return out; } - private List items( PooledObject ... objects ) + private List items( PooledObject... objects ) { - return Arrays.asList(objects); + return Arrays.asList( objects ); } private List none() @@ -305,7 +433,7 @@ private class PooledObject public PooledObject( Consumer release ) { - this(IDGEN.getAndIncrement(), release); + this( IDGEN.getAndIncrement(), release ); } public PooledObject( int id, Consumer release ) @@ -362,7 +490,7 @@ private class TestAllocator implements Allocator @Override public PooledObject allocate( Consumer release ) { - if( creationException != null ) + if ( creationException != null ) { throw creationException; } diff --git a/driver/src/test/java/org/neo4j/driver/v1/stress/SessionPoolingStressIT.java b/driver/src/test/java/org/neo4j/driver/v1/stress/SessionPoolingStressIT.java new file mode 100644 index 0000000000..fccaf55a13 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/v1/stress/SessionPoolingStressIT.java @@ -0,0 +1,120 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.v1.stress; + +import org.junit.Rule; +import org.junit.Test; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.util.TestNeo4j; + +import static java.util.Arrays.asList; +import static org.neo4j.driver.v1.GraphDatabase.driver; + +public class SessionPoolingStressIT +{ + @Rule + public TestNeo4j neo4j = new TestNeo4j(); + + private static final int N_THREADS = 10; + private final ExecutorService executor = Executors.newFixedThreadPool( N_THREADS ); + private static final List QUERIES = asList( "RETURN 1295 + 42", "UNWIND range(1,10000) AS x CREATE (n {prop:x}) DELETE n " ); + private static final int MAX_TIME = 10000; + private final AtomicBoolean hasFailed = new AtomicBoolean( false ); + + + @Test + public void shouldWorkFine() throws InterruptedException + { + Driver driver = driver( neo4j.address(), + Config.build() + .withEncryptionLevel( Config.EncryptionLevel.NONE ) + .withMaxSessions( N_THREADS + 1 ).toConfig() ); + + doWork( driver ); + executor.awaitTermination( MAX_TIME + (int)(MAX_TIME * 0.2), TimeUnit.MILLISECONDS ); + } + + private void doWork( final Driver driver ) + { + for ( int i = 0; i < N_THREADS; i++ ) + { + executor.execute( new Worker( driver ) ); + } + } + + private class Worker implements Runnable + { + private final Random random = ThreadLocalRandom.current(); + private final Driver driver; + + public Worker( Driver driver ) + { + this.driver = driver; + } + + @Override + public void run() + { + try + { + long deadline = System.currentTimeMillis() + MAX_TIME; + for (;;) + { + for ( String query : QUERIES ) + { + runQuery( query ); + } + long left = deadline - System.currentTimeMillis(); + if ( left <= 0 ) + { + break; + } + } + } + catch ( Throwable e ) + { + e.printStackTrace(); + hasFailed.set( true ); + } + } + + private void runQuery( String query ) throws InterruptedException + { + try ( Session session = driver.session() ) + { + StatementResult run = session.run( query ); + Thread.sleep( random.nextInt( 100 ) ); + run.consume(); + Thread.sleep( random.nextInt( 100 ) ); + } + } + } +} diff --git a/driver/src/test/java/org/neo4j/driver/v1/stress/ThreadCachingPoolStressTest.java b/driver/src/test/java/org/neo4j/driver/v1/stress/ThreadCachingPoolStressTest.java new file mode 100644 index 0000000000..c52fbf45a4 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/v1/stress/ThreadCachingPoolStressTest.java @@ -0,0 +1,174 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.v1.stress; + +import org.junit.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.neo4j.driver.internal.pool.Allocator; +import org.neo4j.driver.internal.pool.ThreadCachingPool; +import org.neo4j.driver.internal.pool.ValidationStrategy; +import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.internal.util.Consumer; +import org.neo4j.driver.v1.exceptions.Neo4jException; + +import static org.junit.Assert.assertFalse; + +public class ThreadCachingPoolStressTest +{ + private static final int WORKER_THREADS = 10; + public static final long TOTAL_MAX_TIME = 10000L; + private final ExecutorService executor = Executors.newFixedThreadPool( WORKER_THREADS ); + private final AtomicBoolean hasFailed = new AtomicBoolean( false ); + + @Test + public void shouldWorkFine() throws InterruptedException + { + // Given + ThreadCachingPool + pool = + new ThreadCachingPool<>( WORKER_THREADS, new TestAllocator(), checkInvalidateFlag, Clock.SYSTEM ); + // When + doStuffInTheBackground( pool ); + executor.awaitTermination( TOTAL_MAX_TIME, TimeUnit.MILLISECONDS ); + // Then + + assertFalse( hasFailed.get() ); + } + + private void doStuffInTheBackground( final ThreadCachingPool pool ) + { + for ( int i = 0; i < WORKER_THREADS; i++ ) + { + executor.execute( new Worker( pool ) ); + } + } + + private class PooledObject + { + private boolean valid = true; + private final Consumer release; + + private PooledObject( Consumer release ) + { + this.release = release; + } + + void close() + { + release.accept( this ); + } + + public void invalidate() + { + this.valid = false; + } + } + + private class TestAllocator implements Allocator + { + + @Override + public PooledObject allocate( Consumer release ) throws Neo4jException + { + return new PooledObject( release ); + } + + @Override + public void onDispose( PooledObject o ) + { + + } + + @Override + public void onAcquire( PooledObject o ) + { + + } + } + + private final ValidationStrategy checkInvalidateFlag = new ValidationStrategy() + { + @Override + public boolean isValid( PooledObject value, long idleTime ) + { + return value.valid; + } + }; + + private class Worker implements Runnable + { + private final ThreadLocalRandom random; + private final double probabilityToRelease; + private final double probabilityToInvalidate; + private final ThreadCachingPool pool; + private final long timeToRun; + + public Worker( ThreadCachingPool pool ) + { + this.pool = pool; + this.random = ThreadLocalRandom.current(); + this.probabilityToRelease = 0.5; + this.probabilityToInvalidate = 0.5; + this.timeToRun = random.nextLong( TOTAL_MAX_TIME ); + } + + @Override + public void run() + { + try + { + long deadline = timeToRun + System.currentTimeMillis(); + for (; ; ) + { + PooledObject object = pool.acquire( random.nextInt( 1 ), TimeUnit.SECONDS ); + if ( object != null ) + { + + + Thread.sleep( random.nextInt( 100 ) ); + object.close(); + if ( random.nextDouble() < probabilityToInvalidate ) + { + Thread.sleep( random.nextInt( 100 ) ); + object.invalidate(); + } + } + Thread.sleep( random.nextInt( 100 ) ); + + long timeLeft = deadline - System.currentTimeMillis(); + if ( timeLeft <= 0 ) + { + break; + } + } + } + catch ( Throwable e ) + { + e.printStackTrace(); + hasFailed.set( true ); + } + } + } +}