1 /*
2 * lws-minimal-secure-streams-alexa
3 *
4 * This file is made available under the Creative Commons CC0 1.0
5 * Universal Public Domain Dedication.
6 */
7
8 #include <libwebsockets.h>
9 #include <string.h>
10 #include <sys/types.h>
11 #include <sys/stat.h>
12 #include <unistd.h>
13 #include <fcntl.h>
14
15 #include <mpg123.h>
16
17 #include "private.h"
18
/* Long-poll event stream; non-NULL once avs_example_start() has created it */
struct lws_ss_handle *hss_avs_event;
/* Stream created by the latest avs_query_start() call */
struct lws_ss_handle *hss_avs_sync;
20
21 /* this is the type for the long poll event channel */
22
23 typedef struct ss_avs_event {
24 struct lws_ss_handle *ss;
25 void *opaque_data;
26 /* ... application specific state ... */
27
28 struct lejp_ctx jctx;
29 } ss_avs_event_t;
30
/* Values held in ss_avs_metadata_t.mp3_state */
enum {
	LAMP3STATE_IDLE,	/* no mp3 part is being decoded */
	LAMP3STATE_SPOOLING,	/* received mp3 data is fed to the decoder */
	LAMP3STATE_DRAINING,	/* part ended, waiting for drain_end_cb() */
};
36
37 /* this is the type for the utterance metadata (and audio rideshares) */
38
39 typedef struct ss_avs_metadata {
40 struct lws_ss_handle *ss;
41 void *opaque_data;
42 /* ... application specific state ... */
43
44 struct lws_buflist *dribble; /* next mp3 data while draining last */
45
46 struct lejp_ctx jctx;
47 size_t pos;
48 size_t mp3_in;
49 mpg123_handle *mh;
50
51 lws_sorted_usec_list_t sul;
52
53 uint8_t stash_eom[16];
54
55 uint8_t se_head;
56 uint8_t se_tail;
57
58 char mp3_state;
59 char first_mp3;
60 uint8_t mp3_mime_match;
61 uint8_t seen;
62 uint8_t inside_mp3;
63
64 } ss_avs_metadata_t;
65
/*
 * The remote server only seems to give us a budget of 10s to consume the
 * results; after that it doesn't drop the stream, but doesn't send us anything
 * further on it.
 *
 * This makes it impossible to optimize buffering for incoming mp3 since we
 * have to go ahead and take it before the 10s is up.
 */

#define MAX_MP3_IN_BUFFERING_BYTES 32768
76
/*
 * Structure of JSON metadata for utterance handling.
 *
 * Sent verbatim as the metadata part by ss_avs_metadata_tx().
 */

static const char *metadata = "{"
	"\"event\": {"
		"\"header\": {"
			"\"namespace\": \"SpeechRecognizer\","
			"\"name\": \"Recognize\","
			"\"messageId\": \"message-123\","
			"\"dialogRequestId\": \"dialog-request-321\""
		"},"
		"\"payload\": {"
			"\"profile\":"	"\"CLOSE_TALK\","
			"\"format\":"	"\"AUDIO_L16_RATE_16000_CHANNELS_1\""
		"}"
	"}"
"}";
95
96 /*
97 * avs metadata
98 */
99
100 static void
use_buffer_250ms(lws_sorted_usec_list_t * sul)101 use_buffer_250ms(lws_sorted_usec_list_t *sul)
102 {
103 ss_avs_metadata_t *m = lws_container_of(sul, ss_avs_metadata_t, sul);
104 struct lws_context *context = (struct lws_context *)m->opaque_data;
105 int est = lws_ss_get_est_peer_tx_credit(m->ss);
106
107 lwsl_notice("%s: est txcr %d\n", __func__, est);
108
109 if (est < MAX_MP3_IN_BUFFERING_BYTES - (MAX_MP3_IN_BUFFERING_BYTES / 4)) {
110 lwsl_notice(" adding %d\n", MAX_MP3_IN_BUFFERING_BYTES / 4);
111 lws_ss_add_peer_tx_credit(m->ss, MAX_MP3_IN_BUFFERING_BYTES / 4);
112 }
113
114 lws_sul_schedule(context, 0, &m->sul, use_buffer_250ms,
115 250 * LWS_US_PER_MS);
116 }
117
/*
 * Byte patterns ss_avs_metadata_rx() looks for in the reply: the mimetype
 * naming the mp3 part, then the CRLFCRLF after which the part body begins.
 */
static const char *mp3_mimetype = "application/octet-stream",
		  *match2 = "\x0d\x0a\x0d\x0a";
120
121 static int
ss_avs_mp3_open(ss_avs_metadata_t * m)122 ss_avs_mp3_open(ss_avs_metadata_t *m)
123 {
124 int r;
125
126 lwsl_notice("%s\n", __func__);
127
128 m->first_mp3 = 1;
129 m->mh = mpg123_new(NULL, NULL);
130 if (!m->mh) {
131 lwsl_err("%s: unable to make new mp3\n",
132 __func__);
133 goto bail;
134 }
135 mpg123_format_none(m->mh);
136 r = mpg123_format(m->mh, 16000, MPG123_M_MONO,
137 MPG123_ENC_SIGNED_16);
138 if (r) {
139 lwsl_err("%s: mpg123 format failed %d\n",
140 __func__, r);
141 goto bail1;
142 }
143 r = mpg123_open_feed(m->mh);
144 if (r) {
145 lwsl_err("%s: mpg123 open feed failed %d\n",
146 __func__, r);
147 goto bail1;
148 }
149
150 return 0;
151
152 bail1:
153 mpg123_delete(m->mh);
154 m->mh = NULL;
155
156 bail:
157 return 1;
158 }
159
160 static lws_ss_state_return_t
161 ss_avs_metadata_rx(void *userobj, const uint8_t *buf, size_t len, int flags);
162
163 /*
164 * This is called when the mp3 has drained it's input buffer and destroyed
165 * itself.
166 */
167
168 static int
drain_end_cb(void * v)169 drain_end_cb(void *v)
170 {
171 ss_avs_metadata_t *m = (ss_avs_metadata_t *)v;
172 struct lws_context *context = (struct lws_context *)m->opaque_data;
173 int tot = 0;
174
175 lwsl_err("%s\n", __func__);
176
177 /*
178 * We have drained and destroyed the existing mp3 session. Is there
179 * a new one pending?
180 */
181
182 m->first_mp3 = 1;
183 m->mp3_state = LAMP3STATE_IDLE;
184
185 if (lws_buflist_total_len(&m->dribble)) {
186 /* we started another one */
187
188 /* resume tx credit top up */
189 lws_sul_schedule(context, 0, &m->sul, use_buffer_250ms, 1);
190
191 if (ss_avs_mp3_open(m))
192 return 1;
193
194 m->mp3_state = LAMP3STATE_SPOOLING;
195
196 /*
197 * Dump what we stashed from draining into the new mp3
198 */
199
200 while (lws_buflist_total_len(&m->dribble)) {
201 size_t s;
202 uint8_t *u, t;
203
204 s = lws_buflist_next_segment_len(&m->dribble, &u);
205 t = m->stash_eom[m->se_tail];
206 lwsl_notice("%s: preload %d: %d\n", __func__, (int)s, t);
207
208 mpg123_feed(m->mh, u, s);
209 lws_buflist_use_segment(&m->dribble, s);
210 if (m->first_mp3) {
211 play_mp3(m->mh, NULL, NULL);
212 m->first_mp3 = 0;
213 }
214
215 tot += s;
216
217 m->se_tail = (m->se_tail + 1) % sizeof(m->stash_eom);
218 if (t) {
219 lwsl_notice("%s: preloaded EOM\n", __func__);
220
221 /*
222 * We stashed the whole of the message, we need
223 * to also do the EOM processing. We will come
224 * back here if there's another message in the
225 * stash.
226 */
227
228 m->mp3_state = LAMP3STATE_DRAINING;
229 if (m->mh)
230 play_mp3(NULL, drain_end_cb, m);
231
232 lws_ss_add_peer_tx_credit(m->ss, tot);
233 #if 0
234 /*
235 * Put a hold on bringing in any more data
236 */
237 lws_sul_cancel(&m->sul);
238 #endif
239 /* destroy our copy of the handle */
240 m->mh = NULL;
241
242 break;
243 }
244 }
245
246 lws_ss_add_peer_tx_credit(m->ss, tot);
247 }
248
249 return 0;
250 }
251
252 static lws_ss_state_return_t
ss_avs_metadata_rx(void * userobj,const uint8_t * buf,size_t len,int flags)253 ss_avs_metadata_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
254 {
255 ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
256 struct lws_context *context = (struct lws_context *)m->opaque_data;
257 int n = 0, hit = 0;
258
259 lwsl_notice("%s: len %d, flags %d (est peer txcr %d)\n", __func__,
260 (int)len, flags, lws_ss_get_est_peer_tx_credit(m->ss));
261
262 // lwsl_hexdump_warn(buf, len);
263
264 if ((flags & LWSSS_FLAG_SOM) && !m->mh && !m->seen) {
265 m->mp3_mime_match = 0;
266 m->seen = 0;
267 m->inside_mp3 = 0;
268 }
269
270 if (!m->inside_mp3) {
271 /*
272 * Identify the part with the mp3 in, if any
273 */
274
275 while (n < (int)len - 24) {
276 if (!m->seen) {
277 if (buf[n] == mp3_mimetype[m->mp3_mime_match]) {
278 m->mp3_mime_match++;
279 if (m->mp3_mime_match == 24) {
280 m->mp3_mime_match = 0;
281 m->seen = 1;
282 n++;
283 continue;
284 }
285 } else
286 m->mp3_mime_match = 0;
287 } else {
288 if (buf[n] == match2[m->mp3_mime_match]) {
289 m->mp3_mime_match++;
290 if (m->mp3_mime_match == 4) {
291 m->seen = 0;
292 m->mp3_mime_match = 0;
293 hit = 1;
294 n++;
295 buf += n;
296 len -= n;
297 lwsl_notice("identified reply...\n");
298 m->inside_mp3 = 1;
299 break;
300 }
301 } else
302 m->mp3_mime_match = 0;
303 }
304
305 n++;
306 }
307
308 if (!hit) {
309 lws_ss_add_peer_tx_credit(m->ss, len);
310 return 0;
311 }
312 }
313
314 // lwsl_notice("%s: state %d\n", __func__, m->mp3_state);
315
316 switch (m->mp3_state) {
317 case LAMP3STATE_IDLE:
318
319 if (hit) {
320
321 lws_ss_add_peer_tx_credit(m->ss, n);
322
323 if (ss_avs_mp3_open(m))
324 goto bail;
325
326 lws_sul_schedule(context, 0, &m->sul, use_buffer_250ms, 1);
327 m->mp3_state = LAMP3STATE_SPOOLING;
328 break;
329 }
330
331 lws_ss_add_peer_tx_credit(m->ss, len);
332
333 if (!m->inside_mp3)
334 break;
335
336 /* fallthru */
337
338 case LAMP3STATE_SPOOLING:
339
340 if (m->dribble)
341 goto draining;
342
343 if (len) {
344 /*
345 * We are shoving encoded mp3 into mpg123-allocated heap
346 * buffers... unfortunately mpg123 doesn't seem to
347 * expose where it is in its allocated input so we can
348 * track how much is stashed. Instead while in playback
349 * mode, we assume 64kbps mp3 encoding, ie, 8KB/s, and
350 * run a sul that allows an additional 2KB tx credit
351 * every 250ms, with 4KB initial credit.
352 */
353 lwsl_notice("%s: SPOOL %d\n", __func__, (int)len);
354 mpg123_feed(m->mh, buf, len);
355
356 if (m->first_mp3) {
357 lws_sul_schedule(context, 0, &m->sul,
358 use_buffer_250ms, 1);
359 // lws_ss_add_peer_tx_credit(m->ss,
360 // len + (MAX_MP3_IN_BUFFERING_BYTES / 2));
361 play_mp3(m->mh, NULL, NULL);
362 } //else
363 // lws_ss_add_peer_tx_credit(m->ss, len);
364 m->first_mp3 = 0;
365 }
366
367 if (flags & LWSSS_FLAG_EOM) {
368 /*
369 * This means one "message" / mime part with mp3 data
370 * has finished coming in. But there may be whole other
371 * parts with other mp3s following, with potentially
372 * different mp3 parameters. So we want to tell this
373 * one to drain and finish and destroy the current mp3
374 * object before we go on.
375 *
376 * But not knowing the length of the current one, there
377 * will already be outstanding tx credit at the server,
378 * so it's going to spam us with the next part before we
379 * have the new mp3 sink for it.
380 */
381 lwsl_notice("%s: EOM\n", __func__);
382 m->mp3_mime_match = 0;
383 m->seen = 0;
384 m->mp3_state = LAMP3STATE_DRAINING;
385 /* from input POV, we're no longer inside an mp3 */
386 m->inside_mp3 = 0;
387 if (m->mh)
388 play_mp3(NULL, drain_end_cb, m);
389 #if 0
390 /*
391 * Put a hold on bringing in any more data
392 */
393 lws_sul_cancel(&m->sul);
394 #endif
395 /* destroy our copy of the handle */
396 m->mh = NULL;
397 }
398 break;
399
400 case LAMP3STATE_DRAINING:
401
402 draining:
403 if (buf && len && m->inside_mp3) {
404 lwsl_notice("%s: DRAINING: stashing %d: %d %d %d\n",
405 __func__, (int)len, !!(flags & LWSSS_FLAG_EOM),
406 m->se_head, m->se_tail);
407 lwsl_hexdump_notice(buf, len);
408 if (lws_buflist_append_segment(&m->dribble, buf, len) < 0)
409 goto bail;
410
411 m->stash_eom[m->se_head] = !!(flags & LWSSS_FLAG_EOM);
412 m->se_head = (m->se_head + 1) % sizeof(m->stash_eom);
413 lwsl_notice("%s: next head %d\n", __func__, m->se_head);
414
415 lws_ss_add_peer_tx_credit(m->ss, len);
416 }
417
418 if (flags & LWSSS_FLAG_EOM) {
419 if (!len && m->se_head != m->se_tail) {
420 /* 0-len EOM... retrospectively mark last stash */
421 lwsl_notice("%s: retro EOM\n", __func__);
422 m->stash_eom[(m->se_head - 1) % sizeof(m->stash_eom)] = 1;
423 }
424
425 lwsl_notice("%s: Draining EOM\n", __func__);
426 m->inside_mp3 = 0;
427 }
428 /*
429 * Don't provide any additional tx credit... we're just
430 * mopping up the overspill from the previous mp3 credit
431 */
432 break;
433 }
434
435 return 0;
436
437 bail:
438 return -1;
439 }
440
441 /*
442 * Because this is multipart mime in h2 currently, use a "rideshare" to handle
443 * first the native metadata on this secure stream, then the "rideshare" audio
444 * stream mentioned in the policy.
445 *
446 * Lws takes care of interleaving the multipart mime pieces since the policy
447 * calls for it.
448 */
449
450 static lws_ss_state_return_t
ss_avs_metadata_tx(void * userobj,lws_ss_tx_ordinal_t ord,uint8_t * buf,size_t * len,int * flags)451 ss_avs_metadata_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
452 size_t *len, int *flags)
453 {
454 ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
455 size_t tot;
456 int n;
457
458 // lwsl_notice("%s %d\n", __func__, (int)m->pos);
459
460 if ((long)m->pos < 0) {
461 *len = 0;
462 lwsl_info("%s: skip\n", __func__);
463 return 1;
464 }
465
466 if (!strcmp(lws_ss_rideshare(m->ss), "avs_audio")) {
467
468 /* audio rideshare part */
469
470 if (!m->pos)
471 *flags |= LWSSS_FLAG_SOM;
472
473 n = spool_capture(buf, *len);
474 if (n > 0)
475 *len = n;
476 else
477 *len = 0;
478 if (!n) {
479 lwsl_info("%s: trying to skip tx\n", __func__);
480 return 1;
481 }
482
483 m->pos += *len;
484
485 if (n < 0) {
486 *flags |= LWSSS_FLAG_EOM;
487 m->pos = (long)-1l; /* ban subsequent until new stream */
488 }
489
490 lwsl_notice("%s: tx audio %d\n", __func__, (int)*len);
491
492 #if 0
493 {
494 int ff = open("/tmp/z1", O_RDWR | O_CREAT | O_APPEND, 0666);
495 if (ff == -1)
496 lwsl_err("%s: errno %d\n", __func__, errno);
497 write(ff, buf, *len);
498 close(ff);
499 }
500 #endif
501
502 return 0;
503 }
504
505 /* metadata part */
506
507 tot = strlen(metadata);
508
509 if (!m->pos)
510 *flags |= LWSSS_FLAG_SOM;
511
512 if (*len > tot - m->pos)
513 *len = tot - m->pos;
514
515 memcpy(buf, metadata + m->pos, *len);
516
517 m->pos += *len;
518
519 if (m->pos == tot) {
520 lwsl_notice("metadata done\n");
521 *flags |= LWSSS_FLAG_EOM;
522 m->pos = 0; /* for next time */
523 }
524
525 return 0;
526 }
527
528 static lws_ss_state_return_t
ss_avs_metadata_state(void * userobj,void * sh,lws_ss_constate_t state,lws_ss_tx_ordinal_t ack)529 ss_avs_metadata_state(void *userobj, void *sh,
530 lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)
531 {
532 ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
533 struct lws_context *context = (struct lws_context *)m->opaque_data;
534
535 lwsl_notice("%s: %p: %s, ord 0x%x\n", __func__, m->ss,
536 lws_ss_state_name(state), (unsigned int)ack);
537
538 switch (state) {
539 case LWSSSCS_CREATING:
540 return lws_ss_client_connect(m->ss);
541
542 case LWSSSCS_CONNECTING:
543 m->pos = 0;
544 break;
545 case LWSSSCS_CONNECTED:
546 lwsl_info("%s: CONNECTED\n", __func__);
547 return lws_ss_request_tx(m->ss);
548
549 case LWSSSCS_DISCONNECTED:
550 lws_sul_cancel(&m->sul);
551 //if (m->mh) {
552 play_mp3(NULL, NULL, NULL);
553 m->mh = NULL;
554 //}
555 /*
556 * For this stream encapsulating an alexa exchange, dropping
557 * is the end of its life
558 */
559 return 1;
560
561 case LWSSSCS_DESTROYING:
562 lws_buflist_destroy_all_segments(&m->dribble);
563 break;
564 default:
565 break;
566 }
567
568 return 0;
569 }
570
571 /*
572 * avs event
573 */
574
575 static lws_ss_state_return_t
ss_avs_event_rx(void * userobj,const uint8_t * buf,size_t len,int flags)576 ss_avs_event_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
577 {
578 return 0;
579 }
580
581 static lws_ss_state_return_t
ss_avs_event_tx(void * userobj,lws_ss_tx_ordinal_t ord,uint8_t * buf,size_t * len,int * flags)582 ss_avs_event_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
583 size_t *len, int *flags)
584 {
585 return 1; /* don't transmit anything */
586 }
587
588 static lws_ss_state_return_t
ss_avs_event_state(void * userobj,void * sh,lws_ss_constate_t state,lws_ss_tx_ordinal_t ack)589 ss_avs_event_state(void *userobj, void *sh,
590 lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)
591 {
592 lwsl_info("%s: %s, ord 0x%x\n", __func__, lws_ss_state_name(state),
593 (unsigned int)ack);
594
595 switch (state) {
596 case LWSSSCS_CREATING:
597 mpg123_init();
598 break;
599 case LWSSSCS_CONNECTING:
600 break;
601 case LWSSSCS_CONNECTED:
602 lwsl_user("Connected to Alexa... speak \"Alexa, ...\"\n");
603 break;
604 case LWSSSCS_DISCONNECTED:
605 lwsl_user("Disconnected from Alexa\n");
606 break;
607 case LWSSSCS_DESTROYING:
608 mpg123_exit();
609 break;
610 default:
611 break;
612 }
613
614 return 0;
615 }
616
617 int
avs_query_start(struct lws_context * context)618 avs_query_start(struct lws_context *context)
619 {
620 lws_ss_info_t ssi;
621
622 lwsl_notice("%s:\n", __func__);
623
624 memset(&ssi, 0, sizeof(ssi));
625 ssi.handle_offset = offsetof(ss_avs_metadata_t, ss);
626 ssi.opaque_user_data_offset = offsetof(ss_avs_metadata_t, opaque_data);
627 ssi.rx = ss_avs_metadata_rx;
628 ssi.tx = ss_avs_metadata_tx;
629 ssi.state = ss_avs_metadata_state;
630 ssi.user_alloc = sizeof(ss_avs_metadata_t);
631 ssi.streamtype = "avs_metadata";
632
633 ssi.manual_initial_tx_credit = 8192;
634
635 if (lws_ss_create(context, 0, &ssi, context, &hss_avs_sync, NULL, NULL)) {
636 lwsl_err("%s: failed to create avs metadata secstream\n",
637 __func__);
638
639 return 1;
640 }
641
642 lwsl_user("%s: created query stream %p\n", __func__, hss_avs_sync);
643
644 return 0;
645 }
646
647 int
avs_example_start(struct lws_context * context)648 avs_example_start(struct lws_context *context)
649 {
650 lws_ss_info_t ssi;
651
652 if (hss_avs_event)
653 return 0;
654
655 lwsl_info("%s: Starting AVS stream\n", __func__);
656
657 /* AVS wants us to establish the long poll event stream first */
658
659 memset(&ssi, 0, sizeof(ssi));
660 ssi.handle_offset = offsetof(ss_avs_event_t, ss);
661 ssi.opaque_user_data_offset = offsetof(ss_avs_event_t, opaque_data);
662 ssi.rx = ss_avs_event_rx;
663 ssi.tx = ss_avs_event_tx;
664 ssi.state = ss_avs_event_state;
665 ssi.user_alloc = sizeof(ss_avs_event_t);
666 ssi.streamtype = "avs_event";
667
668 if (lws_ss_create(context, 0, &ssi, context, &hss_avs_event, NULL, NULL)) {
669 lwsl_err("%s: failed to create avs event secure stream\n",
670 __func__);
671 return 1;
672 }
673
674 return 0;
675 }
676