Skip to content

Commit 1fd1420

Browse files
committed
fixup! Add "unsafe" AsyncRequestBody constructors for byte[] and ByteBuffers
1 parent 9b18b74 commit 1fd1420

File tree

5 files changed

+264
-43
lines changed

5 files changed

+264
-43
lines changed

.changes/next-release/feature-AWSSDKforJavav2-5d806ad.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
"type": "feature",
33
"category": "AWS SDK for Java v2",
44
"contributor": "StephenFlavin",
5-
"description": "Add \"unsafe\" AsyncRequestBody constructors for byte arrays and ByteBuffers"
5+
"description": "Add \"unsafe\" and \"fromRemaining\" AsyncRequestBody constructors for byte arrays and ByteBuffers"
66
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 94 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,9 @@ static AsyncRequestBody fromBytesUnsafe(byte[] bytes) {
173173
/**
174174
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer}. This will copy the contents of the {@link ByteBuffer} to
175175
* prevent modifications to the provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
176-
* <p> The position is set to 0 in the copied {@link ByteBuffer} and the mark if defined is discarded.
176+
* <p>
177+
* <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBuffer(ByteBuffer)} if you need
178+
* it to copy only the remaining readable bytes.
177179
*
178180
* @param byteBuffer ByteBuffer to send to the service.
179181
* @return AsyncRequestBody instance.
@@ -184,11 +186,28 @@ static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
184186
return ByteBuffersAsyncRequestBody.of(null, immutableCopy);
185187
}
186188

189+
/**
190+
* Creates an {@link AsyncRequestBody} from the remaining readable bytes from a {@link ByteBuffer}. This will copy the
191+
* remaining contents of the {@link ByteBuffer} to prevent modifications to the provided {@link ByteBuffer} from being
192+
* reflected in the {@link AsyncRequestBody}.
193+
* <p> Unlike {@link #fromByteBuffer(ByteBuffer)}, this method respects the current read position of the buffer and reads
194+
* only the remaining bytes.
195+
*
196+
* @param byteBuffer ByteBuffer to send to the service.
197+
* @return AsyncRequestBody instance.
198+
*/
199+
static AsyncRequestBody fromRemainingByteBuffer(ByteBuffer byteBuffer) {
200+
ByteBuffer immutableCopy = BinaryUtils.immutableCopyOfRemaining(byteBuffer);
201+
return ByteBuffersAsyncRequestBody.of(null, immutableCopy);
202+
}
203+
187204
/**
188205
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
189-
* {@link ByteBuffer}. This introduces concurrency risks, allowing: (1) the caller to modify the {@link ByteBuffer} stored in
190-
* this {@code AsyncRequestBody} implementation AND (2) any users of {@link #fromByteBufferUnsafe(ByteBuffer)} to modify the
191-
* {@link ByteBuffer} passed into this {@code AsyncRequestBody} implementation.
206+
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
207+
* {@code AsyncRequestBody} implementation.
208+
* <p>
209+
* <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBufferUnsafe(ByteBuffer)} if you
210+
* need it to copy only the remaining readable bytes.
192211
*
193212
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
194213
* risks.
@@ -197,13 +216,34 @@ static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
197216
* @return AsyncRequestBody instance.
198217
*/
199218
static AsyncRequestBody fromByteBufferUnsafe(ByteBuffer byteBuffer) {
200-
return ByteBuffersAsyncRequestBody.of(null, byteBuffer);
219+
ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
220+
readOnlyBuffer.rewind();
221+
return ByteBuffersAsyncRequestBody.of(null, readOnlyBuffer);
222+
}
223+
224+
/**
225+
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
226+
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
227+
* {@code AsyncRequestBody} implementation.
228+
* <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, this method respects the current read position of
229+
* the buffer and reads only the remaining bytes.
230+
*
231+
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
232+
* risks.
233+
*
234+
* @param byteBuffer ByteBuffer to send to the service.
235+
* @return AsyncRequestBody instance.
236+
*/
237+
static AsyncRequestBody fromRemainingByteBufferUnsafe(ByteBuffer byteBuffer) {
238+
return ByteBuffersAsyncRequestBody.of(null, byteBuffer.asReadOnlyBuffer());
201239
}
202240

203241
/**
204242
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the contents of each {@link ByteBuffer}
205243
* to prevent modifications to any provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
206-
* <p> The position is set to 0 in each copied {@link ByteBuffer} and the mark if defined is discarded.
244+
* <p>
245+
* <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
246+
* {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
207247
*
208248
* @param byteBuffers ByteBuffer array to send to the service.
209249
* @return AsyncRequestBody instance.
@@ -216,11 +256,31 @@ static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) {
216256
return ByteBuffersAsyncRequestBody.of(null, immutableCopy);
217257
}
218258

259+
/**
260+
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the remaining contents of each
261+
* {@link ByteBuffer} to prevent modifications to any provided {@link ByteBuffer} from being reflected in the
262+
* {@link AsyncRequestBody}.
263+
* <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)},
264+
* this method respects the current read position of each buffer and reads only the remaining bytes.
265+
*
266+
* @param byteBuffers ByteBuffer array to send to the service.
267+
* @return AsyncRequestBody instance.
268+
*/
269+
static AsyncRequestBody fromRemainingByteBuffers(ByteBuffer... byteBuffers) {
270+
ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
271+
.map(BinaryUtils::immutableCopyOfRemaining)
272+
.peek(ByteBuffer::rewind)
273+
.toArray(ByteBuffer[]::new);
274+
return ByteBuffersAsyncRequestBody.of(null, immutableCopy);
275+
}
276+
219277
/**
220278
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
221-
* {@link ByteBuffer}. This introduces concurrency risks, allowing: (1) the caller to modify any {@link ByteBuffer} stored in
222-
* this {@code AsyncRequestBody} implementation AND (2) any users of {@link #fromByteBufferUnsafe(ByteBuffer)} to modify any
223-
* {@link ByteBuffer} passed into this {@code AsyncRequestBody} implementation.
279+
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
280+
* {@code AsyncRequestBody} implementation.
281+
* <p>
282+
* <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
283+
* {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
224284
*
225285
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
226286
* risks.
@@ -229,7 +289,31 @@ static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) {
229289
* @return AsyncRequestBody instance.
230290
*/
231291
static AsyncRequestBody fromByteBuffersUnsafe(ByteBuffer... byteBuffers) {
232-
return ByteBuffersAsyncRequestBody.of(null, byteBuffers);
292+
ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers)
293+
.map(ByteBuffer::asReadOnlyBuffer)
294+
.peek(ByteBuffer::rewind)
295+
.toArray(ByteBuffer[]::new);
296+
return ByteBuffersAsyncRequestBody.of(null, readOnlyBuffers);
297+
}
298+
299+
/**
300+
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
301+
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
302+
* {@code AsyncRequestBody} implementation.
303+
* <p>Unlike {@link #fromByteBuffersUnsafe(ByteBuffer...)},
304+
* this method respects the current read position of each buffer and reads only the remaining bytes.
305+
*
306+
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
307+
* risks.
308+
*
309+
* @param byteBuffers ByteBuffer array to send to the service.
310+
* @return AsyncRequestBody instance.
311+
*/
312+
static AsyncRequestBody fromRemainingByteBuffersUnsafe(ByteBuffer... byteBuffers) {
313+
ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers)
314+
.map(ByteBuffer::asReadOnlyBuffer)
315+
.toArray(ByteBuffer[]::new);
316+
return ByteBuffersAsyncRequestBody.of(null, readOnlyBuffers);
233317
}
234318

235319
/**

core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java

Lines changed: 101 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -162,15 +162,20 @@ void safeByteBufferBuildersCopyTheProvidedBuffer(Function<ByteBuffer, AsyncReque
162162
assertArrayEquals(bytesClone, publishedByteArray);
163163
}
164164

165+
private static Function<ByteBuffer, AsyncRequestBody>[] safeByteBufferBodyBuilders() {
166+
Function<ByteBuffer, AsyncRequestBody> fromByteBuffer = AsyncRequestBody::fromByteBuffer;
167+
Function<ByteBuffer, AsyncRequestBody> fromRemainingByteBuffer = AsyncRequestBody::fromRemainingByteBuffer;
168+
Function<ByteBuffer, AsyncRequestBody> fromByteBuffers = AsyncRequestBody::fromByteBuffers;
169+
Function<ByteBuffer, AsyncRequestBody> fromRemainingByteBuffers = AsyncRequestBody::fromRemainingByteBuffers;
170+
return new Function[] {fromByteBuffer, fromRemainingByteBuffer, fromByteBuffers, fromRemainingByteBuffers};
171+
}
172+
165173
@ParameterizedTest
166-
@MethodSource("safeByteBufferBodyBuilders")
167-
void safeByteBufferBuildersRewindTheInputByteBuffer(Function<ByteBuffer, AsyncRequestBody> bodyBuilder) {
174+
@MethodSource("unsafeByteBufferBodyBuilders")
175+
void unsafeByteBufferBuildersDoNotCopyTheProvidedBuffer(Function<ByteBuffer, AsyncRequestBody> bodyBuilder) {
168176
byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
169-
ByteBuffer bb = ByteBuffer.wrap(bytes);
170-
bb.position(bytes.length - 1);
171177

172-
173-
AsyncRequestBody asyncRequestBody = bodyBuilder.apply(bb);
178+
AsyncRequestBody asyncRequestBody = bodyBuilder.apply(ByteBuffer.wrap(bytes));
174179

175180
for (int i = 0; i < bytes.length; i++) {
176181
bytes[i] += 1;
@@ -181,57 +186,124 @@ void safeByteBufferBuildersRewindTheInputByteBuffer(Function<ByteBuffer, AsyncRe
181186

182187
asyncRequestBody.subscribe(subscriber);
183188

184-
assertEquals(0, publishedBuffer.get().position());
189+
byte[] publishedByteArray = BinaryUtils.copyAllBytesFrom(publishedBuffer.get());
190+
assertArrayEquals(bytes, publishedByteArray);
185191
}
186192

187-
private static Function<ByteBuffer, AsyncRequestBody>[] safeByteBufferBodyBuilders() {
188-
Function<ByteBuffer, AsyncRequestBody> fromByteBuffer = AsyncRequestBody::fromByteBuffer;
189-
Function<ByteBuffer, AsyncRequestBody> fromByteBuffers = AsyncRequestBody::fromByteBuffers;
190-
return new Function[] {fromByteBuffer, fromByteBuffers};
193+
private static Function<ByteBuffer, AsyncRequestBody>[] unsafeByteBufferBodyBuilders() {
194+
Function<ByteBuffer, AsyncRequestBody> fromByteBuffer = AsyncRequestBody::fromByteBufferUnsafe;
195+
Function<ByteBuffer, AsyncRequestBody> fromRemainingByteBuffer = AsyncRequestBody::fromRemainingByteBufferUnsafe;
196+
Function<ByteBuffer, AsyncRequestBody> fromByteBuffers = AsyncRequestBody::fromByteBuffersUnsafe;
197+
Function<ByteBuffer, AsyncRequestBody> fromRemainingByteBuffers = AsyncRequestBody::fromRemainingByteBuffersUnsafe;
198+
return new Function[] {fromByteBuffer, fromRemainingByteBuffer, fromByteBuffers, fromRemainingByteBuffers};
191199
}
192200

193201
@ParameterizedTest
194-
@MethodSource("unsafeByteBufferBodyBuilders")
195-
void unsafeByteBufferBuildersDoNotCopyTheProvidedBuffer(Function<ByteBuffer, AsyncRequestBody> bodyBuilder) {
202+
@MethodSource("nonRewindingByteBufferBodyBuilders")
203+
void nonRewindingByteBufferBuildersReadFromTheInputBufferPosition(
204+
Function<ByteBuffer, AsyncRequestBody> bodyBuilder) {
196205
byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
206+
ByteBuffer bb = ByteBuffer.wrap(bytes);
207+
int expectedPosition = bytes.length / 2;
208+
bb.position(expectedPosition);
197209

198-
AsyncRequestBody asyncRequestBody = bodyBuilder.apply(ByteBuffer.wrap(bytes));
210+
AsyncRequestBody asyncRequestBody = bodyBuilder.apply(bb);
199211

200-
for (int i = 0; i < bytes.length; i++) {
201-
bytes[i] += 1;
212+
AtomicReference<ByteBuffer> publishedBuffer = new AtomicReference<>();
213+
Subscriber<ByteBuffer> subscriber = new SimpleSubscriber(publishedBuffer::set);
214+
215+
asyncRequestBody.subscribe(subscriber);
216+
217+
int remaining = bb.remaining();
218+
assertEquals(remaining, publishedBuffer.get().remaining());
219+
for (int i = 0; i < remaining; i++) {
220+
assertEquals(bb.get(), publishedBuffer.get().get());
202221
}
222+
}
223+
224+
private static Function<ByteBuffer, AsyncRequestBody>[] nonRewindingByteBufferBodyBuilders() {
225+
Function<ByteBuffer, AsyncRequestBody> fromRemainingByteBuffer = AsyncRequestBody::fromRemainingByteBuffer;
226+
Function<ByteBuffer, AsyncRequestBody> fromRemainingByteBufferUnsafe = AsyncRequestBody::fromRemainingByteBufferUnsafe;
227+
Function<ByteBuffer, AsyncRequestBody> fromRemainingByteBuffers = AsyncRequestBody::fromRemainingByteBuffers;
228+
Function<ByteBuffer, AsyncRequestBody> fromRemainingByteBuffersUnsafe = AsyncRequestBody::fromRemainingByteBuffersUnsafe;
229+
return new Function[] {fromRemainingByteBuffer, fromRemainingByteBufferUnsafe, fromRemainingByteBuffers,
230+
fromRemainingByteBuffersUnsafe};
231+
}
232+
233+
@ParameterizedTest
234+
@MethodSource("safeNonRewindingByteBufferBodyBuilders")
235+
void safeNonRewindingByteBufferBuildersCopyFromTheInputBufferPosition(
236+
Function<ByteBuffer, AsyncRequestBody> bodyBuilder) {
237+
byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
238+
ByteBuffer bb = ByteBuffer.wrap(bytes);
239+
int expectedPosition = bytes.length / 2;
240+
bb.position(expectedPosition);
241+
242+
AsyncRequestBody asyncRequestBody = bodyBuilder.apply(bb);
203243

204244
AtomicReference<ByteBuffer> publishedBuffer = new AtomicReference<>();
205245
Subscriber<ByteBuffer> subscriber = new SimpleSubscriber(publishedBuffer::set);
206246

207247
asyncRequestBody.subscribe(subscriber);
208248

209-
byte[] publishedByteArray = BinaryUtils.copyAllBytesFrom(publishedBuffer.get());
210-
assertArrayEquals(bytes, publishedByteArray);
249+
int remaining = bb.remaining();
250+
assertEquals(remaining, publishedBuffer.get().capacity());
251+
for (int i = 0; i < remaining; i++) {
252+
assertEquals(bb.get(), publishedBuffer.get().get());
253+
}
254+
}
255+
256+
private static Function<ByteBuffer, AsyncRequestBody>[] safeNonRewindingByteBufferBodyBuilders() {
257+
Function<ByteBuffer, AsyncRequestBody> fromRemainingByteBuffer = AsyncRequestBody::fromRemainingByteBuffer;
258+
Function<ByteBuffer, AsyncRequestBody> fromRemainingByteBuffers = AsyncRequestBody::fromRemainingByteBuffers;
259+
return new Function[] {fromRemainingByteBuffer, fromRemainingByteBuffers};
211260
}
212261

213262
@ParameterizedTest
214-
@MethodSource("unsafeByteBufferBodyBuilders")
215-
void safeByteBufferBuildersDoNotRewindTheInputByteBuffer(Function<ByteBuffer, AsyncRequestBody> bodyBuilder) {
263+
@MethodSource("rewindingByteBufferBodyBuilders")
264+
void rewindingByteBufferBuildersDoNotRewindTheInputBuffer(Function<ByteBuffer, AsyncRequestBody> bodyBuilder) {
265+
byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
266+
ByteBuffer bb = ByteBuffer.wrap(bytes);
267+
int expectedPosition = bytes.length / 2;
268+
bb.position(expectedPosition);
269+
270+
AsyncRequestBody asyncRequestBody = bodyBuilder.apply(bb);
271+
272+
Subscriber<ByteBuffer> subscriber = new SimpleSubscriber(buffer -> {
273+
});
274+
275+
asyncRequestBody.subscribe(subscriber);
276+
277+
assertEquals(expectedPosition, bb.position());
278+
}
279+
280+
@ParameterizedTest
281+
@MethodSource("rewindingByteBufferBodyBuilders")
282+
void rewindingByteBufferBuildersReadTheInputBufferFromTheBeginning(
283+
Function<ByteBuffer, AsyncRequestBody> bodyBuilder) {
216284
byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
217-
ByteBuffer wrap = ByteBuffer.wrap(bytes);
218-
int expectedPosition = bytes.length - 1;
219-
wrap.position(expectedPosition);
285+
ByteBuffer bb = ByteBuffer.wrap(bytes);
286+
bb.position(bytes.length / 2);
220287

221-
AsyncRequestBody asyncRequestBody = bodyBuilder.apply(wrap);
288+
AsyncRequestBody asyncRequestBody = bodyBuilder.apply(bb);
222289

223290
AtomicReference<ByteBuffer> publishedBuffer = new AtomicReference<>();
224291
Subscriber<ByteBuffer> subscriber = new SimpleSubscriber(publishedBuffer::set);
225292

226293
asyncRequestBody.subscribe(subscriber);
227294

228-
assertEquals(expectedPosition, publishedBuffer.get().position());
295+
assertEquals(0, publishedBuffer.get().position());
296+
publishedBuffer.get().rewind();
297+
bb.rewind();
298+
assertEquals(bb, publishedBuffer.get());
229299
}
230300

231-
private static Function<ByteBuffer, AsyncRequestBody>[] unsafeByteBufferBodyBuilders() {
232-
Function<ByteBuffer, AsyncRequestBody> fromByteBuffer = AsyncRequestBody::fromByteBufferUnsafe;
233-
Function<ByteBuffer, AsyncRequestBody> fromByteBuffers = AsyncRequestBody::fromByteBuffersUnsafe;
234-
return new Function[] {fromByteBuffer, fromByteBuffers};
301+
private static Function<ByteBuffer, AsyncRequestBody>[] rewindingByteBufferBodyBuilders() {
302+
Function<ByteBuffer, AsyncRequestBody> fromByteBuffer = AsyncRequestBody::fromByteBuffer;
303+
Function<ByteBuffer, AsyncRequestBody> fromByteBufferUnsafe = AsyncRequestBody::fromByteBufferUnsafe;
304+
Function<ByteBuffer, AsyncRequestBody> fromByteBuffers = AsyncRequestBody::fromByteBuffers;
305+
Function<ByteBuffer, AsyncRequestBody> fromByteBuffersUnsafe = AsyncRequestBody::fromByteBuffersUnsafe;
306+
return new Function[] {fromByteBuffer, fromByteBufferUnsafe, fromByteBuffers, fromByteBuffersUnsafe};
235307
}
236308

237309
@ParameterizedTest

0 commit comments

Comments
 (0)