@@ -94,25 +94,24 @@ public ThreadCachingPool( int targetSize, Allocator<T> allocator, ValidationStra
94
94
95
95
public T acquire ( long timeout , TimeUnit unit ) throws InterruptedException
96
96
{
97
- assert live .size () <= maxSize ;
98
97
long deadline = clock .millis () + unit .toMillis ( timeout );
99
98
100
99
// 1. Try and value an object from our local slot
101
100
Slot <T > slot = local .get ();
102
101
103
- if ( slot != null && slot .availableToThreadLocalClaimed () )
102
+ if ( slot != null && slot .availableToClaimed () )
104
103
{
105
104
if ( slot .isValid ( validationStrategy ) )
106
105
{
107
106
allocator .onAcquire ( slot .value );
108
107
return slot .value ;
109
108
}
110
- else {
109
+ else
110
+ {
111
+ // We've acquired the slot, but the validation strategy says it's time for it to die. Dispose of it,
112
+ // and go to the global pool.
111
113
dispose ( slot );
112
114
}
113
-
114
- //The slot was invalidated however we cannot put it to the
115
- //disposed queue yet since it already exists in the live queue
116
115
}
117
116
118
117
// 2. If that fails, acquire from big pool
@@ -134,18 +133,17 @@ private T acquireFromGlobal( long deadline ) throws InterruptedException
134
133
if ( slot != null )
135
134
{
136
135
// Yay, got a slot - can we keep it?
137
- if ( slot .isValid ( validationStrategy ) )
136
+ if ( slot .availableToClaimed ( ) )
138
137
{
139
- if ( slot .availableToClaimed ( ) )
138
+ if ( slot .isValid ( validationStrategy ) )
140
139
{
141
140
break ;
142
141
}
143
- }
144
- // We've acquired the slot, but the validation strategy says it's time for it to die.
145
- // Either the slot is already claimed or if it is available make it claimed
146
- else if ( slot .isClaimedOrAvailableToClaimed () )
147
- {
148
- dispose ( slot );
142
+ else
143
+ {
144
+ // We've acquired the slot, but the validation strategy says it's time for it to die.
145
+ dispose ( slot );
146
+ }
149
147
}
150
148
}
151
149
else
@@ -181,38 +179,22 @@ else if ( slot.isClaimedOrAvailableToClaimed() )
181
179
182
180
// Keep this slot cached with our thread, so that we can grab this value quickly next time,
183
181
// assuming threads generally availableToClaimed one instance at a time
184
- updateThreadLocal ( slot );
182
+ local . set ( slot );
185
183
allocator .onAcquire ( slot .value );
186
184
return slot .value ;
187
185
}
188
186
189
- private void updateThreadLocal (Slot <T > slot )
190
- {
191
- Slot <T > localSlot = local .get ();
192
- if ( localSlot != null )
193
- {
194
- //The old slot is no longer in the tread local
195
- localSlot .threadLocalClaimedToClaimed ();
196
- }
197
- else
198
- {
199
- //There was nothing stored in thread local
200
- //no we must also add this slot to the live queue
201
- live .add ( slot );
202
- }
203
- slot .claimByThreadLocal ();
204
- local .set ( slot );
205
- }
206
-
207
187
private void dispose ( Slot <T > slot )
208
188
{
209
- if ( slot .claimedToDisposed () || slot . threadLocalClaimedToDisposed () )
189
+ if ( ! slot .claimedToDisposed () )
210
190
{
211
- // Done before below, in case dispose call fails. This is safe since objects on the
212
- // pool are used for read-only operations
213
- disposed .add ( slot );
214
- allocator .onDispose ( slot .value );
191
+ throw new IllegalStateException ( "Cannot dispose unclaimed pool object: " + slot );
215
192
}
193
+
194
+ // Done before below, in case dispose call fails. This is safe since objects on the
195
+ // pool are used for read-only operations
196
+ disposed .add ( slot );
197
+ allocator .onDispose ( slot .value );
216
198
}
217
199
218
200
/**
@@ -235,7 +217,7 @@ private Slot<T> allocate( int slotIndex )
235
217
// Return it :)
236
218
return slot ;
237
219
}
238
- catch ( Neo4jException e )
220
+ catch ( Neo4jException e )
239
221
{
240
222
// Failed to allocate slot, return it to the list of disposed slots, rethrow exception.
241
223
slot .claimedToDisposed ();
@@ -254,41 +236,33 @@ public void accept( T t )
254
236
slot .updateUsageTimestamp ();
255
237
if ( !slot .isValid ( validationStrategy ) )
256
238
{
239
+ // The value has for some reason become invalid, dispose of it
257
240
dispose ( slot );
258
241
return ;
259
242
}
260
243
261
- if ( slot .claimedToAvailable () )
244
+ if ( ! slot .claimedToAvailable () )
262
245
{
263
- // Make sure the pool isn't being stopped in the middle of all these shenanigans
264
- if ( !stopped .get () )
265
- {
266
- // All good, as you were.
267
- live .add ( slot );
268
- }
269
- else
270
- {
271
- // Another thread concurrently closing the pool may have started closing before we
272
- // set our slot to "available". In that case, the slot will not be disposed of by the closing thread
273
- // We mitigate this by trying to claim the slot back - if we are able to, we dispose the slot.
274
- // If we can't claim the slot back, that means another thread is dealing with it.
275
- if ( slot .availableToClaimed () )
276
- {
277
- dispose ( slot );
278
- }
279
- }
246
+ throw new IllegalStateException ( "Failed to release pooled object: " + slot );
280
247
}
281
248
282
- // If we are claimed by thread local we are already in the live queue
283
- if ( slot . threadLocalClaimedToAvailable () && stopped .get () )
249
+ // Make sure the pool isn't being stopped in the middle of all these shenanigans
250
+ if ( ! stopped .get () )
284
251
{
285
- // As above, try to claim the slot back and dispose
252
+ // All good, as you were.
253
+ live .add ( slot );
254
+ }
255
+ else
256
+ {
257
+ // Another thread concurrently closing the pool may have started closing before we
258
+ // set our slot to "available". In that case, the slot will not be disposed of by the closing thread
259
+ // We mitigate this by trying to claim the slot back - if we are able to, we dispose the slot.
260
+ // If we can't claim the slot back, that means another thread is dealing with it.
286
261
if ( slot .availableToClaimed () )
287
262
{
288
263
dispose ( slot );
289
264
}
290
265
}
291
-
292
266
}
293
267
};
294
268
}
@@ -319,7 +293,6 @@ class Slot<T>
319
293
enum State
320
294
{
321
295
AVAILABLE ,
322
- THREAD_LOCAL_CLAIMED ,
323
296
CLAIMED ,
324
297
DISPOSED
325
298
}
@@ -331,6 +304,13 @@ enum State
331
304
long lastUsed ;
332
305
T value ;
333
306
307
+ public static <T > Slot <T > disposed ( int index , Clock clock )
308
+ {
309
+ Slot <T > slot = new Slot <>( index , clock );
310
+ slot .claimedToDisposed ();
311
+ return slot ;
312
+ }
313
+
334
314
/**
335
315
* @param index the index into the {@link ThreadCachingPool#all all} array, used to re-use that slot when this is
336
316
* disposed
@@ -352,11 +332,6 @@ public boolean availableToClaimed()
352
332
return state .compareAndSet ( State .AVAILABLE , State .CLAIMED );
353
333
}
354
334
355
- public boolean availableToThreadLocalClaimed ()
356
- {
357
- return state .compareAndSet ( State .AVAILABLE , State .THREAD_LOCAL_CLAIMED );
358
- }
359
-
360
335
public boolean claimedToAvailable ()
361
336
{
362
337
updateUsageTimestamp ();
@@ -368,36 +343,6 @@ public boolean claimedToDisposed()
368
343
return state .compareAndSet ( State .CLAIMED , State .DISPOSED );
369
344
}
370
345
371
- public boolean threadLocalClaimedToDisposed ()
372
- {
373
- return state .compareAndSet ( State .THREAD_LOCAL_CLAIMED , State .DISPOSED );
374
- }
375
-
376
- public boolean threadLocalClaimedToClaimed ()
377
- {
378
- return state .compareAndSet ( State .THREAD_LOCAL_CLAIMED , State .CLAIMED );
379
- }
380
-
381
- public boolean threadLocalClaimedToAvailable ()
382
- {
383
- return state .compareAndSet ( State .THREAD_LOCAL_CLAIMED , State .AVAILABLE );
384
- }
385
-
386
- public void claimByThreadLocal ()
387
- {
388
- state .set ( State .THREAD_LOCAL_CLAIMED );
389
- }
390
-
391
- public boolean isClaimedOrAvailableToClaimed ()
392
- {
393
- return availableToClaimed () || state .get () == State .CLAIMED ;
394
- }
395
-
396
- public boolean disposed ()
397
- {
398
- return state .get () == State .DISPOSED ;
399
- }
400
-
401
346
public void updateUsageTimestamp ()
402
347
{
403
348
lastUsed = clock .millis ();
0 commit comments