1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <string.h>
26 #include <stdio.h>
27
28 #include <pulse/def.h>
29 #include <pulse/timeval.h>
30 #include <pulse/rtclock.h>
31 #include <pulse/xmalloc.h>
32 #include <pulse/fork-detect.h>
33
34 #include <pulsecore/pstream-util.h>
35 #include <pulsecore/sample-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/core-rtclock.h>
40 #include <pulsecore/core-util.h>
41
42 #include "internal.h"
43 #include "stream.h"
44
45 /* #define STREAM_DEBUG */
46
/* Interval used for the first automatic timing update after the timer is
 * (re)started. request_auto_timing_update() doubles the interval on every
 * rescheduling, capped at AUTO_TIMING_INTERVAL_END_USEC. */
#define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
#define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)

/* Tuning constants for the timing smoother -- presumably consumed where the
 * smoother is created (not visible in this part of the file). The adjust
 * time and minimum history are only defined when USE_SMOOTHER_2 is not set. */
#define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
#ifndef USE_SMOOTHER_2
#define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
#define SMOOTHER_MIN_HISTORY (4)
#endif
55
pa_stream_new(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map)56 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
57 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
58 }
59
/* Detaches every user-supplied callback from the stream: all callback
 * function pointers and their associated userdata pointers become NULL. */
static void reset_callbacks(pa_stream *s) {
    /* Function pointers first ... */
    s->read_callback = NULL;
    s->write_callback = NULL;
    s->state_callback = NULL;
    s->overflow_callback = NULL;
    s->underflow_callback = NULL;
    s->latency_update_callback = NULL;
    s->moved_callback = NULL;
    s->suspended_callback = NULL;
    s->started_callback = NULL;
    s->event_callback = NULL;
    s->buffer_attr_callback = NULL;
    s->underflow_ohos_callback = NULL;

    /* ... then the opaque pointers handed back to them */
    s->read_userdata = NULL;
    s->write_userdata = NULL;
    s->state_userdata = NULL;
    s->overflow_userdata = NULL;
    s->underflow_userdata = NULL;
    s->latency_update_userdata = NULL;
    s->moved_userdata = NULL;
    s->suspended_userdata = NULL;
    s->started_userdata = NULL;
    s->event_userdata = NULL;
    s->buffer_attr_userdata = NULL;
    s->underflow_ohos_userdata = NULL;
}
86
pa_stream_new_with_proplist_internal(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map,pa_format_info * const * formats,unsigned int n_formats,pa_proplist * p)87 static pa_stream *pa_stream_new_with_proplist_internal(
88 pa_context *c,
89 const char *name,
90 const pa_sample_spec *ss,
91 const pa_channel_map *map,
92 pa_format_info * const *formats,
93 unsigned int n_formats,
94 pa_proplist *p) {
95
96 pa_stream *s;
97 unsigned int i;
98
99 pa_assert(c);
100 pa_assert(PA_REFCNT_VALUE(c) >= 1);
101 pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
102 pa_assert(n_formats < PA_MAX_FORMATS);
103
104 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
105 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
106
107 s = pa_xnew(pa_stream, 1);
108 PA_REFCNT_INIT(s);
109 s->context = c;
110 s->mainloop = c->mainloop;
111
112 s->direction = PA_STREAM_NODIRECTION;
113 s->state = PA_STREAM_UNCONNECTED;
114 s->flags = 0;
115
116 if (ss)
117 s->sample_spec = *ss;
118 else
119 pa_sample_spec_init(&s->sample_spec);
120
121 if (map)
122 s->channel_map = *map;
123 else
124 pa_channel_map_init(&s->channel_map);
125
126 s->n_formats = 0;
127 if (formats) {
128 s->n_formats = n_formats;
129 for (i = 0; i < n_formats; i++)
130 s->req_formats[i] = pa_format_info_copy(formats[i]);
131 }
132
133 /* We'll get the final negotiated format after connecting */
134 s->format = NULL;
135
136 s->direct_on_input = PA_INVALID_INDEX;
137
138 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
139 if (name)
140 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
141
142 s->channel = 0;
143 s->channel_valid = false;
144 s->syncid = c->csyncid++;
145 s->stream_index = PA_INVALID_INDEX;
146
147 s->requested_bytes = 0;
148 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
149
150 /* We initialize the target length here, so that if the user
151 * passes no explicit buffering metrics the default is similar to
152 * what older PA versions provided. */
153
154 s->buffer_attr.maxlength = (uint32_t) -1;
155 if (ss)
156 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
157 else {
158 /* FIXME: We assume a worst-case compressed format corresponding to
159 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
160 pa_sample_spec tmp_ss = {
161 .format = PA_SAMPLE_S16NE,
162 .rate = 48000,
163 .channels = 2,
164 };
165 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
166 }
167 s->buffer_attr.minreq = (uint32_t) -1;
168 s->buffer_attr.prebuf = (uint32_t) -1;
169 s->buffer_attr.fragsize = (uint32_t) -1;
170
171 s->device_index = PA_INVALID_INDEX;
172 s->device_name = NULL;
173 s->suspended = false;
174 s->corked = false;
175
176 s->write_memblock = NULL;
177 s->write_data = NULL;
178
179 pa_memchunk_reset(&s->peek_memchunk);
180 s->peek_data = NULL;
181 s->record_memblockq = NULL;
182
183 memset(&s->timing_info, 0, sizeof(s->timing_info));
184 s->timing_info_valid = false;
185
186 s->previous_time = 0;
187 s->latest_underrun_at_index = -1;
188
189 s->read_index_not_before = 0;
190 s->write_index_not_before = 0;
191 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
192 s->write_index_corrections[i].valid = 0;
193 s->current_write_index_correction = 0;
194
195 s->auto_timing_update_event = NULL;
196 s->auto_timing_update_requested = false;
197 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
198
199 reset_callbacks(s);
200
201 s->smoother = NULL;
202
203 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
204 PA_LLIST_PREPEND(pa_stream, c->streams, s);
205 pa_stream_ref(s);
206
207 return s;
208 }
209
pa_stream_new_with_proplist(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map,pa_proplist * p)210 pa_stream *pa_stream_new_with_proplist(
211 pa_context *c,
212 const char *name,
213 const pa_sample_spec *ss,
214 const pa_channel_map *map,
215 pa_proplist *p) {
216
217 pa_channel_map tmap;
218
219 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
220 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
221 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
222 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
223 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
224
225 if (!map)
226 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
227
228 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
229 }
230
pa_stream_new_extended(pa_context * c,const char * name,pa_format_info * const * formats,unsigned int n_formats,pa_proplist * p)231 pa_stream *pa_stream_new_extended(
232 pa_context *c,
233 const char *name,
234 pa_format_info * const *formats,
235 unsigned int n_formats,
236 pa_proplist *p) {
237
238 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
239
240 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
241 }
242
/* Detaches the stream from its context.
 *
 * Cancels all pending operations that refer to the stream, drops outstanding
 * dispatcher replies, removes the stream from the context's channel map and
 * stream list (releasing the list's reference), frees the automatic timing
 * update timer and clears all user callbacks. Does nothing if the stream is
 * already detached (s->context == NULL).
 *
 * The caller must hold its own reference on the stream, since the reference
 * owned by the context's list is dropped in here and `s` is used afterwards. */
static void stream_unlink(pa_stream *s) {
    pa_operation *o, *n;
    pa_assert(s);

    if (!s->context)
        return;

    /* Detach from context */

    /* Cancel all operation objects that point to us. The next pointer is
     * fetched up front so the walk does not depend on `o` after cancelling. */
    for (o = s->context->operations; o; o = n) {
        n = o->next;

        if (o->stream == s)
            pa_operation_cancel(o);
    }

    /* Drop all outstanding replies for this stream */
    if (s->context->pdispatch)
        pa_pdispatch_unregister_reply(s->context->pdispatch, s);

    if (s->channel_valid) {
        pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
        s->channel = 0;
        s->channel_valid = false;
    }

    PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
    pa_stream_unref(s);

    s->context = NULL;

    if (s->auto_timing_update_event) {
        pa_assert(s->mainloop);
        s->mainloop->time_free(s->auto_timing_update_event);
        /* The stream object outlives this call; do not leave a dangling
         * pointer to the freed time event behind. */
        s->auto_timing_update_event = NULL;
    }

    reset_callbacks(s);
}
282
/* Destroys a stream: unlinks it from its context, then releases every
 * resource the stream object holds and finally the object itself. */
static void stream_free(pa_stream *s) {
    unsigned int idx;

    pa_assert(s);

    stream_unlink(s);

    /* Pending write buffer */
    if (s->write_memblock) {
        if (s->write_data)
            pa_memblock_release(s->write_memblock);
        pa_memblock_unref(s->write_memblock);
    }

    /* Chunk currently handed out for peeking */
    if (s->peek_memchunk.memblock) {
        if (s->peek_data)
            pa_memblock_release(s->peek_memchunk.memblock);
        pa_memblock_unref(s->peek_memchunk.memblock);
    }

    if (s->record_memblockq)
        pa_memblockq_free(s->record_memblockq);

    if (s->proplist)
        pa_proplist_free(s->proplist);

    if (s->smoother) {
#ifdef USE_SMOOTHER_2
        pa_smoother_2_free(s->smoother);
#else
        pa_smoother_free(s->smoother);
#endif
    }

    /* Requested formats, then the negotiated one */
    for (idx = 0; idx < s->n_formats; idx++)
        pa_format_info_free(s->req_formats[idx]);

    if (s->format)
        pa_format_info_free(s->format);

    pa_xfree(s->device_name);
    pa_xfree(s);
}
324
/* Drops one reference; the stream is freed once the count reaches zero. */
void pa_stream_unref(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    /* Somebody else still holds a reference: nothing more to do */
    if (PA_REFCNT_DEC(s) > 0)
        return;

    stream_free(s);
}
332
/* Takes an additional reference on the stream and hands the same pointer
 * back to the caller. */
pa_stream* pa_stream_ref(pa_stream *s) {
    pa_stream *same = s;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_REFCNT_INC(s);

    return same;
}
340
pa_stream_get_state(const pa_stream * s)341 pa_stream_state_t pa_stream_get_state(const pa_stream *s) {
342 pa_assert(s);
343 pa_assert(PA_REFCNT_VALUE(s) >= 1);
344
345 return s->state;
346 }
347
/* Moves the stream into PA_STREAM_TERMINATED via pa_stream_set_state(). */
void pa_stream_terminate(pa_stream *s) {
    const pa_stream_state_t final_state = PA_STREAM_TERMINATED;

    pa_stream_set_state(s, final_state);
}
351
pa_stream_get_context(const pa_stream * s)352 pa_context* pa_stream_get_context(const pa_stream *s) {
353 pa_assert(s);
354 pa_assert(PA_REFCNT_VALUE(s) >= 1);
355
356 return s->context;
357 }
358
pa_stream_get_index(const pa_stream * s)359 uint32_t pa_stream_get_index(const pa_stream *s) {
360 pa_assert(s);
361 pa_assert(PA_REFCNT_VALUE(s) >= 1);
362
363 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
364 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
365
366 return s->stream_index;
367 }
368
/* Switches the stream to state `st` and notifies the state callback.
 *
 * Nothing happens if the stream already is in that state. Entering
 * PA_STREAM_FAILED or PA_STREAM_TERMINATED additionally unlinks the stream
 * from its context. A temporary reference is held for the duration of the
 * call, covering the callback invocation and the unlink. */
void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
    bool terminal;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (s->state == st)
        return;

    terminal = (st == PA_STREAM_FAILED) || (st == PA_STREAM_TERMINATED);

    pa_stream_ref(s);

    s->state = st;

    if (s->state_callback)
        s->state_callback(s, s->state_userdata);

    if (terminal)
        stream_unlink(s);

    pa_stream_unref(s);
}
388
/* Issues an automatic timing info request and reschedules the update timer.
 *
 * Only active for streams with PA_STREAM_AUTO_TIMING_UPDATE set. A request
 * is sent when the stream is ready and either `force` is set or no request
 * is outstanding. If a timer exists it is freed while the stream is
 * suspended (unless forced); otherwise it is re-armed and the interval is
 * doubled up to AUTO_TIMING_INTERVAL_END_USEC. `force` resets the interval
 * to AUTO_TIMING_INTERVAL_START_USEC first. */
static void request_auto_timing_update(pa_stream *s, bool force) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
        return;

    if (s->state == PA_STREAM_READY && (force || !s->auto_timing_update_requested)) {
        pa_operation *op;

#ifdef STREAM_DEBUG
        pa_log_debug("Automatically requesting new timing data");
#endif

        op = pa_stream_update_timing_info(s, NULL, NULL);
        if (op) {
            pa_operation_unref(op);
            s->auto_timing_update_requested = true;
        }
    }

    /* No timer, nothing to reschedule */
    if (!s->auto_timing_update_event)
        return;

    if (s->suspended && !force) {
        pa_assert(s->mainloop);
        s->mainloop->time_free(s->auto_timing_update_event);
        s->auto_timing_update_event = NULL;
        return;
    }

    if (force)
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;

    pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);

    /* Back off exponentially for the next round */
    s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
}
425
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_KILLED and
 * PA_COMMAND_RECORD_STREAM_KILLED.
 *
 * The message carries only the stream's channel. A malformed message fails
 * the whole context with PA_ERR_PROTOCOL. A known stream in the ready state
 * is moved to PA_STREAM_FAILED after setting PA_ERR_KILLED on the context;
 * unknown or non-ready streams are ignored. */
void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    bool playback;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    /* Keep the context alive while callbacks may run */
    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0 || !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    playback = (command == PA_COMMAND_PLAYBACK_STREAM_KILLED);
    s = pa_hashmap_get(playback ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel));

    if (s && s->state == PA_STREAM_READY) {
        pa_context_set_error(c, PA_ERR_KILLED);
        pa_stream_set_state(s, PA_STREAM_FAILED);
    }

finish:
    pa_context_unref(c);
}
457
/* Pauses or resumes the stream's time smoother according to the stream
 * state.
 *
 * aposteriori: the triggering event already happened, so the transport
 *              delay is subtracted from "now"; otherwise it is added.
 * force_start: resume the smoother even if prebuf is non-zero.
 * force_stop:  pause the smoother unconditionally.
 *
 * force_start and force_stop are mutually exclusive (asserted). Does
 * nothing when the stream has no smoother. */
static void check_smoother_status(pa_stream *s, bool aposteriori, bool force_start, bool force_stop) {
    pa_usec_t x;

    pa_assert(s);
    pa_assert(!force_start || !force_stop);

    if (!s->smoother)
        return;

    x = pa_rtclock_now();

    /* Shift the timestamp by the last known transport delay, if any */
    if (s->timing_info_valid) {
        if (aposteriori)
            x -= s->timing_info.transport_usec;
        else
            x += s->timing_info.transport_usec;
    }

    /* Note: the if/else bodies below are single statements selected by the
     * preprocessor; exactly one pause/resume call is compiled in each. */
    if (s->suspended || s->corked || force_stop)
#ifdef USE_SMOOTHER_2
        pa_smoother_2_pause(s->smoother, x);
#else
        pa_smoother_pause(s->smoother, x);
#endif
    else if (force_start || s->buffer_attr.prebuf == 0) {

        if (!s->timing_info_valid &&
            !aposteriori &&
            !force_start &&
            !force_stop &&
            s->context->version >= 13) {

            /* If the server supports STARTED events we take them as
             * indications when audio really starts/stops playing, if
             * we don't have any timing info yet -- instead of trying
             * to be smart and guessing the server time. Otherwise the
             * unknown transport delay adds too much noise to our time
             * calculations. */

            return;
        }

#ifdef USE_SMOOTHER_2
        pa_smoother_2_resume(s->smoother, x);
#else
        pa_smoother_resume(s->smoother, x, true);
#endif
    }

    /* Please note that we have no idea if playback actually started
     * if prebuf is non-zero! */
}
510
/* Forward declaration: the timer callback is defined further down, but the
 * command handlers below already need its address when creating the timer. */
static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
512
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_MOVED and
 * PA_COMMAND_RECORD_STREAM_MOVED.
 *
 * Parses channel, new device index/name and suspended flag. For protocol
 * version 13 and newer it also parses the buffer attributes and configured
 * latency, whose layout differs between record and playback. Any parse
 * problem, or a server older than version 12, fails the context with
 * PA_ERR_PROTOCOL. For a known stream in the ready state, the stream's
 * device and buffer information is updated, the timing machinery is kicked
 * and the moved callback is invoked. */
void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    const char *dn;
    bool suspended;
    uint32_t di;
    pa_usec_t usec = 0;
    uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 12) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_getu32(t, &di) < 0 ||
        pa_tagstruct_gets(t, &dn) < 0 ||
        pa_tagstruct_get_boolean(t, &suspended) < 0) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (c->version >= 13) {
        bool malformed;

        if (command == PA_COMMAND_RECORD_STREAM_MOVED)
            malformed =
                pa_tagstruct_getu32(t, &maxlength) < 0 ||
                pa_tagstruct_getu32(t, &fragsize) < 0 ||
                pa_tagstruct_get_usec(t, &usec) < 0;
        else
            malformed =
                pa_tagstruct_getu32(t, &maxlength) < 0 ||
                pa_tagstruct_getu32(t, &tlength) < 0 ||
                pa_tagstruct_getu32(t, &prebuf) < 0 ||
                pa_tagstruct_getu32(t, &minreq) < 0 ||
                pa_tagstruct_get_usec(t, &usec) < 0;

        if (malformed) {
            pa_context_fail(c, PA_ERR_PROTOCOL);
            goto finish;
        }
    }

    /* Trailing garbage or an unusable device description is a protocol error */
    if (!pa_tagstruct_eof(t) || !dn || di == PA_INVALID_INDEX) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel));
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (c->version >= 13) {
        if (s->direction == PA_STREAM_RECORD)
            s->timing_info.configured_source_usec = usec;
        else
            s->timing_info.configured_sink_usec = usec;

        /* Fields not carried for this direction keep their 0 initializer */
        s->buffer_attr.maxlength = maxlength;
        s->buffer_attr.fragsize = fragsize;
        s->buffer_attr.tlength = tlength;
        s->buffer_attr.prebuf = prebuf;
        s->buffer_attr.minreq = minreq;
    }

    pa_xfree(s->device_name);
    s->device_name = pa_xstrdup(dn);
    s->device_index = di;

    s->suspended = suspended;

    /* (Re)create the automatic timing timer if it is wanted but missing */
    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
    request_auto_timing_update(s, true);

    if (s->moved_callback)
        s->moved_callback(s, s->moved_userdata);

finish:
    pa_context_unref(c);
}
615
/* Dispatcher callback for PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED and
 * PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED.
 *
 * Parses the channel followed by the new buffer attributes and configured
 * latency. Record messages carry maxlength, fragsize and the latency;
 * playback messages carry maxlength, tlength, prebuf, minreq and the
 * latency. Any parse problem, or a server older than version 15, fails the
 * context with PA_ERR_PROTOCOL. For a known stream in the ready state the
 * stream's buffer attributes and configured latency are updated, a timing
 * update is requested and the buffer_attr callback is invoked. */
void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    pa_usec_t usec = 0;
    uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 15) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (pa_tagstruct_getu32(t, &channel) < 0) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    /* Select the layout by the command this handler actually receives. The
     * previous comparison against PA_COMMAND_RECORD_STREAM_MOVED could never
     * match here, so record messages were parsed with the playback layout. */
    if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
        if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
            pa_tagstruct_getu32(t, &fragsize) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(c, PA_ERR_PROTOCOL);
            goto finish;
        }
    } else {
        if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
            pa_tagstruct_getu32(t, &tlength) < 0 ||
            pa_tagstruct_getu32(t, &prebuf) < 0 ||
            pa_tagstruct_getu32(t, &minreq) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(c, PA_ERR_PROTOCOL);
            goto finish;
        }
    }

    if (!pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
        goto finish;

    if (s->state != PA_STREAM_READY)
        goto finish;

    if (s->direction == PA_STREAM_RECORD)
        s->timing_info.configured_source_usec = usec;
    else
        s->timing_info.configured_sink_usec = usec;

    /* Fields not carried for this direction keep their 0 initializer */
    s->buffer_attr.maxlength = maxlength;
    s->buffer_attr.fragsize = fragsize;
    s->buffer_attr.tlength = tlength;
    s->buffer_attr.prebuf = prebuf;
    s->buffer_attr.minreq = minreq;

    request_auto_timing_update(s, true);

    if (s->buffer_attr_callback)
        s->buffer_attr_callback(s, s->buffer_attr_userdata);

finish:
    pa_context_unref(c);
}
689
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_SUSPENDED and
 * PA_COMMAND_RECORD_STREAM_SUSPENDED.
 *
 * Parses channel and suspended flag. A malformed message, or a server older
 * than version 12, fails the context with PA_ERR_PROTOCOL. For a known
 * stream in the ready state, the suspended flag is stored, the timing
 * machinery is kicked and the suspended callback is invoked. */
void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    bool suspended;
    bool playback;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 12) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_get_boolean(t, &suspended) < 0 ||
        !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    playback = (command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
    s = pa_hashmap_get(playback ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel));
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    s->suspended = suspended;

    /* (Re)create the automatic timing timer if it is wanted but missing */
    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
    request_auto_timing_update(s, true);

    if (s->suspended_callback)
        s->suspended_callback(s, s->suspended_userdata);

finish:
    pa_context_unref(c);
}
739
/* Dispatcher callback for PA_COMMAND_STARTED (playback streams only).
 *
 * The message carries only the channel. A malformed message, or a server
 * older than version 13, fails the context with PA_ERR_PROTOCOL. For a
 * known playback stream in the ready state, the smoother is force-started,
 * a timing update is requested and the started callback is invoked. */
void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_STARTED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 13) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (pa_tagstruct_getu32(t, &channel) < 0 || !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    check_smoother_status(s, true, true, false);
    request_auto_timing_update(s, true);

    if (s->started_callback)
        s->started_callback(s, s->started_userdata);

finish:
    pa_context_unref(c);
}
779
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_EVENT and
 * PA_COMMAND_RECORD_STREAM_EVENT.
 *
 * Parses channel, event name and an attached property list. A malformed
 * message, a missing event name, or a server older than version 15 fails
 * the context with PA_ERR_PROTOCOL. For a known stream in the ready state
 * the event callback is invoked; for PA_STREAM_EVENT_FORMAT_LOST the current
 * stream time is added to the property list first, if it can be obtained.
 * The property list is owned by this function and freed before returning. */
void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    pa_proplist *pl = NULL;
    const char *event;
    bool playback;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 15) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    pl = pa_proplist_new();

    /* `event` is only inspected once pa_tagstruct_gets() has succeeded */
    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_gets(t, &event) < 0 ||
        pa_tagstruct_get_proplist(t, pl) < 0 ||
        !pa_tagstruct_eof(t) ||
        !event) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    playback = (command == PA_COMMAND_PLAYBACK_STREAM_EVENT);
    s = pa_hashmap_get(playback ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel));
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
        /* Let client know what the running time was when the stream had to be killed */
        pa_usec_t running_time;

        if (pa_stream_get_time(s, &running_time) == 0)
            pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) running_time);
    }

    if (s->event_callback)
        s->event_callback(s, event, pl, s->event_userdata);

finish:
    pa_context_unref(c);

    if (pl)
        pa_proplist_free(pl);
}
832
/* Dispatcher callback for PA_COMMAND_REQUEST: the server asks for more
 * playback data.
 *
 * Parses channel and byte count; a malformed message fails the context with
 * PA_ERR_PROTOCOL. For a known playback stream in the ready state the byte
 * count is added to the stream's outstanding request, and the write
 * callback is invoked if the outstanding total is positive. */
void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel, bytes;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_REQUEST);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_getu32(t, &bytes) < 0 ||
        !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    s->requested_bytes += bytes;

#ifdef STREAM_DEBUG
    pa_log_debug("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes);
#endif

    if (s->write_callback && s->requested_bytes > 0)
        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);

finish:
    pa_context_unref(c);
}
871
pa_stream_get_underflow_index(const pa_stream * p)872 int64_t pa_stream_get_underflow_index(const pa_stream *p) {
873 pa_assert(p);
874 return p->latest_underrun_at_index;
875 }
876
/* Dispatcher callback for PA_COMMAND_OVERFLOW, PA_COMMAND_UNDERFLOW and
 * PA_COMMAND_UNDERFLOW_OHOS.
 *
 * Parses the channel and, for PA_COMMAND_UNDERFLOW on protocol version 23
 * or newer, the offset at which the underrun happened. A malformed message
 * fails the context with PA_ERR_PROTOCOL. Unknown or non-ready playback
 * streams are ignored.
 *
 * PA_COMMAND_UNDERFLOW_OHOS only invokes the OHOS underflow callback. The
 * other two commands record the underrun offset (if one was sent), stop the
 * smoother when prebuf is non-zero, request a timing update and invoke the
 * matching overflow/underflow callback. */
void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    int64_t offset = -1;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW
        || command == PA_COMMAND_UNDERFLOW_OHOS);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    /* Channel, optional underrun offset, then end of message -- in that order */
    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        (c->version >= 23 && command == PA_COMMAND_UNDERFLOW && pa_tagstruct_gets64(t, &offset) < 0) ||
        !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (command == PA_COMMAND_UNDERFLOW_OHOS) {
        if (s->underflow_ohos_callback)
            s->underflow_ohos_callback(s, s->underflow_ohos_userdata);

        goto finish;
    }

    if (offset != -1)
        s->latest_underrun_at_index = offset;

    if (s->buffer_attr.prebuf > 0)
        check_smoother_status(s, true, false, true);

    request_auto_timing_update(s, true);

    switch (command) {
        case PA_COMMAND_OVERFLOW:
            if (s->overflow_callback)
                s->overflow_callback(s, s->overflow_userdata);
            break;

        case PA_COMMAND_UNDERFLOW:
            if (s->underflow_callback)
                s->underflow_callback(s, s->underflow_userdata);
            break;

        default:
            break;
    }

finish:
    pa_context_unref(c);
}
941
/* Marks the stream's read (`r`) and/or write (`w`) index as unreliable.
 *
 * For each selected index the context's current tag is stored in the
 * matching *_index_not_before field, and the index is flagged corrupt in
 * any valid timing info. Finishes by forcing a timing update request. Does
 * nothing unless the stream is ready. */
static void invalidate_indexes(pa_stream *s, bool r, bool w) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

#ifdef STREAM_DEBUG
    pa_log_debug("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag);
#endif

    if (s->state != PA_STREAM_READY)
        return;

    /* Write index */
    if (w) {
        if (s->timing_info_valid)
            s->timing_info.write_index_corrupt = true;

        s->write_index_not_before = s->context->ctag;

#ifdef STREAM_DEBUG
        pa_log_debug("write_index invalidated");
#endif
    }

    /* Read index */
    if (r) {
        if (s->timing_info_valid)
            s->timing_info.read_index_corrupt = true;

        s->read_index_not_before = s->context->ctag;

#ifdef STREAM_DEBUG
        pa_log_debug("read_index invalidated");
#endif
    }

    request_auto_timing_update(s, true);
}
977
/* Timer callback of the automatic timing update event. `userdata` is the
 * stream; a non-forced timing update is requested for it while a temporary
 * reference is held. */
static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
    pa_stream *stream = userdata;

    pa_assert(stream);
    pa_assert(PA_REFCNT_VALUE(stream) >= 1);

    pa_stream_ref(stream);
    request_auto_timing_update(stream, false);
    pa_stream_unref(stream);
}
988
/* Final step of stream creation: moves a stream from PA_STREAM_CREATING to
 * PA_STREAM_READY, hands any already requested bytes to the write callback,
 * sets up the automatic timing timer if it was asked for, and brings the
 * smoother in line with the stream state. */
static void create_stream_complete(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(s->state == PA_STREAM_CREATING);

    pa_stream_set_state(s, PA_STREAM_READY);

    if (s->write_callback && s->requested_bytes > 0)
        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);

    if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
        /* A timer must not exist yet for a stream that is just being created */
        pa_assert(!s->auto_timing_update_event);

        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);

        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
}
1009
/* Adjusts the buffer attributes that are about to be sent to the server.
 *
 * 1. If $PULSE_LATENCY_MSEC is set to a positive integer and a valid sample
 *    spec can be determined for the stream, the attributes are overridden so
 *    that tlength and fragsize correspond to that many milliseconds, and
 *    PA_STREAM_ADJUST_LATENCY is added to *flags (if flags is non-NULL). The
 *    sample spec is the stream's own one if valid; otherwise it is derived
 *    from the requested format when exactly one format was requested.
 *
 * 2. For servers older than protocol version 13, every attribute still set
 *    to (uint32_t) -1 is replaced by a client-side default.
 *
 * s:     stream the attributes belong to
 * attr:  attributes to patch in place
 * flags: optional stream flags to patch in place, may be NULL */
static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
    const char *e;

    pa_assert(s);
    pa_assert(attr);

    if ((e = getenv("PULSE_LATENCY_MSEC"))) {
        uint32_t ms;
        pa_sample_spec ss;

        pa_sample_spec_init(&ss);

        if (pa_sample_spec_valid(&s->sample_spec))
            ss = s->sample_spec;
        else if (s->n_formats == 1)
            /* The result is not checked here; the validity test on ss below
             * decides whether the conversion produced something usable. */
            pa_format_info_to_sample_spec(s->req_formats[0], &ss, NULL);

        if (pa_atou(e, &ms) < 0 || ms <= 0)
            pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
        else if (!pa_sample_spec_valid(&ss))
            /* Validate the spec that is actually used for the conversion
             * below. Checking s->sample_spec here would discard the spec
             * derived from the requested format. */
            pa_log_debug("Ignoring $PULSE_LATENCY_MSEC: %s (invalid sample spec)", e);
        else {
            attr->maxlength = (uint32_t) -1;
            attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &ss);
            attr->minreq = (uint32_t) -1;
            attr->prebuf = (uint32_t) -1;
            attr->fragsize = attr->tlength;

            if (flags)
                *flags |= PA_STREAM_ADJUST_LATENCY;
        }
    }

    if (s->context->version >= 13)
        return;

    /* Version older than 0.9.10 didn't do server side buffer_attr
     * selection, hence we have to fake it on the client side. */

    /* We choose fairly conservative values here, to not confuse
     * old clients with extremely large playback buffers */

    if (attr->maxlength == (uint32_t) -1)
        attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */

    if (attr->tlength == (uint32_t) -1)
        attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */

    if (attr->minreq == (uint32_t) -1)
        attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */

    if (attr->prebuf == (uint32_t) -1)
        attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */

    if (attr->fragsize == (uint32_t) -1)
        attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
}
1067
/* Reply handler for a stream creation command; create_stream() registers it
 * with the stream as userdata.
 *
 * On an error reply the stream is moved to PA_STREAM_FAILED (unless
 * pa_context_handle_error() itself reports failure). On a regular reply the
 * tagstruct is parsed field by field; which fields are expected depends on
 * the stream direction and on the protocol version of the context. Any
 * parse error, inconsistent value or trailing data fails the whole context
 * with PA_ERR_PROTOCOL. On success the stream is registered in the
 * context's channel hashmap and create_stream_complete() is called.
 *
 * The order of the pa_tagstruct_get*() calls below mirrors the order of the
 * fields in the reply and must not be changed. */
void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_stream *s = userdata;
    uint32_t requested_bytes = 0;

    pa_assert(pd);
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(s->state == PA_STREAM_CREATING);

    /* Keep the stream alive until 'finish', whatever the state changes
     * triggered below do to its reference count */
    pa_stream_ref(s);

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(s->context, command, t, false) < 0)
            goto finish;

        pa_stream_set_state(s, PA_STREAM_FAILED);
        goto finish;
    }

    /* Channel first, then the stream index (not sent for upload streams),
     * then the initial number of requested bytes (not sent for record
     * streams) */
    if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
        s->channel == PA_INVALID_INDEX ||
        ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
        ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
        pa_context_fail(s->context, PA_ERR_PROTOCOL);
        goto finish;
    }

    s->requested_bytes = (int64_t) requested_bytes;

    /* Protocol version >= 9: the reply carries the buffer attributes */
    if (s->context->version >= 9) {
        if (s->direction == PA_STREAM_PLAYBACK) {
            if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
                pa_context_fail(s->context, PA_ERR_PROTOCOL);
                goto finish;
            }
        } else if (s->direction == PA_STREAM_RECORD) {
            if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
                pa_context_fail(s->context, PA_ERR_PROTOCOL);
                goto finish;
            }
        }
    }

    /* Protocol version >= 12 (not for upload streams): sample spec, channel
     * map, device index, device name and suspended flag */
    if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
        pa_sample_spec ss;
        pa_channel_map cm;
        const char *dn = NULL;
        bool suspended;

        if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
            pa_tagstruct_get_channel_map(t, &cm) < 0 ||
            pa_tagstruct_getu32(t, &s->device_index) < 0 ||
            pa_tagstruct_gets(t, &dn) < 0 ||
            pa_tagstruct_get_boolean(t, &suspended) < 0) {
            pa_context_fail(s->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        /* Sanity check what we got. For streams that were not set up from
         * a format list (n_formats == 0), format, rate and channel map
         * have to match what we asked for unless the corresponding
         * PA_STREAM_FIX_* flag was set. */
        if (!dn || s->device_index == PA_INVALID_INDEX ||
            ss.channels != cm.channels ||
            !pa_channel_map_valid(&cm) ||
            !pa_sample_spec_valid(&ss) ||
            (s->n_formats == 0 && (
                (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
                (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
                (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
            pa_context_fail(s->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        /* dn points into the tagstruct, so keep our own copy */
        pa_xfree(s->device_name);
        s->device_name = pa_xstrdup(dn);
        s->suspended = suspended;

        s->channel_map = cm;
        s->sample_spec = ss;
    }

#ifdef USE_SMOOTHER_2
    /* Pass the (possibly just updated) sample spec on to the smoother */
    if (s->flags & PA_STREAM_INTERPOLATE_TIMING)
        pa_smoother_2_set_sample_spec(s->smoother, pa_rtclock_now(), &s->sample_spec);
#endif

    /* Protocol version >= 13 (not for upload streams): the configured
     * latency of the source (record) or sink (everything else) */
    if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
        pa_usec_t usec;

        if (pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(s->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        if (s->direction == PA_STREAM_RECORD)
            s->timing_info.configured_source_usec = usec;
        else
            s->timing_info.configured_sink_usec = usec;
    }

    /* Protocol version >= 21 for playback streams, >= 22 for all streams:
     * the format of the stream */
    if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
        || s->context->version >= 22) {

        pa_format_info *f = pa_format_info_new();

        if (pa_tagstruct_get_format_info(t, f) < 0 || !pa_format_info_valid(f)) {
            pa_format_info_free(f);
            if (s->n_formats > 0) {
                /* We used the extended API, so we should have got back a proper format */
                pa_context_fail(s->context, PA_ERR_PROTOCOL);
                goto finish;
            }
        } else
            s->format = f;
    }

    /* Nothing may follow the fields we know about */
    if (!pa_tagstruct_eof(t)) {
        pa_context_fail(s->context, PA_ERR_PROTOCOL);
        goto finish;
    }

    /* Record streams get a client side queue, sized by the maxlength
     * buffer attribute */
    if (s->direction == PA_STREAM_RECORD) {
        pa_assert(!s->record_memblockq);

        s->record_memblockq = pa_memblockq_new(
                "client side record memblockq",
                0,
                s->buffer_attr.maxlength,
                0,
                &s->sample_spec,
                1,
                0,
                0,
                NULL);
    }

    /* Register the stream under its channel: record streams go into
     * record_streams, all other directions into playback_streams */
    s->channel_valid = true;
    pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);

    create_stream_complete(s);

finish:
    pa_stream_unref(s);
}
1213
/* Common implementation of pa_stream_connect_playback() and
 * pa_stream_connect_record().
 *
 * Validates the arguments, stores direction, buffer attributes and flags in
 * the stream, creates the smoother if PA_STREAM_INTERPOLATE_TIMING is set,
 * then builds and sends the stream creation command and registers
 * pa_create_stream_callback() for the reply. The stream is left in state
 * PA_STREAM_CREATING.
 *
 * direction   - PA_STREAM_PLAYBACK or PA_STREAM_RECORD
 * s           - the stream, must be in state PA_STREAM_UNCONNECTED
 * dev         - device name, or NULL to use the default sink/source from
 *               the client configuration
 * attr        - buffer attributes, or NULL to keep what the stream has
 * flags       - stream flags
 * volume      - initial volume, or NULL if none was given
 * sync_stream - playback stream to share the sync id with, or NULL
 *
 * Returns 0 on success. The PA_CHECK_VALIDITY() checks return early on
 * failure with the error code passed to them.
 *
 * The commands are serialized field by field; which fields are written
 * depends on the direction and on the protocol version of the context. The
 * order of the pa_tagstruct_put*() calls is the wire format and must not be
 * changed. */
static int create_stream(
        pa_stream_direction_t direction,
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags,
        const pa_cvolume *volume,
        pa_stream *sync_stream) {

    pa_tagstruct *t;
    uint32_t tag;
    /* Remember whether the caller passed a volume, 'volume' itself may be
     * replaced by a default further down */
    bool volume_set = !!volume;
    pa_cvolume cv;
    uint32_t i;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
    /* Reject any flag bit we don't know */
    PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
                                              PA_STREAM_INTERPOLATE_TIMING|
                                              PA_STREAM_NOT_MONOTONIC|
                                              PA_STREAM_AUTO_TIMING_UPDATE|
                                              PA_STREAM_NO_REMAP_CHANNELS|
                                              PA_STREAM_NO_REMIX_CHANNELS|
                                              PA_STREAM_FIX_FORMAT|
                                              PA_STREAM_FIX_RATE|
                                              PA_STREAM_FIX_CHANNELS|
                                              PA_STREAM_DONT_MOVE|
                                              PA_STREAM_VARIABLE_RATE|
                                              PA_STREAM_PEAK_DETECT|
                                              PA_STREAM_START_MUTED|
                                              PA_STREAM_ADJUST_LATENCY|
                                              PA_STREAM_EARLY_REQUESTS|
                                              PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
                                              PA_STREAM_START_UNMUTED|
                                              PA_STREAM_FAIL_ON_SUSPEND|
                                              PA_STREAM_RELATIVE_VOLUME|
                                              PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);

    PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
    PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
    PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
    /* Although some of the other flags are not supported on older
     * version, we don't check for them here, because it doesn't hurt
     * when they are passed but actually not supported. This makes
     * client development easier */

    /* Peak detection is record only, sync streams are playback only, and
     * ADJUST_LATENCY and EARLY_REQUESTS exclude each other */
    PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);

    pa_stream_ref(s);

    s->direction = direction;

    if (sync_stream)
        s->syncid = sync_stream->syncid;

    if (attr)
        s->buffer_attr = *attr;
    /* May modify both the buffer attributes and the flags */
    patch_buffer_attr(s, &s->buffer_attr, &flags);

    s->flags = flags;
    s->corked = !!(flags & PA_STREAM_START_CORKED);

    if (flags & PA_STREAM_INTERPOLATE_TIMING) {
        pa_usec_t x;

        x = pa_rtclock_now();

        pa_assert(!s->smoother);
#ifdef USE_SMOOTHER_2
        s->smoother = pa_smoother_2_new(SMOOTHER_HISTORY_TIME, x, 0, 0);
#else
        s->smoother = pa_smoother_new(
                SMOOTHER_ADJUST_TIME,
                SMOOTHER_HISTORY_TIME,
                !(flags & PA_STREAM_NOT_MONOTONIC),
                true,
                SMOOTHER_MIN_HISTORY,
                x,
                true);
#endif
    }

    /* No device given: fall back to the configured default */
    if (!dev)
        dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;

    t = pa_tagstruct_command(
            s->context,
            (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
            &tag);

    /* Before protocol version 13 the media name is sent as a separate
     * string; from 13 on the whole proplist is sent further down */
    if (s->context->version < 13)
        pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));

    pa_tagstruct_put(
            t,
            PA_TAG_SAMPLE_SPEC, &s->sample_spec,
            PA_TAG_CHANNEL_MAP, &s->channel_map,
            PA_TAG_U32, PA_INVALID_INDEX,
            PA_TAG_STRING, dev,
            PA_TAG_U32, s->buffer_attr.maxlength,
            PA_TAG_BOOLEAN, s->corked,
            PA_TAG_INVALID);

    /* No volume given: send a reset volume instead, volume_set tells the
     * server (version >= 14) that it wasn't set by the caller */
    if (!volume) {
        if (pa_sample_spec_valid(&s->sample_spec))
            volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
        else {
            /* This is not really relevant, since no volume was set, and
             * the real number of channels is embedded in the format_info
             * structure */
            volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
        }
    }

    if (s->direction == PA_STREAM_PLAYBACK) {
        pa_tagstruct_put(
                t,
                PA_TAG_U32, s->buffer_attr.tlength,
                PA_TAG_U32, s->buffer_attr.prebuf,
                PA_TAG_U32, s->buffer_attr.minreq,
                PA_TAG_U32, s->syncid,
                PA_TAG_INVALID);

        pa_tagstruct_put_cvolume(t, volume);
    } else
        pa_tagstruct_putu32(t, s->buffer_attr.fragsize);

    /* Fields added in protocol version 12 */
    if (s->context->version >= 12) {
        pa_tagstruct_put(
                t,
                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
                PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
                PA_TAG_INVALID);
    }

    /* Fields added in protocol version 13 */
    if (s->context->version >= 13) {

        if (s->direction == PA_STREAM_PLAYBACK)
            pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
        else
            pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);

        pa_tagstruct_put(
                t,
                PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
                PA_TAG_PROPLIST, s->proplist,
                PA_TAG_INVALID);

        if (s->direction == PA_STREAM_RECORD)
            pa_tagstruct_putu32(t, s->direct_on_input);
    }

    /* Fields added in protocol version 14 */
    if (s->context->version >= 14) {

        if (s->direction == PA_STREAM_PLAYBACK)
            pa_tagstruct_put_boolean(t, volume_set);

        pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
    }

    /* Fields added in protocol version 15 */
    if (s->context->version >= 15) {

        /* Tells the server whether the muted state was set explicitly */
        if (s->direction == PA_STREAM_PLAYBACK)
            pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));

        pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
    }

    /* Playback only fields added in protocol versions 17 and 18 */
    if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);

    if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));

    /* The requested formats: version >= 21 for playback streams,
     * version >= 22 for all streams */
    if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
        || s->context->version >= 22) {

        pa_tagstruct_putu8(t, s->n_formats);
        for (i = 0; i < s->n_formats; i++)
            pa_tagstruct_put_format_info(t, s->req_formats[i]);
    }

    /* From version 22 on record streams additionally send the volume,
     * mute and passthrough fields that playback streams sent above */
    if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
        pa_tagstruct_put_cvolume(t, volume);
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
        pa_tagstruct_put_boolean(t, volume_set);
        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
    }

    pa_pstream_send_tagstruct(s->context->pstream, t);
    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);

    pa_stream_set_state(s, PA_STREAM_CREATING);

    pa_stream_unref(s);
    return 0;
}
1426
/* Connects the stream for playback. All argument checking and the actual
 * work is done by create_stream() with the direction fixed to
 * PA_STREAM_PLAYBACK; its return value is passed through unchanged. */
int pa_stream_connect_playback(
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags,
        const pa_cvolume *volume,
        pa_stream *sync_stream) {

    int ret;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    ret = create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);

    return ret;
}
1440
/* Connects the stream for recording. This is create_stream() with the
 * direction fixed to PA_STREAM_RECORD and neither a volume nor a sync
 * stream; its return value is passed through unchanged. */
int pa_stream_connect_record(
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags) {

    int ret;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    ret = create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);

    return ret;
}
1452
/* Hands out a buffer the application can fill and later pass to
 * pa_stream_write().
 *
 * data   - receives the pointer to the buffer
 * nbytes - in: the wanted size, or (size_t) -1 for no preference; out: the
 *          size of the buffer actually handed out
 *
 * A wanted size larger than the largest frame-aligned block the memory pool
 * can provide is reduced to that size. If a buffer from a previous call is
 * still pending, that one is returned again instead of allocating a new
 * one. Returns 0 on success. */
int pa_stream_begin_write(
        pa_stream *s,
        void **data,
        size_t *nbytes) {

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);

    if (*nbytes != (size_t) -1) {
        size_t pool_max, frame, limit;

        pool_max = pa_mempool_block_size_max(s->context->mempool);
        frame = pa_frame_size(&s->sample_spec);

        /* Largest multiple of the frame size that fits into one block */
        limit = pool_max - (pool_max % frame);

        if (limit < *nbytes)
            *nbytes = limit;
    }

    if (s->write_memblock == NULL) {
        s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
        s->write_data = pa_memblock_acquire(s->write_memblock);
    }

    *nbytes = pa_memblock_get_length(s->write_memblock);
    *data = s->write_data;

    return 0;
}
1488
/* Gives back the buffer handed out by pa_stream_begin_write() without
 * writing it. Fails with PA_ERR_BADSTATE if no such buffer is pending.
 * Returns 0 on success. */
int pa_stream_cancel_write(pa_stream *s) {
    pa_memblock *pending;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);

    pa_assert(s->write_data);

    /* Detach the block from the stream, then drop the access and the
     * reference taken in pa_stream_begin_write() */
    pending = s->write_memblock;
    s->write_memblock = NULL;
    s->write_data = NULL;

    pa_memblock_release(pending);
    pa_memblock_unref(pending);

    return 0;
}
1509
/* Writes audio data to a playback or upload stream.
 *
 * s            - the stream, must be in state PA_STREAM_READY
 * data         - the data to write, must not be NULL. If a buffer from
 *                pa_stream_begin_write() is pending, data has to point
 *                into that buffer.
 * length       - number of bytes to write, a multiple of the frame size
 * free_cb      - optional cleanup function for data. Must be NULL when a
 *                buffer from pa_stream_begin_write() is pending.
 * free_cb_data - argument handed to free_cb
 * offset       - seek offset in bytes, a multiple of the frame size. May be
 *                negative. Upload streams only accept 0.
 * seek         - seek mode. Upload streams only accept PA_SEEK_RELATIVE.
 *
 * If free_cb is given and the connection does not use shared memory, the
 * caller's memory is wrapped in a user memblock together with free_cb
 * (presumably invoked once that block is released). Otherwise the data is
 * copied into pool memblocks of at most one frame-aligned maximum block
 * size each, and free_cb, if given, is called right after the copy.
 *
 * After sending, the local bookkeeping (requested_bytes, the pending write
 * index correction and the cached timing info) is updated.
 *
 * Returns 0 on success. */
int pa_stream_write_ext_free(
        pa_stream *s,
        const void *data,
        size_t length,
        pa_free_cb_t free_cb,
        void *free_cb_data,
        int64_t offset,
        pa_seek_mode_t seek) {

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(data);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
    /* With a pending pa_stream_begin_write() buffer the data has to lie
     * completely inside that buffer */
    PA_CHECK_VALIDITY(s->context,
                      !s->write_memblock ||
                      ((data >= s->write_data) &&
                       ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
                      PA_ERR_INVALID);
    /* The frame size is a size_t. Without the cast a negative offset would
     * be converted to an unsigned value on platforms where size_t is as
     * wide as int64_t, and the remainder of that value is only correct for
     * frame sizes that are a power of two. Do the test in signed
     * arithmetic so that aligned negative offsets are always accepted. */
    PA_CHECK_VALIDITY(s->context, offset % (int64_t) pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, length % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);

    if (s->write_memblock) {
        pa_memchunk chunk;

        /* pa_stream_begin_write() was called before */

        pa_memblock_release(s->write_memblock);

        chunk.memblock = s->write_memblock;
        chunk.index = (const char *) data - (const char *) s->write_data;
        chunk.length = length;

        /* The chunk owns the block now */
        s->write_memblock = NULL;
        s->write_data = NULL;

        pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk, pa_frame_size(&s->sample_spec));
        pa_memblock_unref(chunk.memblock);

    } else {
        pa_seek_mode_t t_seek = seek;
        int64_t t_offset = offset;
        size_t t_length = length;
        const void *t_data = data;

        /* pa_stream_begin_write() was not called before */

        while (t_length > 0) {
            pa_memchunk chunk;

            chunk.index = 0;

            if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
                /* Wrap the caller's memory, everything goes out in one chunk */
                chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, free_cb_data, 1);
                chunk.length = t_length;
            } else {
                void *d;
                size_t blk_size_max;

                /* Break large audio streams into _aligned_ blocks or the
                 * other endpoint will happily discard them upon arrival. */
                blk_size_max = pa_frame_align(pa_mempool_block_size_max(s->context->mempool), &s->sample_spec);
                chunk.length = PA_MIN(t_length, blk_size_max);
                chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);

                d = pa_memblock_acquire(chunk.memblock);
                memcpy(d, t_data, chunk.length);
                pa_memblock_release(chunk.memblock);
            }

            pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk, pa_frame_size(&s->sample_spec));

            /* Only the first chunk carries the seek, all following ones
             * are simply appended */
            t_offset = 0;
            t_seek = PA_SEEK_RELATIVE;

            t_data = (const uint8_t*) t_data + chunk.length;
            t_length -= chunk.length;

            pa_memblock_unref(chunk.memblock);
        }

        /* The data was copied, so the caller's memory can go right away */
        if (free_cb && pa_pstream_get_shm(s->context->pstream))
            free_cb(free_cb_data);
    }

    /* This is obviously wrong since we ignore the seeking index . But
     * that's OK, the server side applies the same error */
    s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;

#ifdef STREAM_DEBUG
    pa_log_debug("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes);
#endif

    if (s->direction == PA_STREAM_PLAYBACK) {

        /* Update latency request correction */
        if (s->write_index_corrections[s->current_write_index_correction].valid) {

            if (seek == PA_SEEK_ABSOLUTE) {
                s->write_index_corrections[s->current_write_index_correction].corrupt = false;
                s->write_index_corrections[s->current_write_index_correction].absolute = true;
                s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
            } else if (seek == PA_SEEK_RELATIVE) {
                if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
                    s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
            } else
                s->write_index_corrections[s->current_write_index_correction].corrupt = true;
        }

        /* Update the write index in the already available latency data */
        if (s->timing_info_valid) {

            if (seek == PA_SEEK_ABSOLUTE) {
                s->timing_info.write_index_corrupt = false;
                s->timing_info.write_index = offset + (int64_t) length;
            } else if (seek == PA_SEEK_RELATIVE) {
                if (!s->timing_info.write_index_corrupt)
                    s->timing_info.write_index += offset + (int64_t) length;
            } else
                s->timing_info.write_index_corrupt = true;
        }

        /* Without a usable write index we need fresh timing data */
        if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
            request_auto_timing_update(s, true);
    }

    return 0;
}
1643
/* Same as pa_stream_write_ext_free() with the data pointer itself used as
 * the argument handed to free_cb. */
int pa_stream_write(pa_stream *s, const void *data, size_t length, pa_free_cb_t free_cb, int64_t offset, pa_seek_mode_t seek) {
    void *free_cb_data = (void*) data;

    return pa_stream_write_ext_free(s, data, length, free_cb, free_cb_data, offset, seek);
}
1654
/* Gives access to the next fragment of a record stream without removing it
 * from the queue.
 *
 * On return there are three cases:
 *   *data == NULL, *length == 0  - the record queue is empty
 *   *data == NULL, *length > 0   - the queue has no data at the current
 *                                  read index; *length is the length that
 *                                  was reported for that position
 *   *data != NULL                - *length bytes of audio data at *data
 *
 * A fragment that was peeked before stays current until pa_stream_drop()
 * is called. Returns 0 on success. */
int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(data);
    pa_assert(length);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);

    if (s->peek_memchunk.memblock == NULL) {

        /* Nothing peeked yet, ask the queue */
        if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
            /* record_memblockq is empty. */
            *length = 0;
            *data = NULL;
            return 0;
        }

        if (s->peek_memchunk.memblock == NULL) {
            /* record_memblockq isn't empty, but it doesn't have any data
             * at the current read index. */
            *length = s->peek_memchunk.length;
            *data = NULL;
            return 0;
        }

        s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
    }

    pa_assert(s->peek_data);

    *length = s->peek_memchunk.length;
    *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;

    return 0;
}
1689
/* Removes the fragment last returned by pa_stream_peek() from the record
 * queue. Fails with PA_ERR_BADSTATE if there is no peeked fragment of
 * non-zero length. Returns 0 on success. */
int pa_stream_drop(pa_stream *s) {
    size_t dropped;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE);

    dropped = s->peek_memchunk.length;

    pa_memblockq_drop(s->record_memblockq, dropped);

    /* Fix the simulated local read index */
    if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
        s->timing_info.read_index += (int64_t) dropped;

    /* There is no memblock to give back if the peeked fragment had no data */
    if (s->peek_memchunk.memblock != NULL) {
        pa_assert(s->peek_data);
        s->peek_data = NULL;

        pa_memblock_release(s->peek_memchunk.memblock);
        pa_memblock_unref(s->peek_memchunk.memblock);
    }

    pa_memchunk_reset(&s->peek_memchunk);

    return 0;
}
1716
pa_stream_writable_size(const pa_stream * s)1717 size_t pa_stream_writable_size(const pa_stream *s) {
1718 pa_assert(s);
1719 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1720
1721 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1722 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1723 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1724
1725 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1726 }
1727
pa_stream_readable_size(const pa_stream * s)1728 size_t pa_stream_readable_size(const pa_stream *s) {
1729 pa_assert(s);
1730 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1731
1732 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1733 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1734 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1735
1736 return pa_memblockq_get_length(s->record_memblockq);
1737 }
1738
pa_stream_drain(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)1739 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1740 pa_operation *o;
1741 pa_tagstruct *t;
1742 uint32_t tag;
1743
1744 pa_assert(s);
1745 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1746
1747 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1748 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1749 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1750
1751 /* Ask for a timing update before we cork/uncork to get the best
1752 * accuracy for the transport latency suitable for the
1753 * check_smoother_status() call in the started callback */
1754 request_auto_timing_update(s, true);
1755
1756 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1757
1758 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1759 pa_tagstruct_putu32(t, s->channel);
1760 pa_pstream_send_tagstruct(s->context->pstream, t);
1761 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1762
1763 /* This might cause the read index to continue again, hence
1764 * let's request a timing update */
1765 request_auto_timing_update(s, true);
1766
1767 return o;
1768 }
1769
calc_time(const pa_stream * s,bool ignore_transport)1770 static pa_usec_t calc_time(const pa_stream *s, bool ignore_transport) {
1771 pa_usec_t usec;
1772
1773 pa_assert(s);
1774 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1775 pa_assert(s->state == PA_STREAM_READY);
1776 pa_assert(s->direction != PA_STREAM_UPLOAD);
1777 pa_assert(s->timing_info_valid);
1778 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1779 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1780
1781 if (s->direction == PA_STREAM_PLAYBACK) {
1782 /* The last byte that was written into the output device
1783 * had this time value associated */
1784 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1785
1786 if (!s->corked && !s->suspended) {
1787
1788 if (!ignore_transport)
1789 /* Because the latency info took a little time to come
1790 * to us, we assume that the real output time is actually
1791 * a little ahead */
1792 usec += s->timing_info.transport_usec;
1793
1794 /* However, the output device usually maintains a buffer
1795 too, hence the real sample currently played is a little
1796 back */
1797 if (s->timing_info.sink_usec >= usec)
1798 usec = 0;
1799 else
1800 usec -= s->timing_info.sink_usec;
1801 }
1802
1803 } else {
1804 pa_assert(s->direction == PA_STREAM_RECORD);
1805
1806 /* The last byte written into the server side queue had
1807 * this time value associated */
1808 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1809
1810 if (!s->corked && !s->suspended) {
1811
1812 if (!ignore_transport)
1813 /* Add transport latency */
1814 usec += s->timing_info.transport_usec;
1815
1816 /* Add latency of data in device buffer */
1817 usec += s->timing_info.source_usec;
1818
1819 /* If this is a monitor source, we need to correct the
1820 * time by the playback device buffer */
1821 if (s->timing_info.sink_usec >= usec)
1822 usec = 0;
1823 else
1824 usec -= s->timing_info.sink_usec;
1825 }
1826 }
1827
1828 return usec;
1829 }
1830
1831 #ifdef USE_SMOOTHER_2
/* Same as calc_time(), but expressed in bytes of the stream's sample spec.
 * The time is converted to whole frames first, so the result is always a
 * multiple of the frame size. */
static inline uint64_t calc_bytes(pa_stream *s, bool ignore_transport) {
    pa_usec_t usec = calc_time(s, ignore_transport);
    uint64_t frames = usec * s->sample_spec.rate / PA_USEC_PER_SEC;

    return (uint64_t) (frames * pa_frame_size(&s->sample_spec));
}
1835 #endif
1836
stream_get_timing_info_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1837 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1838 pa_operation *o = userdata;
1839 struct timeval local, remote, now;
1840 pa_timing_info *i;
1841 bool playing = false;
1842 uint64_t underrun_for = 0, playing_for = 0;
1843
1844 pa_assert(pd);
1845 pa_assert(o);
1846 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1847
1848 if (!o->context || !o->stream)
1849 goto finish;
1850
1851 i = &o->stream->timing_info;
1852
1853 o->stream->timing_info_valid = false;
1854 i->write_index_corrupt = true;
1855 i->read_index_corrupt = true;
1856
1857 if (command != PA_COMMAND_REPLY) {
1858 if (pa_context_handle_error(o->context, command, t, false) < 0)
1859 goto finish;
1860
1861 } else {
1862
1863 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1864 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1865 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1866 pa_tagstruct_get_timeval(t, &local) < 0 ||
1867 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1868 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1869 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1870
1871 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1872 goto finish;
1873 }
1874
1875 if (o->context->version >= 13 &&
1876 o->stream->direction == PA_STREAM_PLAYBACK)
1877 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1878 pa_tagstruct_getu64(t, &playing_for) < 0) {
1879
1880 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1881 goto finish;
1882 }
1883
1884 if (!pa_tagstruct_eof(t)) {
1885 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1886 goto finish;
1887 }
1888 o->stream->timing_info_valid = true;
1889 i->write_index_corrupt = false;
1890 i->read_index_corrupt = false;
1891
1892 i->playing = (int) playing;
1893 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1894
1895 pa_gettimeofday(&now);
1896
1897 /* Calculate timestamps */
1898 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1899 /* local and remote seem to have synchronized clocks */
1900
1901 if (o->stream->direction == PA_STREAM_PLAYBACK)
1902 i->transport_usec = pa_timeval_diff(&remote, &local);
1903 else
1904 i->transport_usec = pa_timeval_diff(&now, &remote);
1905
1906 i->synchronized_clocks = true;
1907 i->timestamp = remote;
1908 } else {
1909 /* clocks are not synchronized, let's estimate latency then */
1910 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1911 i->synchronized_clocks = false;
1912 i->timestamp = local;
1913 pa_timeval_add(&i->timestamp, i->transport_usec);
1914 }
1915
1916 /* Invalidate read and write indexes if necessary */
1917 if (tag < o->stream->read_index_not_before)
1918 i->read_index_corrupt = true;
1919
1920 if (tag < o->stream->write_index_not_before)
1921 i->write_index_corrupt = true;
1922
1923 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1924 /* Write index correction */
1925
1926 int n, j;
1927 uint32_t ctag = tag;
1928
1929 /* Go through the saved correction values and add up the
1930 * total correction.*/
1931 for (n = 0, j = o->stream->current_write_index_correction+1;
1932 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1933 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1934
1935 /* Step over invalid data or out-of-date data */
1936 if (!o->stream->write_index_corrections[j].valid ||
1937 o->stream->write_index_corrections[j].tag < ctag)
1938 continue;
1939
1940 /* Make sure that everything is in order */
1941 ctag = o->stream->write_index_corrections[j].tag+1;
1942
1943 /* Now fix the write index */
1944 if (o->stream->write_index_corrections[j].corrupt) {
1945 /* A corrupting seek was made */
1946 i->write_index_corrupt = true;
1947 } else if (o->stream->write_index_corrections[j].absolute) {
1948 /* An absolute seek was made */
1949 i->write_index = o->stream->write_index_corrections[j].value;
1950 i->write_index_corrupt = false;
1951 } else if (!i->write_index_corrupt) {
1952 /* A relative seek was made */
1953 i->write_index += o->stream->write_index_corrections[j].value;
1954 }
1955 }
1956
1957 /* Clear old correction entries */
1958 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1959 if (!o->stream->write_index_corrections[n].valid)
1960 continue;
1961
1962 if (o->stream->write_index_corrections[n].tag <= tag)
1963 o->stream->write_index_corrections[n].valid = false;
1964 }
1965 }
1966
1967 if (o->stream->direction == PA_STREAM_RECORD) {
1968 /* Read index correction */
1969
1970 if (!i->read_index_corrupt)
1971 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1972 }
1973
1974 /* Update smoother if we're not corked */
1975 if (o->stream->smoother && !o->stream->corked) {
1976 pa_usec_t u, x;
1977
1978 u = x = pa_rtclock_now() - i->transport_usec;
1979
1980 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1981 pa_usec_t su;
1982
1983 /* If we weren't playing then it will take some time
1984 * until the audio will actually come out through the
1985 * speakers. Since we follow that timing here, we need
1986 * to try to fix this up */
1987
1988 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1989
1990 if (su < i->sink_usec)
1991 x += i->sink_usec - su;
1992 }
1993
1994 if (!i->playing)
1995 #ifdef USE_SMOOTHER_2
1996 pa_smoother_2_pause(o->stream->smoother, x);
1997 #else
1998 pa_smoother_pause(o->stream->smoother, x);
1999 #endif
2000
2001 /* Update the smoother */
2002 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
2003 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
2004 #ifdef USE_SMOOTHER_2
2005 pa_smoother_2_put(o->stream->smoother, u, calc_bytes(o->stream, true));
2006 #else
2007 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, true));
2008 #endif
2009
2010 if (i->playing)
2011 #ifdef USE_SMOOTHER_2
2012 pa_smoother_2_resume(o->stream->smoother, x);
2013 #else
2014 pa_smoother_resume(o->stream->smoother, x, true);
2015 #endif
2016 }
2017 }
2018
2019 o->stream->auto_timing_update_requested = false;
2020
2021 if (o->stream->latency_update_callback)
2022 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
2023
2024 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
2025 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2026 cb(o->stream, o->stream->timing_info_valid, o->userdata);
2027 }
2028
2029 finish:
2030
2031 pa_operation_done(o);
2032 pa_operation_unref(o);
2033 }
2034
pa_stream_update_timing_info(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2035 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2036 uint32_t tag;
2037 pa_operation *o;
2038 pa_tagstruct *t;
2039 struct timeval now;
2040 int cidx = 0;
2041
2042 pa_assert(s);
2043 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2044
2045 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2046 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2047 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2048
2049 if (s->direction == PA_STREAM_PLAYBACK) {
2050 /* Find a place to store the write_index correction data for this entry */
2051 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
2052
2053 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
2054 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
2055 }
2056 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2057
2058 t = pa_tagstruct_command(
2059 s->context,
2060 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
2061 &tag);
2062 pa_tagstruct_putu32(t, s->channel);
2063 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
2064
2065 pa_pstream_send_tagstruct(s->context->pstream, t);
2066 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2067
2068 if (s->direction == PA_STREAM_PLAYBACK) {
2069 /* Fill in initial correction data */
2070
2071 s->current_write_index_correction = cidx;
2072
2073 s->write_index_corrections[cidx].valid = true;
2074 s->write_index_corrections[cidx].absolute = false;
2075 s->write_index_corrections[cidx].corrupt = false;
2076 s->write_index_corrections[cidx].tag = tag;
2077 s->write_index_corrections[cidx].value = 0;
2078 }
2079
2080 return o;
2081 }
2082
/* Reply handler for the stream deletion command sent by pa_stream_disconnect().
 *
 * - error reply that pa_context_handle_error() accepts: stream -> FAILED
 * - error reply that pa_context_handle_error() rejects (< 0): nothing more
 * - success reply with trailing data: context fails with PA_ERR_PROTOCOL
 * - clean success reply: stream -> TERMINATED
 *
 * A reference on the stream is held for the duration of the handler. */
void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_stream *stream = userdata;

    pa_assert(pd);
    pa_assert(stream);
    pa_assert(PA_REFCNT_VALUE(stream) >= 1);

    pa_stream_ref(stream);

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(stream->context, command, t, false) >= 0)
            pa_stream_set_state(stream, PA_STREAM_FAILED);
    } else if (pa_tagstruct_eof(t))
        pa_stream_set_state(stream, PA_STREAM_TERMINATED);
    else
        pa_context_fail(stream->context, PA_ERR_PROTOCOL);

    pa_stream_unref(stream);
}
2108
/* Send the delete command matching the direction of stream s (playback,
 * record, or upload for anything else) and register
 * pa_stream_disconnect_callback() for the reply.
 *
 * Fails through PA_CHECK_VALIDITY if the process forked, the stream has no
 * valid channel, or the context is not READY. Returns 0 once the request
 * has been queued. */
int pa_stream_disconnect(pa_stream *s) {
    pa_tagstruct *request;
    uint32_t reply_tag, command;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);

    /* Hold a reference while the request is being set up. */
    pa_stream_ref(s);

    switch (s->direction) {
        case PA_STREAM_PLAYBACK:
            command = (uint32_t) PA_COMMAND_DELETE_PLAYBACK_STREAM;
            break;

        case PA_STREAM_RECORD:
            command = (uint32_t) PA_COMMAND_DELETE_RECORD_STREAM;
            break;

        default:
            command = (uint32_t) PA_COMMAND_DELETE_UPLOAD_STREAM;
            break;
    }

    request = pa_tagstruct_command(s->context, command, &reply_tag);
    pa_tagstruct_putu32(request, s->channel);
    pa_pstream_send_tagstruct(s->context->pstream, request);
    pa_pdispatch_register_reply(s->context->pdispatch, reply_tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);

    pa_stream_unref(s);
    return 0;
}
2134
/* Store cb/userdata as the read callback of stream s.
 * The call is a no-op after a detected fork or once the stream is in the
 * TERMINATED or FAILED state. */
void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->read_userdata = userdata;
    s->read_callback = cb;
}
2148
/* Store cb/userdata as the write callback of stream s.
 * The call is a no-op after a detected fork or once the stream is in the
 * TERMINATED or FAILED state. */
void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->write_userdata = userdata;
    s->write_callback = cb;
}
2162
/* Store cb/userdata as the state-change callback of stream s.
 * The call is a no-op after a detected fork or once the stream is in the
 * TERMINATED or FAILED state. */
void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->state_userdata = userdata;
    s->state_callback = cb;
}
2176
/* Store cb/userdata as the overflow callback of stream s.
 * The call is a no-op after a detected fork or once the stream is in the
 * TERMINATED or FAILED state. */
void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->overflow_userdata = userdata;
    s->overflow_callback = cb;
}
2190
/* Store cb/userdata as the underflow callback of stream s.
 * The call is a no-op after a detected fork or once the stream is in the
 * TERMINATED or FAILED state. */
void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->underflow_userdata = userdata;
    s->underflow_callback = cb;
}
2204
/* Store cb/userdata in the underflow_ohos callback slot of stream s.
 * The call is a no-op after a detected fork or once the stream is in the
 * TERMINATED or FAILED state. */
void pa_stream_set_underflow_ohos_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->underflow_ohos_userdata = userdata;
    s->underflow_ohos_callback = cb;
}
2218
/* Store cb/userdata as the latency-update callback of stream s.
 * The call is a no-op after a detected fork or once the stream is in the
 * TERMINATED or FAILED state. */
void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->latency_update_userdata = userdata;
    s->latency_update_callback = cb;
}
2232
/* Store cb/userdata as the "moved" callback of stream s.
 * The call is a no-op after a detected fork or once the stream is in the
 * TERMINATED or FAILED state. */
void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->moved_userdata = userdata;
    s->moved_callback = cb;
}
2246
/* Store cb/userdata as the "suspended" callback of stream s.
 * The call is a no-op after a detected fork or once the stream is in the
 * TERMINATED or FAILED state. */
void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->suspended_userdata = userdata;
    s->suspended_callback = cb;
}
2260
/* Store cb/userdata as the "started" callback of stream s.
 * The call is a no-op after a detected fork or once the stream is in the
 * TERMINATED or FAILED state. */
void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->started_userdata = userdata;
    s->started_callback = cb;
}
2274
/* Store cb/userdata as the event callback of stream s.
 * The call is a no-op after a detected fork or once the stream is in the
 * TERMINATED or FAILED state. */
void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->event_userdata = userdata;
    s->event_callback = cb;
}
2288
/* Store cb/userdata as the buffer-attribute callback of stream s.
 * The call is a no-op after a detected fork or once the stream is in the
 * TERMINATED or FAILED state. */
void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->buffer_attr_userdata = userdata;
    s->buffer_attr_callback = cb;
}
2302
/* Generic reply handler for commands whose success reply carries no payload.
 *
 * The operation's callback (if any) is invoked with success = 1 for an
 * empty success reply and success = 0 for an error reply that
 * pa_context_handle_error() accepted. No callback is invoked when the
 * operation has no context, when the error handler returns < 0, or when a
 * success reply has trailing data (the context is then failed with
 * PA_ERR_PROTOCOL). The operation is always marked done and unreferenced. */
void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *op = userdata;
    int ok = 1;

    pa_assert(pd);
    pa_assert(op);
    pa_assert(PA_REFCNT_VALUE(op) >= 1);

    if (op->context == NULL)
        goto out;

    if (command == PA_COMMAND_REPLY) {
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(op->context, PA_ERR_PROTOCOL);
            goto out;
        }
    } else {
        if (pa_context_handle_error(op->context, command, t, false) < 0)
            goto out;

        ok = 0;
    }

    if (op->callback) {
        pa_stream_success_cb_t notify = (pa_stream_success_cb_t) op->callback;
        notify(op->stream, ok, op->userdata);
    }

out:
    pa_operation_done(op);
    pa_operation_unref(op);
}
2333
pa_stream_cork(pa_stream * s,int b,pa_stream_success_cb_t cb,void * userdata)2334 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2335 pa_operation *o;
2336 pa_tagstruct *t;
2337 uint32_t tag;
2338
2339 pa_assert(s);
2340 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2341
2342 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2343 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2344 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2345
2346 /* Ask for a timing update before we cork/uncork to get the best
2347 * accuracy for the transport latency suitable for the
2348 * check_smoother_status() call in the started callback */
2349 request_auto_timing_update(s, true);
2350
2351 s->corked = b;
2352
2353 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2354
2355 t = pa_tagstruct_command(
2356 s->context,
2357 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2358 &tag);
2359 pa_tagstruct_putu32(t, s->channel);
2360 pa_tagstruct_put_boolean(t, !!b);
2361 pa_pstream_send_tagstruct(s->context->pstream, t);
2362 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2363
2364 check_smoother_status(s, false, false, false);
2365
2366 /* This might cause the indexes to hang/start again, hence let's
2367 * request a timing update, after the cork/uncork, too */
2368 request_auto_timing_update(s, true);
2369
2370 return o;
2371 }
2372
stream_send_simple_command(pa_stream * s,uint32_t command,pa_stream_success_cb_t cb,void * userdata)2373 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2374 pa_tagstruct *t;
2375 pa_operation *o;
2376 uint32_t tag;
2377
2378 pa_assert(s);
2379 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2380
2381 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2382 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2383
2384 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2385
2386 t = pa_tagstruct_command(s->context, command, &tag);
2387 pa_tagstruct_putu32(t, s->channel);
2388 pa_pstream_send_tagstruct(s->context->pstream, t);
2389 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2390
2391 return o;
2392 }
2393
pa_stream_flush(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2394 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2395 pa_operation *o;
2396
2397 pa_assert(s);
2398 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2399
2400 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2401 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2402 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2403
2404 /* Ask for a timing update *before* the flush, so that the
2405 * transport usec is as up to date as possible when we get the
2406 * underflow message and update the smoother status*/
2407 request_auto_timing_update(s, true);
2408
2409 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2410 return NULL;
2411
2412 if (s->direction == PA_STREAM_PLAYBACK) {
2413
2414 if (s->write_index_corrections[s->current_write_index_correction].valid)
2415 s->write_index_corrections[s->current_write_index_correction].corrupt = true;
2416
2417 if (s->buffer_attr.prebuf > 0)
2418 check_smoother_status(s, false, false, true);
2419
2420 /* This will change the write index, but leave the
2421 * read index untouched. */
2422 invalidate_indexes(s, false, true);
2423
2424 } else
2425 /* For record streams this has no influence on the write
2426 * index, but the read index might jump. */
2427 invalidate_indexes(s, true, false);
2428
2429 /* Note that we do not update requested_bytes here. This is
2430 * because we cannot really know how data actually was dropped
2431 * from the write index due to this. This 'error' will be applied
2432 * by both client and server and hence we should be fine. */
2433
2434 return o;
2435 }
2436
pa_stream_prebuf(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2437 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2438 pa_operation *o;
2439
2440 pa_assert(s);
2441 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2442
2443 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2444 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2445 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2446 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2447
2448 /* Ask for a timing update before we cork/uncork to get the best
2449 * accuracy for the transport latency suitable for the
2450 * check_smoother_status() call in the started callback */
2451 request_auto_timing_update(s, true);
2452
2453 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2454 return NULL;
2455
2456 /* This might cause the read index to hang again, hence
2457 * let's request a timing update */
2458 request_auto_timing_update(s, true);
2459
2460 return o;
2461 }
2462
pa_stream_trigger(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2463 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2464 pa_operation *o;
2465
2466 pa_assert(s);
2467 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2468
2469 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2470 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2471 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2472 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2473
2474 /* Ask for a timing update before we cork/uncork to get the best
2475 * accuracy for the transport latency suitable for the
2476 * check_smoother_status() call in the started callback */
2477 request_auto_timing_update(s, true);
2478
2479 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2480 return NULL;
2481
2482 /* This might cause the read index to start moving again, hence
2483 * let's request a timing update */
2484 request_auto_timing_update(s, true);
2485
2486 return o;
2487 }
2488
pa_stream_set_name(pa_stream * s,const char * name,pa_stream_success_cb_t cb,void * userdata)2489 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2490 pa_operation *o;
2491
2492 pa_assert(s);
2493 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2494 pa_assert(name);
2495
2496 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2497 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2498 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2499
2500 if (s->context->version >= 13) {
2501 pa_proplist *p = pa_proplist_new();
2502
2503 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2504 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2505 pa_proplist_free(p);
2506 } else {
2507 pa_tagstruct *t;
2508 uint32_t tag;
2509
2510 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2511 t = pa_tagstruct_command(
2512 s->context,
2513 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2514 &tag);
2515 pa_tagstruct_putu32(t, s->channel);
2516 pa_tagstruct_puts(t, name);
2517 pa_pstream_send_tagstruct(s->context->pstream, t);
2518 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2519 }
2520
2521 return o;
2522 }
2523
/* Compute the current stream time of s in microseconds.
 *
 * The value comes from the smoother when the stream has one, otherwise
 * from calc_time(). Unless PA_STREAM_NOT_MONOTONIC is set, the result is
 * clamped so it never drops below the previously reported time.
 *
 * r_usec may be NULL. Returns 0 on success; the PA_CHECK_VALIDITY checks
 * fail with PA_ERR_FORKED, PA_ERR_BADSTATE, or PA_ERR_NODATA when timing
 * info (or the relevant index) is not usable. */
int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
    pa_usec_t result;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);

    if (!s->smoother)
        result = calc_time(s, false);
    else {
#ifdef USE_SMOOTHER_2
        result = pa_smoother_2_get(s->smoother, pa_rtclock_now());
#else
        result = pa_smoother_get(s->smoother, pa_rtclock_now());
#endif
    }

    /* Keep the reported time monotonic unless the caller opted out. */
    if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
        if (result >= s->previous_time)
            s->previous_time = result;
        else
            result = s->previous_time;
    }

    if (r_usec)
        *r_usec = result;

    return 0;
}
2560
/* Return a - b for two time counters.
 *
 * If a < b the result is 0, except when the caller passed a non-NULL
 * negative pointer and s is a record stream: then *negative is set to 1
 * and b - a is returned. *negative (if non-NULL) is 0 in every other case. */
static pa_usec_t time_counter_diff(const pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (negative)
        *negative = 0;

    if (a >= b)
        return a - b;

    /* a < b: a negative difference is only reported for record streams,
     * and only if the caller can receive the sign. */
    if (!negative || s->direction != PA_STREAM_RECORD)
        return 0;

    *negative = 1;
    return b - a;
}
2578
/* Compute the latency of stream s in microseconds.
 *
 * The latency is the difference between the stream time from
 * pa_stream_get_time() and the time equivalent of the write index
 * (playback) or read index (other directions); a negative index is treated
 * as 0. The sign handling is delegated to time_counter_diff().
 *
 * r_usec must not be NULL; negative may be NULL. Returns 0 on success, or
 * the negative result of pa_stream_get_time(); the PA_CHECK_VALIDITY checks
 * fail with PA_ERR_FORKED, PA_ERR_BADSTATE, or PA_ERR_NODATA. */
int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
    pa_usec_t stream_time, index_time;
    int64_t idx;
    int ret;
    bool playback;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(r_usec);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);

    ret = pa_stream_get_time(s, &stream_time);
    if (ret < 0)
        return ret;

    playback = s->direction == PA_STREAM_PLAYBACK;

    idx = playback ? s->timing_info.write_index : s->timing_info.read_index;
    if (idx < 0)
        idx = 0;

    index_time = pa_bytes_to_usec((uint64_t) idx, &s->sample_spec);

    /* Playback: index minus stream time; otherwise the reverse. */
    *r_usec = playback
        ? time_counter_diff(s, index_time, stream_time, negative)
        : time_counter_diff(s, stream_time, index_time, negative);

    return 0;
}
2615
pa_stream_get_timing_info(pa_stream * s)2616 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2617 pa_assert(s);
2618 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2619
2620 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2621 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2622 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2623 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2624
2625 return &s->timing_info;
2626 }
2627
pa_stream_get_sample_spec(pa_stream * s)2628 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2629 pa_assert(s);
2630 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2631
2632 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2633
2634 return &s->sample_spec;
2635 }
2636
pa_stream_get_channel_map(pa_stream * s)2637 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2638 pa_assert(s);
2639 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2640
2641 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2642
2643 return &s->channel_map;
2644 }
2645
pa_stream_get_format_info(const pa_stream * s)2646 const pa_format_info* pa_stream_get_format_info(const pa_stream *s) {
2647 pa_assert(s);
2648 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2649
2650 /* We don't have the format till routing is done */
2651 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2652 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2653
2654 return s->format;
2655 }
pa_stream_get_buffer_attr(pa_stream * s)2656 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2657 pa_assert(s);
2658 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2659
2660 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2661 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2662 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2663
2664 return &s->buffer_attr;
2665 }
2666
/* Reply handler for the SET_*_STREAM_BUFFER_ATTR commands.
 *
 * On a success reply the buffer attributes actually applied are read back
 * into the stream: maxlength/tlength/prebuf/minreq for playback streams,
 * maxlength/fragsize for record streams. When the stream's context version
 * is >= 13 a usec value follows and is stored as configured_source_usec
 * (record) or configured_sink_usec (otherwise). A malformed or over-long
 * reply fails the context with PA_ERR_PROTOCOL and no callback is invoked.
 *
 * The operation's callback receives success = 1 for a well-formed reply and
 * success = 0 for an error reply accepted by pa_context_handle_error().
 * The operation is always marked done and unreferenced. */
static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *op = userdata;
    int ok = 1;

    pa_assert(pd);
    pa_assert(op);
    pa_assert(PA_REFCNT_VALUE(op) >= 1);

    if (!op->context)
        goto out;

    if (command == PA_COMMAND_REPLY) {
        pa_stream *st = op->stream;

        switch (st->direction) {
            case PA_STREAM_PLAYBACK:
                if (pa_tagstruct_getu32(t, &st->buffer_attr.maxlength) < 0 ||
                    pa_tagstruct_getu32(t, &st->buffer_attr.tlength) < 0 ||
                    pa_tagstruct_getu32(t, &st->buffer_attr.prebuf) < 0 ||
                    pa_tagstruct_getu32(t, &st->buffer_attr.minreq) < 0) {
                    pa_context_fail(op->context, PA_ERR_PROTOCOL);
                    goto out;
                }
                break;

            case PA_STREAM_RECORD:
                if (pa_tagstruct_getu32(t, &st->buffer_attr.maxlength) < 0 ||
                    pa_tagstruct_getu32(t, &st->buffer_attr.fragsize) < 0) {
                    pa_context_fail(op->context, PA_ERR_PROTOCOL);
                    goto out;
                }
                break;

            default:
                break;
        }

        if (st->context->version >= 13) {
            pa_usec_t configured;

            if (pa_tagstruct_get_usec(t, &configured) < 0) {
                pa_context_fail(op->context, PA_ERR_PROTOCOL);
                goto out;
            }

            if (st->direction == PA_STREAM_RECORD)
                st->timing_info.configured_source_usec = configured;
            else
                st->timing_info.configured_sink_usec = configured;
        }

        /* Anything left in the reply is a protocol violation. */
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(op->context, PA_ERR_PROTOCOL);
            goto out;
        }
    } else {
        if (pa_context_handle_error(op->context, command, t, false) < 0)
            goto out;

        ok = 0;
    }

    if (op->callback) {
        pa_stream_success_cb_t notify = (pa_stream_success_cb_t) op->callback;
        notify(op->stream, ok, op->userdata);
    }

out:
    pa_operation_done(op);
    pa_operation_unref(op);
}
2729
pa_stream_set_buffer_attr(pa_stream * s,const pa_buffer_attr * attr,pa_stream_success_cb_t cb,void * userdata)2730 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2731 pa_operation *o;
2732 pa_tagstruct *t;
2733 uint32_t tag;
2734 pa_buffer_attr copy;
2735
2736 pa_assert(s);
2737 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2738 pa_assert(attr);
2739
2740 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2741 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2742 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2743 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2744
2745 /* Ask for a timing update before we cork/uncork to get the best
2746 * accuracy for the transport latency suitable for the
2747 * check_smoother_status() call in the started callback */
2748 request_auto_timing_update(s, true);
2749
2750 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2751
2752 t = pa_tagstruct_command(
2753 s->context,
2754 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2755 &tag);
2756 pa_tagstruct_putu32(t, s->channel);
2757
2758 copy = *attr;
2759 patch_buffer_attr(s, ©, NULL);
2760 attr = ©
2761
2762 pa_tagstruct_putu32(t, attr->maxlength);
2763
2764 if (s->direction == PA_STREAM_PLAYBACK)
2765 pa_tagstruct_put(
2766 t,
2767 PA_TAG_U32, attr->tlength,
2768 PA_TAG_U32, attr->prebuf,
2769 PA_TAG_U32, attr->minreq,
2770 PA_TAG_INVALID);
2771 else
2772 pa_tagstruct_putu32(t, attr->fragsize);
2773
2774 if (s->context->version >= 13)
2775 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2776
2777 if (s->context->version >= 14)
2778 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2779
2780 pa_pstream_send_tagstruct(s->context->pstream, t);
2781 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2782
2783 /* This might cause changes in the read/write index, hence let's
2784 * request a timing update */
2785 request_auto_timing_update(s, true);
2786
2787 return o;
2788 }
2789
pa_stream_get_device_index(const pa_stream * s)2790 uint32_t pa_stream_get_device_index(const pa_stream *s) {
2791 pa_assert(s);
2792 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2793
2794 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2795 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2796 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2797 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2798 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2799
2800 return s->device_index;
2801 }
2802
pa_stream_get_device_name(const pa_stream * s)2803 const char *pa_stream_get_device_name(const pa_stream *s) {
2804 pa_assert(s);
2805 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2806
2807 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2808 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2809 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2810 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2811 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2812
2813 return s->device_name;
2814 }
2815
pa_stream_is_suspended(const pa_stream * s)2816 int pa_stream_is_suspended(const pa_stream *s) {
2817 pa_assert(s);
2818 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2819
2820 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2821 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2822 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2823 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2824
2825 return s->suspended;
2826 }
2827
pa_stream_is_corked(const pa_stream * s)2828 int pa_stream_is_corked(const pa_stream *s) {
2829 pa_assert(s);
2830 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2831
2832 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2833 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2834 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2835
2836 return s->corked;
2837 }
2838
/* Reply handler for the request sent by pa_stream_update_sample_rate().
 *
 * userdata is the pa_operation registered together with the reply; its
 * 'private' field carries the requested rate (stored there with
 * PA_UINT_TO_PTR by the sender).
 *
 * The locally cached sample rate is only switched to the requested value when
 * the server answered with a well-formed PA_COMMAND_REPLY. If the server
 * reported an error the cached rate is left alone and the user callback is
 * invoked with success == 0. The operation is finished and unreferenced on
 * every path. */
static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    int success = 1;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    /* No context attached to the operation any more: nothing to update
     * and nobody to notify. */
    if (!o->context)
        goto finish;

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

        /* The request was rejected, so the cached sample spec (and the
         * smoother, if any) must keep describing the old rate. */
        success = 0;
    } else {

        /* A positive reply must not carry any payload. */
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        /* The server accepted the change: mirror it locally. */
        o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
#ifdef USE_SMOOTHER_2
        if (o->stream->smoother)
            pa_smoother_2_set_rate(o->stream->smoother, pa_rtclock_now(), o->stream->sample_spec.rate);
#endif
        pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
    }

    if (o->callback) {
        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
        cb(o->stream, success, o->userdata);
    }

finish:
    pa_operation_done(o);
    pa_operation_unref(o);
}
2879
pa_stream_update_sample_rate(pa_stream * s,uint32_t rate,pa_stream_success_cb_t cb,void * userdata)2880 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2881 pa_operation *o;
2882 pa_tagstruct *t;
2883 uint32_t tag;
2884
2885 pa_assert(s);
2886 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2887
2888 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2889 PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_sample_rate_valid(rate), PA_ERR_INVALID);
2890 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2891 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2892 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2893 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2894
2895 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2896 o->private = PA_UINT_TO_PTR(rate);
2897
2898 t = pa_tagstruct_command(
2899 s->context,
2900 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2901 &tag);
2902 pa_tagstruct_putu32(t, s->channel);
2903 pa_tagstruct_putu32(t, rate);
2904
2905 pa_pstream_send_tagstruct(s->context->pstream, t);
2906 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2907
2908 return o;
2909 }
2910
pa_stream_proplist_update(pa_stream * s,pa_update_mode_t mode,pa_proplist * p,pa_stream_success_cb_t cb,void * userdata)2911 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2912 pa_operation *o;
2913 pa_tagstruct *t;
2914 uint32_t tag;
2915
2916 pa_assert(s);
2917 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2918
2919 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2920 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2921 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2922 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2923 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2924
2925 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2926
2927 t = pa_tagstruct_command(
2928 s->context,
2929 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2930 &tag);
2931 pa_tagstruct_putu32(t, s->channel);
2932 pa_tagstruct_putu32(t, (uint32_t) mode);
2933 pa_tagstruct_put_proplist(t, p);
2934
2935 pa_pstream_send_tagstruct(s->context->pstream, t);
2936 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2937
2938 /* Please note that we don't update s->proplist here, because we
2939 * don't export that field */
2940
2941 return o;
2942 }
2943
pa_stream_proplist_remove(pa_stream * s,const char * const keys[],pa_stream_success_cb_t cb,void * userdata)2944 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2945 pa_operation *o;
2946 pa_tagstruct *t;
2947 uint32_t tag;
2948 const char * const*k;
2949
2950 pa_assert(s);
2951 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2952
2953 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2954 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2955 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2956 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2957 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2958
2959 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2960
2961 t = pa_tagstruct_command(
2962 s->context,
2963 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2964 &tag);
2965 pa_tagstruct_putu32(t, s->channel);
2966
2967 for (k = keys; *k; k++)
2968 pa_tagstruct_puts(t, *k);
2969
2970 pa_tagstruct_puts(t, NULL);
2971
2972 pa_pstream_send_tagstruct(s->context->pstream, t);
2973 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2974
2975 /* Please note that we don't update s->proplist here, because we
2976 * don't export that field */
2977
2978 return o;
2979 }
2980
/* Records the sink input index in s->direct_on_input.
 *
 * Only allowed while the stream is still PA_STREAM_UNCONNECTED;
 * sink_input_idx must not be PA_INVALID_INDEX and the peer protocol version
 * must be at least 13. Each PA_CHECK_VALIDITY guard (including the fork
 * check) makes the function return early with an error whose exact value is
 * determined by that macro, defined elsewhere. Returns 0 once the index has
 * been stored. */
int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
    pa_context *ctx;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    ctx = s->context;

    PA_CHECK_VALIDITY(ctx, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(ctx, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(ctx, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(ctx, ctx->version >= 13, PA_ERR_NOTSUPPORTED);

    s->direct_on_input = sink_input_idx;

    return 0;
}
2994
pa_stream_get_monitor_stream(const pa_stream * s)2995 uint32_t pa_stream_get_monitor_stream(const pa_stream *s) {
2996 pa_assert(s);
2997 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2998
2999 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
3000 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
3001 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
3002
3003 return s->direct_on_input;
3004 }
3005