29
29
import java .util .concurrent .CompletableFuture ;
30
30
import java .util .function .Function ;
31
31
32
+ import static com .madgag .aws .sdk .async .responsebytes .awssdk .utils .BinaryUtilsAlternative .copyBytes ;
32
33
import static software .amazon .awssdk .utils .FunctionalUtils .invokeSafely ;
33
34
34
35
/**
@@ -65,26 +66,27 @@ public void onResponse(ResponseT response) {
65
66
66
67
@ Override
67
68
public void onStream (SdkPublisher <ByteBuffer > publisher ) {
68
- ByteArrayOutputStream baos =
69
- knownSize .map (f -> new ByteArrayOutputStream (f .apply (response ))).orElse (new ByteArrayOutputStream ());
70
- publisher .subscribe (new BaosSubscriber (cf , baos ));
69
+ ByteStore byteStore =
70
+ knownSize .< ByteStore > map (f -> new KnownLengthStore (f .apply (response ))).orElse (new BaosStore ());
71
+ publisher .subscribe (new ByteSubscriber (cf , byteStore ));
71
72
}
72
73
73
74
@ Override
74
75
public void exceptionOccurred (Throwable throwable ) {
75
76
cf .completeExceptionally (throwable );
76
77
}
77
78
78
- static class BaosSubscriber implements Subscriber <ByteBuffer > {
79
+
80
+ static class ByteSubscriber implements Subscriber <ByteBuffer > {
79
81
private final CompletableFuture <byte []> resultFuture ;
80
82
81
- private ByteArrayOutputStream baos ;
83
+ private ByteStore byteStore ;
82
84
83
85
private Subscription subscription ;
84
86
85
- BaosSubscriber (CompletableFuture <byte []> resultFuture , ByteArrayOutputStream baos ) {
87
+ ByteSubscriber (CompletableFuture <byte []> resultFuture , ByteStore byteStore ) {
86
88
this .resultFuture = resultFuture ;
87
- this .baos = baos ;
89
+ this .byteStore = byteStore ;
88
90
}
89
91
90
92
@ Override
@@ -99,19 +101,55 @@ public void onSubscribe(Subscription s) {
99
101
100
102
@ Override
101
103
public void onNext (ByteBuffer byteBuffer ) {
102
- invokeSafely (() -> baos . write ( BinaryUtils . copyBytesFrom ( byteBuffer )) );
104
+ byteStore . append ( byteBuffer );
103
105
subscription .request (1 );
104
106
}
105
107
106
108
@ Override
107
109
public void onError (Throwable throwable ) {
108
- baos = null ;
110
+ byteStore = null ;
109
111
resultFuture .completeExceptionally (throwable );
110
112
}
111
113
112
114
@ Override
113
115
public void onComplete () {
114
- resultFuture .complete (baos .toByteArray ());
116
+ resultFuture .complete (byteStore .toByteArray ());
117
+ }
118
+ }
119
+
120
+
121
+ interface ByteStore {
122
+ void append (ByteBuffer byteBuffer );
123
+ byte [] toByteArray ();
124
+ }
125
+
126
+ static class BaosStore implements ByteStore {
127
+ private final ByteArrayOutputStream baos = new ByteArrayOutputStream ();
128
+
129
+ public void append (ByteBuffer byteBuffer ) {
130
+ invokeSafely (() -> baos .write (BinaryUtils .copyBytesFrom (byteBuffer )));
131
+ }
132
+
133
+ public byte [] toByteArray () {
134
+ return baos .toByteArray ();
135
+ }
136
+ }
137
+
138
+ static class KnownLengthStore implements ByteStore {
139
+ private final byte [] byteArray ;
140
+ private int offset = 0 ;
141
+
142
+ KnownLengthStore (int contentSize ) {
143
+ System .out .println ("We know the size is " +contentSize );
144
+ this .byteArray = new byte [contentSize ];
145
+ }
146
+
147
+ public void append (ByteBuffer byteBuffer ) {
148
+ offset += copyBytes (byteBuffer , byteArray , offset );
149
+ }
150
+
151
+ public byte [] toByteArray () {
152
+ return byteArray ;
115
153
}
116
154
}
117
155
}
0 commit comments