Skip to content

Commit ededdd5

Browse files
authored
Add support for etag validation in resumableFileDownload (#6042)
* Add support for etag validation in resumableFileDownload * Adding test cases to make sure download resumes or restarts based on etag * Add changelog * Add null check * Add test coverage * Fix checkstyle * Adding integ test and serialization unit test coverage * removing logging assertions for netty based transfer managers --------- Co-authored-by: Ran Vaknin <[email protected]>
1 parent 8ec4918 commit ededdd5

File tree

8 files changed

+234
-14
lines changed

8 files changed

+234
-14
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "Amazon S3 Transfer Manager",
4+
"contributor": "",
5+
"description": "Add support for etag validation in resumableFileDownload: restart paused downloads when etag does not match"
6+
}

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadPauseResumeIntegrationTest.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@
2121
import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;
2222

2323
import java.io.File;
24+
import java.io.IOException;
2425
import java.nio.charset.StandardCharsets;
2526
import java.nio.file.Files;
2627
import java.nio.file.Path;
2728
import java.time.Duration;
2829
import java.util.Optional;
2930
import org.apache.commons.lang3.RandomStringUtils;
31+
import org.apache.logging.log4j.Level;
32+
import org.assertj.core.api.Assertions;
3033
import org.junit.jupiter.api.AfterAll;
3134
import org.junit.jupiter.api.BeforeAll;
3235
import org.junit.jupiter.params.ParameterizedTest;
@@ -38,7 +41,10 @@
3841
import software.amazon.awssdk.core.waiters.WaiterAcceptor;
3942
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
4043
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
44+
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
45+
import software.amazon.awssdk.testutils.LogCaptor;
4146
import software.amazon.awssdk.testutils.RandomTempFile;
47+
import software.amazon.awssdk.transfer.s3.model.CompletedFileDownload;
4248
import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest;
4349
import software.amazon.awssdk.transfer.s3.model.FileDownload;
4450
import software.amazon.awssdk.transfer.s3.model.ResumableFileDownload;
@@ -70,6 +76,61 @@ public static void cleanup() {
7076
sourceFile.delete();
7177
}
7278

79+
@ParameterizedTest
80+
@MethodSource("transferManagers")
81+
void pauseAndResume_ObjectEtagChange_shouldRestartDownload(S3TransferManager tm) throws IOException {
82+
Path path = RandomTempFile.randomUncreatedFile().toPath();
83+
84+
TestDownloadListener testDownloadListener = new TestDownloadListener();
85+
DownloadFileRequest request = DownloadFileRequest.builder()
86+
.getObjectRequest(b -> b.bucket(BUCKET).key(KEY))
87+
.destination(path)
88+
.addTransferListener(testDownloadListener)
89+
.build();
90+
FileDownload download = tm.downloadFile(request);
91+
waitUntilFirstByteBufferDelivered(download);
92+
93+
ResumableFileDownload resumableFileDownload = download.pause();
94+
log.debug(() -> "Paused: " + resumableFileDownload);
95+
96+
String originalEtag = testDownloadListener.getObjectResponse.eTag();
97+
98+
File newSourceFile = new RandomTempFile(OBJ_SIZE);
99+
PutObjectResponse putResponse = s3.putObject(PutObjectRequest.builder()
100+
.bucket(BUCKET)
101+
.key(KEY)
102+
.build(), newSourceFile.toPath());
103+
104+
String newEtag = putResponse.eTag();
105+
assertThat(newEtag).isNotEqualTo(originalEtag);
106+
107+
boolean isCrtClient = tm.getClass().getName().contains("Crt");
108+
if (isCrtClient) {
109+
try (LogCaptor logCaptor = LogCaptor.create(Level.DEBUG)) {
110+
FileDownload resumedFileDownload = tm.resumeDownloadFile(resumableFileDownload);
111+
CompletedFileDownload completedDownload = resumedFileDownload.completionFuture().join();
112+
113+
assertThat(completedDownload.response().eTag()).isEqualTo(newEtag);
114+
assertThat(testDownloadListener.transferInitiatedCount == 2).isTrue();
115+
116+
Assertions.assertThat(logCaptor.loggedEvents())
117+
.anySatisfy(logEvent -> Assertions.assertThat(logEvent.getMessage().getFormattedMessage())
118+
.contains(String.format("The ETag of the requested object in bucket (%s) with key (%s) "
119+
+ "has changed since the last "
120+
+ "pause. The SDK will download the S3 object from "
121+
+ "the beginning",
122+
BUCKET, KEY)));
123+
}
124+
} else {
125+
// skip the log assertion for Netty because DEBUG level will log the entire byte stream and crash codebuild
126+
FileDownload resumedFileDownload = tm.resumeDownloadFile(resumableFileDownload);
127+
CompletedFileDownload completedDownload = resumedFileDownload.completionFuture().join();
128+
129+
assertThat(completedDownload.response().eTag()).isEqualTo(newEtag);
130+
assertThat(testDownloadListener.transferInitiatedCount == 2).isTrue();
131+
}
132+
}
133+
73134
@ParameterizedTest
74135
@MethodSource("transferManagers")
75136
void pauseAndResume_ObjectNotChanged_shouldResumeDownload(S3TransferManager tm) {
@@ -187,8 +248,14 @@ private static void waitUntilFirstByteBufferDelivered(FileDownload download) {
187248
}
188249

189250
private static final class TestDownloadListener implements TransferListener {
251+
private int transferInitiatedCount = 0;
190252
private GetObjectResponse getObjectResponse;
191253

254+
@Override
255+
public void transferInitiated(Context.TransferInitiated context) {
256+
transferInitiatedCount++;
257+
}
258+
192259
@Override
193260
public void bytesTransferred(Context.BytesTransferred context) {
194261
Optional<SdkResponse> sdkResponse = context.progressSnapshot().sdkResponse();

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/model/DefaultFileDownload.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,18 @@ private ResumableFileDownload doPause() {
6666
completionFuture.cancel(true);
6767

6868
Instant s3objectLastModified = null;
69+
String s3objectEtag = null;
6970
Long totalSizeInBytes = null;
7071
TransferProgressSnapshot snapshot = progress.snapshot();
7172

7273
if (snapshot.sdkResponse().isPresent() && snapshot.sdkResponse().get() instanceof GetObjectResponse) {
7374
GetObjectResponse getObjectResponse = (GetObjectResponse) snapshot.sdkResponse().get();
7475
s3objectLastModified = getObjectResponse.lastModified();
76+
s3objectEtag = getObjectResponse.eTag();
7577
totalSizeInBytes = getObjectResponse.contentLength();
7678
} else if (resumedDownload != null) {
7779
s3objectLastModified = resumedDownload.s3ObjectLastModified().orElse(null);
80+
s3objectEtag = resumedDownload.s3ObjectEtag().orElse(null);
7881
totalSizeInBytes = resumedDownload.totalSizeInBytes().isPresent() ? resumedDownload.totalSizeInBytes().getAsLong()
7982
: null;
8083
}
@@ -87,6 +90,7 @@ private ResumableFileDownload doPause() {
8790
return ResumableFileDownload.builder()
8891
.downloadFileRequest(request)
8992
.s3ObjectLastModified(s3objectLastModified)
93+
.s3ObjectEtag(s3objectEtag)
9094
.fileLastModified(fileLastModified)
9195
.bytesTransferred(length)
9296
.totalSizeInBytes(totalSizeInBytes)

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/serialization/ResumableFileDownloadSerializer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ public static byte[] toJson(ResumableFileDownload download) {
6262
jsonGenerator,
6363
"s3ObjectLastModified");
6464
}
65+
if (download.s3ObjectEtag().isPresent()) {
66+
TransferManagerJsonMarshaller.STRING.marshall(download.s3ObjectEtag().get(),
67+
jsonGenerator,
68+
"s3ObjectEtag");
69+
}
6570
marshallDownloadFileRequest(download.downloadFileRequest(), jsonGenerator);
6671
TransferManagerJsonMarshaller.LIST.marshall(download.completedParts(), jsonGenerator, "completedParts");
6772
jsonGenerator.writeEndObject();
@@ -127,6 +132,8 @@ private static ResumableFileDownload fromNodes(Map<String, JsonNode> downloadNod
127132
(TransferManagerJsonUnmarshaller<Long>) getUnmarshaller(MarshallingType.LONG);
128133
TransferManagerJsonUnmarshaller<Instant> instantUnmarshaller =
129134
(TransferManagerJsonUnmarshaller<Instant>) getUnmarshaller(MarshallingType.INSTANT);
135+
TransferManagerJsonUnmarshaller<String> stringUnmarshaller =
136+
(TransferManagerJsonUnmarshaller<String>) getUnmarshaller(MarshallingType.STRING);
130137

131138
ResumableFileDownload.Builder builder = ResumableFileDownload.builder();
132139
builder.bytesTransferred(longUnmarshaller.unmarshall(downloadNodes.get("bytesTransferred")));
@@ -138,6 +145,10 @@ private static ResumableFileDownload fromNodes(Map<String, JsonNode> downloadNod
138145
if (downloadNodes.get("s3ObjectLastModified") != null) {
139146
builder.s3ObjectLastModified(instantUnmarshaller.unmarshall(downloadNodes.get("s3ObjectLastModified")));
140147
}
148+
149+
if (downloadNodes.get("s3ObjectEtag") != null) {
150+
builder.s3ObjectEtag(stringUnmarshaller.unmarshall(downloadNodes.get("s3ObjectEtag")));
151+
}
141152
builder.downloadFileRequest(parseDownloadFileRequest(downloadNodes.get("downloadFileRequest")));
142153
if (downloadNodes.get("completedParts") != null) {
143154
builder.completedParts(TransferManagerJsonUnmarshaller.LIST_INT.unmarshall(downloadNodes.get("completedParts")));

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/utils/ResumableRequestConverter.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,21 @@ private ResumableRequestConverter() {
6060
GetObjectRequest getObjectRequest = originalDownloadRequest.getObjectRequest();
6161
DownloadFileRequest newDownloadFileRequest;
6262
Instant lastModified = resumableFileDownload.s3ObjectLastModified().orElse(null);
63-
boolean s3ObjectModified = !headObjectResponse.lastModified().equals(lastModified);
63+
String resumableFileDownloadEtag = resumableFileDownload.s3ObjectEtag().orElse(null);
64+
65+
String s3ObjectEtag = headObjectResponse.eTag();
66+
boolean etagModified = resumableFileDownloadEtag != null &&
67+
!resumableFileDownloadEtag.equals(s3ObjectEtag);
68+
6469

70+
boolean s3ObjectModified = !headObjectResponse.lastModified().equals(lastModified);
6571
boolean fileModified = !fileNotModified(resumableFileDownload.bytesTransferred(),
6672
resumableFileDownload.fileLastModified(),
6773
resumableFileDownload.downloadFileRequest().destination());
6874

69-
if (fileModified || s3ObjectModified) {
75+
if (fileModified || s3ObjectModified || etagModified) {
7076
// modification detected: new download request for the whole object from the beginning
71-
logIfNeeded(originalDownloadRequest, getObjectRequest, fileModified, s3ObjectModified);
77+
logIfNeeded(originalDownloadRequest, getObjectRequest, fileModified, s3ObjectModified, etagModified);
7278
newDownloadFileRequest = newDownloadFileRequest(originalDownloadRequest, getObjectRequest, headObjectResponse);
7379

7480
AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> responseTransformer =
@@ -128,7 +134,8 @@ private static AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> fi
128134
private static void logIfNeeded(DownloadFileRequest downloadRequest,
129135
GetObjectRequest getObjectRequest,
130136
boolean fileModified,
131-
boolean s3ObjectModified) {
137+
boolean s3ObjectModified,
138+
boolean s3ObjectEtagModified) {
132139
if (log.logger().isDebugEnabled()) {
133140
if (s3ObjectModified) {
134141
log.debug(() -> String.format("The requested object in bucket (%s) with key (%s) "
@@ -149,6 +156,14 @@ private static void logIfNeeded(DownloadFileRequest downloadRequest,
149156
getObjectRequest.bucket(),
150157
getObjectRequest.key()));
151158
}
159+
if (s3ObjectEtagModified) {
160+
log.debug(() -> String.format("The ETag of the requested object in bucket (%s) with key (%s) "
161+
+ "has changed since the last "
162+
+ "pause. The SDK will download the S3 object from "
163+
+ "the beginning",
164+
getObjectRequest.bucket(), getObjectRequest.key()));
165+
}
166+
152167
}
153168
}
154169

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/model/ResumableFileDownload.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public final class ResumableFileDownload implements ResumableTransfer,
6262
private final DownloadFileRequest downloadFileRequest;
6363
private final long bytesTransferred;
6464
private final Instant s3ObjectLastModified;
65+
private final String s3ObjectEtag;
6566
private final Long totalSizeInBytes;
6667
private final Instant fileLastModified;
6768
private final List<Integer> completedParts;
@@ -71,6 +72,7 @@ private ResumableFileDownload(DefaultBuilder builder) {
7172
this.bytesTransferred = builder.bytesTransferred == null ? 0 : Validate.isNotNegative(builder.bytesTransferred,
7273
"bytesTransferred");
7374
this.s3ObjectLastModified = builder.s3ObjectLastModified;
75+
s3ObjectEtag = builder.s3ObjectEtag;
7476
this.totalSizeInBytes = Validate.isPositiveOrNull(builder.totalSizeInBytes, "totalSizeInBytes");
7577
this.fileLastModified = builder.fileLastModified;
7678
List<Integer> compledPartsList = Validate.getOrDefault(builder.completedParts, Collections::emptyList);
@@ -97,6 +99,9 @@ public boolean equals(Object o) {
9799
if (!Objects.equals(s3ObjectLastModified, that.s3ObjectLastModified)) {
98100
return false;
99101
}
102+
if (!Objects.equals(s3ObjectEtag, that.s3ObjectEtag)) {
103+
return false;
104+
}
100105
if (!Objects.equals(fileLastModified, that.fileLastModified)) {
101106
return false;
102107
}
@@ -111,6 +116,7 @@ public int hashCode() {
111116
int result = downloadFileRequest.hashCode();
112117
result = 31 * result + (int) (bytesTransferred ^ (bytesTransferred >>> 32));
113118
result = 31 * result + (s3ObjectLastModified != null ? s3ObjectLastModified.hashCode() : 0);
119+
result = 31 * result + (s3ObjectEtag != null ? s3ObjectEtag.hashCode() : 0);
114120
result = 31 * result + (fileLastModified != null ? fileLastModified.hashCode() : 0);
115121
result = 31 * result + (totalSizeInBytes != null ? totalSizeInBytes.hashCode() : 0);
116122
result = 31 * result + (completedParts != null ? completedParts.hashCode() : 0);
@@ -143,6 +149,13 @@ public Optional<Instant> s3ObjectLastModified() {
143149
return Optional.ofNullable(s3ObjectLastModified);
144150
}
145151

152+
/**
153+
* Etag of the S3 object since last pause, or {@link Optional#empty()} if unknown
154+
*/
155+
public Optional<String> s3ObjectEtag() {
156+
return Optional.ofNullable(s3ObjectEtag);
157+
}
158+
146159
/**
147160
* Last modified time of the file since last pause
148161
*/
@@ -174,6 +187,7 @@ public String toString() {
174187
.add("bytesTransferred", bytesTransferred)
175188
.add("fileLastModified", fileLastModified)
176189
.add("s3ObjectLastModified", s3ObjectLastModified)
190+
.add("s3ObjectEtag", s3ObjectEtag)
177191
.add("totalSizeInBytes", totalSizeInBytes)
178192
.add("downloadFileRequest", downloadFileRequest)
179193
.add("completedParts", completedParts)
@@ -331,6 +345,14 @@ default ResumableFileDownload.Builder downloadFileRequest(Consumer<DownloadFileR
331345
*/
332346
Builder s3ObjectLastModified(Instant s3ObjectLastModified);
333347

348+
/**
349+
* Sets the Etag of the object
350+
*
351+
* @param s3ObjectEtag the Etag of the object
352+
* @return a reference to this object so that method calls can be chained together.
353+
*/
354+
Builder s3ObjectEtag(String s3ObjectEtag);
355+
334356
/**
335357
* Sets the last modified time of the object
336358
*
@@ -353,6 +375,7 @@ private static final class DefaultBuilder implements Builder {
353375
private DownloadFileRequest downloadFileRequest;
354376
private Long bytesTransferred;
355377
private Instant s3ObjectLastModified;
378+
private String s3ObjectEtag;
356379
private Long totalSizeInBytes;
357380
private Instant fileLastModified;
358381
private List<Integer> completedParts;
@@ -366,6 +389,7 @@ private DefaultBuilder(ResumableFileDownload persistableFileDownload) {
366389
this.totalSizeInBytes = persistableFileDownload.totalSizeInBytes;
367390
this.fileLastModified = persistableFileDownload.fileLastModified;
368391
this.s3ObjectLastModified = persistableFileDownload.s3ObjectLastModified;
392+
this.s3ObjectEtag = persistableFileDownload.s3ObjectEtag;
369393
this.completedParts = persistableFileDownload.completedParts;
370394
}
371395

@@ -393,6 +417,12 @@ public Builder s3ObjectLastModified(Instant s3ObjectLastModified) {
393417
return this;
394418
}
395419

420+
@Override
421+
public Builder s3ObjectEtag(String s3ObjectEtag) {
422+
this.s3ObjectEtag = s3ObjectEtag;
423+
return this;
424+
}
425+
396426
@Override
397427
public Builder fileLastModified(Instant fileLastModified) {
398428
this.fileLastModified = fileLastModified;

0 commit comments

Comments
 (0)