• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * lws-minimal-secure-streams-avs
3  *
4  * Written in 2019-2020 by Andy Green <andy@warmcat.com>
5  *
6  * This file is made available under the Creative Commons CC0 1.0
7  * Universal Public Domain Dedication.
8  *
9  * This sends a canned WAV and received (and discards) the mp3 response.
10  * However it rate-limits the response reception to manage a small ringbuffer
11  * using ss / h2 flow control apis, reflecting consumption at 64kbps and only
12  * and 8KB buffer, indtended to model optimizing rx buffering on mp3 playback
13  * on a constrained device.
14  */
15 
16 #include <libwebsockets.h>
17 #include <string.h>
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #include <unistd.h>
21 #include <assert.h>
22 #include <fcntl.h>
23 
24 extern int interrupted, bad;
25 static struct lws_ss_handle *hss_avs_event, *hss_avs_sync;
26 static uint8_t *wav;
27 static size_t wav_len;
28 
29 typedef struct ss_avs_event {
30 	struct lws_ss_handle 	*ss;
31 	void			*opaque_data;
32 	/* ... application specific state ... */
33 	struct lejp_ctx		jctx;
34 } ss_avs_event_t;
35 
36 typedef struct ss_avs_metadata {
37 	struct lws_ss_handle 	*ss;
38 	void			*opaque_data;
39 	/* ... application specific state ... */
40 	struct lejp_ctx		jctx;
41 	size_t			pos;
42 
43 	/*
44 	 * We simulate a ringbuffer that is used up by a sul at 64Kbit/sec
45 	 * rate, and managed at the same rate using tx credit
46 	 */
47 
48 	lws_sorted_usec_list_t	sul;
49 	uint8_t			buf[256 * 1024]; /* to test rate-limiting, set to 8 * 1024 */
50 	int			head;
51 	int			tail;
52 
53 	char			filled;
54 
55 } ss_avs_metadata_t;
56 
57 static const char *metadata = "{"
58 	"\"event\": {"
59 		"\"header\": {"
60 			"\"namespace\": \"SpeechRecognizer\","
61 			"\"name\": \"Recognize\","
62 			"\"messageId\": \"message-123\","
63 			"\"dialogRequestId\": \"dialog-request-321\""
64 		"},"
65 		"\"payload\": {"
66 			"\"profile\":"	"\"CLOSE_TALK\","
67 			"\"format\":"	"\"AUDIO_L16_RATE_16000_CHANNELS_1\""
68 		"}"
69 	"}"
70 "}";
71 
72 /*
73  * avs metadata
74  */
75 
76 static void
use_buffer_50ms(lws_sorted_usec_list_t * sul)77 use_buffer_50ms(lws_sorted_usec_list_t *sul)
78 {
79 	ss_avs_metadata_t *m = lws_container_of(sul, ss_avs_metadata_t, sul);
80 	struct lws_context *context = (struct lws_context *)m->opaque_data;
81 	size_t n;
82 	int e;
83 
84 	/*
85 	 * Use up 50ms-worth (8KB / 20) == 401 bytes of buffered data
86 	 */
87 
88 	/* remaining data in buffer */
89 	n = ((m->head - m->tail) % sizeof(m->buf));
90 	lwsl_info("%s: avail %d\n", __func__, (int)n);
91 
92 	if (n < 401)
93 		lwsl_err("%s: underrun\n", __func__);
94 
95 	m->tail = (m->tail + 401) % sizeof(m->buf);
96 	n = ((m->head - m->tail) % sizeof(m->buf));
97 
98 	e = lws_ss_get_est_peer_tx_credit(m->ss);
99 
100 	lwsl_info("%s: avail after: %d, curr est %d\n", __func__, (int)n, e);
101 
102 	if (n < (sizeof(m->buf) * 2) / 3 && e < (int)(sizeof(m->buf) - 1 - n)) {
103 		lwsl_info("%s: requesting additional %d\n", __func__,
104 				(int)(sizeof(m->buf) - 1 - e - n));
105 		lws_ss_add_peer_tx_credit(m->ss, (sizeof(m->buf) - 1 - e - n));
106 	}
107 
108 	lws_sul_schedule(context, 0, &m->sul, use_buffer_50ms,
109 			 50 * LWS_US_PER_MS);
110 }
111 
112 static int
ss_avs_metadata_rx(void * userobj,const uint8_t * buf,size_t len,int flags)113 ss_avs_metadata_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
114 {
115 	ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
116 	struct lws_context *context = (struct lws_context *)m->opaque_data;
117 	size_t n, n1;
118 
119 	lwsl_notice("%s: rideshare %s, len %d, flags 0x%x\n", __func__,
120 			lws_ss_rideshare(m->ss), (int)len, flags);
121 	// lwsl_hexdump_warn(buf, len);
122 
123 	n = sizeof(m->buf) - ((m->head - m->tail) % sizeof(m->buf));
124 	lwsl_info("%s: len %d, buf h %d, t %d, space %d\n", __func__,
125 		    (int)len, (int)m->head, (int)m->tail, (int)n);
126 	lws_ss_get_est_peer_tx_credit(m->ss);
127 	if (len > n) {
128 		lwsl_err("%s: bad len: len %d, n %d\n", __func__, (int)len, (int)n);
129 		assert(0);
130 
131 		return 1;
132 	}
133 
134 	if (m->head < m->tail)				/* |****h-------t**| */
135 		memcpy(&m->buf[m->head], buf, len);
136 	else {						/* |---t*****h-----| */
137 		n1 = sizeof(m->buf) - m->head;
138 		if (len < n1)
139 			n1 = len;
140 		memcpy(&m->buf[m->head], buf, n1);
141 		if (n1 != len)
142 			memcpy(m->buf, buf, len - n1);
143 	}
144 
145 	m->head = (m->head + len) % sizeof(m->buf);
146 
147 	lws_sul_schedule(context, 0, &m->sul, use_buffer_50ms,
148 			 50 * LWS_US_PER_MS);
149 
150 	return 0;
151 }
152 
153 static int
ss_avs_metadata_tx(void * userobj,lws_ss_tx_ordinal_t ord,uint8_t * buf,size_t * len,int * flags)154 ss_avs_metadata_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
155 		   size_t *len, int *flags)
156 {
157 	ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
158 	//struct lws_context *context = (struct lws_context *)m->opaque_data;
159 	size_t tot;
160 
161 	if ((long)m->pos < 0) {
162 		*len = 0;
163 		lwsl_debug("%s: skip tx\n", __func__);
164 		return 1;
165 	}
166 
167 //	lwsl_notice("%s: rideshare '%s'\n", __func__, lws_ss_rideshare(m->ss));
168 
169 	if (!strcmp(lws_ss_rideshare(m->ss), "avs_audio")) {
170 		/* audio rideshare */
171 
172 		if (!m->pos)
173 			*flags |= LWSSS_FLAG_SOM;
174 
175 		if (*len > wav_len - m->pos)
176 			*len = wav_len - m->pos;
177 
178 		memcpy(buf, wav + m->pos, *len);
179 		m->pos += *len;
180 
181 		if (m->pos == wav_len) {
182 			*flags |= LWSSS_FLAG_EOM;
183 			lwsl_info("%s: tx done\n", __func__);
184 			m->pos = (long)-1l; /* ban subsequent until new stream */
185 		} else
186 			lws_ss_request_tx(m->ss);
187 
188 		lwsl_hexdump_info(buf, *len);
189 
190 		return 0;
191 	}
192 
193 	/* metadata part */
194 
195 	tot = strlen(metadata);
196 
197 	if (!m->pos)
198 		*flags |= LWSSS_FLAG_SOM;
199 
200 	if (*len > tot - m->pos)
201 		*len = tot - m->pos;
202 
203 	memcpy(buf, metadata + m->pos, *len);
204 
205 	m->pos += *len;
206 
207 	if (m->pos == tot) {
208 		*flags |= LWSSS_FLAG_EOM;
209 		m->pos = 0; /* for next time */
210 		lws_ss_request_tx(m->ss);
211 	}
212 
213 	lwsl_hexdump_info(buf, *len);
214 
215 	return 0;
216 }
217 
218 static int
ss_avs_metadata_state(void * userobj,void * sh,lws_ss_constate_t state,lws_ss_tx_ordinal_t ack)219 ss_avs_metadata_state(void *userobj, void *sh,
220 		      lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)
221 {
222 
223 	ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
224 	struct lws_context *context = (struct lws_context *)m->opaque_data;
225 
226 	lwsl_user("%s: %s, ord 0x%x\n", __func__, lws_ss_state_name(state),
227 		  (unsigned int)ack);
228 
229 	switch (state) {
230 	case LWSSSCS_CREATING:
231 		lwsl_user("%s: CREATING\n", __func__);
232 		lws_ss_client_connect(m->ss);
233 		m->pos = 0;
234 		break;
235 	case LWSSSCS_CONNECTING:
236 		break;
237 	case LWSSSCS_CONNECTED:
238 		lws_ss_request_tx(m->ss);
239 		break;
240 	case LWSSSCS_ALL_RETRIES_FAILED:
241 		/* for this demo app, we want to exit on fail to connect */
242 	case LWSSSCS_DISCONNECTED:
243 		/* for this demo app, we want to exit after complete flow */
244 		lws_sul_schedule(context, 0, &m->sul, use_buffer_50ms,
245 				 LWS_SET_TIMER_USEC_CANCEL);
246 		interrupted = 1;
247 		break;
248 	case LWSSSCS_DESTROYING:
249 		lws_sul_schedule(context, 0, &m->sul, use_buffer_50ms,
250 				 LWS_SET_TIMER_USEC_CANCEL);
251 		break;
252 	default:
253 		break;
254 	}
255 
256 	return 0;
257 }
258 
259 /*
260  * avs event
261  */
262 
263 static int
ss_avs_event_rx(void * userobj,const uint8_t * buf,size_t len,int flags)264 ss_avs_event_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
265 {
266 	ss_avs_event_t *m = (ss_avs_event_t *)userobj;
267 	// struct lws_context *context = (struct lws_context *)m->opaque_data;
268 
269 	lwsl_notice("%s: rideshare %s, len %d, flags 0x%x\n", __func__,
270 			lws_ss_rideshare(m->ss), (int)len, flags);
271 
272 //	lwsl_hexdump_warn(buf, len);
273 
274 	bad = 0; /* for this demo, receiving something here == success */
275 
276 	return 0;
277 }
278 
279 static int
ss_avs_event_tx(void * userobj,lws_ss_tx_ordinal_t ord,uint8_t * buf,size_t * len,int * flags)280 ss_avs_event_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
281 		      size_t *len, int *flags)
282 {
283 	ss_avs_event_t *m = (ss_avs_event_t *)userobj;
284 	lwsl_notice("%s: rideshare %s\n", __func__, lws_ss_rideshare(m->ss));
285 
286 	return 1; /* don't transmit anything */
287 }
288 
289 static int
ss_avs_event_state(void * userobj,void * sh,lws_ss_constate_t state,lws_ss_tx_ordinal_t ack)290 ss_avs_event_state(void *userobj, void *sh,
291 		   lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)
292 {
293 	ss_avs_event_t *m = (ss_avs_event_t *)userobj;
294 	struct lws_context *context = (struct lws_context *)m->opaque_data;
295 	lws_ss_info_t ssi;
296 
297 	lwsl_user("%s: %s, ord 0x%x\n", __func__, lws_ss_state_name(state),
298 		  (unsigned int)ack);
299 
300 	switch (state) {
301 	case LWSSSCS_CREATING:
302 	case LWSSSCS_CONNECTING:
303 		break;
304 	case LWSSSCS_CONNECTED:
305 		if (hss_avs_sync)
306 			break;
307 
308 		lwsl_notice("%s: starting the second avs stream\n", __func__);
309 
310 		/*
311 		 * When we have established the event stream, we must POST
312 		 * on another stream within 10s
313 		 */
314 
315 		memset(&ssi, 0, sizeof(ssi));
316 		ssi.handle_offset	    = offsetof(ss_avs_metadata_t, ss);
317 		ssi.opaque_user_data_offset = offsetof(ss_avs_metadata_t,
318 						       opaque_data);
319 		ssi.rx			    = ss_avs_metadata_rx;
320 		ssi.tx			    = ss_avs_metadata_tx;
321 		ssi.state		    = ss_avs_metadata_state;
322 		ssi.user_alloc		    = sizeof(ss_avs_metadata_t);
323 		ssi.streamtype		    = "avs_metadata";
324 
325 		/*
326 		 * We want to allow the other side to fill our buffer, but no
327 		 * more.  But it's a bit tricky when the payload is inside
328 		 * framing like multipart MIME and contains other parts
329 		 */
330 
331 		/* uncomment to test rate-limiting, doesn't work with AVS servers */
332 //		ssi.manual_initial_tx_credit =
333 //				sizeof(((ss_avs_metadata_t *)0)->buf) / 2;
334 
335 		if (lws_ss_create(context, 0, &ssi, context, &hss_avs_sync,
336 				  NULL, NULL)) {
337 			lwsl_err("%s: failed to create avs metadata secstream\n",
338 				 __func__);
339 		}
340 		break;
341 	case LWSSSCS_ALL_RETRIES_FAILED:
342 		/* for this demo app, we want to exit on fail to connect */
343 		interrupted = 1;
344 		break;
345 	case LWSSSCS_DISCONNECTED:
346 		break;
347 	case LWSSSCS_DESTROYING:
348 		lwsl_notice("%s: DESTROYING\n", __func__);
349 		if (wav) {
350 			free(wav);
351 			wav = NULL;
352 		}
353 		break;
354 	default:
355 		break;
356 	}
357 
358 	return 0;
359 }
360 
361 int
avs_example_start(struct lws_context * context)362 avs_example_start(struct lws_context *context)
363 {
364 	lws_ss_info_t ssi;
365 	struct stat stat;
366 	int fd;
367 
368 	if (hss_avs_event)
369 		return 0;
370 
371 	fd = open("./year.wav", O_RDONLY);
372 	if (fd < 0) {
373 		lwsl_err("%s: failed to open wav file\n", __func__);
374 
375 		return 1;
376 	}
377 	if (fstat(fd, &stat) < 0) {
378 		lwsl_err("%s: failed to stat wav file\n", __func__);
379 
380 		goto bail;
381 	}
382 
383 	wav_len = stat.st_size;
384 	wav = malloc(wav_len);
385 	if (!wav) {
386 		lwsl_err("%s: failed to alloc wav buffer", __func__);
387 
388 		goto bail;
389 	}
390 	if (read(fd, wav, wav_len) != (int)wav_len) {
391 		lwsl_err("%s: failed to read wav\n", __func__);
392 
393 		goto bail;
394 	}
395 	close(fd);
396 
397 	lwsl_user("%s: Starting AVS stream\n", __func__);
398 
399 	/* AVS wants us to establish the long poll event stream first */
400 
401 	memset(&ssi, 0, sizeof(ssi));
402 	ssi.handle_offset	    = offsetof(ss_avs_event_t, ss);
403 	ssi.opaque_user_data_offset = offsetof(ss_avs_event_t, opaque_data);
404 	ssi.rx			    = ss_avs_event_rx;
405 	ssi.tx			    = ss_avs_event_tx;
406 	ssi.state		    = ss_avs_event_state;
407 	ssi.user_alloc		    = sizeof(ss_avs_event_t);
408 	ssi.streamtype		    = "avs_event";
409 
410 	if (lws_ss_create(context, 0, &ssi, context, &hss_avs_event, NULL, NULL)) {
411 		lwsl_err("%s: failed to create avs event secure stream\n",
412 			 __func__);
413 		free(wav);
414 		wav = NULL;
415 		return 1;
416 	}
417 
418 	return 0;
419 
420 bail:
421 	close(fd);
422 
423 	return 1;
424 }
425