1 /*
2 * lws-minimal-secure-streams-avs
3 *
4 * Written in 2019-2020 by Andy Green <andy@warmcat.com>
5 *
6 * This file is made available under the Creative Commons CC0 1.0
7 * Universal Public Domain Dedication.
8 *
9 * This sends a canned WAV and received (and discards) the mp3 response.
10 * However it rate-limits the response reception to manage a small ringbuffer
11 * using ss / h2 flow control apis, reflecting consumption at 64kbps and only
12 * and 8KB buffer, indtended to model optimizing rx buffering on mp3 playback
13 * on a constrained device.
14 */
15
16 #include <libwebsockets.h>
17 #include <string.h>
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #if !defined(WIN32)
21 #include <unistd.h>
22 #endif
23 #include <assert.h>
24 #include <fcntl.h>
25
26 extern int interrupted, bad;
27 static struct lws_ss_handle *hss_avs_event, *hss_avs_sync;
28 static uint8_t *wav;
29 static size_t wav_len;
30
31 typedef struct ss_avs_event {
32 struct lws_ss_handle *ss;
33 void *opaque_data;
34 /* ... application specific state ... */
35 struct lejp_ctx jctx;
36 } ss_avs_event_t;
37
38 typedef struct ss_avs_metadata {
39 struct lws_ss_handle *ss;
40 void *opaque_data;
41 /* ... application specific state ... */
42 struct lejp_ctx jctx;
43 size_t pos;
44
45 /*
46 * We simulate a ringbuffer that is used up by a sul at 64Kbit/sec
47 * rate, and managed at the same rate using tx credit
48 */
49
50 lws_sorted_usec_list_t sul;
51 uint8_t buf[256 * 1024]; /* to test rate-limiting, set to 8 * 1024 */
52 int head;
53 int tail;
54
55 char filled;
56
57 } ss_avs_metadata_t;
58
59 static const char *metadata = "{"
60 "\"event\": {"
61 "\"header\": {"
62 "\"namespace\": \"SpeechRecognizer\","
63 "\"name\": \"Recognize\","
64 "\"messageId\": \"message-123\","
65 "\"dialogRequestId\": \"dialog-request-321\""
66 "},"
67 "\"payload\": {"
68 "\"profile\":" "\"CLOSE_TALK\","
69 "\"format\":" "\"AUDIO_L16_RATE_16000_CHANNELS_1\""
70 "}"
71 "}"
72 "}";
73
74 /*
75 * avs metadata
76 */
77
78 static void
use_buffer_50ms(lws_sorted_usec_list_t * sul)79 use_buffer_50ms(lws_sorted_usec_list_t *sul)
80 {
81 ss_avs_metadata_t *m = lws_container_of(sul, ss_avs_metadata_t, sul);
82 struct lws_context *context = (struct lws_context *)m->opaque_data;
83 size_t n;
84 int e;
85
86 /*
87 * Use up 50ms-worth (8KB / 20) == 401 bytes of buffered data
88 */
89
90 /* remaining data in buffer */
91 n = ((size_t)(m->head - m->tail) % sizeof(m->buf));
92 lwsl_info("%s: avail %d\n", __func__, (int)n);
93
94 if (n < 401)
95 lwsl_err("%s: underrun\n", __func__);
96
97 m->tail = ((size_t)m->tail + 401) % sizeof(m->buf);
98 n = ((size_t)(m->head - m->tail) % sizeof(m->buf));
99
100 e = lws_ss_get_est_peer_tx_credit(m->ss);
101
102 lwsl_info("%s: avail after: %d, curr est %d\n", __func__, (int)n, e);
103
104 if (n < (sizeof(m->buf) * 2) / 3 && e < (int)(sizeof(m->buf) - 1 - n)) {
105 lwsl_info("%s: requesting additional %d\n", __func__,
106 (int)sizeof(m->buf) - 1 - e - (int)n);
107 lws_ss_add_peer_tx_credit(m->ss, (int32_t)((int)sizeof(m->buf) - 1 - e - (int)n));
108 }
109
110 lws_sul_schedule(context, 0, &m->sul, use_buffer_50ms,
111 50 * LWS_US_PER_MS);
112 }
113
114 static lws_ss_state_return_t
ss_avs_metadata_rx(void * userobj,const uint8_t * buf,size_t len,int flags)115 ss_avs_metadata_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
116 {
117 ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
118 struct lws_context *context = (struct lws_context *)m->opaque_data;
119 size_t n, n1;
120
121 lwsl_notice("%s: rideshare %s, len %d, flags 0x%x\n", __func__,
122 lws_ss_rideshare(m->ss), (int)len, flags);
123 #if 0
124 lwsl_hexdump_warn(buf, len);
125 #endif
126
127 n = sizeof(m->buf) - ((size_t)(m->head - m->tail) % sizeof(m->buf));
128 lwsl_info("%s: len %d, buf h %d, t %d, space %d\n", __func__,
129 (int)len, (int)m->head, (int)m->tail, (int)n);
130 lws_ss_get_est_peer_tx_credit(m->ss);
131 if (len > n) {
132 lwsl_err("%s: bad len: len %d, n %d\n", __func__, (int)len, (int)n);
133 assert(0);
134
135 return 1;
136 }
137
138 if (m->head < m->tail) /* |****h-------t**| */
139 memcpy(&m->buf[m->head], buf, len);
140 else { /* |---t*****h-----| */
141 n1 = sizeof(m->buf) - (size_t)m->head;
142 if (len < n1)
143 n1 = len;
144 memcpy(&m->buf[m->head], buf, n1);
145 if (n1 != len)
146 memcpy(m->buf, buf, len - n1);
147 }
148
149 m->head = (((size_t)m->head) + len) % sizeof(m->buf);
150
151 lws_sul_schedule(context, 0, &m->sul, use_buffer_50ms,
152 50 * LWS_US_PER_MS);
153
154 return 0;
155 }
156
157 static lws_ss_state_return_t
ss_avs_metadata_tx(void * userobj,lws_ss_tx_ordinal_t ord,uint8_t * buf,size_t * len,int * flags)158 ss_avs_metadata_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
159 size_t *len, int *flags)
160 {
161 ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
162 //struct lws_context *context = (struct lws_context *)m->opaque_data;
163 size_t tot;
164
165 if ((long)m->pos < 0) {
166 *len = 0;
167 lwsl_debug("%s: skip tx\n", __func__);
168 return 1;
169 }
170
171 // lwsl_notice("%s: rideshare '%s'\n", __func__, lws_ss_rideshare(m->ss));
172
173 if (!strcmp(lws_ss_rideshare(m->ss), "avs_audio")) {
174 /* audio rideshare */
175
176 if (!m->pos)
177 *flags |= LWSSS_FLAG_SOM;
178
179 if (*len > wav_len - m->pos)
180 *len = wav_len - m->pos;
181
182 memcpy(buf, wav + m->pos, *len);
183 m->pos += *len;
184
185 if (m->pos == wav_len) {
186 *flags |= LWSSS_FLAG_EOM;
187 lwsl_info("%s: tx done\n", __func__);
188 m->pos = (size_t)-1l; /* ban subsequent until new stream */
189 } else
190 return lws_ss_request_tx(m->ss);
191
192 lwsl_hexdump_info(buf, *len);
193
194 return 0;
195 }
196
197 /* metadata part */
198
199 tot = strlen(metadata);
200
201 if (!m->pos)
202 *flags |= LWSSS_FLAG_SOM;
203
204 if (*len > tot - m->pos)
205 *len = tot - m->pos;
206
207 memcpy(buf, metadata + m->pos, *len);
208
209 m->pos += *len;
210
211 if (m->pos == tot) {
212 *flags |= LWSSS_FLAG_EOM;
213 m->pos = 0; /* for next time */
214 return lws_ss_request_tx(m->ss);
215 }
216
217 lwsl_hexdump_info(buf, *len);
218
219 return 0;
220 }
221
222 static lws_ss_state_return_t
ss_avs_metadata_state(void * userobj,void * sh,lws_ss_constate_t state,lws_ss_tx_ordinal_t ack)223 ss_avs_metadata_state(void *userobj, void *sh,
224 lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)
225 {
226
227 ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
228 // struct lws_context *context = (struct lws_context *)m->opaque_data;
229
230 lwsl_user("%s: %s, ord 0x%x\n", __func__, lws_ss_state_name((int)state),
231 (unsigned int)ack);
232
233 switch (state) {
234 case LWSSSCS_CREATING:
235 lwsl_user("%s: CREATING\n", __func__);
236 m->pos = 0;
237 return lws_ss_client_connect(m->ss);
238
239 case LWSSSCS_CONNECTING:
240 break;
241 case LWSSSCS_CONNECTED:
242 return lws_ss_request_tx(m->ss);
243
244 case LWSSSCS_ALL_RETRIES_FAILED:
245 /* for this demo app, we want to exit on fail to connect */
246 case LWSSSCS_DISCONNECTED:
247 /* for this demo app, we want to exit after complete flow */
248 lws_sul_cancel(&m->sul);
249 interrupted = 1;
250 break;
251 case LWSSSCS_DESTROYING:
252 lws_sul_cancel(&m->sul);
253 break;
254 default:
255 break;
256 }
257
258 return 0;
259 }
260
261 /*
262 * avs event
263 */
264
265 static lws_ss_state_return_t
ss_avs_event_rx(void * userobj,const uint8_t * buf,size_t len,int flags)266 ss_avs_event_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
267 {
268 #if !defined(LWS_WITH_NO_LOGS)
269 ss_avs_event_t *m = (ss_avs_event_t *)userobj;
270 // struct lws_context *context = (struct lws_context *)m->opaque_data;
271
272 lwsl_notice("%s: rideshare %s, len %d, flags 0x%x\n", __func__,
273 lws_ss_rideshare(m->ss), (int)len, flags);
274 #endif
275 // lwsl_hexdump_warn(buf, len);
276
277 bad = 0; /* for this demo, receiving something here == success */
278
279 return 0;
280 }
281
282 static lws_ss_state_return_t
ss_avs_event_tx(void * userobj,lws_ss_tx_ordinal_t ord,uint8_t * buf,size_t * len,int * flags)283 ss_avs_event_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
284 size_t *len, int *flags)
285 {
286 #if !defined(LWS_WITH_NO_LOGS)
287 ss_avs_event_t *m = (ss_avs_event_t *)userobj;
288 lwsl_notice("%s: rideshare %s\n", __func__, lws_ss_rideshare(m->ss));
289 #endif
290 return 1; /* don't transmit anything */
291 }
292
293 static lws_ss_state_return_t
ss_avs_event_state(void * userobj,void * sh,lws_ss_constate_t state,lws_ss_tx_ordinal_t ack)294 ss_avs_event_state(void *userobj, void *sh,
295 lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)
296 {
297 ss_avs_event_t *m = (ss_avs_event_t *)userobj;
298 struct lws_context *context = (struct lws_context *)m->opaque_data;
299 lws_ss_info_t ssi;
300
301 lwsl_user("%s: %s, ord 0x%x\n", __func__, lws_ss_state_name((int)state),
302 (unsigned int)ack);
303
304 switch (state) {
305 case LWSSSCS_CREATING:
306 case LWSSSCS_CONNECTING:
307 break;
308 case LWSSSCS_CONNECTED:
309 if (hss_avs_sync)
310 break;
311
312 lwsl_notice("%s: starting the second avs stream\n", __func__);
313
314 /*
315 * When we have established the event stream, we must POST
316 * on another stream within 10s
317 */
318
319 memset(&ssi, 0, sizeof(ssi));
320 ssi.handle_offset = offsetof(ss_avs_metadata_t, ss);
321 ssi.opaque_user_data_offset = offsetof(ss_avs_metadata_t,
322 opaque_data);
323 ssi.rx = ss_avs_metadata_rx;
324 ssi.tx = ss_avs_metadata_tx;
325 ssi.state = ss_avs_metadata_state;
326 ssi.user_alloc = sizeof(ss_avs_metadata_t);
327 ssi.streamtype = "avs_metadata";
328
329 /*
330 * We want to allow the other side to fill our buffer, but no
331 * more. But it's a bit tricky when the payload is inside
332 * framing like multipart MIME and contains other parts
333 */
334
335 /* uncomment to test rate-limiting, doesn't work with AVS servers */
336 // ssi.manual_initial_tx_credit =
337 // sizeof(((ss_avs_metadata_t *)0)->buf) / 2;
338
339 if (lws_ss_create(context, 0, &ssi, context, &hss_avs_sync,
340 NULL, NULL)) {
341 lwsl_err("%s: failed to create avs metadata secstream\n",
342 __func__);
343 }
344 break;
345 case LWSSSCS_ALL_RETRIES_FAILED:
346 /* for this demo app, we want to exit on fail to connect */
347 interrupted = 1;
348 break;
349 case LWSSSCS_DISCONNECTED:
350 break;
351 case LWSSSCS_DESTROYING:
352 lwsl_notice("%s: DESTROYING\n", __func__);
353 if (wav) {
354 free(wav);
355 wav = NULL;
356 }
357 break;
358 default:
359 break;
360 }
361
362 return 0;
363 }
364
365 int
avs_example_start(struct lws_context * context)366 avs_example_start(struct lws_context *context)
367 {
368 lws_ss_info_t ssi;
369 struct stat stat;
370 int fd;
371
372 if (hss_avs_event)
373 return 0;
374
375 fd = open("./year.wav", O_RDONLY);
376 if (fd < 0) {
377 lwsl_err("%s: failed to open wav file\n", __func__);
378
379 return 1;
380 }
381 if (fstat(fd, &stat) < 0) {
382 lwsl_err("%s: failed to stat wav file\n", __func__);
383
384 goto bail;
385 }
386
387 wav_len = (size_t)stat.st_size;
388 wav = malloc(wav_len);
389 if (!wav) {
390 lwsl_err("%s: failed to alloc wav buffer", __func__);
391
392 goto bail;
393 }
394 if (read(fd, wav,
395 #if defined(WIN32)
396 (unsigned int)
397 #endif
398 wav_len) != (int)wav_len) {
399 lwsl_err("%s: failed to read wav\n", __func__);
400
401 goto bail;
402 }
403 close(fd);
404
405 lwsl_user("%s: Starting AVS stream\n", __func__);
406
407 /* AVS wants us to establish the long poll event stream first */
408
409 memset(&ssi, 0, sizeof(ssi));
410 ssi.handle_offset = offsetof(ss_avs_event_t, ss);
411 ssi.opaque_user_data_offset = offsetof(ss_avs_event_t, opaque_data);
412 ssi.rx = ss_avs_event_rx;
413 ssi.tx = ss_avs_event_tx;
414 ssi.state = ss_avs_event_state;
415 ssi.user_alloc = sizeof(ss_avs_event_t);
416 ssi.streamtype = "avs_event";
417
418 if (lws_ss_create(context, 0, &ssi, context, &hss_avs_event, NULL, NULL)) {
419 lwsl_err("%s: failed to create avs event secure stream\n",
420 __func__);
421 free(wav);
422 wav = NULL;
423 return 1;
424 }
425
426 return 0;
427
428 bail:
429 close(fd);
430
431 return 1;
432 }
433