• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 package software.amazon.awssdk.services.transcribestreaming;
16 
17 import com.github.davidmoten.rx2.Bytes;
18 import java.io.File;
19 import java.io.FileInputStream;
20 import java.io.FileNotFoundException;
21 import java.util.concurrent.CompletableFuture;
22 import java.util.concurrent.atomic.AtomicReference;
23 import java.util.function.Consumer;
24 import org.junit.Test;
25 import org.reactivestreams.Publisher;
26 import software.amazon.awssdk.core.SdkBytes;
27 import software.amazon.awssdk.services.transcribestreaming.model.AudioEvent;
28 import software.amazon.awssdk.services.transcribestreaming.model.AudioStream;
29 import software.amazon.awssdk.services.transcribestreaming.model.LanguageCode;
30 import software.amazon.awssdk.services.transcribestreaming.model.MediaEncoding;
31 import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionRequest;
32 import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionResponseHandler;
33 import software.amazon.awssdk.services.transcribestreaming.model.TranscriptEvent;
34 import software.amazon.awssdk.services.transcribestreaming.model.TranscriptResultStream;
35 
36 public class CurrentState {
37     private File audioFile = new File(getClass().getClassLoader().getResource("silence_16kHz_s16le.wav").getFile());
38 
39     @Test
demoCurrentState()40     public void demoCurrentState() throws FileNotFoundException {
41         try (TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.create()) {
42             // Create the audio stream for transcription - we have to create a publisher that resumes where it left off.
43             // If we don't, we'll replay the whole thing again on a reconnect.
44             Publisher<AudioStream> audioStream =
45                     Bytes.from(new FileInputStream(audioFile))
46                          .map(SdkBytes::fromByteArray)
47                          .map(bytes -> AudioEvent.builder().audioChunk(bytes).build())
48                          .cast(AudioStream.class);
49 
50             CompletableFuture<Void> result = printAudio(client, audioStream, null, 3);
51             result.join();
52         }
53     }
54 
printAudio(TranscribeStreamingAsyncClient client, Publisher<AudioStream> audioStream, String sessionId, int resumesRemaining)55     private CompletableFuture<Void> printAudio(TranscribeStreamingAsyncClient client,
56                                                Publisher<AudioStream> audioStream,
57                                                String sessionId,
58                                                int resumesRemaining) {
59         if (resumesRemaining == 0) {
60             CompletableFuture<Void> result = new CompletableFuture<>();
61             result.completeExceptionally(new IllegalStateException("Failed to resume audio, because the maximum resumes " +
62                                                                    "have been exceeded."));
63             return result;
64         }
65 
66         // Create the request for transcribe that includes the audio metadata
67         StartStreamTranscriptionRequest audioMetadata =
68                 StartStreamTranscriptionRequest.builder()
69                                                .languageCode(LanguageCode.EN_US)
70                                                .mediaEncoding(MediaEncoding.PCM)
71                                                .mediaSampleRateHertz(16_000)
72                                                .sessionId(sessionId)
73                                                .build();
74 
75         // Create the transcription handler
76         AtomicReference<String> atomicSessionId = new AtomicReference<>(sessionId);
77         Consumer<TranscriptResultStream> reader = event -> {
78             if (event instanceof TranscriptEvent) {
79                 TranscriptEvent transcriptEvent = (TranscriptEvent) event;
80                 System.out.println(transcriptEvent.transcript().results());
81             }
82         };
83 
84         StartStreamTranscriptionResponseHandler responseHandler =
85                 StartStreamTranscriptionResponseHandler.builder()
86                                                        .onResponse(r -> atomicSessionId.set(r.sessionId()))
87                                                        .subscriber(reader)
88                                                        .build();
89 
90         // Start talking with transcribe
91         return client.startStreamTranscription(audioMetadata, audioStream, responseHandler)
92                      .handle((x, error) -> resumePrintAudio(client, audioStream, atomicSessionId.get(), resumesRemaining, error))
93                      .thenCompose(flatten -> flatten);
94     }
95 
resumePrintAudio(TranscribeStreamingAsyncClient client, Publisher<AudioStream> audioStream, String sessionId, int resumesRemaining, Throwable error)96     private CompletableFuture<Void> resumePrintAudio(TranscribeStreamingAsyncClient client,
97                                                      Publisher<AudioStream> audioStream,
98                                                      String sessionId,
99                                                      int resumesRemaining,
100                                                      Throwable error) {
101         if (error == null) {
102             return CompletableFuture.completedFuture(null);
103         }
104 
105         System.out.print("Error happened. Reconnecting and trying again...");
106         error.printStackTrace();
107 
108         if (sessionId == null) {
109             CompletableFuture<Void> result = new CompletableFuture<>();
110             result.completeExceptionally(error);
111             return result;
112         }
113 
114         // If we failed, recursively call printAudio
115         return printAudio(client, audioStream, sessionId, resumesRemaining - 1);
116     }
117 }
118