• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * lws-minimal-secure-streams-alexa
3  *
4  * This file is made available under the Creative Commons CC0 1.0
5  * Universal Public Domain Dedication.
6  */
7 
8 #include <libwebsockets.h>
9 #include <string.h>
10 #include <sys/types.h>
11 #include <sys/stat.h>
12 #include <unistd.h>
13 #include <fcntl.h>
14 
15 #include <mpg123.h>
16 
17 #include "private.h"
18 
/* Long poll "avs_event" stream handle, filled in by avs_example_start() */
struct lws_ss_handle *hss_avs_event;
/* Per-utterance "avs_metadata" stream handle, filled in by avs_query_start() */
struct lws_ss_handle *hss_avs_sync;
20 
21 /* this is the type for the long poll event channel */
22 
23 typedef struct ss_avs_event {
24 	struct lws_ss_handle 	*ss;
25 	void			*opaque_data;
26 	/* ... application specific state ... */
27 
28 	struct lejp_ctx		jctx;
29 } ss_avs_event_t;
30 
/* Values of ss_avs_metadata_t.mp3_state: where the mp3 decoder is at */
enum {
	LAMP3STATE_IDLE		= 0,	/* no decoder, waiting for an mp3 part */
	LAMP3STATE_SPOOLING	= 1,	/* feeding incoming mp3 to the decoder */
	LAMP3STATE_DRAINING	= 2,	/* last mp3 playing out; stash new data */
};
36 
37 /* this is the type for the utterance metadata (and audio rideshares) */
38 
39 typedef struct ss_avs_metadata {
40 	struct lws_ss_handle 	*ss;
41 	void			*opaque_data;
42 	/* ... application specific state ... */
43 
44 	struct lws_buflist	*dribble; /* next mp3 data while draining last */
45 
46 	struct lejp_ctx		jctx;
47 	size_t			pos;
48 	size_t			mp3_in;
49 	mpg123_handle		*mh;
50 
51 	lws_sorted_usec_list_t	sul;
52 
53 	uint8_t			stash_eom[16];
54 
55 	uint8_t			se_head;
56 	uint8_t			se_tail;
57 
58 	char			mp3_state;
59 	char			first_mp3;
60 	uint8_t			mp3_mime_match;
61 	uint8_t			seen;
62 	uint8_t			inside_mp3;
63 
64 } ss_avs_metadata_t;
65 
/*
 * The remote server only seems to give us a budget of 10s to consume the
 * results; after that it doesn't drop the stream, but doesn't send us
 * anything further on it.
 *
 * This makes it impossible to optimize buffering for incoming mp3, since we
 * have to go ahead and take it before the 10s is up.  use_buffer_250ms()
 * grants another quarter of this whenever the estimated peer tx credit falls
 * below three quarters of it.
 */

#define MAX_MP3_IN_BUFFERING_BYTES (32 * 1024)
76 
/*
 * JSON event sent by ss_avs_metadata_tx() as the non-audio part of each
 * utterance: a SpeechRecognizer.Recognize event declaring the audio format
 * as AUDIO_L16_RATE_16000_CHANNELS_1.
 */

static const char *metadata =
	"{"
	  "\"event\": {"
	    "\"header\": {"
	      "\"namespace\": \"SpeechRecognizer\","
	      "\"name\": \"Recognize\","
	      "\"messageId\": \"message-123\","
	      "\"dialogRequestId\": \"dialog-request-321\""
	    "},"
	    "\"payload\": {"
	      "\"profile\":\"CLOSE_TALK\","
	      "\"format\":\"AUDIO_L16_RATE_16000_CHANNELS_1\""
	    "}"
	  "}"
	"}";
95 
96 /*
97  * avs metadata
98  */
99 
100 static void
use_buffer_250ms(lws_sorted_usec_list_t * sul)101 use_buffer_250ms(lws_sorted_usec_list_t *sul)
102 {
103 	ss_avs_metadata_t *m = lws_container_of(sul, ss_avs_metadata_t, sul);
104 	struct lws_context *context = (struct lws_context *)m->opaque_data;
105 	int est = lws_ss_get_est_peer_tx_credit(m->ss);
106 
107 	lwsl_notice("%s: est txcr %d\n", __func__, est);
108 
109 	if (est < MAX_MP3_IN_BUFFERING_BYTES - (MAX_MP3_IN_BUFFERING_BYTES / 4)) {
110 		lwsl_notice("   adding %d\n", MAX_MP3_IN_BUFFERING_BYTES / 4);
111 		lws_ss_add_peer_tx_credit(m->ss, MAX_MP3_IN_BUFFERING_BYTES / 4);
112 	}
113 
114 	lws_sul_schedule(context, 0, &m->sul, use_buffer_250ms,
115 			 250 * LWS_US_PER_MS);
116 }
117 
/*
 * Byte patterns ss_avs_metadata_rx() scans the multipart reply for: the mp3
 * part's MIME type, then the CRLFCRLF that ends that part's headers.
 */
static const char *mp3_mimetype = "application/octet-stream";
static const char *match2 = "\r\n\r\n";
120 
121 static int
ss_avs_mp3_open(ss_avs_metadata_t * m)122 ss_avs_mp3_open(ss_avs_metadata_t *m)
123 {
124 	int r;
125 
126 	lwsl_notice("%s\n", __func__);
127 
128 	m->first_mp3 = 1;
129 	m->mh = mpg123_new(NULL, NULL);
130 	if (!m->mh) {
131 		lwsl_err("%s: unable to make new mp3\n",
132 				__func__);
133 		goto bail;
134 	}
135 	mpg123_format_none(m->mh);
136 	r = mpg123_format(m->mh, 16000, MPG123_M_MONO,
137 			  MPG123_ENC_SIGNED_16);
138 	if (r) {
139 		lwsl_err("%s: mpg123 format failed %d\n",
140 				__func__, r);
141 		goto bail1;
142 	}
143 	r = mpg123_open_feed(m->mh);
144 	if (r) {
145 		lwsl_err("%s: mpg123 open feed failed %d\n",
146 				__func__, r);
147 		goto bail1;
148 	}
149 
150 	return 0;
151 
152 bail1:
153 	mpg123_delete(m->mh);
154 	m->mh = NULL;
155 
156 bail:
157 	return 1;
158 }
159 
160 static lws_ss_state_return_t
161 ss_avs_metadata_rx(void *userobj, const uint8_t *buf, size_t len, int flags);
162 
163 /*
164  * This is called when the mp3 has drained it's input buffer and destroyed
165  * itself.
166  */
167 
168 static int
drain_end_cb(void * v)169 drain_end_cb(void *v)
170 {
171 	ss_avs_metadata_t *m = (ss_avs_metadata_t *)v;
172 	struct lws_context *context = (struct lws_context *)m->opaque_data;
173 	int tot = 0;
174 
175 	lwsl_err("%s\n", __func__);
176 
177 	/*
178 	 * We have drained and destroyed the existing mp3 session.  Is there
179 	 * a new one pending?
180 	 */
181 
182 	m->first_mp3 = 1;
183 	m->mp3_state = LAMP3STATE_IDLE;
184 
185 	if (lws_buflist_total_len(&m->dribble)) {
186 		/* we started another one */
187 
188 		/* resume tx credit top up */
189 		lws_sul_schedule(context, 0, &m->sul, use_buffer_250ms, 1);
190 
191 		if (ss_avs_mp3_open(m))
192 			return 1;
193 
194 		m->mp3_state = LAMP3STATE_SPOOLING;
195 
196 		/*
197 		 * Dump what we stashed from draining into the new mp3
198 		 */
199 
200 		while (lws_buflist_total_len(&m->dribble)) {
201 			size_t s;
202 			uint8_t *u, t;
203 
204 			s = lws_buflist_next_segment_len(&m->dribble, &u);
205 			t = m->stash_eom[m->se_tail];
206 			lwsl_notice("%s: preload %d: %d\n", __func__, (int)s, t);
207 
208 			mpg123_feed(m->mh, u, s);
209 			lws_buflist_use_segment(&m->dribble, s);
210 			if (m->first_mp3) {
211 				play_mp3(m->mh, NULL, NULL);
212 				m->first_mp3 = 0;
213 			}
214 
215 			tot += s;
216 
217 			m->se_tail = (m->se_tail + 1) % sizeof(m->stash_eom);
218 			if (t) {
219 				lwsl_notice("%s: preloaded EOM\n", __func__);
220 
221 				/*
222 				 * We stashed the whole of the message, we need
223 				 * to also do the EOM processing.  We will come
224 				 * back here if there's another message in the
225 				 * stash.
226 				 */
227 
228 				m->mp3_state = LAMP3STATE_DRAINING;
229 				if (m->mh)
230 					play_mp3(NULL, drain_end_cb, m);
231 
232 				lws_ss_add_peer_tx_credit(m->ss, tot);
233 #if 0
234 				/*
235 				 * Put a hold on bringing in any more data
236 				 */
237 				lws_sul_cancel(&m->sul);
238 #endif
239 				/* destroy our copy of the handle */
240 				m->mh = NULL;
241 
242 				break;
243 			}
244 		}
245 
246 		lws_ss_add_peer_tx_credit(m->ss, tot);
247 	}
248 
249 	return 0;
250 }
251 
252 static lws_ss_state_return_t
ss_avs_metadata_rx(void * userobj,const uint8_t * buf,size_t len,int flags)253 ss_avs_metadata_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
254 {
255 	ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
256 	struct lws_context *context = (struct lws_context *)m->opaque_data;
257 	int n = 0, hit = 0;
258 
259 	lwsl_notice("%s: len %d, flags %d (est peer txcr %d)\n", __func__,
260 		    (int)len, flags, lws_ss_get_est_peer_tx_credit(m->ss));
261 
262 	// lwsl_hexdump_warn(buf, len);
263 
264 	if ((flags & LWSSS_FLAG_SOM) && !m->mh && !m->seen) {
265 		m->mp3_mime_match = 0;
266 		m->seen = 0;
267 		m->inside_mp3 = 0;
268 	}
269 
270 	if (!m->inside_mp3) {
271 		/*
272 		 * Identify the part with the mp3 in, if any
273 		 */
274 
275 		while (n < (int)len - 24) {
276 			if (!m->seen) {
277 				if (buf[n] == mp3_mimetype[m->mp3_mime_match]) {
278 					m->mp3_mime_match++;
279 					if (m->mp3_mime_match == 24) {
280 						m->mp3_mime_match = 0;
281 						m->seen = 1;
282 						n++;
283 						continue;
284 					}
285 				} else
286 					m->mp3_mime_match = 0;
287 			} else {
288 				if (buf[n] == match2[m->mp3_mime_match]) {
289 					m->mp3_mime_match++;
290 					if (m->mp3_mime_match == 4) {
291 						m->seen = 0;
292 						m->mp3_mime_match = 0;
293 						hit = 1;
294 						n++;
295 						buf += n;
296 						len -= n;
297 						lwsl_notice("identified reply...\n");
298 						m->inside_mp3 = 1;
299 						break;
300 					}
301 				} else
302 					m->mp3_mime_match = 0;
303 			}
304 
305 			n++;
306 		}
307 
308 		if (!hit) {
309 			lws_ss_add_peer_tx_credit(m->ss, len);
310 			return 0;
311 		}
312 	}
313 
314 	// lwsl_notice("%s: state %d\n", __func__, m->mp3_state);
315 
316 	switch (m->mp3_state) {
317 	case LAMP3STATE_IDLE:
318 
319 		if (hit) {
320 
321 			lws_ss_add_peer_tx_credit(m->ss, n);
322 
323 			if (ss_avs_mp3_open(m))
324 				goto bail;
325 
326 			lws_sul_schedule(context, 0, &m->sul, use_buffer_250ms, 1);
327 			m->mp3_state = LAMP3STATE_SPOOLING;
328 			break;
329 		}
330 
331 		lws_ss_add_peer_tx_credit(m->ss, len);
332 
333 		if (!m->inside_mp3)
334 			break;
335 
336 		/* fallthru */
337 
338 	case LAMP3STATE_SPOOLING:
339 
340 		if (m->dribble)
341 			goto draining;
342 
343 		if (len) {
344 			/*
345 			 * We are shoving encoded mp3 into mpg123-allocated heap
346 			 * buffers... unfortunately mpg123 doesn't seem to
347 			 * expose where it is in its allocated input so we can
348 			 * track how much is stashed.  Instead while in playback
349 			 * mode, we assume 64kbps mp3 encoding, ie, 8KB/s, and
350 			 * run a sul that allows an additional 2KB tx credit
351 			 * every 250ms, with 4KB initial credit.
352 			 */
353 			lwsl_notice("%s: SPOOL %d\n", __func__, (int)len);
354 			mpg123_feed(m->mh, buf, len);
355 
356 			if (m->first_mp3) {
357 				lws_sul_schedule(context, 0, &m->sul,
358 						 use_buffer_250ms, 1);
359 		//		lws_ss_add_peer_tx_credit(m->ss,
360 		//			len + (MAX_MP3_IN_BUFFERING_BYTES / 2));
361 				play_mp3(m->mh, NULL, NULL);
362 			} //else
363 		//		lws_ss_add_peer_tx_credit(m->ss, len);
364 			m->first_mp3 = 0;
365 		}
366 
367 		if (flags & LWSSS_FLAG_EOM) {
368 			/*
369 			 * This means one "message" / mime part with mp3 data
370 			 * has finished coming in.  But there may be whole other
371 			 * parts with other mp3s following, with potentially
372 			 * different mp3 parameters.  So we want to tell this
373 			 * one to drain and finish and destroy the current mp3
374 			 * object before we go on.
375 			 *
376 			 * But not knowing the length of the current one, there
377 			 * will already be outstanding tx credit at the server,
378 			 * so it's going to spam us with the next part before we
379 			 * have the new mp3 sink for it.
380 			 */
381 			lwsl_notice("%s: EOM\n", __func__);
382 			m->mp3_mime_match = 0;
383 			m->seen = 0;
384 			m->mp3_state = LAMP3STATE_DRAINING;
385 			/* from input POV, we're no longer inside an mp3 */
386 			m->inside_mp3 = 0;
387 			if (m->mh)
388 				play_mp3(NULL, drain_end_cb, m);
389 #if 0
390 			/*
391 			 * Put a hold on bringing in any more data
392 			 */
393 			lws_sul_cancel(&m->sul);
394 #endif
395 			/* destroy our copy of the handle */
396 			m->mh = NULL;
397 		}
398 		break;
399 
400 	case LAMP3STATE_DRAINING:
401 
402 draining:
403 		if (buf && len && m->inside_mp3) {
404 			lwsl_notice("%s: DRAINING: stashing %d: %d %d %d\n",
405 				    __func__, (int)len, !!(flags & LWSSS_FLAG_EOM),
406 				    m->se_head, m->se_tail);
407 			lwsl_hexdump_notice(buf, len);
408 			if (lws_buflist_append_segment(&m->dribble, buf, len) < 0)
409 				goto bail;
410 
411 			m->stash_eom[m->se_head] = !!(flags & LWSSS_FLAG_EOM);
412 			m->se_head = (m->se_head + 1) % sizeof(m->stash_eom);
413 			lwsl_notice("%s: next head %d\n", __func__, m->se_head);
414 
415 			lws_ss_add_peer_tx_credit(m->ss, len);
416 		}
417 
418 		if (flags & LWSSS_FLAG_EOM) {
419 			if (!len && m->se_head != m->se_tail) {
420 				/* 0-len EOM... retrospectively mark last stash */
421 				lwsl_notice("%s: retro EOM\n", __func__);
422 				m->stash_eom[(m->se_head - 1) % sizeof(m->stash_eom)] = 1;
423 			}
424 
425 			lwsl_notice("%s: Draining EOM\n", __func__);
426 			m->inside_mp3 = 0;
427 		}
428 		/*
429 		 * Don't provide any additional tx credit... we're just
430 		 * mopping up the overspill from the previous mp3 credit
431 		 */
432 		break;
433 	}
434 
435 	return 0;
436 
437 bail:
438 	return -1;
439 }
440 
441 /*
442  * Because this is multipart mime in h2 currently, use a "rideshare" to handle
443  * first the native metadata on this secure stream, then the "rideshare" audio
444  * stream mentioned in the policy.
445  *
446  * Lws takes care of interleaving the multipart mime pieces since the policy
447  * calls for it.
448  */
449 
450 static lws_ss_state_return_t
ss_avs_metadata_tx(void * userobj,lws_ss_tx_ordinal_t ord,uint8_t * buf,size_t * len,int * flags)451 ss_avs_metadata_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
452 		   size_t *len, int *flags)
453 {
454 	ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
455 	size_t tot;
456 	int n;
457 
458 	// lwsl_notice("%s %d\n", __func__, (int)m->pos);
459 
460 	if ((long)m->pos < 0) {
461 		*len = 0;
462 		lwsl_info("%s: skip\n", __func__);
463 		return 1;
464 	}
465 
466 	if (!strcmp(lws_ss_rideshare(m->ss), "avs_audio")) {
467 
468 		/* audio rideshare part */
469 
470 		if (!m->pos)
471 			*flags |= LWSSS_FLAG_SOM;
472 
473 		n = spool_capture(buf, *len);
474 		if (n > 0)
475 			*len = n;
476 		else
477 			*len = 0;
478 		if (!n) {
479 			lwsl_info("%s: trying to skip tx\n", __func__);
480 			return 1;
481 		}
482 
483 		m->pos += *len;
484 
485 		if (n < 0) {
486 			*flags |= LWSSS_FLAG_EOM;
487 			m->pos = (long)-1l; /* ban subsequent until new stream */
488 		}
489 
490 		lwsl_notice("%s: tx audio %d\n", __func__, (int)*len);
491 
492 #if 0
493 		{
494 			int ff = open("/tmp/z1", O_RDWR | O_CREAT | O_APPEND, 0666);
495 			if (ff == -1)
496 				lwsl_err("%s: errno %d\n", __func__, errno);
497 			write(ff, buf, *len);
498 			close(ff);
499 		}
500 #endif
501 
502 		return 0;
503 	}
504 
505 	/* metadata part */
506 
507 	tot = strlen(metadata);
508 
509 	if (!m->pos)
510 		*flags |= LWSSS_FLAG_SOM;
511 
512 	if (*len > tot - m->pos)
513 		*len = tot - m->pos;
514 
515 	memcpy(buf, metadata + m->pos, *len);
516 
517 	m->pos += *len;
518 
519 	if (m->pos == tot) {
520 		lwsl_notice("metadata done\n");
521 		*flags |= LWSSS_FLAG_EOM;
522 		m->pos = 0; /* for next time */
523 	}
524 
525 	return 0;
526 }
527 
528 static lws_ss_state_return_t
ss_avs_metadata_state(void * userobj,void * sh,lws_ss_constate_t state,lws_ss_tx_ordinal_t ack)529 ss_avs_metadata_state(void *userobj, void *sh,
530 		      lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)
531 {
532 	ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
533 	struct lws_context *context = (struct lws_context *)m->opaque_data;
534 
535 	lwsl_notice("%s: %p: %s, ord 0x%x\n", __func__, m->ss,
536 		    lws_ss_state_name(state), (unsigned int)ack);
537 
538 	switch (state) {
539 	case LWSSSCS_CREATING:
540 		return lws_ss_client_connect(m->ss);
541 
542 	case LWSSSCS_CONNECTING:
543 		m->pos = 0;
544 		break;
545 	case LWSSSCS_CONNECTED:
546 		lwsl_info("%s: CONNECTED\n", __func__);
547 		return lws_ss_request_tx(m->ss);
548 
549 	case LWSSSCS_DISCONNECTED:
550 		lws_sul_cancel(&m->sul);
551 		//if (m->mh) {
552 			play_mp3(NULL, NULL, NULL);
553 			m->mh = NULL;
554 		//}
555 		/*
556 		 * For this stream encapsulating an alexa exchange, dropping
557 		 * is the end of its life
558 		 */
559 		return 1;
560 
561 	case LWSSSCS_DESTROYING:
562 		lws_buflist_destroy_all_segments(&m->dribble);
563 		break;
564 	default:
565 		break;
566 	}
567 
568 	return 0;
569 }
570 
571 /*
572  * avs event
573  */
574 
575 static lws_ss_state_return_t
ss_avs_event_rx(void * userobj,const uint8_t * buf,size_t len,int flags)576 ss_avs_event_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
577 {
578 	return 0;
579 }
580 
581 static lws_ss_state_return_t
ss_avs_event_tx(void * userobj,lws_ss_tx_ordinal_t ord,uint8_t * buf,size_t * len,int * flags)582 ss_avs_event_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
583 		      size_t *len, int *flags)
584 {
585 	return 1; /* don't transmit anything */
586 }
587 
588 static lws_ss_state_return_t
ss_avs_event_state(void * userobj,void * sh,lws_ss_constate_t state,lws_ss_tx_ordinal_t ack)589 ss_avs_event_state(void *userobj, void *sh,
590 		   lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)
591 {
592 	lwsl_info("%s: %s, ord 0x%x\n", __func__, lws_ss_state_name(state),
593 		  (unsigned int)ack);
594 
595 	switch (state) {
596 	case LWSSSCS_CREATING:
597 		mpg123_init();
598 		break;
599 	case LWSSSCS_CONNECTING:
600 		break;
601 	case LWSSSCS_CONNECTED:
602 		lwsl_user("Connected to Alexa... speak \"Alexa, ...\"\n");
603 		break;
604 	case LWSSSCS_DISCONNECTED:
605 		lwsl_user("Disconnected from Alexa\n");
606 		break;
607 	case LWSSSCS_DESTROYING:
608 		mpg123_exit();
609 		break;
610 	default:
611 		break;
612 	}
613 
614 	return 0;
615 }
616 
617 int
avs_query_start(struct lws_context * context)618 avs_query_start(struct lws_context *context)
619 {
620 	lws_ss_info_t ssi;
621 
622 	lwsl_notice("%s:\n", __func__);
623 
624 	memset(&ssi, 0, sizeof(ssi));
625 	ssi.handle_offset	    = offsetof(ss_avs_metadata_t, ss);
626 	ssi.opaque_user_data_offset = offsetof(ss_avs_metadata_t, opaque_data);
627 	ssi.rx			    = ss_avs_metadata_rx;
628 	ssi.tx			    = ss_avs_metadata_tx;
629 	ssi.state		    = ss_avs_metadata_state;
630 	ssi.user_alloc		    = sizeof(ss_avs_metadata_t);
631 	ssi.streamtype		    = "avs_metadata";
632 
633 	ssi.manual_initial_tx_credit = 8192;
634 
635 	if (lws_ss_create(context, 0, &ssi, context, &hss_avs_sync, NULL, NULL)) {
636 		lwsl_err("%s: failed to create avs metadata secstream\n",
637 			 __func__);
638 
639 		return 1;
640 	}
641 
642 	lwsl_user("%s: created query stream %p\n", __func__, hss_avs_sync);
643 
644 	return 0;
645 }
646 
647 int
avs_example_start(struct lws_context * context)648 avs_example_start(struct lws_context *context)
649 {
650 	lws_ss_info_t ssi;
651 
652 	if (hss_avs_event)
653 		return 0;
654 
655 	lwsl_info("%s: Starting AVS stream\n", __func__);
656 
657 	/* AVS wants us to establish the long poll event stream first */
658 
659 	memset(&ssi, 0, sizeof(ssi));
660 	ssi.handle_offset	    = offsetof(ss_avs_event_t, ss);
661 	ssi.opaque_user_data_offset = offsetof(ss_avs_event_t, opaque_data);
662 	ssi.rx			    = ss_avs_event_rx;
663 	ssi.tx			    = ss_avs_event_tx;
664 	ssi.state		    = ss_avs_event_state;
665 	ssi.user_alloc		    = sizeof(ss_avs_event_t);
666 	ssi.streamtype		    = "avs_event";
667 
668 	if (lws_ss_create(context, 0, &ssi, context, &hss_avs_event, NULL, NULL)) {
669 		lwsl_err("%s: failed to create avs event secure stream\n",
670 			 __func__);
671 		return 1;
672 	}
673 
674 	return 0;
675 }
676