Skip to content

Fix Android connectivity issues #937

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ public void run() {
/** The time a stream stays open after it is marked idle. */
private static final long IDLE_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);

/** Maximum backoff time when reconnecting. */
private static final long RECONNECT_BACKOFF_MAX_DELAY_MS = TimeUnit.SECONDS.toMillis(5);

@Nullable private DelayedTask idleTimer;

private final FirestoreChannel firestoreChannel;
Expand Down Expand Up @@ -263,8 +266,9 @@ public void start() {
*
* @param finalState the intended state of the stream after closing.
* @param status the status to emit to the listener.
* @param forceNewConnection whether to use a new connection the next time we open the stream
*/
private void close(State finalState, Status status) {
protected void close(State finalState, Status status, boolean forceNewConnection) {
hardAssert(isStarted(), "Only started streams should be closed.");
hardAssert(
finalState == State.Error || status.equals(Status.OK),
Expand All @@ -290,18 +294,26 @@ private void close(State finalState, Status status) {
if (code == Code.OK) {
// If this is an intentional close ensure we don't delay our next connection attempt.
backoff.reset();

} else if (code == Code.RESOURCE_EXHAUSTED) {
Logger.debug(
getClass().getSimpleName(),
"(%x) Using maximum backoff delay to prevent overloading the backend.",
System.identityHashCode(this));
backoff.resetToMax();

} else if (code == Code.UNAUTHENTICATED) {
// "unauthenticated" error means the token was rejected. Try force refreshing it in case it
// just expired.
firestoreChannel.invalidateToken();
} else if (code == Code.UNAVAILABLE) {
// These exceptions are thrown when the gRPC stream is closed with a connection error. For
// these cases, we need to use a new connection for the next connection attempt, which is
// done by marking the underlying channel as idle.
if (forceNewConnection
|| status.getCause() instanceof java.net.ConnectException
|| status.getCause() instanceof java.net.UnknownHostException) {
backoff.setTemporaryMaxDelay(RECONNECT_BACKOFF_MAX_DELAY_MS);
firestoreChannel.markChannelIdle();
}
}

if (finalState != State.Error) {
Expand Down Expand Up @@ -333,6 +345,10 @@ private void close(State finalState, Status status) {
listener.onClose(status);
}

/**
 * Closes the stream without forcing a new connection for the next attempt. Delegates to the
 * three-argument overload with {@code forceNewConnection} set to {@code false}.
 *
 * @param finalState the intended state of the stream after closing.
 * @param status the status to emit to the listener.
 */
protected void close(State finalState, Status status) {
  final boolean forceNewConnection = false;
  close(finalState, status, forceNewConnection);
}

/**
* Can be overridden to perform additional cleanup before the stream is closed. Calling
* super.tearDown() is not required.
Expand Down Expand Up @@ -375,6 +391,12 @@ private void handleIdleCloseTimer() {
}
}

/**
 * Called when the connectivity attempt timer runs out. Closes the stream with
 * {@code Status.UNAVAILABLE} and requests a new connection for the next attempt.
 *
 * <p>Does nothing if the stream is not started: {@code close()} hard-asserts
 * {@code isStarted()}, so an unguarded call made after the stream has already been closed would
 * trip that assertion.
 */
void handleConnectionAttemptTimeout() {
  if (!isStarted()) {
    return;
  }
  // We want to force a new connection on the next connection attempt whenever we fail to connect.
  close(State.Error, Status.UNAVAILABLE, true);
}

/** Called when GRPC closes the stream, which should always be due to some error. */
@VisibleForTesting
void handleServerClose(Status status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class FirestoreChannel {
private final CredentialsProvider credentialsProvider;

/** Manages the gRPC channel and provides all gRPC ClientCalls. */
private final GrpcCallProvider callProvider;
private GrpcCallProvider callProvider;

/** The value to use as resource prefix header. */
private final String resourcePrefixValue;
Expand Down Expand Up @@ -92,6 +92,10 @@ public void shutdown() {
callProvider.shutdown();
}

/** Marks the underlying gRPC channel as idle by delegating to the call provider. */
public void markChannelIdle() {
  callProvider.markChannelIdle();
}

/**
* Creates and starts a new bi-directional streaming RPC. The stream cannot accept messages before
* the observer's `onOpen()` callback is invoked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ public static void overrideChannelBuilder(
});
}

/**
 * Marks the underlying gRPC channel as idle. This allows on-going RPCs to continue, but the next
 * RPC on the channel will trigger the creation of a new connection.
 *
 * <p>Does nothing if the channel has not been successfully created yet, since there is no
 * connection to discard in that case.
 */
void markChannelIdle() {
  // Task.getResult() throws if the task is still running or has failed, so check for success
  // first instead of relying on a null result.
  // NOTE(review): assumes channelTask is a Play services Task<ManagedChannel> — confirm.
  if (!channelTask.isSuccessful()) {
    return;
  }
  ManagedChannel channel = channelTask.getResult();
  if (channel != null) {
    channel.enterIdle();
  }
}

/** Sets up the SSL provider and configures the gRPC channel. */
private ManagedChannel initChannel(Context context, DatabaseInfo databaseInfo) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
import static com.google.firebase.firestore.util.Assert.hardAssert;

import com.google.firebase.firestore.core.OnlineState;
import com.google.firebase.firestore.util.AsyncQueue;
import com.google.firebase.firestore.util.AsyncQueue.DelayedTask;
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
import com.google.firebase.firestore.util.Logger;
import io.grpc.Status;
import java.util.Locale;
Expand All @@ -30,9 +28,9 @@
* heuristics.
*
* <p>In particular, when the client is trying to connect to the backend, we allow up to
* MAX_WATCH_STREAM_FAILURES within ONLINE_STATE_TIMEOUT_MS for a connection to succeed. If we have
* too many failures or the timeout elapses, then we set the OnlineState to OFFLINE, and the client
* will behave as if it is offline (get() calls will return cached data, etc.).
* MAX_WATCH_STREAM_FAILURES within CONNECTIVITY_ATTEMPT_TIMEOUT_MS for a connection to succeed. If
* we have too many failures or the timeout elapses, then we set the OnlineState to OFFLINE, and the
* client will behave as if it is offline (get() calls will return cached data, etc.).
*/
class OnlineStateTracker {

Expand All @@ -53,8 +51,9 @@ interface OnlineStateCallback {

// To deal with stream attempts that don't succeed or fail in a timely manner, we have a
// timeout for OnlineState to reach ONLINE or OFFLINE. If the timeout is reached, we transition
// to OFFLINE rather than waiting indefinitely.
private static final int ONLINE_STATE_TIMEOUT_MS = 10 * 1000;
// to OFFLINE rather than waiting indefinitely. This timeout is also used when attempting to
// establish a connection when in an OFFLINE state.
static final int CONNECTIVITY_ATTEMPT_TIMEOUT_MS = 10 * 1000;

/** The log tag to use for this class. */
private static final String LOG_TAG = "OnlineStateTracker";
Expand All @@ -66,61 +65,24 @@ interface OnlineStateCallback {
// MAX_WATCH_STREAM_FAILURES, we'll revert to OnlineState.OFFLINE.
private int watchStreamFailures;

// A timer that elapses after ONLINE_STATE_TIMEOUT_MS, at which point we transition from
// A timer that elapses after CONNECTIVITY_ATTEMPT_TIMEOUT_MS, at which point we transition from
// OnlineState.UNKNOWN to OFFLINE without waiting for the stream to actually fail
// (MAX_WATCH_STREAM_FAILURES times).
private DelayedTask onlineStateTimer;
private DelayedTask connectivityAttemptTimer;

// Whether the client should log a warning message if it fails to connect to the backend
// (initially true, cleared after a successful stream, or if we've logged the message already).
private boolean shouldWarnClientIsOffline;

// The AsyncQueue to use for running timers (and calling OnlineStateCallback methods).
private final AsyncQueue workerQueue;

// The callback to notify on OnlineState changes.
private final OnlineStateCallback onlineStateCallback;

OnlineStateTracker(AsyncQueue workerQueue, OnlineStateCallback onlineStateCallback) {
this.workerQueue = workerQueue;
OnlineStateTracker(OnlineStateCallback onlineStateCallback) {
this.onlineStateCallback = onlineStateCallback;
state = OnlineState.UNKNOWN;
shouldWarnClientIsOffline = true;
}

/**
* Called by RemoteStore when a watch stream is started (including on each backoff attempt).
*
* <p>If this is the first attempt, it sets the OnlineState to UNKNOWN and starts the
* onlineStateTimer.
*/
void handleWatchStreamStart() {
if (watchStreamFailures == 0) {
setAndBroadcastState(OnlineState.UNKNOWN);

hardAssert(onlineStateTimer == null, "onlineStateTimer shouldn't be started yet");
onlineStateTimer =
workerQueue.enqueueAfterDelay(
TimerId.ONLINE_STATE_TIMEOUT,
ONLINE_STATE_TIMEOUT_MS,
() -> {
onlineStateTimer = null;
hardAssert(
state == OnlineState.UNKNOWN,
"Timer should be canceled if we transitioned to a different state.");
logClientOfflineWarningIfNecessary(
String.format(
Locale.ENGLISH,
"Backend didn't respond within %d seconds\n",
ONLINE_STATE_TIMEOUT_MS / 1000));
setAndBroadcastState(OnlineState.OFFLINE);

// NOTE: handleWatchStreamFailure() will continue to increment watchStreamFailures
// even though we are already marked OFFLINE but this is non-harmful.
});
}
}

/**
* Called by RemoteStore when a watch stream fails.
*
Expand All @@ -135,30 +97,36 @@ void handleWatchStreamFailure(Status status) {
// To get to OnlineState.ONLINE, updateState() must have been called which would have reset
// our heuristics.
hardAssert(this.watchStreamFailures == 0, "watchStreamFailures must be 0");
hardAssert(this.onlineStateTimer == null, "onlineStateTimer must be null");
hardAssert(this.connectivityAttemptTimer == null, "connectivityAttemptTimer must be null");
} else {
watchStreamFailures++;
clearConnectivityAttemptTimer();
if (watchStreamFailures >= MAX_WATCH_STREAM_FAILURES) {
clearOnlineStateTimer();
logClientOfflineWarningIfNecessary(
String.format(
Locale.ENGLISH,
"Connection failed %d times. Most recent error: %s",
MAX_WATCH_STREAM_FAILURES,
"Backend didn't respond within %d seconds. Most recent error: %s\n",
CONNECTIVITY_ATTEMPT_TIMEOUT_MS / 1000,
status));
setAndBroadcastState(OnlineState.OFFLINE);
}
}
}

/**
 * Called when a watch stream is started. Sets and broadcasts OnlineState.UNKNOWN only while no
 * watch stream failures have been recorded; otherwise the current state is left untouched.
 */
void handleWatchStreamStart() {
  if (watchStreamFailures != 0) {
    return;
  }
  setAndBroadcastState(OnlineState.UNKNOWN);
}

/**
* Explicitly sets the OnlineState to the specified state.
*
* <p>Note that this resets the timers / failure counters, etc. used by our offline heuristics, so
* it must not be used in place of handleWatchStreamStart() and handleWatchStreamFailure().
*/
void updateState(OnlineState newState) {
clearOnlineStateTimer();
clearConnectivityAttemptTimer();
watchStreamFailures = 0;

if (newState == OnlineState.ONLINE) {
Expand Down Expand Up @@ -194,10 +162,16 @@ private void logClientOfflineWarningIfNecessary(String reason) {
}
}

private void clearOnlineStateTimer() {
if (onlineStateTimer != null) {
onlineStateTimer.cancel();
onlineStateTimer = null;
/** Cancels the tracked connectivity attempt timer, if there is one, and stops tracking it. */
private void clearConnectivityAttemptTimer() {
  if (connectivityAttemptTimer == null) {
    return;
  }
  connectivityAttemptTimer.cancel();
  connectivityAttemptTimer = null;
}

/**
 * Set the connectivity attempt timer to track. If a different timer is already being tracked, it
 * is canceled first so that it cannot fire after its handle has been replaced.
 *
 * @param connectivityAttemptTimer the newly scheduled timer to track.
 */
void setConnectivityAttemptTimer(DelayedTask connectivityAttemptTimer) {
  DelayedTask previousTimer = this.connectivityAttemptTimer;
  // Skip the cancel when the same timer is passed again, so it is not canceled by accident.
  if (previousTimer != null && previousTimer != connectivityAttemptTimer) {
    previousTimer.cancel();
  }
  this.connectivityAttemptTimer = connectivityAttemptTimer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.google.firebase.firestore.remote;

import static com.google.firebase.firestore.remote.OnlineStateTracker.CONNECTIVITY_ATTEMPT_TIMEOUT_MS;
import static com.google.firebase.firestore.util.Assert.hardAssert;

import androidx.annotation.Nullable;
Expand All @@ -35,6 +36,8 @@
import com.google.firebase.firestore.remote.WatchChange.WatchTargetChange;
import com.google.firebase.firestore.remote.WatchChange.WatchTargetChangeType;
import com.google.firebase.firestore.util.AsyncQueue;
import com.google.firebase.firestore.util.AsyncQueue.DelayedTask;
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
import com.google.firebase.firestore.util.Logger;
import com.google.firebase.firestore.util.Util;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -129,6 +132,8 @@ public interface RemoteStoreCallback {
private final WriteStream writeStream;
@Nullable private WatchChangeAggregator watchChangeAggregator;

private final AsyncQueue workerQueue;

/**
* A list of up to MAX_PENDING_WRITES writes that we have fetched from the LocalStore via
* fillWritePipeline() and have or will send to the write stream.
Expand All @@ -155,12 +160,11 @@ public RemoteStore(
this.localStore = localStore;
this.datastore = datastore;
this.connectivityMonitor = connectivityMonitor;
this.workerQueue = workerQueue;

listenTargets = new HashMap<>();
writePipeline = new ArrayDeque<>();

onlineStateTracker =
new OnlineStateTracker(workerQueue, remoteStoreCallback::handleOnlineStateChange);
onlineStateTracker = new OnlineStateTracker(remoteStoreCallback::handleOnlineStateChange);

// Create new streams (but note they're not started yet).
watchStream =
Expand Down Expand Up @@ -223,6 +227,11 @@ public void onClose(Status status) {
});
}

/**
 * Forces a reconnection attempt by telling the watch stream that its connection attempt has timed
 * out (delegates to {@code watchStream.handleConnectionAttemptTimeout()}).
 */
public void attemptReconnect() {
watchStream.handleConnectionAttemptTimeout();
}

/** Re-enables the network. Only to be called as the counterpart to disableNetwork(). */
public void enableNetwork() {
networkEnabled = true;
Expand Down Expand Up @@ -407,8 +416,20 @@ private void startWatchStream() {
"startWatchStream() called when shouldStartWatchStream() is false.");
watchChangeAggregator = new WatchChangeAggregator(this);
watchStream.start();

onlineStateTracker.handleWatchStreamStart();

DelayedTask connectivityAttemptTimer =
workerQueue.enqueueAfterDelay(
TimerId.ONLINE_STATE_TIMEOUT,
CONNECTIVITY_ATTEMPT_TIMEOUT_MS,
() -> {
// If the network has been explicitly disabled, make sure we don't accidentally
// re-enable it.
if (canUseNetwork()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, in the case of an online state timeout we performed the following things:

                logClientOfflineWarningIfNecessary(	
                    String.format(	
                        Locale.ENGLISH,	
                        "Backend didn't respond within %d seconds\n",	
                        ONLINE_STATE_TIMEOUT_MS / 1000));	
                setAndBroadcastState(OnlineState.OFFLINE);	

This isn't calling handleWatchStreamFailure so how is the OnlineState supposed to get to OFFLINE after the first timeout?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleWatchStreamFailure is still being called:

  1. attemptReconnect() results in calling AbstractStream.close()
  2. close() calls the onClose() listener passed in, which calls RemoteStore.handleWatchStreamClose()
  3. handleWatchStreamClose() calls handleWatchStreamFailure().

The problem I was running into by leaving a handleWatchStreamFailure() in the timeout logic was that handleWatchStreamFailure() would be called twice, once in the timer timeout, and once in handleWatchStreamClose(). Calling attemptReconnect() would call handleWatchStreamFailure() followed by a call to startWatchStream(). The subsequent call to handleWatchStreamFailure() in the timeout logic would then clear the connectivity timer set by attemptReconnect().

attemptReconnect();
}
});
onlineStateTracker.setConnectivityAttemptTimer(connectivityAttemptTimer);
}

private void handleWatchStreamOpen() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public class ExponentialBackoff {
private final TimerId timerId;
private final long initialDelayMs;
private final double backoffFactor;
private final long maxDelayMs;

private long maxDelayMs;
private long currentBaseMs;
private long lastAttemptTime;
private DelayedTask timerTask;
Expand Down Expand Up @@ -103,6 +103,16 @@ public void resetToMax() {
currentBaseMs = maxDelayMs;
}

/**
 * Set the backoff's maximum delay for only the next call to backoffAndRun, after which
 * backoffAndRun resets the maximum to DEFAULT_BACKOFF_MAX_DELAY_MS.
 *
 * <p>NOTE(review): this overwrites maxDelayMs directly, so any non-default maximum that was
 * configured earlier is not restored afterwards — confirm that this is intended.
 *
 * @param newMax The temporary maximum delay to set.
 */
public void setTemporaryMaxDelay(long newMax) {
maxDelayMs = newMax;
}

/**
* Waits for currentDelayMs, increases the delay and runs the specified task. If there was a
* pending backoff task waiting to run already, it will be canceled.
Expand Down Expand Up @@ -151,6 +161,9 @@ public void backoffAndRun(Runnable task) {
} else if (currentBaseMs > maxDelayMs) {
currentBaseMs = maxDelayMs;
}

// Reset max delay to the default.
maxDelayMs = DEFAULT_BACKOFF_MAX_DELAY_MS;
}

public void cancel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ public void stop() {
open = false;
}

/**
 * Simplified override: marks the stream as no longer open and reports {@code status} to the
 * listener. {@code finalState} and {@code forceNewConnection} are ignored.
 */
@Override
protected void close(State finalState, Status status, boolean forceNewConnection) {
// Clear the flag before notifying the listener.
open = false;
listener.onClose(status);
}

@Override
public boolean isStarted() {
return open;
Expand Down