From 11ca8a504febd11fab6008f508f8f3fad1fee778 Mon Sep 17 00:00:00 2001 From: Brian Chen Date: Wed, 23 Oct 2019 10:53:01 -0700 Subject: [PATCH 01/10] did the thing --- .../firestore/FirebaseFirestoreSettings.java | 4 +- .../firestore/remote/AbstractStream.java | 30 ++++++- .../firestore/remote/FirestoreChannel.java | 12 ++- .../firestore/remote/GrpcCallProvider.java | 11 +++ .../firestore/remote/OnlineStateTracker.java | 84 +++++++++---------- .../firestore/remote/RemoteStore.java | 30 ++++++- .../firestore/util/ExponentialBackoff.java | 16 +++- 7 files changed, 128 insertions(+), 59 deletions(-) diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/FirebaseFirestoreSettings.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/FirebaseFirestoreSettings.java index a3386a5cc76..9ec68457a48 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/FirebaseFirestoreSettings.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/FirebaseFirestoreSettings.java @@ -156,9 +156,7 @@ public boolean isSslEnabled() { return sslEnabled; } - /** - * @return boolean indicating whether local persistent storage is enabled or not. - */ + /** @return boolean indicating whether local persistent storage is enabled or not. */ public boolean isPersistenceEnabled() { return persistenceEnabled; } diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java index 57179507e0a..b5178b813fa 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java @@ -174,6 +174,9 @@ public void run() { /** The time a stream stays open after it is marked idle. */ private static final long IDLE_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1); + /** Maximum backoff time when reconnecting. */ + private static final long RECONNECT_BACKOFF_MAX_DELAY_MS = TimeUnit.SECONDS.toMillis(5); + @Nullable private DelayedTask idleTimer; private final FirestoreChannel firestoreChannel; @@ -194,6 +197,9 @@ public void run() { final ExponentialBackoff backoff; final CallbackT listener; + /** Whether we should start the gRPC stream with a new underlying connection. */ + private boolean useNewConnection = false; + AbstractStream( FirestoreChannel channel, MethodDescriptor methodDescriptor, @@ -244,7 +250,12 @@ public void start() { CloseGuardedRunner closeGuardedRunner = new CloseGuardedRunner(closeCount); StreamObserver streamObserver = new StreamObserver(closeGuardedRunner); - call = firestoreChannel.runBidiStreamingRpc(methodDescriptor, streamObserver); + if (useNewConnection) { + call = firestoreChannel.runBidiStreamingRpcWithReset(methodDescriptor, streamObserver); + useNewConnection = false; + } else { + call = firestoreChannel.runBidiStreamingRpc(methodDescriptor, streamObserver); + } state = State.Starting; } @@ -290,18 +301,23 @@ private void close(State finalState, Status status) { if (code == Code.OK) { // If this is an intentional close ensure we don't delay our next connection attempt. backoff.reset(); - } else if (code == Code.RESOURCE_EXHAUSTED) { Logger.debug( getClass().getSimpleName(), "(%x) Using maximum backoff delay to prevent overloading the backend.", System.identityHashCode(this)); backoff.resetToMax(); - } else if (code == Code.UNAUTHENTICATED) { // "unauthenticated" error means the token was rejected. Try force refreshing it in case it // just expired. firestoreChannel.invalidateToken(); + } else if (code == Code.UNAVAILABLE) { + if (useNewConnection + || status.getCause() instanceof java.net.ConnectException + || status.getCause() instanceof java.net.UnknownHostException) { + backoff.setTemporaryMaxDelay(RECONNECT_BACKOFF_MAX_DELAY_MS); + useNewConnection = true; + } } if (finalState != State.Error) { @@ -375,6 +391,14 @@ private void handleIdleCloseTimer() { } } + /** Called by the idle timer when the stream should close due to inactivity. */ + void handleConnectionAttemptTimeout() { + useNewConnection = true; + if (this.isOpen()) { + close(State.Error, Status.UNAVAILABLE); + } + } + /** Called when GRPC closes the stream, which should always be due to some error. */ @VisibleForTesting void handleServerClose(Status status) { diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java index 598901afc2a..ade7c757174 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java @@ -59,7 +59,7 @@ class FirestoreChannel { private final CredentialsProvider credentialsProvider; /** Manages the gRPC channel and provides all gRPC ClientCalls. */ - private final GrpcCallProvider callProvider; + private GrpcCallProvider callProvider; /** The value to use as resource prefix header. */ private final String resourcePrefixValue; @@ -92,6 +92,16 @@ public void shutdown() { callProvider.shutdown(); } + /** + * Creates and starts a new bi-directional streaming RPC after creating a new connection for the + * channel. + */ + ClientCall runBidiStreamingRpcWithReset( + MethodDescriptor method, IncomingStreamObserver observer) { + this.callProvider.markChannelIdle(); + return runBidiStreamingRpc(method, observer); + } + /** * Creates and starts a new bi-directional streaming RPC. The stream cannot accept message before * the observer's `onOpen()` callback is invoked. diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java index b0a297e8d47..a6579e8a78e 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java @@ -89,6 +89,17 @@ public static void overrideChannelBuilder( }); } + /** + * Marks the underlying gRPC channel as idle. This allows on-going RPCs to continue, but the next + * RPC on the channel will trigger the creation of a new connection. + */ + void markChannelIdle() { + ManagedChannel channel = this.channelTask.getResult(); + if (channel != null) { + this.channelTask.getResult().enterIdle(); + } + } + /** Sets up the SSL provider and configures the gRPC channel. */ private ManagedChannel initChannel(Context context, DatabaseInfo databaseInfo) { try { diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java index e365749945d..49356fc5e2a 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java @@ -17,9 +17,7 @@ import static com.google.firebase.firestore.util.Assert.hardAssert; import com.google.firebase.firestore.core.OnlineState; -import com.google.firebase.firestore.util.AsyncQueue; import com.google.firebase.firestore.util.AsyncQueue.DelayedTask; -import com.google.firebase.firestore.util.AsyncQueue.TimerId; import com.google.firebase.firestore.util.Logger; import io.grpc.Status; import java.util.Locale; @@ -30,9 +28,9 @@ * heuristics. * *

In particular, when the client is trying to connect to the backend, we allow up to - * MAX_WATCH_STREAM_FAILURES within ONLINE_STATE_TIMEOUT_MS for a connection to succeed. If we have - * too many failures or the timeout elapses, then we set the OnlineState to OFFLINE, and the client - * will behave as if it is offline (get() calls will return cached data, etc.). + * MAX_WATCH_STREAM_FAILURES within CONNECTIVITY_ATTEMPT_TIMEOUT_MS for a connection to succeed. If + * we have too many failures or the timeout elapses, then we set the OnlineState to OFFLINE, and the + * client will behave as if it is offline (get() calls will return cached data, etc.). */ class OnlineStateTracker { @@ -53,8 +51,9 @@ interface OnlineStateCallback { // To deal with stream attempts that don't succeed or fail in a timely manner, we have a // timeout for OnlineState to reach ONLINE or OFFLINE. If the timeout is reached, we transition - // to OFFLINE rather than waiting indefinitely. - private static final int ONLINE_STATE_TIMEOUT_MS = 10 * 1000; + // to OFFLINE rather than waiting indefinitely. This timeout is also used when attempting to + // establish a connection when in an OFFLINE state. + static final int CONNECTIVITY_ATTEMPT_TIMEOUT_MS = 5 * 1000; /** The log tag to use for this class. */ private static final String LOG_TAG = "OnlineStateTracker"; @@ -66,23 +65,19 @@ interface OnlineStateCallback { // MAX_WATCH_STREAM_FAILURES, we'll revert to OnlineState.OFFLINE. private int watchStreamFailures; - // A timer that elapses after ONLINE_STATE_TIMEOUT_MS, at which point we transition from + // A timer that elapses after CONNECTIVITY_ATTEMPT_TIMEOUT_MS, at which point we transition from // OnlineState.UNKNOWN to OFFLINE without waiting for the stream to actually fail // (MAX_WATCH_STREAM_FAILURES times). - private DelayedTask onlineStateTimer; + private DelayedTask connectivityAttemptTimer; // Whether the client should log a warning message if it fails to connect to the backend // (initially true, cleared after a successful stream, or if we've logged the message already). private boolean shouldWarnClientIsOffline; - // The AsyncQueue to use for running timers (and calling OnlineStateCallback methods). - private final AsyncQueue workerQueue; - // The callback to notify on OnlineState changes. private final OnlineStateCallback onlineStateCallback; - OnlineStateTracker(AsyncQueue workerQueue, OnlineStateCallback onlineStateCallback) { - this.workerQueue = workerQueue; + OnlineStateTracker(OnlineStateCallback onlineStateCallback) { this.onlineStateCallback = onlineStateCallback; state = OnlineState.UNKNOWN; shouldWarnClientIsOffline = true; @@ -92,33 +87,18 @@ interface OnlineStateCallback { * Called by RemoteStore when a watch stream is started (including on each backoff attempt). * *

If this is the first attempt, it sets the OnlineState to UNKNOWN and starts the - * onlineStateTimer. + * setConnectivityAttemptTimer. */ - void handleWatchStreamStart() { - if (watchStreamFailures == 0) { - setAndBroadcastState(OnlineState.UNKNOWN); + void handleWatchStreamConnectionFailed() { + logClientOfflineWarningIfNecessary( + String.format( + Locale.ENGLISH, + "Backend didn't respond within %d seconds\n", + CONNECTIVITY_ATTEMPT_TIMEOUT_MS / 1000)); + setAndBroadcastState(OnlineState.OFFLINE); - hardAssert(onlineStateTimer == null, "onlineStateTimer shouldn't be started yet"); - onlineStateTimer = - workerQueue.enqueueAfterDelay( - TimerId.ONLINE_STATE_TIMEOUT, - ONLINE_STATE_TIMEOUT_MS, - () -> { - onlineStateTimer = null; - hardAssert( - state == OnlineState.UNKNOWN, - "Timer should be canceled if we transitioned to a different state."); - logClientOfflineWarningIfNecessary( - String.format( - Locale.ENGLISH, - "Backend didn't respond within %d seconds\n", - ONLINE_STATE_TIMEOUT_MS / 1000)); - setAndBroadcastState(OnlineState.OFFLINE); - - // NOTE: handleWatchStreamFailure() will continue to increment watchStreamFailures - // even though we are already marked OFFLINE but this is non-harmful. - }); - } + // NOTE: handleWatchStreamFailure() will continue to increment watchStreamFailures + // even though we are already marked OFFLINE but this is non-harmful. } /** @@ -135,11 +115,11 @@ void handleWatchStreamFailure(Status status) { // To get to OnlineState.ONLINE, updateState() must have been called which would have reset // our heuristics. hardAssert(this.watchStreamFailures == 0, "watchStreamFailures must be 0"); - hardAssert(this.onlineStateTimer == null, "onlineStateTimer must be null"); + hardAssert(this.connectivityAttemptTimer == null, "setConnectivityAttemptTimer must be null"); } else { watchStreamFailures++; if (watchStreamFailures >= MAX_WATCH_STREAM_FAILURES) { - clearOnlineStateTimer(); + clearConnectivityAttemptTimer(); logClientOfflineWarningIfNecessary( String.format( Locale.ENGLISH, @@ -158,7 +138,7 @@ void handleWatchStreamFailure(Status status) { * it must not be used in place of handleWatchStreamStart() and handleWatchStreamFailure(). */ void updateState(OnlineState newState) { - clearOnlineStateTimer(); + clearConnectivityAttemptTimer(); watchStreamFailures = 0; if (newState == OnlineState.ONLINE) { @@ -171,6 +151,7 @@ void updateState(OnlineState newState) { } private void setAndBroadcastState(OnlineState newState) { + Logger.debug("OST", "BCHEN: state set to " + newState); if (newState != state) { state = newState; onlineStateCallback.handleOnlineStateChange(newState); @@ -194,10 +175,21 @@ private void logClientOfflineWarningIfNecessary(String reason) { } } - private void clearOnlineStateTimer() { - if (onlineStateTimer != null) { - onlineStateTimer.cancel(); - onlineStateTimer = null; + /** Clears the connectivity attempt timer that has been passed in. */ + void clearConnectivityAttemptTimer() { + if (connectivityAttemptTimer != null) { + connectivityAttemptTimer.cancel(); + connectivityAttemptTimer = null; } } + + /** Returns the number of times the WatchStream has tried unsuccessfully to start. */ + int getWatchStreamFailures() { + return watchStreamFailures; + } + + /** Set the connectivity attempt timer to track. */ + void setConnectivityAttemptTimer(DelayedTask connectivityAttemptTimer) { + this.connectivityAttemptTimer = connectivityAttemptTimer; + } } diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java index dc51e730898..3dac875c5af 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java @@ -14,6 +14,7 @@ package com.google.firebase.firestore.remote; +import static com.google.firebase.firestore.remote.OnlineStateTracker.CONNECTIVITY_ATTEMPT_TIMEOUT_MS; import static com.google.firebase.firestore.util.Assert.hardAssert; import androidx.annotation.Nullable; @@ -35,6 +36,8 @@ import com.google.firebase.firestore.remote.WatchChange.WatchTargetChange; import com.google.firebase.firestore.remote.WatchChange.WatchTargetChangeType; import com.google.firebase.firestore.util.AsyncQueue; +import com.google.firebase.firestore.util.AsyncQueue.DelayedTask; +import com.google.firebase.firestore.util.AsyncQueue.TimerId; import com.google.firebase.firestore.util.Logger; import com.google.firebase.firestore.util.Util; import com.google.protobuf.ByteString; @@ -129,6 +132,8 @@ public interface RemoteStoreCallback { private final WriteStream writeStream; @Nullable private WatchChangeAggregator watchChangeAggregator; + private final AsyncQueue workerQueue; + /** * A list of up to MAX_PENDING_WRITES writes that we have fetched from the LocalStore via * fillWritePipeline() and have or will send to the write stream. @@ -155,12 +160,11 @@ public RemoteStore( this.localStore = localStore; this.datastore = datastore; this.connectivityMonitor = connectivityMonitor; + this.workerQueue = workerQueue; listenTargets = new HashMap<>(); writePipeline = new ArrayDeque<>(); - - onlineStateTracker = - new OnlineStateTracker(workerQueue, remoteStoreCallback::handleOnlineStateChange); + onlineStateTracker = new OnlineStateTracker(remoteStoreCallback::handleOnlineStateChange); // Create new streams (but note they're not started yet). watchStream = @@ -223,6 +227,11 @@ public void onClose(Status status) { }); } + /** Marks that we should start checking for online state. */ + public void attemptReconnect() { + watchStream.handleConnectionAttemptTimeout(); + } + /** Re-enables the network. Only to be called as the counterpart to disableNetwork(). */ public void enableNetwork() { networkEnabled = true; @@ -408,7 +417,20 @@ private void startWatchStream() { watchChangeAggregator = new WatchChangeAggregator(this); watchStream.start(); - onlineStateTracker.handleWatchStreamStart(); + if (onlineStateTracker.getWatchStreamFailures() == 0) { + onlineStateTracker.clearConnectivityAttemptTimer(); + } + DelayedTask connectivityAttemptTimer = + workerQueue.enqueueAfterDelay( + TimerId.ONLINE_STATE_TIMEOUT, + CONNECTIVITY_ATTEMPT_TIMEOUT_MS, + () -> { + if (canUseNetwork()) { + attemptReconnect(); + } + onlineStateTracker.handleWatchStreamConnectionFailed(); + }); + onlineStateTracker.setConnectivityAttemptTimer(connectivityAttemptTimer); } private void handleWatchStreamOpen() { diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java index b67c7339397..53f19a7223e 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java @@ -36,8 +36,8 @@ public class ExponentialBackoff { private final TimerId timerId; private final long initialDelayMs; private final double backoffFactor; - private final long maxDelayMs; + private long maxDelayMs; private long currentBaseMs; private long lastAttemptTime; private DelayedTask timerTask; @@ -103,6 +103,16 @@ public void resetToMax() { currentBaseMs = maxDelayMs; } + /** + * Set the backoff's maximum delay for only the next call to backoffAndRun, after which the delay + * will be reset to maxDelayMs. + * + * @param newMax The temporary maximum delay to set. + */ + public void setTemporaryMaxDelay(long newMax) { + maxDelayMs = newMax; + } + /** * Waits for currentDelayMs, increases the delay and runs the specified task. If there was a * pending backoff task waiting to run already, it will be canceled. @@ -121,7 +131,6 @@ public void backoffAndRun(Runnable task) { // Guard against the backoff delay already being past. long remainingDelayMs = Math.max(0, desiredDelayWithJitterMs - delaySoFarMs); - if (currentBaseMs > 0) { Logger.debug( getClass().getSimpleName(), @@ -151,6 +160,9 @@ public void backoffAndRun(Runnable task) { } else if (currentBaseMs > maxDelayMs) { currentBaseMs = maxDelayMs; } + + // Reset max delay to the default. + maxDelayMs = DEFAULT_BACKOFF_MAX_DELAY_MS; } public void cancel() { From 448311d23917e4bcba8e8c54886204a55cbde1dd Mon Sep 17 00:00:00 2001 From: Brian Chen Date: Wed, 23 Oct 2019 11:44:55 -0700 Subject: [PATCH 02/10] comment fixes --- .../google/firebase/firestore/remote/OnlineStateTracker.java | 3 +-- .../com/google/firebase/firestore/util/ExponentialBackoff.java | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java index 49356fc5e2a..c04330d2930 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java @@ -115,7 +115,7 @@ void handleWatchStreamFailure(Status status) { // To get to OnlineState.ONLINE, updateState() must have been called which would have reset // our heuristics. hardAssert(this.watchStreamFailures == 0, "watchStreamFailures must be 0"); - hardAssert(this.connectivityAttemptTimer == null, "setConnectivityAttemptTimer must be null"); + hardAssert(this.connectivityAttemptTimer == null, "connectivityAttemptTimer must be null"); } else { watchStreamFailures++; if (watchStreamFailures >= MAX_WATCH_STREAM_FAILURES) { @@ -151,7 +151,6 @@ void updateState(OnlineState newState) { } private void setAndBroadcastState(OnlineState newState) { - Logger.debug("OST", "BCHEN: state set to " + newState); if (newState != state) { state = newState; onlineStateCallback.handleOnlineStateChange(newState); diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java index 53f19a7223e..861136cb39f 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java @@ -131,6 +131,7 @@ public void backoffAndRun(Runnable task) { // Guard against the backoff delay already being past. long remainingDelayMs = Math.max(0, desiredDelayWithJitterMs - delaySoFarMs); + if (currentBaseMs > 0) { Logger.debug( getClass().getSimpleName(), From 00568bebae61d7a85e3791dc17dbae6bfaf5766a Mon Sep 17 00:00:00 2001 From: Brian Chen Date: Wed, 30 Oct 2019 16:31:04 -0700 Subject: [PATCH 03/10] resolved comments --- .../firestore/remote/AbstractStream.java | 27 +++++------- .../firestore/remote/FirestoreChannel.java | 8 +--- .../firestore/remote/OnlineStateTracker.java | 44 +++++++------------ .../firestore/remote/RemoteStore.java | 12 ++--- 4 files changed, 33 insertions(+), 58 deletions(-) diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java index b5178b813fa..018c678b2f6 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java @@ -197,9 +197,6 @@ public void run() { final ExponentialBackoff backoff; final CallbackT listener; - /** Whether we should start the gRPC stream with a new underlying connection. */ - private boolean useNewConnection = false; - AbstractStream( FirestoreChannel channel, MethodDescriptor methodDescriptor, @@ -250,12 +247,7 @@ public void start() { CloseGuardedRunner closeGuardedRunner = new CloseGuardedRunner(closeCount); StreamObserver streamObserver = new StreamObserver(closeGuardedRunner); - if (useNewConnection) { - call = firestoreChannel.runBidiStreamingRpcWithReset(methodDescriptor, streamObserver); - useNewConnection = false; - } else { - call = firestoreChannel.runBidiStreamingRpc(methodDescriptor, streamObserver); - } + call = firestoreChannel.runBidiStreamingRpc(methodDescriptor, streamObserver); state = State.Starting; } @@ -275,7 +267,7 @@ public void start() { * @param finalState the intended state of the stream after closing. * @param status the status to emit to the listener. */ - private void close(State finalState, Status status) { + private void close(State finalState, Status status, boolean forceNewConnection) { hardAssert(isStarted(), "Only started streams should be closed."); hardAssert( finalState == State.Error || status.equals(Status.OK), @@ -312,11 +304,11 @@ private void close(State finalState, Status status) { // just expired. firestoreChannel.invalidateToken(); } else if (code == Code.UNAVAILABLE) { - if (useNewConnection + if (forceNewConnection || status.getCause() instanceof java.net.ConnectException || status.getCause() instanceof java.net.UnknownHostException) { backoff.setTemporaryMaxDelay(RECONNECT_BACKOFF_MAX_DELAY_MS); - useNewConnection = true; + firestoreChannel.markChannelIdle(); } } @@ -349,6 +341,10 @@ private void close(State finalState, Status status) { listener.onClose(status); } + private void close(State finalState, Status status) { + close(finalState, status, false); + } + /** * Can be overridden to perform additional cleanup before the stream is closed. Calling * super.tearDown() is not required. @@ -391,12 +387,9 @@ private void handleIdleCloseTimer() { } } - /** Called by the idle timer when the stream should close due to inactivity. */ + /** Called when the connectivity attempt timer runs out. */ void handleConnectionAttemptTimeout() { - useNewConnection = true; - if (this.isOpen()) { - close(State.Error, Status.UNAVAILABLE); - } + close(State.Error, Status.UNAVAILABLE, true); } /** Called when GRPC closes the stream, which should always be due to some error. */ diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java index ade7c757174..d6ecd06fe94 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java @@ -92,14 +92,8 @@ public void shutdown() { callProvider.shutdown(); } - /** - * Creates and starts a new bi-directional streaming RPC after creating a new connection for the - * channel. - */ - ClientCall runBidiStreamingRpcWithReset( - MethodDescriptor method, IncomingStreamObserver observer) { + public void markChannelIdle() { this.callProvider.markChannelIdle(); - return runBidiStreamingRpc(method, observer); } /** diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java index 49356fc5e2a..874e96a68ee 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java @@ -17,6 +17,7 @@ import static com.google.firebase.firestore.util.Assert.hardAssert; import com.google.firebase.firestore.core.OnlineState; +import com.google.firebase.firestore.util.AsyncQueue; import com.google.firebase.firestore.util.AsyncQueue.DelayedTask; import com.google.firebase.firestore.util.Logger; import io.grpc.Status; @@ -53,7 +54,7 @@ interface OnlineStateCallback { // timeout for OnlineState to reach ONLINE or OFFLINE. If the timeout is reached, we transition // to OFFLINE rather than waiting indefinitely. This timeout is also used when attempting to // establish a connection when in an OFFLINE state. - static final int CONNECTIVITY_ATTEMPT_TIMEOUT_MS = 5 * 1000; + static final int CONNECTIVITY_ATTEMPT_TIMEOUT_MS = 10 * 1000; /** The log tag to use for this class. */ private static final String LOG_TAG = "OnlineStateTracker"; @@ -77,30 +78,16 @@ interface OnlineStateCallback { // The callback to notify on OnlineState changes. private final OnlineStateCallback onlineStateCallback; - OnlineStateTracker(OnlineStateCallback onlineStateCallback) { + // The AsyncQueue to use for running timers (and calling OnlineStateCallback methods). + private final AsyncQueue workerQueue; + + OnlineStateTracker(OnlineStateCallback onlineStateCallback, AsyncQueue workerQueue) { + this.workerQueue = workerQueue; this.onlineStateCallback = onlineStateCallback; state = OnlineState.UNKNOWN; shouldWarnClientIsOffline = true; } - /** - * Called by RemoteStore when a watch stream is started (including on each backoff attempt). - * - *

If this is the first attempt, it sets the OnlineState to UNKNOWN and starts the - * setConnectivityAttemptTimer. - */ - void handleWatchStreamConnectionFailed() { - logClientOfflineWarningIfNecessary( - String.format( - Locale.ENGLISH, - "Backend didn't respond within %d seconds\n", - CONNECTIVITY_ATTEMPT_TIMEOUT_MS / 1000)); - setAndBroadcastState(OnlineState.OFFLINE); - - // NOTE: handleWatchStreamFailure() will continue to increment watchStreamFailures - // even though we are already marked OFFLINE but this is non-harmful. - } - /** * Called by RemoteStore when a watch stream fails. * @@ -119,18 +106,23 @@ void handleWatchStreamFailure(Status status) { } else { watchStreamFailures++; if (watchStreamFailures >= MAX_WATCH_STREAM_FAILURES) { - clearConnectivityAttemptTimer(); logClientOfflineWarningIfNecessary( String.format( Locale.ENGLISH, - "Connection failed %d times. Most recent error: %s", - MAX_WATCH_STREAM_FAILURES, + "Backend didn't respond within %d seconds. Most recent error: %s\n", + CONNECTIVITY_ATTEMPT_TIMEOUT_MS / 1000, status)); setAndBroadcastState(OnlineState.OFFLINE); } } } + void handleWatchStreamStart() { + if (watchStreamFailures == 0) { + setAndBroadcastState(OnlineState.UNKNOWN); + } + } + /** * Explicitly sets the OnlineState to the specified state. * @@ -151,7 +143,6 @@ void updateState(OnlineState newState) { } private void setAndBroadcastState(OnlineState newState) { - Logger.debug("OST", "BCHEN: state set to " + newState); if (newState != state) { state = newState; onlineStateCallback.handleOnlineStateChange(newState); @@ -183,11 +174,6 @@ void clearConnectivityAttemptTimer() { } } - /** Returns the number of times the WatchStream has tried unsuccessfully to start. */ - int getWatchStreamFailures() { - return watchStreamFailures; - } - /** Set the connectivity attempt timer to track. */ void setConnectivityAttemptTimer(DelayedTask connectivityAttemptTimer) { this.connectivityAttemptTimer = connectivityAttemptTimer; diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java index 3dac875c5af..1b068ca2df6 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java @@ -164,7 +164,8 @@ public RemoteStore( listenTargets = new HashMap<>(); writePipeline = new ArrayDeque<>(); - onlineStateTracker = new OnlineStateTracker(remoteStoreCallback::handleOnlineStateChange); + onlineStateTracker = + new OnlineStateTracker(remoteStoreCallback::handleOnlineStateChange, workerQueue); // Create new streams (but note they're not started yet). watchStream = @@ -417,18 +418,19 @@ private void startWatchStream() { watchChangeAggregator = new WatchChangeAggregator(this); watchStream.start(); - if (onlineStateTracker.getWatchStreamFailures() == 0) { - onlineStateTracker.clearConnectivityAttemptTimer(); - } + onlineStateTracker.handleWatchStreamStart(); + DelayedTask connectivityAttemptTimer = workerQueue.enqueueAfterDelay( TimerId.ONLINE_STATE_TIMEOUT, CONNECTIVITY_ATTEMPT_TIMEOUT_MS, () -> { + // If the network has been explicitly disabled, make sure we don't accidentally + // re-enable it. if (canUseNetwork()) { attemptReconnect(); } - onlineStateTracker.handleWatchStreamConnectionFailed(); + onlineStateTracker.handleWatchStreamFailure(Status.UNAVAILABLE); }); onlineStateTracker.setConnectivityAttemptTimer(connectivityAttemptTimer); } From d7143787147e8cf5eb64c098b32c07485f93be5e Mon Sep 17 00:00:00 2001 From: Brian Chen Date: Wed, 30 Oct 2019 16:42:20 -0700 Subject: [PATCH 04/10] just kidding, had to update more comments and remove unused vars --- .../google/firebase/firestore/remote/AbstractStream.java | 5 +++++ .../firebase/firestore/remote/OnlineStateTracker.java | 7 +------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java index 018c678b2f6..905342cfeed 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java @@ -266,6 +266,7 @@ public void start() { * * @param finalState the intended state of the stream after closing. * @param status the status to emit to the listener. + * @param forceNewConnection whether to use a new connection the next time we open the stream */ private void close(State finalState, Status status, boolean forceNewConnection) { hardAssert(isStarted(), "Only started streams should be closed."); @@ -304,6 +305,9 @@ private void close(State finalState, Status status, boolean forceNewConnection) // just expired. firestoreChannel.invalidateToken(); } else if (code == Code.UNAVAILABLE) { + // These exceptions are thrown when the gRPC stream is closed with an connection error. For + // these cases, we need to use a new connection for the next connection attempt, which is + // done by marking the underlying channel as idle. if (forceNewConnection || status.getCause() instanceof java.net.ConnectException || status.getCause() instanceof java.net.UnknownHostException) { @@ -389,6 +393,7 @@ private void handleIdleCloseTimer() { /** Called when the connectivity attempt timer runs out. */ void handleConnectionAttemptTimeout() { + // We want to force a new connection on the next connection attempt whenever we fail to connect. close(State.Error, Status.UNAVAILABLE, true); } diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java index 94b144ed1da..0e4f613b1e4 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java @@ -17,7 +17,6 @@ import static com.google.firebase.firestore.util.Assert.hardAssert; import com.google.firebase.firestore.core.OnlineState; -import com.google.firebase.firestore.util.AsyncQueue; import com.google.firebase.firestore.util.AsyncQueue.DelayedTask; import com.google.firebase.firestore.util.Logger; import io.grpc.Status; @@ -78,11 +77,7 @@ interface OnlineStateCallback { // The callback to notify on OnlineState changes. private final OnlineStateCallback onlineStateCallback; - // The AsyncQueue to use for running timers (and calling OnlineStateCallback methods). - private final AsyncQueue workerQueue; - - OnlineStateTracker(OnlineStateCallback onlineStateCallback, AsyncQueue workerQueue) { - this.workerQueue = workerQueue; + OnlineStateTracker(OnlineStateCallback onlineStateCallback) { this.onlineStateCallback = onlineStateCallback; state = OnlineState.UNKNOWN; shouldWarnClientIsOffline = true; From 55e8f468e5c85adb34b1b6dd8a9987501d11199a Mon Sep 17 00:00:00 2001 From: Brian Chen Date: Wed, 30 Oct 2019 16:55:11 -0700 Subject: [PATCH 05/10] fix onlinestatetracker constructor --- .../java/com/google/firebase/firestore/remote/RemoteStore.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java index 1b068ca2df6..d94adfa7507 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java @@ -165,7 +165,7 @@ public RemoteStore( listenTargets = new HashMap<>(); writePipeline = new ArrayDeque<>(); onlineStateTracker = - new OnlineStateTracker(remoteStoreCallback::handleOnlineStateChange, workerQueue); + new OnlineStateTracker(remoteStoreCallback::handleOnlineStateChange); // Create new streams (but note they're not started yet). watchStream = From d2028939102d4106d4050f780a2e29926d7f6d2c Mon Sep 17 00:00:00 2001 From: Brian Chen Date: Fri, 1 Nov 2019 16:51:36 -0700 Subject: [PATCH 06/10] continue, make spec tests pass --- .../google/firebase/firestore/remote/AbstractStream.java | 4 ++-- .../firebase/firestore/remote/OnlineStateTracker.java | 3 ++- .../com/google/firebase/firestore/remote/RemoteStore.java | 5 +---- .../com/google/firebase/firestore/remote/MockDatastore.java | 6 ++++++ 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java index 905342cfeed..25f33a360bf 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java @@ -268,7 +268,7 @@ public void start() { * @param status the status to emit to the listener. * @param forceNewConnection whether to use a new connection the next time we open the stream */ - private void close(State finalState, Status status, boolean forceNewConnection) { + protected void close(State finalState, Status status, boolean forceNewConnection) { hardAssert(isStarted(), "Only started streams should be closed."); hardAssert( finalState == State.Error || status.equals(Status.OK), @@ -345,7 +345,7 @@ private void close(State finalState, Status status, boolean forceNewConnection) listener.onClose(status); } - private void close(State finalState, Status status) { + protected void close(State finalState, Status status) { close(finalState, status, false); } diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java index 0e4f613b1e4..22621c23ef5 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java @@ -100,6 +100,7 @@ void handleWatchStreamFailure(Status status) { hardAssert(this.connectivityAttemptTimer == null, "connectivityAttemptTimer must be null"); } else { watchStreamFailures++; + clearConnectivityAttemptTimer(); if (watchStreamFailures >= MAX_WATCH_STREAM_FAILURES) { logClientOfflineWarningIfNecessary( String.format( @@ -162,7 +163,7 @@ private void logClientOfflineWarningIfNecessary(String reason) { } /** Clears the connectivity attempt timer that has been passed in. */ - void clearConnectivityAttemptTimer() { + private void clearConnectivityAttemptTimer() { if (connectivityAttemptTimer != null) { connectivityAttemptTimer.cancel(); connectivityAttemptTimer = null; diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java index d94adfa7507..c478b5fd9bd 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java @@ -164,8 +164,7 @@ public RemoteStore( listenTargets = new HashMap<>(); writePipeline = new ArrayDeque<>(); - onlineStateTracker = - new OnlineStateTracker(remoteStoreCallback::handleOnlineStateChange); + onlineStateTracker = new OnlineStateTracker(remoteStoreCallback::handleOnlineStateChange); // Create new streams (but note they're not started yet). watchStream = @@ -417,7 +416,6 @@ private void startWatchStream() { "startWatchStream() called when shouldStartWatchStream() is false."); watchChangeAggregator = new WatchChangeAggregator(this); watchStream.start(); - onlineStateTracker.handleWatchStreamStart(); DelayedTask connectivityAttemptTimer = @@ -430,7 +428,6 @@ private void startWatchStream() { if (canUseNetwork()) { attemptReconnect(); } - onlineStateTracker.handleWatchStreamFailure(Status.UNAVAILABLE); }); onlineStateTracker.setConnectivityAttemptTimer(connectivityAttemptTimer); } diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/remote/MockDatastore.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/remote/MockDatastore.java index 86d49d26549..41f9986a920 100644 --- a/firebase-firestore/src/test/java/com/google/firebase/firestore/remote/MockDatastore.java +++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/remote/MockDatastore.java @@ -67,6 +67,12 @@ public void stop() { open = false; } + @Override + protected void close(State finalState, Status status, boolean forceNewConnection) { + open = false; + listener.onClose(status); + } + @Override public boolean isStarted() { return open; From 411ed9d3a3482aabca7c2a3e04f38be80b275b44 Mon Sep 17 00:00:00 2001 From: Brian Chen Date: Tue, 5 Nov 2019 11:50:55 -0600 Subject: [PATCH 07/10] resolve comments: comments, code ordering, rename to connectivity_attempt_timer --- .../firestore/remote/FirestoreChannel.java | 7 ++++++- .../firestore/remote/OnlineStateTracker.java | 19 +++++++++++++------ .../firestore/remote/RemoteStore.java | 2 +- .../firebase/firestore/util/AsyncQueue.java | 2 +- .../firebase/firestore/spec/SpecTestCase.java | 2 +- 5 files changed, 22 insertions(+), 10 deletions(-) diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java index d6ecd06fe94..70acd1340a1 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java @@ -59,7 +59,7 @@ class FirestoreChannel { private final CredentialsProvider credentialsProvider; /** Manages the gRPC channel and provides all gRPC ClientCalls. */ - private GrpcCallProvider callProvider; + private final GrpcCallProvider callProvider; /** The value to use as resource prefix header. */ private final String resourcePrefixValue; @@ -92,6 +92,11 @@ public void shutdown() { callProvider.shutdown(); } + /** + * Marks the underlying gRPC channel as idle. This allows on-going RPCs to continue, but the next + * RPC on the channel will trigger the creation of a new connection. This method is primarily used + * to reset the underlying connection. + */ public void markChannelIdle() { this.callProvider.markChannelIdle(); } diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java index 22621c23ef5..e56a17cd69e 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java @@ -83,6 +83,19 @@ interface OnlineStateCallback { shouldWarnClientIsOffline = true; } + /** + * Called by RemoteStore when a watch stream is started (including on each backoff attempt). + * + *

If this is the first attempt, it sets the OnlineState to UNKNOWN. + */ + void handleWatchStreamStart() { + if (watchStreamFailures == 0) { + setAndBroadcastState(OnlineState.UNKNOWN); + } + hardAssert( + connectivityAttemptTimer == null, "connectivityAttemptTimer shouldn't be started yet"); + } + /** * Called by RemoteStore when a watch stream fails. * @@ -113,12 +126,6 @@ void handleWatchStreamFailure(Status status) { } } - void handleWatchStreamStart() { - if (watchStreamFailures == 0) { - setAndBroadcastState(OnlineState.UNKNOWN); - } - } - /** * Explicitly sets the OnlineState to the specified state. * diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java index c478b5fd9bd..c7fb3b2d8ad 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java @@ -420,7 +420,7 @@ private void startWatchStream() { DelayedTask connectivityAttemptTimer = workerQueue.enqueueAfterDelay( - TimerId.ONLINE_STATE_TIMEOUT, + TimerId.CONNECTIVITY_ATTEMPT_TIMER, CONNECTIVITY_ATTEMPT_TIMEOUT_MS, () -> { // If the network has been explicitly disabled, make sure we don't accidentally diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java index 9802eb53e4c..82297f051d7 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java @@ -67,7 +67,7 @@ public enum TimerId { * A timer used in OnlineStateTracker to transition from OnlineState UNKNOWN to OFFLINE after a * set timeout, rather than waiting indefinitely for success or failure. */ - ONLINE_STATE_TIMEOUT, + CONNECTIVITY_ATTEMPT_TIMER, /** A timer used to periodically attempt LRU Garbage collection */ GARBAGE_COLLECTION, /** diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java index 691f371a20b..142c45a794f 100644 --- a/firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java +++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java @@ -688,7 +688,7 @@ private void doRunTimer(String timer) throws Exception { timerId = TimerId.WRITE_STREAM_CONNECTION_BACKOFF; break; case "online_state_timeout": - timerId = TimerId.ONLINE_STATE_TIMEOUT; + timerId = TimerId.CONNECTIVITY_ATTEMPT_TIMER; break; default: throw Assert.fail("runTimer spec step specified unknown timer: %s", timer); From 5bce0a0f2c8abc7bfae84470ee3801e6545a19e1 Mon Sep 17 00:00:00 2001 From: Brian Chen Date: Tue, 12 Nov 2019 14:41:33 -0800 Subject: [PATCH 08/10] separate online_state_timeout from connectivity_attempt_timeout --- .../firestore/remote/AbstractStream.java | 2 +- .../firestore/remote/OnlineStateTracker.java | 37 ++++++++++++++++++- .../firestore/remote/RemoteStore.java | 3 +- .../firebase/firestore/util/AsyncQueue.java | 10 ++++- .../firebase/firestore/spec/SpecTestCase.java | 2 +- 5 files changed, 47 insertions(+), 7 deletions(-) diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java index 25f33a360bf..e2d7a0f19e9 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java @@ -175,7 +175,7 @@ public void run() { private static final long IDLE_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1); /** Maximum backoff time when reconnecting. */ - private static final long RECONNECT_BACKOFF_MAX_DELAY_MS = TimeUnit.SECONDS.toMillis(5); + private static final long RECONNECT_BACKOFF_MAX_DELAY_MS = TimeUnit.SECONDS.toMillis(10); @Nullable private DelayedTask idleTimer; diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java index e56a17cd69e..8daf102f3cd 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java @@ -17,7 +17,9 @@ import static com.google.firebase.firestore.util.Assert.hardAssert; import com.google.firebase.firestore.core.OnlineState; +import com.google.firebase.firestore.util.AsyncQueue; import com.google.firebase.firestore.util.AsyncQueue.DelayedTask; +import com.google.firebase.firestore.util.AsyncQueue.TimerId; import com.google.firebase.firestore.util.Logger; import io.grpc.Status; import java.util.Locale; @@ -53,7 +55,9 @@ interface OnlineStateCallback { // timeout for OnlineState to reach ONLINE or OFFLINE. If the timeout is reached, we transition // to OFFLINE rather than waiting indefinitely. This timeout is also used when attempting to // establish a connection when in an OFFLINE state. - static final int CONNECTIVITY_ATTEMPT_TIMEOUT_MS = 10 * 1000; + static final int CONNECTIVITY_ATTEMPT_TIMEOUT_MS = 15 * 1000; + + private static final int ONLINE_STATE_TIMEOUT_MS = 10 * 1000; /** The log tag to use for this class. */ private static final String LOG_TAG = "OnlineStateTracker"; @@ -70,14 +74,20 @@ interface OnlineStateCallback { // (MAX_WATCH_STREAM_FAILURES times). private DelayedTask connectivityAttemptTimer; + private DelayedTask onlineStateTimer; + // Whether the client should log a warning message if it fails to connect to the backend // (initially true, cleared after a successful stream, or if we've logged the message already). private boolean shouldWarnClientIsOffline; + // The AsyncQueue to use for running timers (and calling OnlineStateCallback methods). + private final AsyncQueue workerQueue; + // The callback to notify on OnlineState changes. private final OnlineStateCallback onlineStateCallback; - OnlineStateTracker(OnlineStateCallback onlineStateCallback) { + OnlineStateTracker(AsyncQueue workerQueue, OnlineStateCallback onlineStateCallback) { + this.workerQueue = workerQueue; this.onlineStateCallback = onlineStateCallback; state = OnlineState.UNKNOWN; shouldWarnClientIsOffline = true; @@ -91,6 +101,25 @@ interface OnlineStateCallback { void handleWatchStreamStart() { if (watchStreamFailures == 0) { setAndBroadcastState(OnlineState.UNKNOWN); + onlineStateTimer = + workerQueue.enqueueAfterDelay( + TimerId.ONLINE_STATE_TIMEOUT, + ONLINE_STATE_TIMEOUT_MS, + () -> { + onlineStateTimer = null; + hardAssert( + state == OnlineState.UNKNOWN, + "Timer should be canceled if we transitioned to a different state."); + logClientOfflineWarningIfNecessary( + String.format( + Locale.ENGLISH, + "Backend didn't respond within %d seconds\n", + ONLINE_STATE_TIMEOUT_MS / 1000)); + setAndBroadcastState(OnlineState.OFFLINE); + + // NOTE: handleWatchStreamFailure() will continue to increment watchStreamFailures + // even though we are already marked OFFLINE but this is non-harmful. + }); } hardAssert( connectivityAttemptTimer == null, "connectivityAttemptTimer shouldn't be started yet"); @@ -175,6 +204,10 @@ private void clearConnectivityAttemptTimer() { connectivityAttemptTimer.cancel(); connectivityAttemptTimer = null; } + if (onlineStateTimer != null) { + onlineStateTimer.cancel(); + onlineStateTimer = null; + } } /** Set the connectivity attempt timer to track. */ diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java index c7fb3b2d8ad..504d11ec68e 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java @@ -164,7 +164,8 @@ public RemoteStore( listenTargets = new HashMap<>(); writePipeline = new ArrayDeque<>(); - onlineStateTracker = new OnlineStateTracker(remoteStoreCallback::handleOnlineStateChange); + onlineStateTracker = + new OnlineStateTracker(this.workerQueue, remoteStoreCallback::handleOnlineStateChange); // Create new streams (but note they're not started yet). watchStream = diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java index 82297f051d7..4ccdd8c301a 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java @@ -67,14 +67,20 @@ public enum TimerId { * A timer used in OnlineStateTracker to transition from OnlineState UNKNOWN to OFFLINE after a * set timeout, rather than waiting indefinitely for success or failure. */ - CONNECTIVITY_ATTEMPT_TIMER, + ONLINE_STATE_TIMEOUT, /** A timer used to periodically attempt LRU Garbage collection */ GARBAGE_COLLECTION, /** * A timer used to retry transactions. Since there can be multiple concurrent transactions, * multiple of these may be in the queue at a given time. */ - RETRY_TRANSACTION + RETRY_TRANSACTION, + /** + * A timer used in RemoteStore to monitor when a connection attempt is unsuccessful and retry + * accordingly. While `ONLINE_STATE_TIMEOUT` is used to transition from UNKNOWN to OFFLINE, + * `CONNECTIVITY_ATTEMPT_TIMER` is used for tracking and retrying connectivity attempts. + */ + CONNECTIVITY_ATTEMPT_TIMER } /** diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java index b7f348d5e22..777bd434c93 100644 --- a/firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java +++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java @@ -687,7 +687,7 @@ private void doRunTimer(String timer) throws Exception { timerId = TimerId.WRITE_STREAM_CONNECTION_BACKOFF; break; case "online_state_timeout": - timerId = TimerId.CONNECTIVITY_ATTEMPT_TIMER; + timerId = TimerId.ONLINE_STATE_TIMEOUT; break; default: throw Assert.fail("runTimer spec step specified unknown timer: %s", timer); From ba515b70c1bc5cc7fdd5ed1783df437a3ea3198d Mon Sep 17 00:00:00 2001 From: Brian Chen Date: Tue, 12 Nov 2019 14:51:18 -0800 Subject: [PATCH 09/10] update comments --- .../firestore/remote/OnlineStateTracker.java | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java index 8daf102f3cd..b8cd8f740ad 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java @@ -30,9 +30,9 @@ * heuristics. * *

In particular, when the client is trying to connect to the backend, we allow up to - * MAX_WATCH_STREAM_FAILURES within CONNECTIVITY_ATTEMPT_TIMEOUT_MS for a connection to succeed. If - * we have too many failures or the timeout elapses, then we set the OnlineState to OFFLINE, and the - * client will behave as if it is offline (get() calls will return cached data, etc.). + * MAX_WATCH_STREAM_FAILURES within ONLINE_STATE_TIMEOUT_MS for a connection to succeed. If we have + * too many failures or the timeout elapses, then we set the OnlineState to OFFLINE, and the client + * will behave as if it is offline (get() calls will return cached data, etc.). */ class OnlineStateTracker { @@ -53,12 +53,14 @@ interface OnlineStateCallback { // To deal with stream attempts that don't succeed or fail in a timely manner, we have a // timeout for OnlineState to reach ONLINE or OFFLINE. If the timeout is reached, we transition - // to OFFLINE rather than waiting indefinitely. This timeout is also used when attempting to - // establish a connection when in an OFFLINE state. - static final int CONNECTIVITY_ATTEMPT_TIMEOUT_MS = 15 * 1000; - + // to OFFLINE rather than waiting indefinitely. private static final int ONLINE_STATE_TIMEOUT_MS = 10 * 1000; + // This timeout is also used when attempting to establish a connection. If a connection attempt + // does not succeed in time, we close the stream and restart the connection, rather than having + // it hang indefinitely. + static final int CONNECTIVITY_ATTEMPT_TIMEOUT_MS = 15 * 1000; + /** The log tag to use for this class. */ private static final String LOG_TAG = "OnlineStateTracker"; @@ -69,11 +71,13 @@ interface OnlineStateCallback { // MAX_WATCH_STREAM_FAILURES, we'll revert to OnlineState.OFFLINE. private int watchStreamFailures; - // A timer that elapses after CONNECTIVITY_ATTEMPT_TIMEOUT_MS, at which point we transition from - // OnlineState.UNKNOWN to OFFLINE without waiting for the stream to actually fail - // (MAX_WATCH_STREAM_FAILURES times). + // A timer that elapses after CONNECTIVTY_ATTEMPT_TIMEOUT_MS, at which point we close the + // stream, reset the underlying connection, and try connecting again. private DelayedTask connectivityAttemptTimer; + // A timer that elapses after ONLINE_STATE_TIMEOUT_MS, at which point we transition from + // OnlineState.UNKNOWN to OFFLINE without waiting for the stream to actually fail + // (MAX_WATCH_STREAM_FAILURES times). private DelayedTask onlineStateTimer; // Whether the client should log a warning message if it fails to connect to the backend @@ -96,11 +100,13 @@ interface OnlineStateCallback { /** * Called by RemoteStore when a watch stream is started (including on each backoff attempt). * - *

If this is the first attempt, it sets the OnlineState to UNKNOWN. + *

If this is the first attempt, it sets the OnlineState to UNKNOWN and starts the + * onlineStateTimer. */ void handleWatchStreamStart() { if (watchStreamFailures == 0) { setAndBroadcastState(OnlineState.UNKNOWN); + hardAssert(onlineStateTimer == null, "onlineStateTimer shouldn't be started yet"); onlineStateTimer = workerQueue.enqueueAfterDelay( TimerId.ONLINE_STATE_TIMEOUT, @@ -140,6 +146,7 @@ void handleWatchStreamFailure(Status status) { // our heuristics. hardAssert(this.watchStreamFailures == 0, "watchStreamFailures must be 0"); hardAssert(this.connectivityAttemptTimer == null, "connectivityAttemptTimer must be null"); + hardAssert(this.onlineStateTimer == null, "onlineStateTimer must be null"); } else { watchStreamFailures++; clearConnectivityAttemptTimer(); From b77e02ec420a7427a89fb4aa0fa07c62f5141fc0 Mon Sep 17 00:00:00 2001 From: Brian Chen Date: Mon, 2 Dec 2019 17:05:43 -0800 Subject: [PATCH 10/10] update comments --- .../firestore/remote/OnlineStateTracker.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java index b8cd8f740ad..d26c85437ee 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java @@ -32,7 +32,9 @@ *

In particular, when the client is trying to connect to the backend, we allow up to * MAX_WATCH_STREAM_FAILURES within ONLINE_STATE_TIMEOUT_MS for a connection to succeed. If we have * too many failures or the timeout elapses, then we set the OnlineState to OFFLINE, and the client - * will behave as if it is offline (get() calls will return cached data, etc.). + * will behave as if it is offline (get() calls will return cached data, etc.). The client will + * continue to attempt reconnecting, restarting the underlying connection every + * CONNECTIVITY_ATTEMPT_TIMEOUT_MS. */ class OnlineStateTracker { @@ -149,7 +151,7 @@ void handleWatchStreamFailure(Status status) { hardAssert(this.onlineStateTimer == null, "onlineStateTimer must be null"); } else { watchStreamFailures++; - clearConnectivityAttemptTimer(); + clearTimers(); if (watchStreamFailures >= MAX_WATCH_STREAM_FAILURES) { logClientOfflineWarningIfNecessary( String.format( @@ -169,7 +171,7 @@ void handleWatchStreamFailure(Status status) { * it must not be used in place of handleWatchStreamStart() and handleWatchStreamFailure(). */ void updateState(OnlineState newState) { - clearConnectivityAttemptTimer(); + clearTimers(); watchStreamFailures = 0; if (newState == OnlineState.ONLINE) { @@ -205,8 +207,8 @@ private void logClientOfflineWarningIfNecessary(String reason) { } } - /** Clears the connectivity attempt timer that has been passed in. */ - private void clearConnectivityAttemptTimer() { + /** Clears the OnlineStateTimer and the passed in ConnectivityAttemptTimer. */ + private void clearTimers() { if (connectivityAttemptTimer != null) { connectivityAttemptTimer.cancel(); connectivityAttemptTimer = null; @@ -217,7 +219,7 @@ private void clearConnectivityAttemptTimer() { } } - /** Set the connectivity attempt timer to track. */ + /** Sets the connectivity attempt timer to track. */ void setConnectivityAttemptTimer(DelayedTask connectivityAttemptTimer) { this.connectivityAttemptTimer = connectivityAttemptTimer; }