-
Notifications
You must be signed in to change notification settings - Fork 43
/
Copy pathDelegatingIdlingResourceScheduler.java
199 lines (170 loc) · 6.44 KB
/
DelegatingIdlingResourceScheduler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package com.squareup.rx3.idler;
import androidx.annotation.Nullable;
import androidx.annotation.RestrictTo;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static androidx.annotation.RestrictTo.Scope.LIBRARY;
/**
 * An {@link IdlingResourceScheduler} that forwards every scheduling request to a delegate
 * {@link Scheduler} and keeps a count of the work that is either waiting to run immediately or
 * currently running. The resource reports idle whenever that count is zero; work that is waiting
 * out a positive delay (or the gap between two periodic executions) is not counted.
 */
@RestrictTo(LIBRARY)
final class DelegatingIdlingResourceScheduler extends IdlingResourceScheduler {
  private final Scheduler delegate;
  private final String name;
  /** Number of work items that are scheduled-for-immediate-execution or running. */
  private final AtomicInteger work = new AtomicInteger();
  // Volatile: assigned by the thread that registers the callback, read from stopWork(), which is
  // invoked from ScheduledWork.run() on whatever thread the delegate uses to execute work.
  @Nullable private volatile ResourceCallback callback;

  /**
   * @param delegate the scheduler that actually executes the work.
   * @param name the value returned from {@link #getName()}.
   */
  DelegatingIdlingResourceScheduler(Scheduler delegate, String name) {
    this.delegate = delegate;
    this.name = name;
  }

  @Override public String getName() {
    return name;
  }

  /** Returns {@code true} when no counted work is pending or running. */
  @Override public boolean isIdleNow() {
    return work.get() == 0;
  }

  /** Stores the callback notified each time the work count drops back to zero. */
  @Override public void registerIdleTransitionCallback(ResourceCallback callback) {
    this.callback = callback;
  }

  /**
   * Creates a worker that wraps a worker of the delegate scheduler. Every action is wrapped in a
   * {@link ScheduledWork} so its execution is reflected in this resource's work count.
   */
  @Override public Worker createWorker() {
    final Worker delegateWorker = delegate.createWorker();
    return new Worker() {
      private final CompositeDisposable disposables = new CompositeDisposable(delegateWorker);

      @Override public Disposable schedule(Runnable action) {
        if (disposables.isDisposed()) {
          return Disposable.disposed();
        }
        ScheduledWork scheduled = createWork(action, 0L, 0L);
        return track(scheduled, delegateWorker.schedule(scheduled));
      }

      @Override public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
        if (disposables.isDisposed()) {
          return Disposable.disposed();
        }
        ScheduledWork scheduled = createWork(action, delayTime, 0L);
        return track(scheduled, delegateWorker.schedule(scheduled, delayTime, unit));
      }

      @Override
      public Disposable schedulePeriodically(Runnable action, long initialDelay, long period,
          TimeUnit unit) {
        if (disposables.isDisposed()) {
          return Disposable.disposed();
        }
        // Always periodic, whatever the sign of 'period': the Worker contract treats a
        // non-positive period as "repeat without delay". Marking such work as one-shot would make
        // its second execution throw "Already completed".
        ScheduledWork scheduled = newWork(action, initialDelay, true);
        return track(scheduled,
            delegateWorker.schedulePeriodically(scheduled, initialDelay, period, unit));
      }

      @Override public void dispose() {
        disposables.dispose();
      }

      @Override public boolean isDisposed() {
        return disposables.isDisposed();
      }

      /**
       * Wraps the delegate's disposable together with its work and registers the wrapper with
       * this worker. Only the wrapper is tracked: disposing it also disposes the delegate's
       * disposable, so adding the latter separately would merely retain a second reference.
       * If this worker was disposed in the meantime, adding to the composite disposes the
       * wrapper right away, which releases any work count taken when the work was created.
       */
      private Disposable track(ScheduledWork scheduled, Disposable delegateDisposable) {
        ScheduledWorkDisposable workDisposable =
            new ScheduledWorkDisposable(scheduled, delegateDisposable);
        disposables.add(workDisposable);
        return workDisposable;
      }
    };
  }

  /** Marks one more unit of work as pending or running. */
  void startWork() {
    work.incrementAndGet();
  }

  /** Marks one unit of work as finished and notifies the callback when the count reaches zero. */
  void stopWork() {
    if (work.decrementAndGet() == 0) {
      // Read the field once so the null check and the call observe the same reference.
      ResourceCallback idleCallback = callback;
      if (idleCallback != null) {
        idleCallback.onTransitionToIdle();
      }
    }
  }

  /**
   * Wraps {@code action} in a new {@link ScheduledWork}.
   *
   * @param action the action to run; if it is itself a {@link ScheduledWork} it is unwrapped.
   * @param delay the delay before the first execution; a non-positive value means immediate.
   * @param period the period between executions; the work is periodic only when this is positive.
   * @return the wrapped work; for immediate work {@link #startWork()} has already been called.
   */
  ScheduledWork createWork(Runnable action, long delay, long period) {
    return newWork(action, delay, period > 0L);
  }

  /** Shared factory behind {@link #createWork} that takes the periodic flag explicitly. */
  private ScheduledWork newWork(Runnable action, long delay, boolean periodic) {
    if (action instanceof ScheduledWork) {
      // Unwrap any re-scheduled work. We want each scheduler to get its own state machine.
      action = ((ScheduledWork) action).delegate;
    }
    // Non-positive delays are immediate schedules per the Worker contract, so negative values
    // must count as busy from the moment of scheduling just like zero does.
    boolean immediate = delay <= 0L;
    if (immediate) {
      startWork();
    }
    int startingState = immediate ? ScheduledWork.STATE_SCHEDULED : ScheduledWork.STATE_IDLE;
    return new ScheduledWork(action, startingState, periodic);
  }

  /**
   * A runnable whose own integer value is a small state machine describing where the wrapped
   * action is in its lifecycle. Transitions are made with compare-and-set so that a concurrent
   * {@link #dispose()} is never overwritten.
   */
  final class ScheduledWork extends AtomicInteger implements Runnable {
    static final int STATE_IDLE = 0; // --> STATE_RUNNING, STATE_DISPOSED
    static final int STATE_SCHEDULED = 1; // --> STATE_RUNNING, STATE_DISPOSED
    static final int STATE_RUNNING = 2; // --> STATE_IDLE, STATE_COMPLETED, STATE_DISPOSED
    static final int STATE_COMPLETED = 3; // --> STATE_DISPOSED
    static final int STATE_DISPOSED = 4;

    final Runnable delegate;
    private final boolean isPeriodic;

    ScheduledWork(Runnable delegate, int startingState, boolean isPeriodic) {
      super(startingState);
      this.delegate = delegate;
      this.isPeriodic = isPeriodic;
    }

    /**
     * Runs the wrapped action once, bracketing it with the work count bookkeeping.
     *
     * @throws IllegalStateException if invoked while already running or after completing.
     */
    @Override public void run() {
      for (;;) {
        int state = get();
        switch (state) {
          case STATE_IDLE:
          case STATE_SCHEDULED:
            if (compareAndSet(state, STATE_RUNNING)) {
              if (state == STATE_IDLE) {
                // Idle work was not counted when it was created, so count it now.
                startWork();
              }
              try {
                delegate.run();
              } finally {
                // Change state with a CAS to ensure we don't overwrite a disposed state.
                compareAndSet(STATE_RUNNING, isPeriodic ? STATE_IDLE : STATE_COMPLETED);
                stopWork();
              }
              return; // CAS success, we're done.
            }
            break; // CAS failed, retry.
          case STATE_RUNNING:
            throw new IllegalStateException("Already running");
          case STATE_COMPLETED:
            throw new IllegalStateException("Already completed");
          case STATE_DISPOSED:
            return; // Nothing to do.
        }
      }
    }

    /** Moves this work to the disposed state, releasing its work count if it still holds one. */
    void dispose() {
      for (;;) {
        int state = get();
        if (state == STATE_DISPOSED) {
          return; // Nothing to do.
        } else if (compareAndSet(state, STATE_DISPOSED)) {
          // If idle, startWork() hasn't been called so we don't need a matching stopWork().
          // If running, startWork() was called but the try/finally ensures a stopWork() call.
          // If completed, both startWork() and stopWork() have been called.
          if (state == STATE_SCHEDULED) {
            stopWork(); // Scheduled but not running means we called startWork().
          }
          return;
        }
      }
    }
  }

  /** Disposable handed back to callers; disposes both the work and the delegate's disposable. */
  static final class ScheduledWorkDisposable implements Disposable {
    private final ScheduledWork work;
    private final Disposable delegate;

    ScheduledWorkDisposable(ScheduledWork work, Disposable delegate) {
      this.delegate = delegate;
      this.work = work;
    }

    @Override public void dispose() {
      work.dispose();
      delegate.dispose();
    }

    /** Returns {@code true} only once the work has reached the disposed state. */
    @Override public boolean isDisposed() {
      return work.get() == ScheduledWork.STATE_DISPOSED;
    }
  }
}