1
+ /*
2
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
+ *
4
+ * Licensed under the Apache License, Version 2.0 (the "License").
5
+ * You may not use this file except in compliance with the License.
6
+ * A copy of the License is located at
7
+ *
8
+ * http://aws.amazon.com/apache2.0
9
+ *
10
+ * or in the "license" file accompanying this file. This file is distributed
11
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12
+ * express or implied. See the License for the specific language governing
13
+ * permissions and limitations under the License.
14
+ */
15
+
16
+ package software .amazon .awssdk .core .internal .async ;
17
+
18
+ import static org .junit .jupiter .api .Assertions .assertArrayEquals ;
19
+ import static org .junit .jupiter .api .Assertions .assertEquals ;
20
+ import static org .junit .jupiter .api .Assertions .assertFalse ;
21
+ import static org .junit .jupiter .api .Assertions .assertTrue ;
22
+
23
+ import java .nio .ByteBuffer ;
24
+ import java .nio .charset .StandardCharsets ;
25
+ import java .util .ArrayList ;
26
+ import java .util .Collections ;
27
+ import java .util .List ;
28
+ import java .util .concurrent .ExecutorService ;
29
+ import java .util .concurrent .Executors ;
30
+ import java .util .concurrent .TimeUnit ;
31
+ import java .util .stream .IntStream ;
32
+ import org .junit .jupiter .api .Test ;
33
+ import org .reactivestreams .Subscriber ;
34
+ import org .reactivestreams .Subscription ;
35
+ import software .amazon .awssdk .core .async .AsyncRequestBody ;
36
+ import software .amazon .awssdk .utils .BinaryUtils ;
37
+
38
+ class SimpleAsyncRequestBodyTest {
39
+
40
+ private static class TestSubscriber implements Subscriber <ByteBuffer > {
41
+ private Subscription subscription ;
42
+ private boolean onCompleteCalled = false ;
43
+ private int callsToComplete = 0 ;
44
+ private final List <ByteBuffer > publishedResults = Collections .synchronizedList (new ArrayList <>());
45
+
46
+ public void request (long n ) {
47
+ subscription .request (n );
48
+ }
49
+
50
+ @ Override
51
+ public void onSubscribe (Subscription s ) {
52
+ this .subscription = s ;
53
+ }
54
+
55
+ @ Override
56
+ public void onNext (ByteBuffer byteBuffer ) {
57
+ publishedResults .add (byteBuffer );
58
+ }
59
+
60
+ @ Override
61
+ public void onError (Throwable throwable ) {
62
+ throw new IllegalStateException (throwable );
63
+ }
64
+
65
+ @ Override
66
+ public void onComplete () {
67
+ onCompleteCalled = true ;
68
+ callsToComplete ++;
69
+ }
70
+ }
71
+
72
+ @ Test
73
+ public void subscriberIsMarkedAsCompleted () {
74
+ AsyncRequestBody requestBody = SimpleAsyncRequestBody .of ("Hello World!" .getBytes (StandardCharsets .UTF_8 ));
75
+
76
+ TestSubscriber subscriber = new TestSubscriber ();
77
+ requestBody .subscribe (subscriber );
78
+ subscriber .request (1 );
79
+
80
+ assertTrue (subscriber .onCompleteCalled );
81
+ assertEquals (1 , subscriber .publishedResults .size ());
82
+ }
83
+
84
+ @ Test
85
+ public void subscriberIsMarkedAsCompletedWhenARequestIsMadeForMoreBuffersThanAreAvailable () {
86
+ AsyncRequestBody requestBody = SimpleAsyncRequestBody .of ("Hello World!" .getBytes (StandardCharsets .UTF_8 ));
87
+
88
+ TestSubscriber subscriber = new TestSubscriber ();
89
+ requestBody .subscribe (subscriber );
90
+ subscriber .request (2 );
91
+
92
+ assertTrue (subscriber .onCompleteCalled );
93
+ assertEquals (1 , subscriber .publishedResults .size ());
94
+ }
95
+
96
+ @ Test
97
+ public void subscriberIsThreadSafeAndMarkedAsCompletedExactlyOnce () throws InterruptedException {
98
+ int numBuffers = 100 ;
99
+ AsyncRequestBody requestBody = SimpleAsyncRequestBody .of (null , IntStream .range (0 , numBuffers )
100
+ .mapToObj (i -> ByteBuffer .wrap (new byte [1 ]))
101
+ .toArray (ByteBuffer []::new ));
102
+
103
+ TestSubscriber subscriber = new TestSubscriber ();
104
+ requestBody .subscribe (subscriber );
105
+
106
+ int parallelism = 8 ;
107
+ ExecutorService executorService = Executors .newFixedThreadPool (parallelism );
108
+ for (int i = 0 ; i < parallelism ; i ++) {
109
+ executorService .submit (() -> {
110
+ for (int j = 0 ; j < numBuffers ; j ++) {
111
+ subscriber .request (2 );
112
+ }
113
+ });
114
+ }
115
+ executorService .shutdown ();
116
+ executorService .awaitTermination (1 , TimeUnit .MINUTES );
117
+
118
+ assertTrue (subscriber .onCompleteCalled );
119
+ assertEquals (1 , subscriber .callsToComplete );
120
+ assertEquals (numBuffers , subscriber .publishedResults .size ());
121
+ }
122
+
123
+ @ Test
124
+ public void subscriberIsNotMarkedAsCompletedWhenThereAreRemainingBuffersToPublish () {
125
+ byte [] helloWorld = "Hello World!" .getBytes (StandardCharsets .UTF_8 );
126
+ byte [] goodbyeWorld = "Goodbye World!" .getBytes (StandardCharsets .UTF_8 );
127
+ AsyncRequestBody requestBody = SimpleAsyncRequestBody .of ((long ) (helloWorld .length + goodbyeWorld .length ),
128
+ ByteBuffer .wrap (helloWorld ),
129
+ ByteBuffer .wrap (goodbyeWorld ));
130
+
131
+ TestSubscriber subscriber = new TestSubscriber ();
132
+ requestBody .subscribe (subscriber );
133
+ subscriber .request (1 );
134
+
135
+ assertFalse (subscriber .onCompleteCalled );
136
+ assertEquals (1 , subscriber .publishedResults .size ());
137
+ }
138
+
139
+ @ Test
140
+ public void subscriberReceivesAllBuffers () {
141
+ byte [] helloWorld = "Hello World!" .getBytes (StandardCharsets .UTF_8 );
142
+ byte [] goodbyeWorld = "Goodbye World!" .getBytes (StandardCharsets .UTF_8 );
143
+
144
+ AsyncRequestBody requestBody = SimpleAsyncRequestBody .of ((long ) (helloWorld .length + goodbyeWorld .length ),
145
+ ByteBuffer .wrap (helloWorld ),
146
+ ByteBuffer .wrap (goodbyeWorld ));
147
+
148
+ TestSubscriber subscriber = new TestSubscriber ();
149
+ requestBody .subscribe (subscriber );
150
+ subscriber .request (2 );
151
+
152
+ assertEquals (2 , subscriber .publishedResults .size ());
153
+ assertTrue (subscriber .onCompleteCalled );
154
+ assertArrayEquals (helloWorld , BinaryUtils .copyAllBytesFrom (subscriber .publishedResults .get (0 )));
155
+ assertArrayEquals (goodbyeWorld , BinaryUtils .copyAllBytesFrom (subscriber .publishedResults .get (1 )));
156
+ }
157
+
158
+ @ Test
159
+ public void canceledSubscriberDoesNotReturnNewResults () {
160
+ AsyncRequestBody requestBody = SimpleAsyncRequestBody .of (null , ByteBuffer .wrap (new byte [0 ]));
161
+
162
+ TestSubscriber subscriber = new TestSubscriber ();
163
+ requestBody .subscribe (subscriber );
164
+
165
+ subscriber .subscription .cancel ();
166
+ subscriber .request (1 );
167
+
168
+ assertTrue (subscriber .publishedResults .isEmpty ());
169
+ }
170
+
171
+ }
0 commit comments