Skip to content

Commit 6769a2b

Browse files
authored
fix: update UploadCallable to use createFrom to avoid NPE trying to resolve resulting object (#2086)
* fix: Fix bug where the Storage Object is accessed before WriteChannel Closes
1 parent 45d142a commit 6769a2b

File tree

3 files changed

+86
-22
lines changed

3 files changed

+86
-22
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelUploadConfig.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ private ParallelUploadConfig(
4747
this.prefix = prefix;
4848
this.bucketName = bucketName;
4949
this.targetOptsPerRequest = targetOptsPerRequest;
50-
this.writeOptsPerRequest = writeOptsPerRequest;
50+
this.writeOptsPerRequest = applySkipIfExists(skipIfExists, writeOptsPerRequest);
5151
}
5252

5353
/** If a corresponding object already exists skip uploading the object */
@@ -115,6 +115,15 @@ public static Builder newBuilder() {
115115
return new Builder();
116116
}
117117

118+
private static List<BlobWriteOption> applySkipIfExists(
119+
boolean skipIfExists, List<BlobWriteOption> writeOptsPerRequest) {
120+
if (skipIfExists) {
121+
return ImmutableList.copyOf(
122+
BlobWriteOption.dedupe(writeOptsPerRequest, BlobWriteOption.doesNotExist()));
123+
}
124+
return writeOptsPerRequest;
125+
}
126+
118127
@BetaApi
119128
public static final class Builder {
120129

google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,12 @@
1616

1717
package com.google.cloud.storage.transfermanager;
1818

19-
import com.google.cloud.WriteChannel;
19+
import com.google.cloud.storage.Blob;
2020
import com.google.cloud.storage.BlobInfo;
21-
import com.google.cloud.storage.PackagePrivateMethodWorkarounds;
2221
import com.google.cloud.storage.Storage;
2322
import com.google.cloud.storage.Storage.BlobWriteOption;
24-
import com.google.common.io.ByteStreams;
25-
import java.nio.channels.FileChannel;
23+
import com.google.cloud.storage.StorageException;
2624
import java.nio.file.Path;
27-
import java.nio.file.StandardOpenOption;
28-
import java.util.Optional;
2925
import java.util.concurrent.Callable;
3026

3127
final class UploadCallable implements Callable<UploadResult> {
@@ -61,24 +57,23 @@ public UploadResult call() throws Exception {
6157
}
6258

6359
private UploadResult uploadWithoutChunking() {
64-
long bytesCopied = -1L;
6560
try {
66-
Optional<BlobInfo> newBlob;
67-
try (FileChannel r = FileChannel.open(sourceFile, StandardOpenOption.READ);
68-
WriteChannel w = storage.writer(originalBlob, opts)) {
69-
w.setChunkSize(transferManagerConfig.getPerWorkerBufferSize());
70-
bytesCopied = ByteStreams.copy(r, w);
71-
newBlob = PackagePrivateMethodWorkarounds.maybeGetBlobInfoFunction().apply(w);
72-
}
61+
Blob from = storage.createFrom(originalBlob, sourceFile, opts);
7362
return UploadResult.newBuilder(originalBlob, TransferStatus.SUCCESS)
74-
.setUploadedBlob(newBlob.get())
63+
.setUploadedBlob(from.asBlobInfo())
7564
.build();
76-
} catch (Exception e) {
77-
if (bytesCopied == -1) {
78-
return UploadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_START)
65+
} catch (StorageException e) {
66+
if (parallelUploadConfig.isSkipIfExists() && e.getCode() == 412) {
67+
return UploadResult.newBuilder(originalBlob, TransferStatus.SKIPPED)
68+
.setException(e)
69+
.build();
70+
} else {
71+
// TODO: check for FAILED_TO_START conditions
72+
return UploadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH)
7973
.setException(e)
8074
.build();
8175
}
76+
} catch (Exception e) {
8277
return UploadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH)
8378
.setException(e)
8479
.build();

google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,11 @@ public void uploadFiles() throws Exception {
131131
UploadJob job = transferManager.uploadFiles(files, parallelUploadConfig);
132132
List<UploadResult> uploadResults = job.getUploadResults();
133133
assertThat(uploadResults).hasSize(3);
134+
assertThat(
135+
uploadResults.stream()
136+
.filter(result -> result.getStatus() == TransferStatus.SUCCESS)
137+
.collect(Collectors.toList()))
138+
.hasSize(3);
134139
}
135140
}
136141

@@ -153,6 +158,11 @@ public void uploadFilesWithOpts() throws Exception {
153158
UploadJob job = transferManager.uploadFiles(files, parallelUploadConfig);
154159
List<UploadResult> uploadResults = job.getUploadResults();
155160
assertThat(uploadResults).hasSize(3);
161+
assertThat(
162+
uploadResults.stream()
163+
.filter(result -> result.getStatus() == TransferStatus.SUCCESS)
164+
.collect(Collectors.toList()))
165+
.hasSize(3);
156166
}
157167
}
158168

@@ -178,9 +188,14 @@ public void uploadFilesOneFailure() throws Exception {
178188
assertThat(uploadResults).hasSize(4);
179189
assertThat(
180190
uploadResults.stream()
181-
.filter(x -> x.getStatus() == TransferStatus.FAILED_TO_START)
191+
.filter(x -> x.getStatus() == TransferStatus.FAILED_TO_FINISH)
182192
.collect(Collectors.toList()))
183193
.hasSize(1);
194+
assertThat(
195+
uploadResults.stream()
196+
.filter(result -> result.getStatus() == TransferStatus.SUCCESS)
197+
.collect(Collectors.toList()))
198+
.hasSize(3);
184199
}
185200
}
186201

@@ -196,7 +211,7 @@ public void uploadNonexistentBucket() throws Exception {
196211
ParallelUploadConfig.newBuilder().setBucketName(bucketName).build();
197212
UploadJob job = transferManager.uploadFiles(files, parallelUploadConfig);
198213
List<UploadResult> uploadResults = job.getUploadResults();
199-
assertThat(uploadResults.get(0).getStatus()).isEqualTo(TransferStatus.FAILED_TO_START);
214+
assertThat(uploadResults.get(0).getStatus()).isEqualTo(TransferStatus.FAILED_TO_FINISH);
200215
assertThat(uploadResults.get(0).getException()).isInstanceOf(StorageException.class);
201216
}
202217
}
@@ -212,11 +227,56 @@ public void uploadNonexistentFile() throws Exception {
212227
ParallelUploadConfig.newBuilder().setBucketName(bucketName).build();
213228
UploadJob job = transferManager.uploadFiles(files, parallelUploadConfig);
214229
List<UploadResult> uploadResults = job.getUploadResults();
215-
assertThat(uploadResults.get(0).getStatus()).isEqualTo(TransferStatus.FAILED_TO_START);
230+
assertThat(uploadResults.get(0).getStatus()).isEqualTo(TransferStatus.FAILED_TO_FINISH);
216231
assertThat(uploadResults.get(0).getException()).isInstanceOf(NoSuchFileException.class);
217232
}
218233
}
219234

235+
@Test
236+
public void uploadFailsSkipIfExists() throws Exception {
237+
TransferManagerConfig config =
238+
TransferManagerConfigTestingInstances.defaults(storage.getOptions()).toBuilder().build();
239+
String bucketName = bucket.getName();
240+
try (TransferManager transferManager = config.getService();
241+
TmpFile tmpFile = DataGenerator.base64Characters().tempFile(baseDir, objectContentSize)) {
242+
ParallelUploadConfig parallelUploadConfig =
243+
ParallelUploadConfig.newBuilder().setBucketName(bucketName).setSkipIfExists(true).build();
244+
UploadJob jobInitUpload =
245+
transferManager.uploadFiles(ImmutableList.of(tmpFile.getPath()), parallelUploadConfig);
246+
List<UploadResult> uploadResults = jobInitUpload.getUploadResults();
247+
assertThat(uploadResults.get(0).getStatus()).isEqualTo(TransferStatus.SUCCESS);
248+
UploadJob failedSecondUpload =
249+
transferManager.uploadFiles(ImmutableList.of(tmpFile.getPath()), parallelUploadConfig);
250+
List<UploadResult> failedResult = failedSecondUpload.getUploadResults();
251+
assertThat(failedResult.get(0).getStatus()).isEqualTo(TransferStatus.SKIPPED);
252+
}
253+
}
254+
255+
@Test
256+
public void uploadSkipIfExistsGenerationOverride() throws Exception {
257+
TransferManagerConfig config =
258+
TransferManagerConfigTestingInstances.defaults(storage.getOptions()).toBuilder().build();
259+
String bucketName = bucket.getName();
260+
try (TransferManager transferManager = config.getService();
261+
TmpFile tmpFile = DataGenerator.base64Characters().tempFile(baseDir, objectContentSize)) {
262+
ParallelUploadConfig parallelUploadConfig =
263+
ParallelUploadConfig.newBuilder()
264+
.setBucketName(bucketName)
265+
.setSkipIfExists(true)
266+
.setWriteOptsPerRequest(ImmutableList.of(BlobWriteOption.generationMatch(5L)))
267+
.build();
268+
assertThat(parallelUploadConfig.getWriteOptsPerRequest()).hasSize(1);
269+
UploadJob jobInitUpload =
270+
transferManager.uploadFiles(ImmutableList.of(tmpFile.getPath()), parallelUploadConfig);
271+
List<UploadResult> uploadResults = jobInitUpload.getUploadResults();
272+
assertThat(uploadResults.get(0).getStatus()).isEqualTo(TransferStatus.SUCCESS);
273+
UploadJob failedSecondUpload =
274+
transferManager.uploadFiles(ImmutableList.of(tmpFile.getPath()), parallelUploadConfig);
275+
List<UploadResult> failedResult = failedSecondUpload.getUploadResults();
276+
assertThat(failedResult.get(0).getStatus()).isEqualTo(TransferStatus.SKIPPED);
277+
}
278+
}
279+
220280
@Test
221281
public void downloadBlobs() throws Exception {
222282
TransferManagerConfig config =

0 commit comments

Comments
 (0)