Skip to content

Commit 028fe92

Browse files
committed
Add download implementation for API requests
1 parent dbbc0a8 commit 028fe92

File tree

54 files changed

+4805
-10
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+4805
-10
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"category": "AWS SDK for Java v2",
3+
"type": "bugfix",
4+
"description": "Fix a bug where events in an event stream were being signed with the request date, and not with the current system time."
5+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,26 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(Path pa
118118
return new FileAsyncResponseTransformer<>(path);
119119
}
120120

121+
/**
122+
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file.
123+
*
124+
* @param path Path to file to write to.
125+
* @param position The value for the data between the current end of the file and the starting position is
126+
* undefined.
127+
* @param isNewFile Whether this is a new file. If this is {@code true} and the file already exists, the
128+
* transformer will complete with an exception.
129+
* @param deleteOnFailure Whether the file on disk should be deleted in the event of a failure when writing the
130+
* stream.
131+
* @param <ResponseT> Pojo Response type.
132+
* @return AsyncResponseTransformer instance.
133+
*/
134+
static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(Path path,
135+
long position,
136+
boolean isNewFile,
137+
boolean deleteOnFailure) {
138+
return new FileAsyncResponseTransformer<>(path, position, isNewFile, deleteOnFailure);
139+
}
140+
121141
/**
122142
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error,
123143
* the SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformer.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import software.amazon.awssdk.annotations.SdkInternalApi;
3232
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
3333
import software.amazon.awssdk.core.async.SdkPublisher;
34+
import software.amazon.awssdk.utils.Validate;
3435

3536
/**
3637
* {@link AsyncResponseTransformer} that writes the data to the specified file.
@@ -40,16 +41,29 @@
4041
@SdkInternalApi
4142
public final class FileAsyncResponseTransformer<ResponseT> implements AsyncResponseTransformer<ResponseT, ResponseT> {
4243
private final Path path;
44+
private final long offset;
45+
private final boolean isNewFile;
46+
private final boolean deleteOnFailure;
4347
private volatile AsynchronousFileChannel fileChannel;
4448
private volatile CompletableFuture<Void> cf;
4549
private volatile ResponseT response;
4650

4751
public FileAsyncResponseTransformer(Path path) {
52+
this(path, 0L, true, true);
53+
}
54+
55+
public FileAsyncResponseTransformer(Path path, long offset, boolean isNewFile, boolean deleteOnFailure) {
4856
this.path = path;
57+
this.offset = Validate.isNotNegative(offset, "offset");
58+
this.isNewFile = isNewFile;
59+
this.deleteOnFailure = deleteOnFailure;
4960
}
5061

5162
private AsynchronousFileChannel createChannel(Path path) throws IOException {
52-
return AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
63+
if (isNewFile) {
64+
return AsynchronousFileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
65+
}
66+
return AsynchronousFileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
5367
}
5468

5569
@Override
@@ -72,15 +86,17 @@ public void onResponse(ResponseT response) {
7286
public void onStream(SdkPublisher<ByteBuffer> publisher) {
7387
// onStream may be called multiple times so reset the file channel every time
7488
this.fileChannel = invokeSafely(() -> createChannel(path));
75-
publisher.subscribe(new FileSubscriber(this.fileChannel, path, cf));
89+
publisher.subscribe(new FileSubscriber(offset, this.fileChannel, path, cf));
7690
}
7791

7892
@Override
7993
public void exceptionOccurred(Throwable throwable) {
8094
try {
8195
invokeSafely(fileChannel::close);
8296
} finally {
83-
invokeSafely(() -> Files.deleteIfExists(path));
97+
if (deleteOnFailure) {
98+
invokeSafely(() -> Files.deleteIfExists(path));
99+
}
84100
}
85101
cf.completeExceptionally(throwable);
86102
}
@@ -89,7 +105,7 @@ public void exceptionOccurred(Throwable throwable) {
89105
* {@link Subscriber} implementation that writes chunks to a file.
90106
*/
91107
static class FileSubscriber implements Subscriber<ByteBuffer> {
92-
private final AtomicLong position = new AtomicLong();
108+
private final AtomicLong position;
93109

94110
private final AsynchronousFileChannel fileChannel;
95111
private final Path path;
@@ -99,12 +115,17 @@ static class FileSubscriber implements Subscriber<ByteBuffer> {
99115
private volatile boolean closeOnLastWrite = false;
100116
private Subscription subscription;
101117

102-
FileSubscriber(AsynchronousFileChannel fileChannel, Path path, CompletableFuture<Void> future) {
118+
FileSubscriber(long position, AsynchronousFileChannel fileChannel, Path path, CompletableFuture<Void> future) {
119+
this.position = new AtomicLong(position);
103120
this.fileChannel = fileChannel;
104121
this.path = path;
105122
this.future = future;
106123
}
107124

125+
FileSubscriber(AsynchronousFileChannel fileChannel, Path path, CompletableFuture<Void> future) {
126+
this(0, fileChannel, path, future);
127+
}
128+
108129
@Override
109130
public void onSubscribe(Subscription s) {
110131
if (this.subscription != null) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
/*
2+
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
import com.google.common.jimfs.Configuration;
21+
import com.google.common.jimfs.Jimfs;
22+
import java.io.BufferedReader;
23+
import java.io.IOException;
24+
import java.io.InputStreamReader;
25+
import java.io.OutputStream;
26+
import java.io.UncheckedIOException;
27+
import java.nio.charset.StandardCharsets;
28+
import java.nio.file.FileAlreadyExistsException;
29+
import java.nio.file.FileSystem;
30+
import java.nio.file.Files;
31+
import java.nio.file.Path;
32+
import java.nio.file.StandardOpenOption;
33+
import java.util.concurrent.CompletableFuture;
34+
import org.junit.After;
35+
import org.junit.Before;
36+
import org.junit.Test;
37+
import org.reactivestreams.Subscriber;
38+
import org.reactivestreams.Subscription;
39+
import software.amazon.awssdk.core.async.AsyncRequestBody;
40+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
41+
import software.amazon.awssdk.core.async.SdkPublisher;
42+
43+
/**
44+
* Tests for {@link FileAsyncResponseTransformer}.
45+
*/
46+
public class FileAsyncResponseTransformerTest {
47+
private FileSystem testFs;
48+
49+
@Before
50+
public void setup() {
51+
testFs = Jimfs.newFileSystem(Configuration.forCurrentPlatform());
52+
}
53+
54+
@After
55+
public void teardown() throws IOException {
56+
testFs.close();
57+
}
58+
59+
@Test
60+
public void defaultCreatesNewWritableFile() {
61+
byte[] content = "test".getBytes(StandardCharsets.UTF_8);
62+
Path testFile = testFs.getPath("testFile");
63+
AsyncResponseTransformer<String, String> transformer = AsyncResponseTransformer.toFile(testFile);
64+
CompletableFuture<String> transformFuture = transformer.prepare();
65+
transformer.onResponse("some response");
66+
transformer.onStream(AsyncRequestBody.fromBytes(content));
67+
transformFuture.join();
68+
69+
assertFileContentsEquals(testFile, "test");
70+
}
71+
72+
@Test
73+
public void honorsPosition() throws IOException {
74+
byte[] content = "test".getBytes(StandardCharsets.UTF_8);
75+
Path testFile = testFs.getPath("testFile");
76+
AsyncResponseTransformer<String, String> transformer = AsyncResponseTransformer.toFile(testFile, content.length, true, true);
77+
CompletableFuture<String> transformFuture = transformer.prepare();
78+
transformer.onResponse("some response");
79+
transformer.onStream(AsyncRequestBody.fromBytes(content));
80+
transformFuture.join();
81+
82+
assertThat(Files.size(testFile)).isEqualTo(content.length * 2);
83+
}
84+
85+
@Test
86+
public void honorsNewFileFlags_False() throws IOException {
87+
Path exists = testFs.getPath("exists");
88+
createFileWithContents(exists, "Hello".getBytes(StandardCharsets.UTF_8));
89+
90+
honorsNewFileFlagTest(exists, 5, false, "Test", "HelloTest");
91+
}
92+
93+
@Test
94+
public void honorsNewFileFlag_True_FileNotExists() {
95+
Path notExists = testFs.getPath("notExists");
96+
honorsNewFileFlagTest(notExists, 0, true, "Test", "Test");
97+
}
98+
99+
@Test
100+
public void honorsNewFileFlag_True_FileExists() throws IOException {
101+
Path exists = testFs.getPath("exists");
102+
createFileWithContents(exists, "Hello".getBytes(StandardCharsets.UTF_8));
103+
assertThatThrownBy(() -> honorsNewFileFlagTest(exists, 5, true, "Test", null))
104+
.hasCauseInstanceOf(FileAlreadyExistsException.class);
105+
}
106+
107+
@Test
108+
public void honorsDeleteOnFailure_True_NoExistingFile() {
109+
Path notExists = testFs.getPath("notExists");
110+
honorsDeleteOnFailureTest(notExists, true, true);
111+
}
112+
113+
@Test
114+
public void honorsDeleteOnFailure_True_ExistingFile() throws IOException {
115+
Path exists = testFs.getPath("exists");
116+
createFileWithContents(exists, "Hello".getBytes(StandardCharsets.UTF_8));
117+
honorsDeleteOnFailureTest(exists, false, true);
118+
}
119+
120+
@Test
121+
public void honorsDeleteOnFailure_False_NonExistingFile() {
122+
Path notExists = testFs.getPath("notExists");
123+
honorsDeleteOnFailureTest(notExists, true, false);
124+
}
125+
126+
@Test
127+
public void honorsDeleteOnFailure_False_ExistingFile() throws IOException {
128+
Path exists = testFs.getPath("exists");
129+
createFileWithContents(exists, "Hello".getBytes(StandardCharsets.UTF_8));
130+
honorsDeleteOnFailureTest(exists, false, false);
131+
}
132+
133+
private void honorsNewFileFlagTest(Path file, long position, boolean isNewFile, String streamContents, String expectedContents) {
134+
AsyncResponseTransformer<String, String> transformer = AsyncResponseTransformer.toFile(file, position, isNewFile, true);
135+
CompletableFuture<String> transformFuture = transformer.prepare();
136+
transformer.onResponse("some response");
137+
transformer.onStream(AsyncRequestBody.fromString(streamContents));
138+
transformFuture.join();
139+
140+
if (expectedContents != null) {
141+
assertFileContentsEquals(file, expectedContents);
142+
}
143+
}
144+
145+
private void honorsDeleteOnFailureTest(Path file, boolean isNewFile, boolean deleteOnFailure) {
146+
AsyncResponseTransformer<String, String> transformer = AsyncResponseTransformer.toFile(file, 0, isNewFile, deleteOnFailure);
147+
CompletableFuture<String> transformFuture = transformer.prepare();
148+
IOException error = new IOException("Something went wrong");
149+
transformer.onResponse("some response");
150+
transformer.onStream(new ErrorPublisher<>(error));
151+
transformer.exceptionOccurred(error);
152+
assertThatThrownBy(transformFuture::join).hasCause(error);
153+
if (deleteOnFailure) {
154+
assertThat(Files.exists(file)).isFalse();
155+
} else {
156+
assertThat(Files.exists(file)).isTrue();
157+
}
158+
}
159+
160+
private static void createFileWithContents(Path file, byte[] contents) throws IOException {
161+
OutputStream os = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
162+
os.write(contents);
163+
os.close();
164+
}
165+
166+
private static void assertFileContentsEquals(Path file, String expected) {
167+
StringBuilder sb = new StringBuilder();
168+
try {
169+
BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(file)));
170+
String s;
171+
while ((s = reader.readLine()) != null) {
172+
sb.append(s);
173+
}
174+
} catch (IOException ioe) {
175+
throw new UncheckedIOException(ioe);
176+
}
177+
assertThat(sb.toString()).isEqualTo(expected);
178+
}
179+
180+
private static final class ErrorPublisher<T> implements SdkPublisher<T> {
181+
private final Throwable error;
182+
183+
private ErrorPublisher(Throwable error) {
184+
this.error = error;
185+
}
186+
187+
@Override
188+
public void subscribe(Subscriber<? super T> subscriber) {
189+
subscriber.onSubscribe(new ErrorSubscription(subscriber, error));
190+
}
191+
}
192+
193+
private static final class ErrorSubscription implements Subscription {
194+
private final Subscriber<?> subscriber;
195+
private final Throwable error;
196+
197+
public ErrorSubscription(Subscriber<?> subscriber, Throwable error) {
198+
this.subscriber = subscriber;
199+
this.error = error;
200+
}
201+
202+
@Override
203+
public void request(long l) {
204+
subscriber.onError(error);
205+
}
206+
207+
@Override
208+
public void cancel() {
209+
210+
}
211+
}
212+
}

docs/design/services/s3/transfermanager/prototype.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,16 @@ interface Builder {
221221
*/
222222
Builder maxDownloadBytesPerSecond(Long maxDownloadBytesPerSecond);
223223

224+
/**
225+
* The multipart download configuration.
226+
*/
227+
Builder multipartDownloadConfiguration(MultipartDownloadConfiguration multipartDownloadConfiguration);
228+
229+
/**
230+
* The multipart upload configuration.
231+
*/
232+
Builder multipartUploadConfiguration(MultipartUploadConfiguration multipartUploadConfiguration);
233+
224234
/**
225235
* Add a progress listener to the currently configured list of
226236
* listeners.
@@ -445,11 +455,6 @@ public interface SinglePartDownloadContext {
445455
* The original download request given to the Transfer Manager.
446456
*/
447457
DownloadObjectRequest downloadRequest();
448-
449-
/**
450-
* The request sent to S3 for this object. This is empty if downloading a presigned URL.
451-
*/
452-
GetObjectRequest objectRequest();
453458
}
454459

455460
/**

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
<module>aws-sdk-java</module>
5454
<module>core</module>
5555
<module>services</module>
56+
<module>services-custom/s3-transfermanager</module>
5657
<module>bom</module>
5758
<module>bom-internal</module>
5859
<module>codegen</module>

0 commit comments

Comments
 (0)