
Implemented exponential backoff and max retry with resumable uploads #4087


Merged
merged 31 commits into master from mtewani/exponentialbackoff-resumableupload on Sep 26, 2022
Commits (31)
dd3fba3
Implemented exponential backoff and max retry with resumable uploads
maneesht Sep 14, 2022
08c5714
Merge remote-tracking branch 'origin/master' into mtewani/exponential…
maneesht Sep 14, 2022
16cffe8
Added test for exponential backoff/retry
maneesht Sep 21, 2022
10308ff
Updated formatting
maneesht Sep 21, 2022
7a9356c
Updated timeout and allowed for configurability
maneesht Sep 21, 2022
a47f2dc
Moved backoff to uploadChunks
maneesht Sep 21, 2022
d24c56d
Removed delaySend from public api
maneesht Sep 21, 2022
11f5f54
Fixed formatting
maneesht Sep 21, 2022
fdb61c9
Refactored to use existing fileupload
maneesht Sep 21, 2022
ad8203e
Merge branch 'master' into mtewani/exponentialbackoff-resumableupload
maneesht Sep 21, 2022
6acea85
Updated tests
maneesht Sep 22, 2022
ea6d56a
Updated tests
maneesht Sep 22, 2022
c6ac2f1
Removed print
maneesht Sep 22, 2022
1384235
Updated changelog
maneesht Sep 22, 2022
0bf2349
Updated formatting and decreased the keepalive timer
maneesht Sep 22, 2022
047a780
Addressed comments
maneesht Sep 22, 2022
da6b659
Addressed comments
maneesht Sep 22, 2022
1304294
Removed unnecessary line
maneesht Sep 22, 2022
7f8772b
Addressed comment
maneesht Sep 22, 2022
91c5403
Replaced retryrule temporarily
maneesht Sep 22, 2022
ce95d65
Increase timeout for fileUploadRecovery
maneesht Sep 22, 2022
277b03e
Increase timeout for fileUploadRecovery
maneesht Sep 22, 2022
fc9c9ac
Updated with logging info
maneesht Sep 23, 2022
d37c628
Removed logging
maneesht Sep 23, 2022
7787770
Addressed comments
maneesht Sep 23, 2022
254d8cc
Updated text files properly
maneesht Sep 23, 2022
1085b00
Updated timeout for recovery test
maneesht Sep 23, 2022
97520ea
Reduced retry count again
maneesht Sep 23, 2022
91a72a5
Updated to increase timeout
maneesht Sep 23, 2022
d6da159
Updated uploadtask
maneesht Sep 26, 2022
9ca073b
Removed unnecessary logs
maneesht Sep 26, 2022
4 changes: 4 additions & 0 deletions firebase-storage/CHANGELOG.md
@@ -1,3 +1,7 @@
# 20.0.3
- [fixed] Fixed an issue that caused an infinite number of retries with no exponential
backoff for `uploadChunk`.

# 19.2.2
- [fixed] Fixed an issue that caused the SDK to report incorrect values for
"getTotalByteCount()" after a download was paused and resumed.
@@ -53,6 +53,7 @@ public class FirebaseStorage {
@Nullable private final Provider<InternalAppCheckTokenProvider> mAppCheckProvider;
@Nullable private final String mBucketName;
private long sMaxUploadRetry = 10 * DateUtils.MINUTE_IN_MILLIS; // 10 * 60 * 1000
private long sMaxChunkUploadRetry = DateUtils.MINUTE_IN_MILLIS; // 60 * 1000
private long sMaxDownloadRetry = 10 * DateUtils.MINUTE_IN_MILLIS; // 10 * 60 * 1000
private long sMaxQueryRetry = 2 * DateUtils.MINUTE_IN_MILLIS; // 2 * 60 * 1000

@@ -226,6 +227,25 @@ public void setMaxUploadRetryTimeMillis(long maxTransferRetryMillis) {
sMaxUploadRetry = maxTransferRetryMillis;
}

/**
* Returns the maximum time to retry sending a chunk if a failure occurs
*
* @return maximum time in milliseconds. Defaults to 1 minute.
*/
public long getMaxChunkUploadRetry() {
return sMaxChunkUploadRetry;
}

/**
* Sets the maximum time to retry sending a chunk if a failure occurs
*
* @param maxChunkRetryMillis the maximum time in milliseconds. Defaults to 1 minute (60,000
* milliseconds).
*/
public void setMaxChunkUploadRetry(long maxChunkRetryMillis) {
sMaxChunkUploadRetry = maxChunkRetryMillis;
}

/**
* Returns the maximum time to retry operations other than upload and download if a failure
* occurs.
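For reference, a minimal sketch of how an app could use the new chunk-retry setting added above; the 30-second value and the `ChunkRetryConfigSketch` wrapper are illustrative assumptions, not part of the PR:

```java
import com.google.firebase.storage.FirebaseStorage;

class ChunkRetryConfigSketch {
  // Sketch: tightening the chunk-upload retry window from the default 1 minute.
  // Assumes FirebaseApp has already been initialized for this process.
  static void configureChunkRetry() {
    FirebaseStorage storage = FirebaseStorage.getInstance();
    storage.setMaxChunkUploadRetry(30 * 1000);      // give up on a failing chunk after ~30 seconds
    long window = storage.getMaxChunkUploadRetry(); // 30000
    android.util.Log.d("StorageConfig", "chunk retry window = " + window + " ms");
  }
}
```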
@@ -62,6 +62,10 @@ public class StorageTaskScheduler {
CALLBACK_QUEUE_EXECUTOR.allowCoreThreadTimeOut(true);
}

public static void setCallbackQueueKeepAlive(long keepAliveTime, TimeUnit timeUnit) {
CALLBACK_QUEUE_EXECUTOR.setKeepAliveTime(keepAliveTime, timeUnit);
}

public static StorageTaskScheduler getInstance() {
return sInstance;
}
@@ -14,6 +14,8 @@

package com.google.firebase.storage;

import static com.google.firebase.storage.internal.ExponentialBackoffSender.RND_MAX;

import android.content.ContentResolver;
import android.content.Context;
import android.net.Uri;
@@ -25,10 +27,14 @@
import androidx.annotation.VisibleForTesting;
import com.google.android.gms.common.api.Status;
import com.google.android.gms.common.internal.Preconditions;
import com.google.android.gms.common.util.Clock;
import com.google.android.gms.common.util.DefaultClock;
import com.google.firebase.appcheck.interop.InternalAppCheckTokenProvider;
import com.google.firebase.auth.internal.InternalAuthProvider;
import com.google.firebase.storage.internal.AdaptiveStreamBuffer;
import com.google.firebase.storage.internal.ExponentialBackoffSender;
import com.google.firebase.storage.internal.Sleeper;
import com.google.firebase.storage.internal.SleeperImpl;
import com.google.firebase.storage.internal.Util;
import com.google.firebase.storage.network.NetworkRequest;
import com.google.firebase.storage.network.ResumableUploadByteRequest;
@@ -40,6 +46,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import org.json.JSONException;

@@ -72,6 +79,12 @@ public class UploadTask extends StorageTask<UploadTask.TaskSnapshot> {
private volatile Exception mServerException = null;
private volatile int mResultCode = 0;
private volatile String mServerStatus;
private volatile long maxSleepTime;
private static final Random random = new Random();
/*package*/ static Sleeper sleeper = new SleeperImpl();
/*package*/ static Clock clock = DefaultClock.getInstance();
private int sleepTime = 0;
private final int minimumSleepInterval = 1000;

UploadTask(StorageReference targetRef, StorageMetadata metadata, byte[] bytes) {
Preconditions.checkNotNull(targetRef);
@@ -89,6 +102,7 @@ public class UploadTask extends StorageTask<UploadTask.TaskSnapshot> {
new AdaptiveStreamBuffer(new ByteArrayInputStream(bytes), PREFERRED_CHUNK_SIZE);
this.mIsStreamOwned = true;

this.maxSleepTime = storage.getMaxChunkUploadRetry();
mSender =
new ExponentialBackoffSender(
storage.getApp().getApplicationContext(),
@@ -110,6 +124,7 @@ public class UploadTask extends StorageTask<UploadTask.TaskSnapshot> {
this.mAppCheckProvider = storage.getAppCheckProvider();
this.mUri = file;
InputStream inputStream = null;
this.maxSleepTime = storage.getMaxChunkUploadRetry();
mSender =
new ExponentialBackoffSender(
mStorageRef.getApp().getApplicationContext(),
@@ -173,12 +188,13 @@ public class UploadTask extends StorageTask<UploadTask.TaskSnapshot> {
this.mStreamBuffer = new AdaptiveStreamBuffer(stream, PREFERRED_CHUNK_SIZE);
this.mIsStreamOwned = false;
this.mUri = null;
this.maxSleepTime = storage.getMaxChunkUploadRetry();
mSender =
new ExponentialBackoffSender(
mStorageRef.getApp().getApplicationContext(),
mAuthProvider,
mAppCheckProvider,
mStorageRef.getStorage().getMaxUploadRetryTimeMillis());
storage.getMaxUploadRetryTimeMillis());
}

/** @return the target of the upload. */
@@ -321,15 +337,18 @@ private boolean shouldContinue() {
}

boolean inErrorState = mServerException != null || mResultCode < 200 || mResultCode >= 300;
long deadLine = clock.elapsedRealtime() + this.maxSleepTime;
long currentTime = clock.elapsedRealtime() + this.sleepTime;
// we attempt to recover by calling recoverStatus(true)
if (inErrorState && !recoverStatus(true)) {
// we failed to recover.
if (serverStateValid()) {
tryChangeState(INTERNAL_STATE_FAILURE, false);
if (inErrorState) {
if (currentTime > deadLine || !recoverStatus(true)) {
if (serverStateValid()) {
tryChangeState(INTERNAL_STATE_FAILURE, false);
}
return false;
}
return false;
sleepTime = Math.max(sleepTime * 2, minimumSleepInterval);
}

return true;
}

@@ -410,6 +429,36 @@ private boolean recoverStatus(boolean withRetry) {
return true;
}

/**
* Send with a delay that uses sleepTime to delay sending a request to the server. Will reset
* sleepTime upon send success. TODO: Create an exponential backoff helper to consolidate code
* here and in ExponentialBackoffSender.java
*
* @param request to send
* @return whether the delay and send were successful
*/
private boolean delaySend(NetworkRequest request) {
try {
Log.d(TAG, "Waiting " + sleepTime + " milliseconds");
sleeper.sleep(sleepTime + random.nextInt(RND_MAX));
} catch (InterruptedException e) {
Log.w(TAG, "thread interrupted during exponential backoff.");

Thread.currentThread().interrupt();
mServerException = e;
return false;
}
boolean sendRes = send(request);
// We reset the sleepTime if the send was successful. For example,
// uploadChunk(request) // false, then sleepTime becomes 1000
// uploadChunk(request) // false, then sleepTime becomes 2000
// uploadChunk(request) // true, then sleepTime becomes 0 again
if (sendRes) {
sleepTime = 0;
}
return sendRes;
}

private void uploadChunk() {
try {
mStreamBuffer.fill(mCurrentChunkSize);
@@ -425,7 +474,7 @@ private void uploadChunk() {
bytesToUpload,
mStreamBuffer.isFinished());

if (!send(uploadRequest)) {
if (!delaySend(uploadRequest)) {
mCurrentChunkSize = PREFERRED_CHUNK_SIZE;
Log.d(TAG, "Resetting chunk size to " + mCurrentChunkSize);
return;
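Taken together, `shouldContinue()` and `delaySend()` give a doubling delay capped by `getMaxChunkUploadRetry()`. The following is a standalone sketch of that schedule under the default one-minute window (ignoring the random jitter `delaySend()` adds); it mirrors the field names but is not the PR's code:

```java
// Sketch of the chunk-retry delay schedule with the default 60-second window.
public class BackoffScheduleSketch {
  public static void main(String[] args) {
    long maxSleepTime = 60_000;       // default value of getMaxChunkUploadRetry()
    int minimumSleepInterval = 1_000; // same constant as in UploadTask
    int sleepTime = 0;

    // Retrying continues while the next delay still fits inside the window;
    // on a successful send, UploadTask resets sleepTime to 0 instead.
    while (sleepTime <= maxSleepTime) {
      System.out.println("retry after ~" + sleepTime + " ms");
      sleepTime = Math.max(sleepTime * 2, minimumSleepInterval); // 0 -> 1s -> 2s -> 4s -> ...
    }
    // Delays printed: 0s, 1s, 2s, 4s, 8s, 16s, 32s (~63s of backoff), after which the task fails.
  }
}
```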
@@ -106,7 +106,7 @@ public static Uri getBaseUrl(@Nullable EmulatedServiceSettings emulatorSettings)
return Uri.parse(
"http://" + emulatorSettings.getHost() + ":" + emulatorSettings.getPort() + "/v0");
} else {
return Uri.parse("https://firebasestorage.googleapis.com/v0");
return PROD_BASE_URL;
}
}

@@ -32,6 +32,12 @@
public class TestUtil {

static FirebaseApp createApp() {
/**
* Many tests require the callback to be invoked on the same thread on which it was initially
* instantiated. With the 5-second keepalive, a thread idle for more than 5 seconds is killed
* and a new one is eventually created, causing many of the tests to fail.
*/
StorageTaskScheduler.setCallbackQueueKeepAlive(90, TimeUnit.SECONDS);
maneesht (Contributor, Author) commented:
To add additional context:

We call attachListeners (https://github.com/firebase/firebase-android-sdk/blob/ed10eb5ebe998dc23ead46419a2b4dcbb5e2482c/firebase-storage/src/testUtil/java/com/google/firebase/storage/TestUploadHelper.java#L49) to set up the listeners, and those listeners then call ControllableSchedulerHelper to verify that the callback thread they were initially called on is the same one used in subsequent events.


However, in cases with long backoffs this won't hold, because the thread pool terminates any thread that has been idle longer than the keepalive time, which is 5 seconds.
Our alternatives are to raise the keepAlive, or to prevent the threads from being killed. A third option would be to remove this verification check.

Reviewer:

Do we know why this check is necessary? I'm not familiar with the tradeoffs of keeping threads around for an extended period of time.

maneesht (Contributor, Author) replied:

I think it's there to catch cases where the wrong executor gets called by UploadTask.

Here's a quick StackOverflow answer that talks about the tradeoffs:
https://stackoverflow.com/a/18225863

return FirebaseApp.initializeApp(
ApplicationProvider.getApplicationContext(),
new FirebaseOptions.Builder()
@@ -144,10 +150,10 @@ static void await(Task<?> task, int timeout, TimeUnit timeUnit) throws Interrupt
}

/**
* Awaits for a Task for 3 seconds, but flushes the Robolectric scheduler to allow newly added
* Awaits for a Task for 10 seconds, but flushes the Robolectric scheduler to allow newly added
* Tasks to be executed.
*/
static void await(Task<?> task) throws InterruptedException {
await(task, 3, TimeUnit.SECONDS);
await(task, 10, TimeUnit.SECONDS);
}
}
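To illustrate the keep-alive discussion in the thread above, here is a standalone sketch (not project code) of why a short keep-alive breaks the same-callback-thread assertion: with `allowCoreThreadTimeOut(true)`, an idle core thread is reclaimed, so the next callback can land on a different thread.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveSketch {
  public static void main(String[] args) throws Exception {
    // Single-thread pool with a 2-second keep-alive, similar in spirit to the callback queue.
    ThreadPoolExecutor executor =
        new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    executor.allowCoreThreadTimeOut(true); // idle core threads may be reclaimed

    executor.submit(() -> System.out.println("first callback on " + Thread.currentThread().getName()));
    Thread.sleep(5_000); // longer than the keep-alive, e.g. a long exponential backoff
    executor.submit(() -> System.out.println("second callback on " + Thread.currentThread().getName()));
    // The two thread names usually differ, which breaks tests asserting a stable callback thread;
    // raising the keep-alive to 90 seconds (as in TestUtil above) keeps the original thread alive.

    executor.shutdown();
  }
}
```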
@@ -100,6 +100,36 @@ public void smallTextUpload() throws Exception {
TestUtil.verifyTaskStateChanges("smallTextUpload", task.getResult().toString());
}

/**
* This test replicates uploadChunk() returning 500s and verifies that the retries are
* limited and use exponential backoff. If the max-retry limit is not checked, the await
* task will time out.
*
* @throws Exception
*/
@Test
public void fileUploadWith500() throws Exception {

System.out.println("Starting test fileUploadWith500.");

MockConnectionFactory factory = NetworkLayerMock.ensureNetworkMock("fileUploadWith500", true);

String filename = TEST_ASSET_ROOT + "image.jpg";
ClassLoader classLoader = UploadTest.class.getClassLoader();
InputStream imageStream = classLoader.getResourceAsStream(filename);
Uri sourceFile = Uri.parse("file://" + filename);

ContentResolver resolver = ApplicationProvider.getApplicationContext().getContentResolver();
Shadows.shadowOf(resolver).registerInputStream(sourceFile, imageStream);

Task<StringBuilder> task = TestUploadHelper.fileUpload(sourceFile, "image.jpg");

TestUtil.await(task, 3, TimeUnit.MINUTES);

factory.verifyOldMock();
TestUtil.verifyTaskStateChanges("fileUploadWith500", task.getResult().toString());
}

@Test
public void cantUploadToRoot() throws Exception {
System.out.println("Starting test cantUploadToRoot.");
@@ -130,12 +160,13 @@ public void cantUploadToRoot() throws Exception {
});

// TODO(mrschmidt): Lower the timeout
TestUtil.await(task, 300, TimeUnit.SECONDS);
TestUtil.await(task, 1, TimeUnit.MINUTES);

try {
task.getResult();
Assert.fail();
} catch (RuntimeExecutionException e) {
// Note: This test can be flaky due to the fact that the second .getCause() may be null.
Assert.assertEquals(taskException.get().getCause(), e.getCause().getCause());
}

@@ -263,7 +294,7 @@ public void cancelledUpload() throws Exception {
Task<StringBuilder> task = TestUploadHelper.byteUploadCancel();

// TODO(mrschmidt): Lower the timeout
TestUtil.await(task, 500, TimeUnit.SECONDS);
TestUtil.await(task, 1000, TimeUnit.SECONDS);

factory.verifyOldMock();
TestUtil.verifyTaskStateChanges("cancelledUpload", task.getResult().toString());
@@ -466,7 +497,7 @@ public void fileUploadRecovery() throws Exception {

Task<StringBuilder> task = TestUploadHelper.fileUpload(sourceFile, "flubbertest.jpg");

TestUtil.await(task, 5, TimeUnit.SECONDS);
TestUtil.await(task, 4, TimeUnit.MINUTES);

factory.verifyOldMock();
TestUtil.verifyTaskStateChanges("fileUploadRecovery", task.getResult().toString());
@@ -489,7 +520,7 @@ public void fileUploadNoRecovery() throws Exception {

Task<StringBuilder> task = TestUploadHelper.fileUpload(sourceFile, "flubbertest.jpg");

TestUtil.await(task, 5, TimeUnit.SECONDS);
TestUtil.await(task);

factory.verifyOldMock();
TestUtil.verifyTaskStateChanges("fileUploadNoRecovery", task.getResult().toString());