1 /*
2 * lws-minimal-secure-streams-avs
3 *
4 * Written in 2019-2020 by Andy Green <andy@warmcat.com>
5 *
6 * This file is made available under the Creative Commons CC0 1.0
7 * Universal Public Domain Dedication.
8 *
9 * This sends a canned WAV and received (and discards) the mp3 response.
10 * However it rate-limits the response reception to manage a small ringbuffer
11 * using ss / h2 flow control apis, reflecting consumption at 64kbps and only
12 * and 8KB buffer, indtended to model optimizing rx buffering on mp3 playback
13 * on a constrained device.
14 */
15
16 #include <libwebsockets.h>
17 #include <string.h>
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #include <unistd.h>
21 #include <assert.h>
22 #include <fcntl.h>
23
24 extern int interrupted, bad;
25 static struct lws_ss_handle *hss_avs_event, *hss_avs_sync;
26 static uint8_t *wav;
27 static size_t wav_len;
28
29 typedef struct ss_avs_event {
30 struct lws_ss_handle *ss;
31 void *opaque_data;
32 /* ... application specific state ... */
33 struct lejp_ctx jctx;
34 } ss_avs_event_t;
35
36 typedef struct ss_avs_metadata {
37 struct lws_ss_handle *ss;
38 void *opaque_data;
39 /* ... application specific state ... */
40 struct lejp_ctx jctx;
41 size_t pos;
42
43 /*
44 * We simulate a ringbuffer that is used up by a sul at 64Kbit/sec
45 * rate, and managed at the same rate using tx credit
46 */
47
48 lws_sorted_usec_list_t sul;
49 uint8_t buf[256 * 1024]; /* to test rate-limiting, set to 8 * 1024 */
50 int head;
51 int tail;
52
53 char filled;
54
55 } ss_avs_metadata_t;
56
57 static const char *metadata = "{"
58 "\"event\": {"
59 "\"header\": {"
60 "\"namespace\": \"SpeechRecognizer\","
61 "\"name\": \"Recognize\","
62 "\"messageId\": \"message-123\","
63 "\"dialogRequestId\": \"dialog-request-321\""
64 "},"
65 "\"payload\": {"
66 "\"profile\":" "\"CLOSE_TALK\","
67 "\"format\":" "\"AUDIO_L16_RATE_16000_CHANNELS_1\""
68 "}"
69 "}"
70 "}";
71
72 /*
73 * avs metadata
74 */
75
76 static void
use_buffer_50ms(lws_sorted_usec_list_t * sul)77 use_buffer_50ms(lws_sorted_usec_list_t *sul)
78 {
79 ss_avs_metadata_t *m = lws_container_of(sul, ss_avs_metadata_t, sul);
80 struct lws_context *context = (struct lws_context *)m->opaque_data;
81 size_t n;
82 int e;
83
84 /*
85 * Use up 50ms-worth (8KB / 20) == 401 bytes of buffered data
86 */
87
88 /* remaining data in buffer */
89 n = ((m->head - m->tail) % sizeof(m->buf));
90 lwsl_info("%s: avail %d\n", __func__, (int)n);
91
92 if (n < 401)
93 lwsl_err("%s: underrun\n", __func__);
94
95 m->tail = (m->tail + 401) % sizeof(m->buf);
96 n = ((m->head - m->tail) % sizeof(m->buf));
97
98 e = lws_ss_get_est_peer_tx_credit(m->ss);
99
100 lwsl_info("%s: avail after: %d, curr est %d\n", __func__, (int)n, e);
101
102 if (n < (sizeof(m->buf) * 2) / 3 && e < (int)(sizeof(m->buf) - 1 - n)) {
103 lwsl_info("%s: requesting additional %d\n", __func__,
104 (int)(sizeof(m->buf) - 1 - e - n));
105 lws_ss_add_peer_tx_credit(m->ss, (sizeof(m->buf) - 1 - e - n));
106 }
107
108 lws_sul_schedule(context, 0, &m->sul, use_buffer_50ms,
109 50 * LWS_US_PER_MS);
110 }
111
112 static int
ss_avs_metadata_rx(void * userobj,const uint8_t * buf,size_t len,int flags)113 ss_avs_metadata_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
114 {
115 ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
116 struct lws_context *context = (struct lws_context *)m->opaque_data;
117 size_t n, n1;
118
119 lwsl_notice("%s: rideshare %s, len %d, flags 0x%x\n", __func__,
120 lws_ss_rideshare(m->ss), (int)len, flags);
121 // lwsl_hexdump_warn(buf, len);
122
123 n = sizeof(m->buf) - ((m->head - m->tail) % sizeof(m->buf));
124 lwsl_info("%s: len %d, buf h %d, t %d, space %d\n", __func__,
125 (int)len, (int)m->head, (int)m->tail, (int)n);
126 lws_ss_get_est_peer_tx_credit(m->ss);
127 if (len > n) {
128 lwsl_err("%s: bad len: len %d, n %d\n", __func__, (int)len, (int)n);
129 assert(0);
130
131 return 1;
132 }
133
134 if (m->head < m->tail) /* |****h-------t**| */
135 memcpy(&m->buf[m->head], buf, len);
136 else { /* |---t*****h-----| */
137 n1 = sizeof(m->buf) - m->head;
138 if (len < n1)
139 n1 = len;
140 memcpy(&m->buf[m->head], buf, n1);
141 if (n1 != len)
142 memcpy(m->buf, buf, len - n1);
143 }
144
145 m->head = (m->head + len) % sizeof(m->buf);
146
147 lws_sul_schedule(context, 0, &m->sul, use_buffer_50ms,
148 50 * LWS_US_PER_MS);
149
150 return 0;
151 }
152
153 static int
ss_avs_metadata_tx(void * userobj,lws_ss_tx_ordinal_t ord,uint8_t * buf,size_t * len,int * flags)154 ss_avs_metadata_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
155 size_t *len, int *flags)
156 {
157 ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
158 //struct lws_context *context = (struct lws_context *)m->opaque_data;
159 size_t tot;
160
161 if ((long)m->pos < 0) {
162 *len = 0;
163 lwsl_debug("%s: skip tx\n", __func__);
164 return 1;
165 }
166
167 // lwsl_notice("%s: rideshare '%s'\n", __func__, lws_ss_rideshare(m->ss));
168
169 if (!strcmp(lws_ss_rideshare(m->ss), "avs_audio")) {
170 /* audio rideshare */
171
172 if (!m->pos)
173 *flags |= LWSSS_FLAG_SOM;
174
175 if (*len > wav_len - m->pos)
176 *len = wav_len - m->pos;
177
178 memcpy(buf, wav + m->pos, *len);
179 m->pos += *len;
180
181 if (m->pos == wav_len) {
182 *flags |= LWSSS_FLAG_EOM;
183 lwsl_info("%s: tx done\n", __func__);
184 m->pos = (long)-1l; /* ban subsequent until new stream */
185 } else
186 lws_ss_request_tx(m->ss);
187
188 lwsl_hexdump_info(buf, *len);
189
190 return 0;
191 }
192
193 /* metadata part */
194
195 tot = strlen(metadata);
196
197 if (!m->pos)
198 *flags |= LWSSS_FLAG_SOM;
199
200 if (*len > tot - m->pos)
201 *len = tot - m->pos;
202
203 memcpy(buf, metadata + m->pos, *len);
204
205 m->pos += *len;
206
207 if (m->pos == tot) {
208 *flags |= LWSSS_FLAG_EOM;
209 m->pos = 0; /* for next time */
210 lws_ss_request_tx(m->ss);
211 }
212
213 lwsl_hexdump_info(buf, *len);
214
215 return 0;
216 }
217
218 static int
ss_avs_metadata_state(void * userobj,void * sh,lws_ss_constate_t state,lws_ss_tx_ordinal_t ack)219 ss_avs_metadata_state(void *userobj, void *sh,
220 lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)
221 {
222
223 ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
224 struct lws_context *context = (struct lws_context *)m->opaque_data;
225
226 lwsl_user("%s: %s, ord 0x%x\n", __func__, lws_ss_state_name(state),
227 (unsigned int)ack);
228
229 switch (state) {
230 case LWSSSCS_CREATING:
231 lwsl_user("%s: CREATING\n", __func__);
232 lws_ss_client_connect(m->ss);
233 m->pos = 0;
234 break;
235 case LWSSSCS_CONNECTING:
236 break;
237 case LWSSSCS_CONNECTED:
238 lws_ss_request_tx(m->ss);
239 break;
240 case LWSSSCS_ALL_RETRIES_FAILED:
241 /* for this demo app, we want to exit on fail to connect */
242 case LWSSSCS_DISCONNECTED:
243 /* for this demo app, we want to exit after complete flow */
244 lws_sul_schedule(context, 0, &m->sul, use_buffer_50ms,
245 LWS_SET_TIMER_USEC_CANCEL);
246 interrupted = 1;
247 break;
248 case LWSSSCS_DESTROYING:
249 lws_sul_schedule(context, 0, &m->sul, use_buffer_50ms,
250 LWS_SET_TIMER_USEC_CANCEL);
251 break;
252 default:
253 break;
254 }
255
256 return 0;
257 }
258
259 /*
260 * avs event
261 */
262
263 static int
ss_avs_event_rx(void * userobj,const uint8_t * buf,size_t len,int flags)264 ss_avs_event_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
265 {
266 ss_avs_event_t *m = (ss_avs_event_t *)userobj;
267 // struct lws_context *context = (struct lws_context *)m->opaque_data;
268
269 lwsl_notice("%s: rideshare %s, len %d, flags 0x%x\n", __func__,
270 lws_ss_rideshare(m->ss), (int)len, flags);
271
272 // lwsl_hexdump_warn(buf, len);
273
274 bad = 0; /* for this demo, receiving something here == success */
275
276 return 0;
277 }
278
279 static int
ss_avs_event_tx(void * userobj,lws_ss_tx_ordinal_t ord,uint8_t * buf,size_t * len,int * flags)280 ss_avs_event_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
281 size_t *len, int *flags)
282 {
283 ss_avs_event_t *m = (ss_avs_event_t *)userobj;
284 lwsl_notice("%s: rideshare %s\n", __func__, lws_ss_rideshare(m->ss));
285
286 return 1; /* don't transmit anything */
287 }
288
289 static int
ss_avs_event_state(void * userobj,void * sh,lws_ss_constate_t state,lws_ss_tx_ordinal_t ack)290 ss_avs_event_state(void *userobj, void *sh,
291 lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)
292 {
293 ss_avs_event_t *m = (ss_avs_event_t *)userobj;
294 struct lws_context *context = (struct lws_context *)m->opaque_data;
295 lws_ss_info_t ssi;
296
297 lwsl_user("%s: %s, ord 0x%x\n", __func__, lws_ss_state_name(state),
298 (unsigned int)ack);
299
300 switch (state) {
301 case LWSSSCS_CREATING:
302 case LWSSSCS_CONNECTING:
303 break;
304 case LWSSSCS_CONNECTED:
305 if (hss_avs_sync)
306 break;
307
308 lwsl_notice("%s: starting the second avs stream\n", __func__);
309
310 /*
311 * When we have established the event stream, we must POST
312 * on another stream within 10s
313 */
314
315 memset(&ssi, 0, sizeof(ssi));
316 ssi.handle_offset = offsetof(ss_avs_metadata_t, ss);
317 ssi.opaque_user_data_offset = offsetof(ss_avs_metadata_t,
318 opaque_data);
319 ssi.rx = ss_avs_metadata_rx;
320 ssi.tx = ss_avs_metadata_tx;
321 ssi.state = ss_avs_metadata_state;
322 ssi.user_alloc = sizeof(ss_avs_metadata_t);
323 ssi.streamtype = "avs_metadata";
324
325 /*
326 * We want to allow the other side to fill our buffer, but no
327 * more. But it's a bit tricky when the payload is inside
328 * framing like multipart MIME and contains other parts
329 */
330
331 /* uncomment to test rate-limiting, doesn't work with AVS servers */
332 // ssi.manual_initial_tx_credit =
333 // sizeof(((ss_avs_metadata_t *)0)->buf) / 2;
334
335 if (lws_ss_create(context, 0, &ssi, context, &hss_avs_sync,
336 NULL, NULL)) {
337 lwsl_err("%s: failed to create avs metadata secstream\n",
338 __func__);
339 }
340 break;
341 case LWSSSCS_ALL_RETRIES_FAILED:
342 /* for this demo app, we want to exit on fail to connect */
343 interrupted = 1;
344 break;
345 case LWSSSCS_DISCONNECTED:
346 break;
347 case LWSSSCS_DESTROYING:
348 lwsl_notice("%s: DESTROYING\n", __func__);
349 if (wav) {
350 free(wav);
351 wav = NULL;
352 }
353 break;
354 default:
355 break;
356 }
357
358 return 0;
359 }
360
361 int
avs_example_start(struct lws_context * context)362 avs_example_start(struct lws_context *context)
363 {
364 lws_ss_info_t ssi;
365 struct stat stat;
366 int fd;
367
368 if (hss_avs_event)
369 return 0;
370
371 fd = open("./year.wav", O_RDONLY);
372 if (fd < 0) {
373 lwsl_err("%s: failed to open wav file\n", __func__);
374
375 return 1;
376 }
377 if (fstat(fd, &stat) < 0) {
378 lwsl_err("%s: failed to stat wav file\n", __func__);
379
380 goto bail;
381 }
382
383 wav_len = stat.st_size;
384 wav = malloc(wav_len);
385 if (!wav) {
386 lwsl_err("%s: failed to alloc wav buffer", __func__);
387
388 goto bail;
389 }
390 if (read(fd, wav, wav_len) != (int)wav_len) {
391 lwsl_err("%s: failed to read wav\n", __func__);
392
393 goto bail;
394 }
395 close(fd);
396
397 lwsl_user("%s: Starting AVS stream\n", __func__);
398
399 /* AVS wants us to establish the long poll event stream first */
400
401 memset(&ssi, 0, sizeof(ssi));
402 ssi.handle_offset = offsetof(ss_avs_event_t, ss);
403 ssi.opaque_user_data_offset = offsetof(ss_avs_event_t, opaque_data);
404 ssi.rx = ss_avs_event_rx;
405 ssi.tx = ss_avs_event_tx;
406 ssi.state = ss_avs_event_state;
407 ssi.user_alloc = sizeof(ss_avs_event_t);
408 ssi.streamtype = "avs_event";
409
410 if (lws_ss_create(context, 0, &ssi, context, &hss_avs_event, NULL, NULL)) {
411 lwsl_err("%s: failed to create avs event secure stream\n",
412 __func__);
413 free(wav);
414 wav = NULL;
415 return 1;
416 }
417
418 return 0;
419
420 bail:
421 close(fd);
422
423 return 1;
424 }
425