Skip to content

Commit 911e9ef

Browse files
authored
Fix fd remaining open longer than necessary (#2616)
In FileAsyncRequestBody, the file was not being closed until a request() read past the end of the file without ready any bytes; i.e. if a read() call reads up to the end of the file, it would not complete the subscription on the subsequent request() signal. This fixes this issue by signaling complete if we know we've reached the end.
1 parent 2a663fc commit 911e9ef

File tree

3 files changed

+129
-6
lines changed

3 files changed

+129
-6
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Fix issue in `FileAsyncRequestBody` where the underlying file channel would only be closed when enough requests are sent to read *past* the end of the file; if just enough requests are sent to read to the end of the file, the file is not closed, leaving an open file descriptor around longer than it needs to be."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package software.amazon.awssdk.core.internal.async;
1717

18+
import static software.amazon.awssdk.utils.FunctionalUtils.runAndLogError;
19+
1820
import java.io.IOException;
1921
import java.io.UncheckedIOException;
2022
import java.nio.ByteBuffer;
@@ -32,6 +34,7 @@
3234
import software.amazon.awssdk.core.internal.util.Mimetype;
3335
import software.amazon.awssdk.core.internal.util.NoopSubscription;
3436
import software.amazon.awssdk.utils.Logger;
37+
import software.amazon.awssdk.utils.Validate;
3538
import software.amazon.awssdk.utils.builder.SdkBuilder;
3639

3740
/**
@@ -80,17 +83,23 @@ public String contentType() {
8083

8184
@Override
8285
public void subscribe(Subscriber<? super ByteBuffer> s) {
86+
AsynchronousFileChannel channel = null;
8387
try {
84-
AsynchronousFileChannel channel = openInputChannel(this.path);
88+
channel = openInputChannel(this.path);
8589

8690
// We need to synchronize here because the subscriber could call
8791
// request() from within onSubscribe which would potentially
8892
// trigger onNext before onSubscribe is finished.
89-
Subscription subscription = new FileSubscription(channel, s, chunkSizeInBytes);
93+
//
94+
// Note: size() can throw IOE here
95+
Subscription subscription = new FileSubscription(channel, channel.size(), s, chunkSizeInBytes);
9096
synchronized (subscription) {
9197
s.onSubscribe(subscription);
9298
}
9399
} catch (IOException e) {
100+
if (channel != null) {
101+
runAndLogError(log.logger(), "Unable to close file channel", channel::close);
102+
}
94103
// subscribe() must return normally, so we need to signal the
95104
// failure to open via onError() once onSubscribe() is signaled.
96105
s.onSubscribe(new NoopSubscription(s));
@@ -172,15 +181,20 @@ private static final class FileSubscription implements Subscription {
172181
private final int chunkSize;
173182

174183
private final AtomicLong position = new AtomicLong(0);
184+
private final AtomicLong remainingBytes = new AtomicLong(0);
175185
private long outstandingDemand = 0;
176186
private boolean readInProgress = false;
177187
private volatile boolean done = false;
178188
private final Object lock = new Object();
179189

180-
private FileSubscription(AsynchronousFileChannel inputChannel, Subscriber<? super ByteBuffer> subscriber, int chunkSize) {
190+
private FileSubscription(AsynchronousFileChannel inputChannel,
191+
long size,
192+
Subscriber<? super ByteBuffer> subscriber,
193+
int chunkSize) {
181194
this.inputChannel = inputChannel;
182195
this.subscriber = subscriber;
183196
this.chunkSize = chunkSize;
197+
this.remainingBytes.set(Validate.isNotNegative(size, "size"));
184198
}
185199

186200
@Override
@@ -239,12 +253,20 @@ private void readData() {
239253
inputChannel.read(buffer, position.get(), buffer, new CompletionHandler<Integer, ByteBuffer>() {
240254
@Override
241255
public void completed(Integer result, ByteBuffer attachment) {
242-
243256
if (result > 0) {
244257
attachment.flip();
245-
position.addAndGet(attachment.remaining());
258+
259+
int readBytes = attachment.remaining();
260+
position.addAndGet(readBytes);
261+
remainingBytes.addAndGet(-readBytes);
262+
246263
signalOnNext(attachment);
247264

265+
if (remainingBytes.get() == 0) {
266+
closeFile();
267+
signalOnComplete();
268+
}
269+
248270
synchronized (lock) {
249271
// If we have more permits, queue up another read.
250272
if (--outstandingDemand > 0) {
@@ -255,8 +277,8 @@ public void completed(Integer result, ByteBuffer attachment) {
255277
}
256278
} else {
257279
// Reached the end of the file, notify the subscriber and cleanup
258-
signalOnComplete();
259280
closeFile();
281+
signalOnComplete();
260282
}
261283
}
262284

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.io.IOException;
19+
import java.nio.ByteBuffer;
20+
import java.nio.channels.AsynchronousFileChannel;
21+
import java.nio.file.Files;
22+
import java.nio.file.Path;
23+
import java.nio.file.StandardOpenOption;
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.ExecutionException;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.TimeoutException;
28+
import java.util.concurrent.atomic.AtomicLong;
29+
import org.junit.AfterClass;
30+
import org.junit.BeforeClass;
31+
import org.junit.Test;
32+
import org.reactivestreams.Subscriber;
33+
import org.reactivestreams.Subscription;
34+
import software.amazon.awssdk.core.async.AsyncRequestBody;
35+
import software.amazon.awssdk.testutils.RandomTempFile;
36+
37+
public class FileAsyncRequestBodyTest {
38+
private static final long MiB = 1024 * 1024;
39+
private static final long TEST_FILE_SIZE = 10 * MiB;
40+
private static Path testFile;
41+
42+
@BeforeClass
43+
public static void setup() throws IOException {
44+
testFile = new RandomTempFile(TEST_FILE_SIZE).toPath();
45+
}
46+
47+
@AfterClass
48+
public static void teardown() throws IOException {
49+
Files.delete(testFile);
50+
}
51+
52+
// If we issue just enough requests to read the file entirely but not more (to go past EOF), we should still receive
53+
// an onComplete
54+
@Test
55+
public void readFully_doesNotRequestPastEndOfFile_receivesComplete() throws InterruptedException, ExecutionException, TimeoutException {
56+
int chunkSize = 16384;
57+
AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder()
58+
.path(testFile)
59+
.chunkSizeInBytes(chunkSize)
60+
.build();
61+
62+
long totalRequests = TEST_FILE_SIZE / chunkSize;
63+
64+
CompletableFuture<Void> completed = new CompletableFuture<>();
65+
asyncRequestBody.subscribe(new Subscriber<ByteBuffer>() {
66+
private Subscription sub;
67+
private long requests = 0;
68+
@Override
69+
public void onSubscribe(Subscription subscription) {
70+
this.sub = subscription;
71+
if (requests++ < totalRequests) {
72+
this.sub.request(1);
73+
}
74+
}
75+
76+
@Override
77+
public void onNext(ByteBuffer byteBuffer) {
78+
if (requests++ < totalRequests) {
79+
this.sub.request(1);
80+
}
81+
}
82+
83+
@Override
84+
public void onError(Throwable throwable) {
85+
}
86+
87+
@Override
88+
public void onComplete() {
89+
completed.complete(null);
90+
}
91+
});
92+
93+
completed.get(5, TimeUnit.SECONDS);
94+
}
95+
}

0 commit comments

Comments
 (0)