**Design:** New Feature, **Status:** [Proposed](../../../README.md)

# Event Stream Alternate Syntax

Event streaming allows long-running bi-directional communication between
customers and AWS services over HTTP/2 connections.

The current syntax for event streaming APIs is adequate for power users,
but has a few disadvantages:

1. Customers must use reactive streams APIs, even for relatively simple
   use-cases. Reactive streams APIs are powerful, but difficult to use
   without external documentation and libraries.
2. All response processing must be performed in a callback (the
   `ResponseHandler` abstraction), which makes it challenging to
   propagate information to the rest of the application.

This mini-proposal suggests an alternate syntax that customers would be
able to use for all event streaming operations.

## Proposal

A new method will be added to each event streaming operation:
`Running{OPERATION} {OPERATION}({OPERATION}Request)` (and its
consumer-builder variant).

A new type will be created for each event streaming operation:
`Running{OPERATION}`:

```Java
interface Running{OPERATION} extends AutoCloseable {
    // A future that is completed when the entire operation completes.
    CompletableFuture<Void> completionFuture();

    /**
     * Methods enabling reading individual events asynchronously, as they are received.
     */

    CompletableFuture<Void> readAll(Consumer<{RESPONSE_EVENT_TYPE}> reader);
    CompletableFuture<Void> readAll({RESPONSE_EVENT_TYPE}Visitor responseVisitor);
    <T extends {RESPONSE_EVENT_TYPE}> CompletableFuture<Void> readAll(Class<T> type, Consumer<T> reader);

    // Reads yield response events, so the untyped variant returns the response event type.
    CompletableFuture<Optional<{RESPONSE_EVENT_TYPE}>> readNext();
    <T extends {RESPONSE_EVENT_TYPE}> CompletableFuture<Optional<T>> readNext(Class<T> type);

    /**
     * Methods enabling writing individual events asynchronously.
     */

    CompletableFuture<Void> writeAll(Publisher<? extends {REQUEST_EVENT_TYPE}> events);
    CompletableFuture<Void> writeAll(Iterable<? extends {REQUEST_EVENT_TYPE}> events);
    CompletableFuture<Void> write({REQUEST_EVENT_TYPE} event);

    /**
     * Reactive-streams methods for reading events and response messages, as they are received.
     */
    Publisher<{RESPONSE_EVENT_TYPE}> responseEventPublisher();
    Publisher<{OPERATION}Response> responsePublisher();

    /**
     * Java-8-streams methods for reading events and response messages, as they are received.
     */

    Stream<{RESPONSE_EVENT_TYPE}> blockingResponseEventStream();
    Stream<{OPERATION}Response> blockingResponseStream();

    @Override
    default void close() {
        completionFuture().cancel(false);
    }
}
```

This type enables customers to use the operation in either a
reactive-streams or a Java-8 usage pattern, depending on how they prefer
to manage their threads and back-pressure.

Every method on `Running{OPERATION}` is still non-blocking and will never
throw exceptions directly. Any method that returns a type that itself
contains blocking methods is prefixed with `blocking`, e.g.
`Stream<{RESPONSE_EVENT_TYPE}> blockingResponseEventStream()`.
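
The `readNext()` method returns one event per future, so events can be consumed one at a time without blocking. The following is a minimal sketch of that pattern, not part of the proposal: `RunningOperation`, `fakeOperation`, and `drain` are hypothetical stand-ins for the generated type, and treating an empty `Optional` as the end of the stream is an assumption that this proposal does not spell out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class ReadNextSketch {
    /** Hypothetical, trimmed-down stand-in for the proposed Running{OPERATION} type. */
    interface RunningOperation<E> {
        CompletableFuture<Optional<E>> readNext();
    }

    /** In-memory fake: serves the queued events, then an empty Optional (assumed end-of-stream signal). */
    static <E> RunningOperation<E> fakeOperation(List<E> events) {
        Queue<E> queue = new ArrayDeque<>(events);
        return () -> CompletableFuture.completedFuture(Optional.ofNullable(queue.poll()));
    }

    /** Passes each event to the handler until the stream ends; the returned future completes at that point. */
    static <E> CompletableFuture<Void> drain(RunningOperation<E> operation, Consumer<E> handler) {
        return operation.readNext().thenCompose(next -> {
            if (!next.isPresent()) {
                // Assumed end of stream: stop chaining further reads.
                return CompletableFuture.<Void>completedFuture(null);
            }
            handler.accept(next.get());
            return drain(operation, handler);
        });
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        drain(fakeOperation(Arrays.asList("first", "second", "third")), seen::add).join();
        System.out.println(seen); // prints [first, second, third]
    }
}
```

Because the fake completes every future immediately, `drain` recurses on the calling thread. With futures completed later by an I/O thread, each step would instead run as a callback.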

**Example 1: Transcribe's `startStreamTranscription` with Reactive Streams**

```Java
try (TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.create();
     // Create the connection to transcribe and send the initial request message
     RunningStartStreamTranscription transcription =
         client.startStreamTranscription(r -> r.languageCode(LanguageCode.EN_US)
                                               .mediaEncoding(MediaEncoding.PCM)
                                               .mediaSampleRateHertz(16_000))) {

    // Use RxJava to create the audio stream to be transcribed
    Publisher<AudioStream> audioPublisher =
        Bytes.from(audioFile)
             .map(SdkBytes::fromByteArray)
             .map(bytes -> AudioEvent.builder().audioChunk(bytes).build())
             .cast(AudioStream.class);

    // Begin sending the audio data to transcribe, asynchronously
    transcription.writeAll(audioPublisher);

    // Get a publisher for the transcription
    Publisher<TranscriptResultStream> transcriptionPublisher = transcription.responseEventPublisher();

    // Use RxJava to log the transcription
    Flowable.fromPublisher(transcriptionPublisher)
            .filter(e -> e instanceof TranscriptEvent)
            .cast(TranscriptEvent.class)
            .forEach(e -> System.out.println(e.transcript().results()));

    // Wait for the operation to complete
    transcription.completionFuture().join();
}
```

**Example 2: Transcribe's `startStreamTranscription` without Reactive Streams**

```Java
try (TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.create();
     // Create the connection to transcribe and send the initial request message
     RunningStartStreamTranscription transcription =
         client.startStreamTranscription(r -> r.languageCode(LanguageCode.EN_US)
                                               .mediaEncoding(MediaEncoding.PCM)
                                               .mediaSampleRateHertz(16_000))) {

    // Asynchronously log response transcription events, as we receive them
    transcription.readAll(TranscriptEvent.class, e -> System.out.println(e.transcript().results()));

    // Read from our audio file, 4 KB at a time
    try (InputStream reader = Files.newInputStream(audioFile)) {
        byte[] buffer = new byte[4096];
        int bytesRead;

        while ((bytesRead = reader.read(buffer)) != -1) {
            if (bytesRead > 0) {
                // Write the 4 KB we read to transcribe, and wait for the write to complete
                SdkBytes audioChunk = SdkBytes.fromByteBuffer(ByteBuffer.wrap(buffer, 0, bytesRead));
                CompletableFuture<Void> writeCompleteFuture =
                    transcription.write(AudioEvent.builder().audioChunk(audioChunk).build());
                writeCompleteFuture.join();
            }
        }
    }

    // Wait for the operation to complete
    transcription.completionFuture().join();
}
```

**Example 3: Kinesis's `subscribeToShard` with Java 8 Streams**

```Java
try (KinesisAsyncClient client = KinesisAsyncClient.create();
     // Create the connection to Kinesis and send the initial request message
     RunningSubscribeToShard subscription = client.subscribeToShard(r -> r.shardId("myShardId"))) {

    // Block this thread to log 5 Kinesis SubscribeToShardEvent messages
    subscription.blockingResponseEventStream()
                .filter(SubscribeToShardEvent.class::isInstance)
                .map(SubscribeToShardEvent.class::cast)
                .limit(5)
                .forEach(event -> System.out.println(event.records()));
}
```
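
All three examples open the running operation in a try-with-resources block. The default `close()` on `Running{OPERATION}` cancels the completion future, which is presumably why Examples 1 and 2 join on that future before leaving the block, while Example 3 simply exits after five events. The sketch below illustrates that behavior with plain JDK types. `RunningOperation` and `cancelledOnExit` are hypothetical names that keep only the two members relevant to closing.

```java
import java.util.concurrent.CompletableFuture;

class CloseSketch {
    /** Hypothetical stand-in with the same default close() as the proposed type. */
    interface RunningOperation extends AutoCloseable {
        CompletableFuture<Void> completionFuture();

        @Override
        default void close() {
            completionFuture().cancel(false);
        }
    }

    /** Runs a try-with-resources block and reports whether leaving it cancelled the future. */
    static boolean cancelledOnExit(CompletableFuture<Void> completion, boolean completeFirst) {
        try (RunningOperation operation = () -> completion) {
            if (completeFirst) {
                // Simulates the operation finishing before the block is exited.
                operation.completionFuture().complete(null);
            }
        }
        return completion.isCancelled();
    }

    public static void main(String[] args) {
        // Leaving the block while the operation is still running cancels it.
        System.out.println(cancelledOnExit(new CompletableFuture<>(), false)); // prints true
        // An operation that already completed is unaffected by close().
        System.out.println(cancelledOnExit(new CompletableFuture<>(), true)); // prints false
    }
}
```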