Skip to content

Fix Android connectivity issues #937

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ public boolean isSslEnabled() {
return sslEnabled;
}

/**
* @return boolean indicating whether local persistent storage is enabled or not.
*/
/** @return boolean indicating whether local persistent storage is enabled or not. */
public boolean isPersistenceEnabled() {
return persistenceEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ public void run() {
/** The time a stream stays open after it is marked idle. */
private static final long IDLE_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);

/** Maximum backoff time when reconnecting. */
private static final long RECONNECT_BACKOFF_MAX_DELAY_MS = TimeUnit.SECONDS.toMillis(5);

@Nullable private DelayedTask idleTimer;

private final FirestoreChannel firestoreChannel;
Expand All @@ -194,6 +197,9 @@ public void run() {
final ExponentialBackoff backoff;
final CallbackT listener;

/** Whether we should start the gRPC stream with a new underlying connection. */
private boolean useNewConnection = false;

AbstractStream(
FirestoreChannel channel,
MethodDescriptor<ReqT, RespT> methodDescriptor,
Expand Down Expand Up @@ -244,7 +250,12 @@ public void start() {

CloseGuardedRunner closeGuardedRunner = new CloseGuardedRunner(closeCount);
StreamObserver streamObserver = new StreamObserver(closeGuardedRunner);
call = firestoreChannel.runBidiStreamingRpc(methodDescriptor, streamObserver);
if (useNewConnection) {
call = firestoreChannel.runBidiStreamingRpcWithReset(methodDescriptor, streamObserver);
useNewConnection = false;
} else {
call = firestoreChannel.runBidiStreamingRpc(methodDescriptor, streamObserver);
}

state = State.Starting;
}
Expand Down Expand Up @@ -290,18 +301,23 @@ private void close(State finalState, Status status) {
if (code == Code.OK) {
// If this is an intentional close ensure we don't delay our next connection attempt.
backoff.reset();

} else if (code == Code.RESOURCE_EXHAUSTED) {
Logger.debug(
getClass().getSimpleName(),
"(%x) Using maximum backoff delay to prevent overloading the backend.",
System.identityHashCode(this));
backoff.resetToMax();

} else if (code == Code.UNAUTHENTICATED) {
// "unauthenticated" error means the token was rejected. Try force refreshing it in case it
// just expired.
firestoreChannel.invalidateToken();
} else if (code == Code.UNAVAILABLE) {
if (useNewConnection
|| status.getCause() instanceof java.net.ConnectException
|| status.getCause() instanceof java.net.UnknownHostException) {
backoff.setTemporaryMaxDelay(RECONNECT_BACKOFF_MAX_DELAY_MS);
useNewConnection = true;
}
}

if (finalState != State.Error) {
Expand Down Expand Up @@ -375,6 +391,14 @@ private void handleIdleCloseTimer() {
}
}

/** Called by the idle timer when the stream should close due to inactivity. */
void handleConnectionAttemptTimeout() {
useNewConnection = true;
if (this.isOpen()) {
close(State.Error, Status.UNAVAILABLE);
}
}

/** Called when GRPC closes the stream, which should always be due to some error. */
@VisibleForTesting
void handleServerClose(Status status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class FirestoreChannel {
private final CredentialsProvider credentialsProvider;

/** Manages the gRPC channel and provides all gRPC ClientCalls. */
private final GrpcCallProvider callProvider;
private GrpcCallProvider callProvider;

/** The value to use as resource prefix header. */
private final String resourcePrefixValue;
Expand Down Expand Up @@ -92,6 +92,16 @@ public void shutdown() {
callProvider.shutdown();
}

/**
* Creates and starts a new bi-directional streaming RPC after creating a new connection for the
* channel.
*/
<ReqT, RespT> ClientCall<ReqT, RespT> runBidiStreamingRpcWithReset(
MethodDescriptor<ReqT, RespT> method, IncomingStreamObserver<RespT> observer) {
this.callProvider.markChannelIdle();
return runBidiStreamingRpc(method, observer);
}

/**
* Creates and starts a new bi-directional streaming RPC. The stream cannot accept message before
* the observer's `onOpen()` callback is invoked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ public static void overrideChannelBuilder(
});
}

/**
* Marks the underlying gRPC channel as idle. This allows on-going RPCs to continue, but the next
* RPC on the channel will trigger the creation of a new connection.
*/
void markChannelIdle() {
ManagedChannel channel = this.channelTask.getResult();
if (channel != null) {
this.channelTask.getResult().enterIdle();
}
}

/** Sets up the SSL provider and configures the gRPC channel. */
private ManagedChannel initChannel(Context context, DatabaseInfo databaseInfo) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
import static com.google.firebase.firestore.util.Assert.hardAssert;

import com.google.firebase.firestore.core.OnlineState;
import com.google.firebase.firestore.util.AsyncQueue;
import com.google.firebase.firestore.util.AsyncQueue.DelayedTask;
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
import com.google.firebase.firestore.util.Logger;
import io.grpc.Status;
import java.util.Locale;
Expand All @@ -30,9 +28,9 @@
* heuristics.
*
* <p>In particular, when the client is trying to connect to the backend, we allow up to
* MAX_WATCH_STREAM_FAILURES within ONLINE_STATE_TIMEOUT_MS for a connection to succeed. If we have
* too many failures or the timeout elapses, then we set the OnlineState to OFFLINE, and the client
* will behave as if it is offline (get() calls will return cached data, etc.).
* MAX_WATCH_STREAM_FAILURES within CONNECTIVITY_ATTEMPT_TIMEOUT_MS for a connection to succeed. If
* we have too many failures or the timeout elapses, then we set the OnlineState to OFFLINE, and the
* client will behave as if it is offline (get() calls will return cached data, etc.).
*/
class OnlineStateTracker {

Expand All @@ -53,8 +51,9 @@ interface OnlineStateCallback {

// To deal with stream attempts that don't succeed or fail in a timely manner, we have a
// timeout for OnlineState to reach ONLINE or OFFLINE. If the timeout is reached, we transition
// to OFFLINE rather than waiting indefinitely.
private static final int ONLINE_STATE_TIMEOUT_MS = 10 * 1000;
// to OFFLINE rather than waiting indefinitely. This timeout is also used when attempting to
// establish a connection when in an OFFLINE state.
static final int CONNECTIVITY_ATTEMPT_TIMEOUT_MS = 5 * 1000;

/** The log tag to use for this class. */
private static final String LOG_TAG = "OnlineStateTracker";
Expand All @@ -66,23 +65,19 @@ interface OnlineStateCallback {
// MAX_WATCH_STREAM_FAILURES, we'll revert to OnlineState.OFFLINE.
private int watchStreamFailures;

// A timer that elapses after ONLINE_STATE_TIMEOUT_MS, at which point we transition from
// A timer that elapses after CONNECTIVITY_ATTEMPT_TIMEOUT_MS, at which point we transition from
// OnlineState.UNKNOWN to OFFLINE without waiting for the stream to actually fail
// (MAX_WATCH_STREAM_FAILURES times).
private DelayedTask onlineStateTimer;
private DelayedTask connectivityAttemptTimer;

// Whether the client should log a warning message if it fails to connect to the backend
// (initially true, cleared after a successful stream, or if we've logged the message already).
private boolean shouldWarnClientIsOffline;

// The AsyncQueue to use for running timers (and calling OnlineStateCallback methods).
private final AsyncQueue workerQueue;

// The callback to notify on OnlineState changes.
private final OnlineStateCallback onlineStateCallback;

OnlineStateTracker(AsyncQueue workerQueue, OnlineStateCallback onlineStateCallback) {
this.workerQueue = workerQueue;
OnlineStateTracker(OnlineStateCallback onlineStateCallback) {
this.onlineStateCallback = onlineStateCallback;
state = OnlineState.UNKNOWN;
shouldWarnClientIsOffline = true;
Expand All @@ -92,33 +87,18 @@ interface OnlineStateCallback {
* Called by RemoteStore when a watch stream is started (including on each backoff attempt).
*
* <p>If this is the first attempt, it sets the OnlineState to UNKNOWN and starts the
* onlineStateTimer.
* setConnectivityAttemptTimer.
*/
void handleWatchStreamStart() {
if (watchStreamFailures == 0) {
setAndBroadcastState(OnlineState.UNKNOWN);
void handleWatchStreamConnectionFailed() {
logClientOfflineWarningIfNecessary(
String.format(
Locale.ENGLISH,
"Backend didn't respond within %d seconds\n",
CONNECTIVITY_ATTEMPT_TIMEOUT_MS / 1000));
setAndBroadcastState(OnlineState.OFFLINE);

hardAssert(onlineStateTimer == null, "onlineStateTimer shouldn't be started yet");
onlineStateTimer =
workerQueue.enqueueAfterDelay(
TimerId.ONLINE_STATE_TIMEOUT,
ONLINE_STATE_TIMEOUT_MS,
() -> {
onlineStateTimer = null;
hardAssert(
state == OnlineState.UNKNOWN,
"Timer should be canceled if we transitioned to a different state.");
logClientOfflineWarningIfNecessary(
String.format(
Locale.ENGLISH,
"Backend didn't respond within %d seconds\n",
ONLINE_STATE_TIMEOUT_MS / 1000));
setAndBroadcastState(OnlineState.OFFLINE);

// NOTE: handleWatchStreamFailure() will continue to increment watchStreamFailures
// even though we are already marked OFFLINE but this is non-harmful.
});
}
// NOTE: handleWatchStreamFailure() will continue to increment watchStreamFailures
// even though we are already marked OFFLINE but this is non-harmful.
}

/**
Expand All @@ -135,11 +115,11 @@ void handleWatchStreamFailure(Status status) {
// To get to OnlineState.ONLINE, updateState() must have been called which would have reset
// our heuristics.
hardAssert(this.watchStreamFailures == 0, "watchStreamFailures must be 0");
hardAssert(this.onlineStateTimer == null, "onlineStateTimer must be null");
hardAssert(this.connectivityAttemptTimer == null, "connectivityAttemptTimer must be null");
} else {
watchStreamFailures++;
if (watchStreamFailures >= MAX_WATCH_STREAM_FAILURES) {
clearOnlineStateTimer();
clearConnectivityAttemptTimer();
logClientOfflineWarningIfNecessary(
String.format(
Locale.ENGLISH,
Expand All @@ -158,7 +138,7 @@ void handleWatchStreamFailure(Status status) {
* it must not be used in place of handleWatchStreamStart() and handleWatchStreamFailure().
*/
void updateState(OnlineState newState) {
clearOnlineStateTimer();
clearConnectivityAttemptTimer();
watchStreamFailures = 0;

if (newState == OnlineState.ONLINE) {
Expand Down Expand Up @@ -194,10 +174,21 @@ private void logClientOfflineWarningIfNecessary(String reason) {
}
}

private void clearOnlineStateTimer() {
if (onlineStateTimer != null) {
onlineStateTimer.cancel();
onlineStateTimer = null;
/** Clears the connectivity attempt timer that has been passed in. */
void clearConnectivityAttemptTimer() {
if (connectivityAttemptTimer != null) {
connectivityAttemptTimer.cancel();
connectivityAttemptTimer = null;
}
}

/** Returns the number of times the WatchStream has tried unsuccessfully to start. */
int getWatchStreamFailures() {
return watchStreamFailures;
}

/** Set the connectivity attempt timer to track. */
void setConnectivityAttemptTimer(DelayedTask connectivityAttemptTimer) {
this.connectivityAttemptTimer = connectivityAttemptTimer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.google.firebase.firestore.remote;

import static com.google.firebase.firestore.remote.OnlineStateTracker.CONNECTIVITY_ATTEMPT_TIMEOUT_MS;
import static com.google.firebase.firestore.util.Assert.hardAssert;

import androidx.annotation.Nullable;
Expand All @@ -35,6 +36,8 @@
import com.google.firebase.firestore.remote.WatchChange.WatchTargetChange;
import com.google.firebase.firestore.remote.WatchChange.WatchTargetChangeType;
import com.google.firebase.firestore.util.AsyncQueue;
import com.google.firebase.firestore.util.AsyncQueue.DelayedTask;
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
import com.google.firebase.firestore.util.Logger;
import com.google.firebase.firestore.util.Util;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -129,6 +132,8 @@ public interface RemoteStoreCallback {
private final WriteStream writeStream;
@Nullable private WatchChangeAggregator watchChangeAggregator;

private final AsyncQueue workerQueue;

/**
* A list of up to MAX_PENDING_WRITES writes that we have fetched from the LocalStore via
* fillWritePipeline() and have or will send to the write stream.
Expand All @@ -155,12 +160,11 @@ public RemoteStore(
this.localStore = localStore;
this.datastore = datastore;
this.connectivityMonitor = connectivityMonitor;
this.workerQueue = workerQueue;

listenTargets = new HashMap<>();
writePipeline = new ArrayDeque<>();

onlineStateTracker =
new OnlineStateTracker(workerQueue, remoteStoreCallback::handleOnlineStateChange);
onlineStateTracker = new OnlineStateTracker(remoteStoreCallback::handleOnlineStateChange);

// Create new streams (but note they're not started yet).
watchStream =
Expand Down Expand Up @@ -223,6 +227,11 @@ public void onClose(Status status) {
});
}

/** Marks that we should start checking for online state. */
public void attemptReconnect() {
watchStream.handleConnectionAttemptTimeout();
}

/** Re-enables the network. Only to be called as the counterpart to disableNetwork(). */
public void enableNetwork() {
networkEnabled = true;
Expand Down Expand Up @@ -408,7 +417,20 @@ private void startWatchStream() {
watchChangeAggregator = new WatchChangeAggregator(this);
watchStream.start();

onlineStateTracker.handleWatchStreamStart();
if (onlineStateTracker.getWatchStreamFailures() == 0) {
onlineStateTracker.clearConnectivityAttemptTimer();
}
DelayedTask connectivityAttemptTimer =
workerQueue.enqueueAfterDelay(
TimerId.ONLINE_STATE_TIMEOUT,
CONNECTIVITY_ATTEMPT_TIMEOUT_MS,
() -> {
if (canUseNetwork()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, in the case of an online state timeout we performed the following things:

                logClientOfflineWarningIfNecessary(	
                    String.format(	
                        Locale.ENGLISH,	
                        "Backend didn't respond within %d seconds\n",	
                        ONLINE_STATE_TIMEOUT_MS / 1000));	
                setAndBroadcastState(OnlineState.OFFLINE);	

This isn't calling handleWatchStreamFailure so how is the OnlineState supposed to get to OFFLINE after the first timeout?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleWatchStreamFailure is still being called:

  1. attemptReconnect() results in calling AbstractStream.close()
  2. close() calls the onClose() listener passed in, which calls RemoteStore.handleWatchStreamClose()
  3. handleWatchStreamClose() calls handleWatchStreamFailure().

The problem I was running into by leaving a handleWatchStreamFailure() in the timeout logic was that handleWatchStreamFailure() would be called twice, once in the timer timeout, and once in handleWatchStreamClose(). Calling attemptReconnect() would call handleWatchStreamFailure() followed by a call to startWatchStream(). The subsequent call to handleWatchStreamFailure() in the timeout logic would then clear the connectivity timer set by attemptReconnect().

attemptReconnect();
}
onlineStateTracker.handleWatchStreamConnectionFailed();
});
onlineStateTracker.setConnectivityAttemptTimer(connectivityAttemptTimer);
}

private void handleWatchStreamOpen() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public class ExponentialBackoff {
private final TimerId timerId;
private final long initialDelayMs;
private final double backoffFactor;
private final long maxDelayMs;

private long maxDelayMs;
private long currentBaseMs;
private long lastAttemptTime;
private DelayedTask timerTask;
Expand Down Expand Up @@ -103,6 +103,16 @@ public void resetToMax() {
currentBaseMs = maxDelayMs;
}

/**
* Set the backoff's maximum delay for only the next call to backoffAndRun, after which the delay
* will be reset to maxDelayMs.
*
* @param newMax The temporary maximum delay to set.
*/
public void setTemporaryMaxDelay(long newMax) {
maxDelayMs = newMax;
}

/**
* Waits for currentDelayMs, increases the delay and runs the specified task. If there was a
* pending backoff task waiting to run already, it will be canceled.
Expand Down Expand Up @@ -151,6 +161,9 @@ public void backoffAndRun(Runnable task) {
} else if (currentBaseMs > maxDelayMs) {
currentBaseMs = maxDelayMs;
}

// Reset max delay to the default.
maxDelayMs = DEFAULT_BACKOFF_MAX_DELAY_MS;
}

public void cancel() {
Expand Down