Skip to content

Commit 2966950

Browse files
authored
Merge pull request aws#3616 from aws/dongie/buffer-frominputstream
Buffer if necessary in fromInputStream
2 parents 6127626 + 7ebc71b commit 2966950

File tree

2 files changed

+74
-5
lines changed

2 files changed

+74
-5
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/sync/RequestBody.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -123,20 +123,28 @@ public static RequestBody fromFile(File file) {
123123
* To support resetting via {@link ContentStreamProvider}, this uses {@link InputStream#reset()} and uses a read limit of
124124
* 128 KiB. If you need more control, use {@link #fromContentProvider(ContentStreamProvider, long, String)} or
125125
* {@link #fromContentProvider(ContentStreamProvider, String)}.
126+
* <p>
127+
* <b>Important:</b> If {@code inputStream} does not support mark and reset, the stream will be buffered.
126128
*
127129
* @param inputStream Input stream to send to the service. The stream will not be closed by the SDK.
128130
* @param contentLength Content length of data in input stream.
129131
* @return RequestBody instance.
130132
*/
131133
public static RequestBody fromInputStream(InputStream inputStream, long contentLength) {
134+
// NOTE: does not have an effect if mark not supported
132135
IoUtils.markStreamWithMaxReadLimit(inputStream);
133136
InputStream nonCloseable = nonCloseableInputStream(inputStream);
134-
ContentStreamProvider provider = () -> {
135-
if (nonCloseable.markSupported()) {
137+
ContentStreamProvider provider;
138+
if (nonCloseable.markSupported()) {
139+
// stream supports mark + reset
140+
provider = () -> {
136141
invokeSafely(nonCloseable::reset);
137-
}
138-
return nonCloseable;
139-
};
142+
return nonCloseable;
143+
};
144+
} else {
145+
// stream doesn't support mark + reset, make sure to buffer it
146+
provider = new BufferingContentStreamProvider(() -> nonCloseable, contentLength);
147+
}
140148
return new RequestBody(provider, contentLength, Mimetype.MIMETYPE_OCTET_STREAM);
141149
}
142150

core/sdk-core/src/test/java/software/amazon/awssdk/core/sync/RequestBodyTest.java

+61
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,22 @@
3030
import java.nio.file.FileSystem;
3131
import java.nio.file.Files;
3232
import java.nio.file.Path;
33+
import java.util.Random;
3334
import org.junit.Rule;
3435
import org.junit.Test;
3536
import org.junit.rules.TemporaryFolder;
37+
import org.mockito.Mockito;
38+
import software.amazon.awssdk.checksums.DefaultChecksumAlgorithm;
39+
import software.amazon.awssdk.checksums.SdkChecksum;
40+
import software.amazon.awssdk.core.internal.sync.BufferingContentStreamProvider;
3641
import software.amazon.awssdk.core.internal.util.Mimetype;
42+
import software.amazon.awssdk.utils.BinaryUtils;
3743
import software.amazon.awssdk.utils.IoUtils;
3844
import software.amazon.awssdk.utils.StringInputStream;
3945

4046

4147
public class RequestBodyTest {
48+
private static final SdkChecksum CRC32 = SdkChecksum.forAlgorithm(DefaultChecksumAlgorithm.CRC32);
4249

4350
@Rule
4451
public TemporaryFolder folder = new TemporaryFolder();
@@ -140,4 +147,58 @@ public void remainingByteBufferConstructorOnlyRemainingBytesCopied() throws IOEx
140147
byte[] requestBodyBytes = IoUtils.toByteArray(requestBody.contentStreamProvider().newStream());
141148
assertThat(ByteBuffer.wrap(requestBodyBytes)).isEqualTo(bb);
142149
}
150+
151+
@Test
152+
public void fromInputStream_streamSupportMarkReset_doesNotBuffer() {
153+
byte[] newData = new byte[16536];
154+
new Random().nextBytes(newData);
155+
156+
ByteArrayInputStream stream = new ByteArrayInputStream(newData);
157+
158+
RequestBody requestBody = RequestBody.fromInputStream(stream, newData.length);
159+
assertThat(requestBody.contentStreamProvider()).isNotInstanceOf(BufferingContentStreamProvider.class);
160+
}
161+
162+
@Test
163+
public void fromInputStream_streamDoesNotSupportMarkReset_buffers() {
164+
byte[] newData = new byte[16536];
165+
new Random().nextBytes(newData);
166+
167+
ByteArrayInputStream stream = Mockito.spy(new ByteArrayInputStream(newData));
168+
Mockito.when(stream.markSupported()).thenReturn(false);
169+
170+
RequestBody requestBody = RequestBody.fromInputStream(stream, newData.length);
171+
assertThat(requestBody.contentStreamProvider()).isInstanceOf(BufferingContentStreamProvider.class);
172+
}
173+
174+
@Test
175+
public void fromInputStream_streamSupportsReset_resetsTheStream() {
176+
byte[] newData = new byte[16536];
177+
new Random().nextBytes(newData);
178+
179+
String streamCrc32 = getCrc32(new ByteArrayInputStream(newData));
180+
181+
ByteArrayInputStream stream = new ByteArrayInputStream(newData);
182+
assertThat(stream.markSupported()).isTrue();
183+
RequestBody requestBody = RequestBody.fromInputStream(stream, newData.length);
184+
185+
assertThat(getCrc32(requestBody.contentStreamProvider().newStream())).isEqualTo(streamCrc32);
186+
assertThat(getCrc32(requestBody.contentStreamProvider().newStream())).isEqualTo(streamCrc32);
187+
}
188+
189+
private static String getCrc32(InputStream inputStream) {
190+
byte[] buff = new byte[1024];
191+
int read;
192+
193+
CRC32.reset();
194+
try {
195+
while ((read = inputStream.read(buff)) != -1) {
196+
CRC32.update(buff, 0, read);
197+
}
198+
} catch (IOException e) {
199+
throw new RuntimeException(e);
200+
}
201+
202+
return BinaryUtils.toHex(CRC32.getChecksumBytes());
203+
}
143204
}

0 commit comments

Comments
 (0)