• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/***
  This file is part of PulseAudio.

  Copyright 2004-2006 Lennart Poettering
  Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as published
  by the Free Software Foundation; either version 2.1 of the License,
  or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  General Public License for more details.

  You should have received a copy of the GNU Lesser General Public License
  along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
***/
20 
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24 
25 #include <string.h>
26 #include <stdio.h>
27 #include <string.h>
28 
29 #include <pulse/def.h>
30 #include <pulse/timeval.h>
31 #include <pulse/rtclock.h>
32 #include <pulse/xmalloc.h>
33 #include <pulse/fork-detect.h>
34 
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/sample-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41 #include <pulsecore/core-util.h>
42 
43 #include "internal.h"
44 #include "stream.h"
45 
/* Uncomment to compile in the extra pa_log_debug() output guarded by
 * STREAM_DEBUG in this file. */
/* #define STREAM_DEBUG */

/* Bounds of the automatic timing update interval: the interval is reset to
 * the START value and doubled after every restart of the timer until it is
 * capped at the END value. */
#define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
#define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)

/* Time smoother tuning constants. The adjust time and minimal history are
 * only defined when the original (non USE_SMOOTHER_2) smoother is built. */
#define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
#ifndef USE_SMOOTHER_2
#define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
#define SMOOTHER_MIN_HISTORY (4)
#endif
56 
pa_stream_new(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map)57 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
58     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
59 }
60 
/* Forget every callback registered on the stream together with its
 * userdata pointer, so that none of them can be invoked afterwards. */
static void reset_callbacks(pa_stream *s) {
    /* Callback function pointers */
    s->read_callback = NULL;
    s->write_callback = NULL;
    s->state_callback = NULL;
    s->overflow_callback = NULL;
    s->underflow_callback = NULL;
    s->latency_update_callback = NULL;
    s->moved_callback = NULL;
    s->suspended_callback = NULL;
    s->started_callback = NULL;
    s->event_callback = NULL;
    s->buffer_attr_callback = NULL;

    /* Matching userdata pointers */
    s->read_userdata = NULL;
    s->write_userdata = NULL;
    s->state_userdata = NULL;
    s->overflow_userdata = NULL;
    s->underflow_userdata = NULL;
    s->latency_update_userdata = NULL;
    s->moved_userdata = NULL;
    s->suspended_userdata = NULL;
    s->started_userdata = NULL;
    s->event_userdata = NULL;
    s->buffer_attr_userdata = NULL;
}
85 
pa_stream_new_with_proplist_internal(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map,pa_format_info * const * formats,unsigned int n_formats,pa_proplist * p)86 static pa_stream *pa_stream_new_with_proplist_internal(
87         pa_context *c,
88         const char *name,
89         const pa_sample_spec *ss,
90         const pa_channel_map *map,
91         pa_format_info * const *formats,
92         unsigned int n_formats,
93         pa_proplist *p) {
94 
95     pa_stream *s;
96     unsigned int i;
97 
98     pa_assert(c);
99     pa_assert(PA_REFCNT_VALUE(c) >= 1);
100     pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
101     pa_assert(n_formats < PA_MAX_FORMATS);
102 
103     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
104     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
105 
106     s = pa_xnew(pa_stream, 1);
107     PA_REFCNT_INIT(s);
108     s->context = c;
109     s->mainloop = c->mainloop;
110 
111     s->direction = PA_STREAM_NODIRECTION;
112     s->state = PA_STREAM_UNCONNECTED;
113     s->flags = 0;
114 
115     if (ss)
116         s->sample_spec = *ss;
117     else
118         pa_sample_spec_init(&s->sample_spec);
119 
120     if (map)
121         s->channel_map = *map;
122     else
123         pa_channel_map_init(&s->channel_map);
124 
125     s->n_formats = 0;
126     if (formats) {
127         s->n_formats = n_formats;
128         for (i = 0; i < n_formats; i++)
129             s->req_formats[i] = pa_format_info_copy(formats[i]);
130     }
131 
132     /* We'll get the final negotiated format after connecting */
133     s->format = NULL;
134 
135     s->direct_on_input = PA_INVALID_INDEX;
136 
137     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
138     if (name)
139         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
140 
141     s->channel = 0;
142     s->channel_valid = false;
143     s->syncid = c->csyncid++;
144     s->stream_index = PA_INVALID_INDEX;
145 
146     s->requested_bytes = 0;
147     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
148 
149     /* We initialize the target length here, so that if the user
150      * passes no explicit buffering metrics the default is similar to
151      * what older PA versions provided. */
152 
153     s->buffer_attr.maxlength = (uint32_t) -1;
154     if (ss)
155         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
156     else {
157         /* FIXME: We assume a worst-case compressed format corresponding to
158          * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
159         pa_sample_spec tmp_ss = {
160             .format   = PA_SAMPLE_S16NE,
161             .rate     = 48000,
162             .channels = 2,
163         };
164         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
165     }
166     s->buffer_attr.minreq = (uint32_t) -1;
167     s->buffer_attr.prebuf = (uint32_t) -1;
168     s->buffer_attr.fragsize = (uint32_t) -1;
169 
170     s->device_index = PA_INVALID_INDEX;
171     s->device_name = NULL;
172     s->suspended = false;
173     s->corked = false;
174 
175     s->write_memblock = NULL;
176     s->write_data = NULL;
177 
178     pa_memchunk_reset(&s->peek_memchunk);
179     s->peek_data = NULL;
180     s->record_memblockq = NULL;
181 
182     memset(&s->timing_info, 0, sizeof(s->timing_info));
183     s->timing_info_valid = false;
184 
185     s->previous_time = 0;
186     s->latest_underrun_at_index = -1;
187 
188     s->read_index_not_before = 0;
189     s->write_index_not_before = 0;
190     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
191         s->write_index_corrections[i].valid = 0;
192     s->current_write_index_correction = 0;
193 
194     s->auto_timing_update_event = NULL;
195     s->auto_timing_update_requested = false;
196     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
197 
198     reset_callbacks(s);
199 
200     s->smoother = NULL;
201 
202     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
203     PA_LLIST_PREPEND(pa_stream, c->streams, s);
204     pa_stream_ref(s);
205 
206     return s;
207 }
208 
pa_stream_new_with_proplist(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map,pa_proplist * p)209 pa_stream *pa_stream_new_with_proplist(
210         pa_context *c,
211         const char *name,
212         const pa_sample_spec *ss,
213         const pa_channel_map *map,
214         pa_proplist *p) {
215 
216     pa_channel_map tmap;
217 
218     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
219     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
220     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
221     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
222     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
223 
224     if (!map)
225         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
226 
227     return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
228 }
229 
pa_stream_new_extended(pa_context * c,const char * name,pa_format_info * const * formats,unsigned int n_formats,pa_proplist * p)230 pa_stream *pa_stream_new_extended(
231         pa_context *c,
232         const char *name,
233         pa_format_info * const *formats,
234         unsigned int n_formats,
235         pa_proplist *p) {
236 
237     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
238 
239     return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
240 }
241 
/* Detach the stream from its context.
 *
 * Does nothing if the stream is already detached (s->context == NULL).
 * Otherwise it cancels every operation of the context that refers to this
 * stream, unregisters the stream's outstanding replies from the context's
 * pdispatch, removes the stream from the per-direction channel hashmap and
 * from the context's stream list (dropping the reference that was taken for
 * that list), frees the automatic timing update timer and finally clears
 * all callbacks.
 *
 * Because one reference is dropped in here, callers are expected to hold a
 * reference of their own while calling this function. */
static void stream_unlink(pa_stream *s) {
    pa_operation *o, *n;
    pa_assert(s);

    if (!s->context)
        return;

    /* Detach from context */

    /* Cancel all operation objects that point to us. The successor is
     * fetched before the current entry is cancelled. */
    for (o = s->context->operations; o; o = n) {
        n = o->next;

        if (o->stream == s)
            pa_operation_cancel(o);
    }

    /* Drop all outstanding replies for this stream */
    if (s->context->pdispatch)
        pa_pdispatch_unregister_reply(s->context->pdispatch, s);

    if (s->channel_valid) {
        pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
        s->channel = 0;
        s->channel_valid = false;
    }

    PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
    pa_stream_unref(s);

    s->context = NULL;

    if (s->auto_timing_update_event) {
        pa_assert(s->mainloop);
        s->mainloop->time_free(s->auto_timing_update_event);
        /* Do not keep a dangling pointer to the freed time event around;
         * the stream object may outlive this call. */
        s->auto_timing_update_event = NULL;
    }

    reset_callbacks(s);
}
281 
/* Destroy a stream object: unlink it from its context (a no-op if that
 * already happened) and release everything the stream still owns -- the
 * pending write and peek memblocks, the record queue, the proplist, the
 * smoother, the requested and negotiated format infos, the device name and
 * finally the structure itself. */
static void stream_free(pa_stream *s) {
    unsigned int idx;

    pa_assert(s);

    stream_unlink(s);

    /* A memblock that is still mapped for access has to be released
     * before the reference to it is dropped. */
    if (s->write_memblock) {
        if (s->write_data)
            pa_memblock_release(s->write_memblock);

        pa_memblock_unref(s->write_memblock);
    }

    if (s->peek_memchunk.memblock) {
        if (s->peek_data)
            pa_memblock_release(s->peek_memchunk.memblock);

        pa_memblock_unref(s->peek_memchunk.memblock);
    }

    if (s->record_memblockq)
        pa_memblockq_free(s->record_memblockq);

    if (s->proplist)
        pa_proplist_free(s->proplist);

    if (s->smoother) {
#ifdef USE_SMOOTHER_2
        pa_smoother_2_free(s->smoother);
#else
        pa_smoother_free(s->smoother);
#endif
    }

    idx = 0;
    while (idx < s->n_formats) {
        pa_format_info_free(s->req_formats[idx]);
        idx++;
    }

    if (s->format)
        pa_format_info_free(s->format);

    pa_xfree(s->device_name);
    pa_xfree(s);
}
323 
/* Drop one reference to the stream. The stream is destroyed once the
 * reference count has dropped to zero. */
void pa_stream_unref(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (PA_REFCNT_DEC(s) > 0)
        return; /* still referenced by somebody else */

    stream_free(s);
}
331 
/* Take an additional reference to the stream and hand the same pointer
 * back, so that the call can be used inside an expression. */
pa_stream* pa_stream_ref(pa_stream *s) {
    pa_stream *same = s;

    pa_assert(same);
    pa_assert(PA_REFCNT_VALUE(same) >= 1);

    PA_REFCNT_INC(same);

    return same;
}
339 
pa_stream_get_state(const pa_stream * s)340 pa_stream_state_t pa_stream_get_state(const pa_stream *s) {
341     pa_assert(s);
342     pa_assert(PA_REFCNT_VALUE(s) >= 1);
343 
344     return s->state;
345 }
346 
pa_stream_get_context(const pa_stream * s)347 pa_context* pa_stream_get_context(const pa_stream *s) {
348     pa_assert(s);
349     pa_assert(PA_REFCNT_VALUE(s) >= 1);
350 
351     return s->context;
352 }
353 
pa_stream_get_index(const pa_stream * s)354 uint32_t pa_stream_get_index(const pa_stream *s) {
355     pa_assert(s);
356     pa_assert(PA_REFCNT_VALUE(s) >= 1);
357 
358     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
359     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
360 
361     return s->stream_index;
362 }
363 
/* Move the stream to state 'st' and notify the state callback, if one is
 * set. Setting the state the stream is already in is a no-op. Entering
 * PA_STREAM_FAILED or PA_STREAM_TERMINATED additionally unlinks the stream
 * from its context, after the callback has run. */
void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
    bool is_final;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (st == s->state)
        return;

    is_final = (st == PA_STREAM_FAILED) || (st == PA_STREAM_TERMINATED);

    /* Hold a reference of our own while the callback and the unlinking
     * below run. */
    pa_stream_ref(s);

    s->state = st;

    if (s->state_callback)
        s->state_callback(s, s->state_userdata);

    if (is_final)
        stream_unlink(s);

    pa_stream_unref(s);
}
383 
/* Issue an automatic timing info request and re-arm the update timer.
 *
 * Only does something for streams that have PA_STREAM_AUTO_TIMING_UPDATE
 * set. A timing update is requested if the stream is READY and either
 * 'force' is set or no automatic request is outstanding. If a timer exists
 * it is freed when the stream is suspended (and 'force' is not set);
 * otherwise it is restarted with the current interval, which is reset to
 * its start value on 'force' and doubled afterwards up to
 * AUTO_TIMING_INTERVAL_END_USEC. */
static void request_auto_timing_update(pa_stream *s, bool force) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
        return;

    if (s->state == PA_STREAM_READY && (force || !s->auto_timing_update_requested)) {
        pa_operation *op;

#ifdef STREAM_DEBUG
        pa_log_debug("Automatically requesting new timing data");
#endif

        op = pa_stream_update_timing_info(s, NULL, NULL);

        if (op) {
            pa_operation_unref(op);
            s->auto_timing_update_requested = true;
        }
    }

    /* Nothing to re-arm without a timer */
    if (!s->auto_timing_update_event)
        return;

    if (s->suspended && !force) {
        pa_assert(s->mainloop);
        s->mainloop->time_free(s->auto_timing_update_event);
        s->auto_timing_update_event = NULL;
        return;
    }

    if (force)
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;

    pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);

    /* Back off: the next restart uses twice the interval, up to the cap */
    s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
}
420 
/* Handler for PA_COMMAND_PLAYBACK_STREAM_KILLED and
 * PA_COMMAND_RECORD_STREAM_KILLED.
 *
 * The message carries just the channel of the stream. A malformed message
 * fails the whole context with PA_ERR_PROTOCOL. Messages for unknown
 * channels or for streams that are not READY are ignored; otherwise the
 * context error is set to PA_ERR_KILLED and the stream is moved to
 * PA_STREAM_FAILED. */
void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    /* Keep the context referenced for the duration of the handler */
    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0 || !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (command == PA_COMMAND_PLAYBACK_STREAM_KILLED)
        s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));
    else
        s = pa_hashmap_get(c->record_streams, PA_UINT32_TO_PTR(channel));

    if (s && s->state == PA_STREAM_READY) {
        pa_context_set_error(c, PA_ERR_KILLED);
        pa_stream_set_state(s, PA_STREAM_FAILED);
    }

finish:
    pa_context_unref(c);
}
452 
/* Pause or resume the stream's time smoother according to the current
 * stream status.
 *
 * The reference time is "now", shifted by the transport latency when
 * timing info is available: backwards if 'aposteriori' is set, forwards
 * otherwise. The smoother is paused if the stream is suspended or corked
 * or if 'force_stop' is set. It is resumed if 'force_start' is set or if
 * prebuf is zero. 'force_start' and 'force_stop' must not both be set.
 * Does nothing if the stream has no smoother. */
static void check_smoother_status(pa_stream *s, bool aposteriori, bool force_start, bool force_stop) {
    pa_usec_t when;

    pa_assert(s);
    pa_assert(!force_start || !force_stop);

    if (!s->smoother)
        return;

    when = pa_rtclock_now();

    if (s->timing_info_valid) {
        if (aposteriori)
            when -= s->timing_info.transport_usec;
        else
            when += s->timing_info.transport_usec;
    }

    if (s->suspended || s->corked || force_stop) {
#ifdef USE_SMOOTHER_2
        pa_smoother_2_pause(s->smoother, when);
#else
        pa_smoother_pause(s->smoother, when);
#endif
        return;
    }

    /* Please note that we have no idea if playback actually started
     * if prebuf is non-zero! */
    if (!force_start && s->buffer_attr.prebuf != 0)
        return;

    /* force_stop is necessarily false at this point, see above */
    if (!s->timing_info_valid &&
        !aposteriori &&
        !force_start &&
        s->context->version >= 13) {

        /* If the server supports STARTED events we take them as
         * indications when audio really starts/stops playing, if
         * we don't have any timing info yet -- instead of trying
         * to be smart and guessing the server time. Otherwise the
         * unknown transport delay adds too much noise to our time
         * calculations. */

        return;
    }

#ifdef USE_SMOOTHER_2
    pa_smoother_2_resume(s->smoother, when);
#else
    pa_smoother_resume(s->smoother, when, true);
#endif
}
505 
506 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
507 
/* Handler for PA_COMMAND_PLAYBACK_STREAM_MOVED and
 * PA_COMMAND_RECORD_STREAM_MOVED.
 *
 * Message layout as parsed here: channel, device index, device name,
 * suspended flag; with protocol version >= 13 followed by the new buffer
 * metrics (record: maxlength, fragsize; playback: maxlength, tlength,
 * prebuf, minreq) and the configured device latency.
 *
 * A peer older than version 12, a malformed message, a missing device name
 * or an invalid device index fail the context with PA_ERR_PROTOCOL.
 * Messages for unknown channels or non-READY streams are ignored. Otherwise
 * the stream's device, suspend state and (version >= 13) buffer attributes
 * are updated, the auto timing timer is created if it is wanted and
 * missing, and the moved callback is invoked. */
void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    const char *dn;
    bool suspended;
    uint32_t di;
    pa_usec_t usec = 0;
    uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 12)
        goto protocol_error;

    /* Fixed part of the message */
    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_getu32(t, &di) < 0 ||
        pa_tagstruct_gets(t, &dn) < 0 ||
        pa_tagstruct_get_boolean(t, &suspended) < 0)
        goto protocol_error;

    /* Buffer metrics and latency, only sent by newer peers */
    if (c->version >= 13) {

        if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
            if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
                pa_tagstruct_getu32(t, &fragsize) < 0 ||
                pa_tagstruct_get_usec(t, &usec) < 0)
                goto protocol_error;
        } else {
            if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
                pa_tagstruct_getu32(t, &tlength) < 0 ||
                pa_tagstruct_getu32(t, &prebuf) < 0 ||
                pa_tagstruct_getu32(t, &minreq) < 0 ||
                pa_tagstruct_get_usec(t, &usec) < 0)
                goto protocol_error;
        }
    }

    if (!pa_tagstruct_eof(t))
        goto protocol_error;

    if (!dn || di == PA_INVALID_INDEX)
        goto protocol_error;

    if (command == PA_COMMAND_PLAYBACK_STREAM_MOVED)
        s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));
    else
        s = pa_hashmap_get(c->record_streams, PA_UINT32_TO_PTR(channel));

    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (c->version >= 13) {
        if (s->direction == PA_STREAM_RECORD)
            s->timing_info.configured_source_usec = usec;
        else
            s->timing_info.configured_sink_usec = usec;

        s->buffer_attr.maxlength = maxlength;
        s->buffer_attr.fragsize = fragsize;
        s->buffer_attr.tlength = tlength;
        s->buffer_attr.prebuf = prebuf;
        s->buffer_attr.minreq = minreq;
    }

    /* Remember the device the stream lives on now */
    pa_xfree(s->device_name);
    s->device_name = pa_xstrdup(dn);
    s->device_index = di;

    s->suspended = suspended;

    /* Create the auto timing timer if it is wanted, the stream is running
     * and no timer exists yet. */
    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
    request_auto_timing_update(s, true);

    if (s->moved_callback)
        s->moved_callback(s, s->moved_userdata);

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);
}
610 
/* Handler for PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED and
 * PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED.
 *
 * Message layout as parsed here: channel, followed by the new buffer
 * metrics (record: maxlength, fragsize; playback: maxlength, tlength,
 * prebuf, minreq) and the configured device latency.
 *
 * A peer older than protocol version 15 or a malformed message fail the
 * context with PA_ERR_PROTOCOL. Messages for unknown channels or non-READY
 * streams are ignored. Otherwise the stream's buffer attributes and
 * configured latency are updated, a timing update is requested and the
 * buffer_attr callback is invoked. */
void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    pa_usec_t usec = 0;
    uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 15) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (pa_tagstruct_getu32(t, &channel) < 0) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    /* The record variant has to be recognized by the BUFFER_ATTR_CHANGED
     * command. Comparing against PA_COMMAND_RECORD_STREAM_MOVED can never
     * match in this handler (see the assertion above), which would make
     * record messages be parsed with the playback layout. */
    if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
        if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
            pa_tagstruct_getu32(t, &fragsize) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(c, PA_ERR_PROTOCOL);
            goto finish;
        }
    } else {
        if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
            pa_tagstruct_getu32(t, &tlength) < 0 ||
            pa_tagstruct_getu32(t, &prebuf) < 0 ||
            pa_tagstruct_getu32(t, &minreq) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(c, PA_ERR_PROTOCOL);
            goto finish;
        }
    }

    if (!pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
        goto finish;

    if (s->state != PA_STREAM_READY)
        goto finish;

    if (s->direction == PA_STREAM_RECORD)
        s->timing_info.configured_source_usec = usec;
    else
        s->timing_info.configured_sink_usec = usec;

    /* Fields that are not part of the message for this direction keep
     * their zero initializers. */
    s->buffer_attr.maxlength = maxlength;
    s->buffer_attr.fragsize = fragsize;
    s->buffer_attr.tlength = tlength;
    s->buffer_attr.prebuf = prebuf;
    s->buffer_attr.minreq = minreq;

    request_auto_timing_update(s, true);

    if (s->buffer_attr_callback)
        s->buffer_attr_callback(s, s->buffer_attr_userdata);

finish:
    pa_context_unref(c);
}
684 
/* Handler for PA_COMMAND_PLAYBACK_STREAM_SUSPENDED and
 * PA_COMMAND_RECORD_STREAM_SUSPENDED.
 *
 * The message carries the channel and the new suspended flag. A peer older
 * than protocol version 12 or a malformed message fail the context with
 * PA_ERR_PROTOCOL. Messages for unknown channels or non-READY streams are
 * ignored. Otherwise the flag is stored, the auto timing timer is created
 * if it is wanted and missing, the smoother is updated and the suspended
 * callback is invoked. */
void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    bool suspended;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 12)
        goto protocol_error;

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_get_boolean(t, &suspended) < 0 ||
        !pa_tagstruct_eof(t))
        goto protocol_error;

    if (command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED)
        s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));
    else
        s = pa_hashmap_get(c->record_streams, PA_UINT32_TO_PTR(channel));

    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    s->suspended = suspended;

    /* Create the auto timing timer if it is wanted, the stream is running
     * and no timer exists yet. */
    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
    request_auto_timing_update(s, true);

    if (s->suspended_callback)
        s->suspended_callback(s, s->suspended_userdata);

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);
}
734 
/* Handler for PA_COMMAND_STARTED.
 *
 * The message carries just the channel of a playback stream. A peer older
 * than protocol version 13 or a malformed message fail the context with
 * PA_ERR_PROTOCOL. Messages for unknown channels or non-READY streams are
 * ignored. Otherwise the smoother is forced to start, a timing update is
 * requested and the started callback is invoked. */
void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_STARTED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 13)
        goto protocol_error;

    if (pa_tagstruct_getu32(t, &channel) < 0 || !pa_tagstruct_eof(t))
        goto protocol_error;

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));

    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    check_smoother_status(s, true, true, false);
    request_auto_timing_update(s, true);

    if (s->started_callback)
        s->started_callback(s, s->started_userdata);

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);
}
774 
/* Handler for PA_COMMAND_PLAYBACK_STREAM_EVENT and
 * PA_COMMAND_RECORD_STREAM_EVENT.
 *
 * The message carries the channel, the event name and a property list. A
 * peer older than protocol version 15, a malformed message or a missing
 * event name fail the context with PA_ERR_PROTOCOL. Messages for unknown
 * channels or non-READY streams are ignored. For the format-lost event the
 * current stream time is added to the property list as "stream-time" if it
 * can be determined. Finally the event callback is invoked. The property
 * list is owned by this function and freed before returning. */
void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    pa_proplist *pl = NULL;
    const char *event;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 15)
        goto protocol_error;

    pl = pa_proplist_new();

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_gets(t, &event) < 0 ||
        pa_tagstruct_get_proplist(t, pl) < 0 ||
        !pa_tagstruct_eof(t) ||
        !event)
        goto protocol_error;

    if (command == PA_COMMAND_PLAYBACK_STREAM_EVENT)
        s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));
    else
        s = pa_hashmap_get(c->record_streams, PA_UINT32_TO_PTR(channel));

    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
        /* Let client know what the running time was when the stream had to be killed  */
        pa_usec_t running_time;

        if (pa_stream_get_time(s, &running_time) == 0)
            pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) running_time);
    }

    if (s->event_callback)
        s->event_callback(s, event, pl, s->event_userdata);

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);

    if (pl)
        pa_proplist_free(pl);
}
827 
/* Handler for PA_COMMAND_REQUEST.
 *
 * The message carries the channel of a playback stream and a byte count. A
 * malformed message fails the context with PA_ERR_PROTOCOL. Messages for
 * unknown channels or non-READY streams are ignored. Otherwise the byte
 * count is added to the stream's requested_bytes and, if the total is
 * positive, the write callback is invoked with that total. */
void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    uint32_t bytes;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_REQUEST);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_getu32(t, &bytes) < 0 ||
        !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));

    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    s->requested_bytes += bytes;

#ifdef STREAM_DEBUG
    pa_log_debug("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes);
#endif

    /* Only bother the application if there is something to write */
    if (s->write_callback && s->requested_bytes > 0)
        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);

finish:
    pa_context_unref(c);
}
866 
pa_stream_get_underflow_index(const pa_stream * p)867 int64_t pa_stream_get_underflow_index(const pa_stream *p) {
868     pa_assert(p);
869     return p->latest_underrun_at_index;
870 }
871 
/* Handler for PA_COMMAND_OVERFLOW and PA_COMMAND_UNDERFLOW.
 *
 * The message carries the channel of a playback stream; for underflows
 * sent by peers with protocol version >= 23 it is followed by a signed 64
 * bit offset. A malformed message fails the context with PA_ERR_PROTOCOL.
 * Messages for unknown channels or non-READY streams are ignored.
 * Otherwise a received offset other than -1 is stored as the latest
 * underrun index, the smoother is forced to stop if prebuf is non-zero, a
 * timing update is requested and the overflow or underflow callback is
 * invoked. */
void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    int64_t offset = -1;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0)
        goto protocol_error;

    /* Newer peers tell us where the underflow happened */
    if (c->version >= 23 &&
        command == PA_COMMAND_UNDERFLOW &&
        pa_tagstruct_gets64(t, &offset) < 0)
        goto protocol_error;

    if (!pa_tagstruct_eof(t))
        goto protocol_error;

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));

    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (offset != -1)
        s->latest_underrun_at_index = offset;

    if (s->buffer_attr.prebuf > 0)
        check_smoother_status(s, true, false, true);

    request_auto_timing_update(s, true);

    switch (command) {
        case PA_COMMAND_OVERFLOW:
            if (s->overflow_callback)
                s->overflow_callback(s, s->overflow_userdata);
            break;

        case PA_COMMAND_UNDERFLOW:
            if (s->underflow_callback)
                s->underflow_callback(s, s->underflow_userdata);
            break;
    }

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);
}
928 
/* Mark the locally tracked read (r) and/or write (w) index of a ready
 * stream as no longer trustworthy.
 *
 * For each selected index the context's current ctag is stored in the
 * corresponding *_index_not_before field and, if timing info is
 * currently valid, the matching *_index_corrupt flag is set. A timing
 * update is requested afterwards. Does nothing unless the stream is in
 * PA_STREAM_READY state. */
static void invalidate_indexes(pa_stream *s, bool r, bool w) {
    bool have_timing_info;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

#ifdef STREAM_DEBUG
    pa_log_debug("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag);
#endif

    if (s->state != PA_STREAM_READY)
        return;

    have_timing_info = s->timing_info_valid;

    if (w) {
        s->write_index_not_before = s->context->ctag;

        if (have_timing_info)
            s->timing_info.write_index_corrupt = true;

#ifdef STREAM_DEBUG
        pa_log_debug("write_index invalidated");
#endif
    }

    if (r) {
        s->read_index_not_before = s->context->ctag;

        if (have_timing_info)
            s->timing_info.read_index_corrupt = true;

#ifdef STREAM_DEBUG
        pa_log_debug("read_index invalidated");
#endif
    }

    request_auto_timing_update(s, true);
}
964 
/* Time event callback for the automatic timing update timer. The
 * stream is passed as userdata; it is kept referenced while
 * request_auto_timing_update() runs with force == false. */
static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
    pa_stream *stream = userdata;

    pa_assert(stream);
    pa_assert(PA_REFCNT_VALUE(stream) >= 1);

    pa_stream_ref(stream);

    request_auto_timing_update(stream, false);

    pa_stream_unref(stream);
}
975 
/* Final step of stream creation: switch a stream from
 * PA_STREAM_CREATING to PA_STREAM_READY, hand any already requested
 * bytes to the write callback, start the automatic timing update timer
 * if PA_STREAM_AUTO_TIMING_UPDATE is set, and check the smoother
 * status. */
static void create_stream_complete(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(s->state == PA_STREAM_CREATING);

    pa_stream_set_state(s, PA_STREAM_READY);

    /* Tell the application right away how much it may write */
    if (s->write_callback && s->requested_bytes > 0)
        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);

    if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
        pa_usec_t first_update;

        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        pa_assert(!s->auto_timing_update_event);

        first_update = pa_rtclock_now() + s->auto_timing_interval_usec;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, first_update, &auto_timing_update_callback, s);

        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
}
996 
/* Adjust a buffer_attr structure before it is sent to the server.
 *
 * s:     the stream the attributes belong to
 * attr:  the attributes to patch in place (must not be NULL)
 * flags: optional stream flags; PA_STREAM_ADJUST_LATENCY is OR'ed in
 *        when $PULSE_LATENCY_MSEC is applied (may be NULL)
 *
 * Two independent adjustments are made:
 *
 * 1. If the environment variable PULSE_LATENCY_MSEC holds a positive
 *    integer and a valid sample spec is available, tlength and fragsize
 *    are set to the matching number of bytes and maxlength, minreq and
 *    prebuf are reset to (uint32_t) -1.
 *
 * 2. If the context version is below 13, every field still set to
 *    (uint32_t) -1 is replaced by a client side default. */
static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
    const char *e;

    pa_assert(s);
    pa_assert(attr);

    if ((e = getenv("PULSE_LATENCY_MSEC"))) {
        uint32_t ms;
        pa_sample_spec ss;

        pa_sample_spec_init(&ss);

        /* Prefer the stream's own sample spec; if that is not valid and
         * exactly one format was requested, derive a spec from that
         * format instead. */
        if (pa_sample_spec_valid(&s->sample_spec))
            ss = s->sample_spec;
        else if (s->n_formats == 1)
            pa_format_info_to_sample_spec(s->req_formats[0], &ss, NULL);

        if (pa_atou(e, &ms) < 0 || ms <= 0)
            pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
        else if (!pa_sample_spec_valid(&ss))
            /* Check the spec that is actually used for the conversion
             * below. Testing s->sample_spec here would throw away the
             * spec derived from the requested format above. */
            pa_log_debug("Ignoring $PULSE_LATENCY_MSEC: %s (invalid sample spec)", e);
        else {
            attr->maxlength = (uint32_t) -1;
            attr->tlength = (uint32_t) pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &ss);
            attr->minreq = (uint32_t) -1;
            attr->prebuf = (uint32_t) -1;
            attr->fragsize = attr->tlength;

            if (flags)
                *flags |= PA_STREAM_ADJUST_LATENCY;
        }
    }

    if (s->context->version >= 13)
        return;

    /* Version older than 0.9.10 didn't do server side buffer_attr
     * selection, hence we have to fake it on the client side. */

    /* We choose fairly conservative values here, to not confuse
     * old clients with extremely large playback buffers */

    if (attr->maxlength == (uint32_t) -1)
        attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */

    if (attr->tlength == (uint32_t) -1)
        attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */

    if (attr->minreq == (uint32_t) -1)
        attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */

    if (attr->prebuf == (uint32_t) -1)
        attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */

    if (attr->fragsize == (uint32_t) -1)
        attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
}
1054 
/* Reply handler registered by create_stream() for the stream creation
 * command; the stream is passed as userdata.
 *
 * On a non-reply command the error is passed to
 * pa_context_handle_error() and the stream is moved to
 * PA_STREAM_FAILED. Otherwise the reply is parsed field by field (which
 * fields are present depends on the context version and the stream
 * direction), the results are stored in the stream, the stream is
 * registered in the context's channel hashmap and
 * create_stream_complete() is called. Any parse or consistency error
 * fails the context with PA_ERR_PROTOCOL. */
void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_stream *s = userdata;
    uint32_t initial_request = 0;
    pa_hashmap *channel_map_for_direction;

    pa_assert(pd);
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(s->state == PA_STREAM_CREATING);

    /* Keep the stream alive until "finish" */
    pa_stream_ref(s);

    if (command != PA_COMMAND_REPLY) {
        /* The state change is skipped when the error handler itself
         * reports failure (< 0) */
        if (pa_context_handle_error(s->context, command, t, false) >= 0)
            pa_stream_set_state(s, PA_STREAM_FAILED);

        goto finish;
    }

    /* Channel, then (except for uploads) the stream index, then
     * (except for recording) the initially requested byte count */
    if (pa_tagstruct_getu32(t, &s->channel) < 0 || s->channel == PA_INVALID_INDEX)
        goto protocol_error;

    if (s->direction != PA_STREAM_UPLOAD)
        if (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)
            goto protocol_error;

    if (s->direction != PA_STREAM_RECORD)
        if (pa_tagstruct_getu32(t, &initial_request) < 0)
            goto protocol_error;

    s->requested_bytes = (int64_t) initial_request;

    /* Buffer attributes as reported back in the reply */
    if (s->context->version >= 9) {
        switch (s->direction) {
            case PA_STREAM_PLAYBACK:
                if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0)
                    goto protocol_error;
                if (pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0)
                    goto protocol_error;
                if (pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0)
                    goto protocol_error;
                if (pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0)
                    goto protocol_error;
                break;

            case PA_STREAM_RECORD:
                if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0)
                    goto protocol_error;
                if (pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0)
                    goto protocol_error;
                break;

            default:
                break;
        }
    }

    /* Sample spec, channel map and device details */
    if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
        pa_sample_spec ss;
        pa_channel_map cm;
        const char *dn = NULL;
        bool suspended;

        if (pa_tagstruct_get_sample_spec(t, &ss) < 0)
            goto protocol_error;
        if (pa_tagstruct_get_channel_map(t, &cm) < 0)
            goto protocol_error;
        if (pa_tagstruct_getu32(t, &s->device_index) < 0)
            goto protocol_error;
        if (pa_tagstruct_gets(t, &dn) < 0)
            goto protocol_error;
        if (pa_tagstruct_get_boolean(t, &suspended) < 0)
            goto protocol_error;

        /* Basic sanity of what was received */
        if (!dn || s->device_index == PA_INVALID_INDEX)
            goto protocol_error;

        if (ss.channels != cm.channels ||
            !pa_channel_map_valid(&cm) ||
            !pa_sample_spec_valid(&ss))
            goto protocol_error;

        /* Without requested formats, each property must match what was
         * asked for unless the corresponding PA_STREAM_FIX_* flag is set */
        if (s->n_formats == 0) {
            if (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format)
                goto protocol_error;

            if (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate)
                goto protocol_error;

            if (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))
                goto protocol_error;
        }

        pa_xfree(s->device_name);
        s->device_name = pa_xstrdup(dn);
        s->suspended = suspended;

        s->channel_map = cm;
        s->sample_spec = ss;
    }

#ifdef USE_SMOOTHER_2
    if (s->flags & PA_STREAM_INTERPOLATE_TIMING)
        pa_smoother_2_set_sample_spec(s->smoother, pa_rtclock_now(), &s->sample_spec);
#endif

    /* Configured latency of the source (recording) or sink (otherwise) */
    if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
        pa_usec_t usec;

        if (pa_tagstruct_get_usec(t, &usec) < 0)
            goto protocol_error;

        if (s->direction == PA_STREAM_RECORD)
            s->timing_info.configured_source_usec = usec;
        else
            s->timing_info.configured_sink_usec = usec;
    }

    /* Format info */
    if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
        || s->context->version >= 22) {

        pa_format_info *f = pa_format_info_new();

        if (pa_tagstruct_get_format_info(t, f) >= 0 && pa_format_info_valid(f))
            s->format = f;
        else {
            pa_format_info_free(f);

            /* We used the extended API, so we should have got back a proper format */
            if (s->n_formats > 0)
                goto protocol_error;
        }
    }

    if (!pa_tagstruct_eof(t))
        goto protocol_error;

    if (s->direction == PA_STREAM_RECORD) {
        pa_assert(!s->record_memblockq);

        s->record_memblockq = pa_memblockq_new(
                "client side record memblockq",
                0,
                s->buffer_attr.maxlength,
                0,
                &s->sample_spec,
                1,
                0,
                0,
                NULL);
    }

    s->channel_valid = true;

    /* Record streams and all other streams live in separate hashmaps */
    if (s->direction == PA_STREAM_RECORD)
        channel_map_for_direction = s->context->record_streams;
    else
        channel_map_for_direction = s->context->playback_streams;

    pa_hashmap_put(channel_map_for_direction, PA_UINT32_TO_PTR(s->channel), s);

    create_stream_complete(s);

    goto finish;

protocol_error:
    pa_context_fail(s->context, PA_ERR_PROTOCOL);

finish:
    pa_stream_unref(s);
}
1200 
/* Common implementation of pa_stream_connect_playback() and
 * pa_stream_connect_record().
 *
 * Validates the arguments against the stream and context state,
 * stores direction, buffer attributes and flags in the stream, creates
 * the smoother if PA_STREAM_INTERPOLATE_TIMING is set, then builds and
 * sends the stream creation command. Which fields are appended depends
 * on the context version and the direction. The reply is handled by
 * pa_create_stream_callback(); the stream is moved to
 * PA_STREAM_CREATING.
 *
 * Returns 0 on success. Failed PA_CHECK_VALIDITY() checks are reported
 * through that macro. */
static int create_stream(
        pa_stream_direction_t direction,
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags,
        const pa_cvolume *volume,
        pa_stream *sync_stream) {

    const bool is_playback = (direction == PA_STREAM_PLAYBACK);
    const bool is_record = (direction == PA_STREAM_RECORD);
    const bool user_volume = !!volume; /* Did the caller pass a volume? */
    pa_cvolume fallback_volume;
    pa_tagstruct *packet;
    uint32_t reply_tag;
    uint32_t version;
    uint32_t idx;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);

    /* Reject any flag bit outside the known set */
    PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
                                              PA_STREAM_INTERPOLATE_TIMING|
                                              PA_STREAM_NOT_MONOTONIC|
                                              PA_STREAM_AUTO_TIMING_UPDATE|
                                              PA_STREAM_NO_REMAP_CHANNELS|
                                              PA_STREAM_NO_REMIX_CHANNELS|
                                              PA_STREAM_FIX_FORMAT|
                                              PA_STREAM_FIX_RATE|
                                              PA_STREAM_FIX_CHANNELS|
                                              PA_STREAM_DONT_MOVE|
                                              PA_STREAM_VARIABLE_RATE|
                                              PA_STREAM_PEAK_DETECT|
                                              PA_STREAM_START_MUTED|
                                              PA_STREAM_ADJUST_LATENCY|
                                              PA_STREAM_EARLY_REQUESTS|
                                              PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
                                              PA_STREAM_START_UNMUTED|
                                              PA_STREAM_FAIL_ON_SUSPEND|
                                              PA_STREAM_RELATIVE_VOLUME|
                                              PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);

    PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
    PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
    PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
    /* Some of the remaining flags are unsupported on older versions
     * too, but passing them does no harm there, so they are not
     * checked. This keeps client development simple. */

    PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);

    pa_stream_ref(s);

    version = s->context->version;

    s->direction = direction;

    if (sync_stream)
        s->syncid = sync_stream->syncid;

    if (attr)
        s->buffer_attr = *attr;
    /* May rewrite both the buffer attributes and the flags */
    patch_buffer_attr(s, &s->buffer_attr, &flags);

    s->flags = flags;
    s->corked = !!(flags & PA_STREAM_START_CORKED);

    if (flags & PA_STREAM_INTERPOLATE_TIMING) {
        pa_usec_t now;

        now = pa_rtclock_now();

        pa_assert(!s->smoother);
#ifdef USE_SMOOTHER_2
        s->smoother = pa_smoother_2_new(SMOOTHER_HISTORY_TIME, now, 0, 0);
#else
        s->smoother = pa_smoother_new(
                SMOOTHER_ADJUST_TIME,
                SMOOTHER_HISTORY_TIME,
                !(flags & PA_STREAM_NOT_MONOTONIC),
                true,
                SMOOTHER_MIN_HISTORY,
                now,
                true);
#endif
    }

    /* No device given: fall back to the configured default */
    if (!dev) {
        if (is_playback)
            dev = s->context->conf->default_sink;
        else
            dev = s->context->conf->default_source;
    }

    packet = pa_tagstruct_command(
            s->context,
            (uint32_t) (is_playback ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
            &reply_tag);

    /* Below version 13 the media name is sent as a separate string */
    if (version < 13)
        pa_tagstruct_puts(packet, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));

    pa_tagstruct_put(
            packet,
            PA_TAG_SAMPLE_SPEC, &s->sample_spec,
            PA_TAG_CHANNEL_MAP, &s->channel_map,
            PA_TAG_U32, PA_INVALID_INDEX,
            PA_TAG_STRING, dev,
            PA_TAG_U32, s->buffer_attr.maxlength,
            PA_TAG_BOOLEAN, s->corked,
            PA_TAG_INVALID);

    if (!volume) {
        /* With an invalid sample spec the channel count used here is
         * not really relevant, since no volume was set, and the real
         * number of channels is embedded in the format_info
         * structure */
        if (pa_sample_spec_valid(&s->sample_spec))
            volume = pa_cvolume_reset(&fallback_volume, s->sample_spec.channels);
        else
            volume = pa_cvolume_reset(&fallback_volume, PA_CHANNELS_MAX);
    }

    if (is_playback) {
        pa_tagstruct_put(
                packet,
                PA_TAG_U32, s->buffer_attr.tlength,
                PA_TAG_U32, s->buffer_attr.prebuf,
                PA_TAG_U32, s->buffer_attr.minreq,
                PA_TAG_U32, s->syncid,
                PA_TAG_INVALID);

        pa_tagstruct_put_cvolume(packet, volume);
    } else
        pa_tagstruct_putu32(packet, s->buffer_attr.fragsize);

    if (version >= 12) {
        pa_tagstruct_put(
                packet,
                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
                PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
                PA_TAG_INVALID);
    }

    if (version >= 13) {

        if (is_playback)
            pa_tagstruct_put_boolean(packet, flags & PA_STREAM_START_MUTED);
        else
            pa_tagstruct_put_boolean(packet, flags & PA_STREAM_PEAK_DETECT);

        pa_tagstruct_put(
                packet,
                PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
                PA_TAG_PROPLIST, s->proplist,
                PA_TAG_INVALID);

        if (is_record)
            pa_tagstruct_putu32(packet, s->direct_on_input);
    }

    if (version >= 14) {

        if (is_playback)
            pa_tagstruct_put_boolean(packet, user_volume);

        pa_tagstruct_put_boolean(packet, flags & PA_STREAM_EARLY_REQUESTS);
    }

    if (version >= 15) {

        if (is_playback)
            pa_tagstruct_put_boolean(packet, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));

        pa_tagstruct_put_boolean(packet, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
        pa_tagstruct_put_boolean(packet, flags & PA_STREAM_FAIL_ON_SUSPEND);
    }

    if (version >= 17 && is_playback)
        pa_tagstruct_put_boolean(packet, flags & PA_STREAM_RELATIVE_VOLUME);

    if (version >= 18 && is_playback)
        pa_tagstruct_put_boolean(packet, flags & (PA_STREAM_PASSTHROUGH));

    /* Requested formats: count first, then each format */
    if ((version >= 21 && is_playback) || version >= 22) {

        pa_tagstruct_putu8(packet, s->n_formats);
        for (idx = 0; idx < s->n_formats; idx++)
            pa_tagstruct_put_format_info(packet, s->req_formats[idx]);
    }

    /* From version 22 on, record streams carry volume and mute fields too */
    if (version >= 22 && is_record) {
        pa_tagstruct_put_cvolume(packet, volume);
        pa_tagstruct_put_boolean(packet, flags & PA_STREAM_START_MUTED);
        pa_tagstruct_put_boolean(packet, user_volume);
        pa_tagstruct_put_boolean(packet, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
        pa_tagstruct_put_boolean(packet, flags & PA_STREAM_RELATIVE_VOLUME);
        pa_tagstruct_put_boolean(packet, flags & (PA_STREAM_PASSTHROUGH));
    }

    pa_pstream_send_tagstruct(s->context->pstream, packet);
    pa_pdispatch_register_reply(s->context->pdispatch, reply_tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);

    pa_stream_set_state(s, PA_STREAM_CREATING);

    pa_stream_unref(s);
    return 0;
}
1413 
/* Connect the stream for playback. All arguments are forwarded
 * unchanged to create_stream() with direction PA_STREAM_PLAYBACK and
 * its result is returned. */
int pa_stream_connect_playback(
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags,
        const pa_cvolume *volume,
        pa_stream *sync_stream) {

    int ret;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    ret = create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);

    return ret;
}
1427 
/* Connect the stream for recording. Forwards to create_stream() with
 * direction PA_STREAM_RECORD, passing NULL for both the volume and the
 * sync stream, and returns its result. */
int pa_stream_connect_record(
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags) {

    int ret;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    ret = create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);

    return ret;
}
1439 
/* Hand out a write buffer owned by the stream.
 *
 * On entry *nbytes is the requested size, or (size_t) -1. Any other
 * value is capped at the mempool's maximum block size rounded down to
 * a whole number of frames. If the stream has no pending write
 * memblock, one is allocated and acquired. On return *data points to
 * the buffer and *nbytes holds the memblock's length.
 *
 * Returns 0 on success. Failed PA_CHECK_VALIDITY() checks are reported
 * through that macro. */
int pa_stream_begin_write(
        pa_stream *s,
        void **data,
        size_t *nbytes) {

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);

    if (*nbytes != (size_t) -1) {
        size_t limit = pa_mempool_block_size_max(s->context->mempool);
        const size_t frame = pa_frame_size(&s->sample_spec);

        /* Round the limit down to a multiple of the frame size */
        limit = (limit / frame) * frame;

        if (*nbytes > limit)
            *nbytes = limit;
    }

    /* A buffer from an earlier call that was neither written nor
     * cancelled is handed out again */
    if (!s->write_memblock) {
        s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
        s->write_data = pa_memblock_acquire(s->write_memblock);
    }

    *nbytes = pa_memblock_get_length(s->write_memblock);
    *data = s->write_data;

    return 0;
}
1475 
/* Give back the write buffer handed out by pa_stream_begin_write()
 * without sending it: the pending memblock is released and
 * unreferenced and the stream's write_memblock/write_data fields are
 * cleared.
 *
 * Returns 0 on success. Failed PA_CHECK_VALIDITY() checks (including
 * "no pending write memblock") are reported through that macro. */
int pa_stream_cancel_write(
        pa_stream *s) {

    pa_memblock *pending;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);

    pa_assert(s->write_data);

    pending = s->write_memblock;

    pa_memblock_release(pending);
    pa_memblock_unref(pending);

    s->write_memblock = NULL;
    s->write_data = NULL;

    return 0;
}
1496 
/* Write data to a playback or upload stream.
 *
 * s:            the stream; must be in PA_STREAM_READY state
 * data:         the bytes to write; if pa_stream_begin_write() was
 *               called before, this must lie inside the buffer it
 *               handed out
 * length:       number of bytes; must be a multiple of the frame size
 * free_cb:      optional cleanup routine for data; must be NULL when a
 *               pa_stream_begin_write() buffer is pending
 * free_cb_data: argument passed to free_cb
 * offset:       seek offset in bytes; must be a multiple of the frame
 *               size (negative values are allowed) and must be 0 with
 *               PA_SEEK_RELATIVE for upload streams
 * seek:         seek mode, at most PA_SEEK_RELATIVE_END
 *
 * Returns 0 on success. Failed PA_CHECK_VALIDITY() checks are reported
 * through that macro.
 *
 * After sending, the locally tracked requested_bytes counter and, for
 * playback streams, the write index corrections and timing info are
 * updated to reflect the write. */
int pa_stream_write_ext_free(
        pa_stream *s,
        const void *data,
        size_t length,
        pa_free_cb_t free_cb,
        void *free_cb_data,
        int64_t offset,
        pa_seek_mode_t seek) {

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(data);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context,
                      !s->write_memblock ||
                      ((data >= s->write_data) &&
                       ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
                      PA_ERR_INVALID);
    /* The modulo must be done in signed arithmetic. With a 64 bit
     * unsigned size_t a negative offset would be converted to a huge
     * unsigned value first, and frame-aligned negative offsets would be
     * rejected whenever the frame size is not a power of two. */
    PA_CHECK_VALIDITY(s->context, offset % (int64_t) pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, length % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);

    if (s->write_memblock) {
        pa_memchunk chunk;

        /* pa_stream_begin_write() was called before */

        pa_memblock_release(s->write_memblock);

        /* Send exactly the part of the pending buffer the caller filled */
        chunk.memblock = s->write_memblock;
        chunk.index = (const char *) data - (const char *) s->write_data;
        chunk.length = length;

        s->write_memblock = NULL;
        s->write_data = NULL;

        pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
        pa_memblock_unref(chunk.memblock);

    } else {
        pa_seek_mode_t t_seek = seek;
        int64_t t_offset = offset;
        size_t t_length = length;
        const void *t_data = data;

        /* pa_stream_begin_write() was not called before */

        while (t_length > 0) {
            pa_memchunk chunk;

            chunk.index = 0;

            if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
                /* Wrap the caller's buffer directly; free_cb is handed
                 * to the memblock together with the data */
                chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, free_cb_data, 1);
                chunk.length = t_length;
            } else {
                void *d;
                size_t blk_size_max;

                /* Break large audio streams into _aligned_ blocks or the
                 * other endpoint will happily discard them upon arrival. */
                blk_size_max = pa_frame_align(pa_mempool_block_size_max(s->context->mempool), &s->sample_spec);
                chunk.length = PA_MIN(t_length, blk_size_max);
                chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);

                d = pa_memblock_acquire(chunk.memblock);
                memcpy(d, t_data, chunk.length);
                pa_memblock_release(chunk.memblock);
            }

            pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);

            /* Only the first chunk carries the caller's seek request;
             * all following chunks are appended */
            t_offset = 0;
            t_seek = PA_SEEK_RELATIVE;

            t_data = (const uint8_t*) t_data + chunk.length;
            t_length -= chunk.length;

            pa_memblock_unref(chunk.memblock);
        }

        /* In the SHM case the data was copied above, so the caller's
         * buffer can be freed right away */
        if (free_cb && pa_pstream_get_shm(s->context->pstream))
            free_cb(free_cb_data);
    }

    /* This is obviously wrong since we ignore the seeking index . But
     * that's OK, the server side applies the same error */
    s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;

#ifdef STREAM_DEBUG
    pa_log_debug("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes);
#endif

    if (s->direction == PA_STREAM_PLAYBACK) {

        /* Update latency request correction */
        if (s->write_index_corrections[s->current_write_index_correction].valid) {

            if (seek == PA_SEEK_ABSOLUTE) {
                s->write_index_corrections[s->current_write_index_correction].corrupt = false;
                s->write_index_corrections[s->current_write_index_correction].absolute = true;
                s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
            } else if (seek == PA_SEEK_RELATIVE) {
                if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
                    s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
            } else
                s->write_index_corrections[s->current_write_index_correction].corrupt = true;
        }

        /* Update the write index in the already available latency data */
        if (s->timing_info_valid) {

            if (seek == PA_SEEK_ABSOLUTE) {
                s->timing_info.write_index_corrupt = false;
                s->timing_info.write_index = offset + (int64_t) length;
            } else if (seek == PA_SEEK_RELATIVE) {
                if (!s->timing_info.write_index_corrupt)
                    s->timing_info.write_index += offset + (int64_t) length;
            } else
                s->timing_info.write_index_corrupt = true;
        }

        /* Without a usable write index, ask for fresh timing data */
        if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
            request_auto_timing_update(s, true);
    }

    return 0;
}
1630 
/* Convenience wrapper around pa_stream_write_ext_free() that passes
 * the data pointer itself as the argument for free_cb. Returns
 * whatever pa_stream_write_ext_free() returns. */
int pa_stream_write(
        pa_stream *s,
        const void *data,
        size_t length,
        pa_free_cb_t free_cb,
        int64_t offset,
        pa_seek_mode_t seek) {

    void *free_cb_arg = (void*) data;

    return pa_stream_write_ext_free(s, data, length, free_cb, free_cb_arg, offset, seek);
}
1641 
/* Look at the next chunk of a record stream without removing it.
 *
 * If no chunk is currently being peeked, one is fetched from the
 * record memblockq. Outcomes (the return value is 0 in all of them):
 *   - queue empty:               *data = NULL, *length = 0
 *   - no data at the read index: *data = NULL, *length = peeked length
 *   - data available:            *data points to the chunk's bytes and
 *                                *length is the chunk length
 *
 * Failed PA_CHECK_VALIDITY() checks are reported through that macro. */
int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(data);
    pa_assert(length);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);

    if (!s->peek_memchunk.memblock) {
        const int peeked = pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk);

        if (peeked < 0 || !s->peek_memchunk.memblock) {
            /* Either record_memblockq is empty (peek failed), or it
             * isn't empty but doesn't have any data at the current
             * read index; only the reported length differs. */
            *data = NULL;
            *length = peeked < 0 ? 0 : s->peek_memchunk.length;
            return 0;
        }

        s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
    }

    pa_assert(s->peek_data);

    *length = s->peek_memchunk.length;
    *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;

    return 0;
}
1676 
/* Discards the chunk (or hole) cached in s->peek_memchunk from the record
 * queue. The stream must be a ready record stream with a non-empty cached
 * chunk, otherwise the validity checks fail with PA_ERR_BADSTATE. Returns 0
 * once the chunk has been dropped. */
int pa_stream_drop(pa_stream *s) {
    size_t dropped;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE);

    dropped = s->peek_memchunk.length;

    pa_memblockq_drop(s->record_memblockq, dropped);

    /* Keep the locally simulated read index in step with what was consumed */
    if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
        s->timing_info.read_index += (int64_t) dropped;

    /* A hole has no memblock attached, so there is nothing to release then */
    if (s->peek_memchunk.memblock != NULL) {
        pa_assert(s->peek_data);
        s->peek_data = NULL;
        pa_memblock_release(s->peek_memchunk.memblock);
        pa_memblock_unref(s->peek_memchunk.memblock);
    }

    pa_memchunk_reset(&s->peek_memchunk);

    return 0;
}
1703 
pa_stream_writable_size(const pa_stream * s)1704 size_t pa_stream_writable_size(const pa_stream *s) {
1705     pa_assert(s);
1706     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1707 
1708     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1709     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1710     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1711 
1712     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1713 }
1714 
pa_stream_readable_size(const pa_stream * s)1715 size_t pa_stream_readable_size(const pa_stream *s) {
1716     pa_assert(s);
1717     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1718 
1719     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1720     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1721     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1722 
1723     return pa_memblockq_get_length(s->record_memblockq);
1724 }
1725 
pa_stream_drain(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)1726 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1727     pa_operation *o;
1728     pa_tagstruct *t;
1729     uint32_t tag;
1730 
1731     pa_assert(s);
1732     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1733 
1734     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1735     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1736     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1737 
1738     /* Ask for a timing update before we cork/uncork to get the best
1739      * accuracy for the transport latency suitable for the
1740      * check_smoother_status() call in the started callback */
1741     request_auto_timing_update(s, true);
1742 
1743     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1744 
1745     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1746     pa_tagstruct_putu32(t, s->channel);
1747     pa_pstream_send_tagstruct(s->context->pstream, t);
1748     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1749 
1750     /* This might cause the read index to continue again, hence
1751      * let's request a timing update */
1752     request_auto_timing_update(s, true);
1753 
1754     return o;
1755 }
1756 
calc_time(const pa_stream * s,bool ignore_transport)1757 static pa_usec_t calc_time(const pa_stream *s, bool ignore_transport) {
1758     pa_usec_t usec;
1759 
1760     pa_assert(s);
1761     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1762     pa_assert(s->state == PA_STREAM_READY);
1763     pa_assert(s->direction != PA_STREAM_UPLOAD);
1764     pa_assert(s->timing_info_valid);
1765     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1766     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1767 
1768     if (s->direction == PA_STREAM_PLAYBACK) {
1769         /* The last byte that was written into the output device
1770          * had this time value associated */
1771         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1772 
1773         if (!s->corked && !s->suspended) {
1774 
1775             if (!ignore_transport)
1776                 /* Because the latency info took a little time to come
1777                  * to us, we assume that the real output time is actually
1778                  * a little ahead */
1779                 usec += s->timing_info.transport_usec;
1780 
1781             /* However, the output device usually maintains a buffer
1782                too, hence the real sample currently played is a little
1783                back  */
1784             if (s->timing_info.sink_usec >= usec)
1785                 usec = 0;
1786             else
1787                 usec -= s->timing_info.sink_usec;
1788         }
1789 
1790     } else {
1791         pa_assert(s->direction == PA_STREAM_RECORD);
1792 
1793         /* The last byte written into the server side queue had
1794          * this time value associated */
1795         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1796 
1797         if (!s->corked && !s->suspended) {
1798 
1799             if (!ignore_transport)
1800                 /* Add transport latency */
1801                 usec += s->timing_info.transport_usec;
1802 
1803             /* Add latency of data in device buffer */
1804             usec += s->timing_info.source_usec;
1805 
1806             /* If this is a monitor source, we need to correct the
1807              * time by the playback device buffer */
1808             if (s->timing_info.sink_usec >= usec)
1809                 usec = 0;
1810             else
1811                 usec -= s->timing_info.sink_usec;
1812         }
1813     }
1814 
1815     return usec;
1816 }
1817 
1818 #ifdef USE_SMOOTHER_2
/* Byte-based counterpart of calc_time(): the stream time is first turned
 * into a whole number of frames at the stream's sample rate (integer
 * division), then multiplied by the frame size. */
static inline uint64_t calc_bytes(pa_stream *s, bool ignore_transport) {
    uint64_t frames = calc_time(s, ignore_transport) * s->sample_spec.rate / PA_USEC_PER_SEC;

    return (uint64_t) (frames * pa_frame_size(&s->sample_spec));
}
1822 #endif
1823 
/* Reply handler for the latency query sent by pa_stream_update_timing_info().
 *
 * userdata is the pa_operation tracking the query; the reference handed over
 * at registration time is dropped at the end. On a well-formed reply the
 * function:
 *   - fills the stream's timing_info (sink/source latency, playing state,
 *     indexes, transport latency, timestamp),
 *   - marks indexes corrupt when the reply predates the stream's
 *     read/write_index_not_before tags,
 *   - for playback streams replays the write index corrections recorded for
 *     writes/seeks issued since the query was sent,
 *   - for record streams subtracts the locally queued data from the read index,
 *   - feeds the smoother unless the stream is corked.
 *
 * Afterwards the latency update callback and the operation's own callback are
 * invoked. On a fatal error or protocol violation the timing info stays
 * invalid and no callback is run. */
static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    struct timeval local, remote, now;
    pa_timing_info *i;
    bool playing = false;
    uint64_t underrun_for = 0, playing_for = 0;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    if (!o->context || !o->stream)
        goto finish;

    i = &o->stream->timing_info;

    /* Treat everything as invalid until the reply has been parsed completely */
    o->stream->timing_info_valid = false;
    i->write_index_corrupt = true;
    i->read_index_corrupt = true;

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

    } else {

        if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
            pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
            pa_tagstruct_get_boolean(t, &playing) < 0 ||
            pa_tagstruct_get_timeval(t, &local) < 0 ||
            pa_tagstruct_get_timeval(t, &remote) < 0 ||
            pa_tagstruct_gets64(t, &i->write_index) < 0 ||
            pa_tagstruct_gets64(t, &i->read_index) < 0) {

            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        /* Playback replies carry two extra counters from protocol version 13 on */
        if (o->context->version >= 13 &&
            o->stream->direction == PA_STREAM_PLAYBACK)
            if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
                pa_tagstruct_getu64(t, &playing_for) < 0) {

                pa_context_fail(o->context, PA_ERR_PROTOCOL);
                goto finish;
            }

        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }
        o->stream->timing_info_valid = true;
        i->write_index_corrupt = false;
        i->read_index_corrupt = false;

        i->playing = (int) playing;
        i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);

        pa_gettimeofday(&now);

        /* Calculate timestamps */
        if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
            /* local and remote seem to have synchronized clocks */

            if (o->stream->direction == PA_STREAM_PLAYBACK)
                i->transport_usec = pa_timeval_diff(&remote, &local);
            else
                i->transport_usec = pa_timeval_diff(&now, &remote);

            i->synchronized_clocks = true;
            i->timestamp = remote;
        } else {
            /* clocks are not synchronized, so estimate the latency as half
             * of the round trip time */
            i->transport_usec = pa_timeval_diff(&now, &local)/2;
            i->synchronized_clocks = false;
            i->timestamp = local;
            pa_timeval_add(&i->timestamp, i->transport_usec);
        }

        /* Invalidate read and write indexes if necessary */
        if (tag < o->stream->read_index_not_before)
            i->read_index_corrupt = true;

        if (tag < o->stream->write_index_not_before)
            i->write_index_corrupt = true;

        if (o->stream->direction == PA_STREAM_PLAYBACK) {
            /* Write index correction */

            int n, j;
            uint32_t ctag = tag;

            /* Go through the saved correction values and add up the
             * total correction. The walk starts at the slot following the
             * current one and visits every slot exactly once. The start
             * index has to be wrapped too: without the modulo a current slot
             * of PA_MAX_WRITE_INDEX_CORRECTIONS-1 would read one element past
             * the end of the array and never visit the current slot. */
            for (n = 0, j = (o->stream->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
                 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
                 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {

                /* Step over invalid data or out-of-date data */
                if (!o->stream->write_index_corrections[j].valid ||
                    o->stream->write_index_corrections[j].tag < ctag)
                    continue;

                /* Make sure that everything is in order */
                ctag = o->stream->write_index_corrections[j].tag+1;

                /* Now fix the write index */
                if (o->stream->write_index_corrections[j].corrupt) {
                    /* A corrupting seek was made */
                    i->write_index_corrupt = true;
                } else if (o->stream->write_index_corrections[j].absolute) {
                    /* An absolute seek was made */
                    i->write_index = o->stream->write_index_corrections[j].value;
                    i->write_index_corrupt = false;
                } else if (!i->write_index_corrupt) {
                    /* A relative seek was made */
                    i->write_index += o->stream->write_index_corrections[j].value;
                }
            }

            /* Clear old correction entries */
            for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
                if (!o->stream->write_index_corrections[n].valid)
                    continue;

                if (o->stream->write_index_corrections[n].tag <= tag)
                    o->stream->write_index_corrections[n].valid = false;
            }
        }

        if (o->stream->direction == PA_STREAM_RECORD) {
            /* Read index correction: data still sitting in the local record
             * queue has not been read by the application yet */

            if (!i->read_index_corrupt)
                i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
        }

        /* Update smoother if we're not corked */
        if (o->stream->smoother && !o->stream->corked) {
            pa_usec_t u, x;

            u = x = pa_rtclock_now() - i->transport_usec;

            if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
                pa_usec_t su;

                /* If we weren't playing then it will take some time
                 * until the audio will actually come out through the
                 * speakers. Since we follow that timing here, we need
                 * to try to fix this up */

                su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);

                if (su < i->sink_usec)
                    x += i->sink_usec - su;
            }

            if (!i->playing)
#ifdef USE_SMOOTHER_2
                pa_smoother_2_pause(o->stream->smoother, x);
#else
                pa_smoother_pause(o->stream->smoother, x);
#endif

            /* Update the smoother */
            if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
                (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
#ifdef USE_SMOOTHER_2
                pa_smoother_2_put(o->stream->smoother, u, calc_bytes(o->stream, true));
#else
                pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, true));
#endif

            if (i->playing)
#ifdef USE_SMOOTHER_2
                pa_smoother_2_resume(o->stream->smoother, x);
#else
                pa_smoother_resume(o->stream->smoother, x, true);
#endif
        }
    }

    o->stream->auto_timing_update_requested = false;

    if (o->stream->latency_update_callback)
        o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);

    /* o->stream is checked again here, after the latency update callback
     * has run */
    if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
        cb(o->stream, o->stream->timing_info_valid, o->userdata);
    }

finish:

    pa_operation_done(o);
    pa_operation_unref(o);
}
2021 
pa_stream_update_timing_info(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2022 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2023     uint32_t tag;
2024     pa_operation *o;
2025     pa_tagstruct *t;
2026     struct timeval now;
2027     int cidx = 0;
2028 
2029     pa_assert(s);
2030     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2031 
2032     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2033     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2034     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2035 
2036     if (s->direction == PA_STREAM_PLAYBACK) {
2037         /* Find a place to store the write_index correction data for this entry */
2038         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
2039 
2040         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
2041         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
2042     }
2043     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2044 
2045     t = pa_tagstruct_command(
2046             s->context,
2047             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
2048             &tag);
2049     pa_tagstruct_putu32(t, s->channel);
2050     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
2051 
2052     pa_pstream_send_tagstruct(s->context->pstream, t);
2053     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2054 
2055     if (s->direction == PA_STREAM_PLAYBACK) {
2056         /* Fill in initial correction data */
2057 
2058         s->current_write_index_correction = cidx;
2059 
2060         s->write_index_corrections[cidx].valid = true;
2061         s->write_index_corrections[cidx].absolute = false;
2062         s->write_index_corrections[cidx].corrupt = false;
2063         s->write_index_corrections[cidx].tag = tag;
2064         s->write_index_corrections[cidx].value = 0;
2065     }
2066 
2067     return o;
2068 }
2069 
/* Reply handler for the delete-stream request sent by pa_stream_disconnect().
 *
 *   - clean reply                    -> stream becomes PA_STREAM_TERMINATED
 *   - reply with trailing data       -> context fails with PA_ERR_PROTOCOL
 *   - error handled by the context   -> stream becomes PA_STREAM_FAILED
 *   - pa_context_handle_error() < 0  -> nothing further is done here
 *
 * The stream is referenced for the duration of the handler. */
void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_stream *s = userdata;

    pa_assert(pd);
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    pa_stream_ref(s);

    if (command == PA_COMMAND_REPLY) {
        if (pa_tagstruct_eof(t))
            pa_stream_set_state(s, PA_STREAM_TERMINATED);
        else
            pa_context_fail(s->context, PA_ERR_PROTOCOL);
    } else if (pa_context_handle_error(s->context, command, t, false) >= 0)
        pa_stream_set_state(s, PA_STREAM_FAILED);

    pa_stream_unref(s);
}
2095 
/* Sends the delete command matching the stream's direction (playback, record,
 * or upload for anything else); the reply is handled by
 * pa_stream_disconnect_callback(). Requires a valid channel and a ready
 * context. Returns 0 once the request has been queued. */
int pa_stream_disconnect(pa_stream *s) {
    uint32_t tag, command;
    pa_tagstruct *request;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);

    /* Hold a reference while the request is being set up */
    pa_stream_ref(s);

    if (s->direction == PA_STREAM_PLAYBACK)
        command = (uint32_t) PA_COMMAND_DELETE_PLAYBACK_STREAM;
    else if (s->direction == PA_STREAM_RECORD)
        command = (uint32_t) PA_COMMAND_DELETE_RECORD_STREAM;
    else
        command = (uint32_t) PA_COMMAND_DELETE_UPLOAD_STREAM;

    request = pa_tagstruct_command(s->context, command, &tag);
    pa_tagstruct_putu32(request, s->channel);
    pa_pstream_send_tagstruct(s->context->pstream, request);
    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);

    pa_stream_unref(s);

    return 0;
}
2121 
/* Stores cb and userdata as the stream's read callback. The call is a no-op
 * when pa_detect_fork() reports a fork or the stream is already terminated
 * or failed. */
void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->read_userdata = userdata;
    s->read_callback = cb;
}
2135 
/* Stores cb and userdata as the stream's write callback. The call is a no-op
 * when pa_detect_fork() reports a fork or the stream is already terminated
 * or failed. */
void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->write_userdata = userdata;
    s->write_callback = cb;
}
2149 
/* Stores cb and userdata as the stream's state callback. The call is a no-op
 * when pa_detect_fork() reports a fork or the stream is already terminated
 * or failed. */
void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->state_userdata = userdata;
    s->state_callback = cb;
}
2163 
/* Stores cb and userdata as the stream's overflow callback. The call is a
 * no-op when pa_detect_fork() reports a fork or the stream is already
 * terminated or failed. */
void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->overflow_userdata = userdata;
    s->overflow_callback = cb;
}
2177 
/* Stores cb and userdata as the stream's underflow callback. The call is a
 * no-op when pa_detect_fork() reports a fork or the stream is already
 * terminated or failed. */
void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->underflow_userdata = userdata;
    s->underflow_callback = cb;
}
2191 
/* Stores cb and userdata as the stream's latency update callback. The call
 * is a no-op when pa_detect_fork() reports a fork or the stream is already
 * terminated or failed. */
void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->latency_update_userdata = userdata;
    s->latency_update_callback = cb;
}
2205 
/* Stores cb and userdata as the stream's moved callback. The call is a no-op
 * when pa_detect_fork() reports a fork or the stream is already terminated
 * or failed. */
void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->moved_userdata = userdata;
    s->moved_callback = cb;
}
2219 
/* Stores cb and userdata as the stream's suspended callback. The call is a
 * no-op when pa_detect_fork() reports a fork or the stream is already
 * terminated or failed. */
void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->suspended_userdata = userdata;
    s->suspended_callback = cb;
}
2233 
/* Stores cb and userdata as the stream's started callback. The call is a
 * no-op when pa_detect_fork() reports a fork or the stream is already
 * terminated or failed. */
void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->started_userdata = userdata;
    s->started_callback = cb;
}
2247 
/* Stores cb and userdata as the stream's event callback. The call is a no-op
 * when pa_detect_fork() reports a fork or the stream is already terminated
 * or failed. */
void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->event_userdata = userdata;
    s->event_callback = cb;
}
2261 
/* Stores cb and userdata as the stream's buffer attribute callback. The call
 * is a no-op when pa_detect_fork() reports a fork or the stream is already
 * terminated or failed. */
void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->buffer_attr_userdata = userdata;
    s->buffer_attr_callback = cb;
}
2275 
/* Generic reply handler for stream commands whose reply carries no payload.
 *
 * userdata is the pa_operation tracking the command. The operation's callback
 * (if any) is invoked with success = 1 for a clean reply and success = 0 for
 * an error that pa_context_handle_error() accepted. No callback is run when
 * the operation has no context, when error handling fails, or when the reply
 * has trailing data (the context then fails with PA_ERR_PROTOCOL). The
 * operation is always marked done and unreferenced. */
void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *op = userdata;
    int ok = 1;

    pa_assert(pd);
    pa_assert(op);
    pa_assert(PA_REFCNT_VALUE(op) >= 1);

    if (!op->context)
        goto finish;

    if (command == PA_COMMAND_REPLY) {
        /* A simple ack must not carry any payload */
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(op->context, PA_ERR_PROTOCOL);
            goto finish;
        }
    } else {
        if (pa_context_handle_error(op->context, command, t, false) < 0)
            goto finish;

        ok = 0;
    }

    if (op->callback) {
        pa_stream_success_cb_t notify = (pa_stream_success_cb_t) op->callback;
        notify(op->stream, ok, op->userdata);
    }

finish:
    pa_operation_done(op);
    pa_operation_unref(op);
}
2306 
pa_stream_cork(pa_stream * s,int b,pa_stream_success_cb_t cb,void * userdata)2307 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2308     pa_operation *o;
2309     pa_tagstruct *t;
2310     uint32_t tag;
2311 
2312     pa_assert(s);
2313     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2314 
2315     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2316     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2317     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2318 
2319     /* Ask for a timing update before we cork/uncork to get the best
2320      * accuracy for the transport latency suitable for the
2321      * check_smoother_status() call in the started callback */
2322     request_auto_timing_update(s, true);
2323 
2324     s->corked = b;
2325 
2326     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2327 
2328     t = pa_tagstruct_command(
2329             s->context,
2330             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2331             &tag);
2332     pa_tagstruct_putu32(t, s->channel);
2333     pa_tagstruct_put_boolean(t, !!b);
2334     pa_pstream_send_tagstruct(s->context->pstream, t);
2335     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2336 
2337     check_smoother_status(s, false, false, false);
2338 
2339     /* This might cause the indexes to hang/start again, hence let's
2340      * request a timing update, after the cork/uncork, too */
2341     request_auto_timing_update(s, true);
2342 
2343     return o;
2344 }
2345 
stream_send_simple_command(pa_stream * s,uint32_t command,pa_stream_success_cb_t cb,void * userdata)2346 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2347     pa_tagstruct *t;
2348     pa_operation *o;
2349     uint32_t tag;
2350 
2351     pa_assert(s);
2352     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2353 
2354     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2355     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2356 
2357     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2358 
2359     t = pa_tagstruct_command(s->context, command, &tag);
2360     pa_tagstruct_putu32(t, s->channel);
2361     pa_pstream_send_tagstruct(s->context->pstream, t);
2362     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2363 
2364     return o;
2365 }
2366 
pa_stream_flush(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2367 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2368     pa_operation *o;
2369 
2370     pa_assert(s);
2371     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2372 
2373     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2374     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2375     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2376 
2377     /* Ask for a timing update *before* the flush, so that the
2378      * transport usec is as up to date as possible when we get the
2379      * underflow message and update the smoother status*/
2380     request_auto_timing_update(s, true);
2381 
2382     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2383         return NULL;
2384 
2385     if (s->direction == PA_STREAM_PLAYBACK) {
2386 
2387         if (s->write_index_corrections[s->current_write_index_correction].valid)
2388             s->write_index_corrections[s->current_write_index_correction].corrupt = true;
2389 
2390         if (s->buffer_attr.prebuf > 0)
2391             check_smoother_status(s, false, false, true);
2392 
2393         /* This will change the write index, but leave the
2394          * read index untouched. */
2395         invalidate_indexes(s, false, true);
2396 
2397     } else
2398         /* For record streams this has no influence on the write
2399          * index, but the read index might jump. */
2400         invalidate_indexes(s, true, false);
2401 
2402     /* Note that we do not update requested_bytes here. This is
2403      * because we cannot really know how data actually was dropped
2404      * from the write index due to this. This 'error' will be applied
2405      * by both client and server and hence we should be fine. */
2406 
2407     return o;
2408 }
2409 
pa_stream_prebuf(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2410 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2411     pa_operation *o;
2412 
2413     pa_assert(s);
2414     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2415 
2416     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2417     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2418     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2419     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2420 
2421     /* Ask for a timing update before we cork/uncork to get the best
2422      * accuracy for the transport latency suitable for the
2423      * check_smoother_status() call in the started callback */
2424     request_auto_timing_update(s, true);
2425 
2426     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2427         return NULL;
2428 
2429     /* This might cause the read index to hang again, hence
2430      * let's request a timing update */
2431     request_auto_timing_update(s, true);
2432 
2433     return o;
2434 }
2435 
pa_stream_trigger(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2436 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2437     pa_operation *o;
2438 
2439     pa_assert(s);
2440     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2441 
2442     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2443     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2444     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2445     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2446 
2447     /* Ask for a timing update before we cork/uncork to get the best
2448      * accuracy for the transport latency suitable for the
2449      * check_smoother_status() call in the started callback */
2450     request_auto_timing_update(s, true);
2451 
2452     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2453         return NULL;
2454 
2455     /* This might cause the read index to start moving again, hence
2456      * let's request a timing update */
2457     request_auto_timing_update(s, true);
2458 
2459     return o;
2460 }
2461 
pa_stream_set_name(pa_stream * s,const char * name,pa_stream_success_cb_t cb,void * userdata)2462 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2463     pa_operation *o;
2464 
2465     pa_assert(s);
2466     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2467     pa_assert(name);
2468 
2469     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2470     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2471     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2472 
2473     if (s->context->version >= 13) {
2474         pa_proplist *p = pa_proplist_new();
2475 
2476         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2477         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2478         pa_proplist_free(p);
2479     } else {
2480         pa_tagstruct *t;
2481         uint32_t tag;
2482 
2483         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2484         t = pa_tagstruct_command(
2485                 s->context,
2486                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2487                 &tag);
2488         pa_tagstruct_putu32(t, s->channel);
2489         pa_tagstruct_puts(t, name);
2490         pa_pstream_send_tagstruct(s->context->pstream, t);
2491         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2492     }
2493 
2494     return o;
2495 }
2496 
/* Compute the current stream time in microseconds.
 *
 * The value comes from the smoother when the stream has one, and from
 * calc_time() otherwise. Unless PA_STREAM_NOT_MONOTONIC is set in the
 * stream flags, the result is never allowed to fall below the value
 * remembered in s->previous_time.
 *
 * r_usec may be NULL, in which case the time is computed (and
 * previous_time possibly advanced) but not reported.
 *
 * The validity checks reject the call with PA_ERR_FORKED after a fork,
 * PA_ERR_BADSTATE when the stream is not PA_STREAM_READY or is an
 * upload stream, and PA_ERR_NODATA when no valid timing info exists or
 * the relevant index (read index for playback, write index for
 * record) is marked corrupt.
 *
 * Returns 0 once the time has been computed. */
int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
    pa_usec_t now_usec;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);

    if (s->smoother) {
#ifdef USE_SMOOTHER_2
        now_usec = pa_smoother_2_get(s->smoother, pa_rtclock_now());
#else
        now_usec = pa_smoother_get(s->smoother, pa_rtclock_now());
#endif
    } else {
        now_usec = calc_time(s, false);
    }

    /* Enforce monotonicity: either advance the high-water mark or
     * clamp the result to it. */
    if ((s->flags & PA_STREAM_NOT_MONOTONIC) == 0) {
        if (now_usec >= s->previous_time)
            s->previous_time = now_usec;
        else
            now_usec = s->previous_time;
    }

    if (r_usec != NULL)
        *r_usec = now_usec;

    return 0;
}
2533 
/* Return the distance between two time counters.
 *
 * If a >= b the result is a - b. If a < b the result is b - a with
 * *negative set to 1, but only for PA_STREAM_RECORD streams and only
 * when the caller supplied a negative pointer; in every other a < b
 * case the result is 0.
 *
 * When negative is non-NULL it is always initialised to 0 first. */
static pa_usec_t time_counter_diff(const pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (negative)
        *negative = 0;

    if (a >= b)
        return a-b;

    /* From here on a < b. A negative difference is only reported for
     * record streams, and only if the caller can receive the sign. */
    if (!negative || s->direction != PA_STREAM_RECORD)
        return 0;

    *negative = 1;
    return b-a;
}
2551 
/* Compute the stream latency in microseconds.
 *
 * The stream time from pa_stream_get_time() is compared against the
 * time equivalent of a byte index from the timing info: the write
 * index for playback streams, the read index otherwise. A negative
 * index is treated as 0. For playback the result is index time minus
 * stream time; otherwise it is stream time minus index time. The
 * subtraction and the handling of *negative are done by
 * time_counter_diff().
 *
 * r_usec must not be NULL (asserted); negative is passed on to
 * time_counter_diff() unchanged.
 *
 * The validity checks reject the call with PA_ERR_FORKED after a fork,
 * PA_ERR_BADSTATE when the stream is not PA_STREAM_READY or is an
 * upload stream, and PA_ERR_NODATA when no valid timing info exists or
 * the index used here is marked corrupt. A negative result from
 * pa_stream_get_time() is returned as is.
 *
 * Returns 0 once *r_usec has been written. */
int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
    pa_usec_t stream_time, index_time;
    int ret;
    int64_t idx;
    bool playback;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(r_usec);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);

    ret = pa_stream_get_time(s, &stream_time);
    if (ret < 0)
        return ret;

    playback = s->direction == PA_STREAM_PLAYBACK;

    idx = playback ? s->timing_info.write_index : s->timing_info.read_index;

    /* Clamp negative indexes to zero before the unsigned conversion */
    index_time = pa_bytes_to_usec((uint64_t) (idx < 0 ? 0 : idx), &s->sample_spec);

    *r_usec = playback
        ? time_counter_diff(s, index_time, stream_time, negative)
        : time_counter_diff(s, stream_time, index_time, negative);

    return 0;
}
2588 
pa_stream_get_timing_info(pa_stream * s)2589 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2590     pa_assert(s);
2591     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2592 
2593     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2594     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2595     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2596     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2597 
2598     return &s->timing_info;
2599 }
2600 
pa_stream_get_sample_spec(pa_stream * s)2601 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2602     pa_assert(s);
2603     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2604 
2605     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2606 
2607     return &s->sample_spec;
2608 }
2609 
pa_stream_get_channel_map(pa_stream * s)2610 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2611     pa_assert(s);
2612     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2613 
2614     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2615 
2616     return &s->channel_map;
2617 }
2618 
pa_stream_get_format_info(const pa_stream * s)2619 const pa_format_info* pa_stream_get_format_info(const pa_stream *s) {
2620     pa_assert(s);
2621     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2622 
2623     /* We don't have the format till routing is done */
2624     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2625     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2626 
2627     return s->format;
2628 }
pa_stream_get_buffer_attr(pa_stream * s)2629 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2630     pa_assert(s);
2631     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2632 
2633     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2634     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2635     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2636 
2637     return &s->buffer_attr;
2638 }
2639 
/* Reply handler registered by pa_stream_set_buffer_attr().
 *
 * userdata is the pa_operation the request belongs to; the reference
 * carried by it is dropped at the end of this function.
 *
 * On PA_COMMAND_REPLY the buffer attributes reported back are stored
 * in the stream (maxlength, tlength, prebuf, minreq for playback;
 * maxlength, fragsize for record). For context versions >= 13 a usec
 * value follows, which is stored as configured_source_usec (record) or
 * configured_sink_usec (any other direction). Any parse failure or
 * trailing data fails the context with PA_ERR_PROTOCOL and skips the
 * user callback.
 *
 * On any other command pa_context_handle_error() is consulted; if it
 * returns a negative value the user callback is skipped, otherwise the
 * callback is invoked with success == 0. */
static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *op = userdata;
    int ok = 1;

    pa_assert(pd);
    pa_assert(op);
    pa_assert(PA_REFCNT_VALUE(op) >= 1);

    /* The operation has no context any more: just finish it */
    if (!op->context)
        goto finish;

    if (command == PA_COMMAND_REPLY) {
        pa_stream *st = op->stream;
        pa_buffer_attr *ba = &st->buffer_attr;
        bool parse_failed = false;

        /* Which fields the reply carries depends on the direction */
        switch (st->direction) {
            case PA_STREAM_PLAYBACK:
                parse_failed =
                    pa_tagstruct_getu32(t, &ba->maxlength) < 0 ||
                    pa_tagstruct_getu32(t, &ba->tlength) < 0 ||
                    pa_tagstruct_getu32(t, &ba->prebuf) < 0 ||
                    pa_tagstruct_getu32(t, &ba->minreq) < 0;
                break;

            case PA_STREAM_RECORD:
                parse_failed =
                    pa_tagstruct_getu32(t, &ba->maxlength) < 0 ||
                    pa_tagstruct_getu32(t, &ba->fragsize) < 0;
                break;

            default:
                break;
        }

        if (parse_failed) {
            pa_context_fail(op->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        if (st->context->version >= 13) {
            pa_usec_t configured;

            if (pa_tagstruct_get_usec(t, &configured) < 0) {
                pa_context_fail(op->context, PA_ERR_PROTOCOL);
                goto finish;
            }

            if (st->direction == PA_STREAM_RECORD)
                st->timing_info.configured_source_usec = configured;
            else
                st->timing_info.configured_sink_usec = configured;
        }

        /* Nothing may follow the fields parsed above */
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(op->context, PA_ERR_PROTOCOL);
            goto finish;
        }
    } else {
        if (pa_context_handle_error(op->context, command, t, false) < 0)
            goto finish;

        ok = 0;
    }

    if (op->callback)
        ((pa_stream_success_cb_t) op->callback)(op->stream, ok, op->userdata);

finish:
    pa_operation_done(op);
    pa_operation_unref(op);
}
2702 
pa_stream_set_buffer_attr(pa_stream * s,const pa_buffer_attr * attr,pa_stream_success_cb_t cb,void * userdata)2703 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2704     pa_operation *o;
2705     pa_tagstruct *t;
2706     uint32_t tag;
2707     pa_buffer_attr copy;
2708 
2709     pa_assert(s);
2710     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2711     pa_assert(attr);
2712 
2713     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2714     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2715     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2716     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2717 
2718     /* Ask for a timing update before we cork/uncork to get the best
2719      * accuracy for the transport latency suitable for the
2720      * check_smoother_status() call in the started callback */
2721     request_auto_timing_update(s, true);
2722 
2723     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2724 
2725     t = pa_tagstruct_command(
2726             s->context,
2727             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2728             &tag);
2729     pa_tagstruct_putu32(t, s->channel);
2730 
2731     copy = *attr;
2732     patch_buffer_attr(s, &copy, NULL);
2733     attr = &copy;
2734 
2735     pa_tagstruct_putu32(t, attr->maxlength);
2736 
2737     if (s->direction == PA_STREAM_PLAYBACK)
2738         pa_tagstruct_put(
2739                 t,
2740                 PA_TAG_U32, attr->tlength,
2741                 PA_TAG_U32, attr->prebuf,
2742                 PA_TAG_U32, attr->minreq,
2743                 PA_TAG_INVALID);
2744     else
2745         pa_tagstruct_putu32(t, attr->fragsize);
2746 
2747     if (s->context->version >= 13)
2748         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2749 
2750     if (s->context->version >= 14)
2751         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2752 
2753     pa_pstream_send_tagstruct(s->context->pstream, t);
2754     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2755 
2756     /* This might cause changes in the read/write index, hence let's
2757      * request a timing update */
2758     request_auto_timing_update(s, true);
2759 
2760     return o;
2761 }
2762 
pa_stream_get_device_index(const pa_stream * s)2763 uint32_t pa_stream_get_device_index(const pa_stream *s) {
2764     pa_assert(s);
2765     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2766 
2767     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2768     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2769     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2770     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2771     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2772 
2773     return s->device_index;
2774 }
2775 
pa_stream_get_device_name(const pa_stream * s)2776 const char *pa_stream_get_device_name(const pa_stream *s) {
2777     pa_assert(s);
2778     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2779 
2780     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2781     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2782     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2783     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2784     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2785 
2786     return s->device_name;
2787 }
2788 
pa_stream_is_suspended(const pa_stream * s)2789 int pa_stream_is_suspended(const pa_stream *s) {
2790     pa_assert(s);
2791     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2792 
2793     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2794     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2795     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2796     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2797 
2798     return s->suspended;
2799 }
2800 
pa_stream_is_corked(const pa_stream * s)2801 int pa_stream_is_corked(const pa_stream *s) {
2802     pa_assert(s);
2803     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2804 
2805     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2806     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2807     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2808 
2809     return s->corked;
2810 }
2811 
/* Reply handler registered by pa_stream_update_sample_rate().
 *
 * userdata is the pa_operation the request belongs to; its private
 * pointer carries the requested rate. The reference carried by
 * userdata is dropped at the end of this function.
 *
 * On PA_COMMAND_REPLY (with no trailing data) the requested rate is
 * committed to the stream's sample spec and, when built with
 * USE_SMOOTHER_2, pushed into the smoother. Trailing data fails the
 * context with PA_ERR_PROTOCOL and skips the user callback.
 *
 * On any other command pa_context_handle_error() is consulted; if it
 * returns a negative value the user callback is skipped, otherwise the
 * callback is invoked with success == 0 and the stream's rate is left
 * unchanged. */
static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    int success = 1;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    if (!o->context)
        goto finish;

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

        /* The request was refused: the rate has not changed, so the
         * local sample spec must not be touched. */
        success = 0;
    } else {

        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        /* Only a positive reply makes the new rate effective */
        o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
#ifdef USE_SMOOTHER_2
        if (o->stream->smoother)
            pa_smoother_2_set_rate(o->stream->smoother, pa_rtclock_now(), o->stream->sample_spec.rate);
#endif
        pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
    }

    if (o->callback) {
        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
        cb(o->stream, success, o->userdata);
    }

finish:
    pa_operation_done(o);
    pa_operation_unref(o);
}
2852 
pa_stream_update_sample_rate(pa_stream * s,uint32_t rate,pa_stream_success_cb_t cb,void * userdata)2853 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2854     pa_operation *o;
2855     pa_tagstruct *t;
2856     uint32_t tag;
2857 
2858     pa_assert(s);
2859     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2860 
2861     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2862     PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_sample_rate_valid(rate), PA_ERR_INVALID);
2863     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2864     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2865     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2866     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2867 
2868     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2869     o->private = PA_UINT_TO_PTR(rate);
2870 
2871     t = pa_tagstruct_command(
2872             s->context,
2873             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2874             &tag);
2875     pa_tagstruct_putu32(t, s->channel);
2876     pa_tagstruct_putu32(t, rate);
2877 
2878     pa_pstream_send_tagstruct(s->context->pstream, t);
2879     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2880 
2881     return o;
2882 }
2883 
pa_stream_proplist_update(pa_stream * s,pa_update_mode_t mode,pa_proplist * p,pa_stream_success_cb_t cb,void * userdata)2884 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2885     pa_operation *o;
2886     pa_tagstruct *t;
2887     uint32_t tag;
2888 
2889     pa_assert(s);
2890     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2891 
2892     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2893     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2894     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2895     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2896     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2897 
2898     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2899 
2900     t = pa_tagstruct_command(
2901             s->context,
2902             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2903             &tag);
2904     pa_tagstruct_putu32(t, s->channel);
2905     pa_tagstruct_putu32(t, (uint32_t) mode);
2906     pa_tagstruct_put_proplist(t, p);
2907 
2908     pa_pstream_send_tagstruct(s->context->pstream, t);
2909     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2910 
2911     /* Please note that we don't update s->proplist here, because we
2912      * don't export that field */
2913 
2914     return o;
2915 }
2916 
pa_stream_proplist_remove(pa_stream * s,const char * const keys[],pa_stream_success_cb_t cb,void * userdata)2917 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2918     pa_operation *o;
2919     pa_tagstruct *t;
2920     uint32_t tag;
2921     const char * const*k;
2922 
2923     pa_assert(s);
2924     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2925 
2926     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2927     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2928     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2929     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2930     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2931 
2932     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2933 
2934     t = pa_tagstruct_command(
2935             s->context,
2936             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2937             &tag);
2938     pa_tagstruct_putu32(t, s->channel);
2939 
2940     for (k = keys; *k; k++)
2941         pa_tagstruct_puts(t, *k);
2942 
2943     pa_tagstruct_puts(t, NULL);
2944 
2945     pa_pstream_send_tagstruct(s->context->pstream, t);
2946     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2947 
2948     /* Please note that we don't update s->proplist here, because we
2949      * don't export that field */
2950 
2951     return o;
2952 }
2953 
/* Record a sink input index in the stream's direct_on_input field.
 *
 * This only stores the index in the stream object; nothing is sent
 * from here. It is only permitted while the stream is still
 * PA_STREAM_UNCONNECTED.
 *
 * The validity checks reject the call with PA_ERR_FORKED after a fork,
 * PA_ERR_INVALID when sink_input_idx is PA_INVALID_INDEX,
 * PA_ERR_BADSTATE when the stream is not PA_STREAM_UNCONNECTED, and
 * PA_ERR_NOTSUPPORTED when the context version is below 13.
 *
 * Returns 0 once the index has been stored. */
int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);

    /* All checks passed: remember the index */
    s->direct_on_input = sink_input_idx;
    return 0;
}
2967 
pa_stream_get_monitor_stream(const pa_stream * s)2968 uint32_t pa_stream_get_monitor_stream(const pa_stream *s) {
2969     pa_assert(s);
2970     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2971 
2972     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2973     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2974     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2975 
2976     return s->direct_on_input;
2977 }
2978