• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * lws-minimal-secure-streams-avs
3  *
4  * Written in 2019-2020 by Andy Green <andy@warmcat.com>
5  *
6  * This file is made available under the Creative Commons CC0 1.0
7  * Universal Public Domain Dedication.
8  *
9  * This sends a canned WAV and receives (and discards) the mp3 response.
10  * However it rate-limits the response reception to manage a small ringbuffer
11  * using ss / h2 flow control apis, reflecting consumption at 64kbps and only
12  * an 8KB buffer, intended to model optimizing rx buffering on mp3 playback
13  * on a constrained device.
14  */
15 
16 #include <libwebsockets.h>
17 #include <string.h>
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #if !defined(WIN32)
21 #include <unistd.h>
22 #endif
23 #include <assert.h>
24 #include <fcntl.h>
25 
/*
 * Flags defined outside this file: the stream callbacks below set
 * interrupted to 1 when the demo should exit, and clear bad once something
 * has been received on the event stream.
 */
extern int interrupted;
extern int bad;

/* Handles for the "avs_event" and "avs_metadata" secure streams */
static struct lws_ss_handle *hss_avs_event;
static struct lws_ss_handle *hss_avs_sync;

/* The canned WAV loaded by avs_example_start(), and its length in bytes */
static uint8_t *wav;
static size_t wav_len;
30 
31 typedef struct ss_avs_event {
32 	struct lws_ss_handle 	*ss;
33 	void			*opaque_data;
34 	/* ... application specific state ... */
35 	struct lejp_ctx		jctx;
36 } ss_avs_event_t;
37 
38 typedef struct ss_avs_metadata {
39 	struct lws_ss_handle 	*ss;
40 	void			*opaque_data;
41 	/* ... application specific state ... */
42 	struct lejp_ctx		jctx;
43 	size_t			pos;
44 
45 	/*
46 	 * We simulate a ringbuffer that is used up by a sul at 64Kbit/sec
47 	 * rate, and managed at the same rate using tx credit
48 	 */
49 
50 	lws_sorted_usec_list_t	sul;
51 	uint8_t			buf[256 * 1024]; /* to test rate-limiting, set to 8 * 1024 */
52 	int			head;
53 	int			tail;
54 
55 	char			filled;
56 
57 } ss_avs_metadata_t;
58 
/*
 * JSON sent as the metadata part of the request: a SpeechRecognizer
 * "Recognize" event describing the audio that follows.
 */
static const char *metadata =
	"{"
	    "\"event\": {"
		"\"header\": {"
		    "\"namespace\": \"SpeechRecognizer\","
		    "\"name\": \"Recognize\","
		    "\"messageId\": \"message-123\","
		    "\"dialogRequestId\": \"dialog-request-321\""
		"},"
		"\"payload\": {"
		    "\"profile\":\"CLOSE_TALK\","
		    "\"format\":\"AUDIO_L16_RATE_16000_CHANNELS_1\""
		"}"
	    "}"
	"}";
73 
74 /*
75  * avs metadata
76  */
77 
78 static void
use_buffer_50ms(lws_sorted_usec_list_t * sul)79 use_buffer_50ms(lws_sorted_usec_list_t *sul)
80 {
81 	ss_avs_metadata_t *m = lws_container_of(sul, ss_avs_metadata_t, sul);
82 	struct lws_context *context = (struct lws_context *)m->opaque_data;
83 	size_t n;
84 	int e;
85 
86 	/*
87 	 * Use up 50ms-worth (8KB / 20) == 401 bytes of buffered data
88 	 */
89 
90 	/* remaining data in buffer */
91 	n = ((size_t)(m->head - m->tail) % sizeof(m->buf));
92 	lwsl_info("%s: avail %d\n", __func__, (int)n);
93 
94 	if (n < 401)
95 		lwsl_err("%s: underrun\n", __func__);
96 
97 	m->tail = ((size_t)m->tail + 401) % sizeof(m->buf);
98 	n = ((size_t)(m->head - m->tail) % sizeof(m->buf));
99 
100 	e = lws_ss_get_est_peer_tx_credit(m->ss);
101 
102 	lwsl_info("%s: avail after: %d, curr est %d\n", __func__, (int)n, e);
103 
104 	if (n < (sizeof(m->buf) * 2) / 3 && e < (int)(sizeof(m->buf) - 1 - n)) {
105 		lwsl_info("%s: requesting additional %d\n", __func__,
106 				(int)sizeof(m->buf) - 1 - e - (int)n);
107 		lws_ss_add_peer_tx_credit(m->ss, (int32_t)((int)sizeof(m->buf) - 1 - e - (int)n));
108 	}
109 
110 	lws_sul_schedule(context, 0, &m->sul, use_buffer_50ms,
111 			 50 * LWS_US_PER_MS);
112 }
113 
114 static lws_ss_state_return_t
ss_avs_metadata_rx(void * userobj,const uint8_t * buf,size_t len,int flags)115 ss_avs_metadata_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
116 {
117 	ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
118 	struct lws_context *context = (struct lws_context *)m->opaque_data;
119 	size_t n, n1;
120 
121 	lwsl_notice("%s: rideshare %s, len %d, flags 0x%x\n", __func__,
122 			lws_ss_rideshare(m->ss), (int)len, flags);
123 #if 0
124 	lwsl_hexdump_warn(buf, len);
125 #endif
126 
127 	n = sizeof(m->buf) - ((size_t)(m->head - m->tail) % sizeof(m->buf));
128 	lwsl_info("%s: len %d, buf h %d, t %d, space %d\n", __func__,
129 		    (int)len, (int)m->head, (int)m->tail, (int)n);
130 	lws_ss_get_est_peer_tx_credit(m->ss);
131 	if (len > n) {
132 		lwsl_err("%s: bad len: len %d, n %d\n", __func__, (int)len, (int)n);
133 		assert(0);
134 
135 		return 1;
136 	}
137 
138 	if (m->head < m->tail)				/* |****h-------t**| */
139 		memcpy(&m->buf[m->head], buf, len);
140 	else {						/* |---t*****h-----| */
141 		n1 = sizeof(m->buf) - (size_t)m->head;
142 		if (len < n1)
143 			n1 = len;
144 		memcpy(&m->buf[m->head], buf, n1);
145 		if (n1 != len)
146 			memcpy(m->buf, buf, len - n1);
147 	}
148 
149 	m->head = (((size_t)m->head) + len) % sizeof(m->buf);
150 
151 	lws_sul_schedule(context, 0, &m->sul, use_buffer_50ms,
152 			 50 * LWS_US_PER_MS);
153 
154 	return 0;
155 }
156 
157 static lws_ss_state_return_t
ss_avs_metadata_tx(void * userobj,lws_ss_tx_ordinal_t ord,uint8_t * buf,size_t * len,int * flags)158 ss_avs_metadata_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
159 		   size_t *len, int *flags)
160 {
161 	ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
162 	//struct lws_context *context = (struct lws_context *)m->opaque_data;
163 	size_t tot;
164 
165 	if ((long)m->pos < 0) {
166 		*len = 0;
167 		lwsl_debug("%s: skip tx\n", __func__);
168 		return 1;
169 	}
170 
171 //	lwsl_notice("%s: rideshare '%s'\n", __func__, lws_ss_rideshare(m->ss));
172 
173 	if (!strcmp(lws_ss_rideshare(m->ss), "avs_audio")) {
174 		/* audio rideshare */
175 
176 		if (!m->pos)
177 			*flags |= LWSSS_FLAG_SOM;
178 
179 		if (*len > wav_len - m->pos)
180 			*len = wav_len - m->pos;
181 
182 		memcpy(buf, wav + m->pos, *len);
183 		m->pos += *len;
184 
185 		if (m->pos == wav_len) {
186 			*flags |= LWSSS_FLAG_EOM;
187 			lwsl_info("%s: tx done\n", __func__);
188 			m->pos = (size_t)-1l; /* ban subsequent until new stream */
189 		} else
190 			return lws_ss_request_tx(m->ss);
191 
192 		lwsl_hexdump_info(buf, *len);
193 
194 		return 0;
195 	}
196 
197 	/* metadata part */
198 
199 	tot = strlen(metadata);
200 
201 	if (!m->pos)
202 		*flags |= LWSSS_FLAG_SOM;
203 
204 	if (*len > tot - m->pos)
205 		*len = tot - m->pos;
206 
207 	memcpy(buf, metadata + m->pos, *len);
208 
209 	m->pos += *len;
210 
211 	if (m->pos == tot) {
212 		*flags |= LWSSS_FLAG_EOM;
213 		m->pos = 0; /* for next time */
214 		return lws_ss_request_tx(m->ss);
215 	}
216 
217 	lwsl_hexdump_info(buf, *len);
218 
219 	return 0;
220 }
221 
222 static lws_ss_state_return_t
ss_avs_metadata_state(void * userobj,void * sh,lws_ss_constate_t state,lws_ss_tx_ordinal_t ack)223 ss_avs_metadata_state(void *userobj, void *sh,
224 		      lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)
225 {
226 
227 	ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
228 	// struct lws_context *context = (struct lws_context *)m->opaque_data;
229 
230 	lwsl_user("%s: %s, ord 0x%x\n", __func__, lws_ss_state_name((int)state),
231 		  (unsigned int)ack);
232 
233 	switch (state) {
234 	case LWSSSCS_CREATING:
235 		lwsl_user("%s: CREATING\n", __func__);
236 		m->pos = 0;
237 		return lws_ss_client_connect(m->ss);
238 
239 	case LWSSSCS_CONNECTING:
240 		break;
241 	case LWSSSCS_CONNECTED:
242 		return lws_ss_request_tx(m->ss);
243 
244 	case LWSSSCS_ALL_RETRIES_FAILED:
245 		/* for this demo app, we want to exit on fail to connect */
246 	case LWSSSCS_DISCONNECTED:
247 		/* for this demo app, we want to exit after complete flow */
248 		lws_sul_cancel(&m->sul);
249 		interrupted = 1;
250 		break;
251 	case LWSSSCS_DESTROYING:
252 		lws_sul_cancel(&m->sul);
253 		break;
254 	default:
255 		break;
256 	}
257 
258 	return 0;
259 }
260 
261 /*
262  * avs event
263  */
264 
265 static lws_ss_state_return_t
ss_avs_event_rx(void * userobj,const uint8_t * buf,size_t len,int flags)266 ss_avs_event_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
267 {
268 #if !defined(LWS_WITH_NO_LOGS)
269 	ss_avs_event_t *m = (ss_avs_event_t *)userobj;
270 	// struct lws_context *context = (struct lws_context *)m->opaque_data;
271 
272 	lwsl_notice("%s: rideshare %s, len %d, flags 0x%x\n", __func__,
273 			lws_ss_rideshare(m->ss), (int)len, flags);
274 #endif
275 //	lwsl_hexdump_warn(buf, len);
276 
277 	bad = 0; /* for this demo, receiving something here == success */
278 
279 	return 0;
280 }
281 
282 static lws_ss_state_return_t
ss_avs_event_tx(void * userobj,lws_ss_tx_ordinal_t ord,uint8_t * buf,size_t * len,int * flags)283 ss_avs_event_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
284 		      size_t *len, int *flags)
285 {
286 #if !defined(LWS_WITH_NO_LOGS)
287 	ss_avs_event_t *m = (ss_avs_event_t *)userobj;
288 	lwsl_notice("%s: rideshare %s\n", __func__, lws_ss_rideshare(m->ss));
289 #endif
290 	return 1; /* don't transmit anything */
291 }
292 
293 static lws_ss_state_return_t
ss_avs_event_state(void * userobj,void * sh,lws_ss_constate_t state,lws_ss_tx_ordinal_t ack)294 ss_avs_event_state(void *userobj, void *sh,
295 		   lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)
296 {
297 	ss_avs_event_t *m = (ss_avs_event_t *)userobj;
298 	struct lws_context *context = (struct lws_context *)m->opaque_data;
299 	lws_ss_info_t ssi;
300 
301 	lwsl_user("%s: %s, ord 0x%x\n", __func__, lws_ss_state_name((int)state),
302 		  (unsigned int)ack);
303 
304 	switch (state) {
305 	case LWSSSCS_CREATING:
306 	case LWSSSCS_CONNECTING:
307 		break;
308 	case LWSSSCS_CONNECTED:
309 		if (hss_avs_sync)
310 			break;
311 
312 		lwsl_notice("%s: starting the second avs stream\n", __func__);
313 
314 		/*
315 		 * When we have established the event stream, we must POST
316 		 * on another stream within 10s
317 		 */
318 
319 		memset(&ssi, 0, sizeof(ssi));
320 		ssi.handle_offset	    = offsetof(ss_avs_metadata_t, ss);
321 		ssi.opaque_user_data_offset = offsetof(ss_avs_metadata_t,
322 						       opaque_data);
323 		ssi.rx			    = ss_avs_metadata_rx;
324 		ssi.tx			    = ss_avs_metadata_tx;
325 		ssi.state		    = ss_avs_metadata_state;
326 		ssi.user_alloc		    = sizeof(ss_avs_metadata_t);
327 		ssi.streamtype		    = "avs_metadata";
328 
329 		/*
330 		 * We want to allow the other side to fill our buffer, but no
331 		 * more.  But it's a bit tricky when the payload is inside
332 		 * framing like multipart MIME and contains other parts
333 		 */
334 
335 		/* uncomment to test rate-limiting, doesn't work with AVS servers */
336 //		ssi.manual_initial_tx_credit =
337 //				sizeof(((ss_avs_metadata_t *)0)->buf) / 2;
338 
339 		if (lws_ss_create(context, 0, &ssi, context, &hss_avs_sync,
340 				  NULL, NULL)) {
341 			lwsl_err("%s: failed to create avs metadata secstream\n",
342 				 __func__);
343 		}
344 		break;
345 	case LWSSSCS_ALL_RETRIES_FAILED:
346 		/* for this demo app, we want to exit on fail to connect */
347 		interrupted = 1;
348 		break;
349 	case LWSSSCS_DISCONNECTED:
350 		break;
351 	case LWSSSCS_DESTROYING:
352 		lwsl_notice("%s: DESTROYING\n", __func__);
353 		if (wav) {
354 			free(wav);
355 			wav = NULL;
356 		}
357 		break;
358 	default:
359 		break;
360 	}
361 
362 	return 0;
363 }
364 
365 int
avs_example_start(struct lws_context * context)366 avs_example_start(struct lws_context *context)
367 {
368 	lws_ss_info_t ssi;
369 	struct stat stat;
370 	int fd;
371 
372 	if (hss_avs_event)
373 		return 0;
374 
375 	fd = open("./year.wav", O_RDONLY);
376 	if (fd < 0) {
377 		lwsl_err("%s: failed to open wav file\n", __func__);
378 
379 		return 1;
380 	}
381 	if (fstat(fd, &stat) < 0) {
382 		lwsl_err("%s: failed to stat wav file\n", __func__);
383 
384 		goto bail;
385 	}
386 
387 	wav_len = (size_t)stat.st_size;
388 	wav = malloc(wav_len);
389 	if (!wav) {
390 		lwsl_err("%s: failed to alloc wav buffer", __func__);
391 
392 		goto bail;
393 	}
394 	if (read(fd, wav,
395 #if defined(WIN32)
396 		(unsigned int)
397 #endif
398 			wav_len) != (int)wav_len) {
399 		lwsl_err("%s: failed to read wav\n", __func__);
400 
401 		goto bail;
402 	}
403 	close(fd);
404 
405 	lwsl_user("%s: Starting AVS stream\n", __func__);
406 
407 	/* AVS wants us to establish the long poll event stream first */
408 
409 	memset(&ssi, 0, sizeof(ssi));
410 	ssi.handle_offset	    = offsetof(ss_avs_event_t, ss);
411 	ssi.opaque_user_data_offset = offsetof(ss_avs_event_t, opaque_data);
412 	ssi.rx			    = ss_avs_event_rx;
413 	ssi.tx			    = ss_avs_event_tx;
414 	ssi.state		    = ss_avs_event_state;
415 	ssi.user_alloc		    = sizeof(ss_avs_event_t);
416 	ssi.streamtype		    = "avs_event";
417 
418 	if (lws_ss_create(context, 0, &ssi, context, &hss_avs_event, NULL, NULL)) {
419 		lwsl_err("%s: failed to create avs event secure stream\n",
420 			 __func__);
421 		free(wav);
422 		wav = NULL;
423 		return 1;
424 	}
425 
426 	return 0;
427 
428 bail:
429 	close(fd);
430 
431 	return 1;
432 }
433