• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6 
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11 
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16 
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20 
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24 
25 #include <string.h>
26 #include <stdio.h>
27 #include <string.h>
28 
29 #include <pulse/def.h>
30 #include <pulse/timeval.h>
31 #include <pulse/rtclock.h>
32 #include <pulse/xmalloc.h>
33 #include <pulse/fork-detect.h>
34 
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/sample-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41 #include <pulsecore/core-util.h>
42 
43 #include "internal.h"
44 #include "stream.h"
45 
46 /* #define STREAM_DEBUG */
47 
48 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
49 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
50 
51 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
53 #define SMOOTHER_MIN_HISTORY (4)
54 
pa_stream_new(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map)55 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
56     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
57 }
58 
reset_callbacks(pa_stream * s)59 static void reset_callbacks(pa_stream *s) {
60     s->read_callback = NULL;
61     s->read_userdata = NULL;
62     s->write_callback = NULL;
63     s->write_userdata = NULL;
64     s->state_callback = NULL;
65     s->state_userdata = NULL;
66     s->overflow_callback = NULL;
67     s->overflow_userdata = NULL;
68     s->underflow_callback = NULL;
69     s->underflow_userdata = NULL;
70     s->latency_update_callback = NULL;
71     s->latency_update_userdata = NULL;
72     s->moved_callback = NULL;
73     s->moved_userdata = NULL;
74     s->suspended_callback = NULL;
75     s->suspended_userdata = NULL;
76     s->started_callback = NULL;
77     s->started_userdata = NULL;
78     s->event_callback = NULL;
79     s->event_userdata = NULL;
80     s->buffer_attr_callback = NULL;
81     s->buffer_attr_userdata = NULL;
82 }
83 
pa_stream_new_with_proplist_internal(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map,pa_format_info * const * formats,unsigned int n_formats,pa_proplist * p)84 static pa_stream *pa_stream_new_with_proplist_internal(
85         pa_context *c,
86         const char *name,
87         const pa_sample_spec *ss,
88         const pa_channel_map *map,
89         pa_format_info * const *formats,
90         unsigned int n_formats,
91         pa_proplist *p) {
92 
93     pa_stream *s;
94     unsigned int i;
95 
96     pa_assert(c);
97     pa_assert(PA_REFCNT_VALUE(c) >= 1);
98     pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
99     pa_assert(n_formats < PA_MAX_FORMATS);
100 
101     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
102     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
103 
104     s = pa_xnew(pa_stream, 1);
105     PA_REFCNT_INIT(s);
106     s->context = c;
107     s->mainloop = c->mainloop;
108 
109     s->direction = PA_STREAM_NODIRECTION;
110     s->state = PA_STREAM_UNCONNECTED;
111     s->flags = 0;
112 
113     if (ss)
114         s->sample_spec = *ss;
115     else
116         pa_sample_spec_init(&s->sample_spec);
117 
118     if (map)
119         s->channel_map = *map;
120     else
121         pa_channel_map_init(&s->channel_map);
122 
123     s->n_formats = 0;
124     if (formats) {
125         s->n_formats = n_formats;
126         for (i = 0; i < n_formats; i++)
127             s->req_formats[i] = pa_format_info_copy(formats[i]);
128     }
129 
130     /* We'll get the final negotiated format after connecting */
131     s->format = NULL;
132 
133     s->direct_on_input = PA_INVALID_INDEX;
134 
135     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
136     if (name)
137         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
138 
139     s->channel = 0;
140     s->channel_valid = false;
141     s->syncid = c->csyncid++;
142     s->stream_index = PA_INVALID_INDEX;
143 
144     s->requested_bytes = 0;
145     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
146 
147     /* We initialize the target length here, so that if the user
148      * passes no explicit buffering metrics the default is similar to
149      * what older PA versions provided. */
150 
151     s->buffer_attr.maxlength = (uint32_t) -1;
152     if (ss)
153         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
154     else {
155         /* FIXME: We assume a worst-case compressed format corresponding to
156          * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
157         pa_sample_spec tmp_ss = {
158             .format   = PA_SAMPLE_S16NE,
159             .rate     = 48000,
160             .channels = 2,
161         };
162         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
163     }
164     s->buffer_attr.minreq = (uint32_t) -1;
165     s->buffer_attr.prebuf = (uint32_t) -1;
166     s->buffer_attr.fragsize = (uint32_t) -1;
167 
168     s->device_index = PA_INVALID_INDEX;
169     s->device_name = NULL;
170     s->suspended = false;
171     s->corked = false;
172 
173     s->write_memblock = NULL;
174     s->write_data = NULL;
175 
176     pa_memchunk_reset(&s->peek_memchunk);
177     s->peek_data = NULL;
178     s->record_memblockq = NULL;
179 
180     memset(&s->timing_info, 0, sizeof(s->timing_info));
181     s->timing_info_valid = false;
182 
183     s->previous_time = 0;
184     s->latest_underrun_at_index = -1;
185 
186     s->read_index_not_before = 0;
187     s->write_index_not_before = 0;
188     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
189         s->write_index_corrections[i].valid = 0;
190     s->current_write_index_correction = 0;
191 
192     s->auto_timing_update_event = NULL;
193     s->auto_timing_update_requested = false;
194     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
195 
196     reset_callbacks(s);
197 
198     s->smoother = NULL;
199 
200     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
201     PA_LLIST_PREPEND(pa_stream, c->streams, s);
202     pa_stream_ref(s);
203 
204     return s;
205 }
206 
pa_stream_new_with_proplist(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map,pa_proplist * p)207 pa_stream *pa_stream_new_with_proplist(
208         pa_context *c,
209         const char *name,
210         const pa_sample_spec *ss,
211         const pa_channel_map *map,
212         pa_proplist *p) {
213 
214     pa_channel_map tmap;
215 
216     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
217     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
218     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
219     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
220     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
221 
222     if (!map)
223         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
224 
225     return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
226 }
227 
pa_stream_new_extended(pa_context * c,const char * name,pa_format_info * const * formats,unsigned int n_formats,pa_proplist * p)228 pa_stream *pa_stream_new_extended(
229         pa_context *c,
230         const char *name,
231         pa_format_info * const *formats,
232         unsigned int n_formats,
233         pa_proplist *p) {
234 
235     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
236 
237     return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
238 }
239 
stream_unlink(pa_stream * s)240 static void stream_unlink(pa_stream *s) {
241     pa_operation *o, *n;
242     pa_assert(s);
243 
244     if (!s->context)
245         return;
246 
247     /* Detach from context */
248 
249     /* Unref all operation objects that point to us */
250     for (o = s->context->operations; o; o = n) {
251         n = o->next;
252 
253         if (o->stream == s)
254             pa_operation_cancel(o);
255     }
256 
257     /* Drop all outstanding replies for this stream */
258     if (s->context->pdispatch)
259         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
260 
261     if (s->channel_valid) {
262         pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
263         s->channel = 0;
264         s->channel_valid = false;
265     }
266 
267     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
268     pa_stream_unref(s);
269 
270     s->context = NULL;
271 
272     if (s->auto_timing_update_event) {
273         pa_assert(s->mainloop);
274         s->mainloop->time_free(s->auto_timing_update_event);
275     }
276 
277     reset_callbacks(s);
278 }
279 
stream_free(pa_stream * s)280 static void stream_free(pa_stream *s) {
281     unsigned int i;
282 
283     pa_assert(s);
284 
285     stream_unlink(s);
286 
287     if (s->write_memblock) {
288         if (s->write_data)
289             pa_memblock_release(s->write_memblock);
290         pa_memblock_unref(s->write_memblock);
291     }
292 
293     if (s->peek_memchunk.memblock) {
294         if (s->peek_data)
295             pa_memblock_release(s->peek_memchunk.memblock);
296         pa_memblock_unref(s->peek_memchunk.memblock);
297     }
298 
299     if (s->record_memblockq)
300         pa_memblockq_free(s->record_memblockq);
301 
302     if (s->proplist)
303         pa_proplist_free(s->proplist);
304 
305     if (s->smoother)
306         pa_smoother_free(s->smoother);
307 
308     for (i = 0; i < s->n_formats; i++)
309         pa_format_info_free(s->req_formats[i]);
310 
311     if (s->format)
312         pa_format_info_free(s->format);
313 
314     pa_xfree(s->device_name);
315     pa_xfree(s);
316 }
317 
pa_stream_unref(pa_stream * s)318 void pa_stream_unref(pa_stream *s) {
319     pa_assert(s);
320     pa_assert(PA_REFCNT_VALUE(s) >= 1);
321 
322     if (PA_REFCNT_DEC(s) <= 0)
323         stream_free(s);
324 }
325 
pa_stream_ref(pa_stream * s)326 pa_stream* pa_stream_ref(pa_stream *s) {
327     pa_assert(s);
328     pa_assert(PA_REFCNT_VALUE(s) >= 1);
329 
330     PA_REFCNT_INC(s);
331     return s;
332 }
333 
pa_stream_get_state(const pa_stream * s)334 pa_stream_state_t pa_stream_get_state(const pa_stream *s) {
335     pa_assert(s);
336     pa_assert(PA_REFCNT_VALUE(s) >= 1);
337 
338     return s->state;
339 }
340 
pa_stream_get_context(const pa_stream * s)341 pa_context* pa_stream_get_context(const pa_stream *s) {
342     pa_assert(s);
343     pa_assert(PA_REFCNT_VALUE(s) >= 1);
344 
345     return s->context;
346 }
347 
pa_stream_get_index(const pa_stream * s)348 uint32_t pa_stream_get_index(const pa_stream *s) {
349     pa_assert(s);
350     pa_assert(PA_REFCNT_VALUE(s) >= 1);
351 
352     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
353     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
354 
355     return s->stream_index;
356 }
357 
pa_stream_set_state(pa_stream * s,pa_stream_state_t st)358 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
359     pa_assert(s);
360     pa_assert(PA_REFCNT_VALUE(s) >= 1);
361 
362     if (s->state == st)
363         return;
364 
365     pa_stream_ref(s);
366 
367     s->state = st;
368 
369     if (s->state_callback)
370         s->state_callback(s, s->state_userdata);
371 
372     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
373         stream_unlink(s);
374 
375     pa_stream_unref(s);
376 }
377 
request_auto_timing_update(pa_stream * s,bool force)378 static void request_auto_timing_update(pa_stream *s, bool force) {
379     pa_assert(s);
380     pa_assert(PA_REFCNT_VALUE(s) >= 1);
381 
382     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
383         return;
384 
385     if (s->state == PA_STREAM_READY &&
386         (force || !s->auto_timing_update_requested)) {
387         pa_operation *o;
388 
389 #ifdef STREAM_DEBUG
390         pa_log_debug("Automatically requesting new timing data");
391 #endif
392 
393         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
394             pa_operation_unref(o);
395             s->auto_timing_update_requested = true;
396         }
397     }
398 
399     if (s->auto_timing_update_event) {
400         if (s->suspended && !force) {
401             pa_assert(s->mainloop);
402             s->mainloop->time_free(s->auto_timing_update_event);
403             s->auto_timing_update_event = NULL;
404         } else {
405             if (force)
406                 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
407 
408             pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
409 
410             s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
411         }
412     }
413 }
414 
pa_command_stream_killed(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)415 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
416     pa_context *c = userdata;
417     pa_stream *s;
418     uint32_t channel;
419 
420     pa_assert(pd);
421     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
422     pa_assert(t);
423     pa_assert(c);
424     pa_assert(PA_REFCNT_VALUE(c) >= 1);
425 
426     pa_context_ref(c);
427 
428     if (pa_tagstruct_getu32(t, &channel) < 0 ||
429         !pa_tagstruct_eof(t)) {
430         pa_context_fail(c, PA_ERR_PROTOCOL);
431         goto finish;
432     }
433 
434     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
435         goto finish;
436 
437     if (s->state != PA_STREAM_READY)
438         goto finish;
439 
440     pa_context_set_error(c, PA_ERR_KILLED);
441     pa_stream_set_state(s, PA_STREAM_FAILED);
442 
443 finish:
444     pa_context_unref(c);
445 }
446 
check_smoother_status(pa_stream * s,bool aposteriori,bool force_start,bool force_stop)447 static void check_smoother_status(pa_stream *s, bool aposteriori, bool force_start, bool force_stop) {
448     pa_usec_t x;
449 
450     pa_assert(s);
451     pa_assert(!force_start || !force_stop);
452 
453     if (!s->smoother)
454         return;
455 
456     x = pa_rtclock_now();
457 
458     if (s->timing_info_valid) {
459         if (aposteriori)
460             x -= s->timing_info.transport_usec;
461         else
462             x += s->timing_info.transport_usec;
463     }
464 
465     if (s->suspended || s->corked || force_stop)
466         pa_smoother_pause(s->smoother, x);
467     else if (force_start || s->buffer_attr.prebuf == 0) {
468 
469         if (!s->timing_info_valid &&
470             !aposteriori &&
471             !force_start &&
472             !force_stop &&
473             s->context->version >= 13) {
474 
475             /* If the server supports STARTED events we take them as
476              * indications when audio really starts/stops playing, if
477              * we don't have any timing info yet -- instead of trying
478              * to be smart and guessing the server time. Otherwise the
479              * unknown transport delay adds too much noise to our time
480              * calculations. */
481 
482             return;
483         }
484 
485         pa_smoother_resume(s->smoother, x, true);
486     }
487 
488     /* Please note that we have no idea if playback actually started
489      * if prebuf is non-zero! */
490 }
491 
492 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
493 
pa_command_stream_moved(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)494 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
495     pa_context *c = userdata;
496     pa_stream *s;
497     uint32_t channel;
498     const char *dn;
499     bool suspended;
500     uint32_t di;
501     pa_usec_t usec = 0;
502     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
503 
504     pa_assert(pd);
505     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
506     pa_assert(t);
507     pa_assert(c);
508     pa_assert(PA_REFCNT_VALUE(c) >= 1);
509 
510     pa_context_ref(c);
511 
512     if (c->version < 12) {
513         pa_context_fail(c, PA_ERR_PROTOCOL);
514         goto finish;
515     }
516 
517     if (pa_tagstruct_getu32(t, &channel) < 0 ||
518         pa_tagstruct_getu32(t, &di) < 0 ||
519         pa_tagstruct_gets(t, &dn) < 0 ||
520         pa_tagstruct_get_boolean(t, &suspended) < 0) {
521         pa_context_fail(c, PA_ERR_PROTOCOL);
522         goto finish;
523     }
524 
525     if (c->version >= 13) {
526 
527         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
528             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
529                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
530                 pa_tagstruct_get_usec(t, &usec) < 0) {
531                 pa_context_fail(c, PA_ERR_PROTOCOL);
532                 goto finish;
533             }
534         } else {
535             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
536                 pa_tagstruct_getu32(t, &tlength) < 0 ||
537                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
538                 pa_tagstruct_getu32(t, &minreq) < 0 ||
539                 pa_tagstruct_get_usec(t, &usec) < 0) {
540                 pa_context_fail(c, PA_ERR_PROTOCOL);
541                 goto finish;
542             }
543         }
544     }
545 
546     if (!pa_tagstruct_eof(t)) {
547         pa_context_fail(c, PA_ERR_PROTOCOL);
548         goto finish;
549     }
550 
551     if (!dn || di == PA_INVALID_INDEX) {
552         pa_context_fail(c, PA_ERR_PROTOCOL);
553         goto finish;
554     }
555 
556     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
557         goto finish;
558 
559     if (s->state != PA_STREAM_READY)
560         goto finish;
561 
562     if (c->version >= 13) {
563         if (s->direction == PA_STREAM_RECORD)
564             s->timing_info.configured_source_usec = usec;
565         else
566             s->timing_info.configured_sink_usec = usec;
567 
568         s->buffer_attr.maxlength = maxlength;
569         s->buffer_attr.fragsize = fragsize;
570         s->buffer_attr.tlength = tlength;
571         s->buffer_attr.prebuf = prebuf;
572         s->buffer_attr.minreq = minreq;
573     }
574 
575     pa_xfree(s->device_name);
576     s->device_name = pa_xstrdup(dn);
577     s->device_index = di;
578 
579     s->suspended = suspended;
580 
581     if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
582         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
583         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
584         request_auto_timing_update(s, true);
585     }
586 
587     check_smoother_status(s, true, false, false);
588     request_auto_timing_update(s, true);
589 
590     if (s->moved_callback)
591         s->moved_callback(s, s->moved_userdata);
592 
593 finish:
594     pa_context_unref(c);
595 }
596 
pa_command_stream_buffer_attr(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)597 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
598     pa_context *c = userdata;
599     pa_stream *s;
600     uint32_t channel;
601     pa_usec_t usec = 0;
602     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
603 
604     pa_assert(pd);
605     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
606     pa_assert(t);
607     pa_assert(c);
608     pa_assert(PA_REFCNT_VALUE(c) >= 1);
609 
610     pa_context_ref(c);
611 
612     if (c->version < 15) {
613         pa_context_fail(c, PA_ERR_PROTOCOL);
614         goto finish;
615     }
616 
617     if (pa_tagstruct_getu32(t, &channel) < 0) {
618         pa_context_fail(c, PA_ERR_PROTOCOL);
619         goto finish;
620     }
621 
622     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
623         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
624             pa_tagstruct_getu32(t, &fragsize) < 0 ||
625             pa_tagstruct_get_usec(t, &usec) < 0) {
626             pa_context_fail(c, PA_ERR_PROTOCOL);
627             goto finish;
628         }
629     } else {
630         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
631             pa_tagstruct_getu32(t, &tlength) < 0 ||
632             pa_tagstruct_getu32(t, &prebuf) < 0 ||
633             pa_tagstruct_getu32(t, &minreq) < 0 ||
634             pa_tagstruct_get_usec(t, &usec) < 0) {
635             pa_context_fail(c, PA_ERR_PROTOCOL);
636             goto finish;
637         }
638     }
639 
640     if (!pa_tagstruct_eof(t)) {
641         pa_context_fail(c, PA_ERR_PROTOCOL);
642         goto finish;
643     }
644 
645     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
646         goto finish;
647 
648     if (s->state != PA_STREAM_READY)
649         goto finish;
650 
651     if (s->direction == PA_STREAM_RECORD)
652         s->timing_info.configured_source_usec = usec;
653     else
654         s->timing_info.configured_sink_usec = usec;
655 
656     s->buffer_attr.maxlength = maxlength;
657     s->buffer_attr.fragsize = fragsize;
658     s->buffer_attr.tlength = tlength;
659     s->buffer_attr.prebuf = prebuf;
660     s->buffer_attr.minreq = minreq;
661 
662     request_auto_timing_update(s, true);
663 
664     if (s->buffer_attr_callback)
665         s->buffer_attr_callback(s, s->buffer_attr_userdata);
666 
667 finish:
668     pa_context_unref(c);
669 }
670 
pa_command_stream_suspended(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)671 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
672     pa_context *c = userdata;
673     pa_stream *s;
674     uint32_t channel;
675     bool suspended;
676 
677     pa_assert(pd);
678     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
679     pa_assert(t);
680     pa_assert(c);
681     pa_assert(PA_REFCNT_VALUE(c) >= 1);
682 
683     pa_context_ref(c);
684 
685     if (c->version < 12) {
686         pa_context_fail(c, PA_ERR_PROTOCOL);
687         goto finish;
688     }
689 
690     if (pa_tagstruct_getu32(t, &channel) < 0 ||
691         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
692         !pa_tagstruct_eof(t)) {
693         pa_context_fail(c, PA_ERR_PROTOCOL);
694         goto finish;
695     }
696 
697     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
698         goto finish;
699 
700     if (s->state != PA_STREAM_READY)
701         goto finish;
702 
703     s->suspended = suspended;
704 
705     if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
706         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
707         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
708         request_auto_timing_update(s, true);
709     }
710 
711     check_smoother_status(s, true, false, false);
712     request_auto_timing_update(s, true);
713 
714     if (s->suspended_callback)
715         s->suspended_callback(s, s->suspended_userdata);
716 
717 finish:
718     pa_context_unref(c);
719 }
720 
pa_command_stream_started(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)721 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
722     pa_context *c = userdata;
723     pa_stream *s;
724     uint32_t channel;
725 
726     pa_assert(pd);
727     pa_assert(command == PA_COMMAND_STARTED);
728     pa_assert(t);
729     pa_assert(c);
730     pa_assert(PA_REFCNT_VALUE(c) >= 1);
731 
732     pa_context_ref(c);
733 
734     if (c->version < 13) {
735         pa_context_fail(c, PA_ERR_PROTOCOL);
736         goto finish;
737     }
738 
739     if (pa_tagstruct_getu32(t, &channel) < 0 ||
740         !pa_tagstruct_eof(t)) {
741         pa_context_fail(c, PA_ERR_PROTOCOL);
742         goto finish;
743     }
744 
745     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
746         goto finish;
747 
748     if (s->state != PA_STREAM_READY)
749         goto finish;
750 
751     check_smoother_status(s, true, true, false);
752     request_auto_timing_update(s, true);
753 
754     if (s->started_callback)
755         s->started_callback(s, s->started_userdata);
756 
757 finish:
758     pa_context_unref(c);
759 }
760 
pa_command_stream_event(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)761 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
762     pa_context *c = userdata;
763     pa_stream *s;
764     uint32_t channel;
765     pa_proplist *pl = NULL;
766     const char *event;
767 
768     pa_assert(pd);
769     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
770     pa_assert(t);
771     pa_assert(c);
772     pa_assert(PA_REFCNT_VALUE(c) >= 1);
773 
774     pa_context_ref(c);
775 
776     if (c->version < 15) {
777         pa_context_fail(c, PA_ERR_PROTOCOL);
778         goto finish;
779     }
780 
781     pl = pa_proplist_new();
782 
783     if (pa_tagstruct_getu32(t, &channel) < 0 ||
784         pa_tagstruct_gets(t, &event) < 0 ||
785         pa_tagstruct_get_proplist(t, pl) < 0 ||
786         !pa_tagstruct_eof(t) || !event) {
787         pa_context_fail(c, PA_ERR_PROTOCOL);
788         goto finish;
789     }
790 
791     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
792         goto finish;
793 
794     if (s->state != PA_STREAM_READY)
795         goto finish;
796 
797     if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
798         /* Let client know what the running time was when the stream had to be killed  */
799         pa_usec_t stream_time;
800         if (pa_stream_get_time(s, &stream_time) == 0)
801             pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
802     }
803 
804     if (s->event_callback)
805         s->event_callback(s, event, pl, s->event_userdata);
806 
807 finish:
808     pa_context_unref(c);
809 
810     if (pl)
811         pa_proplist_free(pl);
812 }
813 
pa_command_request(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)814 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
815     pa_stream *s;
816     pa_context *c = userdata;
817     uint32_t bytes, channel;
818 
819     pa_assert(pd);
820     pa_assert(command == PA_COMMAND_REQUEST);
821     pa_assert(t);
822     pa_assert(c);
823     pa_assert(PA_REFCNT_VALUE(c) >= 1);
824 
825     pa_context_ref(c);
826 
827     if (pa_tagstruct_getu32(t, &channel) < 0 ||
828         pa_tagstruct_getu32(t, &bytes) < 0 ||
829         !pa_tagstruct_eof(t)) {
830         pa_context_fail(c, PA_ERR_PROTOCOL);
831         goto finish;
832     }
833 
834     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
835         goto finish;
836 
837     if (s->state != PA_STREAM_READY)
838         goto finish;
839 
840     s->requested_bytes += bytes;
841 
842 #ifdef STREAM_DEBUG
843     pa_log_debug("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes);
844 #endif
845 
846     if (s->requested_bytes > 0 && s->write_callback)
847         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
848 
849 finish:
850     pa_context_unref(c);
851 }
852 
pa_stream_get_underflow_index(const pa_stream * p)853 int64_t pa_stream_get_underflow_index(const pa_stream *p) {
854     pa_assert(p);
855     return p->latest_underrun_at_index;
856 }
857 
pa_command_overflow_or_underflow(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)858 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
859     pa_stream *s;
860     pa_context *c = userdata;
861     uint32_t channel;
862     int64_t offset = -1;
863 
864     pa_assert(pd);
865     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
866     pa_assert(t);
867     pa_assert(c);
868     pa_assert(PA_REFCNT_VALUE(c) >= 1);
869 
870     pa_context_ref(c);
871 
872     if (pa_tagstruct_getu32(t, &channel) < 0) {
873         pa_context_fail(c, PA_ERR_PROTOCOL);
874         goto finish;
875     }
876 
877     if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW) {
878         if (pa_tagstruct_gets64(t, &offset) < 0) {
879             pa_context_fail(c, PA_ERR_PROTOCOL);
880             goto finish;
881         }
882     }
883 
884     if (!pa_tagstruct_eof(t)) {
885         pa_context_fail(c, PA_ERR_PROTOCOL);
886         goto finish;
887     }
888 
889     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
890         goto finish;
891 
892     if (s->state != PA_STREAM_READY)
893         goto finish;
894 
895     if (offset != -1)
896         s->latest_underrun_at_index = offset;
897 
898     if (s->buffer_attr.prebuf > 0)
899         check_smoother_status(s, true, false, true);
900 
901     request_auto_timing_update(s, true);
902 
903     if (command == PA_COMMAND_OVERFLOW) {
904         if (s->overflow_callback)
905             s->overflow_callback(s, s->overflow_userdata);
906     } else if (command == PA_COMMAND_UNDERFLOW) {
907         if (s->underflow_callback)
908             s->underflow_callback(s, s->underflow_userdata);
909     }
910 
911 finish:
912     pa_context_unref(c);
913 }
914 
invalidate_indexes(pa_stream * s,bool r,bool w)915 static void invalidate_indexes(pa_stream *s, bool r, bool w) {
916     pa_assert(s);
917     pa_assert(PA_REFCNT_VALUE(s) >= 1);
918 
919 #ifdef STREAM_DEBUG
920     pa_log_debug("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag);
921 #endif
922 
923     if (s->state != PA_STREAM_READY)
924         return;
925 
926     if (w) {
927         s->write_index_not_before = s->context->ctag;
928 
929         if (s->timing_info_valid)
930             s->timing_info.write_index_corrupt = true;
931 
932 #ifdef STREAM_DEBUG
933         pa_log_debug("write_index invalidated");
934 #endif
935     }
936 
937     if (r) {
938         s->read_index_not_before = s->context->ctag;
939 
940         if (s->timing_info_valid)
941             s->timing_info.read_index_corrupt = true;
942 
943 #ifdef STREAM_DEBUG
944         pa_log_debug("read_index invalidated");
945 #endif
946     }
947 
948     request_auto_timing_update(s, true);
949 }
950 
auto_timing_update_callback(pa_mainloop_api * m,pa_time_event * e,const struct timeval * t,void * userdata)951 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
952     pa_stream *s = userdata;
953 
954     pa_assert(s);
955     pa_assert(PA_REFCNT_VALUE(s) >= 1);
956 
957     pa_stream_ref(s);
958     request_auto_timing_update(s, false);
959     pa_stream_unref(s);
960 }
961 
create_stream_complete(pa_stream * s)962 static void create_stream_complete(pa_stream *s) {
963     pa_assert(s);
964     pa_assert(PA_REFCNT_VALUE(s) >= 1);
965     pa_assert(s->state == PA_STREAM_CREATING);
966 
967     pa_stream_set_state(s, PA_STREAM_READY);
968 
969     if (s->requested_bytes > 0 && s->write_callback)
970         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
971 
972     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
973         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
974         pa_assert(!s->auto_timing_update_event);
975         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
976 
977         request_auto_timing_update(s, true);
978     }
979 
980     check_smoother_status(s, true, false, false);
981 }
982 
patch_buffer_attr(pa_stream * s,pa_buffer_attr * attr,pa_stream_flags_t * flags)983 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
984     const char *e;
985 
986     pa_assert(s);
987     pa_assert(attr);
988 
989     if ((e = getenv("PULSE_LATENCY_MSEC"))) {
990         uint32_t ms;
991         pa_sample_spec ss;
992 
993         pa_sample_spec_init(&ss);
994 
995         if (pa_sample_spec_valid(&s->sample_spec))
996             ss = s->sample_spec;
997         else if (s->n_formats == 1)
998             pa_format_info_to_sample_spec(s->req_formats[0], &ss, NULL);
999 
1000         if (pa_atou(e, &ms) < 0 || ms <= 0)
1001             pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
1002         else if (!pa_sample_spec_valid(&s->sample_spec))
1003             pa_log_debug("Ignoring $PULSE_LATENCY_MSEC: %s (invalid sample spec)", e);
1004         else {
1005             attr->maxlength = (uint32_t) -1;
1006             attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &ss);
1007             attr->minreq = (uint32_t) -1;
1008             attr->prebuf = (uint32_t) -1;
1009             attr->fragsize = attr->tlength;
1010 
1011             if (flags)
1012                 *flags |= PA_STREAM_ADJUST_LATENCY;
1013         }
1014     }
1015 
1016     if (s->context->version >= 13)
1017         return;
1018 
1019     /* Version older than 0.9.10 didn't do server side buffer_attr
1020      * selection, hence we have to fake it on the client side. */
1021 
1022     /* We choose fairly conservative values here, to not confuse
1023      * old clients with extremely large playback buffers */
1024 
1025     if (attr->maxlength == (uint32_t) -1)
1026         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
1027 
1028     if (attr->tlength == (uint32_t) -1)
1029         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
1030 
1031     if (attr->minreq == (uint32_t) -1)
1032         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
1033 
1034     if (attr->prebuf == (uint32_t) -1)
1035         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
1036 
1037     if (attr->fragsize == (uint32_t) -1)
1038         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
1039 }
1040 
pa_create_stream_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1041 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1042     pa_stream *s = userdata;
1043     uint32_t requested_bytes = 0;
1044 
1045     pa_assert(pd);
1046     pa_assert(s);
1047     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1048     pa_assert(s->state == PA_STREAM_CREATING);
1049 
1050     pa_stream_ref(s);
1051 
1052     if (command != PA_COMMAND_REPLY) {
1053         if (pa_context_handle_error(s->context, command, t, false) < 0)
1054             goto finish;
1055 
1056         pa_stream_set_state(s, PA_STREAM_FAILED);
1057         goto finish;
1058     }
1059 
1060     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1061         s->channel == PA_INVALID_INDEX ||
1062         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1063         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1064         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1065         goto finish;
1066     }
1067 
1068     s->requested_bytes = (int64_t) requested_bytes;
1069 
1070     if (s->context->version >= 9) {
1071         if (s->direction == PA_STREAM_PLAYBACK) {
1072             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1073                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1074                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1075                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1076                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1077                 goto finish;
1078             }
1079         } else if (s->direction == PA_STREAM_RECORD) {
1080             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1081                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1082                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1083                 goto finish;
1084             }
1085         }
1086     }
1087 
1088     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1089         pa_sample_spec ss;
1090         pa_channel_map cm;
1091         const char *dn = NULL;
1092         bool suspended;
1093 
1094         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1095             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1096             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1097             pa_tagstruct_gets(t, &dn) < 0 ||
1098             pa_tagstruct_get_boolean(t, &suspended) < 0) {
1099             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1100             goto finish;
1101         }
1102 
1103         if (!dn || s->device_index == PA_INVALID_INDEX ||
1104             ss.channels != cm.channels ||
1105             !pa_channel_map_valid(&cm) ||
1106             !pa_sample_spec_valid(&ss) ||
1107             (s->n_formats == 0 && (
1108                 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1109                 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1110                 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1111             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1112             goto finish;
1113         }
1114 
1115         pa_xfree(s->device_name);
1116         s->device_name = pa_xstrdup(dn);
1117         s->suspended = suspended;
1118 
1119         s->channel_map = cm;
1120         s->sample_spec = ss;
1121     }
1122 
1123     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1124         pa_usec_t usec;
1125 
1126         if (pa_tagstruct_get_usec(t, &usec) < 0) {
1127             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1128             goto finish;
1129         }
1130 
1131         if (s->direction == PA_STREAM_RECORD)
1132             s->timing_info.configured_source_usec = usec;
1133         else
1134             s->timing_info.configured_sink_usec = usec;
1135     }
1136 
1137     if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1138         || s->context->version >= 22) {
1139 
1140         pa_format_info *f = pa_format_info_new();
1141 
1142         if (pa_tagstruct_get_format_info(t, f) < 0 || !pa_format_info_valid(f)) {
1143             pa_format_info_free(f);
1144             if (s->n_formats > 0) {
1145                 /* We used the extended API, so we should have got back a proper format */
1146                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1147                 goto finish;
1148             }
1149         } else
1150             s->format = f;
1151     }
1152 
1153     if (!pa_tagstruct_eof(t)) {
1154         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1155         goto finish;
1156     }
1157 
1158     if (s->direction == PA_STREAM_RECORD) {
1159         pa_assert(!s->record_memblockq);
1160 
1161         s->record_memblockq = pa_memblockq_new(
1162                 "client side record memblockq",
1163                 0,
1164                 s->buffer_attr.maxlength,
1165                 0,
1166                 &s->sample_spec,
1167                 1,
1168                 0,
1169                 0,
1170                 NULL);
1171     }
1172 
1173     s->channel_valid = true;
1174     pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1175 
1176     create_stream_complete(s);
1177 
1178 finish:
1179     pa_stream_unref(s);
1180 }
1181 
create_stream(pa_stream_direction_t direction,pa_stream * s,const char * dev,const pa_buffer_attr * attr,pa_stream_flags_t flags,const pa_cvolume * volume,pa_stream * sync_stream)1182 static int create_stream(
1183         pa_stream_direction_t direction,
1184         pa_stream *s,
1185         const char *dev,
1186         const pa_buffer_attr *attr,
1187         pa_stream_flags_t flags,
1188         const pa_cvolume *volume,
1189         pa_stream *sync_stream) {
1190 
1191     pa_tagstruct *t;
1192     uint32_t tag;
1193     bool volume_set = !!volume;
1194     pa_cvolume cv;
1195     uint32_t i;
1196 
1197     pa_assert(s);
1198     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1199     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1200 
1201     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1202     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1203     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1204     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1205                                               PA_STREAM_INTERPOLATE_TIMING|
1206                                               PA_STREAM_NOT_MONOTONIC|
1207                                               PA_STREAM_AUTO_TIMING_UPDATE|
1208                                               PA_STREAM_NO_REMAP_CHANNELS|
1209                                               PA_STREAM_NO_REMIX_CHANNELS|
1210                                               PA_STREAM_FIX_FORMAT|
1211                                               PA_STREAM_FIX_RATE|
1212                                               PA_STREAM_FIX_CHANNELS|
1213                                               PA_STREAM_DONT_MOVE|
1214                                               PA_STREAM_VARIABLE_RATE|
1215                                               PA_STREAM_PEAK_DETECT|
1216                                               PA_STREAM_START_MUTED|
1217                                               PA_STREAM_ADJUST_LATENCY|
1218                                               PA_STREAM_EARLY_REQUESTS|
1219                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1220                                               PA_STREAM_START_UNMUTED|
1221                                               PA_STREAM_FAIL_ON_SUSPEND|
1222                                               PA_STREAM_RELATIVE_VOLUME|
1223                                               PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1224 
1225     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1226     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1227     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1228     /* Although some of the other flags are not supported on older
1229      * version, we don't check for them here, because it doesn't hurt
1230      * when they are passed but actually not supported. This makes
1231      * client development easier */
1232 
1233     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1234     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1235     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1236 
1237     pa_stream_ref(s);
1238 
1239     s->direction = direction;
1240 
1241     if (sync_stream)
1242         s->syncid = sync_stream->syncid;
1243 
1244     if (attr)
1245         s->buffer_attr = *attr;
1246     patch_buffer_attr(s, &s->buffer_attr, &flags);
1247 
1248     s->flags = flags;
1249     s->corked = !!(flags & PA_STREAM_START_CORKED);
1250 
1251     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1252         pa_usec_t x;
1253 
1254         x = pa_rtclock_now();
1255 
1256         pa_assert(!s->smoother);
1257         s->smoother = pa_smoother_new(
1258                 SMOOTHER_ADJUST_TIME,
1259                 SMOOTHER_HISTORY_TIME,
1260                 !(flags & PA_STREAM_NOT_MONOTONIC),
1261                 true,
1262                 SMOOTHER_MIN_HISTORY,
1263                 x,
1264                 true);
1265     }
1266 
1267     if (!dev)
1268         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1269 
1270     t = pa_tagstruct_command(
1271             s->context,
1272             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1273             &tag);
1274 
1275     if (s->context->version < 13)
1276         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1277 
1278     pa_tagstruct_put(
1279             t,
1280             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1281             PA_TAG_CHANNEL_MAP, &s->channel_map,
1282             PA_TAG_U32, PA_INVALID_INDEX,
1283             PA_TAG_STRING, dev,
1284             PA_TAG_U32, s->buffer_attr.maxlength,
1285             PA_TAG_BOOLEAN, s->corked,
1286             PA_TAG_INVALID);
1287 
1288     if (!volume) {
1289         if (pa_sample_spec_valid(&s->sample_spec))
1290             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1291         else {
1292             /* This is not really relevant, since no volume was set, and
1293              * the real number of channels is embedded in the format_info
1294              * structure */
1295             volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1296         }
1297     }
1298 
1299     if (s->direction == PA_STREAM_PLAYBACK) {
1300         pa_tagstruct_put(
1301                 t,
1302                 PA_TAG_U32, s->buffer_attr.tlength,
1303                 PA_TAG_U32, s->buffer_attr.prebuf,
1304                 PA_TAG_U32, s->buffer_attr.minreq,
1305                 PA_TAG_U32, s->syncid,
1306                 PA_TAG_INVALID);
1307 
1308         pa_tagstruct_put_cvolume(t, volume);
1309     } else
1310         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1311 
1312     if (s->context->version >= 12) {
1313         pa_tagstruct_put(
1314                 t,
1315                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1316                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1317                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1318                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1319                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1320                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1321                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1322                 PA_TAG_INVALID);
1323     }
1324 
1325     if (s->context->version >= 13) {
1326 
1327         if (s->direction == PA_STREAM_PLAYBACK)
1328             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1329         else
1330             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1331 
1332         pa_tagstruct_put(
1333                 t,
1334                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1335                 PA_TAG_PROPLIST, s->proplist,
1336                 PA_TAG_INVALID);
1337 
1338         if (s->direction == PA_STREAM_RECORD)
1339             pa_tagstruct_putu32(t, s->direct_on_input);
1340     }
1341 
1342     if (s->context->version >= 14) {
1343 
1344         if (s->direction == PA_STREAM_PLAYBACK)
1345             pa_tagstruct_put_boolean(t, volume_set);
1346 
1347         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1348     }
1349 
1350     if (s->context->version >= 15) {
1351 
1352         if (s->direction == PA_STREAM_PLAYBACK)
1353             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1354 
1355         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1356         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1357     }
1358 
1359     if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1360         pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1361 
1362     if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1363         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1364 
1365     if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1366         || s->context->version >= 22) {
1367 
1368         pa_tagstruct_putu8(t, s->n_formats);
1369         for (i = 0; i < s->n_formats; i++)
1370             pa_tagstruct_put_format_info(t, s->req_formats[i]);
1371     }
1372 
1373     if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1374         pa_tagstruct_put_cvolume(t, volume);
1375         pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1376         pa_tagstruct_put_boolean(t, volume_set);
1377         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1378         pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1379         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1380     }
1381 
1382     pa_pstream_send_tagstruct(s->context->pstream, t);
1383     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1384 
1385     pa_stream_set_state(s, PA_STREAM_CREATING);
1386 
1387     pa_stream_unref(s);
1388     return 0;
1389 }
1390 
pa_stream_connect_playback(pa_stream * s,const char * dev,const pa_buffer_attr * attr,pa_stream_flags_t flags,const pa_cvolume * volume,pa_stream * sync_stream)1391 int pa_stream_connect_playback(
1392         pa_stream *s,
1393         const char *dev,
1394         const pa_buffer_attr *attr,
1395         pa_stream_flags_t flags,
1396         const pa_cvolume *volume,
1397         pa_stream *sync_stream) {
1398 
1399     pa_assert(s);
1400     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1401 
1402     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1403 }
1404 
pa_stream_connect_record(pa_stream * s,const char * dev,const pa_buffer_attr * attr,pa_stream_flags_t flags)1405 int pa_stream_connect_record(
1406         pa_stream *s,
1407         const char *dev,
1408         const pa_buffer_attr *attr,
1409         pa_stream_flags_t flags) {
1410 
1411     pa_assert(s);
1412     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1413 
1414     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1415 }
1416 
pa_stream_begin_write(pa_stream * s,void ** data,size_t * nbytes)1417 int pa_stream_begin_write(
1418         pa_stream *s,
1419         void **data,
1420         size_t *nbytes) {
1421 
1422     pa_assert(s);
1423     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1424 
1425     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1426     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1427     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1428     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1429     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1430 
1431     if (*nbytes != (size_t) -1) {
1432         size_t m, fs;
1433 
1434         m = pa_mempool_block_size_max(s->context->mempool);
1435         fs = pa_frame_size(&s->sample_spec);
1436 
1437         m = (m / fs) * fs;
1438         if (*nbytes > m)
1439             *nbytes = m;
1440     }
1441 
1442     if (!s->write_memblock) {
1443         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1444         s->write_data = pa_memblock_acquire(s->write_memblock);
1445     }
1446 
1447     *data = s->write_data;
1448     *nbytes = pa_memblock_get_length(s->write_memblock);
1449 
1450     return 0;
1451 }
1452 
pa_stream_cancel_write(pa_stream * s)1453 int pa_stream_cancel_write(
1454         pa_stream *s) {
1455 
1456     pa_assert(s);
1457     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1458 
1459     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1460     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1461     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1462     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1463 
1464     pa_assert(s->write_data);
1465 
1466     pa_memblock_release(s->write_memblock);
1467     pa_memblock_unref(s->write_memblock);
1468     s->write_memblock = NULL;
1469     s->write_data = NULL;
1470 
1471     return 0;
1472 }
1473 
pa_stream_write_ext_free(pa_stream * s,const void * data,size_t length,pa_free_cb_t free_cb,void * free_cb_data,int64_t offset,pa_seek_mode_t seek)1474 int pa_stream_write_ext_free(
1475         pa_stream *s,
1476         const void *data,
1477         size_t length,
1478         pa_free_cb_t free_cb,
1479         void *free_cb_data,
1480         int64_t offset,
1481         pa_seek_mode_t seek) {
1482 
1483     pa_assert(s);
1484     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1485     pa_assert(data);
1486 
1487     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1488     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1489     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1490     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1491     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1492     PA_CHECK_VALIDITY(s->context,
1493                       !s->write_memblock ||
1494                       ((data >= s->write_data) &&
1495                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1496                       PA_ERR_INVALID);
1497     PA_CHECK_VALIDITY(s->context, offset % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
1498     PA_CHECK_VALIDITY(s->context, length % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
1499     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1500 
1501     if (s->write_memblock) {
1502         pa_memchunk chunk;
1503 
1504         /* pa_stream_write_begin() was called before */
1505 
1506         pa_memblock_release(s->write_memblock);
1507 
1508         chunk.memblock = s->write_memblock;
1509         chunk.index = (const char *) data - (const char *) s->write_data;
1510         chunk.length = length;
1511 
1512         s->write_memblock = NULL;
1513         s->write_data = NULL;
1514 
1515         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1516         pa_memblock_unref(chunk.memblock);
1517 
1518     } else {
1519         pa_seek_mode_t t_seek = seek;
1520         int64_t t_offset = offset;
1521         size_t t_length = length;
1522         const void *t_data = data;
1523 
1524         /* pa_stream_write_begin() was not called before */
1525 
1526         while (t_length > 0) {
1527             pa_memchunk chunk;
1528 
1529             chunk.index = 0;
1530 
1531             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1532                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, free_cb_data, 1);
1533                 chunk.length = t_length;
1534             } else {
1535                 void *d;
1536                 size_t blk_size_max;
1537 
1538                 /* Break large audio streams into _aligned_ blocks or the
1539                  * other endpoint will happily discard them upon arrival. */
1540                 blk_size_max = pa_frame_align(pa_mempool_block_size_max(s->context->mempool), &s->sample_spec);
1541                 chunk.length = PA_MIN(t_length, blk_size_max);
1542                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1543 
1544                 d = pa_memblock_acquire(chunk.memblock);
1545                 memcpy(d, t_data, chunk.length);
1546                 pa_memblock_release(chunk.memblock);
1547             }
1548 
1549             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1550 
1551             t_offset = 0;
1552             t_seek = PA_SEEK_RELATIVE;
1553 
1554             t_data = (const uint8_t*) t_data + chunk.length;
1555             t_length -= chunk.length;
1556 
1557             pa_memblock_unref(chunk.memblock);
1558         }
1559 
1560         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1561             free_cb(free_cb_data);
1562     }
1563 
1564     /* This is obviously wrong since we ignore the seeking index . But
1565      * that's OK, the server side applies the same error */
1566     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1567 
1568 #ifdef STREAM_DEBUG
1569     pa_log_debug("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes);
1570 #endif
1571 
1572     if (s->direction == PA_STREAM_PLAYBACK) {
1573 
1574         /* Update latency request correction */
1575         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1576 
1577             if (seek == PA_SEEK_ABSOLUTE) {
1578                 s->write_index_corrections[s->current_write_index_correction].corrupt = false;
1579                 s->write_index_corrections[s->current_write_index_correction].absolute = true;
1580                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1581             } else if (seek == PA_SEEK_RELATIVE) {
1582                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1583                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1584             } else
1585                 s->write_index_corrections[s->current_write_index_correction].corrupt = true;
1586         }
1587 
1588         /* Update the write index in the already available latency data */
1589         if (s->timing_info_valid) {
1590 
1591             if (seek == PA_SEEK_ABSOLUTE) {
1592                 s->timing_info.write_index_corrupt = false;
1593                 s->timing_info.write_index = offset + (int64_t) length;
1594             } else if (seek == PA_SEEK_RELATIVE) {
1595                 if (!s->timing_info.write_index_corrupt)
1596                     s->timing_info.write_index += offset + (int64_t) length;
1597             } else
1598                 s->timing_info.write_index_corrupt = true;
1599         }
1600 
1601         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1602             request_auto_timing_update(s, true);
1603     }
1604 
1605     return 0;
1606 }
1607 
pa_stream_write(pa_stream * s,const void * data,size_t length,pa_free_cb_t free_cb,int64_t offset,pa_seek_mode_t seek)1608 int pa_stream_write(
1609         pa_stream *s,
1610         const void *data,
1611         size_t length,
1612         pa_free_cb_t free_cb,
1613         int64_t offset,
1614         pa_seek_mode_t seek) {
1615 
1616     return pa_stream_write_ext_free(s, data, length, free_cb, (void*) data, offset, seek);
1617 }
1618 
pa_stream_peek(pa_stream * s,const void ** data,size_t * length)1619 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1620     pa_assert(s);
1621     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1622     pa_assert(data);
1623     pa_assert(length);
1624 
1625     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1626     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1627     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1628 
1629     if (!s->peek_memchunk.memblock) {
1630 
1631         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1632             /* record_memblockq is empty. */
1633             *data = NULL;
1634             *length = 0;
1635             return 0;
1636 
1637         } else if (!s->peek_memchunk.memblock) {
1638             /* record_memblockq isn't empty, but it doesn't have any data at
1639              * the current read index. */
1640             *data = NULL;
1641             *length = s->peek_memchunk.length;
1642             return 0;
1643         }
1644 
1645         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1646     }
1647 
1648     pa_assert(s->peek_data);
1649     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1650     *length = s->peek_memchunk.length;
1651     return 0;
1652 }
1653 
pa_stream_drop(pa_stream * s)1654 int pa_stream_drop(pa_stream *s) {
1655     pa_assert(s);
1656     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1657 
1658     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1659     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1660     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1661     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE);
1662 
1663     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1664 
1665     /* Fix the simulated local read index */
1666     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1667         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1668 
1669     if (s->peek_memchunk.memblock) {
1670         pa_assert(s->peek_data);
1671         s->peek_data = NULL;
1672         pa_memblock_release(s->peek_memchunk.memblock);
1673         pa_memblock_unref(s->peek_memchunk.memblock);
1674     }
1675 
1676     pa_memchunk_reset(&s->peek_memchunk);
1677 
1678     return 0;
1679 }
1680 
pa_stream_writable_size(const pa_stream * s)1681 size_t pa_stream_writable_size(const pa_stream *s) {
1682     pa_assert(s);
1683     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1684 
1685     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1686     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1687     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1688 
1689     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1690 }
1691 
pa_stream_readable_size(const pa_stream * s)1692 size_t pa_stream_readable_size(const pa_stream *s) {
1693     pa_assert(s);
1694     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1695 
1696     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1697     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1698     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1699 
1700     return pa_memblockq_get_length(s->record_memblockq);
1701 }
1702 
pa_stream_drain(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)1703 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1704     pa_operation *o;
1705     pa_tagstruct *t;
1706     uint32_t tag;
1707 
1708     pa_assert(s);
1709     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1710 
1711     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1712     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1713     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1714 
1715     /* Ask for a timing update before we cork/uncork to get the best
1716      * accuracy for the transport latency suitable for the
1717      * check_smoother_status() call in the started callback */
1718     request_auto_timing_update(s, true);
1719 
1720     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1721 
1722     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1723     pa_tagstruct_putu32(t, s->channel);
1724     pa_pstream_send_tagstruct(s->context->pstream, t);
1725     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1726 
1727     /* This might cause the read index to continue again, hence
1728      * let's request a timing update */
1729     request_auto_timing_update(s, true);
1730 
1731     return o;
1732 }
1733 
calc_time(const pa_stream * s,bool ignore_transport)1734 static pa_usec_t calc_time(const pa_stream *s, bool ignore_transport) {
1735     pa_usec_t usec;
1736 
1737     pa_assert(s);
1738     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1739     pa_assert(s->state == PA_STREAM_READY);
1740     pa_assert(s->direction != PA_STREAM_UPLOAD);
1741     pa_assert(s->timing_info_valid);
1742     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1743     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1744 
1745     if (s->direction == PA_STREAM_PLAYBACK) {
1746         /* The last byte that was written into the output device
1747          * had this time value associated */
1748         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1749 
1750         if (!s->corked && !s->suspended) {
1751 
1752             if (!ignore_transport)
1753                 /* Because the latency info took a little time to come
1754                  * to us, we assume that the real output time is actually
1755                  * a little ahead */
1756                 usec += s->timing_info.transport_usec;
1757 
1758             /* However, the output device usually maintains a buffer
1759                too, hence the real sample currently played is a little
1760                back  */
1761             if (s->timing_info.sink_usec >= usec)
1762                 usec = 0;
1763             else
1764                 usec -= s->timing_info.sink_usec;
1765         }
1766 
1767     } else {
1768         pa_assert(s->direction == PA_STREAM_RECORD);
1769 
1770         /* The last byte written into the server side queue had
1771          * this time value associated */
1772         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1773 
1774         if (!s->corked && !s->suspended) {
1775 
1776             if (!ignore_transport)
1777                 /* Add transport latency */
1778                 usec += s->timing_info.transport_usec;
1779 
1780             /* Add latency of data in device buffer */
1781             usec += s->timing_info.source_usec;
1782 
1783             /* If this is a monitor source, we need to correct the
1784              * time by the playback device buffer */
1785             if (s->timing_info.sink_usec >= usec)
1786                 usec = 0;
1787             else
1788                 usec -= s->timing_info.sink_usec;
1789         }
1790     }
1791 
1792     return usec;
1793 }
1794 
stream_get_timing_info_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1795 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1796     pa_operation *o = userdata;
1797     struct timeval local, remote, now;
1798     pa_timing_info *i;
1799     bool playing = false;
1800     uint64_t underrun_for = 0, playing_for = 0;
1801 
1802     pa_assert(pd);
1803     pa_assert(o);
1804     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1805 
1806     if (!o->context || !o->stream)
1807         goto finish;
1808 
1809     i = &o->stream->timing_info;
1810 
1811     o->stream->timing_info_valid = false;
1812     i->write_index_corrupt = true;
1813     i->read_index_corrupt = true;
1814 
1815     if (command != PA_COMMAND_REPLY) {
1816         if (pa_context_handle_error(o->context, command, t, false) < 0)
1817             goto finish;
1818 
1819     } else {
1820 
1821         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1822             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1823             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1824             pa_tagstruct_get_timeval(t, &local) < 0 ||
1825             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1826             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1827             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1828 
1829             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1830             goto finish;
1831         }
1832 
1833         if (o->context->version >= 13 &&
1834             o->stream->direction == PA_STREAM_PLAYBACK)
1835             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1836                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1837 
1838                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1839                 goto finish;
1840             }
1841 
1842         if (!pa_tagstruct_eof(t)) {
1843             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1844             goto finish;
1845         }
1846         o->stream->timing_info_valid = true;
1847         i->write_index_corrupt = false;
1848         i->read_index_corrupt = false;
1849 
1850         i->playing = (int) playing;
1851         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1852 
1853         pa_gettimeofday(&now);
1854 
1855         /* Calculate timestamps */
1856         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1857             /* local and remote seem to have synchronized clocks */
1858 
1859             if (o->stream->direction == PA_STREAM_PLAYBACK)
1860                 i->transport_usec = pa_timeval_diff(&remote, &local);
1861             else
1862                 i->transport_usec = pa_timeval_diff(&now, &remote);
1863 
1864             i->synchronized_clocks = true;
1865             i->timestamp = remote;
1866         } else {
1867             /* clocks are not synchronized, let's estimate latency then */
1868             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1869             i->synchronized_clocks = false;
1870             i->timestamp = local;
1871             pa_timeval_add(&i->timestamp, i->transport_usec);
1872         }
1873 
1874         /* Invalidate read and write indexes if necessary */
1875         if (tag < o->stream->read_index_not_before)
1876             i->read_index_corrupt = true;
1877 
1878         if (tag < o->stream->write_index_not_before)
1879             i->write_index_corrupt = true;
1880 
1881         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1882             /* Write index correction */
1883 
1884             int n, j;
1885             uint32_t ctag = tag;
1886 
1887             /* Go through the saved correction values and add up the
1888              * total correction.*/
1889             for (n = 0, j = o->stream->current_write_index_correction+1;
1890                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1891                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1892 
1893                 /* Step over invalid data or out-of-date data */
1894                 if (!o->stream->write_index_corrections[j].valid ||
1895                     o->stream->write_index_corrections[j].tag < ctag)
1896                     continue;
1897 
1898                 /* Make sure that everything is in order */
1899                 ctag = o->stream->write_index_corrections[j].tag+1;
1900 
1901                 /* Now fix the write index */
1902                 if (o->stream->write_index_corrections[j].corrupt) {
1903                     /* A corrupting seek was made */
1904                     i->write_index_corrupt = true;
1905                 } else if (o->stream->write_index_corrections[j].absolute) {
1906                     /* An absolute seek was made */
1907                     i->write_index = o->stream->write_index_corrections[j].value;
1908                     i->write_index_corrupt = false;
1909                 } else if (!i->write_index_corrupt) {
1910                     /* A relative seek was made */
1911                     i->write_index += o->stream->write_index_corrections[j].value;
1912                 }
1913             }
1914 
1915             /* Clear old correction entries */
1916             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1917                 if (!o->stream->write_index_corrections[n].valid)
1918                     continue;
1919 
1920                 if (o->stream->write_index_corrections[n].tag <= tag)
1921                     o->stream->write_index_corrections[n].valid = false;
1922             }
1923         }
1924 
1925         if (o->stream->direction == PA_STREAM_RECORD) {
1926             /* Read index correction */
1927 
1928             if (!i->read_index_corrupt)
1929                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1930         }
1931 
1932         /* Update smoother if we're not corked */
1933         if (o->stream->smoother && !o->stream->corked) {
1934             pa_usec_t u, x;
1935 
1936             u = x = pa_rtclock_now() - i->transport_usec;
1937 
1938             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1939                 pa_usec_t su;
1940 
1941                 /* If we weren't playing then it will take some time
1942                  * until the audio will actually come out through the
1943                  * speakers. Since we follow that timing here, we need
1944                  * to try to fix this up */
1945 
1946                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1947 
1948                 if (su < i->sink_usec)
1949                     x += i->sink_usec - su;
1950             }
1951 
1952             if (!i->playing)
1953                 pa_smoother_pause(o->stream->smoother, x);
1954 
1955             /* Update the smoother */
1956             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1957                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1958                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, true));
1959 
1960             if (i->playing)
1961                 pa_smoother_resume(o->stream->smoother, x, true);
1962         }
1963     }
1964 
1965     o->stream->auto_timing_update_requested = false;
1966 
1967     if (o->stream->latency_update_callback)
1968         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1969 
1970     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1971         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1972         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1973     }
1974 
1975 finish:
1976 
1977     pa_operation_done(o);
1978     pa_operation_unref(o);
1979 }
1980 
pa_stream_update_timing_info(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)1981 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1982     uint32_t tag;
1983     pa_operation *o;
1984     pa_tagstruct *t;
1985     struct timeval now;
1986     int cidx = 0;
1987 
1988     pa_assert(s);
1989     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1990 
1991     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1992     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1993     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1994 
1995     if (s->direction == PA_STREAM_PLAYBACK) {
1996         /* Find a place to store the write_index correction data for this entry */
1997         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1998 
1999         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
2000         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
2001     }
2002     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2003 
2004     t = pa_tagstruct_command(
2005             s->context,
2006             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
2007             &tag);
2008     pa_tagstruct_putu32(t, s->channel);
2009     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
2010 
2011     pa_pstream_send_tagstruct(s->context->pstream, t);
2012     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2013 
2014     if (s->direction == PA_STREAM_PLAYBACK) {
2015         /* Fill in initial correction data */
2016 
2017         s->current_write_index_correction = cidx;
2018 
2019         s->write_index_corrections[cidx].valid = true;
2020         s->write_index_corrections[cidx].absolute = false;
2021         s->write_index_corrections[cidx].corrupt = false;
2022         s->write_index_corrections[cidx].tag = tag;
2023         s->write_index_corrections[cidx].value = 0;
2024     }
2025 
2026     return o;
2027 }
2028 
pa_stream_disconnect_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)2029 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2030     pa_stream *s = userdata;
2031 
2032     pa_assert(pd);
2033     pa_assert(s);
2034     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2035 
2036     pa_stream_ref(s);
2037 
2038     if (command != PA_COMMAND_REPLY) {
2039         if (pa_context_handle_error(s->context, command, t, false) < 0)
2040             goto finish;
2041 
2042         pa_stream_set_state(s, PA_STREAM_FAILED);
2043         goto finish;
2044     } else if (!pa_tagstruct_eof(t)) {
2045         pa_context_fail(s->context, PA_ERR_PROTOCOL);
2046         goto finish;
2047     }
2048 
2049     pa_stream_set_state(s, PA_STREAM_TERMINATED);
2050 
2051 finish:
2052     pa_stream_unref(s);
2053 }
2054 
pa_stream_disconnect(pa_stream * s)2055 int pa_stream_disconnect(pa_stream *s) {
2056     pa_tagstruct *t;
2057     uint32_t tag;
2058 
2059     pa_assert(s);
2060     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2061 
2062     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2063     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
2064     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
2065 
2066     pa_stream_ref(s);
2067 
2068     t = pa_tagstruct_command(
2069             s->context,
2070             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2071                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2072             &tag);
2073     pa_tagstruct_putu32(t, s->channel);
2074     pa_pstream_send_tagstruct(s->context->pstream, t);
2075     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2076 
2077     pa_stream_unref(s);
2078     return 0;
2079 }
2080 
pa_stream_set_read_callback(pa_stream * s,pa_stream_request_cb_t cb,void * userdata)2081 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2082     pa_assert(s);
2083     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2084 
2085     if (pa_detect_fork())
2086         return;
2087 
2088     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2089         return;
2090 
2091     s->read_callback = cb;
2092     s->read_userdata = userdata;
2093 }
2094 
pa_stream_set_write_callback(pa_stream * s,pa_stream_request_cb_t cb,void * userdata)2095 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2096     pa_assert(s);
2097     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2098 
2099     if (pa_detect_fork())
2100         return;
2101 
2102     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2103         return;
2104 
2105     s->write_callback = cb;
2106     s->write_userdata = userdata;
2107 }
2108 
pa_stream_set_state_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2109 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2110     pa_assert(s);
2111     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2112 
2113     if (pa_detect_fork())
2114         return;
2115 
2116     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2117         return;
2118 
2119     s->state_callback = cb;
2120     s->state_userdata = userdata;
2121 }
2122 
pa_stream_set_overflow_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2123 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2124     pa_assert(s);
2125     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2126 
2127     if (pa_detect_fork())
2128         return;
2129 
2130     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2131         return;
2132 
2133     s->overflow_callback = cb;
2134     s->overflow_userdata = userdata;
2135 }
2136 
pa_stream_set_underflow_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2137 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2138     pa_assert(s);
2139     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2140 
2141     if (pa_detect_fork())
2142         return;
2143 
2144     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2145         return;
2146 
2147     s->underflow_callback = cb;
2148     s->underflow_userdata = userdata;
2149 }
2150 
pa_stream_set_latency_update_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2151 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2152     pa_assert(s);
2153     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2154 
2155     if (pa_detect_fork())
2156         return;
2157 
2158     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2159         return;
2160 
2161     s->latency_update_callback = cb;
2162     s->latency_update_userdata = userdata;
2163 }
2164 
pa_stream_set_moved_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2165 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2166     pa_assert(s);
2167     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2168 
2169     if (pa_detect_fork())
2170         return;
2171 
2172     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2173         return;
2174 
2175     s->moved_callback = cb;
2176     s->moved_userdata = userdata;
2177 }
2178 
pa_stream_set_suspended_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2179 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2180     pa_assert(s);
2181     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2182 
2183     if (pa_detect_fork())
2184         return;
2185 
2186     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2187         return;
2188 
2189     s->suspended_callback = cb;
2190     s->suspended_userdata = userdata;
2191 }
2192 
pa_stream_set_started_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2193 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2194     pa_assert(s);
2195     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2196 
2197     if (pa_detect_fork())
2198         return;
2199 
2200     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2201         return;
2202 
2203     s->started_callback = cb;
2204     s->started_userdata = userdata;
2205 }
2206 
pa_stream_set_event_callback(pa_stream * s,pa_stream_event_cb_t cb,void * userdata)2207 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2208     pa_assert(s);
2209     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2210 
2211     if (pa_detect_fork())
2212         return;
2213 
2214     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2215         return;
2216 
2217     s->event_callback = cb;
2218     s->event_userdata = userdata;
2219 }
2220 
pa_stream_set_buffer_attr_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2221 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2222     pa_assert(s);
2223     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2224 
2225     if (pa_detect_fork())
2226         return;
2227 
2228     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2229         return;
2230 
2231     s->buffer_attr_callback = cb;
2232     s->buffer_attr_userdata = userdata;
2233 }
2234 
pa_stream_simple_ack_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)2235 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2236     pa_operation *o = userdata;
2237     int success = 1;
2238 
2239     pa_assert(pd);
2240     pa_assert(o);
2241     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2242 
2243     if (!o->context)
2244         goto finish;
2245 
2246     if (command != PA_COMMAND_REPLY) {
2247         if (pa_context_handle_error(o->context, command, t, false) < 0)
2248             goto finish;
2249 
2250         success = 0;
2251     } else if (!pa_tagstruct_eof(t)) {
2252         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2253         goto finish;
2254     }
2255 
2256     if (o->callback) {
2257         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2258         cb(o->stream, success, o->userdata);
2259     }
2260 
2261 finish:
2262     pa_operation_done(o);
2263     pa_operation_unref(o);
2264 }
2265 
pa_stream_cork(pa_stream * s,int b,pa_stream_success_cb_t cb,void * userdata)2266 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2267     pa_operation *o;
2268     pa_tagstruct *t;
2269     uint32_t tag;
2270 
2271     pa_assert(s);
2272     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2273 
2274     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2275     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2276     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2277 
2278     /* Ask for a timing update before we cork/uncork to get the best
2279      * accuracy for the transport latency suitable for the
2280      * check_smoother_status() call in the started callback */
2281     request_auto_timing_update(s, true);
2282 
2283     s->corked = b;
2284 
2285     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2286 
2287     t = pa_tagstruct_command(
2288             s->context,
2289             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2290             &tag);
2291     pa_tagstruct_putu32(t, s->channel);
2292     pa_tagstruct_put_boolean(t, !!b);
2293     pa_pstream_send_tagstruct(s->context->pstream, t);
2294     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2295 
2296     check_smoother_status(s, false, false, false);
2297 
2298     /* This might cause the indexes to hang/start again, hence let's
2299      * request a timing update, after the cork/uncork, too */
2300     request_auto_timing_update(s, true);
2301 
2302     return o;
2303 }
2304 
stream_send_simple_command(pa_stream * s,uint32_t command,pa_stream_success_cb_t cb,void * userdata)2305 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2306     pa_tagstruct *t;
2307     pa_operation *o;
2308     uint32_t tag;
2309 
2310     pa_assert(s);
2311     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2312 
2313     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2314     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2315 
2316     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2317 
2318     t = pa_tagstruct_command(s->context, command, &tag);
2319     pa_tagstruct_putu32(t, s->channel);
2320     pa_pstream_send_tagstruct(s->context->pstream, t);
2321     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2322 
2323     return o;
2324 }
2325 
pa_stream_flush(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2326 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2327     pa_operation *o;
2328 
2329     pa_assert(s);
2330     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2331 
2332     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2333     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2334     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2335 
2336     /* Ask for a timing update *before* the flush, so that the
2337      * transport usec is as up to date as possible when we get the
2338      * underflow message and update the smoother status*/
2339     request_auto_timing_update(s, true);
2340 
2341     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2342         return NULL;
2343 
2344     if (s->direction == PA_STREAM_PLAYBACK) {
2345 
2346         if (s->write_index_corrections[s->current_write_index_correction].valid)
2347             s->write_index_corrections[s->current_write_index_correction].corrupt = true;
2348 
2349         if (s->buffer_attr.prebuf > 0)
2350             check_smoother_status(s, false, false, true);
2351 
2352         /* This will change the write index, but leave the
2353          * read index untouched. */
2354         invalidate_indexes(s, false, true);
2355 
2356     } else
2357         /* For record streams this has no influence on the write
2358          * index, but the read index might jump. */
2359         invalidate_indexes(s, true, false);
2360 
2361     /* Note that we do not update requested_bytes here. This is
2362      * because we cannot really know how data actually was dropped
2363      * from the write index due to this. This 'error' will be applied
2364      * by both client and server and hence we should be fine. */
2365 
2366     return o;
2367 }
2368 
pa_stream_prebuf(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2369 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2370     pa_operation *o;
2371 
2372     pa_assert(s);
2373     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2374 
2375     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2376     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2377     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2378     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2379 
2380     /* Ask for a timing update before we cork/uncork to get the best
2381      * accuracy for the transport latency suitable for the
2382      * check_smoother_status() call in the started callback */
2383     request_auto_timing_update(s, true);
2384 
2385     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2386         return NULL;
2387 
2388     /* This might cause the read index to hang again, hence
2389      * let's request a timing update */
2390     request_auto_timing_update(s, true);
2391 
2392     return o;
2393 }
2394 
pa_stream_trigger(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2395 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2396     pa_operation *o;
2397 
2398     pa_assert(s);
2399     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2400 
2401     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2402     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2403     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2404     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2405 
2406     /* Ask for a timing update before we cork/uncork to get the best
2407      * accuracy for the transport latency suitable for the
2408      * check_smoother_status() call in the started callback */
2409     request_auto_timing_update(s, true);
2410 
2411     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2412         return NULL;
2413 
2414     /* This might cause the read index to start moving again, hence
2415      * let's request a timing update */
2416     request_auto_timing_update(s, true);
2417 
2418     return o;
2419 }
2420 
pa_stream_set_name(pa_stream * s,const char * name,pa_stream_success_cb_t cb,void * userdata)2421 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2422     pa_operation *o;
2423 
2424     pa_assert(s);
2425     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2426     pa_assert(name);
2427 
2428     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2429     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2430     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2431 
2432     if (s->context->version >= 13) {
2433         pa_proplist *p = pa_proplist_new();
2434 
2435         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2436         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2437         pa_proplist_free(p);
2438     } else {
2439         pa_tagstruct *t;
2440         uint32_t tag;
2441 
2442         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2443         t = pa_tagstruct_command(
2444                 s->context,
2445                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2446                 &tag);
2447         pa_tagstruct_putu32(t, s->channel);
2448         pa_tagstruct_puts(t, name);
2449         pa_pstream_send_tagstruct(s->context->pstream, t);
2450         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2451     }
2452 
2453     return o;
2454 }
2455 
pa_stream_get_time(pa_stream * s,pa_usec_t * r_usec)2456 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2457     pa_usec_t usec;
2458 
2459     pa_assert(s);
2460     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2461 
2462     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2463     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2464     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2465     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2466     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2467     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2468 
2469     if (s->smoother)
2470         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2471     else
2472         usec = calc_time(s, false);
2473 
2474     /* Make sure the time runs monotonically */
2475     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2476         if (usec < s->previous_time)
2477             usec = s->previous_time;
2478         else
2479             s->previous_time = usec;
2480     }
2481 
2482     if (r_usec)
2483         *r_usec = usec;
2484 
2485     return 0;
2486 }
2487 
time_counter_diff(const pa_stream * s,pa_usec_t a,pa_usec_t b,int * negative)2488 static pa_usec_t time_counter_diff(const pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2489     pa_assert(s);
2490     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2491 
2492     if (negative)
2493         *negative = 0;
2494 
2495     if (a >= b)
2496         return a-b;
2497     else {
2498         if (negative && s->direction == PA_STREAM_RECORD) {
2499             *negative = 1;
2500             return b-a;
2501         } else
2502             return 0;
2503     }
2504 }
2505 
pa_stream_get_latency(pa_stream * s,pa_usec_t * r_usec,int * negative)2506 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2507     pa_usec_t t, c;
2508     int r;
2509     int64_t cindex;
2510 
2511     pa_assert(s);
2512     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2513     pa_assert(r_usec);
2514 
2515     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2516     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2517     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2518     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2519     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2520     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2521 
2522     if ((r = pa_stream_get_time(s, &t)) < 0)
2523         return r;
2524 
2525     if (s->direction == PA_STREAM_PLAYBACK)
2526         cindex = s->timing_info.write_index;
2527     else
2528         cindex = s->timing_info.read_index;
2529 
2530     if (cindex < 0)
2531         cindex = 0;
2532 
2533     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2534 
2535     if (s->direction == PA_STREAM_PLAYBACK)
2536         *r_usec = time_counter_diff(s, c, t, negative);
2537     else
2538         *r_usec = time_counter_diff(s, t, c, negative);
2539 
2540     return 0;
2541 }
2542 
pa_stream_get_timing_info(pa_stream * s)2543 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2544     pa_assert(s);
2545     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2546 
2547     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2548     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2549     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2550     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2551 
2552     return &s->timing_info;
2553 }
2554 
pa_stream_get_sample_spec(pa_stream * s)2555 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2556     pa_assert(s);
2557     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2558 
2559     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2560 
2561     return &s->sample_spec;
2562 }
2563 
pa_stream_get_channel_map(pa_stream * s)2564 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2565     pa_assert(s);
2566     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2567 
2568     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2569 
2570     return &s->channel_map;
2571 }
2572 
pa_stream_get_format_info(const pa_stream * s)2573 const pa_format_info* pa_stream_get_format_info(const pa_stream *s) {
2574     pa_assert(s);
2575     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2576 
2577     /* We don't have the format till routing is done */
2578     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2579     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2580 
2581     return s->format;
2582 }
pa_stream_get_buffer_attr(pa_stream * s)2583 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2584     pa_assert(s);
2585     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2586 
2587     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2588     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2589     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2590 
2591     return &s->buffer_attr;
2592 }
2593 
stream_set_buffer_attr_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)2594 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2595     pa_operation *o = userdata;
2596     int success = 1;
2597 
2598     pa_assert(pd);
2599     pa_assert(o);
2600     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2601 
2602     if (!o->context)
2603         goto finish;
2604 
2605     if (command != PA_COMMAND_REPLY) {
2606         if (pa_context_handle_error(o->context, command, t, false) < 0)
2607             goto finish;
2608 
2609         success = 0;
2610     } else {
2611         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2612             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2613                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2614                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2615                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2616                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2617                 goto finish;
2618             }
2619         } else if (o->stream->direction == PA_STREAM_RECORD) {
2620             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2621                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2622                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2623                 goto finish;
2624             }
2625         }
2626 
2627         if (o->stream->context->version >= 13) {
2628             pa_usec_t usec;
2629 
2630             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2631                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2632                 goto finish;
2633             }
2634 
2635             if (o->stream->direction == PA_STREAM_RECORD)
2636                 o->stream->timing_info.configured_source_usec = usec;
2637             else
2638                 o->stream->timing_info.configured_sink_usec = usec;
2639         }
2640 
2641         if (!pa_tagstruct_eof(t)) {
2642             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2643             goto finish;
2644         }
2645     }
2646 
2647     if (o->callback) {
2648         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2649         cb(o->stream, success, o->userdata);
2650     }
2651 
2652 finish:
2653     pa_operation_done(o);
2654     pa_operation_unref(o);
2655 }
2656 
pa_stream_set_buffer_attr(pa_stream * s,const pa_buffer_attr * attr,pa_stream_success_cb_t cb,void * userdata)2657 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2658     pa_operation *o;
2659     pa_tagstruct *t;
2660     uint32_t tag;
2661     pa_buffer_attr copy;
2662 
2663     pa_assert(s);
2664     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2665     pa_assert(attr);
2666 
2667     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2668     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2669     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2670     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2671 
2672     /* Ask for a timing update before we cork/uncork to get the best
2673      * accuracy for the transport latency suitable for the
2674      * check_smoother_status() call in the started callback */
2675     request_auto_timing_update(s, true);
2676 
2677     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2678 
2679     t = pa_tagstruct_command(
2680             s->context,
2681             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2682             &tag);
2683     pa_tagstruct_putu32(t, s->channel);
2684 
2685     copy = *attr;
2686     patch_buffer_attr(s, &copy, NULL);
2687     attr = &copy;
2688 
2689     pa_tagstruct_putu32(t, attr->maxlength);
2690 
2691     if (s->direction == PA_STREAM_PLAYBACK)
2692         pa_tagstruct_put(
2693                 t,
2694                 PA_TAG_U32, attr->tlength,
2695                 PA_TAG_U32, attr->prebuf,
2696                 PA_TAG_U32, attr->minreq,
2697                 PA_TAG_INVALID);
2698     else
2699         pa_tagstruct_putu32(t, attr->fragsize);
2700 
2701     if (s->context->version >= 13)
2702         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2703 
2704     if (s->context->version >= 14)
2705         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2706 
2707     pa_pstream_send_tagstruct(s->context->pstream, t);
2708     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2709 
2710     /* This might cause changes in the read/write index, hence let's
2711      * request a timing update */
2712     request_auto_timing_update(s, true);
2713 
2714     return o;
2715 }
2716 
pa_stream_get_device_index(const pa_stream * s)2717 uint32_t pa_stream_get_device_index(const pa_stream *s) {
2718     pa_assert(s);
2719     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2720 
2721     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2722     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2723     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2724     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2725     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2726 
2727     return s->device_index;
2728 }
2729 
pa_stream_get_device_name(const pa_stream * s)2730 const char *pa_stream_get_device_name(const pa_stream *s) {
2731     pa_assert(s);
2732     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2733 
2734     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2735     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2736     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2737     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2738     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2739 
2740     return s->device_name;
2741 }
2742 
pa_stream_is_suspended(const pa_stream * s)2743 int pa_stream_is_suspended(const pa_stream *s) {
2744     pa_assert(s);
2745     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2746 
2747     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2748     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2749     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2750     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2751 
2752     return s->suspended;
2753 }
2754 
pa_stream_is_corked(const pa_stream * s)2755 int pa_stream_is_corked(const pa_stream *s) {
2756     pa_assert(s);
2757     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2758 
2759     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2760     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2761     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2762 
2763     return s->corked;
2764 }
2765 
stream_update_sample_rate_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)2766 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2767     pa_operation *o = userdata;
2768     int success = 1;
2769 
2770     pa_assert(pd);
2771     pa_assert(o);
2772     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2773 
2774     if (!o->context)
2775         goto finish;
2776 
2777     if (command != PA_COMMAND_REPLY) {
2778         if (pa_context_handle_error(o->context, command, t, false) < 0)
2779             goto finish;
2780 
2781         success = 0;
2782     } else {
2783 
2784         if (!pa_tagstruct_eof(t)) {
2785             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2786             goto finish;
2787         }
2788     }
2789 
2790     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2791     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2792 
2793     if (o->callback) {
2794         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2795         cb(o->stream, success, o->userdata);
2796     }
2797 
2798 finish:
2799     pa_operation_done(o);
2800     pa_operation_unref(o);
2801 }
2802 
pa_stream_update_sample_rate(pa_stream * s,uint32_t rate,pa_stream_success_cb_t cb,void * userdata)2803 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2804     pa_operation *o;
2805     pa_tagstruct *t;
2806     uint32_t tag;
2807 
2808     pa_assert(s);
2809     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2810 
2811     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2812     PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_sample_rate_valid(rate), PA_ERR_INVALID);
2813     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2814     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2815     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2816     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2817 
2818     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2819     o->private = PA_UINT_TO_PTR(rate);
2820 
2821     t = pa_tagstruct_command(
2822             s->context,
2823             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2824             &tag);
2825     pa_tagstruct_putu32(t, s->channel);
2826     pa_tagstruct_putu32(t, rate);
2827 
2828     pa_pstream_send_tagstruct(s->context->pstream, t);
2829     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2830 
2831     return o;
2832 }
2833 
pa_stream_proplist_update(pa_stream * s,pa_update_mode_t mode,pa_proplist * p,pa_stream_success_cb_t cb,void * userdata)2834 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2835     pa_operation *o;
2836     pa_tagstruct *t;
2837     uint32_t tag;
2838 
2839     pa_assert(s);
2840     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2841 
2842     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2843     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2844     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2845     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2846     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2847 
2848     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2849 
2850     t = pa_tagstruct_command(
2851             s->context,
2852             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2853             &tag);
2854     pa_tagstruct_putu32(t, s->channel);
2855     pa_tagstruct_putu32(t, (uint32_t) mode);
2856     pa_tagstruct_put_proplist(t, p);
2857 
2858     pa_pstream_send_tagstruct(s->context->pstream, t);
2859     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2860 
2861     /* Please note that we don't update s->proplist here, because we
2862      * don't export that field */
2863 
2864     return o;
2865 }
2866 
pa_stream_proplist_remove(pa_stream * s,const char * const keys[],pa_stream_success_cb_t cb,void * userdata)2867 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2868     pa_operation *o;
2869     pa_tagstruct *t;
2870     uint32_t tag;
2871     const char * const*k;
2872 
2873     pa_assert(s);
2874     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2875 
2876     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2877     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2878     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2879     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2880     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2881 
2882     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2883 
2884     t = pa_tagstruct_command(
2885             s->context,
2886             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2887             &tag);
2888     pa_tagstruct_putu32(t, s->channel);
2889 
2890     for (k = keys; *k; k++)
2891         pa_tagstruct_puts(t, *k);
2892 
2893     pa_tagstruct_puts(t, NULL);
2894 
2895     pa_pstream_send_tagstruct(s->context->pstream, t);
2896     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2897 
2898     /* Please note that we don't update s->proplist here, because we
2899      * don't export that field */
2900 
2901     return o;
2902 }
2903 
pa_stream_set_monitor_stream(pa_stream * s,uint32_t sink_input_idx)2904 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2905     pa_assert(s);
2906     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2907 
2908     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2909     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2910     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2911     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2912 
2913     s->direct_on_input = sink_input_idx;
2914 
2915     return 0;
2916 }
2917 
pa_stream_get_monitor_stream(const pa_stream * s)2918 uint32_t pa_stream_get_monitor_stream(const pa_stream *s) {
2919     pa_assert(s);
2920     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2921 
2922     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2923     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2924     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2925 
2926     return s->direct_on_input;
2927 }
2928