1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <string.h>
26 #include <stdio.h>
27 #include <string.h>
28
29 #include <pulse/def.h>
30 #include <pulse/timeval.h>
31 #include <pulse/rtclock.h>
32 #include <pulse/xmalloc.h>
33 #include <pulse/fork-detect.h>
34
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/sample-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41 #include <pulsecore/core-util.h>
42
43 #include "internal.h"
44 #include "stream.h"
45
46 /* #define STREAM_DEBUG */
47
48 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
49 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
50
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #ifndef USE_SMOOTHER_2
53 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
54 #define SMOOTHER_MIN_HISTORY (4)
55 #endif
56
pa_stream_new(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map)57 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
58 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
59 }
60
/* Clears every user-installed callback of the stream together with the
 * userdata pointer that belongs to it. */
static void reset_callbacks(pa_stream *s) {
    /* Callback function pointers */
    s->read_callback = NULL;
    s->write_callback = NULL;
    s->state_callback = NULL;
    s->overflow_callback = NULL;
    s->underflow_callback = NULL;
    s->latency_update_callback = NULL;
    s->moved_callback = NULL;
    s->suspended_callback = NULL;
    s->started_callback = NULL;
    s->event_callback = NULL;
    s->buffer_attr_callback = NULL;

    /* Matching userdata pointers */
    s->read_userdata = NULL;
    s->write_userdata = NULL;
    s->state_userdata = NULL;
    s->overflow_userdata = NULL;
    s->underflow_userdata = NULL;
    s->latency_update_userdata = NULL;
    s->moved_userdata = NULL;
    s->suspended_userdata = NULL;
    s->started_userdata = NULL;
    s->event_userdata = NULL;
    s->buffer_attr_userdata = NULL;
}
85
/* Common constructor behind pa_stream_new_with_proplist() and
 * pa_stream_new_extended().
 *
 * The caller supplies either a sample spec/channel map pair (ss, map) or a
 * list of n_formats format infos (formats); the assertion below rejects a
 * mix of both. A stream name must be available either through name or as
 * PA_PROP_MEDIA_NAME in p.
 *
 * The passed formats and proplist are copied. The new stream is prepended
 * to c->streams and one additional reference is taken on behalf of that
 * list before the stream is returned. All validity checks run before the
 * stream is allocated, so a failed check creates nothing. */
static pa_stream *pa_stream_new_with_proplist_internal(
        pa_context *c,
        const char *name,
        const pa_sample_spec *ss,
        const pa_channel_map *map,
        pa_format_info * const *formats,
        unsigned int n_formats,
        pa_proplist *p) {

    pa_stream *s;
    unsigned int i;

    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);
    pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
    pa_assert(n_formats < PA_MAX_FORMATS);

    PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);

    /* The fields are initialized one by one below. */
    s = pa_xnew(pa_stream, 1);
    PA_REFCNT_INIT(s);
    s->context = c;
    s->mainloop = c->mainloop;

    s->direction = PA_STREAM_NODIRECTION;
    s->state = PA_STREAM_UNCONNECTED;
    s->flags = 0;

    if (ss)
        s->sample_spec = *ss;
    else
        pa_sample_spec_init(&s->sample_spec);

    if (map)
        s->channel_map = *map;
    else
        pa_channel_map_init(&s->channel_map);

    /* Keep private copies of the requested formats, if any were passed. */
    s->n_formats = 0;
    if (formats) {
        s->n_formats = n_formats;
        for (i = 0; i < n_formats; i++)
            s->req_formats[i] = pa_format_info_copy(formats[i]);
    }

    /* We'll get the final negotiated format after connecting */
    s->format = NULL;

    s->direct_on_input = PA_INVALID_INDEX;

    /* An explicit name overrides any PA_PROP_MEDIA_NAME taken over from p. */
    s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
    if (name)
        pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);

    s->channel = 0;
    s->channel_valid = false;
    /* Use the context's current sync id and advance its counter. */
    s->syncid = c->csyncid++;
    s->stream_index = PA_INVALID_INDEX;

    s->requested_bytes = 0;
    memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));

    /* We initialize the target length here, so that if the user
     * passes no explicit buffering metrics the default is similar to
     * what older PA versions provided. */

    s->buffer_attr.maxlength = (uint32_t) -1;
    if (ss)
        s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
    else {
        /* FIXME: We assume a worst-case compressed format corresponding to
         * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
        pa_sample_spec tmp_ss = {
            .format = PA_SAMPLE_S16NE,
            .rate = 48000,
            .channels = 2,
        };
        s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
    }
    s->buffer_attr.minreq = (uint32_t) -1;
    s->buffer_attr.prebuf = (uint32_t) -1;
    s->buffer_attr.fragsize = (uint32_t) -1;

    s->device_index = PA_INVALID_INDEX;
    s->device_name = NULL;
    s->suspended = false;
    s->corked = false;

    s->write_memblock = NULL;
    s->write_data = NULL;

    pa_memchunk_reset(&s->peek_memchunk);
    s->peek_data = NULL;
    s->record_memblockq = NULL;

    memset(&s->timing_info, 0, sizeof(s->timing_info));
    s->timing_info_valid = false;

    s->previous_time = 0;
    s->latest_underrun_at_index = -1;

    s->read_index_not_before = 0;
    s->write_index_not_before = 0;
    /* Mark every write index correction slot as not valid. */
    for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
        s->write_index_corrections[i].valid = 0;
    s->current_write_index_correction = 0;

    s->auto_timing_update_event = NULL;
    s->auto_timing_update_requested = false;
    s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;

    reset_callbacks(s);

    s->smoother = NULL;

    /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
    PA_LLIST_PREPEND(pa_stream, c->streams, s);
    pa_stream_ref(s);

    return s;
}
208
pa_stream_new_with_proplist(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map,pa_proplist * p)209 pa_stream *pa_stream_new_with_proplist(
210 pa_context *c,
211 const char *name,
212 const pa_sample_spec *ss,
213 const pa_channel_map *map,
214 pa_proplist *p) {
215
216 pa_channel_map tmap;
217
218 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
219 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
220 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
221 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
222 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
223
224 if (!map)
225 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
226
227 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
228 }
229
pa_stream_new_extended(pa_context * c,const char * name,pa_format_info * const * formats,unsigned int n_formats,pa_proplist * p)230 pa_stream *pa_stream_new_extended(
231 pa_context *c,
232 const char *name,
233 pa_format_info * const *formats,
234 unsigned int n_formats,
235 pa_proplist *p) {
236
237 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
238
239 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
240 }
241
/* Detaches the stream from its context.
 *
 * Cancels all operations of the context that refer to this stream, drops
 * pending replies registered for it, removes it from the per-direction
 * channel hashmap and from the context's stream list (dropping the
 * reference that was taken for that list), frees the automatic timing
 * update timer and clears all callbacks. Does nothing if the stream is
 * already detached (s->context == NULL).
 *
 * The stream is still accessed after the list reference has been dropped;
 * pa_stream_set_state() holds a temporary reference around its call for
 * that reason. */
static void stream_unlink(pa_stream *s) {
    pa_operation *o, *n;
    pa_assert(s);

    if (!s->context)
        return;

    /* Detach from context */

    /* Unref all operation objects that point to us. The successor is
     * fetched before the cancel call so the walk does not depend on o
     * afterwards. */
    for (o = s->context->operations; o; o = n) {
        n = o->next;

        if (o->stream == s)
            pa_operation_cancel(o);
    }

    /* Drop all outstanding replies for this stream */
    if (s->context->pdispatch)
        pa_pdispatch_unregister_reply(s->context->pdispatch, s);

    if (s->channel_valid) {
        pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
        s->channel = 0;
        s->channel_valid = false;
    }

    PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
    pa_stream_unref(s);

    s->context = NULL;

    if (s->auto_timing_update_event) {
        pa_assert(s->mainloop);
        s->mainloop->time_free(s->auto_timing_update_event);
        /* Do not keep a dangling pointer to the freed timer: the stream
         * object can outlive this call, and request_auto_timing_update()
         * uses this field to decide whether a timer exists. */
        s->auto_timing_update_event = NULL;
    }

    reset_callbacks(s);
}
281
/* Destroys the stream: detaches it from the context and releases every
 * resource it still holds (pending write block, peeked chunk, record queue,
 * proplist, smoother, requested and negotiated formats, device name) before
 * freeing the object itself. */
static void stream_free(pa_stream *s) {
    unsigned int idx;

    pa_assert(s);

    stream_unlink(s);

    /* Write block handed out to the client but not yet submitted */
    if (s->write_memblock) {
        if (s->write_data) {
            pa_memblock_release(s->write_memblock);
        }
        pa_memblock_unref(s->write_memblock);
    }

    /* Chunk currently peeked by the client */
    if (s->peek_memchunk.memblock) {
        if (s->peek_data) {
            pa_memblock_release(s->peek_memchunk.memblock);
        }
        pa_memblock_unref(s->peek_memchunk.memblock);
    }

    if (s->record_memblockq) {
        pa_memblockq_free(s->record_memblockq);
    }

    if (s->proplist) {
        pa_proplist_free(s->proplist);
    }

    if (s->smoother) {
#ifdef USE_SMOOTHER_2
        pa_smoother_2_free(s->smoother);
#else
        pa_smoother_free(s->smoother);
#endif
    }

    idx = 0;
    while (idx < s->n_formats) {
        pa_format_info_free(s->req_formats[idx]);
        idx++;
    }

    if (s->format) {
        pa_format_info_free(s->format);
    }

    pa_xfree(s->device_name);
    pa_xfree(s);
}
323
/* Drops one reference to the stream; the stream is destroyed when the
 * count reaches zero. */
void pa_stream_unref(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    /* Somebody else still holds a reference: nothing more to do */
    if (PA_REFCNT_DEC(s) > 0)
        return;

    stream_free(s);
}
331
/* Takes one additional reference on the stream and returns the same
 * pointer that was passed in. */
pa_stream* pa_stream_ref(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_REFCNT_INC(s);
    return s;
}
339
pa_stream_get_state(const pa_stream * s)340 pa_stream_state_t pa_stream_get_state(const pa_stream *s) {
341 pa_assert(s);
342 pa_assert(PA_REFCNT_VALUE(s) >= 1);
343
344 return s->state;
345 }
346
pa_stream_get_context(const pa_stream * s)347 pa_context* pa_stream_get_context(const pa_stream *s) {
348 pa_assert(s);
349 pa_assert(PA_REFCNT_VALUE(s) >= 1);
350
351 return s->context;
352 }
353
pa_stream_get_index(const pa_stream * s)354 uint32_t pa_stream_get_index(const pa_stream *s) {
355 pa_assert(s);
356 pa_assert(PA_REFCNT_VALUE(s) >= 1);
357
358 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
359 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
360
361 return s->stream_index;
362 }
363
/* Moves the stream to state st and notifies the state callback.
 *
 * Nothing happens if the stream already is in that state. Entering
 * PA_STREAM_FAILED or PA_STREAM_TERMINATED additionally unlinks the stream
 * from its context. A temporary reference is held for the duration of the
 * call so that the callback and the unlinking cannot destroy the object
 * while it is still in use here. */
void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (st == s->state)
        return;

    pa_stream_ref(s);

    s->state = st;

    if (s->state_callback) {
        s->state_callback(s, s->state_userdata);
    }

    switch (st) {
        case PA_STREAM_FAILED:
        case PA_STREAM_TERMINATED:
            /* Final states: the stream leaves the context */
            stream_unlink(s);
            break;

        default:
            break;
    }

    pa_stream_unref(s);
}
383
/* Requests fresh timing data from the server and re-arms the automatic
 * timing update timer.
 *
 * Only acts on streams that have PA_STREAM_AUTO_TIMING_UPDATE set. A timing
 * request is issued when the stream is ready and either force is set or no
 * automatic request has been recorded as issued. If a timer exists it is
 * freed when the stream is suspended and the update is not forced;
 * otherwise it is restarted with the current interval (reset to the start
 * value when forced), and the interval for the following round is doubled
 * up to AUTO_TIMING_INTERVAL_END_USEC. */
static void request_auto_timing_update(pa_stream *s, bool force) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) == 0)
        return;

    if (s->state == PA_STREAM_READY && (force || !s->auto_timing_update_requested)) {
        pa_operation *op;

#ifdef STREAM_DEBUG
        pa_log_debug("Automatically requesting new timing data");
#endif

        op = pa_stream_update_timing_info(s, NULL, NULL);
        if (op) {
            pa_operation_unref(op);
            s->auto_timing_update_requested = true;
        }
    }

    /* Without a timer there is nothing to re-arm */
    if (!s->auto_timing_update_event)
        return;

    if (s->suspended && !force) {
        /* Suspended stream, unforced update: drop the timer */
        pa_assert(s->mainloop);
        s->mainloop->time_free(s->auto_timing_update_event);
        s->auto_timing_update_event = NULL;
        return;
    }

    if (force) {
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
    }

    pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);

    /* Back off: double the interval for the next round, capped at the end value */
    s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
}
420
/* Handler for PA_COMMAND_PLAYBACK_STREAM_KILLED and
 * PA_COMMAND_RECORD_STREAM_KILLED.
 *
 * The packet carries only the channel of the affected stream. A malformed
 * packet fails the context with PA_ERR_PROTOCOL. Messages for unknown
 * channels or for streams that are not ready are ignored. Otherwise the
 * context error is set to PA_ERR_KILLED and the stream enters
 * PA_STREAM_FAILED. */
void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_hashmap *streams;
    pa_stream *s;
    uint32_t channel;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0 || !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    /* Pick the channel table that matches the direction of the command */
    if (command == PA_COMMAND_PLAYBACK_STREAM_KILLED)
        streams = c->playback_streams;
    else
        streams = c->record_streams;

    s = pa_hashmap_get(streams, PA_UINT32_TO_PTR(channel));

    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    pa_context_set_error(c, PA_ERR_KILLED);
    pa_stream_set_state(s, PA_STREAM_FAILED);

finish:
    pa_context_unref(c);
}
452
/* Pauses or resumes the stream's time smoother so that it matches the
 * current playback situation.
 *
 * The reference time is the current rtclock time; when timing info is
 * valid it is shifted by the transport latency, backwards if aposteriori
 * is set and forwards otherwise. force_start and force_stop are mutually
 * exclusive. Does nothing if the stream has no smoother. */
static void check_smoother_status(pa_stream *s, bool aposteriori, bool force_start, bool force_stop) {
    pa_usec_t when;

    pa_assert(s);
    pa_assert(!force_start || !force_stop);

    if (!s->smoother)
        return;

    when = pa_rtclock_now();

    if (s->timing_info_valid) {
        if (aposteriori)
            when -= s->timing_info.transport_usec;
        else
            when += s->timing_info.transport_usec;
    }

    if (s->suspended || s->corked || force_stop) {
#ifdef USE_SMOOTHER_2
        pa_smoother_2_pause(s->smoother, when);
#else
        pa_smoother_pause(s->smoother, when);
#endif
        return;
    }

    /* Please note that we have no idea if playback actually started
     * if prebuf is non-zero! Hence only resume when forced or when no
     * prebuffering is configured. */
    if (!force_start && s->buffer_attr.prebuf != 0)
        return;

    /* If the server supports STARTED events we take them as
     * indications when audio really starts/stops playing, if
     * we don't have any timing info yet -- instead of trying
     * to be smart and guessing the server time. Otherwise the
     * unknown transport delay adds too much noise to our time
     * calculations. */
    if (!s->timing_info_valid &&
        !aposteriori &&
        !force_start &&
        !force_stop &&
        s->context->version >= 13)
        return;

#ifdef USE_SMOOTHER_2
    pa_smoother_2_resume(s->smoother, when);
#else
    pa_smoother_resume(s->smoother, when, true);
#endif
}
505
506 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
507
/* Handler for PA_COMMAND_PLAYBACK_STREAM_MOVED and
 * PA_COMMAND_RECORD_STREAM_MOVED.
 *
 * Parses channel, device index, device name and suspended flag from t and,
 * for context versions >= 13, the buffer metrics plus a latency value whose
 * layout depends on whether the command is the record or the playback
 * variant. A malformed packet, or receiving the command on a context with
 * version < 12, fails the context with PA_ERR_PROTOCOL. Messages for
 * unknown channels or for streams that are not in PA_STREAM_READY are
 * dropped silently. Otherwise the stream's device, suspend state and (for
 * version >= 13) buffer attributes are updated and moved_callback is
 * invoked if one is set. */
void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    const char *dn;
    bool suspended;
    uint32_t di;
    pa_usec_t usec = 0;
    uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    /* Keep the context alive until the finish label */
    pa_context_ref(c);

    if (c->version < 12) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    /* Fields present in every version of the message */
    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_getu32(t, &di) < 0 ||
        pa_tagstruct_gets(t, &dn) < 0 ||
        pa_tagstruct_get_boolean(t, &suspended) < 0) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (c->version >= 13) {

        if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
            /* Record layout: maxlength, fragsize, latency */
            if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
                pa_tagstruct_getu32(t, &fragsize) < 0 ||
                pa_tagstruct_get_usec(t, &usec) < 0) {
                pa_context_fail(c, PA_ERR_PROTOCOL);
                goto finish;
            }
        } else {
            /* Playback layout: maxlength, tlength, prebuf, minreq, latency */
            if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
                pa_tagstruct_getu32(t, &tlength) < 0 ||
                pa_tagstruct_getu32(t, &prebuf) < 0 ||
                pa_tagstruct_getu32(t, &minreq) < 0 ||
                pa_tagstruct_get_usec(t, &usec) < 0) {
                pa_context_fail(c, PA_ERR_PROTOCOL);
                goto finish;
            }
        }
    }

    /* Trailing data counts as a protocol violation */
    if (!pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (!dn || di == PA_INVALID_INDEX) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
        goto finish;

    if (s->state != PA_STREAM_READY)
        goto finish;

    if (c->version >= 13) {
        if (s->direction == PA_STREAM_RECORD)
            s->timing_info.configured_source_usec = usec;
        else
            s->timing_info.configured_sink_usec = usec;

        /* Metrics that were not part of the parsed layout keep their
         * initializer value of 0 */
        s->buffer_attr.maxlength = maxlength;
        s->buffer_attr.fragsize = fragsize;
        s->buffer_attr.tlength = tlength;
        s->buffer_attr.prebuf = prebuf;
        s->buffer_attr.minreq = minreq;
    }

    pa_xfree(s->device_name);
    s->device_name = pa_xstrdup(dn);
    s->device_index = di;

    s->suspended = suspended;

    /* Create the automatic timing update timer if the stream wants one,
     * is not suspended and currently has none */
    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
    request_auto_timing_update(s, true);

    if (s->moved_callback)
        s->moved_callback(s, s->moved_userdata);

finish:
    pa_context_unref(c);
}
610
/* Handler for PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED and
 * PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED.
 *
 * Parses the channel followed by the new buffer metrics and a latency
 * value. The record variant carries maxlength, fragsize and the latency;
 * the playback variant carries maxlength, tlength, prebuf, minreq and the
 * latency. A malformed packet, or receiving the command on a context with
 * version < 15, fails the context with PA_ERR_PROTOCOL. Messages for
 * unknown channels or for streams that are not in PA_STREAM_READY are
 * dropped silently. Otherwise the stream's buffer attributes and configured
 * latency are replaced, a timing update is requested and
 * buffer_attr_callback is invoked if one is set. */
void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    pa_usec_t usec = 0;
    uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    /* Keep the context alive until the finish label */
    pa_context_ref(c);

    if (c->version < 15) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (pa_tagstruct_getu32(t, &channel) < 0) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    /* The layout depends on the direction. This has to be decided with the
     * BUFFER_ATTR_CHANGED command codes this handler is asserted to
     * receive; comparing against a STREAM_MOVED code would never match and
     * record messages would be parsed with the playback layout. */
    if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
        if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
            pa_tagstruct_getu32(t, &fragsize) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(c, PA_ERR_PROTOCOL);
            goto finish;
        }
    } else {
        if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
            pa_tagstruct_getu32(t, &tlength) < 0 ||
            pa_tagstruct_getu32(t, &prebuf) < 0 ||
            pa_tagstruct_getu32(t, &minreq) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(c, PA_ERR_PROTOCOL);
            goto finish;
        }
    }

    /* Trailing data counts as a protocol violation */
    if (!pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
        goto finish;

    if (s->state != PA_STREAM_READY)
        goto finish;

    if (s->direction == PA_STREAM_RECORD)
        s->timing_info.configured_source_usec = usec;
    else
        s->timing_info.configured_sink_usec = usec;

    /* Metrics that were not part of the parsed layout keep their
     * initializer value of 0 */
    s->buffer_attr.maxlength = maxlength;
    s->buffer_attr.fragsize = fragsize;
    s->buffer_attr.tlength = tlength;
    s->buffer_attr.prebuf = prebuf;
    s->buffer_attr.minreq = minreq;

    request_auto_timing_update(s, true);

    if (s->buffer_attr_callback)
        s->buffer_attr_callback(s, s->buffer_attr_userdata);

finish:
    pa_context_unref(c);
}
684
/* Handler for PA_COMMAND_PLAYBACK_STREAM_SUSPENDED and
 * PA_COMMAND_RECORD_STREAM_SUSPENDED.
 *
 * The packet carries the channel and the new suspended flag. A malformed
 * packet, or receiving the command on a context with version < 12, fails
 * the context with PA_ERR_PROTOCOL. Messages for unknown channels or for
 * streams that are not ready are ignored. Otherwise the flag is stored, the
 * automatic timing update timer is created if needed, smoother and timing
 * are refreshed and suspended_callback is invoked if one is set. */
void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_hashmap *streams;
    pa_stream *s;
    uint32_t channel;
    bool suspended;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    /* Version gate first, then the payload; both failures are protocol errors */
    if (c->version < 12 ||
        pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_get_boolean(t, &suspended) < 0 ||
        !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED)
        streams = c->playback_streams;
    else
        streams = c->record_streams;

    s = pa_hashmap_get(streams, PA_UINT32_TO_PTR(channel));

    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    s->suspended = suspended;

    /* Create the automatic timing update timer if the stream wants one,
     * is not suspended and currently has none */
    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
    request_auto_timing_update(s, true);

    if (s->suspended_callback) {
        s->suspended_callback(s, s->suspended_userdata);
    }

finish:
    pa_context_unref(c);
}
734
/* Handler for PA_COMMAND_STARTED.
 *
 * The packet carries only the channel of a playback stream. A malformed
 * packet, or receiving the command on a context with version < 13, fails
 * the context with PA_ERR_PROTOCOL. Messages for unknown channels or for
 * streams that are not ready are ignored. Otherwise the smoother is
 * force-started, a timing update is requested and started_callback is
 * invoked if one is set. */
void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_STARTED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    /* Version gate first, then the payload; both failures are protocol errors */
    if (c->version < 13 ||
        pa_tagstruct_getu32(t, &channel) < 0 ||
        !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));

    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    check_smoother_status(s, true, true, false);
    request_auto_timing_update(s, true);

    if (s->started_callback) {
        s->started_callback(s, s->started_userdata);
    }

finish:
    pa_context_unref(c);
}
774
/* Handler for PA_COMMAND_PLAYBACK_STREAM_EVENT and
 * PA_COMMAND_RECORD_STREAM_EVENT.
 *
 * Parses the channel, an event name and a property list from t. A malformed
 * packet, a missing event name, or receiving the command on a context with
 * version < 15 fails the context with PA_ERR_PROTOCOL. Messages for unknown
 * channels or for streams that are not in PA_STREAM_READY are dropped
 * silently. For PA_STREAM_EVENT_FORMAT_LOST the current stream time is
 * added to the property list as "stream-time" when it can be determined.
 * The event is then passed to event_callback if one is set. The property
 * list is owned by this function and freed before it returns. */
void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    pa_proplist *pl = NULL; /* stays NULL on the early version failure path */
    const char *event;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    /* Keep the context alive until the finish label */
    pa_context_ref(c);

    if (c->version < 15) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    pl = pa_proplist_new();

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_gets(t, &event) < 0 ||
        pa_tagstruct_get_proplist(t, pl) < 0 ||
        !pa_tagstruct_eof(t) || !event) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
        goto finish;

    if (s->state != PA_STREAM_READY)
        goto finish;

    if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
        /* Let client know what the running time was when the stream had to be killed */
        pa_usec_t stream_time;
        if (pa_stream_get_time(s, &stream_time) == 0)
            pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
    }

    if (s->event_callback)
        s->event_callback(s, event, pl, s->event_userdata);

finish:
    pa_context_unref(c);

    /* Reached from every path, so the proplist is released exactly once */
    if (pl)
        pa_proplist_free(pl);
}
827
/* Handler for PA_COMMAND_REQUEST.
 *
 * The packet carries the channel of a playback stream and a byte count. A
 * malformed packet fails the context with PA_ERR_PROTOCOL. Messages for
 * unknown channels or for streams that are not ready are ignored. Otherwise
 * the byte count is added to the stream's requested_bytes and, if the
 * total is positive, write_callback is invoked with that total. */
void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel, bytes;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_REQUEST);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_getu32(t, &bytes) < 0 ||
        !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));

    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    s->requested_bytes += bytes;

#ifdef STREAM_DEBUG
    pa_log_debug("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes);
#endif

    /* Only wake the client when there is actually room to write */
    if (s->requested_bytes > 0 && s->write_callback) {
        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
    }

finish:
    pa_context_unref(c);
}
866
pa_stream_get_underflow_index(const pa_stream * p)867 int64_t pa_stream_get_underflow_index(const pa_stream *p) {
868 pa_assert(p);
869 return p->latest_underrun_at_index;
870 }
871
/* Handler for PA_COMMAND_OVERFLOW and PA_COMMAND_UNDERFLOW.
 *
 * The packet carries the channel of a playback stream; underflow messages
 * on contexts with version >= 23 additionally carry a 64 bit offset. A
 * malformed packet fails the context with PA_ERR_PROTOCOL. Messages for
 * unknown channels or for streams that are not ready are ignored. Otherwise
 * the underrun index is recorded (if an offset other than -1 was received),
 * the smoother is force-stopped when prebuffering is enabled, a timing
 * update is requested and the overflow or underflow callback is invoked if
 * one is set. */
void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    int64_t offset = -1;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    /* Channel, then the optional underflow offset, then end of packet */
    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        (c->version >= 23 && command == PA_COMMAND_UNDERFLOW && pa_tagstruct_gets64(t, &offset) < 0) ||
        !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));

    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (offset != -1) {
        s->latest_underrun_at_index = offset;
    }

    if (s->buffer_attr.prebuf > 0) {
        check_smoother_status(s, true, false, true);
    }

    request_auto_timing_update(s, true);

    switch (command) {
        case PA_COMMAND_OVERFLOW:
            if (s->overflow_callback)
                s->overflow_callback(s, s->overflow_userdata);
            break;

        case PA_COMMAND_UNDERFLOW:
            if (s->underflow_callback)
                s->underflow_callback(s, s->underflow_userdata);
            break;

        default:
            break;
    }

finish:
    pa_context_unref(c);
}
928
/* Marks the write index (w) and/or the read index (r) of the stream's
 * timing info as unreliable.
 *
 * For each selected index the context's current tag is stored in the
 * matching *_index_not_before field and, when timing info is valid, the
 * matching corrupt flag is set. A forced timing update is requested
 * afterwards. Streams that are not in PA_STREAM_READY are left untouched. */
static void invalidate_indexes(pa_stream *s, bool r, bool w) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

#ifdef STREAM_DEBUG
    pa_log_debug("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag);
#endif

    if (s->state == PA_STREAM_READY) {

        if (w) {
            s->write_index_not_before = s->context->ctag;

            if (s->timing_info_valid) {
                s->timing_info.write_index_corrupt = true;
            }

#ifdef STREAM_DEBUG
            pa_log_debug("write_index invalidated");
#endif
        }

        if (r) {
            s->read_index_not_before = s->context->ctag;

            if (s->timing_info_valid) {
                s->timing_info.read_index_corrupt = true;
            }

#ifdef STREAM_DEBUG
            pa_log_debug("read_index invalidated");
#endif
        }

        request_auto_timing_update(s, true);
    }
}
964
/* Time event callback of the automatic timing update timer. userdata is the
 * stream the timer was created for. Issues an unforced timing update
 * request while holding a temporary reference on the stream. */
static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
    pa_stream *s = userdata;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    /* Hold a reference for the duration of the request */
    pa_stream_ref(s);
    request_auto_timing_update(s, false);
    pa_stream_unref(s);
}
975
/* Finishes stream creation: moves the stream from PA_STREAM_CREATING to
 * PA_STREAM_READY, hands already requested bytes to the write callback,
 * sets up the automatic timing update timer when
 * PA_STREAM_AUTO_TIMING_UPDATE is set, and brings the smoother in line with
 * the stream state. */
static void create_stream_complete(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(s->state == PA_STREAM_CREATING);

    pa_stream_set_state(s, PA_STREAM_READY);

    if (s->write_callback && s->requested_bytes > 0) {
        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
    }

    if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
        pa_usec_t first_deadline;

        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        pa_assert(!s->auto_timing_update_event);

        first_deadline = pa_rtclock_now() + s->auto_timing_interval_usec;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, first_deadline, &auto_timing_update_callback, s);

        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
}
996
/* Adjusts the buffer attributes in attr before they are sent to the server.
 *
 * 1. If the environment variable PULSE_LATENCY_MSEC holds a positive
 *    number and a valid sample spec can be determined for the stream, the
 *    attributes are overridden: tlength and fragsize are set to the byte
 *    size of that many milliseconds, the other metrics to (uint32_t) -1,
 *    and PA_STREAM_ADJUST_LATENCY is added to *flags (if flags is not
 *    NULL). The sample spec is the stream's own one if valid; otherwise,
 *    when exactly one format was requested, it is derived from that
 *    format.
 *
 * 2. For contexts with version < 13 every metric that is still
 *    (uint32_t) -1 is replaced by a client-side default. */
static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
    const char *e;

    pa_assert(s);
    pa_assert(attr);

    if ((e = getenv("PULSE_LATENCY_MSEC"))) {
        uint32_t ms;
        pa_sample_spec ss;

        /* Starts out invalid and stays that way unless one of the two
         * sources below provides a spec */
        pa_sample_spec_init(&ss);

        if (pa_sample_spec_valid(&s->sample_spec))
            ss = s->sample_spec;
        else if (s->n_formats == 1)
            pa_format_info_to_sample_spec(s->req_formats[0], &ss, NULL);

        if (pa_atou(e, &ms) < 0 || ms == 0)
            pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
        else if (!pa_sample_spec_valid(&ss))
            /* Check the spec that is actually used for the conversion
             * below. Testing s->sample_spec here would make the fallback
             * to the single requested format pointless. */
            pa_log_debug("Ignoring $PULSE_LATENCY_MSEC: %s (invalid sample spec)", e);
        else {
            attr->maxlength = (uint32_t) -1;
            attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &ss);
            attr->minreq = (uint32_t) -1;
            attr->prebuf = (uint32_t) -1;
            attr->fragsize = attr->tlength;

            if (flags)
                *flags |= PA_STREAM_ADJUST_LATENCY;
        }
    }

    if (s->context->version >= 13)
        return;

    /* Version older than 0.9.10 didn't do server side buffer_attr
     * selection, hence we have to fake it on the client side. */

    /* We choose fairly conservative values here, to not confuse
     * old clients with extremely large playback buffers */

    if (attr->maxlength == (uint32_t) -1)
        attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */

    if (attr->tlength == (uint32_t) -1)
        attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */

    if (attr->minreq == (uint32_t) -1)
        attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */

    if (attr->prebuf == (uint32_t) -1)
        attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */

    if (attr->fragsize == (uint32_t) -1)
        attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
}
1054
/* Reply handler registered by create_stream() for the stream creation
 * command. On an error reply the stream is moved to PA_STREAM_FAILED
 * (unless error handling itself fails). On a regular reply the fields are
 * read from the tagstruct in protocol order, gated by the negotiated
 * protocol version and the stream direction; any malformed or
 * inconsistent field fails the whole context with PA_ERR_PROTOCOL. When
 * everything checks out the stream is registered under its channel and
 * create_stream_complete() is called. */
void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_stream *s = userdata;
    uint32_t requested_bytes = 0;

    pa_assert(pd);
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(s->state == PA_STREAM_CREATING);

    /* Hold a reference of our own for the duration of this function */
    pa_stream_ref(s);

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(s->context, command, t, false) >= 0)
            pa_stream_set_state(s, PA_STREAM_FAILED);

        goto finish;
    }

    /* Channel, stream index (not for uploads) and the initial request
     * size (not for record streams) */
    if (pa_tagstruct_getu32(t, &s->channel) < 0 || s->channel == PA_INVALID_INDEX)
        goto protocol_error;

    if (s->direction != PA_STREAM_UPLOAD &&
        (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX))
        goto protocol_error;

    if (s->direction != PA_STREAM_RECORD && pa_tagstruct_getu32(t, &requested_bytes) < 0)
        goto protocol_error;

    s->requested_bytes = (int64_t) requested_bytes;

    /* Buffer attributes as sent back by the server */
    if (s->context->version >= 9) {
        switch (s->direction) {
            case PA_STREAM_PLAYBACK:
                if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
                    pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
                    pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
                    pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0)
                    goto protocol_error;
                break;

            case PA_STREAM_RECORD:
                if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
                    pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0)
                    goto protocol_error;
                break;

            default:
                break;
        }
    }

    /* Sample spec, channel map and device the stream ended up with */
    if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
        pa_sample_spec ss;
        pa_channel_map cm;
        const char *dn = NULL;
        bool suspended;

        if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
            pa_tagstruct_get_channel_map(t, &cm) < 0 ||
            pa_tagstruct_getu32(t, &s->device_index) < 0 ||
            pa_tagstruct_gets(t, &dn) < 0 ||
            pa_tagstruct_get_boolean(t, &suspended) < 0)
            goto protocol_error;

        /* Basic sanity of what we got */
        if (!dn || s->device_index == PA_INVALID_INDEX ||
            ss.channels != cm.channels ||
            !pa_channel_map_valid(&cm) ||
            !pa_sample_spec_valid(&ss))
            goto protocol_error;

        /* Without the extended (format based) API the reply may only
         * differ from what was requested where the matching FIX_* flag
         * was set */
        if (s->n_formats == 0 &&
            ((!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))
            goto protocol_error;

        pa_xfree(s->device_name);
        s->device_name = pa_xstrdup(dn);
        s->suspended = suspended;

        s->channel_map = cm;
        s->sample_spec = ss;
    }

#ifdef USE_SMOOTHER_2
    if (s->flags & PA_STREAM_INTERPOLATE_TIMING)
        pa_smoother_2_set_sample_spec(s->smoother, pa_rtclock_now(), &s->sample_spec);
#endif

    /* Configured device latency */
    if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
        pa_usec_t usec;

        if (pa_tagstruct_get_usec(t, &usec) < 0)
            goto protocol_error;

        if (s->direction == PA_STREAM_RECORD)
            s->timing_info.configured_source_usec = usec;
        else
            s->timing_info.configured_sink_usec = usec;
    }

    /* Negotiated format */
    if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
        || s->context->version >= 22) {

        pa_format_info *f = pa_format_info_new();

        if (pa_tagstruct_get_format_info(t, f) >= 0 && pa_format_info_valid(f))
            s->format = f;
        else {
            pa_format_info_free(f);

            /* We used the extended API, so we should have got back a proper format */
            if (s->n_formats > 0)
                goto protocol_error;
        }
    }

    /* Nothing else may follow */
    if (!pa_tagstruct_eof(t))
        goto protocol_error;

    if (s->direction == PA_STREAM_RECORD) {
        pa_assert(!s->record_memblockq);

        s->record_memblockq = pa_memblockq_new(
                "client side record memblockq",
                0,
                s->buffer_attr.maxlength,
                0,
                &s->sample_spec,
                1,
                0,
                0,
                NULL);
    }

    /* Make the stream reachable by its channel number */
    s->channel_valid = true;
    if (s->direction == PA_STREAM_RECORD)
        pa_hashmap_put(s->context->record_streams, PA_UINT32_TO_PTR(s->channel), s);
    else
        pa_hashmap_put(s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);

    create_stream_complete(s);
    goto finish;

protocol_error:
    pa_context_fail(s->context, PA_ERR_PROTOCOL);

finish:
    pa_stream_unref(s);
}
1200
/* Shared implementation behind pa_stream_connect_playback() and
 * pa_stream_connect_record(): validates the arguments, stores the
 * connection parameters in the stream, serializes the creation command
 * (with fields gated by the negotiated protocol version), sends it and
 * registers pa_create_stream_callback() for the reply. The stream is
 * left in PA_STREAM_CREATING.
 *
 * Returns 0 once the request has been sent; failed argument checks
 * return early through PA_CHECK_VALIDITY. */
static int create_stream(
        pa_stream_direction_t direction,
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags,
        const pa_cvolume *volume,
        pa_stream *sync_stream) {

    const bool playback = (direction == PA_STREAM_PLAYBACK);
    const bool record = (direction == PA_STREAM_RECORD);
    const bool volume_set = !!volume;
    pa_cvolume default_volume;
    pa_tagstruct *t;
    uint32_t command;
    uint32_t tag;
    uint32_t idx;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);

    /* Reject any flag bit we do not know about */
    PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
                                              PA_STREAM_INTERPOLATE_TIMING|
                                              PA_STREAM_NOT_MONOTONIC|
                                              PA_STREAM_AUTO_TIMING_UPDATE|
                                              PA_STREAM_NO_REMAP_CHANNELS|
                                              PA_STREAM_NO_REMIX_CHANNELS|
                                              PA_STREAM_FIX_FORMAT|
                                              PA_STREAM_FIX_RATE|
                                              PA_STREAM_FIX_CHANNELS|
                                              PA_STREAM_DONT_MOVE|
                                              PA_STREAM_VARIABLE_RATE|
                                              PA_STREAM_PEAK_DETECT|
                                              PA_STREAM_START_MUTED|
                                              PA_STREAM_ADJUST_LATENCY|
                                              PA_STREAM_EARLY_REQUESTS|
                                              PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
                                              PA_STREAM_START_UNMUTED|
                                              PA_STREAM_FAIL_ON_SUSPEND|
                                              PA_STREAM_RELATIVE_VOLUME|
                                              PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);

    PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
    PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
    PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
    /* Some of the remaining flags are not supported by older servers
     * either. They are deliberately not checked: passing them does no
     * harm and it keeps client development simple. */

    PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);

    pa_stream_ref(s);

    /* Remember how we are being connected */
    s->direction = direction;

    if (sync_stream)
        s->syncid = sync_stream->syncid;

    if (attr)
        s->buffer_attr = *attr;
    patch_buffer_attr(s, &s->buffer_attr, &flags);

    /* patch_buffer_attr() may have amended the flags, hence store them
     * only now */
    s->flags = flags;
    s->corked = !!(flags & PA_STREAM_START_CORKED);

    if (flags & PA_STREAM_INTERPOLATE_TIMING) {
        const pa_usec_t now = pa_rtclock_now();

        pa_assert(!s->smoother);
#ifdef USE_SMOOTHER_2
        s->smoother = pa_smoother_2_new(SMOOTHER_HISTORY_TIME, now, 0, 0);
#else
        s->smoother = pa_smoother_new(
                SMOOTHER_ADJUST_TIME,
                SMOOTHER_HISTORY_TIME,
                !(flags & PA_STREAM_NOT_MONOTONIC),
                true,
                SMOOTHER_MIN_HISTORY,
                now,
                true);
#endif
    }

    /* Fall back to the device configured in the client configuration */
    if (!dev) {
        if (playback)
            dev = s->context->conf->default_sink;
        else
            dev = s->context->conf->default_source;
    }

    if (playback)
        command = (uint32_t) PA_COMMAND_CREATE_PLAYBACK_STREAM;
    else
        command = (uint32_t) PA_COMMAND_CREATE_RECORD_STREAM;

    t = pa_tagstruct_command(s->context, command, &tag);

    /* Old servers take the media name as a separate string */
    if (s->context->version < 13)
        pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));

    pa_tagstruct_put(
            t,
            PA_TAG_SAMPLE_SPEC, &s->sample_spec,
            PA_TAG_CHANNEL_MAP, &s->channel_map,
            PA_TAG_U32, PA_INVALID_INDEX,
            PA_TAG_STRING, dev,
            PA_TAG_U32, s->buffer_attr.maxlength,
            PA_TAG_BOOLEAN, s->corked,
            PA_TAG_INVALID);

    if (!volume) {
        /* No volume given: send a reset one. Without a valid sample
         * spec the channel count is not really relevant, since the real
         * number of channels is embedded in the format_info structure. */
        if (pa_sample_spec_valid(&s->sample_spec))
            volume = pa_cvolume_reset(&default_volume, s->sample_spec.channels);
        else
            volume = pa_cvolume_reset(&default_volume, PA_CHANNELS_MAX);
    }

    if (playback) {
        pa_tagstruct_put(
                t,
                PA_TAG_U32, s->buffer_attr.tlength,
                PA_TAG_U32, s->buffer_attr.prebuf,
                PA_TAG_U32, s->buffer_attr.minreq,
                PA_TAG_U32, s->syncid,
                PA_TAG_INVALID);

        pa_tagstruct_put_cvolume(t, volume);
    } else
        pa_tagstruct_putu32(t, s->buffer_attr.fragsize);

    if (s->context->version >= 12) {
        pa_tagstruct_put(
                t,
                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
                PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
                PA_TAG_INVALID);
    }

    if (s->context->version >= 13) {

        if (playback)
            pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
        else
            pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);

        pa_tagstruct_put(
                t,
                PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
                PA_TAG_PROPLIST, s->proplist,
                PA_TAG_INVALID);

        if (record)
            pa_tagstruct_putu32(t, s->direct_on_input);
    }

    if (s->context->version >= 14) {

        if (playback)
            pa_tagstruct_put_boolean(t, volume_set);

        pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
    }

    if (s->context->version >= 15) {

        if (playback)
            pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));

        pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
    }

    if (s->context->version >= 17 && playback)
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);

    if (s->context->version >= 18 && playback)
        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));

    /* List of requested formats */
    if ((s->context->version >= 21 && playback)
        || s->context->version >= 22) {

        pa_tagstruct_putu8(t, s->n_formats);
        for (idx = 0; idx < s->n_formats; idx++)
            pa_tagstruct_put_format_info(t, s->req_formats[idx]);
    }

    /* Volume and mute related fields for record streams */
    if (s->context->version >= 22 && record) {
        pa_tagstruct_put_cvolume(t, volume);
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
        pa_tagstruct_put_boolean(t, volume_set);
        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
    }

    pa_pstream_send_tagstruct(s->context->pstream, t);
    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);

    pa_stream_set_state(s, PA_STREAM_CREATING);

    pa_stream_unref(s);
    return 0;
}
1413
/* Connects the stream for playback. All arguments are handed unchanged
 * to create_stream() with direction PA_STREAM_PLAYBACK; its result is
 * returned as is. */
int pa_stream_connect_playback(
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags,
        const pa_cvolume *volume,
        pa_stream *sync_stream) {

    int r;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    r = create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);

    return r;
}
1427
/* Connects the stream for recording. Delegates to create_stream() with
 * direction PA_STREAM_RECORD; record streams are created without an
 * explicit volume and without a sync stream. */
int pa_stream_connect_record(
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags) {

    int r;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    r = create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);

    return r;
}
1439
/* Hands out a write buffer that lives in a memblock from the context's
 * memory pool.
 *
 * On entry *nbytes is the wanted size, or (size_t) -1 for no particular
 * size. Any other value is capped to the largest whole number of frames
 * that fits into one pool block. If no write buffer is pending a new
 * memblock is allocated, otherwise the pending one is returned again. On
 * return *data points to the buffer and *nbytes holds its actual length.
 *
 * Returns 0 on success; failed argument checks return early through
 * PA_CHECK_VALIDITY. */
int pa_stream_begin_write(
        pa_stream *s,
        void **data,
        size_t *nbytes) {

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);

    if (*nbytes != (size_t) -1) {
        size_t limit = pa_mempool_block_size_max(s->context->mempool);
        const size_t frame = pa_frame_size(&s->sample_spec);

        /* Round the block size limit down to a frame boundary */
        limit -= limit % frame;

        if (*nbytes > limit)
            *nbytes = limit;
    }

    /* Allocate only if there is no buffer from an earlier call around */
    if (!s->write_memblock) {
        s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
        s->write_data = pa_memblock_acquire(s->write_memblock);
    }

    *nbytes = pa_memblock_get_length(s->write_memblock);
    *data = s->write_data;

    return 0;
}
1475
/* Gives back the write buffer obtained with pa_stream_begin_write()
 * without sending anything: the memblock is released and unreferenced
 * and the stream forgets about it.
 *
 * Returns 0 on success; fails with PA_ERR_BADSTATE (through
 * PA_CHECK_VALIDITY) if no write buffer is pending. */
int pa_stream_cancel_write(
        pa_stream *s) {

    pa_memblock *pending;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);

    pa_assert(s->write_data);

    pending = s->write_memblock;

    /* Undo the acquire and the reference taken in pa_stream_begin_write() */
    pa_memblock_release(pending);
    pa_memblock_unref(pending);

    s->write_data = NULL;
    s->write_memblock = NULL;

    return 0;
}
1496
/* Sends audio data to the server.
 *
 * s:            playback or upload stream in state PA_STREAM_READY
 * data:         data to send; if a buffer from pa_stream_begin_write()
 *               is pending, data must lie inside that buffer
 * length:       number of bytes, must be a multiple of the frame size
 * free_cb:      optional callback to free data; must be NULL when a
 *               pa_stream_begin_write() buffer is pending
 * free_cb_data: argument handed to free_cb
 * offset:       seek offset, must be a multiple of the frame size (it may
 *               be negative); upload streams only accept 0
 * seek:         seek mode; upload streams only accept PA_SEEK_RELATIVE
 *
 * If a pa_stream_begin_write() buffer is pending, its memblock is sent
 * directly. Otherwise, when free_cb is set and pa_pstream_get_shm()
 * reports false, the caller's memory is wrapped in a user memblock and
 * sent as a whole; in all remaining cases the data is copied into pool
 * memblocks in frame-aligned pieces and free_cb (if any) is invoked
 * right after the copy. Afterwards the local request counter and, for
 * playback streams, the write index bookkeeping are updated.
 *
 * Returns 0 on success; failed argument checks return early through
 * PA_CHECK_VALIDITY. */
int pa_stream_write_ext_free(
        pa_stream *s,
        const void *data,
        size_t length,
        pa_free_cb_t free_cb,
        void *free_cb_data,
        int64_t offset,
        pa_seek_mode_t seek) {

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(data);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context,
                      !s->write_memblock ||
                      ((data >= s->write_data) &&
                       ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
                      PA_ERR_INVALID);
    /* The frame size is cast to a signed type on purpose: where size_t is
     * as wide as int64_t, the usual arithmetic conversions would turn a
     * negative offset into a huge unsigned value, and its remainder is
     * wrong for frame sizes that are not a power of two (e.g. 6 bytes),
     * so valid negative seeks would be rejected. */
    PA_CHECK_VALIDITY(s->context, offset % (int64_t) pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, length % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);

    if (s->write_memblock) {
        pa_memchunk chunk;

        /* pa_stream_begin_write() was called before */

        pa_memblock_release(s->write_memblock);

        chunk.memblock = s->write_memblock;
        chunk.index = (const char *) data - (const char *) s->write_data;
        chunk.length = length;

        /* The chunk owns the reference now */
        s->write_memblock = NULL;
        s->write_data = NULL;

        pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
        pa_memblock_unref(chunk.memblock);

    } else {
        pa_seek_mode_t t_seek = seek;
        int64_t t_offset = offset;
        size_t t_length = length;
        const void *t_data = data;

        /* pa_stream_begin_write() was not called before */

        while (t_length > 0) {
            pa_memchunk chunk;

            chunk.index = 0;

            if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
                /* Wrap the caller's memory; free_cb is handed over to
                 * the memblock together with the data */
                chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, free_cb_data, 1);
                chunk.length = t_length;
            } else {
                void *d;
                size_t blk_size_max;

                /* Break large audio streams into _aligned_ blocks or the
                 * other endpoint will happily discard them upon arrival. */
                blk_size_max = pa_frame_align(pa_mempool_block_size_max(s->context->mempool), &s->sample_spec);
                chunk.length = PA_MIN(t_length, blk_size_max);
                chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);

                d = pa_memblock_acquire(chunk.memblock);
                memcpy(d, t_data, chunk.length);
                pa_memblock_release(chunk.memblock);
            }

            pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);

            /* Only the first piece carries the seek, the rest is
             * simply appended */
            t_offset = 0;
            t_seek = PA_SEEK_RELATIVE;

            t_data = (const uint8_t*) t_data + chunk.length;
            t_length -= chunk.length;

            pa_memblock_unref(chunk.memblock);
        }

        /* The data was copied in this case, so it can be freed right away */
        if (free_cb && pa_pstream_get_shm(s->context->pstream))
            free_cb(free_cb_data);
    }

    /* This is obviously wrong since we ignore the seeking index. But
     * that's OK, the server side applies the same error */
    s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;

#ifdef STREAM_DEBUG
    pa_log_debug("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes);
#endif

    if (s->direction == PA_STREAM_PLAYBACK) {

        /* Update latency request correction */
        if (s->write_index_corrections[s->current_write_index_correction].valid) {

            if (seek == PA_SEEK_ABSOLUTE) {
                s->write_index_corrections[s->current_write_index_correction].corrupt = false;
                s->write_index_corrections[s->current_write_index_correction].absolute = true;
                s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
            } else if (seek == PA_SEEK_RELATIVE) {
                if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
                    s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
            } else
                s->write_index_corrections[s->current_write_index_correction].corrupt = true;
        }

        /* Update the write index in the already available latency data */
        if (s->timing_info_valid) {

            if (seek == PA_SEEK_ABSOLUTE) {
                s->timing_info.write_index_corrupt = false;
                s->timing_info.write_index = offset + (int64_t) length;
            } else if (seek == PA_SEEK_RELATIVE) {
                if (!s->timing_info.write_index_corrupt)
                    s->timing_info.write_index += offset + (int64_t) length;
            } else
                s->timing_info.write_index_corrupt = true;
        }

        /* Without a usable write index, ask the server for fresh data */
        if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
            request_auto_timing_update(s, true);
    }

    return 0;
}
1630
/* Convenience variant of pa_stream_write_ext_free() that passes the data
 * pointer itself as the argument for free_cb. */
int pa_stream_write(
        pa_stream *s,
        const void *data,
        size_t length,
        pa_free_cb_t free_cb,
        int64_t offset,
        pa_seek_mode_t seek) {

    void *free_cb_data = (void*) data;

    return pa_stream_write_ext_free(s, data, length, free_cb, free_cb_data, offset, seek);
}
1641
/* Returns the chunk at the current read position of a record stream
 * without removing it.
 *
 * Outcomes (the function returns 0 in all of them):
 *  - queue empty:                   *data = NULL, *length = 0
 *  - no data at the read index:     *data = NULL, *length = the length
 *                                   reported by the peeked chunk
 *  - data available:                *data points to it, *length is its size
 *
 * A chunk that was peeked stays cached in the stream and is returned
 * again on subsequent calls. */
int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(data);
    pa_assert(length);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);

    if (!s->peek_memchunk.memblock) {
        const bool queue_empty = pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0;

        if (queue_empty || !s->peek_memchunk.memblock) {
            /* Either record_memblockq is empty, or it isn't empty but
             * doesn't have any data at the current read index. */
            *data = NULL;
            *length = queue_empty ? 0 : s->peek_memchunk.length;
            return 0;
        }

        s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
    }

    pa_assert(s->peek_data);

    *length = s->peek_memchunk.length;
    *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;

    return 0;
}
1676
/* Removes the chunk last returned by pa_stream_peek() from the record
 * queue, advances the locally simulated read index by its length and
 * releases the cached memblock, if there is one.
 *
 * Returns 0 on success; fails with PA_ERR_BADSTATE (through
 * PA_CHECK_VALIDITY) if no chunk with a non-zero length is cached. */
int pa_stream_drop(pa_stream *s) {
    size_t dropped;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE);

    dropped = s->peek_memchunk.length;

    pa_memblockq_drop(s->record_memblockq, dropped);

    /* Fix the simulated local read index */
    if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
        s->timing_info.read_index += (int64_t) dropped;

    /* A hole in the queue has a length but no memblock behind it */
    if (s->peek_memchunk.memblock) {
        pa_assert(s->peek_data);
        s->peek_data = NULL;
        pa_memblock_release(s->peek_memchunk.memblock);
        pa_memblock_unref(s->peek_memchunk.memblock);
    }

    pa_memchunk_reset(&s->peek_memchunk);

    return 0;
}
1703
pa_stream_writable_size(const pa_stream * s)1704 size_t pa_stream_writable_size(const pa_stream *s) {
1705 pa_assert(s);
1706 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1707
1708 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1709 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1710 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1711
1712 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1713 }
1714
pa_stream_readable_size(const pa_stream * s)1715 size_t pa_stream_readable_size(const pa_stream *s) {
1716 pa_assert(s);
1717 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1718
1719 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1720 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1721 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1722
1723 return pa_memblockq_get_length(s->record_memblockq);
1724 }
1725
pa_stream_drain(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)1726 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1727 pa_operation *o;
1728 pa_tagstruct *t;
1729 uint32_t tag;
1730
1731 pa_assert(s);
1732 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1733
1734 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1735 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1736 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1737
1738 /* Ask for a timing update before we cork/uncork to get the best
1739 * accuracy for the transport latency suitable for the
1740 * check_smoother_status() call in the started callback */
1741 request_auto_timing_update(s, true);
1742
1743 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1744
1745 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1746 pa_tagstruct_putu32(t, s->channel);
1747 pa_pstream_send_tagstruct(s->context->pstream, t);
1748 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1749
1750 /* This might cause the read index to continue again, hence
1751 * let's request a timing update */
1752 request_auto_timing_update(s, true);
1753
1754 return o;
1755 }
1756
calc_time(const pa_stream * s,bool ignore_transport)1757 static pa_usec_t calc_time(const pa_stream *s, bool ignore_transport) {
1758 pa_usec_t usec;
1759
1760 pa_assert(s);
1761 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1762 pa_assert(s->state == PA_STREAM_READY);
1763 pa_assert(s->direction != PA_STREAM_UPLOAD);
1764 pa_assert(s->timing_info_valid);
1765 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1766 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1767
1768 if (s->direction == PA_STREAM_PLAYBACK) {
1769 /* The last byte that was written into the output device
1770 * had this time value associated */
1771 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1772
1773 if (!s->corked && !s->suspended) {
1774
1775 if (!ignore_transport)
1776 /* Because the latency info took a little time to come
1777 * to us, we assume that the real output time is actually
1778 * a little ahead */
1779 usec += s->timing_info.transport_usec;
1780
1781 /* However, the output device usually maintains a buffer
1782 too, hence the real sample currently played is a little
1783 back */
1784 if (s->timing_info.sink_usec >= usec)
1785 usec = 0;
1786 else
1787 usec -= s->timing_info.sink_usec;
1788 }
1789
1790 } else {
1791 pa_assert(s->direction == PA_STREAM_RECORD);
1792
1793 /* The last byte written into the server side queue had
1794 * this time value associated */
1795 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1796
1797 if (!s->corked && !s->suspended) {
1798
1799 if (!ignore_transport)
1800 /* Add transport latency */
1801 usec += s->timing_info.transport_usec;
1802
1803 /* Add latency of data in device buffer */
1804 usec += s->timing_info.source_usec;
1805
1806 /* If this is a monitor source, we need to correct the
1807 * time by the playback device buffer */
1808 if (s->timing_info.sink_usec >= usec)
1809 usec = 0;
1810 else
1811 usec -= s->timing_info.sink_usec;
1812 }
1813 }
1814
1815 return usec;
1816 }
1817
#ifdef USE_SMOOTHER_2
/* Same as calc_time(), but with the result converted to a byte count
 * using the stream's sample rate and frame size. */
static inline uint64_t calc_bytes(pa_stream *s, bool ignore_transport) {
    const pa_usec_t usec = calc_time(s, ignore_transport);

    return (uint64_t)(usec * s->sample_spec.rate / PA_USEC_PER_SEC * pa_frame_size(&s->sample_spec));
}
#endif
1823
stream_get_timing_info_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1824 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1825 pa_operation *o = userdata;
1826 struct timeval local, remote, now;
1827 pa_timing_info *i;
1828 bool playing = false;
1829 uint64_t underrun_for = 0, playing_for = 0;
1830
1831 pa_assert(pd);
1832 pa_assert(o);
1833 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1834
1835 if (!o->context || !o->stream)
1836 goto finish;
1837
1838 i = &o->stream->timing_info;
1839
1840 o->stream->timing_info_valid = false;
1841 i->write_index_corrupt = true;
1842 i->read_index_corrupt = true;
1843
1844 if (command != PA_COMMAND_REPLY) {
1845 if (pa_context_handle_error(o->context, command, t, false) < 0)
1846 goto finish;
1847
1848 } else {
1849
1850 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1851 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1852 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1853 pa_tagstruct_get_timeval(t, &local) < 0 ||
1854 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1855 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1856 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1857
1858 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1859 goto finish;
1860 }
1861
1862 if (o->context->version >= 13 &&
1863 o->stream->direction == PA_STREAM_PLAYBACK)
1864 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1865 pa_tagstruct_getu64(t, &playing_for) < 0) {
1866
1867 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1868 goto finish;
1869 }
1870
1871 if (!pa_tagstruct_eof(t)) {
1872 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1873 goto finish;
1874 }
1875 o->stream->timing_info_valid = true;
1876 i->write_index_corrupt = false;
1877 i->read_index_corrupt = false;
1878
1879 i->playing = (int) playing;
1880 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1881
1882 pa_gettimeofday(&now);
1883
1884 /* Calculate timestamps */
1885 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1886 /* local and remote seem to have synchronized clocks */
1887
1888 if (o->stream->direction == PA_STREAM_PLAYBACK)
1889 i->transport_usec = pa_timeval_diff(&remote, &local);
1890 else
1891 i->transport_usec = pa_timeval_diff(&now, &remote);
1892
1893 i->synchronized_clocks = true;
1894 i->timestamp = remote;
1895 } else {
1896 /* clocks are not synchronized, let's estimate latency then */
1897 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1898 i->synchronized_clocks = false;
1899 i->timestamp = local;
1900 pa_timeval_add(&i->timestamp, i->transport_usec);
1901 }
1902
1903 /* Invalidate read and write indexes if necessary */
1904 if (tag < o->stream->read_index_not_before)
1905 i->read_index_corrupt = true;
1906
1907 if (tag < o->stream->write_index_not_before)
1908 i->write_index_corrupt = true;
1909
1910 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1911 /* Write index correction */
1912
1913 int n, j;
1914 uint32_t ctag = tag;
1915
1916 /* Go through the saved correction values and add up the
1917 * total correction.*/
1918 for (n = 0, j = o->stream->current_write_index_correction+1;
1919 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1920 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1921
1922 /* Step over invalid data or out-of-date data */
1923 if (!o->stream->write_index_corrections[j].valid ||
1924 o->stream->write_index_corrections[j].tag < ctag)
1925 continue;
1926
1927 /* Make sure that everything is in order */
1928 ctag = o->stream->write_index_corrections[j].tag+1;
1929
1930 /* Now fix the write index */
1931 if (o->stream->write_index_corrections[j].corrupt) {
1932 /* A corrupting seek was made */
1933 i->write_index_corrupt = true;
1934 } else if (o->stream->write_index_corrections[j].absolute) {
1935 /* An absolute seek was made */
1936 i->write_index = o->stream->write_index_corrections[j].value;
1937 i->write_index_corrupt = false;
1938 } else if (!i->write_index_corrupt) {
1939 /* A relative seek was made */
1940 i->write_index += o->stream->write_index_corrections[j].value;
1941 }
1942 }
1943
1944 /* Clear old correction entries */
1945 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1946 if (!o->stream->write_index_corrections[n].valid)
1947 continue;
1948
1949 if (o->stream->write_index_corrections[n].tag <= tag)
1950 o->stream->write_index_corrections[n].valid = false;
1951 }
1952 }
1953
1954 if (o->stream->direction == PA_STREAM_RECORD) {
1955 /* Read index correction */
1956
1957 if (!i->read_index_corrupt)
1958 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1959 }
1960
1961 /* Update smoother if we're not corked */
1962 if (o->stream->smoother && !o->stream->corked) {
1963 pa_usec_t u, x;
1964
1965 u = x = pa_rtclock_now() - i->transport_usec;
1966
1967 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1968 pa_usec_t su;
1969
1970 /* If we weren't playing then it will take some time
1971 * until the audio will actually come out through the
1972 * speakers. Since we follow that timing here, we need
1973 * to try to fix this up */
1974
1975 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1976
1977 if (su < i->sink_usec)
1978 x += i->sink_usec - su;
1979 }
1980
1981 if (!i->playing)
1982 #ifdef USE_SMOOTHER_2
1983 pa_smoother_2_pause(o->stream->smoother, x);
1984 #else
1985 pa_smoother_pause(o->stream->smoother, x);
1986 #endif
1987
1988 /* Update the smoother */
1989 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1990 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1991 #ifdef USE_SMOOTHER_2
1992 pa_smoother_2_put(o->stream->smoother, u, calc_bytes(o->stream, true));
1993 #else
1994 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, true));
1995 #endif
1996
1997 if (i->playing)
1998 #ifdef USE_SMOOTHER_2
1999 pa_smoother_2_resume(o->stream->smoother, x);
2000 #else
2001 pa_smoother_resume(o->stream->smoother, x, true);
2002 #endif
2003 }
2004 }
2005
2006 o->stream->auto_timing_update_requested = false;
2007
2008 if (o->stream->latency_update_callback)
2009 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
2010
2011 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
2012 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2013 cb(o->stream, o->stream->timing_info_valid, o->userdata);
2014 }
2015
2016 finish:
2017
2018 pa_operation_done(o);
2019 pa_operation_unref(o);
2020 }
2021
pa_stream_update_timing_info(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2022 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2023 uint32_t tag;
2024 pa_operation *o;
2025 pa_tagstruct *t;
2026 struct timeval now;
2027 int cidx = 0;
2028
2029 pa_assert(s);
2030 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2031
2032 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2033 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2034 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2035
2036 if (s->direction == PA_STREAM_PLAYBACK) {
2037 /* Find a place to store the write_index correction data for this entry */
2038 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
2039
2040 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
2041 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
2042 }
2043 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2044
2045 t = pa_tagstruct_command(
2046 s->context,
2047 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
2048 &tag);
2049 pa_tagstruct_putu32(t, s->channel);
2050 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
2051
2052 pa_pstream_send_tagstruct(s->context->pstream, t);
2053 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2054
2055 if (s->direction == PA_STREAM_PLAYBACK) {
2056 /* Fill in initial correction data */
2057
2058 s->current_write_index_correction = cidx;
2059
2060 s->write_index_corrections[cidx].valid = true;
2061 s->write_index_corrections[cidx].absolute = false;
2062 s->write_index_corrections[cidx].corrupt = false;
2063 s->write_index_corrections[cidx].tag = tag;
2064 s->write_index_corrections[cidx].value = 0;
2065 }
2066
2067 return o;
2068 }
2069
/* Reply handler for the stream deletion request sent by
 * pa_stream_disconnect(). userdata is the stream itself.
 *
 * A clean, empty reply moves the stream to PA_STREAM_TERMINATED. An error
 * reply that pa_context_handle_error() accepts moves it to PA_STREAM_FAILED.
 * A reply with trailing data fails the whole context with PA_ERR_PROTOCOL. */
void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_stream *s = userdata;

    pa_assert(pd);
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    /* Hold a reference for the duration of the handler */
    pa_stream_ref(s);

    if (command == PA_COMMAND_REPLY) {
        if (pa_tagstruct_eof(t))
            pa_stream_set_state(s, PA_STREAM_TERMINATED);
        else
            pa_context_fail(s->context, PA_ERR_PROTOCOL);
    } else if (pa_context_handle_error(s->context, command, t, false) >= 0)
        pa_stream_set_state(s, PA_STREAM_FAILED);

    pa_stream_unref(s);
}
2095
/* Send the delete command matching the stream's direction (playback,
 * record or upload) to the server. The reply is processed by
 * pa_stream_disconnect_callback(). Returns 0 once the request has been
 * queued; the PA_CHECK_VALIDITY checks bail out early if the process
 * forked, the stream has no valid channel, or the context is not ready. */
int pa_stream_disconnect(pa_stream *s) {
    pa_tagstruct *ts;
    uint32_t tag, command;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);

    pa_stream_ref(s);

    if (s->direction == PA_STREAM_PLAYBACK)
        command = PA_COMMAND_DELETE_PLAYBACK_STREAM;
    else if (s->direction == PA_STREAM_RECORD)
        command = PA_COMMAND_DELETE_RECORD_STREAM;
    else
        command = PA_COMMAND_DELETE_UPLOAD_STREAM;

    ts = pa_tagstruct_command(s->context, command, &tag);
    pa_tagstruct_putu32(ts, s->channel);
    pa_pstream_send_tagstruct(s->context->pstream, ts);
    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);

    pa_stream_unref(s);
    return 0;
}
2121
/* Install cb/userdata as the stream's read callback. The call is silently
 * ignored after a fork or once the stream is terminated or failed. */
void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() ||
        s->state == PA_STREAM_TERMINATED ||
        s->state == PA_STREAM_FAILED)
        return;

    s->read_userdata = userdata;
    s->read_callback = cb;
}
2135
/* Install cb/userdata as the stream's write callback. The call is silently
 * ignored after a fork or once the stream is terminated or failed. */
void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() ||
        s->state == PA_STREAM_TERMINATED ||
        s->state == PA_STREAM_FAILED)
        return;

    s->write_userdata = userdata;
    s->write_callback = cb;
}
2149
/* Install cb/userdata as the stream's state callback. The call is silently
 * ignored after a fork or once the stream is terminated or failed. */
void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() ||
        s->state == PA_STREAM_TERMINATED ||
        s->state == PA_STREAM_FAILED)
        return;

    s->state_userdata = userdata;
    s->state_callback = cb;
}
2163
/* Install cb/userdata as the stream's overflow callback. The call is
 * silently ignored after a fork or once the stream is terminated or
 * failed. */
void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() ||
        s->state == PA_STREAM_TERMINATED ||
        s->state == PA_STREAM_FAILED)
        return;

    s->overflow_userdata = userdata;
    s->overflow_callback = cb;
}
2177
/* Install cb/userdata as the stream's underflow callback. The call is
 * silently ignored after a fork or once the stream is terminated or
 * failed. */
void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() ||
        s->state == PA_STREAM_TERMINATED ||
        s->state == PA_STREAM_FAILED)
        return;

    s->underflow_userdata = userdata;
    s->underflow_callback = cb;
}
2191
/* Install cb/userdata as the stream's latency update callback. The call is
 * silently ignored after a fork or once the stream is terminated or
 * failed. */
void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() ||
        s->state == PA_STREAM_TERMINATED ||
        s->state == PA_STREAM_FAILED)
        return;

    s->latency_update_userdata = userdata;
    s->latency_update_callback = cb;
}
2205
/* Install cb/userdata as the stream's moved callback. The call is silently
 * ignored after a fork or once the stream is terminated or failed. */
void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() ||
        s->state == PA_STREAM_TERMINATED ||
        s->state == PA_STREAM_FAILED)
        return;

    s->moved_userdata = userdata;
    s->moved_callback = cb;
}
2219
/* Install cb/userdata as the stream's suspended callback. The call is
 * silently ignored after a fork or once the stream is terminated or
 * failed. */
void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() ||
        s->state == PA_STREAM_TERMINATED ||
        s->state == PA_STREAM_FAILED)
        return;

    s->suspended_userdata = userdata;
    s->suspended_callback = cb;
}
2233
/* Install cb/userdata as the stream's started callback. The call is
 * silently ignored after a fork or once the stream is terminated or
 * failed. */
void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() ||
        s->state == PA_STREAM_TERMINATED ||
        s->state == PA_STREAM_FAILED)
        return;

    s->started_userdata = userdata;
    s->started_callback = cb;
}
2247
/* Install cb/userdata as the stream's event callback. The call is silently
 * ignored after a fork or once the stream is terminated or failed. */
void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() ||
        s->state == PA_STREAM_TERMINATED ||
        s->state == PA_STREAM_FAILED)
        return;

    s->event_userdata = userdata;
    s->event_callback = cb;
}
2261
/* Install cb/userdata as the stream's buffer attribute callback. The call
 * is silently ignored after a fork or once the stream is terminated or
 * failed. */
void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() ||
        s->state == PA_STREAM_TERMINATED ||
        s->state == PA_STREAM_FAILED)
        return;

    s->buffer_attr_userdata = userdata;
    s->buffer_attr_callback = cb;
}
2275
/* Generic reply handler for stream commands whose reply has no payload.
 * userdata is the pa_operation tracking the request.
 *
 * The operation's callback (if any) is invoked with success=1 for an empty
 * reply and success=0 for an error reply that pa_context_handle_error()
 * accepts. No callback is made if the operation has lost its context, if
 * error handling reports failure, or if the reply carries unexpected data
 * (which fails the context with PA_ERR_PROTOCOL). The operation is always
 * marked done and unreferenced. */
void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    int success = 1;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    if (!o->context)
        goto finish;

    if (command == PA_COMMAND_REPLY) {
        /* A plain acknowledgement must not carry any payload */
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }
    } else {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

        success = 0;
    }

    if (o->callback) {
        pa_stream_success_cb_t notify = (pa_stream_success_cb_t) o->callback;
        notify(o->stream, success, o->userdata);
    }

finish:
    pa_operation_done(o);
    pa_operation_unref(o);
}
2306
pa_stream_cork(pa_stream * s,int b,pa_stream_success_cb_t cb,void * userdata)2307 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2308 pa_operation *o;
2309 pa_tagstruct *t;
2310 uint32_t tag;
2311
2312 pa_assert(s);
2313 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2314
2315 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2316 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2317 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2318
2319 /* Ask for a timing update before we cork/uncork to get the best
2320 * accuracy for the transport latency suitable for the
2321 * check_smoother_status() call in the started callback */
2322 request_auto_timing_update(s, true);
2323
2324 s->corked = b;
2325
2326 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2327
2328 t = pa_tagstruct_command(
2329 s->context,
2330 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2331 &tag);
2332 pa_tagstruct_putu32(t, s->channel);
2333 pa_tagstruct_put_boolean(t, !!b);
2334 pa_pstream_send_tagstruct(s->context->pstream, t);
2335 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2336
2337 check_smoother_status(s, false, false, false);
2338
2339 /* This might cause the indexes to hang/start again, hence let's
2340 * request a timing update, after the cork/uncork, too */
2341 request_auto_timing_update(s, true);
2342
2343 return o;
2344 }
2345
stream_send_simple_command(pa_stream * s,uint32_t command,pa_stream_success_cb_t cb,void * userdata)2346 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2347 pa_tagstruct *t;
2348 pa_operation *o;
2349 uint32_t tag;
2350
2351 pa_assert(s);
2352 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2353
2354 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2355 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2356
2357 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2358
2359 t = pa_tagstruct_command(s->context, command, &tag);
2360 pa_tagstruct_putu32(t, s->channel);
2361 pa_pstream_send_tagstruct(s->context->pstream, t);
2362 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2363
2364 return o;
2365 }
2366
pa_stream_flush(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2367 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2368 pa_operation *o;
2369
2370 pa_assert(s);
2371 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2372
2373 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2374 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2375 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2376
2377 /* Ask for a timing update *before* the flush, so that the
2378 * transport usec is as up to date as possible when we get the
2379 * underflow message and update the smoother status*/
2380 request_auto_timing_update(s, true);
2381
2382 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2383 return NULL;
2384
2385 if (s->direction == PA_STREAM_PLAYBACK) {
2386
2387 if (s->write_index_corrections[s->current_write_index_correction].valid)
2388 s->write_index_corrections[s->current_write_index_correction].corrupt = true;
2389
2390 if (s->buffer_attr.prebuf > 0)
2391 check_smoother_status(s, false, false, true);
2392
2393 /* This will change the write index, but leave the
2394 * read index untouched. */
2395 invalidate_indexes(s, false, true);
2396
2397 } else
2398 /* For record streams this has no influence on the write
2399 * index, but the read index might jump. */
2400 invalidate_indexes(s, true, false);
2401
2402 /* Note that we do not update requested_bytes here. This is
2403 * because we cannot really know how data actually was dropped
2404 * from the write index due to this. This 'error' will be applied
2405 * by both client and server and hence we should be fine. */
2406
2407 return o;
2408 }
2409
pa_stream_prebuf(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2410 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2411 pa_operation *o;
2412
2413 pa_assert(s);
2414 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2415
2416 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2417 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2418 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2419 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2420
2421 /* Ask for a timing update before we cork/uncork to get the best
2422 * accuracy for the transport latency suitable for the
2423 * check_smoother_status() call in the started callback */
2424 request_auto_timing_update(s, true);
2425
2426 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2427 return NULL;
2428
2429 /* This might cause the read index to hang again, hence
2430 * let's request a timing update */
2431 request_auto_timing_update(s, true);
2432
2433 return o;
2434 }
2435
pa_stream_trigger(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2436 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2437 pa_operation *o;
2438
2439 pa_assert(s);
2440 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2441
2442 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2443 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2444 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2445 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2446
2447 /* Ask for a timing update before we cork/uncork to get the best
2448 * accuracy for the transport latency suitable for the
2449 * check_smoother_status() call in the started callback */
2450 request_auto_timing_update(s, true);
2451
2452 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2453 return NULL;
2454
2455 /* This might cause the read index to start moving again, hence
2456 * let's request a timing update */
2457 request_auto_timing_update(s, true);
2458
2459 return o;
2460 }
2461
pa_stream_set_name(pa_stream * s,const char * name,pa_stream_success_cb_t cb,void * userdata)2462 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2463 pa_operation *o;
2464
2465 pa_assert(s);
2466 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2467 pa_assert(name);
2468
2469 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2470 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2471 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2472
2473 if (s->context->version >= 13) {
2474 pa_proplist *p = pa_proplist_new();
2475
2476 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2477 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2478 pa_proplist_free(p);
2479 } else {
2480 pa_tagstruct *t;
2481 uint32_t tag;
2482
2483 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2484 t = pa_tagstruct_command(
2485 s->context,
2486 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2487 &tag);
2488 pa_tagstruct_putu32(t, s->channel);
2489 pa_tagstruct_puts(t, name);
2490 pa_pstream_send_tagstruct(s->context->pstream, t);
2491 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2492 }
2493
2494 return o;
2495 }
2496
/* Report the current stream time in *r_usec (r_usec may be NULL).
 *
 * The value comes from the smoother when the stream has one, otherwise from
 * calc_time(). Unless PA_STREAM_NOT_MONOTONIC is set, the result is clamped
 * so that it never goes below the previously reported time. Returns 0 on
 * success; the PA_CHECK_VALIDITY checks bail out when the stream is not in
 * a usable state or no valid timing data is available. */
int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
    pa_usec_t stream_time;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);

    if (s->smoother) {
#ifdef USE_SMOOTHER_2
        stream_time = pa_smoother_2_get(s->smoother, pa_rtclock_now());
#else
        stream_time = pa_smoother_get(s->smoother, pa_rtclock_now());
#endif
    } else {
        stream_time = calc_time(s, false);
    }

    /* Keep the reported time monotonic unless the client opted out */
    if ((s->flags & PA_STREAM_NOT_MONOTONIC) == 0) {
        if (stream_time >= s->previous_time)
            s->previous_time = stream_time;
        else
            stream_time = s->previous_time;
    }

    if (r_usec)
        *r_usec = stream_time;

    return 0;
}
2533
/* Compute a - b for two time counters.
 *
 * If a < b the difference is only reported (as b - a, with *negative set
 * to 1) for record streams and when the caller passed a negative pointer;
 * in every other a < b case the result is 0. *negative, when given, is
 * always initialised to 0 first. */
static pa_usec_t time_counter_diff(const pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (negative)
        *negative = 0;

    if (a >= b)
        return a - b;

    /* Negative differences are only meaningful for record streams, and
     * only if the caller can receive the sign */
    if (!negative || s->direction != PA_STREAM_RECORD)
        return 0;

    *negative = 1;
    return b - a;
}
2551
/* Compute the stream latency and store it in *r_usec.
 *
 * The latency is the distance between the stream time (see
 * pa_stream_get_time()) and the time equivalent of the relevant buffer
 * index: the write index for playback, the read index for record. The sign
 * handling is delegated to time_counter_diff(). Returns 0 on success or the
 * negative result of pa_stream_get_time(). */
int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
    pa_usec_t stream_time, index_time;
    int64_t idx;
    int ret;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(r_usec);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);

    ret = pa_stream_get_time(s, &stream_time);
    if (ret < 0)
        return ret;

    idx = s->direction == PA_STREAM_PLAYBACK ? s->timing_info.write_index : s->timing_info.read_index;

    /* Negative indexes are treated as zero before the byte count is
     * converted to a time value */
    if (idx < 0)
        idx = 0;

    index_time = pa_bytes_to_usec((uint64_t) idx, &s->sample_spec);

    *r_usec = s->direction == PA_STREAM_PLAYBACK
        ? time_counter_diff(s, index_time, stream_time, negative)
        : time_counter_diff(s, stream_time, index_time, negative);

    return 0;
}
2588
pa_stream_get_timing_info(pa_stream * s)2589 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2590 pa_assert(s);
2591 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2592
2593 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2594 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2595 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2596 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2597
2598 return &s->timing_info;
2599 }
2600
pa_stream_get_sample_spec(pa_stream * s)2601 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2602 pa_assert(s);
2603 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2604
2605 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2606
2607 return &s->sample_spec;
2608 }
2609
pa_stream_get_channel_map(pa_stream * s)2610 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2611 pa_assert(s);
2612 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2613
2614 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2615
2616 return &s->channel_map;
2617 }
2618
pa_stream_get_format_info(const pa_stream * s)2619 const pa_format_info* pa_stream_get_format_info(const pa_stream *s) {
2620 pa_assert(s);
2621 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2622
2623 /* We don't have the format till routing is done */
2624 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2625 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2626
2627 return s->format;
2628 }
pa_stream_get_buffer_attr(pa_stream * s)2629 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2630 pa_assert(s);
2631 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2632
2633 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2634 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2635 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2636
2637 return &s->buffer_attr;
2638 }
2639
/* Reply handler for the set-buffer-attr request issued by
 * pa_stream_set_buffer_attr(). userdata is the pa_operation tracking it.
 *
 * On a successful reply the buffer attributes reported back are stored in
 * the stream (maxlength/tlength/prebuf/minreq for playback,
 * maxlength/fragsize for record). With protocol version 13 or newer the
 * reply also carries a latency value that is stored as the configured sink
 * or source latency. Any parse failure or trailing data fails the context
 * with PA_ERR_PROTOCOL and suppresses the callback. */
static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    int success = 1;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    if (!o->context)
        goto finish;

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

        success = 0;
    } else {
        int parsed_ok = 1;

        /* Which fields the reply carries depends on the direction */
        switch (o->stream->direction) {
            case PA_STREAM_PLAYBACK:
                parsed_ok =
                    pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) >= 0 &&
                    pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) >= 0 &&
                    pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) >= 0 &&
                    pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) >= 0;
                break;

            case PA_STREAM_RECORD:
                parsed_ok =
                    pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) >= 0 &&
                    pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) >= 0;
                break;

            default:
                break;
        }

        if (!parsed_ok) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        if (o->stream->context->version >= 13) {
            pa_usec_t configured_usec;

            if (pa_tagstruct_get_usec(t, &configured_usec) < 0) {
                pa_context_fail(o->context, PA_ERR_PROTOCOL);
                goto finish;
            }

            if (o->stream->direction == PA_STREAM_RECORD)
                o->stream->timing_info.configured_source_usec = configured_usec;
            else
                o->stream->timing_info.configured_sink_usec = configured_usec;
        }

        /* Nothing else may follow */
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }
    }

    if (o->callback) {
        pa_stream_success_cb_t notify = (pa_stream_success_cb_t) o->callback;
        notify(o->stream, success, o->userdata);
    }

finish:
    pa_operation_done(o);
    pa_operation_unref(o);
}
2702
pa_stream_set_buffer_attr(pa_stream * s,const pa_buffer_attr * attr,pa_stream_success_cb_t cb,void * userdata)2703 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2704 pa_operation *o;
2705 pa_tagstruct *t;
2706 uint32_t tag;
2707 pa_buffer_attr copy;
2708
2709 pa_assert(s);
2710 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2711 pa_assert(attr);
2712
2713 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2714 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2715 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2716 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2717
2718 /* Ask for a timing update before we cork/uncork to get the best
2719 * accuracy for the transport latency suitable for the
2720 * check_smoother_status() call in the started callback */
2721 request_auto_timing_update(s, true);
2722
2723 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2724
2725 t = pa_tagstruct_command(
2726 s->context,
2727 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2728 &tag);
2729 pa_tagstruct_putu32(t, s->channel);
2730
2731 copy = *attr;
2732 patch_buffer_attr(s, ©, NULL);
2733 attr = ©
2734
2735 pa_tagstruct_putu32(t, attr->maxlength);
2736
2737 if (s->direction == PA_STREAM_PLAYBACK)
2738 pa_tagstruct_put(
2739 t,
2740 PA_TAG_U32, attr->tlength,
2741 PA_TAG_U32, attr->prebuf,
2742 PA_TAG_U32, attr->minreq,
2743 PA_TAG_INVALID);
2744 else
2745 pa_tagstruct_putu32(t, attr->fragsize);
2746
2747 if (s->context->version >= 13)
2748 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2749
2750 if (s->context->version >= 14)
2751 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2752
2753 pa_pstream_send_tagstruct(s->context->pstream, t);
2754 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2755
2756 /* This might cause changes in the read/write index, hence let's
2757 * request a timing update */
2758 request_auto_timing_update(s, true);
2759
2760 return o;
2761 }
2762
pa_stream_get_device_index(const pa_stream * s)2763 uint32_t pa_stream_get_device_index(const pa_stream *s) {
2764 pa_assert(s);
2765 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2766
2767 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2768 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2769 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2770 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2771 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2772
2773 return s->device_index;
2774 }
2775
pa_stream_get_device_name(const pa_stream * s)2776 const char *pa_stream_get_device_name(const pa_stream *s) {
2777 pa_assert(s);
2778 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2779
2780 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2781 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2782 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2783 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2784 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2785
2786 return s->device_name;
2787 }
2788
pa_stream_is_suspended(const pa_stream * s)2789 int pa_stream_is_suspended(const pa_stream *s) {
2790 pa_assert(s);
2791 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2792
2793 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2794 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2795 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2796 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2797
2798 return s->suspended;
2799 }
2800
pa_stream_is_corked(const pa_stream * s)2801 int pa_stream_is_corked(const pa_stream *s) {
2802 pa_assert(s);
2803 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2804
2805 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2806 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2807 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2808
2809 return s->corked;
2810 }
2811
/* Reply handler registered by pa_stream_update_sample_rate().
 *
 * pd       - dispatcher delivering the reply (only asserted non-NULL)
 * command  - reply command; anything other than PA_COMMAND_REPLY is an error
 * tag      - request tag (unused here)
 * t        - reply payload; must be empty for a successful reply
 * userdata - the pa_operation created for the request; the requested rate is
 *            carried in o->private
 *
 * On a successful reply the stream's local sample spec (and, when built with
 * USE_SMOOTHER_2, the smoother) is switched to the requested rate. If the
 * server reported an error that pa_context_handle_error() accepted as
 * non-fatal, the local rate is left untouched and the user callback is
 * invoked with success = 0. The operation is always marked done and the
 * reference handed to the dispatcher is dropped. */
static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    int success = 1;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    /* The operation is no longer attached to a context: just clean up. */
    if (!o->context)
        goto finish;

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

        success = 0;
    } else if (!pa_tagstruct_eof(t)) {
        /* A successful reply carries no payload. */
        pa_context_fail(o->context, PA_ERR_PROTOCOL);
        goto finish;
    }

    /* Adopt the new rate only if the server accepted it. Updating the local
     * sample spec after a rejected request would leave the client believing
     * in a rate the server never applied. */
    if (success) {
        o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
#ifdef USE_SMOOTHER_2
        if (o->stream->smoother)
            pa_smoother_2_set_rate(o->stream->smoother, pa_rtclock_now(), o->stream->sample_spec.rate);
#endif
        pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
    }

    if (o->callback) {
        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
        cb(o->stream, success, o->userdata);
    }

finish:
    pa_operation_done(o);
    pa_operation_unref(o);
}
2852
pa_stream_update_sample_rate(pa_stream * s,uint32_t rate,pa_stream_success_cb_t cb,void * userdata)2853 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2854 pa_operation *o;
2855 pa_tagstruct *t;
2856 uint32_t tag;
2857
2858 pa_assert(s);
2859 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2860
2861 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2862 PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_sample_rate_valid(rate), PA_ERR_INVALID);
2863 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2864 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2865 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2866 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2867
2868 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2869 o->private = PA_UINT_TO_PTR(rate);
2870
2871 t = pa_tagstruct_command(
2872 s->context,
2873 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2874 &tag);
2875 pa_tagstruct_putu32(t, s->channel);
2876 pa_tagstruct_putu32(t, rate);
2877
2878 pa_pstream_send_tagstruct(s->context->pstream, t);
2879 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2880
2881 return o;
2882 }
2883
pa_stream_proplist_update(pa_stream * s,pa_update_mode_t mode,pa_proplist * p,pa_stream_success_cb_t cb,void * userdata)2884 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2885 pa_operation *o;
2886 pa_tagstruct *t;
2887 uint32_t tag;
2888
2889 pa_assert(s);
2890 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2891
2892 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2893 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2894 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2895 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2896 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2897
2898 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2899
2900 t = pa_tagstruct_command(
2901 s->context,
2902 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2903 &tag);
2904 pa_tagstruct_putu32(t, s->channel);
2905 pa_tagstruct_putu32(t, (uint32_t) mode);
2906 pa_tagstruct_put_proplist(t, p);
2907
2908 pa_pstream_send_tagstruct(s->context->pstream, t);
2909 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2910
2911 /* Please note that we don't update s->proplist here, because we
2912 * don't export that field */
2913
2914 return o;
2915 }
2916
pa_stream_proplist_remove(pa_stream * s,const char * const keys[],pa_stream_success_cb_t cb,void * userdata)2917 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2918 pa_operation *o;
2919 pa_tagstruct *t;
2920 uint32_t tag;
2921 const char * const*k;
2922
2923 pa_assert(s);
2924 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2925
2926 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2927 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2928 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2929 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2930 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2931
2932 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2933
2934 t = pa_tagstruct_command(
2935 s->context,
2936 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2937 &tag);
2938 pa_tagstruct_putu32(t, s->channel);
2939
2940 for (k = keys; *k; k++)
2941 pa_tagstruct_puts(t, *k);
2942
2943 pa_tagstruct_puts(t, NULL);
2944
2945 pa_pstream_send_tagstruct(s->context->pstream, t);
2946 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2947
2948 /* Please note that we don't update s->proplist here, because we
2949 * don't export that field */
2950
2951 return o;
2952 }
2953
/* Record the sink input index this stream should monitor directly.
 *
 * The value is stored in s->direct_on_input; nothing is sent to the server
 * from here. The stream must still be PA_STREAM_UNCONNECTED, sink_input_idx
 * must not be PA_INVALID_INDEX, and the context version must be at least 13.
 *
 * Returns 0 on success. If a check fails, PA_CHECK_VALIDITY ends the call
 * early with the given error code (see the macro definition for the exact
 * return value). */
int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);

    /* Only allowed before the stream has been connected. */
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);

    PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);

    s->direct_on_input = sink_input_idx;
    return 0;
}
2967
pa_stream_get_monitor_stream(const pa_stream * s)2968 uint32_t pa_stream_get_monitor_stream(const pa_stream *s) {
2969 pa_assert(s);
2970 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2971
2972 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2973 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2974 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2975
2976 return s->direct_on_input;
2977 }
2978