1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <string.h>
26 #include <stdio.h>
27 #include <string.h>
28
29 #include <pulse/def.h>
30 #include <pulse/timeval.h>
31 #include <pulse/rtclock.h>
32 #include <pulse/xmalloc.h>
33 #include <pulse/fork-detect.h>
34
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/sample-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41 #include <pulsecore/core-util.h>
42
43 #include "internal.h"
44 #include "stream.h"
45
/* Define STREAM_DEBUG to compile in the additional pa_log_debug() calls
 * that are guarded by "#ifdef STREAM_DEBUG" in this file. */
/* #define STREAM_DEBUG */

/* Automatic timing updates: the interval between two update requests
 * starts at the START value and is doubled on every rescheduling until
 * it is capped at the END value. */
#define AUTO_TIMING_INTERVAL_START_USEC     (10*PA_USEC_PER_MSEC)
#define AUTO_TIMING_INTERVAL_END_USEC       (1500*PA_USEC_PER_MSEC)

/* Tuning constants for the timing smoother.
 * NOTE(review): presumably passed to the smoother when it is created; the
 * use site is not visible in this part of the file -- confirm. */
#define SMOOTHER_ADJUST_TIME                (1000*PA_USEC_PER_MSEC)
#define SMOOTHER_HISTORY_TIME               (5000*PA_USEC_PER_MSEC)
#define SMOOTHER_MIN_HISTORY                (4)
54
pa_stream_new(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map)55 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
56 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
57 }
58
/* Forget every user callback registered on the stream, together with the
 * userdata pointer that belongs to it. */
static void reset_callbacks(pa_stream *s) {
    /* Callback function pointers */
    s->read_callback = NULL;
    s->write_callback = NULL;
    s->state_callback = NULL;
    s->overflow_callback = NULL;
    s->underflow_callback = NULL;
    s->latency_update_callback = NULL;
    s->moved_callback = NULL;
    s->suspended_callback = NULL;
    s->started_callback = NULL;
    s->event_callback = NULL;
    s->buffer_attr_callback = NULL;

    /* Matching userdata pointers */
    s->read_userdata = NULL;
    s->write_userdata = NULL;
    s->state_userdata = NULL;
    s->overflow_userdata = NULL;
    s->underflow_userdata = NULL;
    s->latency_update_userdata = NULL;
    s->moved_userdata = NULL;
    s->suspended_userdata = NULL;
    s->started_userdata = NULL;
    s->event_userdata = NULL;
    s->buffer_attr_userdata = NULL;
}
83
pa_stream_new_with_proplist_internal(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map,pa_format_info * const * formats,unsigned int n_formats,pa_proplist * p)84 static pa_stream *pa_stream_new_with_proplist_internal(
85 pa_context *c,
86 const char *name,
87 const pa_sample_spec *ss,
88 const pa_channel_map *map,
89 pa_format_info * const *formats,
90 unsigned int n_formats,
91 pa_proplist *p) {
92
93 pa_stream *s;
94 unsigned int i;
95
96 pa_assert(c);
97 pa_assert(PA_REFCNT_VALUE(c) >= 1);
98 pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
99 pa_assert(n_formats < PA_MAX_FORMATS);
100
101 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
102 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
103
104 s = pa_xnew(pa_stream, 1);
105 PA_REFCNT_INIT(s);
106 s->context = c;
107 s->mainloop = c->mainloop;
108
109 s->direction = PA_STREAM_NODIRECTION;
110 s->state = PA_STREAM_UNCONNECTED;
111 s->flags = 0;
112
113 if (ss)
114 s->sample_spec = *ss;
115 else
116 pa_sample_spec_init(&s->sample_spec);
117
118 if (map)
119 s->channel_map = *map;
120 else
121 pa_channel_map_init(&s->channel_map);
122
123 s->n_formats = 0;
124 if (formats) {
125 s->n_formats = n_formats;
126 for (i = 0; i < n_formats; i++)
127 s->req_formats[i] = pa_format_info_copy(formats[i]);
128 }
129
130 /* We'll get the final negotiated format after connecting */
131 s->format = NULL;
132
133 s->direct_on_input = PA_INVALID_INDEX;
134
135 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
136 if (name)
137 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
138
139 s->channel = 0;
140 s->channel_valid = false;
141 s->syncid = c->csyncid++;
142 s->stream_index = PA_INVALID_INDEX;
143
144 s->requested_bytes = 0;
145 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
146
147 /* We initialize the target length here, so that if the user
148 * passes no explicit buffering metrics the default is similar to
149 * what older PA versions provided. */
150
151 s->buffer_attr.maxlength = (uint32_t) -1;
152 if (ss)
153 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
154 else {
155 /* FIXME: We assume a worst-case compressed format corresponding to
156 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
157 pa_sample_spec tmp_ss = {
158 .format = PA_SAMPLE_S16NE,
159 .rate = 48000,
160 .channels = 2,
161 };
162 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
163 }
164 s->buffer_attr.minreq = (uint32_t) -1;
165 s->buffer_attr.prebuf = (uint32_t) -1;
166 s->buffer_attr.fragsize = (uint32_t) -1;
167
168 s->device_index = PA_INVALID_INDEX;
169 s->device_name = NULL;
170 s->suspended = false;
171 s->corked = false;
172
173 s->write_memblock = NULL;
174 s->write_data = NULL;
175
176 pa_memchunk_reset(&s->peek_memchunk);
177 s->peek_data = NULL;
178 s->record_memblockq = NULL;
179
180 memset(&s->timing_info, 0, sizeof(s->timing_info));
181 s->timing_info_valid = false;
182
183 s->previous_time = 0;
184 s->latest_underrun_at_index = -1;
185
186 s->read_index_not_before = 0;
187 s->write_index_not_before = 0;
188 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
189 s->write_index_corrections[i].valid = 0;
190 s->current_write_index_correction = 0;
191
192 s->auto_timing_update_event = NULL;
193 s->auto_timing_update_requested = false;
194 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
195
196 reset_callbacks(s);
197
198 s->smoother = NULL;
199
200 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
201 PA_LLIST_PREPEND(pa_stream, c->streams, s);
202 pa_stream_ref(s);
203
204 return s;
205 }
206
pa_stream_new_with_proplist(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map,pa_proplist * p)207 pa_stream *pa_stream_new_with_proplist(
208 pa_context *c,
209 const char *name,
210 const pa_sample_spec *ss,
211 const pa_channel_map *map,
212 pa_proplist *p) {
213
214 pa_channel_map tmap;
215
216 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
217 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
218 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
219 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
220 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
221
222 if (!map)
223 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
224
225 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
226 }
227
pa_stream_new_extended(pa_context * c,const char * name,pa_format_info * const * formats,unsigned int n_formats,pa_proplist * p)228 pa_stream *pa_stream_new_extended(
229 pa_context *c,
230 const char *name,
231 pa_format_info * const *formats,
232 unsigned int n_formats,
233 pa_proplist *p) {
234
235 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
236
237 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
238 }
239
/* Detach the stream from its context.
 *
 * Cancels all operations of the context that refer to this stream, drops
 * pending replies registered for it, removes it from the channel hashmap
 * and from the stream list of the context (releasing the reference the
 * list held), frees the automatic timing update event and clears all user
 * callbacks. Does nothing if the stream is already detached.
 *
 * The stream is still accessed after the list's reference has been
 * dropped, hence the caller must hold a reference of its own. */
static void stream_unlink(pa_stream *s) {
    pa_operation *o, *n;
    pa_assert(s);

    if (!s->context)
        return;

    /* Detach from context */

    /* Unref all operation objects that point to us */
    for (o = s->context->operations; o; o = n) {
        /* Fetch the successor before cancelling the current entry */
        n = o->next;

        if (o->stream == s)
            pa_operation_cancel(o);
    }

    /* Drop all outstanding replies for this stream */
    if (s->context->pdispatch)
        pa_pdispatch_unregister_reply(s->context->pdispatch, s);

    if (s->channel_valid) {
        pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
        s->channel = 0;
        s->channel_valid = false;
    }

    PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
    pa_stream_unref(s);

    s->context = NULL;

    if (s->auto_timing_update_event) {
        pa_assert(s->mainloop);
        s->mainloop->time_free(s->auto_timing_update_event);

        /* Don't keep a dangling pointer around: the stream object may
         * outlive the unlinking, and request_auto_timing_update() tests
         * this pointer for being non-NULL. */
        s->auto_timing_update_event = NULL;
    }

    reset_callbacks(s);
}
279
/* Destroy the stream object: detach it from its context (if that has not
 * happened yet) and release every resource it owns before freeing the
 * structure itself. Called by pa_stream_unref() when the last reference
 * is dropped. */
static void stream_free(pa_stream *s) {
    unsigned int idx = 0;

    pa_assert(s);

    stream_unlink(s);

    /* Pending write buffer; it has to be released first if it is still
     * mapped */
    if (s->write_memblock) {
        if (s->write_data)
            pa_memblock_release(s->write_memblock);

        pa_memblock_unref(s->write_memblock);
    }

    /* Same for the chunk that is currently handed out for reading */
    if (s->peek_memchunk.memblock) {
        if (s->peek_data)
            pa_memblock_release(s->peek_memchunk.memblock);

        pa_memblock_unref(s->peek_memchunk.memblock);
    }

    if (s->record_memblockq)
        pa_memblockq_free(s->record_memblockq);

    if (s->proplist)
        pa_proplist_free(s->proplist);

    if (s->smoother)
        pa_smoother_free(s->smoother);

    /* The copies of the requested formats, and the negotiated one */
    while (idx < s->n_formats)
        pa_format_info_free(s->req_formats[idx++]);

    if (s->format)
        pa_format_info_free(s->format);

    pa_xfree(s->device_name);
    pa_xfree(s);
}
317
/* Drop one reference to the stream; the stream is destroyed when the
 * reference count reaches zero. */
void pa_stream_unref(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (PA_REFCNT_DEC(s) > 0)
        return;

    /* That was the last reference */
    stream_free(s);
}
325
/* Acquire an additional reference to the stream. Returns the stream
 * itself, so that the call can be used inside an expression. */
pa_stream* pa_stream_ref(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_REFCNT_INC(s);

    return s;
}
333
pa_stream_get_state(const pa_stream * s)334 pa_stream_state_t pa_stream_get_state(const pa_stream *s) {
335 pa_assert(s);
336 pa_assert(PA_REFCNT_VALUE(s) >= 1);
337
338 return s->state;
339 }
340
pa_stream_get_context(const pa_stream * s)341 pa_context* pa_stream_get_context(const pa_stream *s) {
342 pa_assert(s);
343 pa_assert(PA_REFCNT_VALUE(s) >= 1);
344
345 return s->context;
346 }
347
pa_stream_get_index(const pa_stream * s)348 uint32_t pa_stream_get_index(const pa_stream *s) {
349 pa_assert(s);
350 pa_assert(PA_REFCNT_VALUE(s) >= 1);
351
352 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
353 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
354
355 return s->stream_index;
356 }
357
/* Move the stream to a new state.
 *
 * Does nothing if the stream already is in that state. Otherwise the
 * state callback (if any) is invoked, and the stream is detached from its
 * context when the new state is FAILED or TERMINATED. */
void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
    bool is_final;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (st == s->state)
        return;

    is_final = (st == PA_STREAM_FAILED) || (st == PA_STREAM_TERMINATED);

    /* Hold a reference for the duration of the call: both the user
     * callback and the unlinking may drop references to the stream. */
    pa_stream_ref(s);

    s->state = st;

    if (s->state_callback)
        s->state_callback(s, s->state_userdata);

    if (is_final)
        stream_unlink(s);

    pa_stream_unref(s);
}
377
/* Ask the server for fresh timing data and reschedule the periodic update
 * timer. Only has an effect on streams that were created with
 * PA_STREAM_AUTO_TIMING_UPDATE.
 *
 * A request is issued if the stream is READY and either "force" is set or
 * no automatic request has been recorded as issued yet.
 *
 * If a timer event exists it is freed when the stream is suspended and
 * the update is not forced. Otherwise it is restarted with the current
 * interval, which is reset to the start value on "force" and doubled
 * afterwards, up to AUTO_TIMING_INTERVAL_END_USEC. */
static void request_auto_timing_update(pa_stream *s, bool force) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) == 0)
        return;

    if (s->state == PA_STREAM_READY && (force || !s->auto_timing_update_requested)) {
        pa_operation *op;

#ifdef STREAM_DEBUG
        pa_log_debug("Automatically requesting new timing data");
#endif

        op = pa_stream_update_timing_info(s, NULL, NULL);

        if (op) {
            /* We are not interested in the operation object itself */
            pa_operation_unref(op);
            s->auto_timing_update_requested = true;
        }
    }

    /* Nothing to reschedule without a timer */
    if (!s->auto_timing_update_event)
        return;

    if (s->suspended && !force) {
        /* Stop polling while the stream is suspended */
        pa_assert(s->mainloop);
        s->mainloop->time_free(s->auto_timing_update_event);
        s->auto_timing_update_event = NULL;
        return;
    }

    if (force)
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;

    pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);

    /* Back off exponentially, up to the upper bound */
    s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
}
414
/* Handler for the PLAYBACK_STREAM_KILLED and RECORD_STREAM_KILLED
 * commands: the server has terminated one of our streams.
 *
 * The message carries only the channel of the stream. If a READY stream
 * is registered for that channel, the context error is set to
 * PA_ERR_KILLED and the stream is moved to the FAILED state. A malformed
 * message fails the whole context with PA_ERR_PROTOCOL. */
void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_hashmap *streams;
    pa_stream *s;
    uint32_t channel;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0 || !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    streams = (command == PA_COMMAND_PLAYBACK_STREAM_KILLED) ? c->playback_streams : c->record_streams;
    s = pa_hashmap_get(streams, PA_UINT32_TO_PTR(channel));

    /* Silently ignore streams we don't know or that aren't up */
    if (s && s->state == PA_STREAM_READY) {
        pa_context_set_error(c, PA_ERR_KILLED);
        pa_stream_set_state(s, PA_STREAM_FAILED);
    }

finish:
    pa_context_unref(c);
}
446
/* Pause or resume the timing smoother of the stream, according to the
 * current stream status.
 *
 * The reference time is the current time, corrected by the transport
 * latency if valid timing info is available: it is subtracted when
 * "aposteriori" is set and added otherwise.
 *
 * The smoother is paused if the stream is suspended or corked, or if
 * "force_stop" is set. It is resumed if "force_start" is set or if the
 * stream uses no prebuffering. "force_start" and "force_stop" must not be
 * set at the same time. Does nothing if the stream has no smoother. */
static void check_smoother_status(pa_stream *s, bool aposteriori, bool force_start, bool force_stop) {
    pa_usec_t when;

    pa_assert(s);
    pa_assert(!force_start || !force_stop);

    if (!s->smoother)
        return;

    when = pa_rtclock_now();

    if (s->timing_info_valid) {
        if (aposteriori)
            when -= s->timing_info.transport_usec;
        else
            when += s->timing_info.transport_usec;
    }

    if (s->suspended || s->corked || force_stop) {
        pa_smoother_pause(s->smoother, when);
        return;
    }

    /* Please note that we have no idea if playback actually started
     * if prebuf is non-zero! */
    if (!force_start && s->buffer_attr.prebuf != 0)
        return;

    if (!s->timing_info_valid &&
        !aposteriori &&
        !force_start &&
        !force_stop &&
        s->context->version >= 13) {

        /* If the server supports STARTED events we take them as
         * indications when audio really starts/stops playing, if
         * we don't have any timing info yet -- instead of trying
         * to be smart and guessing the server time. Otherwise the
         * unknown transport delay adds too much noise to our time
         * calculations. */

        return;
    }

    pa_smoother_resume(s->smoother, when, true);
}
491
492 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
493
pa_command_stream_moved(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)494 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
495 pa_context *c = userdata;
496 pa_stream *s;
497 uint32_t channel;
498 const char *dn;
499 bool suspended;
500 uint32_t di;
501 pa_usec_t usec = 0;
502 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
503
504 pa_assert(pd);
505 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
506 pa_assert(t);
507 pa_assert(c);
508 pa_assert(PA_REFCNT_VALUE(c) >= 1);
509
510 pa_context_ref(c);
511
512 if (c->version < 12) {
513 pa_context_fail(c, PA_ERR_PROTOCOL);
514 goto finish;
515 }
516
517 if (pa_tagstruct_getu32(t, &channel) < 0 ||
518 pa_tagstruct_getu32(t, &di) < 0 ||
519 pa_tagstruct_gets(t, &dn) < 0 ||
520 pa_tagstruct_get_boolean(t, &suspended) < 0) {
521 pa_context_fail(c, PA_ERR_PROTOCOL);
522 goto finish;
523 }
524
525 if (c->version >= 13) {
526
527 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
528 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
529 pa_tagstruct_getu32(t, &fragsize) < 0 ||
530 pa_tagstruct_get_usec(t, &usec) < 0) {
531 pa_context_fail(c, PA_ERR_PROTOCOL);
532 goto finish;
533 }
534 } else {
535 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
536 pa_tagstruct_getu32(t, &tlength) < 0 ||
537 pa_tagstruct_getu32(t, &prebuf) < 0 ||
538 pa_tagstruct_getu32(t, &minreq) < 0 ||
539 pa_tagstruct_get_usec(t, &usec) < 0) {
540 pa_context_fail(c, PA_ERR_PROTOCOL);
541 goto finish;
542 }
543 }
544 }
545
546 if (!pa_tagstruct_eof(t)) {
547 pa_context_fail(c, PA_ERR_PROTOCOL);
548 goto finish;
549 }
550
551 if (!dn || di == PA_INVALID_INDEX) {
552 pa_context_fail(c, PA_ERR_PROTOCOL);
553 goto finish;
554 }
555
556 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
557 goto finish;
558
559 if (s->state != PA_STREAM_READY)
560 goto finish;
561
562 if (c->version >= 13) {
563 if (s->direction == PA_STREAM_RECORD)
564 s->timing_info.configured_source_usec = usec;
565 else
566 s->timing_info.configured_sink_usec = usec;
567
568 s->buffer_attr.maxlength = maxlength;
569 s->buffer_attr.fragsize = fragsize;
570 s->buffer_attr.tlength = tlength;
571 s->buffer_attr.prebuf = prebuf;
572 s->buffer_attr.minreq = minreq;
573 }
574
575 pa_xfree(s->device_name);
576 s->device_name = pa_xstrdup(dn);
577 s->device_index = di;
578
579 s->suspended = suspended;
580
581 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
582 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
583 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
584 request_auto_timing_update(s, true);
585 }
586
587 check_smoother_status(s, true, false, false);
588 request_auto_timing_update(s, true);
589
590 if (s->moved_callback)
591 s->moved_callback(s, s->moved_userdata);
592
593 finish:
594 pa_context_unref(c);
595 }
596
/* Handler for the PLAYBACK_BUFFER_ATTR_CHANGED and
 * RECORD_BUFFER_ATTR_CHANGED commands: the server has changed the buffer
 * metrics of one of our streams.
 *
 * The message carries the channel, followed by the metrics now in effect
 * and the configured device latency: maxlength, fragsize, latency for
 * record streams and maxlength, tlength, prebuf, minreq, latency for
 * playback streams.
 *
 * If a READY stream is registered for the channel its buffer attributes
 * and configured latency are updated, a timing update is requested and
 * the buffer_attr callback is invoked. A malformed message, or one that
 * arrives on a connection older than protocol version 15, fails the whole
 * context with PA_ERR_PROTOCOL. */
void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    pa_usec_t usec = 0;
    uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 15) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (pa_tagstruct_getu32(t, &channel) < 0) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    /* The layout depends on the stream direction. This has to test for
     * the record variant of the command handled here: the STREAM_MOVED
     * command can never show up in this function (see the assertion
     * above), so testing for it would parse record notifications with
     * the playback layout. */
    if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
        if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
            pa_tagstruct_getu32(t, &fragsize) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(c, PA_ERR_PROTOCOL);
            goto finish;
        }
    } else {
        if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
            pa_tagstruct_getu32(t, &tlength) < 0 ||
            pa_tagstruct_getu32(t, &prebuf) < 0 ||
            pa_tagstruct_getu32(t, &minreq) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(c, PA_ERR_PROTOCOL);
            goto finish;
        }
    }

    if (!pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    /* Silently ignore streams we don't know or that aren't up */
    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
        goto finish;

    if (s->state != PA_STREAM_READY)
        goto finish;

    if (s->direction == PA_STREAM_RECORD)
        s->timing_info.configured_source_usec = usec;
    else
        s->timing_info.configured_sink_usec = usec;

    /* Fields the message didn't carry for this direction are set to 0 */
    s->buffer_attr.maxlength = maxlength;
    s->buffer_attr.fragsize = fragsize;
    s->buffer_attr.tlength = tlength;
    s->buffer_attr.prebuf = prebuf;
    s->buffer_attr.minreq = minreq;

    request_auto_timing_update(s, true);

    if (s->buffer_attr_callback)
        s->buffer_attr_callback(s, s->buffer_attr_userdata);

finish:
    pa_context_unref(c);
}
670
/* Handler for the PLAYBACK_STREAM_SUSPENDED and RECORD_STREAM_SUSPENDED
 * commands: the device one of our streams is attached to was suspended
 * or resumed.
 *
 * The message carries the channel and the new suspended flag. If a READY
 * stream is registered for the channel the flag is stored, the timing
 * update timer is created if it is needed and missing, and the suspended
 * callback is invoked. A malformed message, or one that arrives on a
 * connection older than protocol version 12, fails the whole context with
 * PA_ERR_PROTOCOL. */
void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_hashmap *streams;
    pa_stream *s;
    uint32_t channel;
    bool suspended;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 12)
        goto protocol_error;

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_get_boolean(t, &suspended) < 0 ||
        !pa_tagstruct_eof(t))
        goto protocol_error;

    streams = (command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED) ? c->playback_streams : c->record_streams;
    s = pa_hashmap_get(streams, PA_UINT32_TO_PTR(channel));

    /* Silently ignore streams we don't know or that aren't up */
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    s->suspended = suspended;

    /* The timer is freed while a stream is suspended, so it may have to
     * be created again on resume */
    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
        pa_usec_t deadline;

        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        deadline = pa_rtclock_now() + s->auto_timing_interval_usec;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, deadline, &auto_timing_update_callback, s);
        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
    request_auto_timing_update(s, true);

    if (s->suspended_callback)
        s->suspended_callback(s, s->suspended_userdata);

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);
}
720
/* Handler for the STARTED command: the server tells us that playback of
 * one of our playback streams has started.
 *
 * The message carries only the channel. If a READY playback stream is
 * registered for it the smoother is force-started, a timing update is
 * requested and the started callback is invoked. A malformed message, or
 * one that arrives on a connection older than protocol version 13, fails
 * the whole context with PA_ERR_PROTOCOL. */
void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_STARTED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 13)
        goto protocol_error;

    if (pa_tagstruct_getu32(t, &channel) < 0 || !pa_tagstruct_eof(t))
        goto protocol_error;

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));

    /* Silently ignore streams we don't know or that aren't up */
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    check_smoother_status(s, true, true, false);
    request_auto_timing_update(s, true);

    if (s->started_callback)
        s->started_callback(s, s->started_userdata);

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);
}
760
/* Handler for the PLAYBACK_STREAM_EVENT and RECORD_STREAM_EVENT commands:
 * the server delivers a named event for one of our streams.
 *
 * The message carries the channel, the event name and a property list
 * with event data. If a READY stream is registered for the channel the
 * event callback is invoked with the name and the property list. For the
 * FORMAT_LOST event the current stream time is added to the list as
 * "stream-time" first, provided it can be determined. A malformed message,
 * or one that arrives on a connection older than protocol version 15,
 * fails the whole context with PA_ERR_PROTOCOL.
 *
 * The property list belongs to this function and is freed before it
 * returns. */
void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_hashmap *streams;
    pa_stream *s;
    pa_proplist *pl = NULL;
    const char *event;
    uint32_t channel;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 15)
        goto protocol_error;

    pl = pa_proplist_new();

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_gets(t, &event) < 0 ||
        pa_tagstruct_get_proplist(t, pl) < 0 ||
        !pa_tagstruct_eof(t) || !event)
        goto protocol_error;

    streams = (command == PA_COMMAND_PLAYBACK_STREAM_EVENT) ? c->playback_streams : c->record_streams;
    s = pa_hashmap_get(streams, PA_UINT32_TO_PTR(channel));

    /* Silently ignore streams we don't know or that aren't up */
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
        /* Let client know what the running time was when the stream had to be killed */
        pa_usec_t stream_time;

        if (pa_stream_get_time(s, &stream_time) == 0)
            pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
    }

    if (s->event_callback)
        s->event_callback(s, event, pl, s->event_userdata);

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);

    if (pl)
        pa_proplist_free(pl);
}
813
/* Handler for the REQUEST command: the server asks for more data on one
 * of our playback streams.
 *
 * The message carries the channel and the number of bytes requested. If a
 * READY playback stream is registered for the channel the byte count is
 * added to the amount already outstanding, and the write callback is
 * invoked if the total is positive. A malformed message fails the whole
 * context with PA_ERR_PROTOCOL. */
void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel, bytes;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_REQUEST);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_getu32(t, &bytes) < 0 ||
        !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));

    /* Silently ignore streams we don't know or that aren't up */
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    s->requested_bytes += bytes;

#ifdef STREAM_DEBUG
    pa_log_debug("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes);
#endif

    /* Tell the application how much it may write now */
    if (s->write_callback && s->requested_bytes > 0)
        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);

finish:
    pa_context_unref(c);
}
852
pa_stream_get_underflow_index(const pa_stream * p)853 int64_t pa_stream_get_underflow_index(const pa_stream *p) {
854 pa_assert(p);
855 return p->latest_underrun_at_index;
856 }
857
/* Handler for the OVERFLOW and UNDERFLOW commands: the server buffer of
 * one of our playback streams has overrun or run empty.
 *
 * The message carries the channel. For UNDERFLOW on protocol version 23
 * or newer it also carries a signed 64 bit offset, which is stored as the
 * latest underrun index unless it is -1.
 *
 * If a READY playback stream is registered for the channel the smoother
 * is force-stopped when the stream uses prebuffering, a timing update is
 * requested, and the overflow or underflow callback is invoked. A
 * malformed message fails the whole context with PA_ERR_PROTOCOL. */
void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    int64_t offset = -1;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0)
        goto protocol_error;

    /* Newer servers tell us where the underflow happened */
    if (c->version >= 23 &&
        command == PA_COMMAND_UNDERFLOW &&
        pa_tagstruct_gets64(t, &offset) < 0)
        goto protocol_error;

    if (!pa_tagstruct_eof(t))
        goto protocol_error;

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));

    /* Silently ignore streams we don't know or that aren't up */
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (offset != -1)
        s->latest_underrun_at_index = offset;

    if (s->buffer_attr.prebuf > 0)
        check_smoother_status(s, true, false, true);

    request_auto_timing_update(s, true);

    switch (command) {
        case PA_COMMAND_OVERFLOW:
            if (s->overflow_callback)
                s->overflow_callback(s, s->overflow_userdata);
            break;

        case PA_COMMAND_UNDERFLOW:
            if (s->underflow_callback)
                s->underflow_callback(s, s->underflow_userdata);
            break;

        default:
            break;
    }

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);
}
914
/* Mark the cached read index ("r") and/or write index ("w") of the stream
 * as unreliable, and force a timing update.
 *
 * For every selected index the current request tag of the context is
 * stored as its "not before" mark, and the index is flagged as corrupt in
 * the timing info if that is valid. Does nothing unless the stream is in
 * the READY state. */
static void invalidate_indexes(pa_stream *s, bool r, bool w) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

#ifdef STREAM_DEBUG
    pa_log_debug("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag);
#endif

    if (s->state != PA_STREAM_READY)
        return;

    /* Write index first ... */
    if (w) {
        if (s->timing_info_valid)
            s->timing_info.write_index_corrupt = true;

        s->write_index_not_before = s->context->ctag;

#ifdef STREAM_DEBUG
        pa_log_debug("write_index invalidated");
#endif
    }

    /* ... then the read index */
    if (r) {
        if (s->timing_info_valid)
            s->timing_info.read_index_corrupt = true;

        s->read_index_not_before = s->context->ctag;

#ifdef STREAM_DEBUG
        pa_log_debug("read_index invalidated");
#endif
    }

    request_auto_timing_update(s, true);
}
950
/* Time event callback of the automatic timing update timer: issues a
 * non-forced timing update request for the stream passed as userdata. */
static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
    pa_stream *stream = userdata;

    pa_assert(stream);
    pa_assert(PA_REFCNT_VALUE(stream) >= 1);

    /* Hold a reference of our own while the request is being processed */
    pa_stream_ref(stream);

    request_auto_timing_update(stream, false);

    pa_stream_unref(stream);
}
961
/* Finish the creation of a stream that is in the CREATING state: move it
 * to READY, pass an already outstanding data request on to the write
 * callback, set up the automatic timing update timer if the stream asked
 * for one, and bring the smoother in line with the stream status. */
static void create_stream_complete(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(s->state == PA_STREAM_CREATING);

    pa_stream_set_state(s, PA_STREAM_READY);

    /* Tell the application how much it may write right away */
    if (s->write_callback && s->requested_bytes > 0)
        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);

    if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
        pa_usec_t deadline;

        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        pa_assert(!s->auto_timing_update_event);

        deadline = pa_rtclock_now() + s->auto_timing_interval_usec;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, deadline, &auto_timing_update_callback, s);

        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
}
982
/* Adjust the buffer attributes that are about to be sent to the server.
 *
 * If the environment variable PULSE_LATENCY_MSEC holds a positive integer,
 * the buffering is overridden: tlength and fragsize are set to the byte
 * count that corresponds to that many milliseconds, the other fields are
 * left to the server ((uint32_t) -1) and PA_STREAM_ADJUST_LATENCY is
 * added to "flags" (which may be NULL). The conversion uses the sample
 * spec of the stream or, for a stream without a valid sample spec that
 * requests exactly one format, the sample spec derived from that format.
 * The override is skipped if neither yields a valid sample spec.
 *
 * For servers older than protocol version 13, all fields that are still
 * unset afterwards are filled in with defaults, since those servers don't
 * select buffer metrics themselves. */
static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
    const char *e;

    pa_assert(s);
    pa_assert(attr);

    if ((e = getenv("PULSE_LATENCY_MSEC"))) {
        uint32_t ms;
        pa_sample_spec ss;

        pa_sample_spec_init(&ss);

        if (pa_sample_spec_valid(&s->sample_spec))
            ss = s->sample_spec;
        else if (s->n_formats == 1) {
            /* Don't work with a half filled in spec if the conversion fails */
            if (pa_format_info_to_sample_spec(s->req_formats[0], &ss, NULL) < 0)
                pa_sample_spec_init(&ss);
        }

        if (pa_atou(e, &ms) < 0 || ms <= 0)
            pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
        else if (!pa_sample_spec_valid(&ss))
            /* Check the spec we are actually going to use, which may have
             * been derived from the requested format, not just the spec
             * stored in the stream. */
            pa_log_debug("Ignoring $PULSE_LATENCY_MSEC: %s (invalid sample spec)", e);
        else {
            attr->maxlength = (uint32_t) -1;
            attr->tlength = (uint32_t) pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &ss);
            attr->minreq = (uint32_t) -1;
            attr->prebuf = (uint32_t) -1;
            attr->fragsize = attr->tlength;

            if (flags)
                *flags |= PA_STREAM_ADJUST_LATENCY;
        }
    }

    if (s->context->version >= 13)
        return;

    /* Version older than 0.9.10 didn't do server side buffer_attr
     * selection, hence we have to fake it on the client side. */

    /* We choose fairly conservative values here, to not confuse
     * old clients with extremely large playback buffers */

    if (attr->maxlength == (uint32_t) -1)
        attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */

    if (attr->tlength == (uint32_t) -1)
        attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */

    if (attr->minreq == (uint32_t) -1)
        attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */

    if (attr->prebuf == (uint32_t) -1)
        attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */

    if (attr->fragsize == (uint32_t) -1)
        attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
}
1040
/* Reply handler for a stream creation command; create_stream() in this file
 * registers it via pa_pdispatch_register_reply() with the stream as userdata.
 *
 * Parses the reply tagstruct field by field (which fields are present depends
 * on s->context->version and s->direction), stores the results in the stream,
 * registers the stream in the context's channel hashmap and finally calls
 * create_stream_complete(). Any malformed reply fails the whole context with
 * PA_ERR_PROTOCOL. The order of the reads below mirrors the wire format and
 * must not be changed. */
void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_stream *s = userdata;
    uint32_t requested_bytes = 0;

    pa_assert(pd);
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(s->state == PA_STREAM_CREATING);

    /* Hold our own reference while we work on the stream; it is dropped
     * again at the "finish" label. */
    pa_stream_ref(s);

    if (command != PA_COMMAND_REPLY) {
        /* Not a regular reply: let the context look at the error first. If
         * that itself fails we leave the stream state alone, otherwise the
         * stream is marked as failed. */
        if (pa_context_handle_error(s->context, command, t, false) < 0)
            goto finish;

        pa_stream_set_state(s, PA_STREAM_FAILED);
        goto finish;
    }

    /* Channel, then the stream index (not for upload streams), then the
     * initially requested byte count (not for record streams). */
    if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
        s->channel == PA_INVALID_INDEX ||
        ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
        ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
        pa_context_fail(s->context, PA_ERR_PROTOCOL);
        goto finish;
    }

    s->requested_bytes = (int64_t) requested_bytes;

    /* From version 9 on the reply carries buffer metrics, which are stored
     * in s->buffer_attr. */
    if (s->context->version >= 9) {
        if (s->direction == PA_STREAM_PLAYBACK) {
            if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
                pa_context_fail(s->context, PA_ERR_PROTOCOL);
                goto finish;
            }
        } else if (s->direction == PA_STREAM_RECORD) {
            if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
                pa_context_fail(s->context, PA_ERR_PROTOCOL);
                goto finish;
            }
        }
    }

    /* From version 12 on (non-upload streams only) the reply carries the
     * sample spec, channel map, device index/name and suspend state. */
    if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
        pa_sample_spec ss;
        pa_channel_map cm;
        const char *dn = NULL;
        bool suspended;

        if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
            pa_tagstruct_get_channel_map(t, &cm) < 0 ||
            pa_tagstruct_getu32(t, &s->device_index) < 0 ||
            pa_tagstruct_gets(t, &dn) < 0 ||
            pa_tagstruct_get_boolean(t, &suspended) < 0) {
            pa_context_fail(s->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        /* Sanity-check what we received. When no format list was requested
         * (n_formats == 0), the format, rate and channel map must match what
         * we asked for unless the corresponding PA_STREAM_FIX_* flag is
         * set. */
        if (!dn || s->device_index == PA_INVALID_INDEX ||
            ss.channels != cm.channels ||
            !pa_channel_map_valid(&cm) ||
            !pa_sample_spec_valid(&ss) ||
            (s->n_formats == 0 && (
                (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
                (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
                (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
            pa_context_fail(s->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        /* Replace the stored device name with a copy of the received one */
        pa_xfree(s->device_name);
        s->device_name = pa_xstrdup(dn);
        s->suspended = suspended;

        s->channel_map = cm;
        s->sample_spec = ss;
    }

    /* From version 13 on (non-upload streams only) a usec value follows; it
     * is stored as the configured source or sink latency depending on the
     * stream direction. */
    if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
        pa_usec_t usec;

        if (pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(s->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        if (s->direction == PA_STREAM_RECORD)
            s->timing_info.configured_source_usec = usec;
        else
            s->timing_info.configured_sink_usec = usec;
    }

    /* Format info: version >= 21 for playback streams, version >= 22 for
     * every direction. This condition matches the one used when the request
     * is built in create_stream(). */
    if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
        || s->context->version >= 22) {

        pa_format_info *f = pa_format_info_new();

        if (pa_tagstruct_get_format_info(t, f) < 0 || !pa_format_info_valid(f)) {
            pa_format_info_free(f);
            if (s->n_formats > 0) {
                /* We used the extended API, so we should have got back a proper format */
                pa_context_fail(s->context, PA_ERR_PROTOCOL);
                goto finish;
            }
            /* Without a requested format list a missing/invalid format is
             * tolerated and s->format is left untouched. */
        } else
            s->format = f;
    }

    /* Trailing data we did not expect is a protocol error */
    if (!pa_tagstruct_eof(t)) {
        pa_context_fail(s->context, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (s->direction == PA_STREAM_RECORD) {
        pa_assert(!s->record_memblockq);

        /* Client-side queue for a record stream; its maximum length is the
         * maxlength stored in s->buffer_attr. */
        s->record_memblockq = pa_memblockq_new(
                "client side record memblockq",
                0,
                s->buffer_attr.maxlength,
                0,
                &s->sample_spec,
                1,
                0,
                0,
                NULL);
    }

    /* The channel is now known: register the stream under it in the
     * context's record or playback hashmap. */
    s->channel_valid = true;
    pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);

    create_stream_complete(s);

finish:
    pa_stream_unref(s);
}
1181
/* Common implementation behind pa_stream_connect_playback() and
 * pa_stream_connect_record().
 *
 * Validates the stream/context state and the flags, stores direction, buffer
 * attributes and flags in the stream, optionally creates the timing smoother,
 * then builds and sends the CREATE_PLAYBACK_STREAM / CREATE_RECORD_STREAM
 * command. The reply is handled by pa_create_stream_callback().
 *
 *   direction    PA_STREAM_PLAYBACK or PA_STREAM_RECORD (asserted)
 *   s            the stream; must be in state PA_STREAM_UNCONNECTED
 *   dev          device name, or NULL to use the configured default
 *                sink/source
 *   attr         buffer attributes to copy into the stream, or NULL to keep
 *                what is already stored in s->buffer_attr
 *   flags        stream flags; unknown bits are rejected with PA_ERR_INVALID
 *   volume       initial volume, or NULL for "no volume set"
 *   sync_stream  playback stream whose syncid is copied, or NULL
 *
 * Returns 0 once the command has been queued. Failed preconditions are
 * handled by PA_CHECK_VALIDITY (defined elsewhere).
 *
 * The order of the pa_tagstruct_put*() calls defines the wire format and must
 * not be changed. */
static int create_stream(
        pa_stream_direction_t direction,
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags,
        const pa_cvolume *volume,
        pa_stream *sync_stream) {

    pa_tagstruct *t;
    uint32_t tag;
    /* Remember whether the caller passed a volume; "volume" itself is
     * pointed at a default further down when it was NULL. */
    bool volume_set = !!volume;
    pa_cvolume cv;
    uint32_t i;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
    /* direct_on_input may only be set for record streams */
    PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
    /* Reject any flag bit outside the set listed here */
    PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
                                              PA_STREAM_INTERPOLATE_TIMING|
                                              PA_STREAM_NOT_MONOTONIC|
                                              PA_STREAM_AUTO_TIMING_UPDATE|
                                              PA_STREAM_NO_REMAP_CHANNELS|
                                              PA_STREAM_NO_REMIX_CHANNELS|
                                              PA_STREAM_FIX_FORMAT|
                                              PA_STREAM_FIX_RATE|
                                              PA_STREAM_FIX_CHANNELS|
                                              PA_STREAM_DONT_MOVE|
                                              PA_STREAM_VARIABLE_RATE|
                                              PA_STREAM_PEAK_DETECT|
                                              PA_STREAM_START_MUTED|
                                              PA_STREAM_ADJUST_LATENCY|
                                              PA_STREAM_EARLY_REQUESTS|
                                              PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
                                              PA_STREAM_START_UNMUTED|
                                              PA_STREAM_FAIL_ON_SUSPEND|
                                              PA_STREAM_RELATIVE_VOLUME|
                                              PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);

    PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
    PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
    PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
    /* Although some of the other flags are not supported on older
     * version, we don't check for them here, because it doesn't hurt
     * when they are passed but actually not supported. This makes
     * client development easier */

    /* Peak detection is record-only; sync streams are playback-only;
     * ADJUST_LATENCY and EARLY_REQUESTS are mutually exclusive. */
    PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);

    /* Hold a reference while we work on the stream; dropped before returning */
    pa_stream_ref(s);

    s->direction = direction;

    if (sync_stream)
        s->syncid = sync_stream->syncid;

    if (attr)
        s->buffer_attr = *attr;
    /* Runs whether or not attr was given; may also add
     * PA_STREAM_ADJUST_LATENCY to the local copy of flags. */
    patch_buffer_attr(s, &s->buffer_attr, &flags);

    s->flags = flags;
    s->corked = !!(flags & PA_STREAM_START_CORKED);

    if (flags & PA_STREAM_INTERPOLATE_TIMING) {
        pa_usec_t x;

        x = pa_rtclock_now();

        pa_assert(!s->smoother);
        s->smoother = pa_smoother_new(
                SMOOTHER_ADJUST_TIME,
                SMOOTHER_HISTORY_TIME,
                !(flags & PA_STREAM_NOT_MONOTONIC),
                true,
                SMOOTHER_MIN_HISTORY,
                x,
                true);
    }

    /* No device given: fall back to the configured default sink/source */
    if (!dev)
        dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;

    t = pa_tagstruct_command(
            s->context,
            (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
            &tag);

    /* Before version 13 the media name is sent as a separate string; from
     * version 13 on the whole proplist is sent instead (see below). */
    if (s->context->version < 13)
        pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));

    pa_tagstruct_put(
            t,
            PA_TAG_SAMPLE_SPEC, &s->sample_spec,
            PA_TAG_CHANNEL_MAP, &s->channel_map,
            PA_TAG_U32, PA_INVALID_INDEX,
            PA_TAG_STRING, dev,
            PA_TAG_U32, s->buffer_attr.maxlength,
            PA_TAG_BOOLEAN, s->corked,
            PA_TAG_INVALID);

    /* A volume always goes on the wire; when the caller did not provide one
     * a reset volume is used and volume_set (false) is sent alongside. */
    if (!volume) {
        if (pa_sample_spec_valid(&s->sample_spec))
            volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
        else {
            /* This is not really relevant, since no volume was set, and
             * the real number of channels is embedded in the format_info
             * structure */
            volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
        }
    }

    if (s->direction == PA_STREAM_PLAYBACK) {
        pa_tagstruct_put(
                t,
                PA_TAG_U32, s->buffer_attr.tlength,
                PA_TAG_U32, s->buffer_attr.prebuf,
                PA_TAG_U32, s->buffer_attr.minreq,
                PA_TAG_U32, s->syncid,
                PA_TAG_INVALID);

        pa_tagstruct_put_cvolume(t, volume);
    } else
        pa_tagstruct_putu32(t, s->buffer_attr.fragsize);

    /* Everything below is appended only if s->context->version is high
     * enough for the respective fields. */

    if (s->context->version >= 12) {
        pa_tagstruct_put(
                t,
                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
                PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
                PA_TAG_INVALID);
    }

    if (s->context->version >= 13) {

        /* Same slot, different meaning per direction: "start muted" for
         * playback, "peak detect" for record. */
        if (s->direction == PA_STREAM_PLAYBACK)
            pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
        else
            pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);

        pa_tagstruct_put(
                t,
                PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
                PA_TAG_PROPLIST, s->proplist,
                PA_TAG_INVALID);

        if (s->direction == PA_STREAM_RECORD)
            pa_tagstruct_putu32(t, s->direct_on_input);
    }

    if (s->context->version >= 14) {

        if (s->direction == PA_STREAM_PLAYBACK)
            pa_tagstruct_put_boolean(t, volume_set);

        pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
    }

    if (s->context->version >= 15) {

        /* True if the caller asked for either muted or unmuted start */
        if (s->direction == PA_STREAM_PLAYBACK)
            pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));

        pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
    }

    if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);

    if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));

    /* Requested format list: version >= 21 for playback, version >= 22 for
     * every direction. pa_create_stream_callback() uses the same condition
     * when reading the format from the reply. */
    if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
        || s->context->version >= 22) {

        pa_tagstruct_putu8(t, s->n_formats);
        for (i = 0; i < s->n_formats; i++)
            pa_tagstruct_put_format_info(t, s->req_formats[i]);
    }

    /* From version 22 on record streams also send the volume/mute related
     * fields that playback streams sent in the sections above. */
    if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
        pa_tagstruct_put_cvolume(t, volume);
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
        pa_tagstruct_put_boolean(t, volume_set);
        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
    }

    /* Send the command and register the reply handler for its tag */
    pa_pstream_send_tagstruct(s->context->pstream, t);
    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);

    pa_stream_set_state(s, PA_STREAM_CREATING);

    pa_stream_unref(s);
    return 0;
}
1390
/* Connects the stream for playback. This is create_stream() with the
 * direction fixed to PA_STREAM_PLAYBACK; all other arguments are passed
 * through unchanged and its return value is returned as is. */
int pa_stream_connect_playback(
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags,
        const pa_cvolume *volume,
        pa_stream *sync_stream) {

    int ret;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    ret = create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);

    return ret;
}
1404
/* Connects the stream for recording. This is create_stream() with the
 * direction fixed to PA_STREAM_RECORD and neither a volume nor a sync stream;
 * its return value is returned as is. */
int pa_stream_connect_record(
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags) {

    int ret;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    ret = create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);

    return ret;
}
1416
/* Hands out a write buffer owned by the stream.
 *
 * On entry *nbytes is the wanted size, or (size_t) -1 for "no preference";
 * zero is rejected. A concrete size is capped to the largest multiple of the
 * frame size that fits into one mempool block. If the stream has no pending
 * write block yet, one is allocated and acquired; otherwise the pending one
 * is handed out again. On return *data points at the buffer and *nbytes holds
 * the length of the underlying memblock. Returns 0 on success. */
int pa_stream_begin_write(
        pa_stream *s,
        void **data,
        size_t *nbytes) {

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);

    if (*nbytes != (size_t) -1) {
        size_t limit = pa_mempool_block_size_max(s->context->mempool);
        size_t frame = pa_frame_size(&s->sample_spec);

        /* Round the block size limit down to a whole number of frames */
        limit -= limit % frame;

        if (limit < *nbytes)
            *nbytes = limit;
    }

    /* Only allocate when there is no block pending from an earlier call */
    if (s->write_memblock == NULL) {
        s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
        s->write_data = pa_memblock_acquire(s->write_memblock);
    }

    *nbytes = pa_memblock_get_length(s->write_memblock);
    *data = s->write_data;

    return 0;
}
1452
/* Discards the write buffer previously handed out by pa_stream_begin_write().
 * Requires a pending write block (PA_ERR_BADSTATE otherwise); the block is
 * released and unreferenced and the stream's bookkeeping pointers are
 * cleared. Returns 0 on success. */
int pa_stream_cancel_write(pa_stream *s) {

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);

    /* A pending block always comes with its acquired data pointer */
    pa_assert(s->write_data);

    /* Undo the acquire first, then drop our reference */
    pa_memblock_release(s->write_memblock);
    pa_memblock_unref(s->write_memblock);

    s->write_data = NULL;
    s->write_memblock = NULL;

    return 0;
}
1473
/* Writes length bytes starting at data to a playback or upload stream.
 *
 *   s             the stream; must be in state PA_STREAM_READY
 *   data          the audio data; must not be NULL (asserted). If a buffer
 *                 from pa_stream_begin_write() is pending, data/length must
 *                 lie within that buffer.
 *   length        number of bytes; must be a multiple of the frame size
 *   free_cb       optional callback used to release the caller's buffer;
 *                 must be NULL when a pa_stream_begin_write() buffer is
 *                 pending
 *   free_cb_data  argument handed to free_cb
 *   offset        seek offset in bytes; must be a multiple of the frame size
 *                 (may be negative). Upload streams only accept 0.
 *   seek          seek mode; upload streams only accept PA_SEEK_RELATIVE
 *
 * Returns 0 on success. Failed preconditions are handled by
 * PA_CHECK_VALIDITY (defined elsewhere). */
int pa_stream_write_ext_free(
        pa_stream *s,
        const void *data,
        size_t length,
        pa_free_cb_t free_cb,
        void *free_cb_data,
        int64_t offset,
        pa_seek_mode_t seek) {

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(data);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
    /* With a pending pa_stream_begin_write() buffer, [data, data+length) has
     * to lie inside that buffer. */
    PA_CHECK_VALIDITY(s->context,
                      !s->write_memblock ||
                      ((data >= s->write_data) &&
                       ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
                      PA_ERR_INVALID);
    /* offset is signed and may be negative, while pa_frame_size() returns an
     * unsigned size_t. Without the cast the usual arithmetic conversions can
     * turn a negative offset into a huge unsigned value before the modulo,
     * which gives the wrong answer for frame sizes that are not a power of
     * two. Do the alignment check in signed arithmetic instead. */
    PA_CHECK_VALIDITY(s->context, offset % (int64_t) pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, length % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);

    if (s->write_memblock) {
        pa_memchunk chunk;

        /* pa_stream_begin_write() was called before */

        pa_memblock_release(s->write_memblock);

        /* Send the part of the pending block the caller filled in */
        chunk.memblock = s->write_memblock;
        chunk.index = (const char *) data - (const char *) s->write_data;
        chunk.length = length;

        s->write_memblock = NULL;
        s->write_data = NULL;

        pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
        pa_memblock_unref(chunk.memblock);

    } else {
        pa_seek_mode_t t_seek = seek;
        int64_t t_offset = offset;
        size_t t_length = length;
        const void *t_data = data;

        /* pa_stream_begin_write() was not called before */

        while (t_length > 0) {
            pa_memchunk chunk;

            chunk.index = 0;

            if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
                /* Wrap the caller's buffer in a user memblock together with
                 * free_cb; this covers all remaining data in one chunk. */
                chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, free_cb_data, 1);
                chunk.length = t_length;
            } else {
                void *d;
                size_t blk_size_max;

                /* Break large audio streams into _aligned_ blocks or the
                 * other endpoint will happily discard them upon arrival. */
                blk_size_max = pa_frame_align(pa_mempool_block_size_max(s->context->mempool), &s->sample_spec);
                chunk.length = PA_MIN(t_length, blk_size_max);
                chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);

                d = pa_memblock_acquire(chunk.memblock);
                memcpy(d, t_data, chunk.length);
                pa_memblock_release(chunk.memblock);
            }

            pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);

            /* Only the first chunk carries the caller's seek request; the
             * following ones are appended relative with offset 0. */
            t_offset = 0;
            t_seek = PA_SEEK_RELATIVE;

            t_data = (const uint8_t*) t_data + chunk.length;
            t_length -= chunk.length;

            pa_memblock_unref(chunk.memblock);
        }

        /* In the SHM case the data was copied above, so the caller's buffer
         * can be released right away. */
        if (free_cb && pa_pstream_get_shm(s->context->pstream))
            free_cb(free_cb_data);
    }

    /* This is obviously wrong since we ignore the seeking index . But
     * that's OK, the server side applies the same error */
    s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;

#ifdef STREAM_DEBUG
    pa_log_debug("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes);
#endif

    if (s->direction == PA_STREAM_PLAYBACK) {

        /* Update latency request correction */
        if (s->write_index_corrections[s->current_write_index_correction].valid) {

            if (seek == PA_SEEK_ABSOLUTE) {
                s->write_index_corrections[s->current_write_index_correction].corrupt = false;
                s->write_index_corrections[s->current_write_index_correction].absolute = true;
                s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
            } else if (seek == PA_SEEK_RELATIVE) {
                if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
                    s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
            } else
                /* Any other seek mode makes the correction unusable */
                s->write_index_corrections[s->current_write_index_correction].corrupt = true;
        }

        /* Update the write index in the already available latency data */
        if (s->timing_info_valid) {

            if (seek == PA_SEEK_ABSOLUTE) {
                s->timing_info.write_index_corrupt = false;
                s->timing_info.write_index = offset + (int64_t) length;
            } else if (seek == PA_SEEK_RELATIVE) {
                if (!s->timing_info.write_index_corrupt)
                    s->timing_info.write_index += offset + (int64_t) length;
            } else
                s->timing_info.write_index_corrupt = true;
        }

        /* Without a usable write index, ask for fresh timing data */
        if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
            request_auto_timing_update(s, true);
    }

    return 0;
}
1607
/* Convenience wrapper around pa_stream_write_ext_free(): the data pointer
 * itself is passed as the argument for free_cb. Returns whatever
 * pa_stream_write_ext_free() returns. */
int pa_stream_write(
        pa_stream *s,
        const void *data,
        size_t length,
        pa_free_cb_t free_cb,
        int64_t offset,
        pa_seek_mode_t seek) {

    /* free_cb takes a non-const pointer, hence the cast */
    void *free_cb_data = (void*) data;

    return pa_stream_write_ext_free(s, data, length, free_cb, free_cb_data, offset, seek);
}
1618
/* Gives access to the next chunk of recorded data without removing it from
 * the record queue.
 *
 * Outcomes (the function returns 0 in all of them):
 *   - queue empty:                  *data = NULL, *length = 0
 *   - no data at the read index:    *data = NULL, *length = length reported
 *                                   by the queue for that position
 *   - data available:               *data points at it, *length is its size
 *
 * A chunk that was peeked before and not yet dropped is returned again. */
int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(data);
    pa_assert(length);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);

    if (s->peek_memchunk.memblock == NULL) {
        int r = pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk);

        if (r < 0 || s->peek_memchunk.memblock == NULL) {
            /* Either record_memblockq is empty (r < 0), or it isn't empty
             * but has no data at the current read index. In the latter case
             * the length filled in by the queue is reported. */
            *data = NULL;
            *length = r < 0 ? 0 : s->peek_memchunk.length;
            return 0;
        }

        s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
    }

    pa_assert(s->peek_data);

    *length = s->peek_memchunk.length;
    *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;

    return 0;
}
1653
/* Removes the chunk last returned by pa_stream_peek() from the record queue.
 * Requires a peeked chunk of non-zero length (PA_ERR_BADSTATE otherwise).
 * Returns 0 on success. */
int pa_stream_drop(pa_stream *s) {
    size_t dropped;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE);

    dropped = s->peek_memchunk.length;

    pa_memblockq_drop(s->record_memblockq, dropped);

    /* Keep the locally simulated read index in step with what was dropped */
    if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
        s->timing_info.read_index += (int64_t) dropped;

    /* The peeked chunk has no memblock when it had no data behind it; only
     * release and unref when there is one. */
    if (s->peek_memchunk.memblock != NULL) {
        pa_assert(s->peek_data);
        s->peek_data = NULL;
        pa_memblock_release(s->peek_memchunk.memblock);
        pa_memblock_unref(s->peek_memchunk.memblock);
    }

    pa_memchunk_reset(&s->peek_memchunk);

    return 0;
}
1680
pa_stream_writable_size(const pa_stream * s)1681 size_t pa_stream_writable_size(const pa_stream *s) {
1682 pa_assert(s);
1683 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1684
1685 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1686 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1687 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1688
1689 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1690 }
1691
pa_stream_readable_size(const pa_stream * s)1692 size_t pa_stream_readable_size(const pa_stream *s) {
1693 pa_assert(s);
1694 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1695
1696 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1697 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1698 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1699
1700 return pa_memblockq_get_length(s->record_memblockq);
1701 }
1702
pa_stream_drain(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)1703 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1704 pa_operation *o;
1705 pa_tagstruct *t;
1706 uint32_t tag;
1707
1708 pa_assert(s);
1709 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1710
1711 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1712 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1713 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1714
1715 /* Ask for a timing update before we cork/uncork to get the best
1716 * accuracy for the transport latency suitable for the
1717 * check_smoother_status() call in the started callback */
1718 request_auto_timing_update(s, true);
1719
1720 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1721
1722 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1723 pa_tagstruct_putu32(t, s->channel);
1724 pa_pstream_send_tagstruct(s->context->pstream, t);
1725 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1726
1727 /* This might cause the read index to continue again, hence
1728 * let's request a timing update */
1729 request_auto_timing_update(s, true);
1730
1731 return o;
1732 }
1733
calc_time(const pa_stream * s,bool ignore_transport)1734 static pa_usec_t calc_time(const pa_stream *s, bool ignore_transport) {
1735 pa_usec_t usec;
1736
1737 pa_assert(s);
1738 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1739 pa_assert(s->state == PA_STREAM_READY);
1740 pa_assert(s->direction != PA_STREAM_UPLOAD);
1741 pa_assert(s->timing_info_valid);
1742 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1743 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1744
1745 if (s->direction == PA_STREAM_PLAYBACK) {
1746 /* The last byte that was written into the output device
1747 * had this time value associated */
1748 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1749
1750 if (!s->corked && !s->suspended) {
1751
1752 if (!ignore_transport)
1753 /* Because the latency info took a little time to come
1754 * to us, we assume that the real output time is actually
1755 * a little ahead */
1756 usec += s->timing_info.transport_usec;
1757
1758 /* However, the output device usually maintains a buffer
1759 too, hence the real sample currently played is a little
1760 back */
1761 if (s->timing_info.sink_usec >= usec)
1762 usec = 0;
1763 else
1764 usec -= s->timing_info.sink_usec;
1765 }
1766
1767 } else {
1768 pa_assert(s->direction == PA_STREAM_RECORD);
1769
1770 /* The last byte written into the server side queue had
1771 * this time value associated */
1772 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1773
1774 if (!s->corked && !s->suspended) {
1775
1776 if (!ignore_transport)
1777 /* Add transport latency */
1778 usec += s->timing_info.transport_usec;
1779
1780 /* Add latency of data in device buffer */
1781 usec += s->timing_info.source_usec;
1782
1783 /* If this is a monitor source, we need to correct the
1784 * time by the playback device buffer */
1785 if (s->timing_info.sink_usec >= usec)
1786 usec = 0;
1787 else
1788 usec -= s->timing_info.sink_usec;
1789 }
1790 }
1791
1792 return usec;
1793 }
1794
stream_get_timing_info_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1795 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1796 pa_operation *o = userdata;
1797 struct timeval local, remote, now;
1798 pa_timing_info *i;
1799 bool playing = false;
1800 uint64_t underrun_for = 0, playing_for = 0;
1801
1802 pa_assert(pd);
1803 pa_assert(o);
1804 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1805
1806 if (!o->context || !o->stream)
1807 goto finish;
1808
1809 i = &o->stream->timing_info;
1810
1811 o->stream->timing_info_valid = false;
1812 i->write_index_corrupt = true;
1813 i->read_index_corrupt = true;
1814
1815 if (command != PA_COMMAND_REPLY) {
1816 if (pa_context_handle_error(o->context, command, t, false) < 0)
1817 goto finish;
1818
1819 } else {
1820
1821 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1822 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1823 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1824 pa_tagstruct_get_timeval(t, &local) < 0 ||
1825 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1826 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1827 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1828
1829 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1830 goto finish;
1831 }
1832
1833 if (o->context->version >= 13 &&
1834 o->stream->direction == PA_STREAM_PLAYBACK)
1835 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1836 pa_tagstruct_getu64(t, &playing_for) < 0) {
1837
1838 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1839 goto finish;
1840 }
1841
1842 if (!pa_tagstruct_eof(t)) {
1843 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1844 goto finish;
1845 }
1846 o->stream->timing_info_valid = true;
1847 i->write_index_corrupt = false;
1848 i->read_index_corrupt = false;
1849
1850 i->playing = (int) playing;
1851 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1852
1853 pa_gettimeofday(&now);
1854
1855 /* Calculate timestamps */
1856 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1857 /* local and remote seem to have synchronized clocks */
1858
1859 if (o->stream->direction == PA_STREAM_PLAYBACK)
1860 i->transport_usec = pa_timeval_diff(&remote, &local);
1861 else
1862 i->transport_usec = pa_timeval_diff(&now, &remote);
1863
1864 i->synchronized_clocks = true;
1865 i->timestamp = remote;
1866 } else {
1867 /* clocks are not synchronized, let's estimate latency then */
1868 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1869 i->synchronized_clocks = false;
1870 i->timestamp = local;
1871 pa_timeval_add(&i->timestamp, i->transport_usec);
1872 }
1873
1874 /* Invalidate read and write indexes if necessary */
1875 if (tag < o->stream->read_index_not_before)
1876 i->read_index_corrupt = true;
1877
1878 if (tag < o->stream->write_index_not_before)
1879 i->write_index_corrupt = true;
1880
1881 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1882 /* Write index correction */
1883
1884 int n, j;
1885 uint32_t ctag = tag;
1886
1887 /* Go through the saved correction values and add up the
1888 * total correction.*/
1889 for (n = 0, j = o->stream->current_write_index_correction+1;
1890 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1891 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1892
1893 /* Step over invalid data or out-of-date data */
1894 if (!o->stream->write_index_corrections[j].valid ||
1895 o->stream->write_index_corrections[j].tag < ctag)
1896 continue;
1897
1898 /* Make sure that everything is in order */
1899 ctag = o->stream->write_index_corrections[j].tag+1;
1900
1901 /* Now fix the write index */
1902 if (o->stream->write_index_corrections[j].corrupt) {
1903 /* A corrupting seek was made */
1904 i->write_index_corrupt = true;
1905 } else if (o->stream->write_index_corrections[j].absolute) {
1906 /* An absolute seek was made */
1907 i->write_index = o->stream->write_index_corrections[j].value;
1908 i->write_index_corrupt = false;
1909 } else if (!i->write_index_corrupt) {
1910 /* A relative seek was made */
1911 i->write_index += o->stream->write_index_corrections[j].value;
1912 }
1913 }
1914
1915 /* Clear old correction entries */
1916 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1917 if (!o->stream->write_index_corrections[n].valid)
1918 continue;
1919
1920 if (o->stream->write_index_corrections[n].tag <= tag)
1921 o->stream->write_index_corrections[n].valid = false;
1922 }
1923 }
1924
1925 if (o->stream->direction == PA_STREAM_RECORD) {
1926 /* Read index correction */
1927
1928 if (!i->read_index_corrupt)
1929 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1930 }
1931
1932 /* Update smoother if we're not corked */
1933 if (o->stream->smoother && !o->stream->corked) {
1934 pa_usec_t u, x;
1935
1936 u = x = pa_rtclock_now() - i->transport_usec;
1937
1938 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1939 pa_usec_t su;
1940
1941 /* If we weren't playing then it will take some time
1942 * until the audio will actually come out through the
1943 * speakers. Since we follow that timing here, we need
1944 * to try to fix this up */
1945
1946 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1947
1948 if (su < i->sink_usec)
1949 x += i->sink_usec - su;
1950 }
1951
1952 if (!i->playing)
1953 pa_smoother_pause(o->stream->smoother, x);
1954
1955 /* Update the smoother */
1956 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1957 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1958 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, true));
1959
1960 if (i->playing)
1961 pa_smoother_resume(o->stream->smoother, x, true);
1962 }
1963 }
1964
1965 o->stream->auto_timing_update_requested = false;
1966
1967 if (o->stream->latency_update_callback)
1968 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1969
1970 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1971 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1972 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1973 }
1974
1975 finish:
1976
1977 pa_operation_done(o);
1978 pa_operation_unref(o);
1979 }
1980
pa_stream_update_timing_info(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)1981 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1982 uint32_t tag;
1983 pa_operation *o;
1984 pa_tagstruct *t;
1985 struct timeval now;
1986 int cidx = 0;
1987
1988 pa_assert(s);
1989 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1990
1991 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1992 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1993 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1994
1995 if (s->direction == PA_STREAM_PLAYBACK) {
1996 /* Find a place to store the write_index correction data for this entry */
1997 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1998
1999 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
2000 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
2001 }
2002 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2003
2004 t = pa_tagstruct_command(
2005 s->context,
2006 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
2007 &tag);
2008 pa_tagstruct_putu32(t, s->channel);
2009 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
2010
2011 pa_pstream_send_tagstruct(s->context->pstream, t);
2012 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2013
2014 if (s->direction == PA_STREAM_PLAYBACK) {
2015 /* Fill in initial correction data */
2016
2017 s->current_write_index_correction = cidx;
2018
2019 s->write_index_corrections[cidx].valid = true;
2020 s->write_index_corrections[cidx].absolute = false;
2021 s->write_index_corrections[cidx].corrupt = false;
2022 s->write_index_corrections[cidx].tag = tag;
2023 s->write_index_corrections[cidx].value = 0;
2024 }
2025
2026 return o;
2027 }
2028
/* Reply handler for the DELETE_*_STREAM request sent by pa_stream_disconnect().
 *
 * userdata is the stream itself. A clean, empty reply moves the stream to
 * PA_STREAM_TERMINATED; a reply with trailing data fails the whole context
 * with PA_ERR_PROTOCOL; any other command is passed to
 * pa_context_handle_error() and, unless that returns a negative value, the
 * stream is moved to PA_STREAM_FAILED.
 *
 * A temporary reference keeps the stream alive across the state change. */
void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_stream *s = userdata;

    pa_assert(pd);
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    pa_stream_ref(s);

    if (command == PA_COMMAND_REPLY) {
        if (pa_tagstruct_eof(t))
            pa_stream_set_state(s, PA_STREAM_TERMINATED);
        else
            pa_context_fail(s->context, PA_ERR_PROTOCOL);
    } else if (pa_context_handle_error(s->context, command, t, false) >= 0)
        pa_stream_set_state(s, PA_STREAM_FAILED);

    pa_stream_unref(s);
}
2054
/* Ask the server to delete stream s.
 *
 * The matching DELETE_PLAYBACK/RECORD/UPLOAD_STREAM command is sent for the
 * stream's channel and pa_stream_disconnect_callback() is registered for the
 * reply, with the stream as its userdata.
 *
 * Returns 0 once the request has been queued. The validity checks reject the
 * call after a fork, when the stream has no valid channel, or when the
 * context is not READY. */
int pa_stream_disconnect(pa_stream *s) {
    pa_tagstruct *t;
    uint32_t tag, command;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);

    /* Hold the stream for the duration of the send */
    pa_stream_ref(s);

    if (s->direction == PA_STREAM_PLAYBACK)
        command = (uint32_t) PA_COMMAND_DELETE_PLAYBACK_STREAM;
    else if (s->direction == PA_STREAM_RECORD)
        command = (uint32_t) PA_COMMAND_DELETE_RECORD_STREAM;
    else
        command = (uint32_t) PA_COMMAND_DELETE_UPLOAD_STREAM;

    t = pa_tagstruct_command(s->context, command, &tag);
    pa_tagstruct_putu32(t, s->channel);
    pa_pstream_send_tagstruct(s->context->pstream, t);
    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);

    pa_stream_unref(s);
    return 0;
}
2080
/* Store cb as the stream's read callback and remember userdata alongside it.
 * The request is silently dropped once a fork has been detected or when the
 * stream is already in the TERMINATED or FAILED state. */
void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->read_userdata = userdata;
    s->read_callback = cb;
}
2094
/* Store cb as the stream's write callback and remember userdata alongside it.
 * The request is silently dropped once a fork has been detected or when the
 * stream is already in the TERMINATED or FAILED state. */
void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->write_userdata = userdata;
    s->write_callback = cb;
}
2108
/* Store cb as the stream's state callback and remember userdata alongside it.
 * The request is silently dropped once a fork has been detected or when the
 * stream is already in the TERMINATED or FAILED state. */
void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->state_userdata = userdata;
    s->state_callback = cb;
}
2122
/* Store cb as the stream's overflow callback and remember userdata alongside
 * it. The request is silently dropped once a fork has been detected or when
 * the stream is already in the TERMINATED or FAILED state. */
void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->overflow_userdata = userdata;
    s->overflow_callback = cb;
}
2136
/* Store cb as the stream's underflow callback and remember userdata alongside
 * it. The request is silently dropped once a fork has been detected or when
 * the stream is already in the TERMINATED or FAILED state. */
void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->underflow_userdata = userdata;
    s->underflow_callback = cb;
}
2150
/* Store cb as the stream's latency update callback and remember userdata
 * alongside it. The request is silently dropped once a fork has been detected
 * or when the stream is already in the TERMINATED or FAILED state. */
void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->latency_update_userdata = userdata;
    s->latency_update_callback = cb;
}
2164
/* Store cb as the stream's moved callback and remember userdata alongside it.
 * The request is silently dropped once a fork has been detected or when the
 * stream is already in the TERMINATED or FAILED state. */
void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->moved_userdata = userdata;
    s->moved_callback = cb;
}
2178
/* Store cb as the stream's suspended callback and remember userdata alongside
 * it. The request is silently dropped once a fork has been detected or when
 * the stream is already in the TERMINATED or FAILED state. */
void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->suspended_userdata = userdata;
    s->suspended_callback = cb;
}
2192
/* Store cb as the stream's started callback and remember userdata alongside
 * it. The request is silently dropped once a fork has been detected or when
 * the stream is already in the TERMINATED or FAILED state. */
void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->started_userdata = userdata;
    s->started_callback = cb;
}
2206
/* Store cb as the stream's event callback and remember userdata alongside it.
 * The request is silently dropped once a fork has been detected or when the
 * stream is already in the TERMINATED or FAILED state. */
void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->event_userdata = userdata;
    s->event_callback = cb;
}
2220
/* Store cb as the stream's buffer attribute callback and remember userdata
 * alongside it. The request is silently dropped once a fork has been detected
 * or when the stream is already in the TERMINATED or FAILED state. */
void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->buffer_attr_userdata = userdata;
    s->buffer_attr_callback = cb;
}
2234
/* Generic reply handler for requests whose answer carries no payload.
 *
 * userdata is the pa_operation created for the request. If the operation is
 * no longer attached to a context nothing is reported. An empty REPLY yields
 * success = 1; a REPLY with trailing data fails the context with
 * PA_ERR_PROTOCOL; any other command goes through pa_context_handle_error()
 * and, unless that returns a negative value, yields success = 0.
 *
 * The user callback stored in the operation, if any, is invoked with the
 * outcome. In every case the operation is marked done and the reference that
 * was handed to this callback is released. */
void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    int success;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    if (!o->context)
        goto finish;

    if (command == PA_COMMAND_REPLY) {
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        success = 1;
    } else {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

        success = 0;
    }

    if (o->callback) {
        pa_stream_success_cb_t notify = (pa_stream_success_cb_t) o->callback;
        notify(o->stream, success, o->userdata);
    }

finish:
    pa_operation_done(o);
    pa_operation_unref(o);
}
2265
pa_stream_cork(pa_stream * s,int b,pa_stream_success_cb_t cb,void * userdata)2266 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2267 pa_operation *o;
2268 pa_tagstruct *t;
2269 uint32_t tag;
2270
2271 pa_assert(s);
2272 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2273
2274 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2275 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2276 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2277
2278 /* Ask for a timing update before we cork/uncork to get the best
2279 * accuracy for the transport latency suitable for the
2280 * check_smoother_status() call in the started callback */
2281 request_auto_timing_update(s, true);
2282
2283 s->corked = b;
2284
2285 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2286
2287 t = pa_tagstruct_command(
2288 s->context,
2289 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2290 &tag);
2291 pa_tagstruct_putu32(t, s->channel);
2292 pa_tagstruct_put_boolean(t, !!b);
2293 pa_pstream_send_tagstruct(s->context->pstream, t);
2294 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2295
2296 check_smoother_status(s, false, false, false);
2297
2298 /* This might cause the indexes to hang/start again, hence let's
2299 * request a timing update, after the cork/uncork, too */
2300 request_auto_timing_update(s, true);
2301
2302 return o;
2303 }
2304
stream_send_simple_command(pa_stream * s,uint32_t command,pa_stream_success_cb_t cb,void * userdata)2305 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2306 pa_tagstruct *t;
2307 pa_operation *o;
2308 uint32_t tag;
2309
2310 pa_assert(s);
2311 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2312
2313 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2314 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2315
2316 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2317
2318 t = pa_tagstruct_command(s->context, command, &tag);
2319 pa_tagstruct_putu32(t, s->channel);
2320 pa_pstream_send_tagstruct(s->context->pstream, t);
2321 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2322
2323 return o;
2324 }
2325
pa_stream_flush(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2326 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2327 pa_operation *o;
2328
2329 pa_assert(s);
2330 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2331
2332 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2333 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2334 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2335
2336 /* Ask for a timing update *before* the flush, so that the
2337 * transport usec is as up to date as possible when we get the
2338 * underflow message and update the smoother status*/
2339 request_auto_timing_update(s, true);
2340
2341 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2342 return NULL;
2343
2344 if (s->direction == PA_STREAM_PLAYBACK) {
2345
2346 if (s->write_index_corrections[s->current_write_index_correction].valid)
2347 s->write_index_corrections[s->current_write_index_correction].corrupt = true;
2348
2349 if (s->buffer_attr.prebuf > 0)
2350 check_smoother_status(s, false, false, true);
2351
2352 /* This will change the write index, but leave the
2353 * read index untouched. */
2354 invalidate_indexes(s, false, true);
2355
2356 } else
2357 /* For record streams this has no influence on the write
2358 * index, but the read index might jump. */
2359 invalidate_indexes(s, true, false);
2360
2361 /* Note that we do not update requested_bytes here. This is
2362 * because we cannot really know how data actually was dropped
2363 * from the write index due to this. This 'error' will be applied
2364 * by both client and server and hence we should be fine. */
2365
2366 return o;
2367 }
2368
pa_stream_prebuf(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2369 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2370 pa_operation *o;
2371
2372 pa_assert(s);
2373 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2374
2375 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2376 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2377 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2378 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2379
2380 /* Ask for a timing update before we cork/uncork to get the best
2381 * accuracy for the transport latency suitable for the
2382 * check_smoother_status() call in the started callback */
2383 request_auto_timing_update(s, true);
2384
2385 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2386 return NULL;
2387
2388 /* This might cause the read index to hang again, hence
2389 * let's request a timing update */
2390 request_auto_timing_update(s, true);
2391
2392 return o;
2393 }
2394
pa_stream_trigger(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2395 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2396 pa_operation *o;
2397
2398 pa_assert(s);
2399 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2400
2401 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2402 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2403 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2404 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2405
2406 /* Ask for a timing update before we cork/uncork to get the best
2407 * accuracy for the transport latency suitable for the
2408 * check_smoother_status() call in the started callback */
2409 request_auto_timing_update(s, true);
2410
2411 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2412 return NULL;
2413
2414 /* This might cause the read index to start moving again, hence
2415 * let's request a timing update */
2416 request_auto_timing_update(s, true);
2417
2418 return o;
2419 }
2420
pa_stream_set_name(pa_stream * s,const char * name,pa_stream_success_cb_t cb,void * userdata)2421 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2422 pa_operation *o;
2423
2424 pa_assert(s);
2425 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2426 pa_assert(name);
2427
2428 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2429 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2430 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2431
2432 if (s->context->version >= 13) {
2433 pa_proplist *p = pa_proplist_new();
2434
2435 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2436 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2437 pa_proplist_free(p);
2438 } else {
2439 pa_tagstruct *t;
2440 uint32_t tag;
2441
2442 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2443 t = pa_tagstruct_command(
2444 s->context,
2445 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2446 &tag);
2447 pa_tagstruct_putu32(t, s->channel);
2448 pa_tagstruct_puts(t, name);
2449 pa_pstream_send_tagstruct(s->context->pstream, t);
2450 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2451 }
2452
2453 return o;
2454 }
2455
/* Report the current stream time of s in *r_usec (r_usec may be NULL).
 *
 * The value comes from the smoother when the stream has one, otherwise from
 * calc_time(). Unless PA_STREAM_NOT_MONOTONIC is set in the stream flags, the
 * result is clamped so that it never falls below the highest value seen so
 * far (tracked in s->previous_time).
 *
 * Returns 0 on success. The validity checks reject the call after a fork,
 * when the stream is not READY, for upload streams, when no timing info is
 * available, or when the relevant index (read index for playback, write index
 * for record) is marked corrupt. */
int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
    pa_usec_t usec;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);

    usec = s->smoother ? pa_smoother_get(s->smoother, pa_rtclock_now()) : calc_time(s, false);

    /* Enforce monotonic time unless the client opted out */
    if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
        if (usec >= s->previous_time)
            s->previous_time = usec;
        else
            usec = s->previous_time;
    }

    if (r_usec)
        *r_usec = usec;

    return 0;
}
2487
/* Return the distance between time counters a and b.
 *
 * If a >= b the result is a - b. Otherwise the result is b - a with
 * *negative set to 1, but only for record streams and only when the caller
 * supplied `negative`; in every other case 0 is returned. When `negative` is
 * supplied it is reset to 0 first. */
static pa_usec_t time_counter_diff(const pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (negative)
        *negative = 0;

    if (a >= b)
        return a-b;

    /* a < b: a negative difference can only be reported for record
     * streams, and only if the caller can receive the sign */
    if (!negative || s->direction != PA_STREAM_RECORD)
        return 0;

    *negative = 1;
    return b-a;
}
2505
/* Compute the latency of stream s and store it in *r_usec.
 *
 * The stream time from pa_stream_get_time() is compared with the time
 * equivalent of the cached write index (playback) or read index (record);
 * a negative index is treated as 0. The difference is produced by
 * time_counter_diff(), which also fills in *negative when it is supplied.
 *
 * Returns 0 on success or the negative result of pa_stream_get_time(). The
 * validity checks reject the call after a fork, when the stream is not READY,
 * for upload streams, when no timing info is available, or when the relevant
 * index (write index for playback, read index for record) is marked
 * corrupt. */
int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
    pa_usec_t t, c;
    int64_t cindex;
    bool playback;
    int r;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(r_usec);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);

    r = pa_stream_get_time(s, &t);
    if (r < 0)
        return r;

    playback = s->direction == PA_STREAM_PLAYBACK;

    cindex = playback ? s->timing_info.write_index : s->timing_info.read_index;
    if (cindex < 0)
        cindex = 0;

    c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);

    /* The operand order flips with the direction */
    *r_usec = playback ? time_counter_diff(s, c, t, negative)
                       : time_counter_diff(s, t, c, negative);

    return 0;
}
2542
pa_stream_get_timing_info(pa_stream * s)2543 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2544 pa_assert(s);
2545 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2546
2547 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2548 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2549 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2550 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2551
2552 return &s->timing_info;
2553 }
2554
pa_stream_get_sample_spec(pa_stream * s)2555 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2556 pa_assert(s);
2557 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2558
2559 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2560
2561 return &s->sample_spec;
2562 }
2563
pa_stream_get_channel_map(pa_stream * s)2564 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2565 pa_assert(s);
2566 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2567
2568 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2569
2570 return &s->channel_map;
2571 }
2572
pa_stream_get_format_info(const pa_stream * s)2573 const pa_format_info* pa_stream_get_format_info(const pa_stream *s) {
2574 pa_assert(s);
2575 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2576
2577 /* We don't have the format till routing is done */
2578 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2579 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2580
2581 return s->format;
2582 }
pa_stream_get_buffer_attr(pa_stream * s)2583 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2584 pa_assert(s);
2585 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2586
2587 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2588 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2589 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2590
2591 return &s->buffer_attr;
2592 }
2593
/* Reply handler for the SET_*_STREAM_BUFFER_ATTR request.
 *
 * userdata is the pa_operation created for the request. On a REPLY the
 * buffer attributes that came back are written into the stream: maxlength,
 * tlength, prebuf and minreq for playback streams, maxlength and fragsize for
 * record streams. With context version 13 or newer a usec value follows and
 * is stored as the configured source (record) or sink (otherwise) latency.
 * Any parse failure or trailing data fails the context with PA_ERR_PROTOCOL.
 *
 * Any other command goes through pa_context_handle_error() and, unless that
 * returns a negative value, is reported to the user callback as success = 0.
 *
 * In every case the operation is marked done and the reference that was
 * handed to this callback is released. */
static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    int success = 1;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    if (!o->context)
        goto finish;

    if (command == PA_COMMAND_REPLY) {
        pa_stream *st = o->stream;
        bool parsed = true;

        /* Direction specific buffer metrics come first */
        if (st->direction == PA_STREAM_PLAYBACK)
            parsed = pa_tagstruct_getu32(t, &st->buffer_attr.maxlength) >= 0 &&
                     pa_tagstruct_getu32(t, &st->buffer_attr.tlength) >= 0 &&
                     pa_tagstruct_getu32(t, &st->buffer_attr.prebuf) >= 0 &&
                     pa_tagstruct_getu32(t, &st->buffer_attr.minreq) >= 0;
        else if (st->direction == PA_STREAM_RECORD)
            parsed = pa_tagstruct_getu32(t, &st->buffer_attr.maxlength) >= 0 &&
                     pa_tagstruct_getu32(t, &st->buffer_attr.fragsize) >= 0;

        /* Context version 13 and up append the configured latency */
        if (parsed && st->context->version >= 13) {
            pa_usec_t usec;

            if (pa_tagstruct_get_usec(t, &usec) < 0)
                parsed = false;
            else if (st->direction == PA_STREAM_RECORD)
                st->timing_info.configured_source_usec = usec;
            else
                st->timing_info.configured_sink_usec = usec;
        }

        /* Nothing may follow the fields parsed above */
        if (!parsed || !pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }
    } else {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

        success = 0;
    }

    if (o->callback) {
        pa_stream_success_cb_t notify = (pa_stream_success_cb_t) o->callback;
        notify(o->stream, success, o->userdata);
    }

finish:
    pa_operation_done(o);
    pa_operation_unref(o);
}
2656
pa_stream_set_buffer_attr(pa_stream * s,const pa_buffer_attr * attr,pa_stream_success_cb_t cb,void * userdata)2657 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2658 pa_operation *o;
2659 pa_tagstruct *t;
2660 uint32_t tag;
2661 pa_buffer_attr copy;
2662
2663 pa_assert(s);
2664 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2665 pa_assert(attr);
2666
2667 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2668 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2669 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2670 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2671
2672 /* Ask for a timing update before we cork/uncork to get the best
2673 * accuracy for the transport latency suitable for the
2674 * check_smoother_status() call in the started callback */
2675 request_auto_timing_update(s, true);
2676
2677 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2678
2679 t = pa_tagstruct_command(
2680 s->context,
2681 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2682 &tag);
2683 pa_tagstruct_putu32(t, s->channel);
2684
2685 copy = *attr;
2686 patch_buffer_attr(s, ©, NULL);
2687 attr = ©
2688
2689 pa_tagstruct_putu32(t, attr->maxlength);
2690
2691 if (s->direction == PA_STREAM_PLAYBACK)
2692 pa_tagstruct_put(
2693 t,
2694 PA_TAG_U32, attr->tlength,
2695 PA_TAG_U32, attr->prebuf,
2696 PA_TAG_U32, attr->minreq,
2697 PA_TAG_INVALID);
2698 else
2699 pa_tagstruct_putu32(t, attr->fragsize);
2700
2701 if (s->context->version >= 13)
2702 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2703
2704 if (s->context->version >= 14)
2705 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2706
2707 pa_pstream_send_tagstruct(s->context->pstream, t);
2708 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2709
2710 /* This might cause changes in the read/write index, hence let's
2711 * request a timing update */
2712 request_auto_timing_update(s, true);
2713
2714 return o;
2715 }
2716
pa_stream_get_device_index(const pa_stream * s)2717 uint32_t pa_stream_get_device_index(const pa_stream *s) {
2718 pa_assert(s);
2719 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2720
2721 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2722 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2723 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2724 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2725 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2726
2727 return s->device_index;
2728 }
2729
pa_stream_get_device_name(const pa_stream * s)2730 const char *pa_stream_get_device_name(const pa_stream *s) {
2731 pa_assert(s);
2732 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2733
2734 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2735 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2736 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2737 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2738 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2739
2740 return s->device_name;
2741 }
2742
pa_stream_is_suspended(const pa_stream * s)2743 int pa_stream_is_suspended(const pa_stream *s) {
2744 pa_assert(s);
2745 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2746
2747 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2748 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2749 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2750 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2751
2752 return s->suspended;
2753 }
2754
pa_stream_is_corked(const pa_stream * s)2755 int pa_stream_is_corked(const pa_stream *s) {
2756 pa_assert(s);
2757 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2758
2759 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2760 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2761 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2762
2763 return s->corked;
2764 }
2765
/* Reply handler registered by pa_stream_update_sample_rate().
 *
 * userdata is the pa_operation that was referenced when the reply handler was
 * registered; its private field carries the requested sample rate. That
 * reference is dropped again before returning.
 *
 * Outcomes:
 *   - the operation has no context any more: finish silently;
 *   - an error reply that pa_context_handle_error() treats as fatal (< 0):
 *     finish without invoking the user callback;
 *   - any other error reply: the user callback is invoked with success == 0
 *     and the stream keeps its previous sample rate;
 *   - PA_COMMAND_REPLY with trailing data: protocol failure on the context;
 *   - PA_COMMAND_REPLY with no payload: the requested rate is stored in the
 *     stream's sample spec and the user callback is invoked with
 *     success == 1. */
static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    int success = 1;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    if (!o->context)
        goto finish;

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

        /* The server rejected the request. Leave the cached sample spec
         * untouched so it keeps describing the rate actually in effect. */
        success = 0;
    } else {

        /* A successful reply is expected to carry no payload. */
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        /* Only now that the server acknowledged the change may the local
         * copy of the sample spec adopt the new rate. */
        o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
        pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
    }

    if (o->callback) {
        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
        cb(o->stream, success, o->userdata);
    }

finish:
    pa_operation_done(o);
    pa_operation_unref(o);
}
2802
pa_stream_update_sample_rate(pa_stream * s,uint32_t rate,pa_stream_success_cb_t cb,void * userdata)2803 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2804 pa_operation *o;
2805 pa_tagstruct *t;
2806 uint32_t tag;
2807
2808 pa_assert(s);
2809 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2810
2811 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2812 PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_sample_rate_valid(rate), PA_ERR_INVALID);
2813 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2814 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2815 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2816 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2817
2818 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2819 o->private = PA_UINT_TO_PTR(rate);
2820
2821 t = pa_tagstruct_command(
2822 s->context,
2823 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2824 &tag);
2825 pa_tagstruct_putu32(t, s->channel);
2826 pa_tagstruct_putu32(t, rate);
2827
2828 pa_pstream_send_tagstruct(s->context->pstream, t);
2829 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2830
2831 return o;
2832 }
2833
pa_stream_proplist_update(pa_stream * s,pa_update_mode_t mode,pa_proplist * p,pa_stream_success_cb_t cb,void * userdata)2834 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2835 pa_operation *o;
2836 pa_tagstruct *t;
2837 uint32_t tag;
2838
2839 pa_assert(s);
2840 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2841
2842 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2843 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2844 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2845 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2846 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2847
2848 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2849
2850 t = pa_tagstruct_command(
2851 s->context,
2852 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2853 &tag);
2854 pa_tagstruct_putu32(t, s->channel);
2855 pa_tagstruct_putu32(t, (uint32_t) mode);
2856 pa_tagstruct_put_proplist(t, p);
2857
2858 pa_pstream_send_tagstruct(s->context->pstream, t);
2859 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2860
2861 /* Please note that we don't update s->proplist here, because we
2862 * don't export that field */
2863
2864 return o;
2865 }
2866
pa_stream_proplist_remove(pa_stream * s,const char * const keys[],pa_stream_success_cb_t cb,void * userdata)2867 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2868 pa_operation *o;
2869 pa_tagstruct *t;
2870 uint32_t tag;
2871 const char * const*k;
2872
2873 pa_assert(s);
2874 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2875
2876 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2877 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2878 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2879 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2880 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2881
2882 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2883
2884 t = pa_tagstruct_command(
2885 s->context,
2886 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2887 &tag);
2888 pa_tagstruct_putu32(t, s->channel);
2889
2890 for (k = keys; *k; k++)
2891 pa_tagstruct_puts(t, *k);
2892
2893 pa_tagstruct_puts(t, NULL);
2894
2895 pa_pstream_send_tagstruct(s->context->pstream, t);
2896 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2897
2898 /* Please note that we don't update s->proplist here, because we
2899 * don't export that field */
2900
2901 return o;
2902 }
2903
/* Record the sink input index to monitor in s->direct_on_input.
 *
 * This is purely local bookkeeping: nothing is sent from here. It is only
 * permitted while the stream is still PA_STREAM_UNCONNECTED.
 *
 * Returns 0 on success. Otherwise PA_CHECK_VALIDITY returns early; the checks
 * run in this order: not forked (PA_ERR_FORKED), sink_input_idx is not
 * PA_INVALID_INDEX (PA_ERR_INVALID), stream unconnected (PA_ERR_BADSTATE),
 * protocol version at least 13 (PA_ERR_NOTSUPPORTED).
 * NOTE(review): the early return is presumably a negative error code --
 * confirm against the PA_CHECK_VALIDITY definition. */
int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
    pa_context *c;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    c = s->context;

    PA_CHECK_VALIDITY(c, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(c, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(c, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(c, c->version >= 13, PA_ERR_NOTSUPPORTED);

    s->direct_on_input = sink_input_idx;

    return 0;
}
2917
pa_stream_get_monitor_stream(const pa_stream * s)2918 uint32_t pa_stream_get_monitor_stream(const pa_stream *s) {
2919 pa_assert(s);
2920 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2921
2922 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2923 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2924 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2925
2926 return s->direct_on_input;
2927 }
2928