1
+ /*
2
+ * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
+ *
4
+ * Licensed under the Apache License, Version 2.0 (the "License").
5
+ * You may not use this file except in compliance with the License.
6
+ * A copy of the License is located at
7
+ *
8
+ * http://aws.amazon.com/apache2.0
9
+ *
10
+ * or in the "license" file accompanying this file. This file is distributed
11
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12
+ * express or implied. See the License for the specific language governing
13
+ * permissions and limitations under the License.
14
+ */
15
+ package software .amazon .awssdk .services .transcribestreaming ;
16
+
17
+ import static org .junit .Assert .assertTrue ;
18
+ import static software .amazon .awssdk .core .http .HttpResponseHandler .X_AMZN_REQUEST_ID_HEADER ;
19
+
20
+ import java .io .File ;
21
+ import java .io .FileInputStream ;
22
+ import java .io .FileNotFoundException ;
23
+ import java .io .IOException ;
24
+ import java .io .InputStream ;
25
+ import java .io .UncheckedIOException ;
26
+ import java .net .URISyntaxException ;
27
+ import java .nio .ByteBuffer ;
28
+ import java .util .concurrent .CompletableFuture ;
29
+ import java .util .concurrent .ExecutionException ;
30
+ import java .util .concurrent .ExecutorService ;
31
+ import java .util .concurrent .Executors ;
32
+ import java .util .concurrent .atomic .AtomicLong ;
33
+ import org .junit .BeforeClass ;
34
+ import org .junit .Ignore ;
35
+ import org .junit .Test ;
36
+ import org .reactivestreams .Publisher ;
37
+ import org .reactivestreams .Subscriber ;
38
+ import org .reactivestreams .Subscription ;
39
+ import software .amazon .awssdk .auth .credentials .AwsCredentialsProvider ;
40
+ import software .amazon .awssdk .auth .credentials .DefaultCredentialsProvider ;
41
+ import software .amazon .awssdk .core .SdkBytes ;
42
+ import software .amazon .awssdk .regions .Region ;
43
+ import software .amazon .awssdk .services .transcribestreaming .model .AudioEvent ;
44
+ import software .amazon .awssdk .services .transcribestreaming .model .AudioStream ;
45
+ import software .amazon .awssdk .services .transcribestreaming .model .LanguageCode ;
46
+ import software .amazon .awssdk .services .transcribestreaming .model .MediaEncoding ;
47
+ import software .amazon .awssdk .services .transcribestreaming .model .StartStreamTranscriptionRequest ;
48
+ import software .amazon .awssdk .services .transcribestreaming .model .StartStreamTranscriptionResponseHandler ;
49
+ import software .amazon .awssdk .services .transcribestreaming .model .TranscriptEvent ;
50
+
51
+ /**
52
+ * An example test class to show the usage of
53
+ * {@link TranscribeStreamingAsyncClient#startStreamTranscription(StartStreamTranscriptionRequest, Publisher,
54
+ * StartStreamTranscriptionResponseHandler)} API.
55
+ *
56
+ * The audio files used in this class don't have voice, so there won't be any transcripted text would be empty
57
+ */
58
+ @ Ignore
59
+ public class TranscribeStreamingIntegrationTest {
60
+
61
+ private static TranscribeStreamingAsyncClient client ;
62
+
63
+ @ BeforeClass
64
+ public static void setup () throws URISyntaxException {
65
+ client = TranscribeStreamingAsyncClient .builder ()
66
+ .region (Region .US_EAST_1 )
67
+ .credentialsProvider (getCredentials ())
68
+ .build ();
69
+ }
70
+
71
+ @ Test
72
+ public void testFileWith16kRate () throws ExecutionException , InterruptedException , URISyntaxException {
73
+ CompletableFuture <Void > result = client .startStreamTranscription (getRequest (16_000 ),
74
+ new AudioStreamPublisher (
75
+ getInputStream ("silence_16kHz_s16le.wav" )),
76
+ getResponseHandler ());
77
+
78
+ // Blocking call to keep the main thread for shutting down
79
+ result .get ();
80
+ }
81
+
82
+ @ Test
83
+ public void testFileWith8kRate () throws ExecutionException , InterruptedException , URISyntaxException {
84
+ CompletableFuture <Void > result = client .startStreamTranscription (getRequest (8_000 ),
85
+ new AudioStreamPublisher (
86
+ getInputStream ("silence_8kHz_s16le.wav" )),
87
+ getResponseHandler ());
88
+
89
+ result .get ();
90
+ }
91
+
92
+ private static AwsCredentialsProvider getCredentials () {
93
+ return DefaultCredentialsProvider .create ();
94
+ }
95
+
96
+ private StartStreamTranscriptionRequest getRequest (Integer mediaSampleRateHertz ) {
97
+ return StartStreamTranscriptionRequest .builder ()
98
+ .languageCode (LanguageCode .EN_US .toString ())
99
+ .mediaEncoding (MediaEncoding .PCM )
100
+ .mediaSampleRateHertz (mediaSampleRateHertz )
101
+ .build ();
102
+ }
103
+
104
+ private InputStream getInputStream (String audioFileName ) {
105
+ try {
106
+ File inputFile = new File (getClass ().getClassLoader ().getResource (audioFileName ).getFile ());
107
+ assertTrue (inputFile .exists ());
108
+ InputStream audioStream = new FileInputStream (inputFile );
109
+ return audioStream ;
110
+ } catch (FileNotFoundException e ) {
111
+ throw new RuntimeException (e );
112
+ }
113
+ }
114
+
115
+ private StartStreamTranscriptionResponseHandler getResponseHandler () {
116
+ return StartStreamTranscriptionResponseHandler .builder ()
117
+ .onResponse (r -> {
118
+ String idFromHeader = r .sdkHttpResponse ()
119
+ .firstMatchingHeader (X_AMZN_REQUEST_ID_HEADER )
120
+ .orElse (null );
121
+ System .out .println ("Received Initial response: " + idFromHeader );
122
+ })
123
+ .onError (e -> {
124
+ System .out .println ("Error message: " + e .getMessage ());
125
+ })
126
+ .onComplete (() -> {
127
+ System .out .println ("All records stream successfully" );
128
+ })
129
+ .subscriber (event -> {
130
+ System .out .println (((TranscriptEvent ) event ).transcript ().results ());
131
+ })
132
+ .build ();
133
+ }
134
+
135
+ private class AudioStreamPublisher implements Publisher <AudioStream > {
136
+ private final InputStream inputStream ;
137
+
138
+ private AudioStreamPublisher (InputStream inputStream ) {
139
+ this .inputStream = inputStream ;
140
+ }
141
+
142
+ @ Override
143
+ public void subscribe (Subscriber <? super AudioStream > s ) {
144
+ s .onSubscribe (new SubscriptionImpl (s , inputStream ));
145
+ }
146
+ }
147
+
148
+ private class SubscriptionImpl implements Subscription {
149
+ private static final int CHUNK_SIZE_IN_BYTES = 1024 * 1 ;
150
+ private ExecutorService executor = Executors .newFixedThreadPool (1 );
151
+ private AtomicLong demand = new AtomicLong (0 );
152
+
153
+ private final Subscriber <? super AudioStream > subscriber ;
154
+ private final InputStream inputStream ;
155
+
156
+ private SubscriptionImpl (Subscriber <? super AudioStream > s , InputStream inputStream ) {
157
+ this .subscriber = s ;
158
+ this .inputStream = inputStream ;
159
+ }
160
+
161
+ @ Override
162
+ public void request (long n ) {
163
+ if (n <= 0 ) {
164
+ subscriber .onError (new IllegalArgumentException ("Demand must be positive" ));
165
+ }
166
+
167
+ demand .getAndAdd (n );
168
+
169
+ executor .submit (() -> {
170
+ try {
171
+ do {
172
+ ByteBuffer audioBuffer = getNextEvent ();
173
+ if (audioBuffer .remaining () > 0 ) {
174
+ AudioEvent audioEvent = audioEventFromBuffer (audioBuffer );
175
+ subscriber .onNext (audioEvent );
176
+ } else {
177
+ subscriber .onComplete ();
178
+ break ;
179
+ }
180
+ } while (demand .decrementAndGet () > 0 );
181
+ } catch (Exception e ) {
182
+ subscriber .onError (e );
183
+ }
184
+ });
185
+ }
186
+
187
+ @ Override
188
+ public void cancel () {
189
+
190
+ }
191
+
192
+ private ByteBuffer getNextEvent () {
193
+ ByteBuffer audioBuffer = null ;
194
+ byte [] audioBytes = new byte [CHUNK_SIZE_IN_BYTES ];
195
+
196
+ int len = 0 ;
197
+ try {
198
+ len = inputStream .read (audioBytes );
199
+
200
+ if (len <= 0 ) {
201
+ audioBuffer = ByteBuffer .allocate (0 );
202
+ } else {
203
+ audioBuffer = ByteBuffer .wrap (audioBytes , 0 , len );
204
+ }
205
+ } catch (IOException e ) {
206
+ throw new UncheckedIOException (e );
207
+ }
208
+
209
+ return audioBuffer ;
210
+ }
211
+
212
+ private AudioEvent audioEventFromBuffer (ByteBuffer bb ) {
213
+ return AudioEvent .builder ()
214
+ .audioChunk (SdkBytes .fromByteBuffer (bb ))
215
+ .build ();
216
+ }
217
+ }
218
+ }
0 commit comments