Skip to content

Fix Android connectivity issues #937

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ public void run() {
/** The time a stream stays open after it is marked idle. */
private static final long IDLE_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);

/** Maximum backoff time when reconnecting. */
private static final long RECONNECT_BACKOFF_MAX_DELAY_MS = TimeUnit.SECONDS.toMillis(10);

@Nullable private DelayedTask idleTimer;

private final FirestoreChannel firestoreChannel;
Expand Down Expand Up @@ -263,8 +266,9 @@ public void start() {
*
* @param finalState the intended state of the stream after closing.
* @param status the status to emit to the listener.
* @param forceNewConnection whether to use a new connection the next time we open the stream
*/
private void close(State finalState, Status status) {
protected void close(State finalState, Status status, boolean forceNewConnection) {
hardAssert(isStarted(), "Only started streams should be closed.");
hardAssert(
finalState == State.Error || status.equals(Status.OK),
Expand All @@ -290,18 +294,26 @@ private void close(State finalState, Status status) {
if (code == Code.OK) {
// If this is an intentional close ensure we don't delay our next connection attempt.
backoff.reset();

} else if (code == Code.RESOURCE_EXHAUSTED) {
Logger.debug(
getClass().getSimpleName(),
"(%x) Using maximum backoff delay to prevent overloading the backend.",
System.identityHashCode(this));
backoff.resetToMax();

} else if (code == Code.UNAUTHENTICATED) {
// "unauthenticated" error means the token was rejected. Try force refreshing it in case it
// just expired.
firestoreChannel.invalidateToken();
} else if (code == Code.UNAVAILABLE) {
// These exceptions are thrown when the gRPC stream is closed with an connection error. For
// these cases, we need to use a new connection for the next connection attempt, which is
// done by marking the underlying channel as idle.
if (forceNewConnection
|| status.getCause() instanceof java.net.ConnectException
|| status.getCause() instanceof java.net.UnknownHostException) {
backoff.setTemporaryMaxDelay(RECONNECT_BACKOFF_MAX_DELAY_MS);
firestoreChannel.markChannelIdle();
}
}

if (finalState != State.Error) {
Expand Down Expand Up @@ -333,6 +345,10 @@ private void close(State finalState, Status status) {
listener.onClose(status);
}

protected void close(State finalState, Status status) {
close(finalState, status, false);
}

/**
* Can be overridden to perform additional cleanup before the stream is closed. Calling
* super.tearDown() is not required.
Expand Down Expand Up @@ -375,6 +391,12 @@ private void handleIdleCloseTimer() {
}
}

/** Called when the connectivity attempt timer runs out. */
void handleConnectionAttemptTimeout() {
// We want to force a new connection on the next connection attempt whenever we fail to connect.
close(State.Error, Status.UNAVAILABLE, true);
}

/** Called when GRPC closes the stream, which should always be due to some error. */
@VisibleForTesting
void handleServerClose(Status status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ public void shutdown() {
callProvider.shutdown();
}

/**
* Marks the underlying gRPC channel as idle. This allows on-going RPCs to continue, but the next
* RPC on the channel will trigger the creation of a new connection. This method is primarily used
* to reset the underlying connection.
*/
public void markChannelIdle() {
this.callProvider.markChannelIdle();
}

/**
* Creates and starts a new bi-directional streaming RPC. The stream cannot accept message before
* the observer's `onOpen()` callback is invoked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ public static void overrideChannelBuilder(
});
}

/**
* Marks the underlying gRPC channel as idle. This allows on-going RPCs to continue, but the next
* RPC on the channel will trigger the creation of a new connection.
*/
void markChannelIdle() {
ManagedChannel channel = this.channelTask.getResult();
if (channel != null) {
this.channelTask.getResult().enterIdle();
}
}

/** Sets up the SSL provider and configures the gRPC channel. */
private ManagedChannel initChannel(Context context, DatabaseInfo databaseInfo) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
* <p>In particular, when the client is trying to connect to the backend, we allow up to
* MAX_WATCH_STREAM_FAILURES within ONLINE_STATE_TIMEOUT_MS for a connection to succeed. If we have
* too many failures or the timeout elapses, then we set the OnlineState to OFFLINE, and the client
* will behave as if it is offline (get() calls will return cached data, etc.).
* will behave as if it is offline (get() calls will return cached data, etc.). The client will
* continue to attempt reconnecting, restarting the underlying connection every
* CONNECTIVITY_ATTEMPT_TIMEOUT_MS.
*/
class OnlineStateTracker {

Expand All @@ -56,6 +58,11 @@ interface OnlineStateCallback {
// to OFFLINE rather than waiting indefinitely.
private static final int ONLINE_STATE_TIMEOUT_MS = 10 * 1000;

// This timeout is also used when attempting to establish a connection. If a connection attempt
// does not succeed in time, we close the stream and restart the connection, rather than having
// it hang indefinitely.
static final int CONNECTIVITY_ATTEMPT_TIMEOUT_MS = 15 * 1000;

/** The log tag to use for this class. */
private static final String LOG_TAG = "OnlineStateTracker";

Expand All @@ -66,6 +73,10 @@ interface OnlineStateCallback {
// MAX_WATCH_STREAM_FAILURES, we'll revert to OnlineState.OFFLINE.
private int watchStreamFailures;

// A timer that elapses after CONNECTIVTY_ATTEMPT_TIMEOUT_MS, at which point we close the
// stream, reset the underlying connection, and try connecting again.
private DelayedTask connectivityAttemptTimer;

// A timer that elapses after ONLINE_STATE_TIMEOUT_MS, at which point we transition from
// OnlineState.UNKNOWN to OFFLINE without waiting for the stream to actually fail
// (MAX_WATCH_STREAM_FAILURES times).
Expand Down Expand Up @@ -97,7 +108,6 @@ interface OnlineStateCallback {
void handleWatchStreamStart() {
if (watchStreamFailures == 0) {
setAndBroadcastState(OnlineState.UNKNOWN);

hardAssert(onlineStateTimer == null, "onlineStateTimer shouldn't be started yet");
onlineStateTimer =
workerQueue.enqueueAfterDelay(
Expand All @@ -119,6 +129,8 @@ void handleWatchStreamStart() {
// even though we are already marked OFFLINE but this is non-harmful.
});
}
hardAssert(
connectivityAttemptTimer == null, "connectivityAttemptTimer shouldn't be started yet");
}

/**
Expand All @@ -135,16 +147,17 @@ void handleWatchStreamFailure(Status status) {
// To get to OnlineState.ONLINE, updateState() must have been called which would have reset
// our heuristics.
hardAssert(this.watchStreamFailures == 0, "watchStreamFailures must be 0");
hardAssert(this.connectivityAttemptTimer == null, "connectivityAttemptTimer must be null");
hardAssert(this.onlineStateTimer == null, "onlineStateTimer must be null");
} else {
watchStreamFailures++;
clearTimers();
if (watchStreamFailures >= MAX_WATCH_STREAM_FAILURES) {
clearOnlineStateTimer();
logClientOfflineWarningIfNecessary(
String.format(
Locale.ENGLISH,
"Connection failed %d times. Most recent error: %s",
MAX_WATCH_STREAM_FAILURES,
"Backend didn't respond within %d seconds. Most recent error: %s\n",
CONNECTIVITY_ATTEMPT_TIMEOUT_MS / 1000,
status));
setAndBroadcastState(OnlineState.OFFLINE);
}
Expand All @@ -158,7 +171,7 @@ void handleWatchStreamFailure(Status status) {
* it must not be used in place of handleWatchStreamStart() and handleWatchStreamFailure().
*/
void updateState(OnlineState newState) {
clearOnlineStateTimer();
clearTimers();
watchStreamFailures = 0;

if (newState == OnlineState.ONLINE) {
Expand Down Expand Up @@ -194,10 +207,20 @@ private void logClientOfflineWarningIfNecessary(String reason) {
}
}

private void clearOnlineStateTimer() {
/** Clears the OnlineStateTimer and the passed in ConnectivityAttemptTimer. */
private void clearTimers() {
if (connectivityAttemptTimer != null) {
connectivityAttemptTimer.cancel();
connectivityAttemptTimer = null;
}
if (onlineStateTimer != null) {
onlineStateTimer.cancel();
onlineStateTimer = null;
}
}

/** Sets the connectivity attempt timer to track. */
void setConnectivityAttemptTimer(DelayedTask connectivityAttemptTimer) {
this.connectivityAttemptTimer = connectivityAttemptTimer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.google.firebase.firestore.remote;

import static com.google.firebase.firestore.remote.OnlineStateTracker.CONNECTIVITY_ATTEMPT_TIMEOUT_MS;
import static com.google.firebase.firestore.util.Assert.hardAssert;

import androidx.annotation.Nullable;
Expand All @@ -35,6 +36,8 @@
import com.google.firebase.firestore.remote.WatchChange.WatchTargetChange;
import com.google.firebase.firestore.remote.WatchChange.WatchTargetChangeType;
import com.google.firebase.firestore.util.AsyncQueue;
import com.google.firebase.firestore.util.AsyncQueue.DelayedTask;
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
import com.google.firebase.firestore.util.Logger;
import com.google.firebase.firestore.util.Util;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -129,6 +132,8 @@ public interface RemoteStoreCallback {
private final WriteStream writeStream;
@Nullable private WatchChangeAggregator watchChangeAggregator;

private final AsyncQueue workerQueue;

/**
* A list of up to MAX_PENDING_WRITES writes that we have fetched from the LocalStore via
* fillWritePipeline() and have or will send to the write stream.
Expand All @@ -155,12 +160,12 @@ public RemoteStore(
this.localStore = localStore;
this.datastore = datastore;
this.connectivityMonitor = connectivityMonitor;
this.workerQueue = workerQueue;

listenTargets = new HashMap<>();
writePipeline = new ArrayDeque<>();

onlineStateTracker =
new OnlineStateTracker(workerQueue, remoteStoreCallback::handleOnlineStateChange);
new OnlineStateTracker(this.workerQueue, remoteStoreCallback::handleOnlineStateChange);

// Create new streams (but note they're not started yet).
watchStream =
Expand Down Expand Up @@ -223,6 +228,11 @@ public void onClose(Status status) {
});
}

/** Marks that we should start checking for online state. */
public void attemptReconnect() {
watchStream.handleConnectionAttemptTimeout();
}

/** Re-enables the network. Only to be called as the counterpart to disableNetwork(). */
public void enableNetwork() {
networkEnabled = true;
Expand Down Expand Up @@ -412,8 +422,20 @@ private void startWatchStream() {
"startWatchStream() called when shouldStartWatchStream() is false.");
watchChangeAggregator = new WatchChangeAggregator(this);
watchStream.start();

onlineStateTracker.handleWatchStreamStart();

DelayedTask connectivityAttemptTimer =
workerQueue.enqueueAfterDelay(
TimerId.CONNECTIVITY_ATTEMPT_TIMER,
CONNECTIVITY_ATTEMPT_TIMEOUT_MS,
() -> {
// If the network has been explicitly disabled, make sure we don't accidentally
// re-enable it.
if (canUseNetwork()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, in the case of an online state timeout we performed the following things:

                logClientOfflineWarningIfNecessary(	
                    String.format(	
                        Locale.ENGLISH,	
                        "Backend didn't respond within %d seconds\n",	
                        ONLINE_STATE_TIMEOUT_MS / 1000));	
                setAndBroadcastState(OnlineState.OFFLINE);	

This isn't calling handleWatchStreamFailure so how is the OnlineState supposed to get to OFFLINE after the first timeout?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleWatchStreamFailure is still being called:

  1. attemptReconnect() results in calling AbstractStream.close()
  2. close() calls the onClose() listener passed in, which calls RemoteStore.handleWatchStreamClose()
  3. handleWatchStreamClose() calls handleWatchStreamFailure().

The problem I was running into by leaving a handleWatchStreamFailure() in the timeout logic was that handleWatchStreamFailure() would be called twice, once in the timer timeout, and once in handleWatchStreamClose(). Calling attemptReconnect() would call handleWatchStreamFailure() followed by a call to startWatchStream(). The subsequent call to handleWatchStreamFailure() in the timeout logic would then clear the connectivity timer set by attemptReconnect().

attemptReconnect();
}
});
onlineStateTracker.setConnectivityAttemptTimer(connectivityAttemptTimer);
}

private void handleWatchStreamOpen() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,13 @@ public enum TimerId {
* A timer used to retry transactions. Since there can be multiple concurrent transactions,
* multiple of these may be in the queue at a given time.
*/
RETRY_TRANSACTION
RETRY_TRANSACTION,
/**
* A timer used in RemoteStore to monitor when a connection attempt is unsuccessful and retry
* accordingly. While `ONLINE_STATE_TIMEOUT` is used to transition from UNKNOWN to OFFLINE,
* `CONNECTIVITY_ATTEMPT_TIMER` is used for tracking and retrying connectivity attempts.
*/
CONNECTIVITY_ATTEMPT_TIMER
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public class ExponentialBackoff {
private final TimerId timerId;
private final long initialDelayMs;
private final double backoffFactor;
private final long maxDelayMs;

private long maxDelayMs;
private long currentBaseMs;
private long lastAttemptTime;
private DelayedTask timerTask;
Expand Down Expand Up @@ -103,6 +103,16 @@ public void resetToMax() {
currentBaseMs = maxDelayMs;
}

/**
* Set the backoff's maximum delay for only the next call to backoffAndRun, after which the delay
* will be reset to maxDelayMs.
*
* @param newMax The temporary maximum delay to set.
*/
public void setTemporaryMaxDelay(long newMax) {
maxDelayMs = newMax;
}

/**
* Waits for currentDelayMs, increases the delay and runs the specified task. If there was a
* pending backoff task waiting to run already, it will be canceled.
Expand Down Expand Up @@ -151,6 +161,9 @@ public void backoffAndRun(Runnable task) {
} else if (currentBaseMs > maxDelayMs) {
currentBaseMs = maxDelayMs;
}

// Reset max delay to the default.
maxDelayMs = DEFAULT_BACKOFF_MAX_DELAY_MS;
}

public void cancel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ public void stop() {
open = false;
}

@Override
protected void close(State finalState, Status status, boolean forceNewConnection) {
open = false;
listener.onClose(status);
}

@Override
public boolean isStarted() {
return open;
Expand Down