• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6 
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11 
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16 
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20 
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24 
25 #include <string.h>
26 #include <stdio.h>
27 
28 #include <pulse/def.h>
29 #include <pulse/timeval.h>
30 #include <pulse/rtclock.h>
31 #include <pulse/xmalloc.h>
32 #include <pulse/fork-detect.h>
33 
34 #include <pulsecore/pstream-util.h>
35 #include <pulsecore/sample-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/core-rtclock.h>
40 #include <pulsecore/core-util.h>
41 
42 #include "internal.h"
43 #include "stream.h"
44 
45 /* #define STREAM_DEBUG */
46 
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49 
50 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
51 #ifndef USE_SMOOTHER_2
52 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
53 #define SMOOTHER_MIN_HISTORY (4)
54 #endif
55 
pa_stream_new(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map)56 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
57     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
58 }
59 
/* Clears every user-registered callback of the stream together with the
 * userdata pointer that belongs to it. */
static void reset_callbacks(pa_stream *s) {
    /* Callback function pointers */
    s->read_callback = NULL;
    s->write_callback = NULL;
    s->state_callback = NULL;
    s->overflow_callback = NULL;
    s->underflow_callback = NULL;
    s->latency_update_callback = NULL;
    s->moved_callback = NULL;
    s->suspended_callback = NULL;
    s->started_callback = NULL;
    s->event_callback = NULL;
    s->buffer_attr_callback = NULL;
    s->underflow_ohos_callback = NULL;

    /* Matching userdata pointers */
    s->read_userdata = NULL;
    s->write_userdata = NULL;
    s->state_userdata = NULL;
    s->overflow_userdata = NULL;
    s->underflow_userdata = NULL;
    s->latency_update_userdata = NULL;
    s->moved_userdata = NULL;
    s->suspended_userdata = NULL;
    s->started_userdata = NULL;
    s->event_userdata = NULL;
    s->buffer_attr_userdata = NULL;
    s->underflow_ohos_userdata = NULL;
}
86 
pa_stream_new_with_proplist_internal(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map,pa_format_info * const * formats,unsigned int n_formats,pa_proplist * p)87 static pa_stream *pa_stream_new_with_proplist_internal(
88         pa_context *c,
89         const char *name,
90         const pa_sample_spec *ss,
91         const pa_channel_map *map,
92         pa_format_info * const *formats,
93         unsigned int n_formats,
94         pa_proplist *p) {
95 
96     pa_stream *s;
97     unsigned int i;
98 
99     pa_assert(c);
100     pa_assert(PA_REFCNT_VALUE(c) >= 1);
101     pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
102     pa_assert(n_formats < PA_MAX_FORMATS);
103 
104     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
105     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
106 
107     s = pa_xnew(pa_stream, 1);
108     PA_REFCNT_INIT(s);
109     s->context = c;
110     s->mainloop = c->mainloop;
111 
112     s->direction = PA_STREAM_NODIRECTION;
113     s->state = PA_STREAM_UNCONNECTED;
114     s->flags = 0;
115 
116     if (ss)
117         s->sample_spec = *ss;
118     else
119         pa_sample_spec_init(&s->sample_spec);
120 
121     if (map)
122         s->channel_map = *map;
123     else
124         pa_channel_map_init(&s->channel_map);
125 
126     s->n_formats = 0;
127     if (formats) {
128         s->n_formats = n_formats;
129         for (i = 0; i < n_formats; i++)
130             s->req_formats[i] = pa_format_info_copy(formats[i]);
131     }
132 
133     /* We'll get the final negotiated format after connecting */
134     s->format = NULL;
135 
136     s->direct_on_input = PA_INVALID_INDEX;
137 
138     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
139     if (name)
140         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
141 
142     s->channel = 0;
143     s->channel_valid = false;
144     s->syncid = c->csyncid++;
145     s->stream_index = PA_INVALID_INDEX;
146 
147     s->requested_bytes = 0;
148     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
149 
150     /* We initialize the target length here, so that if the user
151      * passes no explicit buffering metrics the default is similar to
152      * what older PA versions provided. */
153 
154     s->buffer_attr.maxlength = (uint32_t) -1;
155     if (ss)
156         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
157     else {
158         /* FIXME: We assume a worst-case compressed format corresponding to
159          * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
160         pa_sample_spec tmp_ss = {
161             .format   = PA_SAMPLE_S16NE,
162             .rate     = 48000,
163             .channels = 2,
164         };
165         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
166     }
167     s->buffer_attr.minreq = (uint32_t) -1;
168     s->buffer_attr.prebuf = (uint32_t) -1;
169     s->buffer_attr.fragsize = (uint32_t) -1;
170 
171     s->device_index = PA_INVALID_INDEX;
172     s->device_name = NULL;
173     s->suspended = false;
174     s->corked = false;
175 
176     s->write_memblock = NULL;
177     s->write_data = NULL;
178 
179     pa_memchunk_reset(&s->peek_memchunk);
180     s->peek_data = NULL;
181     s->record_memblockq = NULL;
182 
183     memset(&s->timing_info, 0, sizeof(s->timing_info));
184     s->timing_info_valid = false;
185 
186     s->previous_time = 0;
187     s->latest_underrun_at_index = -1;
188 
189     s->read_index_not_before = 0;
190     s->write_index_not_before = 0;
191     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
192         s->write_index_corrections[i].valid = 0;
193     s->current_write_index_correction = 0;
194 
195     s->auto_timing_update_event = NULL;
196     s->auto_timing_update_requested = false;
197     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
198 
199     reset_callbacks(s);
200 
201     s->smoother = NULL;
202 
203     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
204     PA_LLIST_PREPEND(pa_stream, c->streams, s);
205     pa_stream_ref(s);
206 
207     return s;
208 }
209 
pa_stream_new_with_proplist(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map,pa_proplist * p)210 pa_stream *pa_stream_new_with_proplist(
211         pa_context *c,
212         const char *name,
213         const pa_sample_spec *ss,
214         const pa_channel_map *map,
215         pa_proplist *p) {
216 
217     pa_channel_map tmap;
218 
219     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
220     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
221     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
222     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
223     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
224 
225     if (!map)
226         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
227 
228     return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
229 }
230 
pa_stream_new_extended(pa_context * c,const char * name,pa_format_info * const * formats,unsigned int n_formats,pa_proplist * p)231 pa_stream *pa_stream_new_extended(
232         pa_context *c,
233         const char *name,
234         pa_format_info * const *formats,
235         unsigned int n_formats,
236         pa_proplist *p) {
237 
238     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
239 
240     return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
241 }
242 
/* Detaches the stream from its context. Does nothing if the stream is
 * already detached (s->context == NULL).
 *
 * Cancels all context operations that reference the stream, unregisters
 * pending replies, removes the stream from the per-direction channel
 * hashmap and from the context's stream list (dropping the reference
 * that was taken when the stream was linked), frees the automatic
 * timing-update timer and clears all callbacks. */
static void stream_unlink(pa_stream *s) {
    pa_operation *o, *n;
    pa_assert(s);

    if (!s->context)
        return;

    /* Detach from context */

    /* Unref all operation objects that point to us. The successor is
     * fetched before cancelling the current operation. */
    for (o = s->context->operations; o; o = n) {
        n = o->next;

        if (o->stream == s)
            pa_operation_cancel(o);
    }

    /* Drop all outstanding replies for this stream */
    if (s->context->pdispatch)
        pa_pdispatch_unregister_reply(s->context->pdispatch, s);

    if (s->channel_valid) {
        pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
        s->channel = 0;
        s->channel_valid = false;
    }

    PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
    pa_stream_unref(s);

    s->context = NULL;

    if (s->auto_timing_update_event) {
        pa_assert(s->mainloop);
        s->mainloop->time_free(s->auto_timing_update_event);
        /* Do not leave a dangling pointer behind: other code tests this
         * field for non-NULL before using the timer event. */
        s->auto_timing_update_event = NULL;
    }

    reset_callbacks(s);
}
282 
/* Destroys a stream: unlinks it from its context (if still linked) and
 * releases every resource the stream structure owns before freeing the
 * structure itself. */
static void stream_free(pa_stream *s) {
    unsigned int k;

    pa_assert(s);

    stream_unlink(s);

    /* Pending write buffer: release the mapping first if data is still
     * acquired, then drop the block reference */
    if (s->write_memblock) {
        if (s->write_data)
            pa_memblock_release(s->write_memblock);

        pa_memblock_unref(s->write_memblock);
    }

    /* Same for the chunk currently exposed through peek */
    if (s->peek_memchunk.memblock) {
        if (s->peek_data)
            pa_memblock_release(s->peek_memchunk.memblock);

        pa_memblock_unref(s->peek_memchunk.memblock);
    }

    if (s->record_memblockq)
        pa_memblockq_free(s->record_memblockq);

    if (s->proplist)
        pa_proplist_free(s->proplist);

    if (s->smoother) {
#ifdef USE_SMOOTHER_2
        pa_smoother_2_free(s->smoother);
#else
        pa_smoother_free(s->smoother);
#endif
    }

    /* Requested formats were copied at creation time */
    for (k = 0; k < s->n_formats; k++)
        pa_format_info_free(s->req_formats[k]);

    if (s->format)
        pa_format_info_free(s->format);

    pa_xfree(s->device_name);
    pa_xfree(s);
}
324 
/* Drops one reference to the stream and frees it once the reference
 * count reaches zero. */
void pa_stream_unref(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (PA_REFCNT_DEC(s) > 0)
        return;

    /* That was the last reference */
    stream_free(s);
}
332 
/* Takes an additional reference to the stream and returns the same
 * pointer for call chaining. */
pa_stream* pa_stream_ref(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_REFCNT_INC(s);

    return s;
}
340 
pa_stream_get_state(const pa_stream * s)341 pa_stream_state_t pa_stream_get_state(const pa_stream *s) {
342     pa_assert(s);
343     pa_assert(PA_REFCNT_VALUE(s) >= 1);
344 
345     return s->state;
346 }
347 
/* Moves the stream into the PA_STREAM_TERMINATED state via
 * pa_stream_set_state(), which also performs argument checking. */
void pa_stream_terminate(pa_stream *s) {
    const pa_stream_state_t target = PA_STREAM_TERMINATED;

    pa_stream_set_state(s, target);
}
351 
pa_stream_get_context(const pa_stream * s)352 pa_context* pa_stream_get_context(const pa_stream *s) {
353     pa_assert(s);
354     pa_assert(PA_REFCNT_VALUE(s) >= 1);
355 
356     return s->context;
357 }
358 
pa_stream_get_index(const pa_stream * s)359 uint32_t pa_stream_get_index(const pa_stream *s) {
360     pa_assert(s);
361     pa_assert(PA_REFCNT_VALUE(s) >= 1);
362 
363     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
364     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
365 
366     return s->stream_index;
367 }
368 
/* Switches the stream to state 'st' and notifies the state callback.
 *
 * A no-op if the stream is already in that state. Entering
 * PA_STREAM_FAILED or PA_STREAM_TERMINATED additionally unlinks the
 * stream from its context. A temporary reference is held for the
 * duration of the call so the stream stays alive across the callback
 * and the unlink. */
void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (s->state == st)
        return;

    pa_stream_ref(s);

    s->state = st;

    if (s->state_callback)
        s->state_callback(s, s->state_userdata);

    /* Final states detach the stream from the context */
    switch (st) {
        case PA_STREAM_FAILED:
        case PA_STREAM_TERMINATED:
            stream_unlink(s);
            break;

        default:
            break;
    }

    pa_stream_unref(s);
}
388 
/* Issues an automatic timing-info update and re-arms (or tears down)
 * the periodic timer that drives those updates.
 *
 * Only acts on streams created with PA_STREAM_AUTO_TIMING_UPDATE. With
 * 'force' set, a request is sent even if one is already outstanding and
 * the timer interval is reset to its starting value; the interval then
 * doubles on each call up to AUTO_TIMING_INTERVAL_END_USEC. When the
 * stream is suspended and the call is not forced, the timer is freed. */
static void request_auto_timing_update(pa_stream *s, bool force) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
        return;

    if (s->state == PA_STREAM_READY &&
        (force || !s->auto_timing_update_requested)) {
        pa_operation *op;

#ifdef STREAM_DEBUG
        pa_log_debug("Automatically requesting new timing data");
#endif

        op = pa_stream_update_timing_info(s, NULL, NULL);
        if (op) {
            pa_operation_unref(op);
            s->auto_timing_update_requested = true;
        }
    }

    /* Nothing to re-arm without a timer */
    if (!s->auto_timing_update_event)
        return;

    if (s->suspended && !force) {
        /* Suspended streams do not need periodic updates */
        pa_assert(s->mainloop);
        s->mainloop->time_free(s->auto_timing_update_event);
        s->auto_timing_update_event = NULL;
        return;
    }

    if (force)
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;

    pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);

    /* Back off exponentially, capped at the end interval */
    s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
}
425 
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_KILLED and
 * PA_COMMAND_RECORD_STREAM_KILLED.
 *
 * Reads the channel number from the tagstruct; a malformed message fails
 * the whole context with PA_ERR_PROTOCOL. If a READY stream is
 * registered on that channel, the context error is set to PA_ERR_KILLED
 * and the stream is moved to PA_STREAM_FAILED. */
void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_hashmap *streams;
    pa_stream *s;
    uint32_t channel;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    /* Keep the context alive while we work on it */
    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0 || !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    streams = (command == PA_COMMAND_PLAYBACK_STREAM_KILLED) ? c->playback_streams : c->record_streams;

    s = pa_hashmap_get(streams, PA_UINT32_TO_PTR(channel));
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    pa_context_set_error(c, PA_ERR_KILLED);
    pa_stream_set_state(s, PA_STREAM_FAILED);

finish:
    pa_context_unref(c);
}
457 
/* Pauses or resumes the stream's time smoother according to the current
 * stream state.
 *
 * The reference time is "now", shifted by the transport latency when
 * timing info is valid: backwards if 'aposteriori' is set, forwards
 * otherwise. 'force_start' and 'force_stop' are mutually exclusive.
 * Does nothing when the stream has no smoother. */
static void check_smoother_status(pa_stream *s, bool aposteriori, bool force_start, bool force_stop) {
    pa_usec_t when;

    pa_assert(s);
    pa_assert(!force_start || !force_stop);

    if (!s->smoother)
        return;

    when = pa_rtclock_now();

    if (s->timing_info_valid) {
        if (aposteriori)
            when -= s->timing_info.transport_usec;
        else
            when += s->timing_info.transport_usec;
    }

    if (s->suspended || s->corked || force_stop) {
#ifdef USE_SMOOTHER_2
        pa_smoother_2_pause(s->smoother, when);
#else
        pa_smoother_pause(s->smoother, when);
#endif
        return;
    }

    /* Please note that we have no idea if playback actually started
     * if prebuf is non-zero! */
    if (!force_start && s->buffer_attr.prebuf != 0)
        return;

    /* If the server supports STARTED events we take them as
     * indications when audio really starts/stops playing, if
     * we don't have any timing info yet -- instead of trying
     * to be smart and guessing the server time. Otherwise the
     * unknown transport delay adds too much noise to our time
     * calculations. */
    if (!s->timing_info_valid &&
        !aposteriori &&
        !force_start &&
        !force_stop &&
        s->context->version >= 13)
        return;

#ifdef USE_SMOOTHER_2
    pa_smoother_2_resume(s->smoother, when);
#else
    pa_smoother_resume(s->smoother, when, true);
#endif
}
510 
511 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
512 
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_MOVED and
 * PA_COMMAND_RECORD_STREAM_MOVED.
 *
 * Parses the new device index/name and suspended flag and, for context
 * versions >= 13, the buffer metrics and configured latency (record and
 * playback messages carry different field sets). Any malformed message,
 * or a context version below 12, fails the context with
 * PA_ERR_PROTOCOL. For a READY stream on the given channel the stored
 * device and buffer information is updated, the auto timing timer is
 * created if needed, and the moved callback is invoked. */
void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_hashmap *streams;
    pa_stream *s;
    uint32_t channel;
    uint32_t di;
    const char *dn;
    bool suspended;
    bool parsed;
    pa_usec_t usec = 0;
    uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 12) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    /* Fields common to all protocol versions */
    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_getu32(t, &di) < 0 ||
        pa_tagstruct_gets(t, &dn) < 0 ||
        pa_tagstruct_get_boolean(t, &suspended) < 0) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (c->version >= 13) {

        /* Buffer metrics: record streams carry maxlength/fragsize,
         * playback streams maxlength/tlength/prebuf/minreq; both end
         * with the configured latency */
        if (command == PA_COMMAND_RECORD_STREAM_MOVED)
            parsed =
                pa_tagstruct_getu32(t, &maxlength) >= 0 &&
                pa_tagstruct_getu32(t, &fragsize) >= 0 &&
                pa_tagstruct_get_usec(t, &usec) >= 0;
        else
            parsed =
                pa_tagstruct_getu32(t, &maxlength) >= 0 &&
                pa_tagstruct_getu32(t, &tlength) >= 0 &&
                pa_tagstruct_getu32(t, &prebuf) >= 0 &&
                pa_tagstruct_getu32(t, &minreq) >= 0 &&
                pa_tagstruct_get_usec(t, &usec) >= 0;

        if (!parsed) {
            pa_context_fail(c, PA_ERR_PROTOCOL);
            goto finish;
        }
    }

    /* No trailing data allowed, and the new device must be identified */
    if (!pa_tagstruct_eof(t) || !dn || di == PA_INVALID_INDEX) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    streams = (command == PA_COMMAND_PLAYBACK_STREAM_MOVED) ? c->playback_streams : c->record_streams;

    s = pa_hashmap_get(streams, PA_UINT32_TO_PTR(channel));
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (c->version >= 13) {
        if (s->direction == PA_STREAM_RECORD)
            s->timing_info.configured_source_usec = usec;
        else
            s->timing_info.configured_sink_usec = usec;

        s->buffer_attr.maxlength = maxlength;
        s->buffer_attr.fragsize = fragsize;
        s->buffer_attr.tlength = tlength;
        s->buffer_attr.prebuf = prebuf;
        s->buffer_attr.minreq = minreq;
    }

    /* Remember the new device */
    pa_xfree(s->device_name);
    s->device_name = pa_xstrdup(dn);
    s->device_index = di;

    s->suspended = suspended;

    /* (Re)create the auto timing timer if the stream wants one, is not
     * suspended and currently has none */
    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
    request_auto_timing_update(s, true);

    if (s->moved_callback)
        s->moved_callback(s, s->moved_userdata);

finish:
    pa_context_unref(c);
}
615 
/* Dispatcher callback for PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED and
 * PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED.
 *
 * Parses the channel followed by the new buffer metrics and configured
 * latency. Record messages are read as maxlength/fragsize/usec, playback
 * messages as maxlength/tlength/prebuf/minreq/usec. A malformed message,
 * or a context version below 15, fails the context with
 * PA_ERR_PROTOCOL. For a READY stream on the given channel the stored
 * buffer attributes and configured latency are replaced, a timing
 * update is requested and the buffer_attr callback is invoked. */
void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    pa_usec_t usec = 0;
    uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 15) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (pa_tagstruct_getu32(t, &channel) < 0) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    /* The record variant must be selected by the BUFFER_ATTR_CHANGED
     * command this handler is asserted to receive. Testing against
     * PA_COMMAND_RECORD_STREAM_MOVED could never match here, which made
     * record messages be parsed with the playback layout. */
    if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
        if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
            pa_tagstruct_getu32(t, &fragsize) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(c, PA_ERR_PROTOCOL);
            goto finish;
        }
    } else {
        if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
            pa_tagstruct_getu32(t, &tlength) < 0 ||
            pa_tagstruct_getu32(t, &prebuf) < 0 ||
            pa_tagstruct_getu32(t, &minreq) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(c, PA_ERR_PROTOCOL);
            goto finish;
        }
    }

    /* Trailing data means the message does not match what we expect */
    if (!pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
        goto finish;

    if (s->state != PA_STREAM_READY)
        goto finish;

    if (s->direction == PA_STREAM_RECORD)
        s->timing_info.configured_source_usec = usec;
    else
        s->timing_info.configured_sink_usec = usec;

    /* Fields not present in the parsed layout keep their 0 default */
    s->buffer_attr.maxlength = maxlength;
    s->buffer_attr.fragsize = fragsize;
    s->buffer_attr.tlength = tlength;
    s->buffer_attr.prebuf = prebuf;
    s->buffer_attr.minreq = minreq;

    request_auto_timing_update(s, true);

    if (s->buffer_attr_callback)
        s->buffer_attr_callback(s, s->buffer_attr_userdata);

finish:
    pa_context_unref(c);
}
689 
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_SUSPENDED and
 * PA_COMMAND_RECORD_STREAM_SUSPENDED.
 *
 * Parses the channel and the new suspended flag; a malformed message, or
 * a context version below 12, fails the context with PA_ERR_PROTOCOL.
 * For a READY stream on the given channel the suspended flag is stored,
 * the auto timing timer is created if needed, the smoother state is
 * re-evaluated and the suspended callback is invoked. */
void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_hashmap *streams;
    pa_stream *s;
    uint32_t channel;
    bool suspended;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 12) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_get_boolean(t, &suspended) < 0 ||
        !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    streams = (command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED) ? c->playback_streams : c->record_streams;

    s = pa_hashmap_get(streams, PA_UINT32_TO_PTR(channel));
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    s->suspended = suspended;

    /* (Re)create the auto timing timer if the stream wants one, is not
     * suspended and currently has none */
    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
    request_auto_timing_update(s, true);

    if (s->suspended_callback)
        s->suspended_callback(s, s->suspended_userdata);

finish:
    pa_context_unref(c);
}
739 
/* Dispatcher callback for PA_COMMAND_STARTED.
 *
 * Parses the channel; a malformed message, or a context version below
 * 13, fails the context with PA_ERR_PROTOCOL. For a READY playback
 * stream on that channel the smoother is force-started, a timing update
 * is requested and the started callback is invoked. */
void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_STARTED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 13) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (pa_tagstruct_getu32(t, &channel) < 0 || !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    /* Looked up among playback streams only */
    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    check_smoother_status(s, true, true, false);
    request_auto_timing_update(s, true);

    if (s->started_callback)
        s->started_callback(s, s->started_userdata);

finish:
    pa_context_unref(c);
}
779 
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_EVENT and
 * PA_COMMAND_RECORD_STREAM_EVENT.
 *
 * Parses the channel, the event name and an accompanying property list;
 * a malformed message, a missing event name, or a context version below
 * 15 fails the context with PA_ERR_PROTOCOL. For a READY stream on the
 * given channel the event callback is invoked. For
 * PA_STREAM_EVENT_FORMAT_LOST the current stream time is added to the
 * property list as "stream-time" when it can be determined. The
 * temporary property list is freed before returning. */
void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_hashmap *streams;
    pa_stream *s;
    uint32_t channel;
    pa_proplist *props = NULL;
    const char *event;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 15) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    props = pa_proplist_new();

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_gets(t, &event) < 0 ||
        pa_tagstruct_get_proplist(t, props) < 0 ||
        !pa_tagstruct_eof(t) ||
        !event) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    streams = (command == PA_COMMAND_PLAYBACK_STREAM_EVENT) ? c->playback_streams : c->record_streams;

    s = pa_hashmap_get(streams, PA_UINT32_TO_PTR(channel));
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
        /* Let client know what the running time was when the stream had to be killed  */
        pa_usec_t running_time;

        if (pa_stream_get_time(s, &running_time) == 0)
            pa_proplist_setf(props, "stream-time", "%llu", (unsigned long long) running_time);
    }

    if (s->event_callback)
        s->event_callback(s, event, props, s->event_userdata);

finish:
    pa_context_unref(c);

    if (props)
        pa_proplist_free(props);
}
832 
/* Dispatcher callback for PA_COMMAND_REQUEST.
 *
 * Parses the channel and a byte count; a malformed message fails the
 * context with PA_ERR_PROTOCOL. For a READY playback stream on that
 * channel the byte count is added to the stream's requested_bytes, and
 * the write callback is invoked with the accumulated amount when that
 * amount is positive. */
void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    uint32_t bytes;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_REQUEST);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_getu32(t, &bytes) < 0 ||
        !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    s->requested_bytes += bytes;

#ifdef STREAM_DEBUG
    pa_log_debug("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes);
#endif

    /* Only bother the client when there is something to write */
    if (s->write_callback && s->requested_bytes > 0)
        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);

finish:
    pa_context_unref(c);
}
871 
pa_stream_get_underflow_index(const pa_stream * p)872 int64_t pa_stream_get_underflow_index(const pa_stream *p) {
873     pa_assert(p);
874     return p->latest_underrun_at_index;
875 }
876 
/* Dispatcher callback for PA_COMMAND_OVERFLOW, PA_COMMAND_UNDERFLOW and
 * PA_COMMAND_UNDERFLOW_OHOS.
 *
 * Parses the channel and, for PA_COMMAND_UNDERFLOW on context versions
 * >= 23, the offset at which the underrun happened. A malformed message
 * fails the context with PA_ERR_PROTOCOL. For a READY playback stream
 * on that channel:
 *  - PA_COMMAND_UNDERFLOW_OHOS only invokes the OHOS underflow callback;
 *  - otherwise the underrun index is recorded (if one was received),
 *    the smoother is force-stopped when prebuf is non-zero, a timing
 *    update is requested and the overflow/underflow callback invoked. */
void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    int64_t offset = -1;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW
        || command == PA_COMMAND_UNDERFLOW_OHOS);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    /* The underrun offset is only part of the message here */
    if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW &&
        pa_tagstruct_gets64(t, &offset) < 0) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (!pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (command == PA_COMMAND_UNDERFLOW_OHOS) {
        /* Notification only: no smoother or timing handling */
        if (s->underflow_ohos_callback)
            s->underflow_ohos_callback(s, s->underflow_ohos_userdata);

        goto finish;
    }

    if (offset != -1)
        s->latest_underrun_at_index = offset;

    if (s->buffer_attr.prebuf > 0)
        check_smoother_status(s, true, false, true);

    request_auto_timing_update(s, true);

    switch (command) {
        case PA_COMMAND_OVERFLOW:
            if (s->overflow_callback)
                s->overflow_callback(s, s->overflow_userdata);
            break;

        case PA_COMMAND_UNDERFLOW:
            if (s->underflow_callback)
                s->underflow_callback(s, s->underflow_userdata);
            break;

        default:
            break;
    }

finish:
    pa_context_unref(c);
}
941 
/* Mark the locally cached read (r) and/or write (w) index of a stream as
 * unreliable and ask for a fresh timing update.
 *
 * For each selected index the context's current ctag is stored in the
 * matching *_index_not_before field and, if timing info is available, the
 * matching *_index_corrupt flag is raised. Does nothing unless the stream
 * is PA_STREAM_READY. */
static void invalidate_indexes(pa_stream *s, bool r, bool w) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

#ifdef STREAM_DEBUG
    pa_log_debug("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag);
#endif

    if (s->state == PA_STREAM_READY) {

        if (w) {
            s->write_index_not_before = s->context->ctag;

            if (s->timing_info_valid)
                s->timing_info.write_index_corrupt = true;

#ifdef STREAM_DEBUG
            pa_log_debug("write_index invalidated");
#endif
        }

        if (r) {
            s->read_index_not_before = s->context->ctag;

            if (s->timing_info_valid)
                s->timing_info.read_index_corrupt = true;

#ifdef STREAM_DEBUG
            pa_log_debug("read_index invalidated");
#endif
        }

        request_auto_timing_update(s, true);
    }
}
977 
/* Time event handler installed by create_stream_complete(): requests a
 * non-forced automatic timing update for the stream passed as userdata.
 * The stream is referenced across the call. */
static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
    pa_stream *s;

    s = userdata;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    pa_stream_ref(s);

    request_auto_timing_update(s, false);

    pa_stream_unref(s);
}
988 
/* Final step of stream creation: move the stream from PA_STREAM_CREATING to
 * PA_STREAM_READY, hand any already requested bytes to the write callback,
 * arm the automatic timing update timer if PA_STREAM_AUTO_TIMING_UPDATE was
 * requested, and finally re-evaluate the smoother status. */
static void create_stream_complete(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(s->state == PA_STREAM_CREATING);

    pa_stream_set_state(s, PA_STREAM_READY);

    if (s->write_callback && s->requested_bytes > 0)
        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);

    if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
        pa_usec_t first_deadline;

        /* Start with the short interval */
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;

        pa_assert(!s->auto_timing_update_event);

        first_deadline = pa_rtclock_now() + s->auto_timing_interval_usec;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, first_deadline, &auto_timing_update_callback, s);

        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
}
1009 
/* Adjust the buffer attributes in *attr before they are sent to the server.
 *
 * Two independent things happen here:
 *
 *  1. If $PULSE_LATENCY_MSEC is set to a positive integer, tlength and
 *     fragsize are overridden with that many milliseconds worth of bytes,
 *     the remaining fields are reset to (uint32_t) -1, and
 *     PA_STREAM_ADJUST_LATENCY is ORed into *flags (if flags is non-NULL).
 *     The byte count is computed from the stream's sample spec or, when
 *     that is not valid and exactly one format was requested, from the
 *     sample spec derived from that format.
 *
 *  2. For servers with protocol version < 13, any field still set to
 *     (uint32_t) -1 is replaced by a client side default.
 *
 * s     - stream the attributes belong to
 * attr  - attributes to patch in place
 * flags - stream flags to update, may be NULL */
static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
    const char *e;

    pa_assert(s);
    pa_assert(attr);

    if ((e = getenv("PULSE_LATENCY_MSEC"))) {
        uint32_t ms;
        pa_sample_spec ss;

        pa_sample_spec_init(&ss);

        if (pa_sample_spec_valid(&s->sample_spec))
            ss = s->sample_spec;
        else if (s->n_formats == 1) {
            /* Fall back to the single requested format. If the conversion
             * fails make sure no half-filled spec is left behind. */
            if (pa_format_info_to_sample_spec(s->req_formats[0], &ss, NULL) < 0)
                pa_sample_spec_init(&ss);
        }

        if (pa_atou(e, &ms) < 0 || ms <= 0)
            pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
        else if (!pa_sample_spec_valid(&ss))
            /* Check the spec that is actually used for the conversion
             * below. Checking s->sample_spec here would make the format
             * based fallback above unreachable. */
            pa_log_debug("Ignoring $PULSE_LATENCY_MSEC: %s (invalid sample spec)", e);
        else {
            attr->maxlength = (uint32_t) -1;
            attr->tlength = (uint32_t) pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &ss);
            attr->minreq = (uint32_t) -1;
            attr->prebuf = (uint32_t) -1;
            attr->fragsize = attr->tlength;

            if (flags)
                *flags |= PA_STREAM_ADJUST_LATENCY;
        }
    }

    if (s->context->version >= 13)
        return;

    /* Version older than 0.9.10 didn't do server side buffer_attr
     * selection, hence we have to fake it on the client side. */

    /* We choose fairly conservative values here, to not confuse
     * old clients with extremely large playback buffers */

    if (attr->maxlength == (uint32_t) -1)
        attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */

    if (attr->tlength == (uint32_t) -1)
        attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */

    if (attr->minreq == (uint32_t) -1)
        attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */

    if (attr->prebuf == (uint32_t) -1)
        attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */

    if (attr->fragsize == (uint32_t) -1)
        attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
}
1067 
/* Reply handler for the create stream request.
 *
 * userdata is the pa_stream being created, which must be in state
 * PA_STREAM_CREATING. An error reply moves the stream to PA_STREAM_FAILED
 * (unless pa_context_handle_error() itself reports failure). A regular
 * reply is parsed field by field, where the set of fields depends on the
 * protocol version and the stream direction. Any malformed or inconsistent
 * reply fails the whole context with PA_ERR_PROTOCOL.
 *
 * On success the stream is registered in the context's channel map and
 * create_stream_complete() is run. */
void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_stream *s = userdata;
    uint32_t initial_request = 0;
    pa_hashmap *registry;

    pa_assert(pd);
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(s->state == PA_STREAM_CREATING);

    pa_stream_ref(s);

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(s->context, command, t, false) >= 0)
            pa_stream_set_state(s, PA_STREAM_FAILED);

        goto finish;
    }

    /* Channel: always present */
    if (pa_tagstruct_getu32(t, &s->channel) < 0 || s->channel == PA_INVALID_INDEX)
        goto protocol_error;

    /* Stream index: everything except upload streams */
    if (s->direction != PA_STREAM_UPLOAD &&
        (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX))
        goto protocol_error;

    /* Initial request: everything except record streams */
    if (s->direction != PA_STREAM_RECORD && pa_tagstruct_getu32(t, &initial_request) < 0)
        goto protocol_error;

    s->requested_bytes = (int64_t) initial_request;

    if (s->context->version >= 9) {
        if (s->direction == PA_STREAM_PLAYBACK) {
            if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0)
                goto protocol_error;

        } else if (s->direction == PA_STREAM_RECORD) {
            if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0)
                goto protocol_error;
        }
    }

    if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
        pa_sample_spec server_spec;
        pa_channel_map server_map;
        const char *device = NULL;
        bool suspended;

        if (pa_tagstruct_get_sample_spec(t, &server_spec) < 0 ||
            pa_tagstruct_get_channel_map(t, &server_map) < 0 ||
            pa_tagstruct_getu32(t, &s->device_index) < 0 ||
            pa_tagstruct_gets(t, &device) < 0 ||
            pa_tagstruct_get_boolean(t, &suspended) < 0)
            goto protocol_error;

        /* Basic sanity of what the server told us */
        if (!device || s->device_index == PA_INVALID_INDEX ||
            server_spec.channels != server_map.channels ||
            !pa_channel_map_valid(&server_map) ||
            !pa_sample_spec_valid(&server_spec))
            goto protocol_error;

        /* Without the extended (format based) API the server may only
         * deviate from what we asked for where a FIX_* flag allowed it */
        if (s->n_formats == 0) {
            if (!(s->flags & PA_STREAM_FIX_FORMAT) && server_spec.format != s->sample_spec.format)
                goto protocol_error;

            if (!(s->flags & PA_STREAM_FIX_RATE) && server_spec.rate != s->sample_spec.rate)
                goto protocol_error;

            if (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&server_map, &s->channel_map))
                goto protocol_error;
        }

        pa_xfree(s->device_name);
        s->device_name = pa_xstrdup(device);
        s->suspended = suspended;

        s->channel_map = server_map;
        s->sample_spec = server_spec;
    }

#ifdef USE_SMOOTHER_2
    if (s->flags & PA_STREAM_INTERPOLATE_TIMING)
        pa_smoother_2_set_sample_spec(s->smoother, pa_rtclock_now(), &s->sample_spec);
#endif

    if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
        pa_usec_t configured_latency;

        if (pa_tagstruct_get_usec(t, &configured_latency) < 0)
            goto protocol_error;

        if (s->direction == PA_STREAM_RECORD)
            s->timing_info.configured_source_usec = configured_latency;
        else
            s->timing_info.configured_sink_usec = configured_latency;
    }

    if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
        || s->context->version >= 22) {

        pa_format_info *negotiated = pa_format_info_new();

        if (pa_tagstruct_get_format_info(t, negotiated) >= 0 && pa_format_info_valid(negotiated))
            s->format = negotiated;
        else {
            pa_format_info_free(negotiated);

            /* We used the extended API, so we should have got back a proper format */
            if (s->n_formats > 0)
                goto protocol_error;
        }
    }

    if (!pa_tagstruct_eof(t))
        goto protocol_error;

    if (s->direction == PA_STREAM_RECORD) {
        pa_assert(!s->record_memblockq);

        s->record_memblockq = pa_memblockq_new(
                "client side record memblockq",
                0,
                s->buffer_attr.maxlength,
                0,
                &s->sample_spec,
                1,
                0,
                0,
                NULL);
    }

    s->channel_valid = true;

    if (s->direction == PA_STREAM_RECORD)
        registry = s->context->record_streams;
    else
        registry = s->context->playback_streams;

    pa_hashmap_put(registry, PA_UINT32_TO_PTR(s->channel), s);

    create_stream_complete(s);
    goto finish;

protocol_error:
    pa_context_fail(s->context, PA_ERR_PROTOCOL);

finish:
    pa_stream_unref(s);
}
1213 
/* Common implementation behind pa_stream_connect_playback() and
 * pa_stream_connect_record().
 *
 * Validates the arguments, stores direction, buffer attributes and flags in
 * the stream, creates the smoother if PA_STREAM_INTERPOLATE_TIMING is set,
 * then builds and sends the create stream request. The fields appended to
 * the request depend on the protocol version of the context and on the
 * direction. Their order must not be changed. The reply is handled by
 * pa_create_stream_callback(). The stream is left in PA_STREAM_CREATING.
 *
 * Returns 0 on success. On a failed validity check PA_CHECK_VALIDITY
 * returns from this function early. */
static int create_stream(
        pa_stream_direction_t direction,
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags,
        const pa_cvolume *volume,
        pa_stream *sync_stream) {

    const bool is_playback = direction == PA_STREAM_PLAYBACK;
    const bool is_record = direction == PA_STREAM_RECORD;
    const bool volume_set = !!volume;
    pa_tagstruct *req;
    uint32_t tag;
    pa_cvolume cv;
    uint32_t idx;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || is_record, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
                                              PA_STREAM_INTERPOLATE_TIMING|
                                              PA_STREAM_NOT_MONOTONIC|
                                              PA_STREAM_AUTO_TIMING_UPDATE|
                                              PA_STREAM_NO_REMAP_CHANNELS|
                                              PA_STREAM_NO_REMIX_CHANNELS|
                                              PA_STREAM_FIX_FORMAT|
                                              PA_STREAM_FIX_RATE|
                                              PA_STREAM_FIX_CHANNELS|
                                              PA_STREAM_DONT_MOVE|
                                              PA_STREAM_VARIABLE_RATE|
                                              PA_STREAM_PEAK_DETECT|
                                              PA_STREAM_START_MUTED|
                                              PA_STREAM_ADJUST_LATENCY|
                                              PA_STREAM_EARLY_REQUESTS|
                                              PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
                                              PA_STREAM_START_UNMUTED|
                                              PA_STREAM_FAIL_ON_SUSPEND|
                                              PA_STREAM_RELATIVE_VOLUME|
                                              PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);

    PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
    PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
    PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
    /* Some of the remaining flags are unsupported on older servers as
     * well. They are deliberately not rejected here: passing them does no
     * harm and it keeps client code simpler. */

    PA_CHECK_VALIDITY(s->context, is_record || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, !sync_stream || (is_playback && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);

    pa_stream_ref(s);

    s->direction = direction;

    if (sync_stream)
        s->syncid = sync_stream->syncid;

    if (attr)
        s->buffer_attr = *attr;

    /* May modify both the attributes and the flags */
    patch_buffer_attr(s, &s->buffer_attr, &flags);

    s->flags = flags;
    s->corked = !!(flags & PA_STREAM_START_CORKED);

    if (flags & PA_STREAM_INTERPOLATE_TIMING) {
        const pa_usec_t now = pa_rtclock_now();

        pa_assert(!s->smoother);
#ifdef USE_SMOOTHER_2
        s->smoother = pa_smoother_2_new(SMOOTHER_HISTORY_TIME, now, 0, 0);
#else
        s->smoother = pa_smoother_new(
                SMOOTHER_ADJUST_TIME,
                SMOOTHER_HISTORY_TIME,
                !(flags & PA_STREAM_NOT_MONOTONIC),
                true,
                SMOOTHER_MIN_HISTORY,
                now,
                true);
#endif
    }

    /* No explicit device: use the configured default for this direction */
    if (!dev) {
        if (is_playback)
            dev = s->context->conf->default_sink;
        else
            dev = s->context->conf->default_source;
    }

    req = pa_tagstruct_command(
            s->context,
            (uint32_t) (is_playback ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
            &tag);

    if (s->context->version < 13)
        pa_tagstruct_puts(req, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));

    pa_tagstruct_put(
            req,
            PA_TAG_SAMPLE_SPEC, &s->sample_spec,
            PA_TAG_CHANNEL_MAP, &s->channel_map,
            PA_TAG_U32, PA_INVALID_INDEX,
            PA_TAG_STRING, dev,
            PA_TAG_U32, s->buffer_attr.maxlength,
            PA_TAG_BOOLEAN, s->corked,
            PA_TAG_INVALID);

    if (!volume) {
        unsigned n_channels;

        /* With an invalid sample spec the channel count used here is not
         * really relevant, since no volume was set, and the real number of
         * channels is embedded in the format_info structure */
        if (pa_sample_spec_valid(&s->sample_spec))
            n_channels = s->sample_spec.channels;
        else
            n_channels = PA_CHANNELS_MAX;

        volume = pa_cvolume_reset(&cv, n_channels);
    }

    if (is_playback) {
        pa_tagstruct_put(
                req,
                PA_TAG_U32, s->buffer_attr.tlength,
                PA_TAG_U32, s->buffer_attr.prebuf,
                PA_TAG_U32, s->buffer_attr.minreq,
                PA_TAG_U32, s->syncid,
                PA_TAG_INVALID);

        pa_tagstruct_put_cvolume(req, volume);
    } else
        pa_tagstruct_putu32(req, s->buffer_attr.fragsize);

    if (s->context->version >= 12) {
        pa_tagstruct_put(
                req,
                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
                PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
                PA_TAG_INVALID);
    }

    if (s->context->version >= 13) {

        /* Playback sends the "start muted" bit here, record sends "peak
         * detect" in the same position */
        if (is_playback)
            pa_tagstruct_put_boolean(req, flags & PA_STREAM_START_MUTED);
        else
            pa_tagstruct_put_boolean(req, flags & PA_STREAM_PEAK_DETECT);

        pa_tagstruct_put(
                req,
                PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
                PA_TAG_PROPLIST, s->proplist,
                PA_TAG_INVALID);

        if (is_record)
            pa_tagstruct_putu32(req, s->direct_on_input);
    }

    if (s->context->version >= 14) {

        if (is_playback)
            pa_tagstruct_put_boolean(req, volume_set);

        pa_tagstruct_put_boolean(req, flags & PA_STREAM_EARLY_REQUESTS);
    }

    if (s->context->version >= 15) {

        if (is_playback)
            pa_tagstruct_put_boolean(req, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));

        pa_tagstruct_put_boolean(req, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
        pa_tagstruct_put_boolean(req, flags & PA_STREAM_FAIL_ON_SUSPEND);
    }

    if (is_playback && s->context->version >= 17)
        pa_tagstruct_put_boolean(req, flags & PA_STREAM_RELATIVE_VOLUME);

    if (is_playback && s->context->version >= 18)
        pa_tagstruct_put_boolean(req, flags & (PA_STREAM_PASSTHROUGH));

    /* Format list: playback since version 21, record since version 22 */
    if ((s->context->version >= 21 && is_playback)
        || s->context->version >= 22) {

        pa_tagstruct_putu8(req, s->n_formats);

        for (idx = 0; idx < s->n_formats; idx++)
            pa_tagstruct_put_format_info(req, s->req_formats[idx]);
    }

    if (is_record && s->context->version >= 22) {
        pa_tagstruct_put_cvolume(req, volume);
        pa_tagstruct_put_boolean(req, flags & PA_STREAM_START_MUTED);
        pa_tagstruct_put_boolean(req, volume_set);
        pa_tagstruct_put_boolean(req, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
        pa_tagstruct_put_boolean(req, flags & PA_STREAM_RELATIVE_VOLUME);
        pa_tagstruct_put_boolean(req, flags & (PA_STREAM_PASSTHROUGH));
    }

    pa_pstream_send_tagstruct(s->context->pstream, req);
    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);

    pa_stream_set_state(s, PA_STREAM_CREATING);

    pa_stream_unref(s);
    return 0;
}
1426 
/* Connect the stream for playback. All arguments are forwarded unchanged
 * to create_stream() with direction PA_STREAM_PLAYBACK, whose result is
 * returned. */
int pa_stream_connect_playback(
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags,
        const pa_cvolume *volume,
        pa_stream *sync_stream) {

    int ret;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    ret = create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);

    return ret;
}
1440 
/* Connect the stream for recording. Forwards to create_stream() with
 * direction PA_STREAM_RECORD, passing neither a volume nor a sync stream,
 * and returns its result. */
int pa_stream_connect_record(
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags) {

    int ret;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    ret = create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);

    return ret;
}
1452 
/* Hand out a memory block from the context's pool that the caller can fill
 * and later pass to pa_stream_write_ext_free().
 *
 * On entry *nbytes is the wanted size, or (size_t) -1 for no preference.
 * Any other value is capped to the largest multiple of the frame size that
 * fits into one pool block. If a block is already pending from an earlier
 * call it is handed out again instead of allocating a new one. On return
 * *data points to the buffer and *nbytes holds the length of the block.
 *
 * Returns 0 on success. The PA_CHECK_VALIDITY checks return early when the
 * process forked, the stream is not ready, the stream is neither a playback
 * nor an upload stream, or an argument is invalid. */
int pa_stream_begin_write(
        pa_stream *s,
        void **data,
        size_t *nbytes) {

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);

    if (*nbytes != (size_t) -1) {
        size_t pool_max, frame, limit;

        pool_max = pa_mempool_block_size_max(s->context->mempool);
        frame = pa_frame_size(&s->sample_spec);

        /* Round the pool's block size down to whole frames */
        limit = (pool_max / frame) * frame;

        *nbytes = PA_MIN(*nbytes, limit);
    }

    /* Allocate only if no block is pending yet */
    if (!s->write_memblock) {
        s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
        s->write_data = pa_memblock_acquire(s->write_memblock);
    }

    *nbytes = pa_memblock_get_length(s->write_memblock);
    *data = s->write_data;

    return 0;
}
1488 
/* Give back the block handed out by pa_stream_begin_write() without
 * writing it: the block is released and unreferenced, and the stream's
 * write_memblock / write_data fields are cleared.
 *
 * Returns 0 on success. The PA_CHECK_VALIDITY checks return early when the
 * process forked, the stream is not ready, the stream is neither a playback
 * nor an upload stream, or no block is pending. */
int pa_stream_cancel_write(
        pa_stream *s) {

    pa_memblock *pending;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);

    pa_assert(s->write_data);

    pending = s->write_memblock;

    pa_memblock_release(pending);
    pa_memblock_unref(pending);

    s->write_data = NULL;
    s->write_memblock = NULL;

    return 0;
}
1509 
/* Write audio data to a playback or upload stream.
 *
 * s            - the stream, must be PA_STREAM_READY
 * data         - the bytes to write. If pa_stream_begin_write() was called
 *                before, this must point into the block it handed out
 * length       - number of bytes, must be a multiple of the frame size
 * free_cb      - optional callback used to release data once it is no
 *                longer needed. Must be NULL when a block obtained from
 *                pa_stream_begin_write() is written
 * free_cb_data - argument passed to free_cb
 * offset       - seek offset in bytes, must be a multiple of the frame size
 *                (may be negative). Upload streams only accept 0
 * seek         - seek mode. Upload streams only accept PA_SEEK_RELATIVE
 *
 * Data that did not come from pa_stream_begin_write() is either wrapped in
 * a user memblock (free_cb given and the pstream does not use SHM) or
 * copied into frame aligned pool blocks. In the latter case free_cb is
 * invoked right after everything was copied.
 *
 * After sending, the locally tracked requested_bytes counter and, for
 * playback streams, the cached write index are updated.
 *
 * Returns 0 on success. On a failed validity check PA_CHECK_VALIDITY
 * returns from this function early. */
int pa_stream_write_ext_free(
        pa_stream *s,
        const void *data,
        size_t length,
        pa_free_cb_t free_cb,
        void *free_cb_data,
        int64_t offset,
        pa_seek_mode_t seek) {

    size_t frame_size;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(data);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context,
                      !s->write_memblock ||
                      ((data >= s->write_data) &&
                       ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
                      PA_ERR_INVALID);

    /* Only query the frame size after the state checks above passed */
    frame_size = pa_frame_size(&s->sample_spec);

    /* The modulo has to be done in signed arithmetic. Mixing int64_t with
     * a 64 bit size_t would convert a negative offset to a huge unsigned
     * value first, and frame aligned negative offsets would then be
     * rejected whenever the frame size is not a power of two. */
    PA_CHECK_VALIDITY(s->context, offset % (int64_t) frame_size == 0, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, length % frame_size == 0, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);

    if (s->write_memblock) {
        pa_memchunk chunk;

        /* pa_stream_begin_write() was called before */

        pa_memblock_release(s->write_memblock);

        chunk.memblock = s->write_memblock;
        chunk.index = (const char *) data - (const char *) s->write_data;
        chunk.length = length;

        s->write_memblock = NULL;
        s->write_data = NULL;

        pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk, frame_size);
        pa_memblock_unref(chunk.memblock);

    } else {
        pa_seek_mode_t t_seek = seek;
        int64_t t_offset = offset;
        size_t t_length = length;
        const void *t_data = data;

        /* pa_stream_begin_write() was not called before */

        while (t_length > 0) {
            pa_memchunk chunk;

            chunk.index = 0;

            if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
                chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, free_cb_data, 1);
                chunk.length = t_length;
            } else {
                void *d;
                size_t blk_size_max;

                /* Break large audio streams into _aligned_ blocks or the
                 * other endpoint will happily discard them upon arrival. */
                blk_size_max = pa_frame_align(pa_mempool_block_size_max(s->context->mempool), &s->sample_spec);
                chunk.length = PA_MIN(t_length, blk_size_max);
                chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);

                d = pa_memblock_acquire(chunk.memblock);
                memcpy(d, t_data, chunk.length);
                pa_memblock_release(chunk.memblock);
            }

            pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk, frame_size);

            /* Only the first chunk carries the caller's seek request, the
             * following ones are appended */
            t_offset = 0;
            t_seek = PA_SEEK_RELATIVE;

            t_data = (const uint8_t*) t_data + chunk.length;
            t_length -= chunk.length;

            pa_memblock_unref(chunk.memblock);
        }

        /* The data was copied above, so the caller's buffer can be
         * released right away */
        if (free_cb && pa_pstream_get_shm(s->context->pstream))
            free_cb(free_cb_data);
    }

    /* This is obviously wrong since we ignore the seeking index . But
     * that's OK, the server side applies the same error */
    s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;

#ifdef STREAM_DEBUG
    pa_log_debug("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes);
#endif

    if (s->direction == PA_STREAM_PLAYBACK) {

        /* Update latency request correction */
        if (s->write_index_corrections[s->current_write_index_correction].valid) {

            if (seek == PA_SEEK_ABSOLUTE) {
                s->write_index_corrections[s->current_write_index_correction].corrupt = false;
                s->write_index_corrections[s->current_write_index_correction].absolute = true;
                s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
            } else if (seek == PA_SEEK_RELATIVE) {
                if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
                    s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
            } else
                s->write_index_corrections[s->current_write_index_correction].corrupt = true;
        }

        /* Update the write index in the already available latency data */
        if (s->timing_info_valid) {

            if (seek == PA_SEEK_ABSOLUTE) {
                s->timing_info.write_index_corrupt = false;
                s->timing_info.write_index = offset + (int64_t) length;
            } else if (seek == PA_SEEK_RELATIVE) {
                if (!s->timing_info.write_index_corrupt)
                    s->timing_info.write_index += offset + (int64_t) length;
            } else
                s->timing_info.write_index_corrupt = true;
        }

        if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
            request_auto_timing_update(s, true);
    }

    return 0;
}
1643 
/* Convenience wrapper around pa_stream_write_ext_free() that passes the
 * data pointer itself as the argument for free_cb. Returns whatever
 * pa_stream_write_ext_free() returns. */
int pa_stream_write(
        pa_stream *s,
        const void *data,
        size_t length,
        pa_free_cb_t free_cb,
        int64_t offset,
        pa_seek_mode_t seek) {

    void *free_cb_data = (void*) data;

    return pa_stream_write_ext_free(s, data, length, free_cb, free_cb_data, offset, seek);
}
1654 
/* Look at the next chunk of a record stream without removing it from the
 * record queue.
 *
 * Results, always with return value 0:
 *   - queue empty:                      *data = NULL, *length = 0
 *   - no data at the current read index: *data = NULL, *length = length
 *                                        reported by the queue for that gap
 *   - data available:                   *data points at the chunk's bytes,
 *                                        *length is the chunk length
 *
 * A chunk that is already cached in s->peek_memchunk is handed out again
 * without querying the queue. The PA_CHECK_VALIDITY checks return early
 * when the process forked, the stream is not ready, or the stream is not a
 * record stream. */
int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(data);
    pa_assert(length);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);

    if (!s->peek_memchunk.memblock) {
        const bool queue_empty = pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0;

        if (queue_empty || !s->peek_memchunk.memblock) {
            /* Either record_memblockq is empty, or it isn't empty but
             * doesn't have any data at the current read index. */
            *data = NULL;
            *length = queue_empty ? 0 : s->peek_memchunk.length;
            return 0;
        }

        s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
    }

    pa_assert(s->peek_data);

    *length = s->peek_memchunk.length;
    *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;

    return 0;
}
1689 
/* Discard the chunk last returned by pa_stream_peek() from the record queue.
 *
 * Fails with PA_ERR_BADSTATE unless the stream is a READY record stream and
 * a peeked chunk of non-zero length is pending. Returns 0 on success. */
int pa_stream_drop(pa_stream *s) {
    size_t dropped;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE);

    dropped = s->peek_memchunk.length;

    pa_memblockq_drop(s->record_memblockq, dropped);

    /* Keep the locally simulated read index in step with the queue */
    if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
        s->timing_info.read_index += (int64_t) dropped;

    /* A gap in the queue has no memblock; only real data needs releasing */
    if (s->peek_memchunk.memblock) {
        pa_assert(s->peek_data);
        s->peek_data = NULL;
        pa_memblock_release(s->peek_memchunk.memblock);
        pa_memblock_unref(s->peek_memchunk.memblock);
    }

    pa_memchunk_reset(&s->peek_memchunk);

    return 0;
}
1716 
pa_stream_writable_size(const pa_stream * s)1717 size_t pa_stream_writable_size(const pa_stream *s) {
1718     pa_assert(s);
1719     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1720 
1721     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1722     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1723     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1724 
1725     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1726 }
1727 
pa_stream_readable_size(const pa_stream * s)1728 size_t pa_stream_readable_size(const pa_stream *s) {
1729     pa_assert(s);
1730     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1731 
1732     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1733     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1734     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1735 
1736     return pa_memblockq_get_length(s->record_memblockq);
1737 }
1738 
pa_stream_drain(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)1739 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1740     pa_operation *o;
1741     pa_tagstruct *t;
1742     uint32_t tag;
1743 
1744     pa_assert(s);
1745     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1746 
1747     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1748     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1749     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1750 
1751     /* Ask for a timing update before we cork/uncork to get the best
1752      * accuracy for the transport latency suitable for the
1753      * check_smoother_status() call in the started callback */
1754     request_auto_timing_update(s, true);
1755 
1756     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1757 
1758     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1759     pa_tagstruct_putu32(t, s->channel);
1760     pa_pstream_send_tagstruct(s->context->pstream, t);
1761     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1762 
1763     /* This might cause the read index to continue again, hence
1764      * let's request a timing update */
1765     request_auto_timing_update(s, true);
1766 
1767     return o;
1768 }
1769 
calc_time(const pa_stream * s,bool ignore_transport)1770 static pa_usec_t calc_time(const pa_stream *s, bool ignore_transport) {
1771     pa_usec_t usec;
1772 
1773     pa_assert(s);
1774     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1775     pa_assert(s->state == PA_STREAM_READY);
1776     pa_assert(s->direction != PA_STREAM_UPLOAD);
1777     pa_assert(s->timing_info_valid);
1778     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1779     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1780 
1781     if (s->direction == PA_STREAM_PLAYBACK) {
1782         /* The last byte that was written into the output device
1783          * had this time value associated */
1784         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1785 
1786         if (!s->corked && !s->suspended) {
1787 
1788             if (!ignore_transport)
1789                 /* Because the latency info took a little time to come
1790                  * to us, we assume that the real output time is actually
1791                  * a little ahead */
1792                 usec += s->timing_info.transport_usec;
1793 
1794             /* However, the output device usually maintains a buffer
1795                too, hence the real sample currently played is a little
1796                back  */
1797             if (s->timing_info.sink_usec >= usec)
1798                 usec = 0;
1799             else
1800                 usec -= s->timing_info.sink_usec;
1801         }
1802 
1803     } else {
1804         pa_assert(s->direction == PA_STREAM_RECORD);
1805 
1806         /* The last byte written into the server side queue had
1807          * this time value associated */
1808         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1809 
1810         if (!s->corked && !s->suspended) {
1811 
1812             if (!ignore_transport)
1813                 /* Add transport latency */
1814                 usec += s->timing_info.transport_usec;
1815 
1816             /* Add latency of data in device buffer */
1817             usec += s->timing_info.source_usec;
1818 
1819             /* If this is a monitor source, we need to correct the
1820              * time by the playback device buffer */
1821             if (s->timing_info.sink_usec >= usec)
1822                 usec = 0;
1823             else
1824                 usec -= s->timing_info.sink_usec;
1825         }
1826     }
1827 
1828     return usec;
1829 }
1830 
#ifdef USE_SMOOTHER_2
/* Same as calc_time(), but expressed in bytes: the time is first converted
 * to whole frames at the stream's sample rate (integer division, so partial
 * frames are truncated) and then multiplied by the frame size. */
static inline uint64_t calc_bytes(pa_stream *s, bool ignore_transport) {
    pa_usec_t usec = calc_time(s, ignore_transport);
    uint64_t frames = usec * s->sample_spec.rate / PA_USEC_PER_SEC;

    return (uint64_t) (frames * pa_frame_size(&s->sample_spec));
}
#endif
1836 
/* Reply handler for the latency query sent by pa_stream_update_timing_info().
 *
 * userdata is the pa_operation that was registered together with the
 * request. The handler
 *   1. marks the stream's timing info invalid and both indexes corrupt,
 *   2. on PA_COMMAND_REPLY parses the reply into o->stream->timing_info,
 *      estimates the transport latency, applies pending write index
 *      corrections (playback) or subtracts the local record queue length
 *      from the read index (record), and feeds the smoother,
 *   3. clears auto_timing_update_requested and invokes the stream's
 *      latency_update_callback and the operation's own callback.
 *
 * A malformed reply fails the whole context with PA_ERR_PROTOCOL and skips
 * step 3. An error reply for which pa_context_handle_error() returns >= 0
 * still runs step 3, with timing_info_valid left false.
 *
 * In all cases the operation is marked done and one reference on it is
 * dropped before returning. */
static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    struct timeval local, remote, now;
    pa_timing_info *i;
    bool playing = false;
    uint64_t underrun_for = 0, playing_for = 0;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    /* The operation is no longer attached to a context/stream: nothing to update */
    if (!o->context || !o->stream)
        goto finish;

    i = &o->stream->timing_info;

    /* Start out pessimistic; the flags are only reset once the reply has
     * been parsed completely */
    o->stream->timing_info_valid = false;
    i->write_index_corrupt = true;
    i->read_index_corrupt = true;

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

    } else {

        /* NOTE(review): 'local' is presumably the timestamp we put into the
         * request and 'remote' the server's timestamp -- confirm against
         * the server side */
        if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
            pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
            pa_tagstruct_get_boolean(t, &playing) < 0 ||
            pa_tagstruct_get_timeval(t, &local) < 0 ||
            pa_tagstruct_get_timeval(t, &remote) < 0 ||
            pa_tagstruct_gets64(t, &i->write_index) < 0 ||
            pa_tagstruct_gets64(t, &i->read_index) < 0) {

            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        /* Two extra fields are only read for playback streams when the
         * context version is >= 13 */
        if (o->context->version >= 13 &&
            o->stream->direction == PA_STREAM_PLAYBACK)
            if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
                pa_tagstruct_getu64(t, &playing_for) < 0) {

                pa_context_fail(o->context, PA_ERR_PROTOCOL);
                goto finish;
            }

        /* Trailing data in the reply is a protocol violation */
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }
        o->stream->timing_info_valid = true;
        i->write_index_corrupt = false;
        i->read_index_corrupt = false;

        i->playing = (int) playing;
        /* Only one of the two counters is kept, depending on whether the
         * stream is currently playing */
        i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);

        pa_gettimeofday(&now);

        /* Calculate timestamps */
        if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
            /* local and remote seem to have synchronized clocks */

            if (o->stream->direction == PA_STREAM_PLAYBACK)
                i->transport_usec = pa_timeval_diff(&remote, &local);
            else
                i->transport_usec = pa_timeval_diff(&now, &remote);

            i->synchronized_clocks = true;
            i->timestamp = remote;
        } else {
            /* clocks are not synchronized, let's estimate latency then:
             * half of the measured round trip time */
            i->transport_usec = pa_timeval_diff(&now, &local)/2;
            i->synchronized_clocks = false;
            i->timestamp = local;
            pa_timeval_add(&i->timestamp, i->transport_usec);
        }

        /* Invalidate read and write indexes if necessary: this reply
         * belongs to a request whose tag is older than the stream's
         * *_index_not_before marker */
        if (tag < o->stream->read_index_not_before)
            i->read_index_corrupt = true;

        if (tag < o->stream->write_index_not_before)
            i->write_index_corrupt = true;

        if (o->stream->direction == PA_STREAM_PLAYBACK) {
            /* Write index correction */

            int n, j;
            uint32_t ctag = tag;

            /* Go through the saved correction values and add up the
             * total correction. The ring is walked starting at the slot
             * after the current one. */
            for (n = 0, j = o->stream->current_write_index_correction+1;
                 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
                 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {

                /* Step over invalid data or out-of-date data */
                if (!o->stream->write_index_corrections[j].valid ||
                    o->stream->write_index_corrections[j].tag < ctag)
                    continue;

                /* Make sure that everything is in order: later entries
                 * must carry a strictly larger tag to be considered */
                ctag = o->stream->write_index_corrections[j].tag+1;

                /* Now fix the write index */
                if (o->stream->write_index_corrections[j].corrupt) {
                    /* A corrupting seek was made */
                    i->write_index_corrupt = true;
                } else if (o->stream->write_index_corrections[j].absolute) {
                    /* An absolute seek was made */
                    i->write_index = o->stream->write_index_corrections[j].value;
                    i->write_index_corrupt = false;
                } else if (!i->write_index_corrupt) {
                    /* A relative seek was made */
                    i->write_index += o->stream->write_index_corrections[j].value;
                }
            }

            /* Clear old correction entries, i.e. those recorded for this
             * request or an earlier one */
            for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
                if (!o->stream->write_index_corrections[n].valid)
                    continue;

                if (o->stream->write_index_corrections[n].tag <= tag)
                    o->stream->write_index_corrections[n].valid = false;
            }
        }

        if (o->stream->direction == PA_STREAM_RECORD) {
            /* Read index correction: subtract what is still sitting in
             * the local record queue */

            if (!i->read_index_corrupt)
                i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
        }

        /* Update smoother if we're not corked */
        if (o->stream->smoother && !o->stream->corked) {
            pa_usec_t u, x;

            /* u is the time handed to the smoother's put call, x the time
             * used for pause/resume; both start as "now minus transport
             * latency" */
            u = x = pa_rtclock_now() - i->transport_usec;

            if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
                pa_usec_t su;

                /* If we weren't playing then it will take some time
                 * until the audio will actually come out through the
                 * speakers. Since we follow that timing here, we need
                 * to try to fix this up */

                su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);

                if (su < i->sink_usec)
                    x += i->sink_usec - su;
            }

            /* Note: each of the following three if statements guards exactly
             * one call; which call is selected at compile time */
            if (!i->playing)
#ifdef USE_SMOOTHER_2
                pa_smoother_2_pause(o->stream->smoother, x);
#else
                pa_smoother_pause(o->stream->smoother, x);
#endif

            /* Update the smoother, but only with an index that is not
             * corrupt (calc_time() asserts on this) */
            if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
                (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
#ifdef USE_SMOOTHER_2
                pa_smoother_2_put(o->stream->smoother, u, calc_bytes(o->stream, true));
#else
                pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, true));
#endif

            if (i->playing)
#ifdef USE_SMOOTHER_2
                pa_smoother_2_resume(o->stream->smoother, x);
#else
                pa_smoother_resume(o->stream->smoother, x, true);
#endif
        }
    }

    /* The outstanding automatic timing update (if any) has been answered */
    o->stream->auto_timing_update_requested = false;

    if (o->stream->latency_update_callback)
        o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);

    /* NOTE(review): o->stream is re-checked here, presumably because the
     * latency update callback above may detach the operation -- confirm */
    if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
        cb(o->stream, o->stream->timing_info_valid, o->userdata);
    }

finish:

    pa_operation_done(o);
    pa_operation_unref(o);
}
2034 
pa_stream_update_timing_info(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2035 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2036     uint32_t tag;
2037     pa_operation *o;
2038     pa_tagstruct *t;
2039     struct timeval now;
2040     int cidx = 0;
2041 
2042     pa_assert(s);
2043     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2044 
2045     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2046     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2047     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2048 
2049     if (s->direction == PA_STREAM_PLAYBACK) {
2050         /* Find a place to store the write_index correction data for this entry */
2051         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
2052 
2053         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
2054         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
2055     }
2056     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2057 
2058     t = pa_tagstruct_command(
2059             s->context,
2060             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
2061             &tag);
2062     pa_tagstruct_putu32(t, s->channel);
2063     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
2064 
2065     pa_pstream_send_tagstruct(s->context->pstream, t);
2066     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2067 
2068     if (s->direction == PA_STREAM_PLAYBACK) {
2069         /* Fill in initial correction data */
2070 
2071         s->current_write_index_correction = cidx;
2072 
2073         s->write_index_corrections[cidx].valid = true;
2074         s->write_index_corrections[cidx].absolute = false;
2075         s->write_index_corrections[cidx].corrupt = false;
2076         s->write_index_corrections[cidx].tag = tag;
2077         s->write_index_corrections[cidx].value = 0;
2078     }
2079 
2080     return o;
2081 }
2082 
/* Reply handler for the delete-stream request sent by pa_stream_disconnect().
 *
 *   - error reply handled by pa_context_handle_error() (>= 0): the stream
 *     goes to PA_STREAM_FAILED
 *   - clean reply: the stream goes to PA_STREAM_TERMINATED
 *   - reply with trailing data: the context fails with PA_ERR_PROTOCOL
 *
 * A reference on the stream is held for the duration of the handler. */
void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_stream *stream = userdata;

    pa_assert(pd);
    pa_assert(stream);
    pa_assert(PA_REFCNT_VALUE(stream) >= 1);

    pa_stream_ref(stream);

    if (command != PA_COMMAND_REPLY) {
        /* When pa_context_handle_error() returns < 0 the stream state is
         * left untouched here */
        if (pa_context_handle_error(stream->context, command, t, false) >= 0)
            pa_stream_set_state(stream, PA_STREAM_FAILED);
    } else if (pa_tagstruct_eof(t))
        pa_stream_set_state(stream, PA_STREAM_TERMINATED);
    else
        pa_context_fail(stream->context, PA_ERR_PROTOCOL);

    pa_stream_unref(stream);
}
2108 
/* Ask the server to delete this stream. The delete command that matches
 * the stream direction (playback, record, or upload for anything else) is
 * sent and pa_stream_disconnect_callback() is registered for the reply.
 *
 * Fails with PA_ERR_BADSTATE if the stream has no valid channel or the
 * context is not READY. Returns 0 once the request has been queued. */
int pa_stream_disconnect(pa_stream *s) {
    pa_tagstruct *request;
    uint32_t command;
    uint32_t tag;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);

    /* Hold a reference while the request is being sent */
    pa_stream_ref(s);

    if (s->direction == PA_STREAM_PLAYBACK)
        command = (uint32_t) PA_COMMAND_DELETE_PLAYBACK_STREAM;
    else if (s->direction == PA_STREAM_RECORD)
        command = (uint32_t) PA_COMMAND_DELETE_RECORD_STREAM;
    else
        command = (uint32_t) PA_COMMAND_DELETE_UPLOAD_STREAM;

    request = pa_tagstruct_command(s->context, command, &tag);
    pa_tagstruct_putu32(request, s->channel);
    pa_pstream_send_tagstruct(s->context->pstream, request);
    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);

    pa_stream_unref(s);
    return 0;
}
2134 
/* Store cb and userdata as the stream's read callback. The call is a no-op
 * if pa_detect_fork() reports a fork or the stream is already TERMINATED
 * or FAILED. */
void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->read_userdata = userdata;
    s->read_callback = cb;
}
2148 
/* Store cb and userdata as the stream's write callback. The call is a no-op
 * if pa_detect_fork() reports a fork or the stream is already TERMINATED
 * or FAILED. */
void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->write_userdata = userdata;
    s->write_callback = cb;
}
2162 
/* Store cb and userdata as the stream's state callback. The call is a no-op
 * if pa_detect_fork() reports a fork or the stream is already TERMINATED
 * or FAILED. */
void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->state_userdata = userdata;
    s->state_callback = cb;
}
2176 
/* Store cb and userdata as the stream's overflow callback. The call is a
 * no-op if pa_detect_fork() reports a fork or the stream is already
 * TERMINATED or FAILED. */
void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->overflow_userdata = userdata;
    s->overflow_callback = cb;
}
2190 
/* Store cb and userdata as the stream's underflow callback. The call is a
 * no-op if pa_detect_fork() reports a fork or the stream is already
 * TERMINATED or FAILED. */
void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->underflow_userdata = userdata;
    s->underflow_callback = cb;
}
2204 
/* Store cb and userdata in the stream's underflow_ohos callback slot, which
 * is kept separately from the regular underflow callback. The call is a
 * no-op if pa_detect_fork() reports a fork or the stream is already
 * TERMINATED or FAILED. */
void pa_stream_set_underflow_ohos_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->underflow_ohos_userdata = userdata;
    s->underflow_ohos_callback = cb;
}
2218 
/* Store cb and userdata as the stream's latency update callback. The call
 * is a no-op if pa_detect_fork() reports a fork or the stream is already
 * TERMINATED or FAILED. */
void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->latency_update_userdata = userdata;
    s->latency_update_callback = cb;
}
2232 
/* Store cb and userdata as the stream's moved callback. The call is a no-op
 * if pa_detect_fork() reports a fork or the stream is already TERMINATED
 * or FAILED. */
void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->moved_userdata = userdata;
    s->moved_callback = cb;
}
2246 
/* Store cb and userdata as the stream's suspended callback. The call is a
 * no-op if pa_detect_fork() reports a fork or the stream is already
 * TERMINATED or FAILED. */
void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->suspended_userdata = userdata;
    s->suspended_callback = cb;
}
2260 
/* Store cb and userdata as the stream's started callback. The call is a
 * no-op if pa_detect_fork() reports a fork or the stream is already
 * TERMINATED or FAILED. */
void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->started_userdata = userdata;
    s->started_callback = cb;
}
2274 
/* Store cb and userdata as the stream's event callback. The call is a no-op
 * if pa_detect_fork() reports a fork or the stream is already TERMINATED
 * or FAILED. */
void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->event_userdata = userdata;
    s->event_callback = cb;
}
2288 
/* Store cb and userdata as the stream's buffer attribute callback. The call
 * is a no-op if pa_detect_fork() reports a fork or the stream is already
 * TERMINATED or FAILED. */
void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->buffer_attr_userdata = userdata;
    s->buffer_attr_callback = cb;
}
2302 
/* Generic reply handler for stream commands whose reply carries no payload.
 *
 * userdata is the pa_operation registered with the request. The operation's
 * callback (if any) is invoked with success = 1 for a clean reply and
 * success = 0 for an error reply that pa_context_handle_error() handled
 * (return >= 0). It is not invoked when the operation has no context, when
 * pa_context_handle_error() returns < 0, or when the reply carries trailing
 * data (which fails the context with PA_ERR_PROTOCOL).
 *
 * The operation is always marked done and unreferenced once. */
void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *op = userdata;
    int ok;

    pa_assert(pd);
    pa_assert(op);
    pa_assert(PA_REFCNT_VALUE(op) >= 1);

    if (!op->context)
        goto out;

    if (command == PA_COMMAND_REPLY) {
        /* A simple ack must not contain anything */
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(op->context, PA_ERR_PROTOCOL);
            goto out;
        }

        ok = 1;
    } else {
        if (pa_context_handle_error(op->context, command, t, false) < 0)
            goto out;

        ok = 0;
    }

    if (op->callback) {
        pa_stream_success_cb_t notify = (pa_stream_success_cb_t) op->callback;
        notify(op->stream, ok, op->userdata);
    }

out:
    pa_operation_done(op);
    pa_operation_unref(op);
}
2333 
pa_stream_cork(pa_stream * s,int b,pa_stream_success_cb_t cb,void * userdata)2334 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2335     pa_operation *o;
2336     pa_tagstruct *t;
2337     uint32_t tag;
2338 
2339     pa_assert(s);
2340     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2341 
2342     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2343     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2344     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2345 
2346     /* Ask for a timing update before we cork/uncork to get the best
2347      * accuracy for the transport latency suitable for the
2348      * check_smoother_status() call in the started callback */
2349     request_auto_timing_update(s, true);
2350 
2351     s->corked = b;
2352 
2353     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2354 
2355     t = pa_tagstruct_command(
2356             s->context,
2357             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2358             &tag);
2359     pa_tagstruct_putu32(t, s->channel);
2360     pa_tagstruct_put_boolean(t, !!b);
2361     pa_pstream_send_tagstruct(s->context->pstream, t);
2362     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2363 
2364     check_smoother_status(s, false, false, false);
2365 
2366     /* This might cause the indexes to hang/start again, hence let's
2367      * request a timing update, after the cork/uncork, too */
2368     request_auto_timing_update(s, true);
2369 
2370     return o;
2371 }
2372 
stream_send_simple_command(pa_stream * s,uint32_t command,pa_stream_success_cb_t cb,void * userdata)2373 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2374     pa_tagstruct *t;
2375     pa_operation *o;
2376     uint32_t tag;
2377 
2378     pa_assert(s);
2379     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2380 
2381     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2382     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2383 
2384     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2385 
2386     t = pa_tagstruct_command(s->context, command, &tag);
2387     pa_tagstruct_putu32(t, s->channel);
2388     pa_pstream_send_tagstruct(s->context->pstream, t);
2389     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2390 
2391     return o;
2392 }
2393 
pa_stream_flush(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2394 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2395     pa_operation *o;
2396 
2397     pa_assert(s);
2398     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2399 
2400     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2401     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2402     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2403 
2404     /* Ask for a timing update *before* the flush, so that the
2405      * transport usec is as up to date as possible when we get the
2406      * underflow message and update the smoother status*/
2407     request_auto_timing_update(s, true);
2408 
2409     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2410         return NULL;
2411 
2412     if (s->direction == PA_STREAM_PLAYBACK) {
2413 
2414         if (s->write_index_corrections[s->current_write_index_correction].valid)
2415             s->write_index_corrections[s->current_write_index_correction].corrupt = true;
2416 
2417         if (s->buffer_attr.prebuf > 0)
2418             check_smoother_status(s, false, false, true);
2419 
2420         /* This will change the write index, but leave the
2421          * read index untouched. */
2422         invalidate_indexes(s, false, true);
2423 
2424     } else
2425         /* For record streams this has no influence on the write
2426          * index, but the read index might jump. */
2427         invalidate_indexes(s, true, false);
2428 
2429     /* Note that we do not update requested_bytes here. This is
2430      * because we cannot really know how data actually was dropped
2431      * from the write index due to this. This 'error' will be applied
2432      * by both client and server and hence we should be fine. */
2433 
2434     return o;
2435 }
2436 
pa_stream_prebuf(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2437 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2438     pa_operation *o;
2439 
2440     pa_assert(s);
2441     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2442 
2443     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2444     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2445     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2446     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2447 
2448     /* Ask for a timing update before we cork/uncork to get the best
2449      * accuracy for the transport latency suitable for the
2450      * check_smoother_status() call in the started callback */
2451     request_auto_timing_update(s, true);
2452 
2453     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2454         return NULL;
2455 
2456     /* This might cause the read index to hang again, hence
2457      * let's request a timing update */
2458     request_auto_timing_update(s, true);
2459 
2460     return o;
2461 }
2462 
pa_stream_trigger(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2463 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2464     pa_operation *o;
2465 
2466     pa_assert(s);
2467     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2468 
2469     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2470     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2471     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2472     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2473 
2474     /* Ask for a timing update before we cork/uncork to get the best
2475      * accuracy for the transport latency suitable for the
2476      * check_smoother_status() call in the started callback */
2477     request_auto_timing_update(s, true);
2478 
2479     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2480         return NULL;
2481 
2482     /* This might cause the read index to start moving again, hence
2483      * let's request a timing update */
2484     request_auto_timing_update(s, true);
2485 
2486     return o;
2487 }
2488 
pa_stream_set_name(pa_stream * s,const char * name,pa_stream_success_cb_t cb,void * userdata)2489 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2490     pa_operation *o;
2491 
2492     pa_assert(s);
2493     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2494     pa_assert(name);
2495 
2496     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2497     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2498     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2499 
2500     if (s->context->version >= 13) {
2501         pa_proplist *p = pa_proplist_new();
2502 
2503         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2504         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2505         pa_proplist_free(p);
2506     } else {
2507         pa_tagstruct *t;
2508         uint32_t tag;
2509 
2510         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2511         t = pa_tagstruct_command(
2512                 s->context,
2513                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2514                 &tag);
2515         pa_tagstruct_putu32(t, s->channel);
2516         pa_tagstruct_puts(t, name);
2517         pa_pstream_send_tagstruct(s->context->pstream, t);
2518         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2519     }
2520 
2521     return o;
2522 }
2523 
/* Retrieve the current stream time in microseconds.
 *
 * The value comes from the smoother when the stream has one, and from
 * calc_time() otherwise. Unless PA_STREAM_NOT_MONOTONIC is set, the
 * result is clamped so that it never falls below the previously
 * recorded time.
 *
 * s      - stream; must be ready, not an upload stream, and must have
 *          valid timing info with an uncorrupted index for its
 *          direction (read index for playback, write index for record)
 * r_usec - optional; receives the time when non-NULL
 *
 * Returns 0 on success. A failed validity check leaves early through
 * PA_CHECK_VALIDITY. */
int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
    pa_usec_t stream_time;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);

    if (!s->smoother)
        stream_time = calc_time(s, false);
    else
#ifdef USE_SMOOTHER_2
        stream_time = pa_smoother_2_get(s->smoother, pa_rtclock_now());
#else
        stream_time = pa_smoother_get(s->smoother, pa_rtclock_now());
#endif

    /* Enforce monotonicity: either remember the new maximum, or report
     * the previous one again. */
    if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
        if (stream_time >= s->previous_time)
            s->previous_time = stream_time;
        else
            stream_time = s->previous_time;
    }

    if (r_usec)
        *r_usec = stream_time;

    return 0;
}
2560 
/* Compute the difference a - b between two time counters.
 *
 * When a >= b the plain difference is returned and *negative (if
 * given) is 0. When a < b, the magnitude b - a is returned with
 * *negative set to 1, but only for record streams and only if the
 * caller supplied 'negative'. In every other case the result is
 * clamped to 0. */
static pa_usec_t time_counter_diff(const pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (negative)
        *negative = 0;

    if (a >= b)
        return a-b;

    /* a < b from here on. */
    if (!negative || s->direction != PA_STREAM_RECORD)
        return 0;

    *negative = 1;
    return b-a;
}
2578 
/* Compute the stream latency in microseconds.
 *
 * The latency is the distance between the stream time reported by
 * pa_stream_get_time() and the time equivalent of the relevant index
 * from the timing info: the write index for playback streams, the read
 * index otherwise. A negative index is treated as 0.
 *
 * s        - stream; must be ready, not an upload stream, with valid
 *            timing info and an uncorrupted index for its direction
 * r_usec   - receives the latency; must not be NULL
 * negative - optional; passed through to time_counter_diff()
 *
 * Returns 0 on success, or the negative value returned by
 * pa_stream_get_time(). */
int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
    pa_usec_t stream_time, index_time;
    int64_t idx;
    int ret;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(r_usec);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);

    ret = pa_stream_get_time(s, &stream_time);
    if (ret < 0)
        return ret;

    idx = s->direction == PA_STREAM_PLAYBACK ? s->timing_info.write_index : s->timing_info.read_index;

    /* Clamp before the cast to unsigned below. */
    if (idx < 0)
        idx = 0;

    index_time = pa_bytes_to_usec((uint64_t) idx, &s->sample_spec);

    /* Playback: index time minus stream time. Otherwise the operands
     * are swapped. */
    if (s->direction == PA_STREAM_PLAYBACK)
        *r_usec = time_counter_diff(s, index_time, stream_time, negative);
    else
        *r_usec = time_counter_diff(s, stream_time, index_time, negative);

    return 0;
}
2615 
pa_stream_get_timing_info(pa_stream * s)2616 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2617     pa_assert(s);
2618     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2619 
2620     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2621     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2622     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2623     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2624 
2625     return &s->timing_info;
2626 }
2627 
pa_stream_get_sample_spec(pa_stream * s)2628 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2629     pa_assert(s);
2630     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2631 
2632     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2633 
2634     return &s->sample_spec;
2635 }
2636 
pa_stream_get_channel_map(pa_stream * s)2637 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2638     pa_assert(s);
2639     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2640 
2641     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2642 
2643     return &s->channel_map;
2644 }
2645 
pa_stream_get_format_info(const pa_stream * s)2646 const pa_format_info* pa_stream_get_format_info(const pa_stream *s) {
2647     pa_assert(s);
2648     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2649 
2650     /* We don't have the format till routing is done */
2651     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2652     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2653 
2654     return s->format;
2655 }
pa_stream_get_buffer_attr(pa_stream * s)2656 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2657     pa_assert(s);
2658     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2659 
2660     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2661     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2662     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2663 
2664     return &s->buffer_attr;
2665 }
2666 
/* Reply handler for the set-buffer-attr commands.
 *
 * On PA_COMMAND_REPLY the buffer attributes sent back by the server
 * are read straight into the stream's buffer_attr: maxlength, tlength,
 * prebuf and minreq for playback streams; maxlength and fragsize for
 * record streams. With stream context version >= 13 a usec value
 * follows, which is stored as the configured source (record) or sink
 * (otherwise) latency. Any parse failure or trailing data fails the
 * context with PA_ERR_PROTOCOL.
 *
 * On any other command the error is passed to
 * pa_context_handle_error(); if that returns >= 0 the user callback is
 * invoked with success == 0.
 *
 * The operation passed as userdata is always marked done and
 * unreferenced before returning. */
static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    int success = 1;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    if (!o->context)
        goto finish;

    if (command == PA_COMMAND_REPLY) {
        pa_stream *st = o->stream;
        pa_buffer_attr *ba = &st->buffer_attr;

        switch (st->direction) {
            case PA_STREAM_PLAYBACK:
                if (pa_tagstruct_getu32(t, &ba->maxlength) < 0 ||
                    pa_tagstruct_getu32(t, &ba->tlength) < 0 ||
                    pa_tagstruct_getu32(t, &ba->prebuf) < 0 ||
                    pa_tagstruct_getu32(t, &ba->minreq) < 0) {
                    pa_context_fail(o->context, PA_ERR_PROTOCOL);
                    goto finish;
                }
                break;

            case PA_STREAM_RECORD:
                if (pa_tagstruct_getu32(t, &ba->maxlength) < 0 ||
                    pa_tagstruct_getu32(t, &ba->fragsize) < 0) {
                    pa_context_fail(o->context, PA_ERR_PROTOCOL);
                    goto finish;
                }
                break;

            default:
                /* No buffer attributes are read for other directions. */
                break;
        }

        if (st->context->version >= 13) {
            pa_usec_t configured_usec;

            if (pa_tagstruct_get_usec(t, &configured_usec) < 0) {
                pa_context_fail(o->context, PA_ERR_PROTOCOL);
                goto finish;
            }

            if (st->direction == PA_STREAM_RECORD)
                st->timing_info.configured_source_usec = configured_usec;
            else
                st->timing_info.configured_sink_usec = configured_usec;
        }

        /* Nothing may follow the fields parsed above. */
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }
    } else {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

        success = 0;
    }

    if (o->callback) {
        pa_stream_success_cb_t notify = (pa_stream_success_cb_t) o->callback;
        notify(o->stream, success, o->userdata);
    }

finish:
    pa_operation_done(o);
    pa_operation_unref(o);
}
2729 
pa_stream_set_buffer_attr(pa_stream * s,const pa_buffer_attr * attr,pa_stream_success_cb_t cb,void * userdata)2730 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2731     pa_operation *o;
2732     pa_tagstruct *t;
2733     uint32_t tag;
2734     pa_buffer_attr copy;
2735 
2736     pa_assert(s);
2737     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2738     pa_assert(attr);
2739 
2740     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2741     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2742     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2743     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2744 
2745     /* Ask for a timing update before we cork/uncork to get the best
2746      * accuracy for the transport latency suitable for the
2747      * check_smoother_status() call in the started callback */
2748     request_auto_timing_update(s, true);
2749 
2750     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2751 
2752     t = pa_tagstruct_command(
2753             s->context,
2754             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2755             &tag);
2756     pa_tagstruct_putu32(t, s->channel);
2757 
2758     copy = *attr;
2759     patch_buffer_attr(s, &copy, NULL);
2760     attr = &copy;
2761 
2762     pa_tagstruct_putu32(t, attr->maxlength);
2763 
2764     if (s->direction == PA_STREAM_PLAYBACK)
2765         pa_tagstruct_put(
2766                 t,
2767                 PA_TAG_U32, attr->tlength,
2768                 PA_TAG_U32, attr->prebuf,
2769                 PA_TAG_U32, attr->minreq,
2770                 PA_TAG_INVALID);
2771     else
2772         pa_tagstruct_putu32(t, attr->fragsize);
2773 
2774     if (s->context->version >= 13)
2775         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2776 
2777     if (s->context->version >= 14)
2778         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2779 
2780     pa_pstream_send_tagstruct(s->context->pstream, t);
2781     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2782 
2783     /* This might cause changes in the read/write index, hence let's
2784      * request a timing update */
2785     request_auto_timing_update(s, true);
2786 
2787     return o;
2788 }
2789 
pa_stream_get_device_index(const pa_stream * s)2790 uint32_t pa_stream_get_device_index(const pa_stream *s) {
2791     pa_assert(s);
2792     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2793 
2794     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2795     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2796     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2797     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2798     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2799 
2800     return s->device_index;
2801 }
2802 
pa_stream_get_device_name(const pa_stream * s)2803 const char *pa_stream_get_device_name(const pa_stream *s) {
2804     pa_assert(s);
2805     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2806 
2807     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2808     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2809     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2810     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2811     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2812 
2813     return s->device_name;
2814 }
2815 
pa_stream_is_suspended(const pa_stream * s)2816 int pa_stream_is_suspended(const pa_stream *s) {
2817     pa_assert(s);
2818     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2819 
2820     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2821     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2822     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2823     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2824 
2825     return s->suspended;
2826 }
2827 
pa_stream_is_corked(const pa_stream * s)2828 int pa_stream_is_corked(const pa_stream *s) {
2829     pa_assert(s);
2830     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2831 
2832     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2833     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2834     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2835 
2836     return s->corked;
2837 }
2838 
/* Reply handler for the update-sample-rate commands.
 *
 * The requested rate travels in o->private (stored there by
 * pa_stream_update_sample_rate()). It is applied to the stream's
 * cached sample spec, and to the smoother when USE_SMOOTHER_2 is
 * defined, only when the server answered with a well-formed
 * PA_COMMAND_REPLY. If the server answered with an error, the cached
 * rate is left untouched and the user callback is invoked with
 * success == 0.
 *
 * A reply with trailing data fails the context with PA_ERR_PROTOCOL.
 * The operation passed as userdata is always marked done and
 * unreferenced before returning. */
static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    int success = 1;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    if (!o->context)
        goto finish;

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

        /* The request was rejected: do not record a rate that was
         * never acknowledged. */
        success = 0;
    } else {

        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        /* The request was acknowledged: mirror the new rate locally. */
        o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
#ifdef USE_SMOOTHER_2
        if (o->stream->smoother)
            pa_smoother_2_set_rate(o->stream->smoother, pa_rtclock_now(), o->stream->sample_spec.rate);
#endif
        pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
    }

    if (o->callback) {
        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
        cb(o->stream, success, o->userdata);
    }

finish:
    pa_operation_done(o);
    pa_operation_unref(o);
}
2879 
pa_stream_update_sample_rate(pa_stream * s,uint32_t rate,pa_stream_success_cb_t cb,void * userdata)2880 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2881     pa_operation *o;
2882     pa_tagstruct *t;
2883     uint32_t tag;
2884 
2885     pa_assert(s);
2886     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2887 
2888     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2889     PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_sample_rate_valid(rate), PA_ERR_INVALID);
2890     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2891     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2892     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2893     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2894 
2895     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2896     o->private = PA_UINT_TO_PTR(rate);
2897 
2898     t = pa_tagstruct_command(
2899             s->context,
2900             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2901             &tag);
2902     pa_tagstruct_putu32(t, s->channel);
2903     pa_tagstruct_putu32(t, rate);
2904 
2905     pa_pstream_send_tagstruct(s->context->pstream, t);
2906     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2907 
2908     return o;
2909 }
2910 
pa_stream_proplist_update(pa_stream * s,pa_update_mode_t mode,pa_proplist * p,pa_stream_success_cb_t cb,void * userdata)2911 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2912     pa_operation *o;
2913     pa_tagstruct *t;
2914     uint32_t tag;
2915 
2916     pa_assert(s);
2917     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2918 
2919     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2920     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2921     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2922     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2923     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2924 
2925     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2926 
2927     t = pa_tagstruct_command(
2928             s->context,
2929             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2930             &tag);
2931     pa_tagstruct_putu32(t, s->channel);
2932     pa_tagstruct_putu32(t, (uint32_t) mode);
2933     pa_tagstruct_put_proplist(t, p);
2934 
2935     pa_pstream_send_tagstruct(s->context->pstream, t);
2936     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2937 
2938     /* Please note that we don't update s->proplist here, because we
2939      * don't export that field */
2940 
2941     return o;
2942 }
2943 
pa_stream_proplist_remove(pa_stream * s,const char * const keys[],pa_stream_success_cb_t cb,void * userdata)2944 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2945     pa_operation *o;
2946     pa_tagstruct *t;
2947     uint32_t tag;
2948     const char * const*k;
2949 
2950     pa_assert(s);
2951     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2952 
2953     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2954     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2955     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2956     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2957     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2958 
2959     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2960 
2961     t = pa_tagstruct_command(
2962             s->context,
2963             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2964             &tag);
2965     pa_tagstruct_putu32(t, s->channel);
2966 
2967     for (k = keys; *k; k++)
2968         pa_tagstruct_puts(t, *k);
2969 
2970     pa_tagstruct_puts(t, NULL);
2971 
2972     pa_pstream_send_tagstruct(s->context->pstream, t);
2973     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2974 
2975     /* Please note that we don't update s->proplist here, because we
2976      * don't export that field */
2977 
2978     return o;
2979 }
2980 
/* Record the sink input index that this stream should monitor.
 *
 * The index is stored in s->direct_on_input. This is only allowed
 * while the stream is still unconnected, on a context of version
 * >= 13, and with an index other than PA_INVALID_INDEX.
 *
 * Returns 0 on success; a failed check leaves early through
 * PA_CHECK_VALIDITY. */
int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);

    /* Argument check. */
    PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);

    /* Stream state and version requirement. */
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);

    s->direct_on_input = sink_input_idx;

    return 0;
}
2994 
pa_stream_get_monitor_stream(const pa_stream * s)2995 uint32_t pa_stream_get_monitor_stream(const pa_stream *s) {
2996     pa_assert(s);
2997     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2998 
2999     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
3000     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
3001     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
3002 
3003     return s->direct_on_input;
3004 }
3005