Skip to content

Commit bfab2f3

Browse files
JonathanHensonrccarper
authored andcommitted
Hopefully fix the build this time.
1 parent 7ba4045 commit bfab2f3

File tree

3 files changed

+46
-10
lines changed

3 files changed

+46
-10
lines changed

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtResponseBodyPublisher.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,12 @@ protected void request(long n) {
132132
outstandingReqs = outstandingRequests.addAndGet(n);
133133
}
134134

135+
/*
136+
* Since we buffer, in the case where the subscriber came in after the publication has already begun,
137+
* go ahead and flush what we have.
138+
*/
139+
publishToSubscribers();
140+
135141
log.trace(() -> "Subscriber Requested more Buffers. Outstanding Requests: " + outstandingReqs);
136142
}
137143

@@ -249,7 +255,7 @@ protected synchronized void publishToSubscribers() {
249255
}
250256

251257
// Check if Complete, consider no subscriber as a completion.
252-
if (queueComplete.get() && (subscriberRef.get() == null || queuedBuffers.size() == 0)) {
258+
if (queueComplete.get() && queuedBuffers.size() == 0) {
253259
completeSubscriptionExactlyOnce();
254260
}
255261
}

http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,13 +201,33 @@ private void makePutRequest(String path, byte[] reqBody, int expectedStatus) thr
201201
final AtomicReference<SdkHttpResponse> response = new AtomicReference<>(null);
202202
final AtomicReference<Throwable> error = new AtomicReference<>(null);
203203

204+
Subscriber<ByteBuffer> subscriber = new Subscriber<ByteBuffer>() {
205+
@Override
206+
public void onSubscribe(Subscription subscription) {
207+
subscription.request(Long.MAX_VALUE);
208+
}
209+
210+
@Override
211+
public void onNext(ByteBuffer byteBuffer) {
212+
}
213+
214+
@Override
215+
public void onError(Throwable throwable) {
216+
}
217+
218+
@Override
219+
public void onComplete() {
220+
}
221+
};
222+
204223
SdkAsyncHttpResponseHandler handler = new SdkAsyncHttpResponseHandler() {
205224
@Override
206225
public void onHeaders(SdkHttpResponse headers) {
207226
response.compareAndSet(null, headers);
208227
}
209228
@Override
210229
public void onStream(Publisher<ByteBuffer> stream) {
230+
stream.subscribe(subscriber);
211231
streamReceived.complete(true);
212232
}
213233

test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/AwsCrtClientNonTLSBenchmark.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,25 @@
1515

1616
package software.amazon.awssdk.benchmark.apicall.httpclient.async;
1717

18-
import org.openjdk.jmh.annotations.*;
18+
import static software.amazon.awssdk.benchmark.utils.BenchmarkConstant.CONCURRENT_CALLS;
19+
import static software.amazon.awssdk.benchmark.utils.BenchmarkUtils.awaitCountdownLatchUninterruptibly;
20+
import static software.amazon.awssdk.benchmark.utils.BenchmarkUtils.countDownUponCompletion;
21+
22+
import java.util.Collection;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.TimeUnit;
25+
import org.openjdk.jmh.annotations.Benchmark;
26+
import org.openjdk.jmh.annotations.BenchmarkMode;
27+
import org.openjdk.jmh.annotations.Fork;
28+
import org.openjdk.jmh.annotations.Level;
29+
import org.openjdk.jmh.annotations.Measurement;
30+
import org.openjdk.jmh.annotations.Mode;
31+
import org.openjdk.jmh.annotations.OperationsPerInvocation;
32+
import org.openjdk.jmh.annotations.Scope;
33+
import org.openjdk.jmh.annotations.Setup;
34+
import org.openjdk.jmh.annotations.State;
35+
import org.openjdk.jmh.annotations.TearDown;
36+
import org.openjdk.jmh.annotations.Warmup;
1937
import org.openjdk.jmh.infra.Blackhole;
2038
import org.openjdk.jmh.profile.StackProfiler;
2139
import org.openjdk.jmh.results.RunResult;
@@ -28,14 +46,6 @@
2846
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
2947
import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonAsyncClient;
3048

31-
import java.util.Collection;
32-
import java.util.concurrent.CountDownLatch;
33-
import java.util.concurrent.TimeUnit;
34-
35-
import static software.amazon.awssdk.benchmark.utils.BenchmarkConstant.CONCURRENT_CALLS;
36-
import static software.amazon.awssdk.benchmark.utils.BenchmarkUtils.awaitCountdownLatchUninterruptibly;
37-
import static software.amazon.awssdk.benchmark.utils.BenchmarkUtils.countDownUponCompletion;
38-
3949
/**
4050
* Using aws-crt-client to test against local mock https server.
4151
*/

0 commit comments

Comments
 (0)