diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java index 57179507e0a..e2d7a0f19e9 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java @@ -174,6 +174,9 @@ public void run() { /** The time a stream stays open after it is marked idle. */ private static final long IDLE_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1); + /** Maximum backoff time when reconnecting. */ + private static final long RECONNECT_BACKOFF_MAX_DELAY_MS = TimeUnit.SECONDS.toMillis(10); + @Nullable private DelayedTask idleTimer; private final FirestoreChannel firestoreChannel; @@ -263,8 +266,9 @@ public void start() { * * @param finalState the intended state of the stream after closing. * @param status the status to emit to the listener. + * @param forceNewConnection whether to use a new connection the next time we open the stream */ - private void close(State finalState, Status status) { + protected void close(State finalState, Status status, boolean forceNewConnection) { hardAssert(isStarted(), "Only started streams should be closed."); hardAssert( finalState == State.Error || status.equals(Status.OK), @@ -290,18 +294,26 @@ private void close(State finalState, Status status) { if (code == Code.OK) { // If this is an intentional close ensure we don't delay our next connection attempt. backoff.reset(); - } else if (code == Code.RESOURCE_EXHAUSTED) { Logger.debug( getClass().getSimpleName(), "(%x) Using maximum backoff delay to prevent overloading the backend.", System.identityHashCode(this)); backoff.resetToMax(); - } else if (code == Code.UNAUTHENTICATED) { // "unauthenticated" error means the token was rejected. Try force refreshing it in case it // just expired. 
firestoreChannel.invalidateToken(); + } else if (code == Code.UNAVAILABLE) { + // These exceptions are thrown when the gRPC stream is closed with a connection error. For + // these cases, we need to use a new connection for the next connection attempt, which is + // done by marking the underlying channel as idle. + if (forceNewConnection + || status.getCause() instanceof java.net.ConnectException + || status.getCause() instanceof java.net.UnknownHostException) { + backoff.setTemporaryMaxDelay(RECONNECT_BACKOFF_MAX_DELAY_MS); + firestoreChannel.markChannelIdle(); + } } if (finalState != State.Error) { @@ -333,6 +345,10 @@ private void close(State finalState, Status status) { listener.onClose(status); } + protected void close(State finalState, Status status) { + close(finalState, status, false); + } + /** * Can be overridden to perform additional cleanup before the stream is closed. Calling * super.tearDown() is not required. @@ -375,6 +391,12 @@ private void handleIdleCloseTimer() { } } + /** Called when the connectivity attempt timer runs out. */ + void handleConnectionAttemptTimeout() { + // We want to force a new connection on the next connection attempt whenever we fail to connect. + close(State.Error, Status.UNAVAILABLE, true); + } + /** Called when GRPC closes the stream, which should always be due to some error. */ @VisibleForTesting void handleServerClose(Status status) { diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java index 598901afc2a..70acd1340a1 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/FirestoreChannel.java @@ -92,6 +92,15 @@ public void shutdown() { callProvider.shutdown(); } + /** + * Marks the underlying gRPC channel as idle. 
This allows on-going RPCs to continue, but the next + * RPC on the channel will trigger the creation of a new connection. This method is primarily used + * to reset the underlying connection. + */ + public void markChannelIdle() { + this.callProvider.markChannelIdle(); + } + /** * Creates and starts a new bi-directional streaming RPC. The stream cannot accept message before * the observer's `onOpen()` callback is invoked. diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java index b0a297e8d47..a6579e8a78e 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java @@ -89,6 +89,17 @@ public static void overrideChannelBuilder( }); } + /** + * Marks the underlying gRPC channel as idle. This allows on-going RPCs to continue, but the next + * RPC on the channel will trigger the creation of a new connection. + */ + void markChannelIdle() { + ManagedChannel channel = this.channelTask.getResult(); + if (channel != null) { + this.channelTask.getResult().enterIdle(); + } + } + /** Sets up the SSL provider and configures the gRPC channel. */ private ManagedChannel initChannel(Context context, DatabaseInfo databaseInfo) { try { diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java index e365749945d..d26c85437ee 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/OnlineStateTracker.java @@ -32,7 +32,9 @@ *

In particular, when the client is trying to connect to the backend, we allow up to * MAX_WATCH_STREAM_FAILURES within ONLINE_STATE_TIMEOUT_MS for a connection to succeed. If we have * too many failures or the timeout elapses, then we set the OnlineState to OFFLINE, and the client - * will behave as if it is offline (get() calls will return cached data, etc.). + * will behave as if it is offline (get() calls will return cached data, etc.). The client will + * continue to attempt reconnecting, restarting the underlying connection every + * CONNECTIVITY_ATTEMPT_TIMEOUT_MS. */ class OnlineStateTracker { @@ -56,6 +58,11 @@ interface OnlineStateCallback { // to OFFLINE rather than waiting indefinitely. private static final int ONLINE_STATE_TIMEOUT_MS = 10 * 1000; + // This timeout is also used when attempting to establish a connection. If a connection attempt + // does not succeed in time, we close the stream and restart the connection, rather than having + // it hang indefinitely. + static final int CONNECTIVITY_ATTEMPT_TIMEOUT_MS = 15 * 1000; + /** The log tag to use for this class. */ private static final String LOG_TAG = "OnlineStateTracker"; @@ -66,6 +73,10 @@ interface OnlineStateCallback { // MAX_WATCH_STREAM_FAILURES, we'll revert to OnlineState.OFFLINE. private int watchStreamFailures; + // A timer that elapses after CONNECTIVITY_ATTEMPT_TIMEOUT_MS, at which point we close the + // stream, reset the underlying connection, and try connecting again. + private DelayedTask connectivityAttemptTimer; + // A timer that elapses after ONLINE_STATE_TIMEOUT_MS, at which point we transition from // OnlineState.UNKNOWN to OFFLINE without waiting for the stream to actually fail // (MAX_WATCH_STREAM_FAILURES times). 
@@ -97,7 +108,6 @@ interface OnlineStateCallback { void handleWatchStreamStart() { if (watchStreamFailures == 0) { setAndBroadcastState(OnlineState.UNKNOWN); - hardAssert(onlineStateTimer == null, "onlineStateTimer shouldn't be started yet"); onlineStateTimer = workerQueue.enqueueAfterDelay( @@ -119,6 +129,8 @@ void handleWatchStreamStart() { // even though we are already marked OFFLINE but this is non-harmful. }); } + hardAssert( + connectivityAttemptTimer == null, "connectivityAttemptTimer shouldn't be started yet"); } /** @@ -135,16 +147,17 @@ void handleWatchStreamFailure(Status status) { // To get to OnlineState.ONLINE, updateState() must have been called which would have reset // our heuristics. hardAssert(this.watchStreamFailures == 0, "watchStreamFailures must be 0"); + hardAssert(this.connectivityAttemptTimer == null, "connectivityAttemptTimer must be null"); hardAssert(this.onlineStateTimer == null, "onlineStateTimer must be null"); } else { watchStreamFailures++; + clearTimers(); if (watchStreamFailures >= MAX_WATCH_STREAM_FAILURES) { - clearOnlineStateTimer(); logClientOfflineWarningIfNecessary( String.format( Locale.ENGLISH, - "Connection failed %d times. Most recent error: %s", - MAX_WATCH_STREAM_FAILURES, + "Backend didn't respond within %d seconds. Most recent error: %s\n", + CONNECTIVITY_ATTEMPT_TIMEOUT_MS / 1000, status)); setAndBroadcastState(OnlineState.OFFLINE); } @@ -158,7 +171,7 @@ void handleWatchStreamFailure(Status status) { * it must not be used in place of handleWatchStreamStart() and handleWatchStreamFailure(). */ void updateState(OnlineState newState) { - clearOnlineStateTimer(); + clearTimers(); watchStreamFailures = 0; if (newState == OnlineState.ONLINE) { @@ -194,10 +207,20 @@ private void logClientOfflineWarningIfNecessary(String reason) { } } - private void clearOnlineStateTimer() { + /** Clears the OnlineStateTimer and the passed in ConnectivityAttemptTimer. 
*/ + private void clearTimers() { + if (connectivityAttemptTimer != null) { + connectivityAttemptTimer.cancel(); + connectivityAttemptTimer = null; + } if (onlineStateTimer != null) { onlineStateTimer.cancel(); onlineStateTimer = null; } } + + /** Sets the connectivity attempt timer to track. */ + void setConnectivityAttemptTimer(DelayedTask connectivityAttemptTimer) { + this.connectivityAttemptTimer = connectivityAttemptTimer; + } } diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java index f6172a41fca..2beb1ef3746 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java @@ -14,6 +14,7 @@ package com.google.firebase.firestore.remote; +import static com.google.firebase.firestore.remote.OnlineStateTracker.CONNECTIVITY_ATTEMPT_TIMEOUT_MS; import static com.google.firebase.firestore.util.Assert.hardAssert; import androidx.annotation.Nullable; @@ -35,6 +36,8 @@ import com.google.firebase.firestore.remote.WatchChange.WatchTargetChange; import com.google.firebase.firestore.remote.WatchChange.WatchTargetChangeType; import com.google.firebase.firestore.util.AsyncQueue; +import com.google.firebase.firestore.util.AsyncQueue.DelayedTask; +import com.google.firebase.firestore.util.AsyncQueue.TimerId; import com.google.firebase.firestore.util.Logger; import com.google.firebase.firestore.util.Util; import com.google.protobuf.ByteString; @@ -129,6 +132,8 @@ public interface RemoteStoreCallback { private final WriteStream writeStream; @Nullable private WatchChangeAggregator watchChangeAggregator; + private final AsyncQueue workerQueue; + /** * A list of up to MAX_PENDING_WRITES writes that we have fetched from the LocalStore via * fillWritePipeline() and have or will send to the write stream. 
@@ -155,12 +160,12 @@ public RemoteStore( this.localStore = localStore; this.datastore = datastore; this.connectivityMonitor = connectivityMonitor; + this.workerQueue = workerQueue; listenTargets = new HashMap<>(); writePipeline = new ArrayDeque<>(); - onlineStateTracker = - new OnlineStateTracker(workerQueue, remoteStoreCallback::handleOnlineStateChange); + new OnlineStateTracker(this.workerQueue, remoteStoreCallback::handleOnlineStateChange); // Create new streams (but note they're not started yet). watchStream = @@ -223,6 +228,11 @@ public void onClose(Status status) { }); } + /** Marks that we should start checking for online state. */ + public void attemptReconnect() { + watchStream.handleConnectionAttemptTimeout(); + } + /** Re-enables the network. Only to be called as the counterpart to disableNetwork(). */ public void enableNetwork() { networkEnabled = true; @@ -412,8 +422,20 @@ private void startWatchStream() { "startWatchStream() called when shouldStartWatchStream() is false."); watchChangeAggregator = new WatchChangeAggregator(this); watchStream.start(); - onlineStateTracker.handleWatchStreamStart(); + + DelayedTask connectivityAttemptTimer = + workerQueue.enqueueAfterDelay( + TimerId.CONNECTIVITY_ATTEMPT_TIMER, + CONNECTIVITY_ATTEMPT_TIMEOUT_MS, + () -> { + // If the network has been explicitly disabled, make sure we don't accidentally + // re-enable it. 
+ if (canUseNetwork()) { + attemptReconnect(); + } + }); + onlineStateTracker.setConnectivityAttemptTimer(connectivityAttemptTimer); } private void handleWatchStreamOpen() { diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java index 9802eb53e4c..4ccdd8c301a 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java @@ -74,7 +74,13 @@ public enum TimerId { * A timer used to retry transactions. Since there can be multiple concurrent transactions, * multiple of these may be in the queue at a given time. */ - RETRY_TRANSACTION + RETRY_TRANSACTION, + /** + * A timer used in RemoteStore to monitor when a connection attempt is unsuccessful and retry + * accordingly. While `ONLINE_STATE_TIMEOUT` is used to transition from UNKNOWN to OFFLINE, + * `CONNECTIVITY_ATTEMPT_TIMER` is used for tracking and retrying connectivity attempts. 
+ */ + CONNECTIVITY_ATTEMPT_TIMER } /** diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java index b67c7339397..861136cb39f 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java @@ -36,8 +36,8 @@ public class ExponentialBackoff { private final TimerId timerId; private final long initialDelayMs; private final double backoffFactor; - private final long maxDelayMs; + private long maxDelayMs; private long currentBaseMs; private long lastAttemptTime; private DelayedTask timerTask; @@ -103,6 +103,16 @@ public void resetToMax() { currentBaseMs = maxDelayMs; } + /** + * Set the backoff's maximum delay for only the next call to backoffAndRun, after which the delay + * will be reset to maxDelayMs. + * + * @param newMax The temporary maximum delay to set. + */ + public void setTemporaryMaxDelay(long newMax) { + maxDelayMs = newMax; + } + /** * Waits for currentDelayMs, increases the delay and runs the specified task. If there was a * pending backoff task waiting to run already, it will be canceled. @@ -151,6 +161,9 @@ public void backoffAndRun(Runnable task) { } else if (currentBaseMs > maxDelayMs) { currentBaseMs = maxDelayMs; } + + // Reset max delay to the default. 
+ maxDelayMs = DEFAULT_BACKOFF_MAX_DELAY_MS; } public void cancel() { diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/remote/MockDatastore.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/remote/MockDatastore.java index 02b5f0e4c0f..7b5510f17a8 100644 --- a/firebase-firestore/src/test/java/com/google/firebase/firestore/remote/MockDatastore.java +++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/remote/MockDatastore.java @@ -67,6 +67,12 @@ public void stop() { open = false; } + @Override + protected void close(State finalState, Status status, boolean forceNewConnection) { + open = false; + listener.onClose(status); + } + @Override public boolean isStarted() { return open;