Fix Android Connectivity Monitor (v2) #1045

Merged
merged 27 commits into from
Jan 8, 2020
Changes from 18 commits
Commits
11ca8a5
did the thing
Oct 23, 2019
448311d
comment fixes
Oct 23, 2019
403f91f
Merge branch 'master' into bc/reconnect
Oct 30, 2019
00568be
resolved comments
Oct 30, 2019
c865827
Merge branch 'bc/reconnect' of github.com:firebase/firebase-android-s…
Oct 30, 2019
d714378
just kidding, had to update more comments and remove unused vars
Oct 30, 2019
55e8f46
fix onlinestatetracker constructor
Oct 30, 2019
d202893
continue, make spec tests pass
Nov 1, 2019
411ed9d
resolve comments: comments, code ordering, rename to connectivity_att…
Nov 5, 2019
1519a40
Merge branch 'master' into bc/reconnect
Nov 5, 2019
5bce0a0
separate online_state_timeout from connectivity_attempt_timeout
Nov 12, 2019
ba515b7
update comments
Nov 12, 2019
110fdc3
Merge branch 'master' into bc/reconnect
Nov 20, 2019
c9cafd5
Merge branch 'master' into bc/reconnect
Dec 3, 2019
b77e02e
update comments
Dec 3, 2019
31d0ad8
working with logging comments for future debugging
Dec 6, 2019
8b2ad17
ready for review
Dec 6, 2019
af9ed48
Merge branch 'master' into bc/reconnect-grpc
Dec 6, 2019
1cca6a4
resolve michael comments with runBidi, has comments
Dec 9, 2019
86b4733
working in grpc exclusively with logs
Dec 10, 2019
873d83d
remove logs
Dec 10, 2019
d321316
remove markChannelIdle()
Dec 10, 2019
a5dba09
change close() from protected to private
Dec 10, 2019
51fac1c
added logging, fixed comments
Dec 10, 2019
595046f
fix backoff maxDelay, add comments, some renaming
Dec 12, 2019
a3fc304
Merge branch 'master' into bc/reconnect-grpc
Jan 8, 2020
36e448f
comment fixes and always clear connectivity timer
Jan 8, 2020
@@ -174,6 +174,9 @@ public void run() {
/** The time a stream stays open after it is marked idle. */
private static final long IDLE_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);

/** Maximum backoff time when reconnecting. */
Contributor

"when reconnecting" doesn't really do anything to distinguish this from the existing BACKOFF_MAX_DELAY_MS which is also used when reconnecting.

I think the intention is that this max backoff delay is used when we're 100% sure that the connection failed on the client-side and didn't even reach the server. In practice, I think the only case we can be sure of that is DNS failures (see my other comment about ConnectException). If that's the case, then maybe we can call this DNS_FAILURE_BACKOFF_MAX_DELAY_MS or something...

Contributor

How about CLIENT_NETWORK_FAILURE_BACKOFF_MAX_DELAY_MS?

Contributor

I just noticed that BACKOFF should maybe be first (to group this with the other backoff-related constants). So: BACKOFF_CLIENT_NETWORK_FAILURE_MAX_DELAY_MS?

Contributor

FWIW, I don't feel strongly. Just trying to figure out how to make the name parse more sensibly...

Author

Changed to Michael's suggestion.
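
The distinction discussed in this thread, a shorter backoff cap when the failure is known to be client-side, can be sketched roughly as below. This is a minimal illustration, not the SDK's code: the class `BackoffCapSketch` and the method `maxDelayFor` are hypothetical, the constant name is one of the names proposed above, and the one-minute default cap is an assumed value. Only the 10-second cap and the two exception types come from the diff.

```java
import java.net.ConnectException;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; not part of the Firestore SDK. */
final class BackoffCapSketch {
  // Assumed default cap, chosen only for this example.
  static final long BACKOFF_MAX_DELAY_MS = TimeUnit.MINUTES.toMillis(1);
  // The 10-second value matches the constant added in the diff.
  static final long BACKOFF_CLIENT_NETWORK_FAILURE_MAX_DELAY_MS = TimeUnit.SECONDS.toMillis(10);

  /** Picks the cap based on whether the failure never left the client. */
  static long maxDelayFor(Throwable cause) {
    // instanceof is false for null, so a missing cause falls back to the default cap.
    boolean clientSide =
        cause instanceof ConnectException || cause instanceof UnknownHostException;
    return clientSide ? BACKOFF_CLIENT_NETWORK_FAILURE_MAX_DELAY_MS : BACKOFF_MAX_DELAY_MS;
  }

  public static void main(String[] args) {
    System.out.println(maxDelayFor(new UnknownHostException("example.invalid")));
    System.out.println(maxDelayFor(new RuntimeException("server-side error")));
  }
}
```

Keeping the decision in one place makes it easier to see which failures are treated as "never reached the server", which is the property the constant name is trying to convey.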

private static final long RECONNECT_BACKOFF_MAX_DELAY_MS = TimeUnit.SECONDS.toMillis(10);

@Nullable private DelayedTask idleTimer;

private final FirestoreChannel firestoreChannel;
@@ -263,8 +266,9 @@ public void start() {
*
* @param finalState the intended state of the stream after closing.
* @param status the status to emit to the listener.
* @param forceNewConnection whether to use a new connection the next time we open the stream
*/
- private void close(State finalState, Status status) {
+ protected void close(State finalState, Status status, boolean forceNewConnection) {
hardAssert(isStarted(), "Only started streams should be closed.");
hardAssert(
finalState == State.Error || status.equals(Status.OK),
@@ -290,18 +294,26 @@ private void close(State finalState, Status status) {
if (code == Code.OK) {
// If this is an intentional close ensure we don't delay our next connection attempt.
backoff.reset();

} else if (code == Code.RESOURCE_EXHAUSTED) {
Logger.debug(
getClass().getSimpleName(),
"(%x) Using maximum backoff delay to prevent overloading the backend.",
System.identityHashCode(this));
backoff.resetToMax();

} else if (code == Code.UNAUTHENTICATED) {
// "unauthenticated" error means the token was rejected. Try force refreshing it in case it
// just expired.
firestoreChannel.invalidateToken();
} else if (code == Code.UNAVAILABLE) {
// These exceptions are thrown when the gRPC stream is closed with a connection error. For
// these cases, we need to use a new connection for the next connection attempt, which is
// done by marking the underlying channel as idle.
if (forceNewConnection
|| status.getCause() instanceof java.net.ConnectException
|| status.getCause() instanceof java.net.UnknownHostException) {
backoff.setTemporaryMaxDelay(RECONNECT_BACKOFF_MAX_DELAY_MS);
firestoreChannel.markChannelIdle();
}
}

if (finalState != State.Error) {
@@ -333,6 +345,10 @@ private void close(State finalState, Status status) {
listener.onClose(status);
}

protected void close(State finalState, Status status) {
close(finalState, status, false);
}

/**
* Can be overridden to perform additional cleanup before the stream is closed. Calling
* super.tearDown() is not required.

@@ -26,6 +26,8 @@
import com.google.firebase.firestore.core.DatabaseInfo;
import com.google.firebase.firestore.model.DatabaseId;
import com.google.firebase.firestore.util.AsyncQueue;
import com.google.firebase.firestore.util.AsyncQueue.DelayedTask;
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
import com.google.firebase.firestore.util.Util;
import io.grpc.ClientCall;
import io.grpc.ForwardingClientCall;
@@ -53,6 +55,11 @@ class FirestoreChannel {
private static final String X_GOOG_API_CLIENT_VALUE =
"gl-java/ fire/" + BuildConfig.VERSION_NAME + " grpc/";

// This timeout is used when attempting to establish a connection. If a connection attempt
// does not succeed in time, we close the stream and restart the connection, rather than having
Contributor (@wilhuff, Dec 11, 2019)

The timer callback does not seem to "close the stream" in any way that I can tell. Based on comments in markChannelIdle, the first stream continues? Does this actually close the stream, or is this a side effect?

From what I can tell, the connectivityAttemptTimer merely starts a new clientCall (through a ~recursive invocation of runBidiStreamingRpc). What prevents the observer from getting callbacks from the old stream once that fails ~2 minutes later?

Contributor

I think you're looking at an old version of the PR? 😬 This code is gone in the latest version, right?

Author

Yes, that's the outdated version. The new one can be found here.
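
The question raised in this thread (what keeps a superseded stream from calling back into the observer) is a general one. One common guard is a generation counter, sketched below. This is not necessarily what the later version of the PR does: `StaleCallbackGuardSketch`, its nested `Observer` interface, and `startAttempt` are hypothetical names, and no gRPC types are used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical guard for illustration; not taken from the PR. */
final class StaleCallbackGuardSketch {
  /** Stand-in for a stream observer with a single callback. */
  interface Observer {
    void onClose(String status);
  }

  // Incremented every time a new connection attempt replaces the previous one.
  private final AtomicInteger generation = new AtomicInteger();

  /** Wraps the downstream observer so it only hears from the latest attempt. */
  Observer startAttempt(Observer downstream) {
    int mine = generation.incrementAndGet();
    return status -> {
      // Callbacks from an older attempt are dropped silently.
      if (mine == generation.get()) {
        downstream.onClose(status);
      }
    };
  }

  public static void main(String[] args) {
    List<String> seen = new ArrayList<>();
    StaleCallbackGuardSketch guard = new StaleCallbackGuardSketch();
    Observer first = guard.startAttempt(seen::add);
    Observer second = guard.startAttempt(seen::add); // supersedes the first attempt
    first.onClose("UNAVAILABLE (old stream)");
    second.onClose("OK (new stream)");
    System.out.println(seen);
  }
}
```

With a guard like this, a late failure from the abandoned call never reaches the listener, so restarting the stream from a timer does not produce duplicate close events.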

// it hang indefinitely.
private static final int CONNECTIVITY_ATTEMPT_TIMEOUT_MS = 15 * 1000;

/** The async worker queue that is used to dispatch events. */
private final AsyncQueue asyncQueue;

@@ -92,6 +99,15 @@ public void shutdown() {
callProvider.shutdown();
}

/**
* Marks the underlying gRPC channel as idle. This allows on-going RPCs to continue, but the next
* RPC on the channel will trigger the creation of a new connection. This method is primarily used
* to reset the underlying connection.
*/
public void markChannelIdle() {
this.callProvider.markChannelIdle();
}

/**
* Creates and starts a new bi-directional streaming RPC. The stream cannot accept messages before
* the observer's `onOpen()` callback is invoked.
@@ -154,6 +170,18 @@ public void onReady() {
call[0].request(1);
});

DelayedTask connectivityAttemptTimer =
asyncQueue.enqueueAfterDelay(
TimerId.CONNECTIVITY_ATTEMPT_TIMER,
CONNECTIVITY_ATTEMPT_TIMEOUT_MS,
() -> {
// Reset the underlying connection and restart the stream.
callProvider.clearConnectivityTimer();
markChannelIdle();
runBidiStreamingRpc(method, observer);
});
callProvider.setConnectivityAttemptTimer(connectivityAttemptTimer);

return new ForwardingClientCall<ReqT, RespT>() {
@Override
protected ClientCall<ReqT, RespT> delegate() {

@@ -23,13 +23,15 @@
import com.google.android.gms.tasks.Tasks;
import com.google.firebase.firestore.core.DatabaseInfo;
import com.google.firebase.firestore.util.AsyncQueue;
import com.google.firebase.firestore.util.AsyncQueue.DelayedTask;
import com.google.firebase.firestore.util.Executors;
import com.google.firebase.firestore.util.Logger;
import com.google.firebase.firestore.util.Supplier;
import com.google.firestore.v1.FirestoreGrpc;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
@@ -50,6 +52,10 @@ public class GrpcCallProvider {

private CallOptions callOptions;

// A timer that elapses after CONNECTIVITY_ATTEMPT_TIMEOUT_MS, at which point we close the
// stream, reset the underlying connection, and try connecting again.
private DelayedTask connectivityAttemptTimer;

/**
* Helper function to globally override the channel that RPCs use. Useful for testing when you
* want to bypass SSL certificate checking.
@@ -89,6 +95,17 @@ public static void overrideChannelBuilder(
});
}

/**
* Marks the underlying gRPC channel as idle. This allows on-going RPCs to continue, but the next
* RPC on the channel will trigger the creation of a new connection.
*/
void markChannelIdle() {
ManagedChannel channel = this.channelTask.getResult();
if (channel != null) {
channel.enterIdle();
}
}

/** Sets up the SSL provider and configures the gRPC channel. */
private ManagedChannel initChannel(Context context, DatabaseInfo databaseInfo) {
try {
@@ -198,4 +215,48 @@ void shutdown() {
Thread.currentThread().interrupt();
}
}

/**
* Sets the connectivity attempt timer to track. Existing timers must complete or be cancelled
* before a new timer can be set.
*/
void setConnectivityAttemptTimer(DelayedTask connectivityAttemptTimer) {
// If an existing timer is already running, we want to continue using that one.
if (this.connectivityAttemptTimer != null) {
connectivityAttemptTimer.cancel();
} else {
try {
ManagedChannel channel = this.channelTask.getResult();
if (channel != null) {
this.connectivityAttemptTimer = connectivityAttemptTimer;
listenToConnectivityState(channel);
}
} catch (IllegalStateException e) {
// When the ManagedChannel is first initialized, channelTask could still be running.
connectivityAttemptTimer.cancel();
}
}
}

/**
* Cancels the connectivityAttemptTimer if the new state indicates gRPC is online. Otherwise, we
* reset the listener for the next state change.
*/
private void listenToConnectivityState(ManagedChannel channel) {
ConnectivityState newState = channel.getState(false);
// Check that the new state is online, then cancel timer.
if (newState == ConnectivityState.READY) {
clearConnectivityTimer();
} else {
channel.notifyWhenStateChanged(newState, () -> listenToConnectivityState(channel));
}
}

/** Clears the connectivity timer if it exists. */
void clearConnectivityTimer() {
if (connectivityAttemptTimer != null) {
connectivityAttemptTimer.cancel();
connectivityAttemptTimer = null;
}
}
}

@@ -143,8 +143,8 @@ void handleWatchStreamFailure(Status status) {
logClientOfflineWarningIfNecessary(
String.format(
Locale.ENGLISH,
- "Connection failed %d times. Most recent error: %s",
- MAX_WATCH_STREAM_FAILURES,
+ "Backend didn't respond within %d seconds. Most recent error: %s\n",
+ ONLINE_STATE_TIMEOUT_MS / 1000,
status));
setAndBroadcastState(OnlineState.OFFLINE);
}

@@ -74,7 +74,13 @@ public enum TimerId {
* A timer used to retry transactions. Since there can be multiple concurrent transactions,
* multiple of these may be in the queue at a given time.
*/
- RETRY_TRANSACTION
+ RETRY_TRANSACTION,
/**
* A timer used in RemoteStore to monitor when a connection attempt is unsuccessful and retry
* accordingly. While `ONLINE_STATE_TIMEOUT` is used to transition from UNKNOWN to OFFLINE,
* `CONNECTIVITY_ATTEMPT_TIMER` is used for tracking and retrying connectivity attempts.
*/
CONNECTIVITY_ATTEMPT_TIMER
}

/**

@@ -36,8 +36,8 @@ public class ExponentialBackoff {
private final TimerId timerId;
private final long initialDelayMs;
private final double backoffFactor;
- private final long maxDelayMs;

+ private long maxDelayMs;
private long currentBaseMs;
private long lastAttemptTime;
private DelayedTask timerTask;
@@ -103,6 +103,16 @@ public void resetToMax() {
currentBaseMs = maxDelayMs;
}

/**
* Set the backoff's maximum delay for only the next call to backoffAndRun, after which the delay
* will be reset to maxDelayMs.
*
* @param newMax The temporary maximum delay to set.
*/
public void setTemporaryMaxDelay(long newMax) {
maxDelayMs = newMax;
}

/**
* Waits for currentDelayMs, increases the delay and runs the specified task. If there was a
* pending backoff task waiting to run already, it will be canceled.
@@ -151,6 +161,9 @@ public void backoffAndRun(Runnable task) {
} else if (currentBaseMs > maxDelayMs) {
currentBaseMs = maxDelayMs;
}

// Reset max delay to the default.
maxDelayMs = DEFAULT_BACKOFF_MAX_DELAY_MS;
Contributor

This ignored the comments I made on the last version of this PR.

The max delay was configured in the constructor and may not actually be DEFAULT_BACKOFF_MAX_DELAY_MS. You can't read this constant here.

Instead, save two max delay fields in the constructor. One should be final, as maxDelayMs is today and it should be used here as the value to which to reset. The second value should be the one you actually bounce around based on which kind of error is in effect.

Author

Done (attempted).
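
The two-field approach requested above can be sketched as follows. This is a simplified stand-in for `ExponentialBackoff`, written only to show the reset behavior: it has no timers or jitter, and `TwoCapBackoffSketch`, `nextMaxDelayMs`, and `nextDelayMs()` are hypothetical names. The final `maxDelayMs` field holds whatever the constructor was given, so the reset never depends on a default constant.

```java
/** Simplified, hypothetical stand-in for ExponentialBackoff (no timers, no jitter). */
final class TwoCapBackoffSketch {
  private final long initialDelayMs;
  private final double backoffFactor;
  /** Cap configured in the constructor; never changes. */
  private final long maxDelayMs;
  /** Cap applied to the next attempt only; restored to maxDelayMs afterwards. */
  private long nextMaxDelayMs;
  private long currentBaseMs;

  TwoCapBackoffSketch(long initialDelayMs, double backoffFactor, long maxDelayMs) {
    this.initialDelayMs = initialDelayMs;
    this.backoffFactor = backoffFactor;
    this.maxDelayMs = maxDelayMs;
    this.nextMaxDelayMs = maxDelayMs;
  }

  /** Overrides the cap for the next call to nextDelayMs() only. */
  void setTemporaryMaxDelay(long newMax) {
    nextMaxDelayMs = newMax;
  }

  /** Returns the delay for this attempt, grows the base, and restores the configured cap. */
  long nextDelayMs() {
    long delay = Math.min(currentBaseMs, nextMaxDelayMs);
    currentBaseMs = (long) (currentBaseMs * backoffFactor);
    if (currentBaseMs < initialDelayMs) {
      currentBaseMs = initialDelayMs;
    } else if (currentBaseMs > maxDelayMs) {
      currentBaseMs = maxDelayMs;
    }
    // Reset to the constructor-supplied cap, not to a global default.
    nextMaxDelayMs = maxDelayMs;
    return delay;
  }

  public static void main(String[] args) {
    TwoCapBackoffSketch backoff = new TwoCapBackoffSketch(1000, 1.5, 30_000);
    for (int i = 0; i < 20; i++) {
      backoff.nextDelayMs(); // drive the base delay up to the configured cap
    }
    backoff.setTemporaryMaxDelay(10_000);
    System.out.println(backoff.nextDelayMs()); // temporary cap applies once
    System.out.println(backoff.nextDelayMs()); // configured cap is back in effect
  }
}
```

Because the temporary value lives in its own field, a backoff constructed with a non-default maximum returns to that maximum after the one-off override.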

}

public void cancel() {

@@ -67,6 +67,12 @@ public void stop() {
open = false;
}

@Override
protected void close(State finalState, Status status, boolean forceNewConnection) {
open = false;
listener.onClose(status);
}

@Override
public boolean isStarted() {
return open;