1 /* 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"). 5 * You may not use this file except in compliance with the License. 6 * A copy of the License is located at 7 * 8 * http://aws.amazon.com/apache2.0 9 * 10 * or in the "license" file accompanying this file. This file is distributed 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 12 * express or implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 */ 15 package software.amazon.awssdk.services.transcribestreaming; 16 17 import com.github.davidmoten.rx2.Bytes; 18 import java.io.File; 19 import java.io.FileInputStream; 20 import java.io.FileNotFoundException; 21 import java.util.concurrent.CompletableFuture; 22 import java.util.concurrent.atomic.AtomicReference; 23 import java.util.function.Consumer; 24 import org.junit.Test; 25 import org.reactivestreams.Publisher; 26 import software.amazon.awssdk.core.SdkBytes; 27 import software.amazon.awssdk.services.transcribestreaming.model.AudioEvent; 28 import software.amazon.awssdk.services.transcribestreaming.model.AudioStream; 29 import software.amazon.awssdk.services.transcribestreaming.model.LanguageCode; 30 import software.amazon.awssdk.services.transcribestreaming.model.MediaEncoding; 31 import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionRequest; 32 import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionResponseHandler; 33 import software.amazon.awssdk.services.transcribestreaming.model.TranscriptEvent; 34 import software.amazon.awssdk.services.transcribestreaming.model.TranscriptResultStream; 35 36 public class CurrentState { 37 private File audioFile = new File(getClass().getClassLoader().getResource("silence_16kHz_s16le.wav").getFile()); 38 39 @Test demoCurrentState()40 public void demoCurrentState() throws FileNotFoundException { 41 try (TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.create()) { 42 // Create the audio stream for transcription - we have to create a publisher that resumes where it left off. 43 // If we don't, we'll replay the whole thing again on a reconnect. 44 Publisher<AudioStream> audioStream = 45 Bytes.from(new FileInputStream(audioFile)) 46 .map(SdkBytes::fromByteArray) 47 .map(bytes -> AudioEvent.builder().audioChunk(bytes).build()) 48 .cast(AudioStream.class); 49 50 CompletableFuture<Void> result = printAudio(client, audioStream, null, 3); 51 result.join(); 52 } 53 } 54 printAudio(TranscribeStreamingAsyncClient client, Publisher<AudioStream> audioStream, String sessionId, int resumesRemaining)55 private CompletableFuture<Void> printAudio(TranscribeStreamingAsyncClient client, 56 Publisher<AudioStream> audioStream, 57 String sessionId, 58 int resumesRemaining) { 59 if (resumesRemaining == 0) { 60 CompletableFuture<Void> result = new CompletableFuture<>(); 61 result.completeExceptionally(new IllegalStateException("Failed to resume audio, because the maximum resumes " + 62 "have been exceeded.")); 63 return result; 64 } 65 66 // Create the request for transcribe that includes the audio metadata 67 StartStreamTranscriptionRequest audioMetadata = 68 StartStreamTranscriptionRequest.builder() 69 .languageCode(LanguageCode.EN_US) 70 .mediaEncoding(MediaEncoding.PCM) 71 .mediaSampleRateHertz(16_000) 72 .sessionId(sessionId) 73 .build(); 74 75 // Create the transcription handler 76 AtomicReference<String> atomicSessionId = new AtomicReference<>(sessionId); 77 Consumer<TranscriptResultStream> reader = event -> { 78 if (event instanceof TranscriptEvent) { 79 TranscriptEvent transcriptEvent = (TranscriptEvent) event; 80 System.out.println(transcriptEvent.transcript().results()); 81 } 82 }; 83 84 StartStreamTranscriptionResponseHandler responseHandler = 85 StartStreamTranscriptionResponseHandler.builder() 86 .onResponse(r -> atomicSessionId.set(r.sessionId())) 87 .subscriber(reader) 88 .build(); 89 90 // Start talking with transcribe 91 return client.startStreamTranscription(audioMetadata, audioStream, responseHandler) 92 .handle((x, error) -> resumePrintAudio(client, audioStream, atomicSessionId.get(), resumesRemaining, error)) 93 .thenCompose(flatten -> flatten); 94 } 95 resumePrintAudio(TranscribeStreamingAsyncClient client, Publisher<AudioStream> audioStream, String sessionId, int resumesRemaining, Throwable error)96 private CompletableFuture<Void> resumePrintAudio(TranscribeStreamingAsyncClient client, 97 Publisher<AudioStream> audioStream, 98 String sessionId, 99 int resumesRemaining, 100 Throwable error) { 101 if (error == null) { 102 return CompletableFuture.completedFuture(null); 103 } 104 105 System.out.print("Error happened. Reconnecting and trying again..."); 106 error.printStackTrace(); 107 108 if (sessionId == null) { 109 CompletableFuture<Void> result = new CompletableFuture<>(); 110 result.completeExceptionally(error); 111 return result; 112 } 113 114 // If we failed, recursively call printAudio 115 return printAudio(client, audioStream, sessionId, resumesRemaining - 1); 116 } 117 } 118