Skip to content

Commit 11ca8a5

Browse files
author
Brian Chen
committed
did the thing
1 parent b932f74 commit 11ca8a5

File tree

7 files changed

+128
-59
lines changed

7 files changed

+128
-59
lines changed

firebase-firestore/src/main/java/com/google/firebase/firestore/FirebaseFirestoreSettings.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,7 @@ public boolean isSslEnabled() {
156156
return sslEnabled;
157157
}
158158

159-
/**
160-
* @return boolean indicating whether local persistent storage is enabled or not.
161-
*/
159+
/** @return boolean indicating whether local persistent storage is enabled or not. */
162160
public boolean isPersistenceEnabled() {
163161
return persistenceEnabled;
164162
}

firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,9 @@ public void run() {
174174
/** The time a stream stays open after it is marked idle. */
175175
private static final long IDLE_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
176176

177+
/** Maximum backoff time when reconnecting. */
178+
private static final long RECONNECT_BACKOFF_MAX_DELAY_MS = TimeUnit.SECONDS.toMillis(5);
179+
177180
@Nullable private DelayedTask idleTimer;
178181

179182
private final FirestoreChannel firestoreChannel;
@@ -194,6 +197,9 @@ public void run() {
194197
final ExponentialBackoff backoff;
195198
final CallbackT listener;
196199

200+
/** Whether we should start the gRPC stream with a new underlying connection. */
201+
private boolean useNewConnection = false;
202+
197203
AbstractStream(
198204
FirestoreChannel channel,
199205
MethodDescriptor<ReqT, RespT> methodDescriptor,
@@ -244,7 +250,12 @@ public void start() {
244250

245251
CloseGuardedRunner closeGuardedRunner = new CloseGuardedRunner(closeCount);
246252
StreamObserver streamObserver = new StreamObserver(closeGuardedRunner);
247-
call = firestoreChannel.runBidiStreamingRpc(methodDescriptor, streamObserver);
253+
if (useNewConnection) {
254+
call = firestoreChannel.runBidiStreamingRpcWithReset(methodDescriptor, streamObserver);
255+
useNewConnection = false;
256+
} else {
257+
call = firestoreChannel.runBidiStreamingRpc(methodDescriptor, streamObserver);
258+
}
248259

249260
state = State.Starting;
250261
}
@@ -290,18 +301,23 @@ private void close(State finalState, Status status) {
290301
if (code == Code.OK) {
291302
// If this is an intentional close ensure we don't delay our next connection attempt.
292303
backoff.reset();
293-
294304
} else if (code == Code.RESOURCE_EXHAUSTED) {
295305
Logger.debug(
296306
getClass().getSimpleName(),
297307
"(%x) Using maximum backoff delay to prevent overloading the backend.",
298308
System.identityHashCode(this));
299309
backoff.resetToMax();
300-
301310
} else if (code == Code.UNAUTHENTICATED) {
302311
// "unauthenticated" error means the token was rejected. Try force refreshing it in case it
303312
// just expired.
304313
firestoreChannel.invalidateToken();
314+
} else if (code == Code.UNAVAILABLE) {
315+
if (useNewConnection
316+
|| status.getCause() instanceof java.net.ConnectException
317+
|| status.getCause() instanceof java.net.UnknownHostException) {
318+
backoff.setTemporaryMaxDelay(RECONNECT_BACKOFF_MAX_DELAY_MS);
319+
useNewConnection = true;
320+
}
305321
}
306322

307323
if (finalState != State.Error) {
@@ -375,6 +391,14 @@ private void handleIdleCloseTimer() {
375391
}
376392
}
377393

394+
/** Called by the idle timer when the stream should close due to inactivity. */
395+
void handleConnectionAttemptTimeout() {
396+
useNewConnection = true;
397+
if (this.isOpen()) {
398+
close(State.Error, Status.UNAVAILABLE);
399+
}
400+
}
401+
378402
/** Called when GRPC closes the stream, which should always be due to some error. */
379403
@VisibleForTesting
380404
void handleServerClose(Status status) {

firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class FirestoreChannel {
5959
private final CredentialsProvider credentialsProvider;
6060

6161
/** Manages the gRPC channel and provides all gRPC ClientCalls. */
62-
private final GrpcCallProvider callProvider;
62+
private GrpcCallProvider callProvider;
6363

6464
/** The value to use as resource prefix header. */
6565
private final String resourcePrefixValue;
@@ -92,6 +92,16 @@ public void shutdown() {
9292
callProvider.shutdown();
9393
}
9494

95+
/**
96+
* Creates and starts a new bi-directional streaming RPC after creating a new connection for the
97+
* channel.
98+
*/
99+
<ReqT, RespT> ClientCall<ReqT, RespT> runBidiStreamingRpcWithReset(
100+
MethodDescriptor<ReqT, RespT> method, IncomingStreamObserver<RespT> observer) {
101+
this.callProvider.markChannelIdle();
102+
return runBidiStreamingRpc(method, observer);
103+
}
104+
95105
/**
96106
* Creates and starts a new bi-directional streaming RPC. The stream cannot accept message before
97107
* the observer's `onOpen()` callback is invoked.

firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,17 @@ public static void overrideChannelBuilder(
8989
});
9090
}
9191

92+
/**
93+
* Marks the underlying gRPC channel as idle. This allows on-going RPCs to continue, but the next
94+
* RPC on the channel will trigger the creation of a new connection.
95+
*/
96+
void markChannelIdle() {
97+
ManagedChannel channel = this.channelTask.getResult();
98+
if (channel != null) {
99+
this.channelTask.getResult().enterIdle();
100+
}
101+
}
102+
92103
/** Sets up the SSL provider and configures the gRPC channel. */
93104
private ManagedChannel initChannel(Context context, DatabaseInfo databaseInfo) {
94105
try {

firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java

Lines changed: 38 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717
import static com.google.firebase.firestore.util.Assert.hardAssert;
1818

1919
import com.google.firebase.firestore.core.OnlineState;
20-
import com.google.firebase.firestore.util.AsyncQueue;
2120
import com.google.firebase.firestore.util.AsyncQueue.DelayedTask;
22-
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
2321
import com.google.firebase.firestore.util.Logger;
2422
import io.grpc.Status;
2523
import java.util.Locale;
@@ -30,9 +28,9 @@
3028
* heuristics.
3129
*
3230
* <p>In particular, when the client is trying to connect to the backend, we allow up to
33-
* MAX_WATCH_STREAM_FAILURES within ONLINE_STATE_TIMEOUT_MS for a connection to succeed. If we have
34-
* too many failures or the timeout elapses, then we set the OnlineState to OFFLINE, and the client
35-
* will behave as if it is offline (get() calls will return cached data, etc.).
31+
* MAX_WATCH_STREAM_FAILURES within CONNECTIVITY_ATTEMPT_TIMEOUT_MS for a connection to succeed. If
32+
* we have too many failures or the timeout elapses, then we set the OnlineState to OFFLINE, and the
33+
* client will behave as if it is offline (get() calls will return cached data, etc.).
3634
*/
3735
class OnlineStateTracker {
3836

@@ -53,8 +51,9 @@ interface OnlineStateCallback {
5351

5452
// To deal with stream attempts that don't succeed or fail in a timely manner, we have a
5553
// timeout for OnlineState to reach ONLINE or OFFLINE. If the timeout is reached, we transition
56-
// to OFFLINE rather than waiting indefinitely.
57-
private static final int ONLINE_STATE_TIMEOUT_MS = 10 * 1000;
54+
// to OFFLINE rather than waiting indefinitely. This timeout is also used when attempting to
55+
// establish a connection when in an OFFLINE state.
56+
static final int CONNECTIVITY_ATTEMPT_TIMEOUT_MS = 5 * 1000;
5857

5958
/** The log tag to use for this class. */
6059
private static final String LOG_TAG = "OnlineStateTracker";
@@ -66,23 +65,19 @@ interface OnlineStateCallback {
6665
// MAX_WATCH_STREAM_FAILURES, we'll revert to OnlineState.OFFLINE.
6766
private int watchStreamFailures;
6867

69-
// A timer that elapses after ONLINE_STATE_TIMEOUT_MS, at which point we transition from
68+
// A timer that elapses after CONNECTIVITY_ATTEMPT_TIMEOUT_MS, at which point we transition from
7069
// OnlineState.UNKNOWN to OFFLINE without waiting for the stream to actually fail
7170
// (MAX_WATCH_STREAM_FAILURES times).
72-
private DelayedTask onlineStateTimer;
71+
private DelayedTask connectivityAttemptTimer;
7372

7473
// Whether the client should log a warning message if it fails to connect to the backend
7574
// (initially true, cleared after a successful stream, or if we've logged the message already).
7675
private boolean shouldWarnClientIsOffline;
7776

78-
// The AsyncQueue to use for running timers (and calling OnlineStateCallback methods).
79-
private final AsyncQueue workerQueue;
80-
8177
// The callback to notify on OnlineState changes.
8278
private final OnlineStateCallback onlineStateCallback;
8379

84-
OnlineStateTracker(AsyncQueue workerQueue, OnlineStateCallback onlineStateCallback) {
85-
this.workerQueue = workerQueue;
80+
OnlineStateTracker(OnlineStateCallback onlineStateCallback) {
8681
this.onlineStateCallback = onlineStateCallback;
8782
state = OnlineState.UNKNOWN;
8883
shouldWarnClientIsOffline = true;
@@ -92,33 +87,18 @@ interface OnlineStateCallback {
9287
* Called by RemoteStore when a watch stream is started (including on each backoff attempt).
9388
*
9489
* <p>If this is the first attempt, it sets the OnlineState to UNKNOWN and starts the
95-
* onlineStateTimer.
90+
* setConnectivityAttemptTimer.
9691
*/
97-
void handleWatchStreamStart() {
98-
if (watchStreamFailures == 0) {
99-
setAndBroadcastState(OnlineState.UNKNOWN);
92+
void handleWatchStreamConnectionFailed() {
93+
logClientOfflineWarningIfNecessary(
94+
String.format(
95+
Locale.ENGLISH,
96+
"Backend didn't respond within %d seconds\n",
97+
CONNECTIVITY_ATTEMPT_TIMEOUT_MS / 1000));
98+
setAndBroadcastState(OnlineState.OFFLINE);
10099

101-
hardAssert(onlineStateTimer == null, "onlineStateTimer shouldn't be started yet");
102-
onlineStateTimer =
103-
workerQueue.enqueueAfterDelay(
104-
TimerId.ONLINE_STATE_TIMEOUT,
105-
ONLINE_STATE_TIMEOUT_MS,
106-
() -> {
107-
onlineStateTimer = null;
108-
hardAssert(
109-
state == OnlineState.UNKNOWN,
110-
"Timer should be canceled if we transitioned to a different state.");
111-
logClientOfflineWarningIfNecessary(
112-
String.format(
113-
Locale.ENGLISH,
114-
"Backend didn't respond within %d seconds\n",
115-
ONLINE_STATE_TIMEOUT_MS / 1000));
116-
setAndBroadcastState(OnlineState.OFFLINE);
117-
118-
// NOTE: handleWatchStreamFailure() will continue to increment watchStreamFailures
119-
// even though we are already marked OFFLINE but this is non-harmful.
120-
});
121-
}
100+
// NOTE: handleWatchStreamFailure() will continue to increment watchStreamFailures
101+
// even though we are already marked OFFLINE but this is non-harmful.
122102
}
123103

124104
/**
@@ -135,11 +115,11 @@ void handleWatchStreamFailure(Status status) {
135115
// To get to OnlineState.ONLINE, updateState() must have been called which would have reset
136116
// our heuristics.
137117
hardAssert(this.watchStreamFailures == 0, "watchStreamFailures must be 0");
138-
hardAssert(this.onlineStateTimer == null, "onlineStateTimer must be null");
118+
hardAssert(this.connectivityAttemptTimer == null, "setConnectivityAttemptTimer must be null");
139119
} else {
140120
watchStreamFailures++;
141121
if (watchStreamFailures >= MAX_WATCH_STREAM_FAILURES) {
142-
clearOnlineStateTimer();
122+
clearConnectivityAttemptTimer();
143123
logClientOfflineWarningIfNecessary(
144124
String.format(
145125
Locale.ENGLISH,
@@ -158,7 +138,7 @@ void handleWatchStreamFailure(Status status) {
158138
* it must not be used in place of handleWatchStreamStart() and handleWatchStreamFailure().
159139
*/
160140
void updateState(OnlineState newState) {
161-
clearOnlineStateTimer();
141+
clearConnectivityAttemptTimer();
162142
watchStreamFailures = 0;
163143

164144
if (newState == OnlineState.ONLINE) {
@@ -171,6 +151,7 @@ void updateState(OnlineState newState) {
171151
}
172152

173153
private void setAndBroadcastState(OnlineState newState) {
154+
Logger.debug("OST", "BCHEN: state set to " + newState);
174155
if (newState != state) {
175156
state = newState;
176157
onlineStateCallback.handleOnlineStateChange(newState);
@@ -194,10 +175,21 @@ private void logClientOfflineWarningIfNecessary(String reason) {
194175
}
195176
}
196177

197-
private void clearOnlineStateTimer() {
198-
if (onlineStateTimer != null) {
199-
onlineStateTimer.cancel();
200-
onlineStateTimer = null;
178+
/** Clears the connectivity attempt timer that has been passed in. */
179+
void clearConnectivityAttemptTimer() {
180+
if (connectivityAttemptTimer != null) {
181+
connectivityAttemptTimer.cancel();
182+
connectivityAttemptTimer = null;
201183
}
202184
}
185+
186+
/** Returns the number of times the WatchStream has tried unsuccessfully to start. */
187+
int getWatchStreamFailures() {
188+
return watchStreamFailures;
189+
}
190+
191+
/** Set the connectivity attempt timer to track. */
192+
void setConnectivityAttemptTimer(DelayedTask connectivityAttemptTimer) {
193+
this.connectivityAttemptTimer = connectivityAttemptTimer;
194+
}
203195
}

firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
package com.google.firebase.firestore.remote;
1616

17+
import static com.google.firebase.firestore.remote.OnlineStateTracker.CONNECTIVITY_ATTEMPT_TIMEOUT_MS;
1718
import static com.google.firebase.firestore.util.Assert.hardAssert;
1819

1920
import androidx.annotation.Nullable;
@@ -35,6 +36,8 @@
3536
import com.google.firebase.firestore.remote.WatchChange.WatchTargetChange;
3637
import com.google.firebase.firestore.remote.WatchChange.WatchTargetChangeType;
3738
import com.google.firebase.firestore.util.AsyncQueue;
39+
import com.google.firebase.firestore.util.AsyncQueue.DelayedTask;
40+
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
3841
import com.google.firebase.firestore.util.Logger;
3942
import com.google.firebase.firestore.util.Util;
4043
import com.google.protobuf.ByteString;
@@ -129,6 +132,8 @@ public interface RemoteStoreCallback {
129132
private final WriteStream writeStream;
130133
@Nullable private WatchChangeAggregator watchChangeAggregator;
131134

135+
private final AsyncQueue workerQueue;
136+
132137
/**
133138
* A list of up to MAX_PENDING_WRITES writes that we have fetched from the LocalStore via
134139
* fillWritePipeline() and have or will send to the write stream.
@@ -155,12 +160,11 @@ public RemoteStore(
155160
this.localStore = localStore;
156161
this.datastore = datastore;
157162
this.connectivityMonitor = connectivityMonitor;
163+
this.workerQueue = workerQueue;
158164

159165
listenTargets = new HashMap<>();
160166
writePipeline = new ArrayDeque<>();
161-
162-
onlineStateTracker =
163-
new OnlineStateTracker(workerQueue, remoteStoreCallback::handleOnlineStateChange);
167+
onlineStateTracker = new OnlineStateTracker(remoteStoreCallback::handleOnlineStateChange);
164168

165169
// Create new streams (but note they're not started yet).
166170
watchStream =
@@ -223,6 +227,11 @@ public void onClose(Status status) {
223227
});
224228
}
225229

230+
/** Marks that we should start checking for online state. */
231+
public void attemptReconnect() {
232+
watchStream.handleConnectionAttemptTimeout();
233+
}
234+
226235
/** Re-enables the network. Only to be called as the counterpart to disableNetwork(). */
227236
public void enableNetwork() {
228237
networkEnabled = true;
@@ -408,7 +417,20 @@ private void startWatchStream() {
408417
watchChangeAggregator = new WatchChangeAggregator(this);
409418
watchStream.start();
410419

411-
onlineStateTracker.handleWatchStreamStart();
420+
if (onlineStateTracker.getWatchStreamFailures() == 0) {
421+
onlineStateTracker.clearConnectivityAttemptTimer();
422+
}
423+
DelayedTask connectivityAttemptTimer =
424+
workerQueue.enqueueAfterDelay(
425+
TimerId.ONLINE_STATE_TIMEOUT,
426+
CONNECTIVITY_ATTEMPT_TIMEOUT_MS,
427+
() -> {
428+
if (canUseNetwork()) {
429+
attemptReconnect();
430+
}
431+
onlineStateTracker.handleWatchStreamConnectionFailed();
432+
});
433+
onlineStateTracker.setConnectivityAttemptTimer(connectivityAttemptTimer);
412434
}
413435

414436
private void handleWatchStreamOpen() {

0 commit comments

Comments
 (0)