25
25
26
26
import java .util .Objects ;
27
27
import java .util .concurrent .*;
28
+ import java .util .concurrent .locks .Lock ;
28
29
import java .util .concurrent .locks .LockSupport ;
30
+ import java .util .concurrent .locks .ReentrantLock ;
29
31
import java .util .function .Consumer ;
30
32
31
33
/**
@@ -45,7 +47,7 @@ final class FutureImpl<T> implements Future<T> {
45
47
/**
46
48
* Used to synchronize state changes.
47
49
*/
48
- private final Object lock = new Object () ;
50
+ private final Lock lock ;
49
51
50
52
/**
51
53
* Indicates if this Future is cancelled
@@ -87,8 +89,10 @@ final class FutureImpl<T> implements Future<T> {
87
89
88
90
// single constructor
89
91
private FutureImpl (Executor executor , Option <Try <T >> value , Queue <Consumer <Try <T >>> actions , Queue <Thread > waiters , Computation <T > computation ) {
92
+ this .lock = new ReentrantLock ();
90
93
this .executor = executor ;
91
- synchronized (lock ) {
94
+ lock .lock ();
95
+ try {
92
96
this .cancelled = false ;
93
97
this .value = value ;
94
98
this .actions = actions ;
@@ -98,6 +102,8 @@ private FutureImpl(Executor executor, Option<Try<T>> value, Queue<Consumer<Try<T
98
102
} catch (Throwable x ) {
99
103
tryComplete (Try .failure (x ));
100
104
}
105
+ } finally {
106
+ lock .unlock ();
101
107
}
102
108
}
103
109
@@ -219,8 +225,11 @@ private void _await(long start, long timeout, TimeUnit unit) {
219
225
public boolean block () {
220
226
try {
221
227
if (!threadEnqueued ) {
222
- synchronized (lock ) {
228
+ lock .lock ();
229
+ try {
223
230
waiters = waiters .enqueue (waitingThread );
231
+ } finally {
232
+ lock .unlock ();
224
233
}
225
234
threadEnqueued = true ;
226
235
}
@@ -256,14 +265,17 @@ public boolean isReleasable() {
256
265
@ Override
257
266
public boolean cancel (boolean mayInterruptIfRunning ) {
258
267
if (!isCompleted ()) {
259
- synchronized (lock ) {
268
+ lock .lock ();
269
+ try {
260
270
if (!isCompleted ()) {
261
271
if (mayInterruptIfRunning && this .thread != null ) {
262
272
this .thread .interrupt ();
263
273
}
264
274
this .cancelled = tryComplete (Try .failure (new CancellationException ()));
265
275
return this .cancelled ;
266
276
}
277
+ } finally {
278
+ lock .unlock ();
267
279
}
268
280
}
269
281
return false ;
@@ -272,7 +284,8 @@ public boolean cancel(boolean mayInterruptIfRunning) {
272
284
private void updateThread () {
273
285
// cancellation may have been initiated by a different thread before this.thread is set by the worker thread
274
286
if (!isCompleted ()) {
275
- synchronized (lock ) {
287
+ lock .lock ();
288
+ try {
276
289
if (!isCompleted ()) {
277
290
this .thread = Thread .currentThread ();
278
291
try {
@@ -281,6 +294,8 @@ private void updateThread() {
281
294
// we are not allowed to set the uncaught exception handler of the worker thread ¯\_(ツ)_/¯
282
295
}
283
296
}
297
+ } finally {
298
+ lock .unlock ();
284
299
}
285
300
}
286
301
}
@@ -322,12 +337,15 @@ public Future<T> onComplete(Consumer<? super Try<T>> action) {
322
337
if (isCompleted ()) {
323
338
perform (action );
324
339
} else {
325
- synchronized (lock ) {
340
+ lock .lock ();
341
+ try {
326
342
if (isCompleted ()) {
327
343
perform (action );
328
344
} else {
329
345
actions = actions .enqueue ((Consumer <Try <T >>) action );
330
346
}
347
+ } finally {
348
+ lock .unlock ();
331
349
}
332
350
}
333
351
return this ;
@@ -362,7 +380,8 @@ boolean tryComplete(Try<? extends T> value) {
362
380
final Queue <Consumer <Try <T >>> actions ;
363
381
final Queue <Thread > waiters ;
364
382
// it is essential to make the completed state public *before* performing the actions
365
- synchronized (lock ) {
383
+ lock .lock ();
384
+ try {
366
385
if (isCompleted ()) {
367
386
actions = null ;
368
387
waiters = null ;
@@ -374,6 +393,8 @@ boolean tryComplete(Try<? extends T> value) {
374
393
this .waiters = null ;
375
394
this .thread = null ;
376
395
}
396
+ } finally {
397
+ lock .unlock ();
377
398
}
378
399
if (waiters != null ) {
379
400
waiters .forEach (this ::unlock );
0 commit comments