diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java index 57179507e0a..e41ebda72c8 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java @@ -174,6 +174,12 @@ public void run() { /** The time a stream stays open after it is marked idle. */ private static final long IDLE_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1); + /** + * Maximum backoff time for reconnecting when we know the connection is failed on the client-side. + */ + private static final long BACKOFF_CLIENT_NETWORK_FAILURE_MAX_DELAY_MS = + TimeUnit.SECONDS.toMillis(10); + @Nullable private DelayedTask idleTimer; private final FirestoreChannel firestoreChannel; @@ -290,18 +296,23 @@ private void close(State finalState, Status status) { if (code == Code.OK) { // If this is an intentional close ensure we don't delay our next connection attempt. backoff.reset(); - } else if (code == Code.RESOURCE_EXHAUSTED) { Logger.debug( getClass().getSimpleName(), "(%x) Using maximum backoff delay to prevent overloading the backend.", System.identityHashCode(this)); backoff.resetToMax(); - } else if (code == Code.UNAUTHENTICATED) { // "unauthenticated" error means the token was rejected. Try force refreshing it in case it // just expired. firestoreChannel.invalidateToken(); + } else if (code == Code.UNAVAILABLE) { + // This exception is thrown when the gRPC connection fails on the client side, To shorten + // reconnect time, we can use a shorter max delay when reconnecting. + if (status.getCause() instanceof java.net.UnknownHostException + || status.getCause() instanceof java.net.ConnectException) { + backoff.setTemporaryMaxDelay(BACKOFF_CLIENT_NETWORK_FAILURE_MAX_DELAY_MS); + } } if (finalState != State.Error) { diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java index b0a297e8d47..9284c3970e3 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java @@ -23,6 +23,8 @@ import com.google.android.gms.tasks.Tasks; import com.google.firebase.firestore.core.DatabaseInfo; import com.google.firebase.firestore.util.AsyncQueue; +import com.google.firebase.firestore.util.AsyncQueue.DelayedTask; +import com.google.firebase.firestore.util.AsyncQueue.TimerId; import com.google.firebase.firestore.util.Executors; import com.google.firebase.firestore.util.Logger; import com.google.firebase.firestore.util.Supplier; @@ -30,6 +32,7 @@ import io.grpc.CallCredentials; import io.grpc.CallOptions; import io.grpc.ClientCall; +import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.MethodDescriptor; @@ -45,11 +48,22 @@ public class GrpcCallProvider { private static Supplier> overrideChannelBuilderSupplier; - private final Task channelTask; + private Task channelTask; private final AsyncQueue asyncQueue; private CallOptions callOptions; + // This timeout is used when attempting to establish a connection in gRPC. If a connection attempt + // does not succeed in CONNECTIVITY_ATTEMPT_TIMEOUT_MS, we restart the channel and try + // reconnecting again, rather than waiting up to 2+ minutes for gRPC to timeout. + // More details about usage can be found in GrpcCallProvider.onConnectivityStateChanged(). + private static final int CONNECTIVITY_ATTEMPT_TIMEOUT_MS = 15 * 1000; + private DelayedTask connectivityAttemptTimer; + + private final Context context; + private final DatabaseInfo databaseInfo; + private final CallCredentials firestoreHeaders; + /** * Helper function to globally override the channel that RPCs use. Useful for testing when you * want to bypass SSL certificate checking. @@ -69,24 +83,11 @@ public static void overrideChannelBuilder( DatabaseInfo databaseInfo, CallCredentials firestoreHeaders) { this.asyncQueue = asyncQueue; + this.context = context; + this.databaseInfo = databaseInfo; + this.firestoreHeaders = firestoreHeaders; - // We execute network initialization on a separate thread to not block operations that depend on - // the AsyncQueue. - this.channelTask = - Tasks.call( - Executors.BACKGROUND_EXECUTOR, - () -> { - ManagedChannel channel = initChannel(context, databaseInfo); - FirestoreGrpc.FirestoreStub firestoreStub = - FirestoreGrpc.newStub(channel) - .withCallCredentials(firestoreHeaders) - // Ensure all callbacks are issued on the worker queue. If this call is - // removed, all calls need to be audited to make sure they are executed on the - // right thread. - .withExecutor(asyncQueue.getExecutor()); - callOptions = firestoreStub.getCallOptions(); - return channel; - }); + initChannelTask(); } /** Sets up the SSL provider and configures the gRPC channel. */ @@ -198,4 +199,80 @@ void shutdown() { Thread.currentThread().interrupt(); } } + + /** + * Monitors the connectivity state of the gRPC channel and resets the channel when gRPC fails to + * connect. + * + *

We currently cannot configure timeouts in connection attempts for gRPC + * (https://github.com/grpc/grpc-java/issues/1943), and until they support doing so, the gRPC + * connection can stay open for up to 2+ minutes before notifying us that it has shut down. + * + *

We start a timer when the channel enters ConnectivityState.CONNECTING. If the timer elapses, + * we reset the channel by shutting it down and reinitializing the channelTask. Changes to the + * connectivity state will clear the timer and start a new one-time listener for the next + * ConnectivityState change. + * + * @param channel The channel to monitor the connectivity state of. + */ + private void onConnectivityStateChange(ManagedChannel channel) { + ConnectivityState newState = channel.getState(true); + Logger.debug(LOG_TAG, "Current gRPC connectivity state: " + newState); + // Clear the timer, so we don't end up with multiple connectivityAttemptTimers. + clearConnectivityAttemptTimer(); + + if (newState == ConnectivityState.CONNECTING) { + Logger.debug(LOG_TAG, "Setting the connectivityAttemptTimer"); + connectivityAttemptTimer = + asyncQueue.enqueueAfterDelay( + TimerId.CONNECTIVITY_ATTEMPT_TIMER, + CONNECTIVITY_ATTEMPT_TIMEOUT_MS, + () -> { + Logger.debug(LOG_TAG, "connectivityAttemptTimer elapsed. Resetting the channel."); + clearConnectivityAttemptTimer(); + resetChannel(channel); + }); + } + // Re-listen for next state change. + channel.notifyWhenStateChanged( + newState, () -> asyncQueue.enqueueAndForget(() -> onConnectivityStateChange(channel))); + } + + private void resetChannel(ManagedChannel channel) { + asyncQueue.enqueueAndForget( + () -> { + channel.shutdownNow(); + initChannelTask(); + }); + } + + private void initChannelTask() { + // We execute network initialization on a separate thread to not block operations that depend on + // the AsyncQueue. + this.channelTask = + Tasks.call( + Executors.BACKGROUND_EXECUTOR, + () -> { + ManagedChannel channel = initChannel(context, databaseInfo); + onConnectivityStateChange(channel); + FirestoreGrpc.FirestoreStub firestoreStub = + FirestoreGrpc.newStub(channel) + .withCallCredentials(firestoreHeaders) + // Ensure all callbacks are issued on the worker queue. If this call is + // removed, all calls need to be audited to make sure they are executed on the + // right thread. + .withExecutor(asyncQueue.getExecutor()); + callOptions = firestoreStub.getCallOptions(); + Logger.debug(LOG_TAG, "Channel successfully reset."); + return channel; + }); + } + + private void clearConnectivityAttemptTimer() { + if (connectivityAttemptTimer != null) { + Logger.debug(LOG_TAG, "Clearing the connectivityAttemptTimer"); + connectivityAttemptTimer.cancel(); + connectivityAttemptTimer = null; + } + } } diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java index 9802eb53e4c..825968c21d2 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java @@ -74,7 +74,12 @@ public enum TimerId { * A timer used to retry transactions. Since there can be multiple concurrent transactions, * multiple of these may be in the queue at a given time. */ - RETRY_TRANSACTION + RETRY_TRANSACTION, + /** + * A timer used to monitor when a connection attempt in gRPC is unsuccessful and retry + * accordingly. + */ + CONNECTIVITY_ATTEMPT_TIMER } /** diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java index b67c7339397..b98ec152006 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java @@ -29,15 +29,22 @@ public class ExponentialBackoff { public static final double DEFAULT_BACKOFF_FACTOR = 1.5; - /** Maximum backoff time in milliseconds */ public static final long DEFAULT_BACKOFF_MAX_DELAY_MS = 60 * 1000; private final AsyncQueue queue; private final TimerId timerId; private final long initialDelayMs; private final double backoffFactor; + + /** The maximum backoff time in milliseconds. */ private final long maxDelayMs; + /** + * The maximum backoff time used when calculating the next backoff. This value can be changed for + * a single backoffAndRun call, after which it resets to maxDelayMs. + */ + private long nextMaxDelayMs; + private long currentBaseMs; private long lastAttemptTime; private DelayedTask timerTask; @@ -71,6 +78,7 @@ public ExponentialBackoff( this.initialDelayMs = initialDelayMs; this.backoffFactor = backoffFactor; this.maxDelayMs = maxDelayMs; + this.nextMaxDelayMs = maxDelayMs; this.lastAttemptTime = new Date().getTime(); reset(); @@ -100,7 +108,17 @@ public void reset() { * Resets the backoff delay to the maximum delay (e.g. for use after a RESOURCE_EXHAUSTED error). */ public void resetToMax() { - currentBaseMs = maxDelayMs; + currentBaseMs = nextMaxDelayMs; + } + + /** + * Set the backoff's maximum delay for only the next call to backoffAndRun, after which the delay + * will be reset to maxDelayMs. + * + * @param newMax The temporary maximum delay to set. + */ + public void setTemporaryMaxDelay(long newMax) { + nextMaxDelayMs = newMax; } /** @@ -148,9 +166,12 @@ public void backoffAndRun(Runnable task) { currentBaseMs = (long) (currentBaseMs * backoffFactor); if (currentBaseMs < initialDelayMs) { currentBaseMs = initialDelayMs; - } else if (currentBaseMs > maxDelayMs) { - currentBaseMs = maxDelayMs; + } else if (currentBaseMs > nextMaxDelayMs) { + currentBaseMs = nextMaxDelayMs; } + + // Reset max delay to the default. + nextMaxDelayMs = maxDelayMs; } public void cancel() {