Skip to content

Commit db8c9e2

Browse files
committed
1.x Remove all instances of Atomic*FieldUpdater
Replace them all with their respective Atomic* counterparts For example AtomicLongFieldUpdater -> AtomicLong Addresses ReactiveX#3459
1 parent 51527b7 commit db8c9e2

24 files changed

+205
-293
lines changed

src/main/java/rx/internal/operators/BlockingOperatorLatest.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import java.util.Iterator;
1919
import java.util.NoSuchElementException;
2020
import java.util.concurrent.Semaphore;
21-
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
21+
import java.util.concurrent.atomic.AtomicReference;
2222

2323
import rx.Notification;
2424
import rx.Observable;
@@ -59,15 +59,11 @@ public Iterator<T> iterator() {
5959
static final class LatestObserverIterator<T> extends Subscriber<Notification<? extends T>> implements Iterator<T> {
6060
final Semaphore notify = new Semaphore(0);
6161
// observer's notification
62-
volatile Notification<? extends T> value;
63-
/** Updater for the value field. */
64-
@SuppressWarnings("rawtypes")
65-
static final AtomicReferenceFieldUpdater<LatestObserverIterator, Notification> REFERENCE_UPDATER
66-
= AtomicReferenceFieldUpdater.newUpdater(LatestObserverIterator.class, Notification.class, "value");
62+
AtomicReference<Notification<? extends T>> value = new AtomicReference<Notification<? extends T>>();
6763

6864
@Override
6965
public void onNext(Notification<? extends T> args) {
70-
boolean wasntAvailable = REFERENCE_UPDATER.getAndSet(this, args) == null;
66+
boolean wasntAvailable = value.getAndSet(args) == null;
7167
if (wasntAvailable) {
7268
notify.release();
7369
}
@@ -103,7 +99,7 @@ public boolean hasNext() {
10399
}
104100

105101
@SuppressWarnings("unchecked")
106-
Notification<? extends T> n = REFERENCE_UPDATER.getAndSet(this, null);
102+
Notification<? extends T> n = value.getAndSet(null);
107103
iNotif = n;
108104
if (iNotif.isOnError()) {
109105
throw Exceptions.propagate(iNotif.getThrowable());

src/main/java/rx/internal/operators/BlockingOperatorNext.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import java.util.NoSuchElementException;
2020
import java.util.concurrent.ArrayBlockingQueue;
2121
import java.util.concurrent.BlockingQueue;
22-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
22+
import java.util.concurrent.atomic.AtomicInteger;
2323

2424
import rx.Notification;
2525
import rx.Observable;
@@ -148,10 +148,7 @@ public void remove() {
148148
private static class NextObserver<T> extends Subscriber<Notification<? extends T>> {
149149
private final BlockingQueue<Notification<? extends T>> buf = new ArrayBlockingQueue<Notification<? extends T>>(1);
150150
@SuppressWarnings("unused")
151-
volatile int waiting;
152-
@SuppressWarnings("rawtypes")
153-
static final AtomicIntegerFieldUpdater<NextObserver> WAITING_UPDATER
154-
= AtomicIntegerFieldUpdater.newUpdater(NextObserver.class, "waiting");
151+
AtomicInteger waiting = new AtomicInteger();
155152

156153
@Override
157154
public void onCompleted() {
@@ -166,7 +163,7 @@ public void onError(Throwable e) {
166163
@Override
167164
public void onNext(Notification<? extends T> args) {
168165

169-
if (WAITING_UPDATER.getAndSet(this, 0) == 1 || !args.isOnNext()) {
166+
if (waiting.getAndSet(0) == 1 || !args.isOnNext()) {
170167
Notification<? extends T> toOffer = args;
171168
while (!buf.offer(toOffer)) {
172169
Notification<? extends T> concurrentItem = buf.poll();
@@ -185,7 +182,7 @@ public Notification<? extends T> takeNext() throws InterruptedException {
185182
return buf.take();
186183
}
187184
void setWaiting(int value) {
188-
waiting = value;
185+
waiting.set(value);
189186
}
190187
}
191188
}

src/main/java/rx/internal/operators/BufferUntilSubscriber.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package rx.internal.operators;
1717

1818
import java.util.concurrent.ConcurrentLinkedQueue;
19-
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
19+
import java.util.concurrent.atomic.AtomicReference;
2020

2121
import rx.Observer;
2222
import rx.Subscriber;
@@ -60,14 +60,10 @@ public static <T> BufferUntilSubscriber<T> create() {
6060

6161
/** The common state. */
6262
static final class State<T> {
63-
volatile Observer<? super T> observerRef = null;
64-
/** Field updater for observerRef. */
65-
@SuppressWarnings("rawtypes")
66-
static final AtomicReferenceFieldUpdater<State, Observer> OBSERVER_UPDATER
67-
= AtomicReferenceFieldUpdater.newUpdater(State.class, Observer.class, "observerRef");
63+
AtomicReference<Observer<? super T>> observerRef = new AtomicReference<Observer<? super T>>(null);
6864

6965
boolean casObserverRef(Observer<? super T> expected, Observer<? super T> next) {
70-
return OBSERVER_UPDATER.compareAndSet(this, expected, next);
66+
return observerRef.compareAndSet(expected, next);
7167
}
7268

7369
final Object guard = new Object();
@@ -92,7 +88,7 @@ public void call(final Subscriber<? super T> s) {
9288
@SuppressWarnings("unchecked")
9389
@Override
9490
public void call() {
95-
state.observerRef = EMPTY_OBSERVER;
91+
state.observerRef.set(EMPTY_OBSERVER);
9692
}
9793
}));
9894
boolean win = false;
@@ -107,7 +103,7 @@ public void call() {
107103
while(true) {
108104
Object o;
109105
while ((o = state.buffer.poll()) != null) {
110-
nl.accept(state.observerRef, o);
106+
nl.accept(state.observerRef.get(), o);
111107
}
112108
synchronized (state.guard) {
113109
if (state.buffer.isEmpty()) {
@@ -138,7 +134,7 @@ private BufferUntilSubscriber(State<T> state) {
138134
private void emit(Object v) {
139135
synchronized (state.guard) {
140136
state.buffer.add(v);
141-
if (state.observerRef != null && !state.emitting) {
137+
if (state.observerRef.get() != null && !state.emitting) {
142138
// Have an observer and nobody is emitting,
143139
// should drain the `buffer`
144140
forward = true;
@@ -148,7 +144,7 @@ private void emit(Object v) {
148144
if (forward) {
149145
Object o;
150146
while ((o = state.buffer.poll()) != null) {
151-
state.nl.accept(state.observerRef, o);
147+
state.nl.accept(state.observerRef.get(), o);
152148
}
153149
// Because `emit(Object v)` will be called in sequence,
154150
// no event will be put into `buffer` after we drain it.
@@ -158,7 +154,7 @@ private void emit(Object v) {
158154
@Override
159155
public void onCompleted() {
160156
if (forward) {
161-
state.observerRef.onCompleted();
157+
state.observerRef.get().onCompleted();
162158
}
163159
else {
164160
emit(state.nl.completed());
@@ -168,7 +164,7 @@ public void onCompleted() {
168164
@Override
169165
public void onError(Throwable e) {
170166
if (forward) {
171-
state.observerRef.onError(e);
167+
state.observerRef.get().onError(e);
172168
}
173169
else {
174170
emit(state.nl.error(e));
@@ -178,7 +174,7 @@ public void onError(Throwable e) {
178174
@Override
179175
public void onNext(T t) {
180176
if (forward) {
181-
state.observerRef.onNext(t);
177+
state.observerRef.get().onNext(t);
182178
}
183179
else {
184180
emit(state.nl.next(t));
@@ -188,7 +184,7 @@ public void onNext(T t) {
188184
@Override
189185
public boolean hasObservers() {
190186
synchronized (state.guard) {
191-
return state.observerRef != null;
187+
return state.observerRef.get() != null;
192188
}
193189
}
194190

src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.util.List;
2020
import java.util.concurrent.atomic.AtomicBoolean;
2121
import java.util.concurrent.atomic.AtomicLong;
22-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2322

2423
import rx.Observable;
2524
import rx.Observable.OnSubscribe;
@@ -91,9 +90,7 @@ final static class MultiSourceProducer<T, R> implements Producer {
9190
private volatile int completionCount; // does this need to be volatile or is WIP sufficient?
9291

9392
@SuppressWarnings("unused")
94-
private volatile long counter;
95-
@SuppressWarnings("rawtypes")
96-
private static final AtomicLongFieldUpdater<MultiSourceProducer> WIP = AtomicLongFieldUpdater.newUpdater(MultiSourceProducer.class, "counter");
93+
private AtomicLong counter = new AtomicLong();
9794

9895
@SuppressWarnings("unchecked")
9996
public MultiSourceProducer(final Subscriber<? super R> child, final List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) {
@@ -139,7 +136,7 @@ public void request(long n) {
139136
* that there is always once who acts on each `tick`. Same concept as used in OperationObserveOn.
140137
*/
141138
void tick() {
142-
if (WIP.getAndIncrement(this) == 0) {
139+
if (counter.getAndIncrement() == 0) {
143140
int emitted = 0;
144141
do {
145142
// we only emit if requested > 0
@@ -155,7 +152,7 @@ void tick() {
155152
}
156153
}
157154
}
158-
} while (WIP.decrementAndGet(this) > 0);
155+
} while (counter.decrementAndGet() > 0);
159156
if (emitted > 0) {
160157
for (MultiSourceRequestableSubscriber<T, R> s : subscribers) {
161158
s.requestUpTo(emitted);

src/main/java/rx/internal/operators/OperatorConcat.java

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
package rx.internal.operators;
1717

1818
import java.util.concurrent.ConcurrentLinkedQueue;
19-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
20-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
import java.util.concurrent.atomic.AtomicLong;
2121

2222
import rx.Observable;
2323
import rx.Observable.Operator;
@@ -84,14 +84,10 @@ static final class ConcatSubscriber<T> extends Subscriber<Observable<? extends T
8484

8585
volatile ConcatInnerSubscriber<T> currentSubscriber;
8686

87-
volatile int wip;
88-
@SuppressWarnings("rawtypes")
89-
static final AtomicIntegerFieldUpdater<ConcatSubscriber> WIP = AtomicIntegerFieldUpdater.newUpdater(ConcatSubscriber.class, "wip");
87+
AtomicInteger wip = new AtomicInteger();
9088

9189
// accessed by REQUESTED
92-
private volatile long requested;
93-
@SuppressWarnings("rawtypes")
94-
private static final AtomicLongFieldUpdater<ConcatSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(ConcatSubscriber.class, "requested");
90+
private AtomicLong requested = new AtomicLong();
9591
private final ProducerArbiter arbiter;
9692

9793
public ConcatSubscriber(Subscriber<T> s, SerialSubscription current) {
@@ -118,10 +114,10 @@ public void onStart() {
118114
private void requestFromChild(long n) {
119115
if (n <=0) return;
120116
// we track 'requested' so we know whether we should subscribe the next or not
121-
long previous = BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
117+
long previous = BackpressureUtils.getAndAddRequest(requested, n);
122118
arbiter.request(n);
123119
if (previous == 0) {
124-
if (currentSubscriber == null && wip > 0) {
120+
if (currentSubscriber == null && wip.get() > 0) {
125121
// this means we may be moving from one subscriber to another after having stopped processing
126122
// so need to kick off the subscribe via this request notification
127123
subscribeNext();
@@ -130,13 +126,13 @@ private void requestFromChild(long n) {
130126
}
131127

132128
private void decrementRequested() {
133-
REQUESTED.decrementAndGet(this);
129+
requested.decrementAndGet();
134130
}
135131

136132
@Override
137133
public void onNext(Observable<? extends T> t) {
138134
queue.add(nl.next(t));
139-
if (WIP.getAndIncrement(this) == 0) {
135+
if (wip.getAndIncrement() == 0) {
140136
subscribeNext();
141137
}
142138
}
@@ -150,22 +146,22 @@ public void onError(Throwable e) {
150146
@Override
151147
public void onCompleted() {
152148
queue.add(nl.completed());
153-
if (WIP.getAndIncrement(this) == 0) {
149+
if (wip.getAndIncrement() == 0) {
154150
subscribeNext();
155151
}
156152
}
157153

158154

159155
void completeInner() {
160156
currentSubscriber = null;
161-
if (WIP.decrementAndGet(this) > 0) {
157+
if (wip.decrementAndGet() > 0) {
162158
subscribeNext();
163159
}
164160
request(1);
165161
}
166162

167163
void subscribeNext() {
168-
if (requested > 0) {
164+
if (requested.get() > 0) {
169165
Object o = queue.poll();
170166
if (nl.isCompleted(o)) {
171167
child.onCompleted();
@@ -190,9 +186,7 @@ static class ConcatInnerSubscriber<T> extends Subscriber<T> {
190186
private final Subscriber<T> child;
191187
private final ConcatSubscriber<T> parent;
192188
@SuppressWarnings("unused")
193-
private volatile int once = 0;
194-
@SuppressWarnings("rawtypes")
195-
private final static AtomicIntegerFieldUpdater<ConcatInnerSubscriber> ONCE = AtomicIntegerFieldUpdater.newUpdater(ConcatInnerSubscriber.class, "once");
189+
private AtomicInteger once = new AtomicInteger();
196190
private final ProducerArbiter arbiter;
197191

198192
public ConcatInnerSubscriber(ConcatSubscriber<T> parent, Subscriber<T> child, ProducerArbiter arbiter) {
@@ -210,15 +204,15 @@ public void onNext(T t) {
210204

211205
@Override
212206
public void onError(Throwable e) {
213-
if (ONCE.compareAndSet(this, 0, 1)) {
207+
if (once.compareAndSet(0, 1)) {
214208
// terminal error through parent so everything gets cleaned up, including this inner
215209
parent.onError(e);
216210
}
217211
}
218212

219213
@Override
220214
public void onCompleted() {
221-
if (ONCE.compareAndSet(this, 0, 1)) {
215+
if (once.compareAndSet(0, 1)) {
222216
// terminal completion to parent so it continues to the next
223217
parent.completeInner();
224218
}

0 commit comments

Comments
 (0)