-
Notifications
You must be signed in to change notification settings - Fork 625
Implemented exponential backoff and max retry with resumable uploads #4087
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
dd3fba3
08c5714
16cffe8
10308ff
7a9356c
a47f2dc
d24c56d
11f5f54
fdb61c9
ad8203e
6acea85
ea6d56a
c6ac2f1
1384235
0bf2349
047a780
da6b659
1304294
7f8772b
91c5403
ce95d65
277b03e
fc9c9ac
d37c628
7787770
254d8cc
1085b00
97520ea
91a72a5
d6da159
9ca073b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,8 @@ | |
|
||
package com.google.firebase.storage; | ||
|
||
import static com.google.firebase.storage.internal.ExponentialBackoffSender.RND_MAX; | ||
|
||
import android.content.ContentResolver; | ||
import android.content.Context; | ||
import android.net.Uri; | ||
|
@@ -25,10 +27,14 @@ | |
import androidx.annotation.VisibleForTesting; | ||
import com.google.android.gms.common.api.Status; | ||
import com.google.android.gms.common.internal.Preconditions; | ||
import com.google.android.gms.common.util.Clock; | ||
import com.google.android.gms.common.util.DefaultClock; | ||
import com.google.firebase.appcheck.interop.InternalAppCheckTokenProvider; | ||
import com.google.firebase.auth.internal.InternalAuthProvider; | ||
import com.google.firebase.storage.internal.AdaptiveStreamBuffer; | ||
import com.google.firebase.storage.internal.ExponentialBackoffSender; | ||
import com.google.firebase.storage.internal.Sleeper; | ||
import com.google.firebase.storage.internal.SleeperImpl; | ||
import com.google.firebase.storage.internal.Util; | ||
import com.google.firebase.storage.network.NetworkRequest; | ||
import com.google.firebase.storage.network.ResumableUploadByteRequest; | ||
|
@@ -40,6 +46,7 @@ | |
import java.io.FileNotFoundException; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.util.Random; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import org.json.JSONException; | ||
|
||
|
@@ -72,6 +79,13 @@ public class UploadTask extends StorageTask<UploadTask.TaskSnapshot> { | |
private volatile Exception mServerException = null; | ||
private volatile int mResultCode = 0; | ||
private volatile String mServerStatus; | ||
private volatile long maxSleepTime; | ||
private static final Random random = new Random(); | ||
/*package*/ static Sleeper sleeper = new SleeperImpl(); | ||
/*package*/ static Clock clock = DefaultClock.getInstance(); | ||
private int sleepTime = | ||
0; // TODO(mtewani): Make it so that the send is 0,1,2,4,8,... and start at 0 | ||
private final int sleepInterval = 1000; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So this is essentially a minimum sleep time for retries, right? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Consider renaming to minimumSleepInterval? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
||
UploadTask(StorageReference targetRef, StorageMetadata metadata, byte[] bytes) { | ||
Preconditions.checkNotNull(targetRef); | ||
|
@@ -89,6 +103,7 @@ public class UploadTask extends StorageTask<UploadTask.TaskSnapshot> { | |
new AdaptiveStreamBuffer(new ByteArrayInputStream(bytes), PREFERRED_CHUNK_SIZE); | ||
this.mIsStreamOwned = true; | ||
|
||
this.maxSleepTime = storage.getMaxChunkUploadRetry(); | ||
mSender = | ||
new ExponentialBackoffSender( | ||
storage.getApp().getApplicationContext(), | ||
|
@@ -110,6 +125,7 @@ public class UploadTask extends StorageTask<UploadTask.TaskSnapshot> { | |
this.mAppCheckProvider = storage.getAppCheckProvider(); | ||
this.mUri = file; | ||
InputStream inputStream = null; | ||
this.maxSleepTime = storage.getMaxChunkUploadRetry(); | ||
mSender = | ||
new ExponentialBackoffSender( | ||
mStorageRef.getApp().getApplicationContext(), | ||
|
@@ -173,12 +189,13 @@ public class UploadTask extends StorageTask<UploadTask.TaskSnapshot> { | |
this.mStreamBuffer = new AdaptiveStreamBuffer(stream, PREFERRED_CHUNK_SIZE); | ||
this.mIsStreamOwned = false; | ||
this.mUri = null; | ||
this.maxSleepTime = storage.getMaxChunkUploadRetry(); | ||
mSender = | ||
new ExponentialBackoffSender( | ||
mStorageRef.getApp().getApplicationContext(), | ||
mAuthProvider, | ||
mAppCheckProvider, | ||
mStorageRef.getStorage().getMaxUploadRetryTimeMillis()); | ||
storage.getMaxUploadRetryTimeMillis()); | ||
} | ||
|
||
/** @return the target of the upload. */ | ||
|
@@ -321,15 +338,18 @@ private boolean shouldContinue() { | |
} | ||
|
||
boolean inErrorState = mServerException != null || mResultCode < 200 || mResultCode >= 300; | ||
long deadLine = clock.elapsedRealtime() + this.maxSleepTime; | ||
long currentTime = clock.elapsedRealtime() + this.sleepTime; | ||
// we attempt to recover by calling recoverStatus(true) | ||
if (inErrorState && !recoverStatus(true)) { | ||
// we failed to recover. | ||
if (serverStateValid()) { | ||
tryChangeState(INTERNAL_STATE_FAILURE, false); | ||
if (inErrorState) { | ||
if (currentTime > deadLine || !recoverStatus(true)) { | ||
if (serverStateValid()) { | ||
tryChangeState(INTERNAL_STATE_FAILURE, false); | ||
} | ||
return false; | ||
} | ||
return false; | ||
sleepTime = Math.max(sleepTime * 2, sleepTime + (sleepInterval * 2)); | ||
} | ||
|
||
return true; | ||
} | ||
|
||
|
@@ -410,6 +430,24 @@ private boolean recoverStatus(boolean withRetry) { | |
return true; | ||
} | ||
|
||
private boolean delaySend(NetworkRequest request) { | ||
try { | ||
Log.d(TAG, "Waiting " + sleepTime + " milliseconds"); | ||
sleeper.sleep(sleepTime + random.nextInt(RND_MAX)); | ||
} catch (InterruptedException e) { | ||
Log.w(TAG, "thread interrupted during exponential backoff."); | ||
|
||
Thread.currentThread().interrupt(); | ||
mServerException = e; | ||
return false; | ||
} | ||
boolean sendRes = send(request); | ||
if (sendRes) { | ||
sleepTime = 0; | ||
maneesht marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return sendRes; | ||
} | ||
|
||
private void uploadChunk() { | ||
try { | ||
mStreamBuffer.fill(mCurrentChunkSize); | ||
|
@@ -425,7 +463,7 @@ private void uploadChunk() { | |
bytesToUpload, | ||
mStreamBuffer.isFinished()); | ||
|
||
if (!send(uploadRequest)) { | ||
if (!delaySend(uploadRequest)) { | ||
mCurrentChunkSize = PREFERRED_CHUNK_SIZE; | ||
Log.d(TAG, "Resetting chunk size to " + mCurrentChunkSize); | ||
return; | ||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -32,6 +32,12 @@ | |||
public class TestUtil { | ||||
|
||||
static FirebaseApp createApp() { | ||||
// Many tests require you to call the callback on the same thread that was initially | ||||
// instantiated. | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. comment formatting |
||||
// With the 5 second keepalive, after 5 seconds, the thread will get killed and eventually a new | ||||
// one will be created. | ||||
// Therefore causing many of the tests to fail | ||||
StorageTaskScheduler.setCallbackQueueKeepAlive(90, TimeUnit.SECONDS); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To add additional context: We call Line 63 in ed10eb5
However, in cases where we have long backoffs, this won't be the case, as the thread pool will terminate any threads that are not executed within the keepalive time, which is 5 seconds. Our alternative is to set the keepAlive, or disable the threads from being killed. A third option would be to remove this verification check. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we know why this check is necessary? I'm not familiar with the tradeoffs of keeping threads around for an extended period of time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think in case the wrong executor gets called by Here's a quick StackOverflow answer that talks about the tradeoffs: |
||||
return FirebaseApp.initializeApp( | ||||
ApplicationProvider.getApplicationContext(), | ||||
new FirebaseOptions.Builder() | ||||
|
@@ -148,6 +154,6 @@ static void await(Task<?> task, int timeout, TimeUnit timeUnit) throws Interrupt | |||
* Tasks to be executed. | ||||
*/ | ||||
static void await(Task<?> task) throws InterruptedException { | ||||
await(task, 3, TimeUnit.SECONDS); | ||||
await(task, 10, TimeUnit.SECONDS); | ||||
maneesht marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
} | ||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -100,6 +100,36 @@ public void smallTextUpload() throws Exception { | |
TestUtil.verifyTaskStateChanges("smallTextUpload", task.getResult().toString()); | ||
} | ||
|
||
/** | ||
* This test will replicate uploadChunk() returning 500's and test to make sure the retries are | ||
* limited and using exponential backoff. If the maxretry limit is not checked, then the await | ||
* task will time out. | ||
* | ||
* @throws Exception | ||
*/ | ||
@Test | ||
public void fileUploadWith500() throws Exception { | ||
|
||
System.out.println("Starting test fileUploadWith500."); | ||
|
||
MockConnectionFactory factory = NetworkLayerMock.ensureNetworkMock("fileUploadWith500", true); | ||
|
||
String filename = TEST_ASSET_ROOT + "image.jpg"; | ||
ClassLoader classLoader = UploadTest.class.getClassLoader(); | ||
InputStream imageStream = classLoader.getResourceAsStream(filename); | ||
Uri sourceFile = Uri.parse("file://" + filename); | ||
|
||
ContentResolver resolver = ApplicationProvider.getApplicationContext().getContentResolver(); | ||
Shadows.shadowOf(resolver).registerInputStream(sourceFile, imageStream); | ||
|
||
Task<StringBuilder> task = TestUploadHelper.fileUpload(sourceFile, "image.jpg"); | ||
|
||
TestUtil.await(task, 2, TimeUnit.MINUTES); | ||
|
||
factory.verifyOldMock(); | ||
TestUtil.verifyTaskStateChanges("fileUploadWith500", task.getResult().toString()); | ||
} | ||
|
||
@Test | ||
public void cantUploadToRoot() throws Exception { | ||
System.out.println("Starting test cantUploadToRoot."); | ||
|
@@ -130,7 +160,7 @@ public void cantUploadToRoot() throws Exception { | |
}); | ||
|
||
// TODO(mrschmidt): Lower the timeout | ||
TestUtil.await(task, 300, TimeUnit.SECONDS); | ||
TestUtil.await(task, 7, TimeUnit.MINUTES); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This seems at odds with the line comment above 😅 Why is this necessary? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ended up changing this to 1 minute. The 7 was a precaution :-) |
||
|
||
try { | ||
task.getResult(); | ||
|
@@ -466,7 +496,7 @@ public void fileUploadRecovery() throws Exception { | |
|
||
Task<StringBuilder> task = TestUploadHelper.fileUpload(sourceFile, "flubbertest.jpg"); | ||
|
||
TestUtil.await(task, 5, TimeUnit.SECONDS); | ||
TestUtil.await(task, 30, TimeUnit.SECONDS); | ||
|
||
factory.verifyOldMock(); | ||
TestUtil.verifyTaskStateChanges("fileUploadRecovery", task.getResult().toString()); | ||
|
@@ -489,7 +519,7 @@ public void fileUploadNoRecovery() throws Exception { | |
|
||
Task<StringBuilder> task = TestUploadHelper.fileUpload(sourceFile, "flubbertest.jpg"); | ||
|
||
TestUtil.await(task, 5, TimeUnit.SECONDS); | ||
TestUtil.await(task); | ||
|
||
factory.verifyOldMock(); | ||
TestUtil.verifyTaskStateChanges("fileUploadNoRecovery", task.getResult().toString()); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done