• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6 
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11 
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16 
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20 
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24 
25 #include <string.h>
26 #include <stdio.h>
27 #include <string.h>
28 
29 #include <pulse/def.h>
30 #include <pulse/timeval.h>
31 #include <pulse/rtclock.h>
32 #include <pulse/xmalloc.h>
33 #include <pulse/fork-detect.h>
34 
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/sample-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41 #include <pulsecore/core-util.h>
42 
43 #include "internal.h"
44 #include "stream.h"
45 
46 /* #define STREAM_DEBUG */
47 
48 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
49 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
50 
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #ifndef USE_SMOOTHER_2
53 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
54 #define SMOOTHER_MIN_HISTORY (4)
55 #endif
56 
pa_stream_new(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map)57 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
58     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
59 }
60 
reset_callbacks(pa_stream * s)61 static void reset_callbacks(pa_stream *s) {
62     s->read_callback = NULL;
63     s->read_userdata = NULL;
64     s->write_callback = NULL;
65     s->write_userdata = NULL;
66     s->state_callback = NULL;
67     s->state_userdata = NULL;
68     s->overflow_callback = NULL;
69     s->overflow_userdata = NULL;
70     s->underflow_callback = NULL;
71     s->underflow_userdata = NULL;
72     s->latency_update_callback = NULL;
73     s->latency_update_userdata = NULL;
74     s->moved_callback = NULL;
75     s->moved_userdata = NULL;
76     s->suspended_callback = NULL;
77     s->suspended_userdata = NULL;
78     s->started_callback = NULL;
79     s->started_userdata = NULL;
80     s->event_callback = NULL;
81     s->event_userdata = NULL;
82     s->buffer_attr_callback = NULL;
83     s->buffer_attr_userdata = NULL;
84     s->underflow_ohos_callback = NULL;
85     s->underflow_ohos_userdata = NULL;
86 }
87 
pa_stream_new_with_proplist_internal(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map,pa_format_info * const * formats,unsigned int n_formats,pa_proplist * p)88 static pa_stream *pa_stream_new_with_proplist_internal(
89         pa_context *c,
90         const char *name,
91         const pa_sample_spec *ss,
92         const pa_channel_map *map,
93         pa_format_info * const *formats,
94         unsigned int n_formats,
95         pa_proplist *p) {
96 
97     pa_stream *s;
98     unsigned int i;
99 
100     pa_assert(c);
101     pa_assert(PA_REFCNT_VALUE(c) >= 1);
102     pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
103     pa_assert(n_formats < PA_MAX_FORMATS);
104 
105     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
106     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
107 
108     s = pa_xnew(pa_stream, 1);
109     PA_REFCNT_INIT(s);
110     s->context = c;
111     s->mainloop = c->mainloop;
112 
113     s->direction = PA_STREAM_NODIRECTION;
114     s->state = PA_STREAM_UNCONNECTED;
115     s->flags = 0;
116 
117     if (ss)
118         s->sample_spec = *ss;
119     else
120         pa_sample_spec_init(&s->sample_spec);
121 
122     if (map)
123         s->channel_map = *map;
124     else
125         pa_channel_map_init(&s->channel_map);
126 
127     s->n_formats = 0;
128     if (formats) {
129         s->n_formats = n_formats;
130         for (i = 0; i < n_formats; i++)
131             s->req_formats[i] = pa_format_info_copy(formats[i]);
132     }
133 
134     /* We'll get the final negotiated format after connecting */
135     s->format = NULL;
136 
137     s->direct_on_input = PA_INVALID_INDEX;
138 
139     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
140     if (name)
141         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
142 
143     s->channel = 0;
144     s->channel_valid = false;
145     s->syncid = c->csyncid++;
146     s->stream_index = PA_INVALID_INDEX;
147 
148     s->requested_bytes = 0;
149     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
150 
151     /* We initialize the target length here, so that if the user
152      * passes no explicit buffering metrics the default is similar to
153      * what older PA versions provided. */
154 
155     s->buffer_attr.maxlength = (uint32_t) -1;
156     if (ss)
157         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
158     else {
159         /* FIXME: We assume a worst-case compressed format corresponding to
160          * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
161         pa_sample_spec tmp_ss = {
162             .format   = PA_SAMPLE_S16NE,
163             .rate     = 48000,
164             .channels = 2,
165         };
166         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
167     }
168     s->buffer_attr.minreq = (uint32_t) -1;
169     s->buffer_attr.prebuf = (uint32_t) -1;
170     s->buffer_attr.fragsize = (uint32_t) -1;
171 
172     s->device_index = PA_INVALID_INDEX;
173     s->device_name = NULL;
174     s->suspended = false;
175     s->corked = false;
176 
177     s->write_memblock = NULL;
178     s->write_data = NULL;
179 
180     pa_memchunk_reset(&s->peek_memchunk);
181     s->peek_data = NULL;
182     s->record_memblockq = NULL;
183 
184     memset(&s->timing_info, 0, sizeof(s->timing_info));
185     s->timing_info_valid = false;
186 
187     s->previous_time = 0;
188     s->latest_underrun_at_index = -1;
189 
190     s->read_index_not_before = 0;
191     s->write_index_not_before = 0;
192     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
193         s->write_index_corrections[i].valid = 0;
194     s->current_write_index_correction = 0;
195 
196     s->auto_timing_update_event = NULL;
197     s->auto_timing_update_requested = false;
198     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
199 
200     reset_callbacks(s);
201 
202     s->smoother = NULL;
203 
204     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
205     PA_LLIST_PREPEND(pa_stream, c->streams, s);
206     pa_stream_ref(s);
207 
208     return s;
209 }
210 
pa_stream_new_with_proplist(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map,pa_proplist * p)211 pa_stream *pa_stream_new_with_proplist(
212         pa_context *c,
213         const char *name,
214         const pa_sample_spec *ss,
215         const pa_channel_map *map,
216         pa_proplist *p) {
217 
218     pa_channel_map tmap;
219 
220     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
221     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
222     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
223     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
224     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
225 
226     if (!map)
227         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
228 
229     return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
230 }
231 
pa_stream_new_extended(pa_context * c,const char * name,pa_format_info * const * formats,unsigned int n_formats,pa_proplist * p)232 pa_stream *pa_stream_new_extended(
233         pa_context *c,
234         const char *name,
235         pa_format_info * const *formats,
236         unsigned int n_formats,
237         pa_proplist *p) {
238 
239     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
240 
241     return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
242 }
243 
stream_unlink(pa_stream * s)244 static void stream_unlink(pa_stream *s) {
245     pa_operation *o, *n;
246     pa_assert(s);
247 
248     if (!s->context)
249         return;
250 
251     /* Detach from context */
252 
253     /* Unref all operation objects that point to us */
254     for (o = s->context->operations; o; o = n) {
255         n = o->next;
256 
257         if (o->stream == s)
258             pa_operation_cancel(o);
259     }
260 
261     /* Drop all outstanding replies for this stream */
262     if (s->context->pdispatch)
263         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
264 
265     if (s->channel_valid) {
266         pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
267         s->channel = 0;
268         s->channel_valid = false;
269     }
270 
271     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
272     pa_stream_unref(s);
273 
274     s->context = NULL;
275 
276     if (s->auto_timing_update_event) {
277         pa_assert(s->mainloop);
278         s->mainloop->time_free(s->auto_timing_update_event);
279     }
280 
281     reset_callbacks(s);
282 }
283 
stream_free(pa_stream * s)284 static void stream_free(pa_stream *s) {
285     unsigned int i;
286 
287     pa_assert(s);
288 
289     stream_unlink(s);
290 
291     if (s->write_memblock) {
292         if (s->write_data)
293             pa_memblock_release(s->write_memblock);
294         pa_memblock_unref(s->write_memblock);
295     }
296 
297     if (s->peek_memchunk.memblock) {
298         if (s->peek_data)
299             pa_memblock_release(s->peek_memchunk.memblock);
300         pa_memblock_unref(s->peek_memchunk.memblock);
301     }
302 
303     if (s->record_memblockq)
304         pa_memblockq_free(s->record_memblockq);
305 
306     if (s->proplist)
307         pa_proplist_free(s->proplist);
308 
309     if (s->smoother)
310 #ifdef USE_SMOOTHER_2
311         pa_smoother_2_free(s->smoother);
312 #else
313         pa_smoother_free(s->smoother);
314 #endif
315 
316     for (i = 0; i < s->n_formats; i++)
317         pa_format_info_free(s->req_formats[i]);
318 
319     if (s->format)
320         pa_format_info_free(s->format);
321 
322     pa_xfree(s->device_name);
323     pa_xfree(s);
324 }
325 
pa_stream_unref(pa_stream * s)326 void pa_stream_unref(pa_stream *s) {
327     pa_assert(s);
328     pa_assert(PA_REFCNT_VALUE(s) >= 1);
329 
330     if (PA_REFCNT_DEC(s) <= 0)
331         stream_free(s);
332 }
333 
pa_stream_ref(pa_stream * s)334 pa_stream* pa_stream_ref(pa_stream *s) {
335     pa_assert(s);
336     pa_assert(PA_REFCNT_VALUE(s) >= 1);
337 
338     PA_REFCNT_INC(s);
339     return s;
340 }
341 
pa_stream_get_state(const pa_stream * s)342 pa_stream_state_t pa_stream_get_state(const pa_stream *s) {
343     pa_assert(s);
344     pa_assert(PA_REFCNT_VALUE(s) >= 1);
345 
346     return s->state;
347 }
348 
pa_stream_get_context(const pa_stream * s)349 pa_context* pa_stream_get_context(const pa_stream *s) {
350     pa_assert(s);
351     pa_assert(PA_REFCNT_VALUE(s) >= 1);
352 
353     return s->context;
354 }
355 
pa_stream_get_index(const pa_stream * s)356 uint32_t pa_stream_get_index(const pa_stream *s) {
357     pa_assert(s);
358     pa_assert(PA_REFCNT_VALUE(s) >= 1);
359 
360     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
361     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
362 
363     return s->stream_index;
364 }
365 
pa_stream_set_state(pa_stream * s,pa_stream_state_t st)366 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
367     pa_assert(s);
368     pa_assert(PA_REFCNT_VALUE(s) >= 1);
369 
370     if (s->state == st)
371         return;
372 
373     pa_stream_ref(s);
374 
375     s->state = st;
376 
377     if (s->state_callback)
378         s->state_callback(s, s->state_userdata);
379 
380     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
381         stream_unlink(s);
382 
383     pa_stream_unref(s);
384 }
385 
request_auto_timing_update(pa_stream * s,bool force)386 static void request_auto_timing_update(pa_stream *s, bool force) {
387     pa_assert(s);
388     pa_assert(PA_REFCNT_VALUE(s) >= 1);
389 
390     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
391         return;
392 
393     if (s->state == PA_STREAM_READY &&
394         (force || !s->auto_timing_update_requested)) {
395         pa_operation *o;
396 
397 #ifdef STREAM_DEBUG
398         pa_log_debug("Automatically requesting new timing data");
399 #endif
400 
401         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
402             pa_operation_unref(o);
403             s->auto_timing_update_requested = true;
404         }
405     }
406 
407     if (s->auto_timing_update_event) {
408         if (s->suspended && !force) {
409             pa_assert(s->mainloop);
410             s->mainloop->time_free(s->auto_timing_update_event);
411             s->auto_timing_update_event = NULL;
412         } else {
413             if (force)
414                 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
415 
416             pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
417 
418             s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
419         }
420     }
421 }
422 
pa_command_stream_killed(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)423 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
424     pa_context *c = userdata;
425     pa_stream *s;
426     uint32_t channel;
427 
428     pa_assert(pd);
429     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
430     pa_assert(t);
431     pa_assert(c);
432     pa_assert(PA_REFCNT_VALUE(c) >= 1);
433 
434     pa_context_ref(c);
435 
436     if (pa_tagstruct_getu32(t, &channel) < 0 ||
437         !pa_tagstruct_eof(t)) {
438         pa_context_fail(c, PA_ERR_PROTOCOL);
439         goto finish;
440     }
441 
442     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
443         goto finish;
444 
445     if (s->state != PA_STREAM_READY)
446         goto finish;
447 
448     pa_context_set_error(c, PA_ERR_KILLED);
449     pa_stream_set_state(s, PA_STREAM_FAILED);
450 
451 finish:
452     pa_context_unref(c);
453 }
454 
check_smoother_status(pa_stream * s,bool aposteriori,bool force_start,bool force_stop)455 static void check_smoother_status(pa_stream *s, bool aposteriori, bool force_start, bool force_stop) {
456     pa_usec_t x;
457 
458     pa_assert(s);
459     pa_assert(!force_start || !force_stop);
460 
461     if (!s->smoother)
462         return;
463 
464     x = pa_rtclock_now();
465 
466     if (s->timing_info_valid) {
467         if (aposteriori)
468             x -= s->timing_info.transport_usec;
469         else
470             x += s->timing_info.transport_usec;
471     }
472 
473     if (s->suspended || s->corked || force_stop)
474 #ifdef USE_SMOOTHER_2
475         pa_smoother_2_pause(s->smoother, x);
476 #else
477         pa_smoother_pause(s->smoother, x);
478 #endif
479     else if (force_start || s->buffer_attr.prebuf == 0) {
480 
481         if (!s->timing_info_valid &&
482             !aposteriori &&
483             !force_start &&
484             !force_stop &&
485             s->context->version >= 13) {
486 
487             /* If the server supports STARTED events we take them as
488              * indications when audio really starts/stops playing, if
489              * we don't have any timing info yet -- instead of trying
490              * to be smart and guessing the server time. Otherwise the
491              * unknown transport delay adds too much noise to our time
492              * calculations. */
493 
494             return;
495         }
496 
497 #ifdef USE_SMOOTHER_2
498         pa_smoother_2_resume(s->smoother, x);
499 #else
500         pa_smoother_resume(s->smoother, x, true);
501 #endif
502     }
503 
504     /* Please note that we have no idea if playback actually started
505      * if prebuf is non-zero! */
506 }
507 
508 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
509 
pa_command_stream_moved(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)510 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
511     pa_context *c = userdata;
512     pa_stream *s;
513     uint32_t channel;
514     const char *dn;
515     bool suspended;
516     uint32_t di;
517     pa_usec_t usec = 0;
518     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
519 
520     pa_assert(pd);
521     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
522     pa_assert(t);
523     pa_assert(c);
524     pa_assert(PA_REFCNT_VALUE(c) >= 1);
525 
526     pa_context_ref(c);
527 
528     if (c->version < 12) {
529         pa_context_fail(c, PA_ERR_PROTOCOL);
530         goto finish;
531     }
532 
533     if (pa_tagstruct_getu32(t, &channel) < 0 ||
534         pa_tagstruct_getu32(t, &di) < 0 ||
535         pa_tagstruct_gets(t, &dn) < 0 ||
536         pa_tagstruct_get_boolean(t, &suspended) < 0) {
537         pa_context_fail(c, PA_ERR_PROTOCOL);
538         goto finish;
539     }
540 
541     if (c->version >= 13) {
542 
543         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
544             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
545                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
546                 pa_tagstruct_get_usec(t, &usec) < 0) {
547                 pa_context_fail(c, PA_ERR_PROTOCOL);
548                 goto finish;
549             }
550         } else {
551             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
552                 pa_tagstruct_getu32(t, &tlength) < 0 ||
553                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
554                 pa_tagstruct_getu32(t, &minreq) < 0 ||
555                 pa_tagstruct_get_usec(t, &usec) < 0) {
556                 pa_context_fail(c, PA_ERR_PROTOCOL);
557                 goto finish;
558             }
559         }
560     }
561 
562     if (!pa_tagstruct_eof(t)) {
563         pa_context_fail(c, PA_ERR_PROTOCOL);
564         goto finish;
565     }
566 
567     if (!dn || di == PA_INVALID_INDEX) {
568         pa_context_fail(c, PA_ERR_PROTOCOL);
569         goto finish;
570     }
571 
572     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
573         goto finish;
574 
575     if (s->state != PA_STREAM_READY)
576         goto finish;
577 
578     if (c->version >= 13) {
579         if (s->direction == PA_STREAM_RECORD)
580             s->timing_info.configured_source_usec = usec;
581         else
582             s->timing_info.configured_sink_usec = usec;
583 
584         s->buffer_attr.maxlength = maxlength;
585         s->buffer_attr.fragsize = fragsize;
586         s->buffer_attr.tlength = tlength;
587         s->buffer_attr.prebuf = prebuf;
588         s->buffer_attr.minreq = minreq;
589     }
590 
591     pa_xfree(s->device_name);
592     s->device_name = pa_xstrdup(dn);
593     s->device_index = di;
594 
595     s->suspended = suspended;
596 
597     if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
598         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
599         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
600         request_auto_timing_update(s, true);
601     }
602 
603     check_smoother_status(s, true, false, false);
604     request_auto_timing_update(s, true);
605 
606     if (s->moved_callback)
607         s->moved_callback(s, s->moved_userdata);
608 
609 finish:
610     pa_context_unref(c);
611 }
612 
pa_command_stream_buffer_attr(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)613 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
614     pa_context *c = userdata;
615     pa_stream *s;
616     uint32_t channel;
617     pa_usec_t usec = 0;
618     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
619 
620     pa_assert(pd);
621     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
622     pa_assert(t);
623     pa_assert(c);
624     pa_assert(PA_REFCNT_VALUE(c) >= 1);
625 
626     pa_context_ref(c);
627 
628     if (c->version < 15) {
629         pa_context_fail(c, PA_ERR_PROTOCOL);
630         goto finish;
631     }
632 
633     if (pa_tagstruct_getu32(t, &channel) < 0) {
634         pa_context_fail(c, PA_ERR_PROTOCOL);
635         goto finish;
636     }
637 
638     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
639         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
640             pa_tagstruct_getu32(t, &fragsize) < 0 ||
641             pa_tagstruct_get_usec(t, &usec) < 0) {
642             pa_context_fail(c, PA_ERR_PROTOCOL);
643             goto finish;
644         }
645     } else {
646         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
647             pa_tagstruct_getu32(t, &tlength) < 0 ||
648             pa_tagstruct_getu32(t, &prebuf) < 0 ||
649             pa_tagstruct_getu32(t, &minreq) < 0 ||
650             pa_tagstruct_get_usec(t, &usec) < 0) {
651             pa_context_fail(c, PA_ERR_PROTOCOL);
652             goto finish;
653         }
654     }
655 
656     if (!pa_tagstruct_eof(t)) {
657         pa_context_fail(c, PA_ERR_PROTOCOL);
658         goto finish;
659     }
660 
661     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
662         goto finish;
663 
664     if (s->state != PA_STREAM_READY)
665         goto finish;
666 
667     if (s->direction == PA_STREAM_RECORD)
668         s->timing_info.configured_source_usec = usec;
669     else
670         s->timing_info.configured_sink_usec = usec;
671 
672     s->buffer_attr.maxlength = maxlength;
673     s->buffer_attr.fragsize = fragsize;
674     s->buffer_attr.tlength = tlength;
675     s->buffer_attr.prebuf = prebuf;
676     s->buffer_attr.minreq = minreq;
677 
678     request_auto_timing_update(s, true);
679 
680     if (s->buffer_attr_callback)
681         s->buffer_attr_callback(s, s->buffer_attr_userdata);
682 
683 finish:
684     pa_context_unref(c);
685 }
686 
pa_command_stream_suspended(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)687 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
688     pa_context *c = userdata;
689     pa_stream *s;
690     uint32_t channel;
691     bool suspended;
692 
693     pa_assert(pd);
694     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
695     pa_assert(t);
696     pa_assert(c);
697     pa_assert(PA_REFCNT_VALUE(c) >= 1);
698 
699     pa_context_ref(c);
700 
701     if (c->version < 12) {
702         pa_context_fail(c, PA_ERR_PROTOCOL);
703         goto finish;
704     }
705 
706     if (pa_tagstruct_getu32(t, &channel) < 0 ||
707         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
708         !pa_tagstruct_eof(t)) {
709         pa_context_fail(c, PA_ERR_PROTOCOL);
710         goto finish;
711     }
712 
713     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
714         goto finish;
715 
716     if (s->state != PA_STREAM_READY)
717         goto finish;
718 
719     s->suspended = suspended;
720 
721     if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
722         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
723         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
724         request_auto_timing_update(s, true);
725     }
726 
727     check_smoother_status(s, true, false, false);
728     request_auto_timing_update(s, true);
729 
730     if (s->suspended_callback)
731         s->suspended_callback(s, s->suspended_userdata);
732 
733 finish:
734     pa_context_unref(c);
735 }
736 
pa_command_stream_started(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)737 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
738     pa_context *c = userdata;
739     pa_stream *s;
740     uint32_t channel;
741 
742     pa_assert(pd);
743     pa_assert(command == PA_COMMAND_STARTED);
744     pa_assert(t);
745     pa_assert(c);
746     pa_assert(PA_REFCNT_VALUE(c) >= 1);
747 
748     pa_context_ref(c);
749 
750     if (c->version < 13) {
751         pa_context_fail(c, PA_ERR_PROTOCOL);
752         goto finish;
753     }
754 
755     if (pa_tagstruct_getu32(t, &channel) < 0 ||
756         !pa_tagstruct_eof(t)) {
757         pa_context_fail(c, PA_ERR_PROTOCOL);
758         goto finish;
759     }
760 
761     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
762         goto finish;
763 
764     if (s->state != PA_STREAM_READY)
765         goto finish;
766 
767     check_smoother_status(s, true, true, false);
768     request_auto_timing_update(s, true);
769 
770     if (s->started_callback)
771         s->started_callback(s, s->started_userdata);
772 
773 finish:
774     pa_context_unref(c);
775 }
776 
pa_command_stream_event(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)777 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
778     pa_context *c = userdata;
779     pa_stream *s;
780     uint32_t channel;
781     pa_proplist *pl = NULL;
782     const char *event;
783 
784     pa_assert(pd);
785     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
786     pa_assert(t);
787     pa_assert(c);
788     pa_assert(PA_REFCNT_VALUE(c) >= 1);
789 
790     pa_context_ref(c);
791 
792     if (c->version < 15) {
793         pa_context_fail(c, PA_ERR_PROTOCOL);
794         goto finish;
795     }
796 
797     pl = pa_proplist_new();
798 
799     if (pa_tagstruct_getu32(t, &channel) < 0 ||
800         pa_tagstruct_gets(t, &event) < 0 ||
801         pa_tagstruct_get_proplist(t, pl) < 0 ||
802         !pa_tagstruct_eof(t) || !event) {
803         pa_context_fail(c, PA_ERR_PROTOCOL);
804         goto finish;
805     }
806 
807     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
808         goto finish;
809 
810     if (s->state != PA_STREAM_READY)
811         goto finish;
812 
813     if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
814         /* Let client know what the running time was when the stream had to be killed  */
815         pa_usec_t stream_time;
816         if (pa_stream_get_time(s, &stream_time) == 0)
817             pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
818     }
819 
820     if (s->event_callback)
821         s->event_callback(s, event, pl, s->event_userdata);
822 
823 finish:
824     pa_context_unref(c);
825 
826     if (pl)
827         pa_proplist_free(pl);
828 }
829 
pa_command_request(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)830 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
831     pa_stream *s;
832     pa_context *c = userdata;
833     uint32_t bytes, channel;
834 
835     pa_assert(pd);
836     pa_assert(command == PA_COMMAND_REQUEST);
837     pa_assert(t);
838     pa_assert(c);
839     pa_assert(PA_REFCNT_VALUE(c) >= 1);
840 
841     pa_context_ref(c);
842 
843     if (pa_tagstruct_getu32(t, &channel) < 0 ||
844         pa_tagstruct_getu32(t, &bytes) < 0 ||
845         !pa_tagstruct_eof(t)) {
846         pa_context_fail(c, PA_ERR_PROTOCOL);
847         goto finish;
848     }
849 
850     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
851         goto finish;
852 
853     if (s->state != PA_STREAM_READY)
854         goto finish;
855 
856     s->requested_bytes += bytes;
857 
858 #ifdef STREAM_DEBUG
859     pa_log_debug("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes);
860 #endif
861 
862     if (s->requested_bytes > 0 && s->write_callback)
863         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
864 
865 finish:
866     pa_context_unref(c);
867 }
868 
pa_stream_get_underflow_index(const pa_stream * p)869 int64_t pa_stream_get_underflow_index(const pa_stream *p) {
870     pa_assert(p);
871     return p->latest_underrun_at_index;
872 }
873 
pa_command_overflow_or_underflow(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)874 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
875     pa_stream *s;
876     pa_context *c = userdata;
877     uint32_t channel;
878     int64_t offset = -1;
879 
880     pa_assert(pd);
881     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW
882         || command == PA_COMMAND_UNDERFLOW_OHOS);
883     pa_assert(t);
884     pa_assert(c);
885     pa_assert(PA_REFCNT_VALUE(c) >= 1);
886 
887     pa_context_ref(c);
888 
889     if (pa_tagstruct_getu32(t, &channel) < 0) {
890         pa_context_fail(c, PA_ERR_PROTOCOL);
891         goto finish;
892     }
893 
894     if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW) {
895         if (pa_tagstruct_gets64(t, &offset) < 0) {
896             pa_context_fail(c, PA_ERR_PROTOCOL);
897             goto finish;
898         }
899     }
900 
901     if (!pa_tagstruct_eof(t)) {
902         pa_context_fail(c, PA_ERR_PROTOCOL);
903         goto finish;
904     }
905 
906     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
907         goto finish;
908 
909     if (s->state != PA_STREAM_READY)
910         goto finish;
911 
912     if (command == PA_COMMAND_UNDERFLOW_OHOS) {
913         if (s->underflow_ohos_callback) {
914             s->underflow_ohos_callback(s, s->underflow_ohos_userdata);
915         }
916         goto finish;
917     }
918 
919     if (offset != -1)
920         s->latest_underrun_at_index = offset;
921 
922     if (s->buffer_attr.prebuf > 0)
923         check_smoother_status(s, true, false, true);
924 
925     request_auto_timing_update(s, true);
926 
927     if (command == PA_COMMAND_OVERFLOW) {
928         if (s->overflow_callback)
929             s->overflow_callback(s, s->overflow_userdata);
930     } else if (command == PA_COMMAND_UNDERFLOW) {
931         if (s->underflow_callback)
932             s->underflow_callback(s, s->underflow_userdata);
933     }
934 
935 finish:
936     pa_context_unref(c);
937 }
938 
invalidate_indexes(pa_stream * s,bool r,bool w)939 static void invalidate_indexes(pa_stream *s, bool r, bool w) {
940     pa_assert(s);
941     pa_assert(PA_REFCNT_VALUE(s) >= 1);
942 
943 #ifdef STREAM_DEBUG
944     pa_log_debug("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag);
945 #endif
946 
947     if (s->state != PA_STREAM_READY)
948         return;
949 
950     if (w) {
951         s->write_index_not_before = s->context->ctag;
952 
953         if (s->timing_info_valid)
954             s->timing_info.write_index_corrupt = true;
955 
956 #ifdef STREAM_DEBUG
957         pa_log_debug("write_index invalidated");
958 #endif
959     }
960 
961     if (r) {
962         s->read_index_not_before = s->context->ctag;
963 
964         if (s->timing_info_valid)
965             s->timing_info.read_index_corrupt = true;
966 
967 #ifdef STREAM_DEBUG
968         pa_log_debug("read_index invalidated");
969 #endif
970     }
971 
972     request_auto_timing_update(s, true);
973 }
974 
auto_timing_update_callback(pa_mainloop_api * m,pa_time_event * e,const struct timeval * t,void * userdata)975 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
976     pa_stream *s = userdata;
977 
978     pa_assert(s);
979     pa_assert(PA_REFCNT_VALUE(s) >= 1);
980 
981     pa_stream_ref(s);
982     request_auto_timing_update(s, false);
983     pa_stream_unref(s);
984 }
985 
create_stream_complete(pa_stream * s)986 static void create_stream_complete(pa_stream *s) {
987     pa_assert(s);
988     pa_assert(PA_REFCNT_VALUE(s) >= 1);
989     pa_assert(s->state == PA_STREAM_CREATING);
990 
991     pa_stream_set_state(s, PA_STREAM_READY);
992 
993     if (s->requested_bytes > 0 && s->write_callback)
994         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
995 
996     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
997         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
998         pa_assert(!s->auto_timing_update_event);
999         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
1000 
1001         request_auto_timing_update(s, true);
1002     }
1003 
1004     check_smoother_status(s, true, false, false);
1005 }
1006 
patch_buffer_attr(pa_stream * s,pa_buffer_attr * attr,pa_stream_flags_t * flags)1007 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
1008     const char *e;
1009 
1010     pa_assert(s);
1011     pa_assert(attr);
1012 
1013     if ((e = getenv("PULSE_LATENCY_MSEC"))) {
1014         uint32_t ms;
1015         pa_sample_spec ss;
1016 
1017         pa_sample_spec_init(&ss);
1018 
1019         if (pa_sample_spec_valid(&s->sample_spec))
1020             ss = s->sample_spec;
1021         else if (s->n_formats == 1)
1022             pa_format_info_to_sample_spec(s->req_formats[0], &ss, NULL);
1023 
1024         if (pa_atou(e, &ms) < 0 || ms <= 0)
1025             pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
1026         else if (!pa_sample_spec_valid(&s->sample_spec))
1027             pa_log_debug("Ignoring $PULSE_LATENCY_MSEC: %s (invalid sample spec)", e);
1028         else {
1029             attr->maxlength = (uint32_t) -1;
1030             attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &ss);
1031             attr->minreq = (uint32_t) -1;
1032             attr->prebuf = (uint32_t) -1;
1033             attr->fragsize = attr->tlength;
1034 
1035             if (flags)
1036                 *flags |= PA_STREAM_ADJUST_LATENCY;
1037         }
1038     }
1039 
1040     if (s->context->version >= 13)
1041         return;
1042 
1043     /* Version older than 0.9.10 didn't do server side buffer_attr
1044      * selection, hence we have to fake it on the client side. */
1045 
1046     /* We choose fairly conservative values here, to not confuse
1047      * old clients with extremely large playback buffers */
1048 
1049     if (attr->maxlength == (uint32_t) -1)
1050         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
1051 
1052     if (attr->tlength == (uint32_t) -1)
1053         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
1054 
1055     if (attr->minreq == (uint32_t) -1)
1056         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
1057 
1058     if (attr->prebuf == (uint32_t) -1)
1059         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
1060 
1061     if (attr->fragsize == (uint32_t) -1)
1062         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
1063 }
1064 
pa_create_stream_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1065 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1066     pa_stream *s = userdata;
1067     uint32_t requested_bytes = 0;
1068 
1069     pa_assert(pd);
1070     pa_assert(s);
1071     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1072     pa_assert(s->state == PA_STREAM_CREATING);
1073 
1074     pa_stream_ref(s);
1075 
1076     if (command != PA_COMMAND_REPLY) {
1077         if (pa_context_handle_error(s->context, command, t, false) < 0)
1078             goto finish;
1079 
1080         pa_stream_set_state(s, PA_STREAM_FAILED);
1081         goto finish;
1082     }
1083 
1084     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1085         s->channel == PA_INVALID_INDEX ||
1086         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1087         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1088         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1089         goto finish;
1090     }
1091 
1092     s->requested_bytes = (int64_t) requested_bytes;
1093 
1094     if (s->context->version >= 9) {
1095         if (s->direction == PA_STREAM_PLAYBACK) {
1096             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1097                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1098                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1099                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1100                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1101                 goto finish;
1102             }
1103         } else if (s->direction == PA_STREAM_RECORD) {
1104             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1105                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1106                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1107                 goto finish;
1108             }
1109         }
1110     }
1111 
1112     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1113         pa_sample_spec ss;
1114         pa_channel_map cm;
1115         const char *dn = NULL;
1116         bool suspended;
1117 
1118         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1119             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1120             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1121             pa_tagstruct_gets(t, &dn) < 0 ||
1122             pa_tagstruct_get_boolean(t, &suspended) < 0) {
1123             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1124             goto finish;
1125         }
1126 
1127         if (!dn || s->device_index == PA_INVALID_INDEX ||
1128             ss.channels != cm.channels ||
1129             !pa_channel_map_valid(&cm) ||
1130             !pa_sample_spec_valid(&ss) ||
1131             (s->n_formats == 0 && (
1132                 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1133                 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1134                 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1135             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1136             goto finish;
1137         }
1138 
1139         pa_xfree(s->device_name);
1140         s->device_name = pa_xstrdup(dn);
1141         s->suspended = suspended;
1142 
1143         s->channel_map = cm;
1144         s->sample_spec = ss;
1145     }
1146 
1147 #ifdef USE_SMOOTHER_2
1148     if (s->flags & PA_STREAM_INTERPOLATE_TIMING)
1149         pa_smoother_2_set_sample_spec(s->smoother, pa_rtclock_now(), &s->sample_spec);
1150 #endif
1151 
1152     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1153         pa_usec_t usec;
1154 
1155         if (pa_tagstruct_get_usec(t, &usec) < 0) {
1156             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1157             goto finish;
1158         }
1159 
1160         if (s->direction == PA_STREAM_RECORD)
1161             s->timing_info.configured_source_usec = usec;
1162         else
1163             s->timing_info.configured_sink_usec = usec;
1164     }
1165 
1166     if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1167         || s->context->version >= 22) {
1168 
1169         pa_format_info *f = pa_format_info_new();
1170 
1171         if (pa_tagstruct_get_format_info(t, f) < 0 || !pa_format_info_valid(f)) {
1172             pa_format_info_free(f);
1173             if (s->n_formats > 0) {
1174                 /* We used the extended API, so we should have got back a proper format */
1175                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1176                 goto finish;
1177             }
1178         } else
1179             s->format = f;
1180     }
1181 
1182     if (!pa_tagstruct_eof(t)) {
1183         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1184         goto finish;
1185     }
1186 
1187     if (s->direction == PA_STREAM_RECORD) {
1188         pa_assert(!s->record_memblockq);
1189 
1190         s->record_memblockq = pa_memblockq_new(
1191                 "client side record memblockq",
1192                 0,
1193                 s->buffer_attr.maxlength,
1194                 0,
1195                 &s->sample_spec,
1196                 1,
1197                 0,
1198                 0,
1199                 NULL);
1200     }
1201 
1202     s->channel_valid = true;
1203     pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1204 
1205     create_stream_complete(s);
1206 
1207 finish:
1208     pa_stream_unref(s);
1209 }
1210 
create_stream(pa_stream_direction_t direction,pa_stream * s,const char * dev,const pa_buffer_attr * attr,pa_stream_flags_t flags,const pa_cvolume * volume,pa_stream * sync_stream)1211 static int create_stream(
1212         pa_stream_direction_t direction,
1213         pa_stream *s,
1214         const char *dev,
1215         const pa_buffer_attr *attr,
1216         pa_stream_flags_t flags,
1217         const pa_cvolume *volume,
1218         pa_stream *sync_stream) {
1219 
1220     pa_tagstruct *t;
1221     uint32_t tag;
1222     bool volume_set = !!volume;
1223     pa_cvolume cv;
1224     uint32_t i;
1225 
1226     pa_assert(s);
1227     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1228     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1229 
1230     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1231     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1232     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1233     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1234                                               PA_STREAM_INTERPOLATE_TIMING|
1235                                               PA_STREAM_NOT_MONOTONIC|
1236                                               PA_STREAM_AUTO_TIMING_UPDATE|
1237                                               PA_STREAM_NO_REMAP_CHANNELS|
1238                                               PA_STREAM_NO_REMIX_CHANNELS|
1239                                               PA_STREAM_FIX_FORMAT|
1240                                               PA_STREAM_FIX_RATE|
1241                                               PA_STREAM_FIX_CHANNELS|
1242                                               PA_STREAM_DONT_MOVE|
1243                                               PA_STREAM_VARIABLE_RATE|
1244                                               PA_STREAM_PEAK_DETECT|
1245                                               PA_STREAM_START_MUTED|
1246                                               PA_STREAM_ADJUST_LATENCY|
1247                                               PA_STREAM_EARLY_REQUESTS|
1248                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1249                                               PA_STREAM_START_UNMUTED|
1250                                               PA_STREAM_FAIL_ON_SUSPEND|
1251                                               PA_STREAM_RELATIVE_VOLUME|
1252                                               PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1253 
1254     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1255     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1256     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1257     /* Although some of the other flags are not supported on older
1258      * version, we don't check for them here, because it doesn't hurt
1259      * when they are passed but actually not supported. This makes
1260      * client development easier */
1261 
1262     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1263     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1264     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1265 
1266     pa_stream_ref(s);
1267 
1268     s->direction = direction;
1269 
1270     if (sync_stream)
1271         s->syncid = sync_stream->syncid;
1272 
1273     if (attr)
1274         s->buffer_attr = *attr;
1275     patch_buffer_attr(s, &s->buffer_attr, &flags);
1276 
1277     s->flags = flags;
1278     s->corked = !!(flags & PA_STREAM_START_CORKED);
1279 
1280     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1281         pa_usec_t x;
1282 
1283         x = pa_rtclock_now();
1284 
1285         pa_assert(!s->smoother);
1286 #ifdef USE_SMOOTHER_2
1287         s->smoother = pa_smoother_2_new(SMOOTHER_HISTORY_TIME, x, 0, 0);
1288 #else
1289         s->smoother = pa_smoother_new(
1290                 SMOOTHER_ADJUST_TIME,
1291                 SMOOTHER_HISTORY_TIME,
1292                 !(flags & PA_STREAM_NOT_MONOTONIC),
1293                 true,
1294                 SMOOTHER_MIN_HISTORY,
1295                 x,
1296                 true);
1297 #endif
1298     }
1299 
1300     if (!dev)
1301         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1302 
1303     t = pa_tagstruct_command(
1304             s->context,
1305             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1306             &tag);
1307 
1308     if (s->context->version < 13)
1309         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1310 
1311     pa_tagstruct_put(
1312             t,
1313             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1314             PA_TAG_CHANNEL_MAP, &s->channel_map,
1315             PA_TAG_U32, PA_INVALID_INDEX,
1316             PA_TAG_STRING, dev,
1317             PA_TAG_U32, s->buffer_attr.maxlength,
1318             PA_TAG_BOOLEAN, s->corked,
1319             PA_TAG_INVALID);
1320 
1321     if (!volume) {
1322         if (pa_sample_spec_valid(&s->sample_spec))
1323             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1324         else {
1325             /* This is not really relevant, since no volume was set, and
1326              * the real number of channels is embedded in the format_info
1327              * structure */
1328             volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1329         }
1330     }
1331 
1332     if (s->direction == PA_STREAM_PLAYBACK) {
1333         pa_tagstruct_put(
1334                 t,
1335                 PA_TAG_U32, s->buffer_attr.tlength,
1336                 PA_TAG_U32, s->buffer_attr.prebuf,
1337                 PA_TAG_U32, s->buffer_attr.minreq,
1338                 PA_TAG_U32, s->syncid,
1339                 PA_TAG_INVALID);
1340 
1341         pa_tagstruct_put_cvolume(t, volume);
1342     } else
1343         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1344 
1345     if (s->context->version >= 12) {
1346         pa_tagstruct_put(
1347                 t,
1348                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1349                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1350                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1351                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1352                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1353                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1354                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1355                 PA_TAG_INVALID);
1356     }
1357 
1358     if (s->context->version >= 13) {
1359 
1360         if (s->direction == PA_STREAM_PLAYBACK)
1361             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1362         else
1363             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1364 
1365         pa_tagstruct_put(
1366                 t,
1367                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1368                 PA_TAG_PROPLIST, s->proplist,
1369                 PA_TAG_INVALID);
1370 
1371         if (s->direction == PA_STREAM_RECORD)
1372             pa_tagstruct_putu32(t, s->direct_on_input);
1373     }
1374 
1375     if (s->context->version >= 14) {
1376 
1377         if (s->direction == PA_STREAM_PLAYBACK)
1378             pa_tagstruct_put_boolean(t, volume_set);
1379 
1380         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1381     }
1382 
1383     if (s->context->version >= 15) {
1384 
1385         if (s->direction == PA_STREAM_PLAYBACK)
1386             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1387 
1388         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1389         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1390     }
1391 
1392     if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1393         pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1394 
1395     if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1396         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1397 
1398     if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1399         || s->context->version >= 22) {
1400 
1401         pa_tagstruct_putu8(t, s->n_formats);
1402         for (i = 0; i < s->n_formats; i++)
1403             pa_tagstruct_put_format_info(t, s->req_formats[i]);
1404     }
1405 
1406     if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1407         pa_tagstruct_put_cvolume(t, volume);
1408         pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1409         pa_tagstruct_put_boolean(t, volume_set);
1410         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1411         pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1412         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1413     }
1414 
1415     pa_pstream_send_tagstruct(s->context->pstream, t);
1416     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1417 
1418     pa_stream_set_state(s, PA_STREAM_CREATING);
1419 
1420     pa_stream_unref(s);
1421     return 0;
1422 }
1423 
pa_stream_connect_playback(pa_stream * s,const char * dev,const pa_buffer_attr * attr,pa_stream_flags_t flags,const pa_cvolume * volume,pa_stream * sync_stream)1424 int pa_stream_connect_playback(
1425         pa_stream *s,
1426         const char *dev,
1427         const pa_buffer_attr *attr,
1428         pa_stream_flags_t flags,
1429         const pa_cvolume *volume,
1430         pa_stream *sync_stream) {
1431 
1432     pa_assert(s);
1433     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1434 
1435     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1436 }
1437 
pa_stream_connect_record(pa_stream * s,const char * dev,const pa_buffer_attr * attr,pa_stream_flags_t flags)1438 int pa_stream_connect_record(
1439         pa_stream *s,
1440         const char *dev,
1441         const pa_buffer_attr *attr,
1442         pa_stream_flags_t flags) {
1443 
1444     pa_assert(s);
1445     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1446 
1447     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1448 }
1449 
pa_stream_begin_write(pa_stream * s,void ** data,size_t * nbytes)1450 int pa_stream_begin_write(
1451         pa_stream *s,
1452         void **data,
1453         size_t *nbytes) {
1454 
1455     pa_assert(s);
1456     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1457 
1458     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1459     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1460     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1461     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1462     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1463 
1464     if (*nbytes != (size_t) -1) {
1465         size_t m, fs;
1466 
1467         m = pa_mempool_block_size_max(s->context->mempool);
1468         fs = pa_frame_size(&s->sample_spec);
1469 
1470         m = (m / fs) * fs;
1471         if (*nbytes > m)
1472             *nbytes = m;
1473     }
1474 
1475     if (!s->write_memblock) {
1476         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1477         s->write_data = pa_memblock_acquire(s->write_memblock);
1478     }
1479 
1480     *data = s->write_data;
1481     *nbytes = pa_memblock_get_length(s->write_memblock);
1482 
1483     return 0;
1484 }
1485 
pa_stream_cancel_write(pa_stream * s)1486 int pa_stream_cancel_write(
1487         pa_stream *s) {
1488 
1489     pa_assert(s);
1490     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1491 
1492     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1493     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1494     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1495     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1496 
1497     pa_assert(s->write_data);
1498 
1499     pa_memblock_release(s->write_memblock);
1500     pa_memblock_unref(s->write_memblock);
1501     s->write_memblock = NULL;
1502     s->write_data = NULL;
1503 
1504     return 0;
1505 }
1506 
pa_stream_write_ext_free(pa_stream * s,const void * data,size_t length,pa_free_cb_t free_cb,void * free_cb_data,int64_t offset,pa_seek_mode_t seek)1507 int pa_stream_write_ext_free(
1508         pa_stream *s,
1509         const void *data,
1510         size_t length,
1511         pa_free_cb_t free_cb,
1512         void *free_cb_data,
1513         int64_t offset,
1514         pa_seek_mode_t seek) {
1515 
1516     pa_assert(s);
1517     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1518     pa_assert(data);
1519 
1520     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1521     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1522     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1523     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1524     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1525     PA_CHECK_VALIDITY(s->context,
1526                       !s->write_memblock ||
1527                       ((data >= s->write_data) &&
1528                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1529                       PA_ERR_INVALID);
1530     PA_CHECK_VALIDITY(s->context, offset % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
1531     PA_CHECK_VALIDITY(s->context, length % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
1532     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1533 
1534     if (s->write_memblock) {
1535         pa_memchunk chunk;
1536 
1537         /* pa_stream_write_begin() was called before */
1538 
1539         pa_memblock_release(s->write_memblock);
1540 
1541         chunk.memblock = s->write_memblock;
1542         chunk.index = (const char *) data - (const char *) s->write_data;
1543         chunk.length = length;
1544 
1545         s->write_memblock = NULL;
1546         s->write_data = NULL;
1547 
1548         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1549         pa_memblock_unref(chunk.memblock);
1550 
1551     } else {
1552         pa_seek_mode_t t_seek = seek;
1553         int64_t t_offset = offset;
1554         size_t t_length = length;
1555         const void *t_data = data;
1556 
1557         /* pa_stream_write_begin() was not called before */
1558 
1559         while (t_length > 0) {
1560             pa_memchunk chunk;
1561 
1562             chunk.index = 0;
1563 
1564             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1565                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, free_cb_data, 1);
1566                 chunk.length = t_length;
1567             } else {
1568                 void *d;
1569                 size_t blk_size_max;
1570 
1571                 /* Break large audio streams into _aligned_ blocks or the
1572                  * other endpoint will happily discard them upon arrival. */
1573                 blk_size_max = pa_frame_align(pa_mempool_block_size_max(s->context->mempool), &s->sample_spec);
1574                 chunk.length = PA_MIN(t_length, blk_size_max);
1575                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1576 
1577                 d = pa_memblock_acquire(chunk.memblock);
1578                 memcpy(d, t_data, chunk.length);
1579                 pa_memblock_release(chunk.memblock);
1580             }
1581 
1582             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1583 
1584             t_offset = 0;
1585             t_seek = PA_SEEK_RELATIVE;
1586 
1587             t_data = (const uint8_t*) t_data + chunk.length;
1588             t_length -= chunk.length;
1589 
1590             pa_memblock_unref(chunk.memblock);
1591         }
1592 
1593         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1594             free_cb(free_cb_data);
1595     }
1596 
1597     /* This is obviously wrong since we ignore the seeking index . But
1598      * that's OK, the server side applies the same error */
1599     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1600 
1601 #ifdef STREAM_DEBUG
1602     pa_log_debug("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes);
1603 #endif
1604 
1605     if (s->direction == PA_STREAM_PLAYBACK) {
1606 
1607         /* Update latency request correction */
1608         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1609 
1610             if (seek == PA_SEEK_ABSOLUTE) {
1611                 s->write_index_corrections[s->current_write_index_correction].corrupt = false;
1612                 s->write_index_corrections[s->current_write_index_correction].absolute = true;
1613                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1614             } else if (seek == PA_SEEK_RELATIVE) {
1615                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1616                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1617             } else
1618                 s->write_index_corrections[s->current_write_index_correction].corrupt = true;
1619         }
1620 
1621         /* Update the write index in the already available latency data */
1622         if (s->timing_info_valid) {
1623 
1624             if (seek == PA_SEEK_ABSOLUTE) {
1625                 s->timing_info.write_index_corrupt = false;
1626                 s->timing_info.write_index = offset + (int64_t) length;
1627             } else if (seek == PA_SEEK_RELATIVE) {
1628                 if (!s->timing_info.write_index_corrupt)
1629                     s->timing_info.write_index += offset + (int64_t) length;
1630             } else
1631                 s->timing_info.write_index_corrupt = true;
1632         }
1633 
1634         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1635             request_auto_timing_update(s, true);
1636     }
1637 
1638     return 0;
1639 }
1640 
pa_stream_write(pa_stream * s,const void * data,size_t length,pa_free_cb_t free_cb,int64_t offset,pa_seek_mode_t seek)1641 int pa_stream_write(
1642         pa_stream *s,
1643         const void *data,
1644         size_t length,
1645         pa_free_cb_t free_cb,
1646         int64_t offset,
1647         pa_seek_mode_t seek) {
1648 
1649     return pa_stream_write_ext_free(s, data, length, free_cb, (void*) data, offset, seek);
1650 }
1651 
pa_stream_peek(pa_stream * s,const void ** data,size_t * length)1652 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1653     pa_assert(s);
1654     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1655     pa_assert(data);
1656     pa_assert(length);
1657 
1658     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1659     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1660     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1661 
1662     if (!s->peek_memchunk.memblock) {
1663 
1664         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1665             /* record_memblockq is empty. */
1666             *data = NULL;
1667             *length = 0;
1668             return 0;
1669 
1670         } else if (!s->peek_memchunk.memblock) {
1671             /* record_memblockq isn't empty, but it doesn't have any data at
1672              * the current read index. */
1673             *data = NULL;
1674             *length = s->peek_memchunk.length;
1675             return 0;
1676         }
1677 
1678         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1679     }
1680 
1681     pa_assert(s->peek_data);
1682     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1683     *length = s->peek_memchunk.length;
1684     return 0;
1685 }
1686 
pa_stream_drop(pa_stream * s)1687 int pa_stream_drop(pa_stream *s) {
1688     pa_assert(s);
1689     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1690 
1691     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1692     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1693     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1694     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE);
1695 
1696     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1697 
1698     /* Fix the simulated local read index */
1699     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1700         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1701 
1702     if (s->peek_memchunk.memblock) {
1703         pa_assert(s->peek_data);
1704         s->peek_data = NULL;
1705         pa_memblock_release(s->peek_memchunk.memblock);
1706         pa_memblock_unref(s->peek_memchunk.memblock);
1707     }
1708 
1709     pa_memchunk_reset(&s->peek_memchunk);
1710 
1711     return 0;
1712 }
1713 
pa_stream_writable_size(const pa_stream * s)1714 size_t pa_stream_writable_size(const pa_stream *s) {
1715     pa_assert(s);
1716     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1717 
1718     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1719     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1720     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1721 
1722     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1723 }
1724 
pa_stream_readable_size(const pa_stream * s)1725 size_t pa_stream_readable_size(const pa_stream *s) {
1726     pa_assert(s);
1727     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1728 
1729     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1730     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1731     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1732 
1733     return pa_memblockq_get_length(s->record_memblockq);
1734 }
1735 
pa_stream_drain(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)1736 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1737     pa_operation *o;
1738     pa_tagstruct *t;
1739     uint32_t tag;
1740 
1741     pa_assert(s);
1742     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1743 
1744     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1745     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1746     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1747 
1748     /* Ask for a timing update before we cork/uncork to get the best
1749      * accuracy for the transport latency suitable for the
1750      * check_smoother_status() call in the started callback */
1751     request_auto_timing_update(s, true);
1752 
1753     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1754 
1755     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1756     pa_tagstruct_putu32(t, s->channel);
1757     pa_pstream_send_tagstruct(s->context->pstream, t);
1758     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1759 
1760     /* This might cause the read index to continue again, hence
1761      * let's request a timing update */
1762     request_auto_timing_update(s, true);
1763 
1764     return o;
1765 }
1766 
calc_time(const pa_stream * s,bool ignore_transport)1767 static pa_usec_t calc_time(const pa_stream *s, bool ignore_transport) {
1768     pa_usec_t usec;
1769 
1770     pa_assert(s);
1771     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1772     pa_assert(s->state == PA_STREAM_READY);
1773     pa_assert(s->direction != PA_STREAM_UPLOAD);
1774     pa_assert(s->timing_info_valid);
1775     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1776     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1777 
1778     if (s->direction == PA_STREAM_PLAYBACK) {
1779         /* The last byte that was written into the output device
1780          * had this time value associated */
1781         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1782 
1783         if (!s->corked && !s->suspended) {
1784 
1785             if (!ignore_transport)
1786                 /* Because the latency info took a little time to come
1787                  * to us, we assume that the real output time is actually
1788                  * a little ahead */
1789                 usec += s->timing_info.transport_usec;
1790 
1791             /* However, the output device usually maintains a buffer
1792                too, hence the real sample currently played is a little
1793                back  */
1794             if (s->timing_info.sink_usec >= usec)
1795                 usec = 0;
1796             else
1797                 usec -= s->timing_info.sink_usec;
1798         }
1799 
1800     } else {
1801         pa_assert(s->direction == PA_STREAM_RECORD);
1802 
1803         /* The last byte written into the server side queue had
1804          * this time value associated */
1805         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1806 
1807         if (!s->corked && !s->suspended) {
1808 
1809             if (!ignore_transport)
1810                 /* Add transport latency */
1811                 usec += s->timing_info.transport_usec;
1812 
1813             /* Add latency of data in device buffer */
1814             usec += s->timing_info.source_usec;
1815 
1816             /* If this is a monitor source, we need to correct the
1817              * time by the playback device buffer */
1818             if (s->timing_info.sink_usec >= usec)
1819                 usec = 0;
1820             else
1821                 usec -= s->timing_info.sink_usec;
1822         }
1823     }
1824 
1825     return usec;
1826 }
1827 
1828 #ifdef USE_SMOOTHER_2
calc_bytes(pa_stream * s,bool ignore_transport)1829 static inline uint64_t calc_bytes(pa_stream *s, bool ignore_transport) {
1830     return (uint64_t)(calc_time(s, ignore_transport) * s->sample_spec.rate / PA_USEC_PER_SEC * pa_frame_size(&s->sample_spec));
1831 }
1832 #endif
1833 
stream_get_timing_info_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1834 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1835     pa_operation *o = userdata;
1836     struct timeval local, remote, now;
1837     pa_timing_info *i;
1838     bool playing = false;
1839     uint64_t underrun_for = 0, playing_for = 0;
1840 
1841     pa_assert(pd);
1842     pa_assert(o);
1843     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1844 
1845     if (!o->context || !o->stream)
1846         goto finish;
1847 
1848     i = &o->stream->timing_info;
1849 
1850     o->stream->timing_info_valid = false;
1851     i->write_index_corrupt = true;
1852     i->read_index_corrupt = true;
1853 
1854     if (command != PA_COMMAND_REPLY) {
1855         if (pa_context_handle_error(o->context, command, t, false) < 0)
1856             goto finish;
1857 
1858     } else {
1859 
1860         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1861             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1862             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1863             pa_tagstruct_get_timeval(t, &local) < 0 ||
1864             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1865             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1866             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1867 
1868             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1869             goto finish;
1870         }
1871 
1872         if (o->context->version >= 13 &&
1873             o->stream->direction == PA_STREAM_PLAYBACK)
1874             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1875                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1876 
1877                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1878                 goto finish;
1879             }
1880 
1881         if (!pa_tagstruct_eof(t)) {
1882             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1883             goto finish;
1884         }
1885         o->stream->timing_info_valid = true;
1886         i->write_index_corrupt = false;
1887         i->read_index_corrupt = false;
1888 
1889         i->playing = (int) playing;
1890         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1891 
1892         pa_gettimeofday(&now);
1893 
1894         /* Calculate timestamps */
1895         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1896             /* local and remote seem to have synchronized clocks */
1897 
1898             if (o->stream->direction == PA_STREAM_PLAYBACK)
1899                 i->transport_usec = pa_timeval_diff(&remote, &local);
1900             else
1901                 i->transport_usec = pa_timeval_diff(&now, &remote);
1902 
1903             i->synchronized_clocks = true;
1904             i->timestamp = remote;
1905         } else {
1906             /* clocks are not synchronized, let's estimate latency then */
1907             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1908             i->synchronized_clocks = false;
1909             i->timestamp = local;
1910             pa_timeval_add(&i->timestamp, i->transport_usec);
1911         }
1912 
1913         /* Invalidate read and write indexes if necessary */
1914         if (tag < o->stream->read_index_not_before)
1915             i->read_index_corrupt = true;
1916 
1917         if (tag < o->stream->write_index_not_before)
1918             i->write_index_corrupt = true;
1919 
1920         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1921             /* Write index correction */
1922 
1923             int n, j;
1924             uint32_t ctag = tag;
1925 
1926             /* Go through the saved correction values and add up the
1927              * total correction.*/
1928             for (n = 0, j = o->stream->current_write_index_correction+1;
1929                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1930                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1931 
1932                 /* Step over invalid data or out-of-date data */
1933                 if (!o->stream->write_index_corrections[j].valid ||
1934                     o->stream->write_index_corrections[j].tag < ctag)
1935                     continue;
1936 
1937                 /* Make sure that everything is in order */
1938                 ctag = o->stream->write_index_corrections[j].tag+1;
1939 
1940                 /* Now fix the write index */
1941                 if (o->stream->write_index_corrections[j].corrupt) {
1942                     /* A corrupting seek was made */
1943                     i->write_index_corrupt = true;
1944                 } else if (o->stream->write_index_corrections[j].absolute) {
1945                     /* An absolute seek was made */
1946                     i->write_index = o->stream->write_index_corrections[j].value;
1947                     i->write_index_corrupt = false;
1948                 } else if (!i->write_index_corrupt) {
1949                     /* A relative seek was made */
1950                     i->write_index += o->stream->write_index_corrections[j].value;
1951                 }
1952             }
1953 
1954             /* Clear old correction entries */
1955             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1956                 if (!o->stream->write_index_corrections[n].valid)
1957                     continue;
1958 
1959                 if (o->stream->write_index_corrections[n].tag <= tag)
1960                     o->stream->write_index_corrections[n].valid = false;
1961             }
1962         }
1963 
1964         if (o->stream->direction == PA_STREAM_RECORD) {
1965             /* Read index correction */
1966 
1967             if (!i->read_index_corrupt)
1968                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1969         }
1970 
1971         /* Update smoother if we're not corked */
1972         if (o->stream->smoother && !o->stream->corked) {
1973             pa_usec_t u, x;
1974 
1975             u = x = pa_rtclock_now() - i->transport_usec;
1976 
1977             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1978                 pa_usec_t su;
1979 
1980                 /* If we weren't playing then it will take some time
1981                  * until the audio will actually come out through the
1982                  * speakers. Since we follow that timing here, we need
1983                  * to try to fix this up */
1984 
1985                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1986 
1987                 if (su < i->sink_usec)
1988                     x += i->sink_usec - su;
1989             }
1990 
1991             if (!i->playing)
1992 #ifdef USE_SMOOTHER_2
1993                 pa_smoother_2_pause(o->stream->smoother, x);
1994 #else
1995                 pa_smoother_pause(o->stream->smoother, x);
1996 #endif
1997 
1998             /* Update the smoother */
1999             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
2000                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
2001 #ifdef USE_SMOOTHER_2
2002                 pa_smoother_2_put(o->stream->smoother, u, calc_bytes(o->stream, true));
2003 #else
2004                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, true));
2005 #endif
2006 
2007             if (i->playing)
2008 #ifdef USE_SMOOTHER_2
2009                 pa_smoother_2_resume(o->stream->smoother, x);
2010 #else
2011                 pa_smoother_resume(o->stream->smoother, x, true);
2012 #endif
2013         }
2014     }
2015 
2016     o->stream->auto_timing_update_requested = false;
2017 
2018     if (o->stream->latency_update_callback)
2019         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
2020 
2021     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
2022         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2023         cb(o->stream, o->stream->timing_info_valid, o->userdata);
2024     }
2025 
2026 finish:
2027 
2028     pa_operation_done(o);
2029     pa_operation_unref(o);
2030 }
2031 
pa_stream_update_timing_info(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2032 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2033     uint32_t tag;
2034     pa_operation *o;
2035     pa_tagstruct *t;
2036     struct timeval now;
2037     int cidx = 0;
2038 
2039     pa_assert(s);
2040     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2041 
2042     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2043     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2044     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2045 
2046     if (s->direction == PA_STREAM_PLAYBACK) {
2047         /* Find a place to store the write_index correction data for this entry */
2048         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
2049 
2050         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
2051         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
2052     }
2053     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2054 
2055     t = pa_tagstruct_command(
2056             s->context,
2057             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
2058             &tag);
2059     pa_tagstruct_putu32(t, s->channel);
2060     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
2061 
2062     pa_pstream_send_tagstruct(s->context->pstream, t);
2063     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2064 
2065     if (s->direction == PA_STREAM_PLAYBACK) {
2066         /* Fill in initial correction data */
2067 
2068         s->current_write_index_correction = cidx;
2069 
2070         s->write_index_corrections[cidx].valid = true;
2071         s->write_index_corrections[cidx].absolute = false;
2072         s->write_index_corrections[cidx].corrupt = false;
2073         s->write_index_corrections[cidx].tag = tag;
2074         s->write_index_corrections[cidx].value = 0;
2075     }
2076 
2077     return o;
2078 }
2079 
pa_stream_disconnect_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)2080 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2081     pa_stream *s = userdata;
2082 
2083     pa_assert(pd);
2084     pa_assert(s);
2085     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2086 
2087     pa_stream_ref(s);
2088 
2089     if (command != PA_COMMAND_REPLY) {
2090         if (pa_context_handle_error(s->context, command, t, false) < 0)
2091             goto finish;
2092 
2093         pa_stream_set_state(s, PA_STREAM_FAILED);
2094         goto finish;
2095     } else if (!pa_tagstruct_eof(t)) {
2096         pa_context_fail(s->context, PA_ERR_PROTOCOL);
2097         goto finish;
2098     }
2099 
2100     pa_stream_set_state(s, PA_STREAM_TERMINATED);
2101 
2102 finish:
2103     pa_stream_unref(s);
2104 }
2105 
pa_stream_disconnect(pa_stream * s)2106 int pa_stream_disconnect(pa_stream *s) {
2107     pa_tagstruct *t;
2108     uint32_t tag;
2109 
2110     pa_assert(s);
2111     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2112 
2113     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2114     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
2115     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
2116 
2117     pa_stream_ref(s);
2118 
2119     t = pa_tagstruct_command(
2120             s->context,
2121             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2122                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2123             &tag);
2124     pa_tagstruct_putu32(t, s->channel);
2125     pa_pstream_send_tagstruct(s->context->pstream, t);
2126     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2127 
2128     pa_stream_unref(s);
2129     return 0;
2130 }
2131 
pa_stream_set_read_callback(pa_stream * s,pa_stream_request_cb_t cb,void * userdata)2132 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2133     pa_assert(s);
2134     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2135 
2136     if (pa_detect_fork())
2137         return;
2138 
2139     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2140         return;
2141 
2142     s->read_callback = cb;
2143     s->read_userdata = userdata;
2144 }
2145 
pa_stream_set_write_callback(pa_stream * s,pa_stream_request_cb_t cb,void * userdata)2146 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2147     pa_assert(s);
2148     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2149 
2150     if (pa_detect_fork())
2151         return;
2152 
2153     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2154         return;
2155 
2156     s->write_callback = cb;
2157     s->write_userdata = userdata;
2158 }
2159 
pa_stream_set_state_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2160 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2161     pa_assert(s);
2162     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2163 
2164     if (pa_detect_fork())
2165         return;
2166 
2167     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2168         return;
2169 
2170     s->state_callback = cb;
2171     s->state_userdata = userdata;
2172 }
2173 
pa_stream_set_overflow_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2174 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2175     pa_assert(s);
2176     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2177 
2178     if (pa_detect_fork())
2179         return;
2180 
2181     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2182         return;
2183 
2184     s->overflow_callback = cb;
2185     s->overflow_userdata = userdata;
2186 }
2187 
pa_stream_set_underflow_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2188 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2189     pa_assert(s);
2190     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2191 
2192     if (pa_detect_fork())
2193         return;
2194 
2195     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2196         return;
2197 
2198     s->underflow_callback = cb;
2199     s->underflow_userdata = userdata;
2200 }
2201 
pa_stream_set_underflow_ohos_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2202 void pa_stream_set_underflow_ohos_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2203     pa_assert(s);
2204     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2205 
2206     if (pa_detect_fork())
2207         return;
2208 
2209     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2210         return;
2211 
2212     s->underflow_ohos_callback = cb;
2213     s->underflow_ohos_userdata = userdata;
2214 }
2215 
pa_stream_set_latency_update_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2216 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2217     pa_assert(s);
2218     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2219 
2220     if (pa_detect_fork())
2221         return;
2222 
2223     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2224         return;
2225 
2226     s->latency_update_callback = cb;
2227     s->latency_update_userdata = userdata;
2228 }
2229 
pa_stream_set_moved_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2230 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2231     pa_assert(s);
2232     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2233 
2234     if (pa_detect_fork())
2235         return;
2236 
2237     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2238         return;
2239 
2240     s->moved_callback = cb;
2241     s->moved_userdata = userdata;
2242 }
2243 
pa_stream_set_suspended_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2244 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2245     pa_assert(s);
2246     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2247 
2248     if (pa_detect_fork())
2249         return;
2250 
2251     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2252         return;
2253 
2254     s->suspended_callback = cb;
2255     s->suspended_userdata = userdata;
2256 }
2257 
pa_stream_set_started_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2258 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2259     pa_assert(s);
2260     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2261 
2262     if (pa_detect_fork())
2263         return;
2264 
2265     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2266         return;
2267 
2268     s->started_callback = cb;
2269     s->started_userdata = userdata;
2270 }
2271 
pa_stream_set_event_callback(pa_stream * s,pa_stream_event_cb_t cb,void * userdata)2272 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2273     pa_assert(s);
2274     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2275 
2276     if (pa_detect_fork())
2277         return;
2278 
2279     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2280         return;
2281 
2282     s->event_callback = cb;
2283     s->event_userdata = userdata;
2284 }
2285 
pa_stream_set_buffer_attr_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2286 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2287     pa_assert(s);
2288     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2289 
2290     if (pa_detect_fork())
2291         return;
2292 
2293     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2294         return;
2295 
2296     s->buffer_attr_callback = cb;
2297     s->buffer_attr_userdata = userdata;
2298 }
2299 
pa_stream_simple_ack_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)2300 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2301     pa_operation *o = userdata;
2302     int success = 1;
2303 
2304     pa_assert(pd);
2305     pa_assert(o);
2306     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2307 
2308     if (!o->context)
2309         goto finish;
2310 
2311     if (command != PA_COMMAND_REPLY) {
2312         if (pa_context_handle_error(o->context, command, t, false) < 0)
2313             goto finish;
2314 
2315         success = 0;
2316     } else if (!pa_tagstruct_eof(t)) {
2317         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2318         goto finish;
2319     }
2320 
2321     if (o->callback) {
2322         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2323         cb(o->stream, success, o->userdata);
2324     }
2325 
2326 finish:
2327     pa_operation_done(o);
2328     pa_operation_unref(o);
2329 }
2330 
pa_stream_cork(pa_stream * s,int b,pa_stream_success_cb_t cb,void * userdata)2331 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2332     pa_operation *o;
2333     pa_tagstruct *t;
2334     uint32_t tag;
2335 
2336     pa_assert(s);
2337     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2338 
2339     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2340     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2341     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2342 
2343     /* Ask for a timing update before we cork/uncork to get the best
2344      * accuracy for the transport latency suitable for the
2345      * check_smoother_status() call in the started callback */
2346     request_auto_timing_update(s, true);
2347 
2348     s->corked = b;
2349 
2350     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2351 
2352     t = pa_tagstruct_command(
2353             s->context,
2354             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2355             &tag);
2356     pa_tagstruct_putu32(t, s->channel);
2357     pa_tagstruct_put_boolean(t, !!b);
2358     pa_pstream_send_tagstruct(s->context->pstream, t);
2359     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2360 
2361     check_smoother_status(s, false, false, false);
2362 
2363     /* This might cause the indexes to hang/start again, hence let's
2364      * request a timing update, after the cork/uncork, too */
2365     request_auto_timing_update(s, true);
2366 
2367     return o;
2368 }
2369 
stream_send_simple_command(pa_stream * s,uint32_t command,pa_stream_success_cb_t cb,void * userdata)2370 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2371     pa_tagstruct *t;
2372     pa_operation *o;
2373     uint32_t tag;
2374 
2375     pa_assert(s);
2376     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2377 
2378     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2379     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2380 
2381     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2382 
2383     t = pa_tagstruct_command(s->context, command, &tag);
2384     pa_tagstruct_putu32(t, s->channel);
2385     pa_pstream_send_tagstruct(s->context->pstream, t);
2386     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2387 
2388     return o;
2389 }
2390 
pa_stream_flush(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2391 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2392     pa_operation *o;
2393 
2394     pa_assert(s);
2395     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2396 
2397     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2398     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2399     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2400 
2401     /* Ask for a timing update *before* the flush, so that the
2402      * transport usec is as up to date as possible when we get the
2403      * underflow message and update the smoother status*/
2404     request_auto_timing_update(s, true);
2405 
2406     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2407         return NULL;
2408 
2409     if (s->direction == PA_STREAM_PLAYBACK) {
2410 
2411         if (s->write_index_corrections[s->current_write_index_correction].valid)
2412             s->write_index_corrections[s->current_write_index_correction].corrupt = true;
2413 
2414         if (s->buffer_attr.prebuf > 0)
2415             check_smoother_status(s, false, false, true);
2416 
2417         /* This will change the write index, but leave the
2418          * read index untouched. */
2419         invalidate_indexes(s, false, true);
2420 
2421     } else
2422         /* For record streams this has no influence on the write
2423          * index, but the read index might jump. */
2424         invalidate_indexes(s, true, false);
2425 
2426     /* Note that we do not update requested_bytes here. This is
2427      * because we cannot really know how data actually was dropped
2428      * from the write index due to this. This 'error' will be applied
2429      * by both client and server and hence we should be fine. */
2430 
2431     return o;
2432 }
2433 
pa_stream_prebuf(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2434 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2435     pa_operation *o;
2436 
2437     pa_assert(s);
2438     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2439 
2440     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2441     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2442     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2443     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2444 
2445     /* Ask for a timing update before we cork/uncork to get the best
2446      * accuracy for the transport latency suitable for the
2447      * check_smoother_status() call in the started callback */
2448     request_auto_timing_update(s, true);
2449 
2450     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2451         return NULL;
2452 
2453     /* This might cause the read index to hang again, hence
2454      * let's request a timing update */
2455     request_auto_timing_update(s, true);
2456 
2457     return o;
2458 }
2459 
pa_stream_trigger(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2460 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2461     pa_operation *o;
2462 
2463     pa_assert(s);
2464     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2465 
2466     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2467     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2468     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2469     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2470 
2471     /* Ask for a timing update before we cork/uncork to get the best
2472      * accuracy for the transport latency suitable for the
2473      * check_smoother_status() call in the started callback */
2474     request_auto_timing_update(s, true);
2475 
2476     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2477         return NULL;
2478 
2479     /* This might cause the read index to start moving again, hence
2480      * let's request a timing update */
2481     request_auto_timing_update(s, true);
2482 
2483     return o;
2484 }
2485 
pa_stream_set_name(pa_stream * s,const char * name,pa_stream_success_cb_t cb,void * userdata)2486 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2487     pa_operation *o;
2488 
2489     pa_assert(s);
2490     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2491     pa_assert(name);
2492 
2493     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2494     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2495     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2496 
2497     if (s->context->version >= 13) {
2498         pa_proplist *p = pa_proplist_new();
2499 
2500         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2501         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2502         pa_proplist_free(p);
2503     } else {
2504         pa_tagstruct *t;
2505         uint32_t tag;
2506 
2507         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2508         t = pa_tagstruct_command(
2509                 s->context,
2510                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2511                 &tag);
2512         pa_tagstruct_putu32(t, s->channel);
2513         pa_tagstruct_puts(t, name);
2514         pa_pstream_send_tagstruct(s->context->pstream, t);
2515         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2516     }
2517 
2518     return o;
2519 }
2520 
pa_stream_get_time(pa_stream * s,pa_usec_t * r_usec)2521 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2522     pa_usec_t usec;
2523 
2524     pa_assert(s);
2525     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2526 
2527     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2528     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2529     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2530     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2531     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2532     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2533 
2534     if (s->smoother)
2535 #ifdef USE_SMOOTHER_2
2536         usec = pa_smoother_2_get(s->smoother, pa_rtclock_now());
2537 #else
2538         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2539 #endif
2540 
2541     else
2542         usec = calc_time(s, false);
2543 
2544     /* Make sure the time runs monotonically */
2545     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2546         if (usec < s->previous_time)
2547             usec = s->previous_time;
2548         else
2549             s->previous_time = usec;
2550     }
2551 
2552     if (r_usec)
2553         *r_usec = usec;
2554 
2555     return 0;
2556 }
2557 
time_counter_diff(const pa_stream * s,pa_usec_t a,pa_usec_t b,int * negative)2558 static pa_usec_t time_counter_diff(const pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2559     pa_assert(s);
2560     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2561 
2562     if (negative)
2563         *negative = 0;
2564 
2565     if (a >= b)
2566         return a-b;
2567     else {
2568         if (negative && s->direction == PA_STREAM_RECORD) {
2569             *negative = 1;
2570             return b-a;
2571         } else
2572             return 0;
2573     }
2574 }
2575 
pa_stream_get_latency(pa_stream * s,pa_usec_t * r_usec,int * negative)2576 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2577     pa_usec_t t, c;
2578     int r;
2579     int64_t cindex;
2580 
2581     pa_assert(s);
2582     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2583     pa_assert(r_usec);
2584 
2585     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2586     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2587     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2588     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2589     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2590     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2591 
2592     if ((r = pa_stream_get_time(s, &t)) < 0)
2593         return r;
2594 
2595     if (s->direction == PA_STREAM_PLAYBACK)
2596         cindex = s->timing_info.write_index;
2597     else
2598         cindex = s->timing_info.read_index;
2599 
2600     if (cindex < 0)
2601         cindex = 0;
2602 
2603     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2604 
2605     if (s->direction == PA_STREAM_PLAYBACK)
2606         *r_usec = time_counter_diff(s, c, t, negative);
2607     else
2608         *r_usec = time_counter_diff(s, t, c, negative);
2609 
2610     return 0;
2611 }
2612 
pa_stream_get_timing_info(pa_stream * s)2613 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2614     pa_assert(s);
2615     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2616 
2617     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2618     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2619     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2620     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2621 
2622     return &s->timing_info;
2623 }
2624 
pa_stream_get_sample_spec(pa_stream * s)2625 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2626     pa_assert(s);
2627     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2628 
2629     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2630 
2631     return &s->sample_spec;
2632 }
2633 
pa_stream_get_channel_map(pa_stream * s)2634 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2635     pa_assert(s);
2636     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2637 
2638     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2639 
2640     return &s->channel_map;
2641 }
2642 
pa_stream_get_format_info(const pa_stream * s)2643 const pa_format_info* pa_stream_get_format_info(const pa_stream *s) {
2644     pa_assert(s);
2645     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2646 
2647     /* We don't have the format till routing is done */
2648     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2649     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2650 
2651     return s->format;
2652 }
pa_stream_get_buffer_attr(pa_stream * s)2653 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2654     pa_assert(s);
2655     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2656 
2657     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2658     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2659     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2660 
2661     return &s->buffer_attr;
2662 }
2663 
stream_set_buffer_attr_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)2664 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2665     pa_operation *o = userdata;
2666     int success = 1;
2667 
2668     pa_assert(pd);
2669     pa_assert(o);
2670     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2671 
2672     if (!o->context)
2673         goto finish;
2674 
2675     if (command != PA_COMMAND_REPLY) {
2676         if (pa_context_handle_error(o->context, command, t, false) < 0)
2677             goto finish;
2678 
2679         success = 0;
2680     } else {
2681         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2682             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2683                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2684                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2685                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2686                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2687                 goto finish;
2688             }
2689         } else if (o->stream->direction == PA_STREAM_RECORD) {
2690             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2691                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2692                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2693                 goto finish;
2694             }
2695         }
2696 
2697         if (o->stream->context->version >= 13) {
2698             pa_usec_t usec;
2699 
2700             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2701                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2702                 goto finish;
2703             }
2704 
2705             if (o->stream->direction == PA_STREAM_RECORD)
2706                 o->stream->timing_info.configured_source_usec = usec;
2707             else
2708                 o->stream->timing_info.configured_sink_usec = usec;
2709         }
2710 
2711         if (!pa_tagstruct_eof(t)) {
2712             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2713             goto finish;
2714         }
2715     }
2716 
2717     if (o->callback) {
2718         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2719         cb(o->stream, success, o->userdata);
2720     }
2721 
2722 finish:
2723     pa_operation_done(o);
2724     pa_operation_unref(o);
2725 }
2726 
pa_stream_set_buffer_attr(pa_stream * s,const pa_buffer_attr * attr,pa_stream_success_cb_t cb,void * userdata)2727 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2728     pa_operation *o;
2729     pa_tagstruct *t;
2730     uint32_t tag;
2731     pa_buffer_attr copy;
2732 
2733     pa_assert(s);
2734     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2735     pa_assert(attr);
2736 
2737     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2738     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2739     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2740     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2741 
2742     /* Ask for a timing update before we cork/uncork to get the best
2743      * accuracy for the transport latency suitable for the
2744      * check_smoother_status() call in the started callback */
2745     request_auto_timing_update(s, true);
2746 
2747     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2748 
2749     t = pa_tagstruct_command(
2750             s->context,
2751             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2752             &tag);
2753     pa_tagstruct_putu32(t, s->channel);
2754 
2755     copy = *attr;
2756     patch_buffer_attr(s, &copy, NULL);
2757     attr = &copy;
2758 
2759     pa_tagstruct_putu32(t, attr->maxlength);
2760 
2761     if (s->direction == PA_STREAM_PLAYBACK)
2762         pa_tagstruct_put(
2763                 t,
2764                 PA_TAG_U32, attr->tlength,
2765                 PA_TAG_U32, attr->prebuf,
2766                 PA_TAG_U32, attr->minreq,
2767                 PA_TAG_INVALID);
2768     else
2769         pa_tagstruct_putu32(t, attr->fragsize);
2770 
2771     if (s->context->version >= 13)
2772         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2773 
2774     if (s->context->version >= 14)
2775         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2776 
2777     pa_pstream_send_tagstruct(s->context->pstream, t);
2778     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2779 
2780     /* This might cause changes in the read/write index, hence let's
2781      * request a timing update */
2782     request_auto_timing_update(s, true);
2783 
2784     return o;
2785 }
2786 
pa_stream_get_device_index(const pa_stream * s)2787 uint32_t pa_stream_get_device_index(const pa_stream *s) {
2788     pa_assert(s);
2789     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2790 
2791     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2792     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2793     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2794     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2795     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2796 
2797     return s->device_index;
2798 }
2799 
pa_stream_get_device_name(const pa_stream * s)2800 const char *pa_stream_get_device_name(const pa_stream *s) {
2801     pa_assert(s);
2802     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2803 
2804     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2805     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2806     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2807     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2808     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2809 
2810     return s->device_name;
2811 }
2812 
pa_stream_is_suspended(const pa_stream * s)2813 int pa_stream_is_suspended(const pa_stream *s) {
2814     pa_assert(s);
2815     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2816 
2817     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2818     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2819     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2820     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2821 
2822     return s->suspended;
2823 }
2824 
pa_stream_is_corked(const pa_stream * s)2825 int pa_stream_is_corked(const pa_stream *s) {
2826     pa_assert(s);
2827     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2828 
2829     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2830     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2831     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2832 
2833     return s->corked;
2834 }
2835 
stream_update_sample_rate_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)2836 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2837     pa_operation *o = userdata;
2838     int success = 1;
2839 
2840     pa_assert(pd);
2841     pa_assert(o);
2842     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2843 
2844     if (!o->context)
2845         goto finish;
2846 
2847     if (command != PA_COMMAND_REPLY) {
2848         if (pa_context_handle_error(o->context, command, t, false) < 0)
2849             goto finish;
2850 
2851         success = 0;
2852     } else {
2853 
2854         if (!pa_tagstruct_eof(t)) {
2855             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2856             goto finish;
2857         }
2858     }
2859 
2860     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2861 #ifdef USE_SMOOTHER_2
2862     if (o->stream->smoother)
2863         pa_smoother_2_set_rate(o->stream->smoother, pa_rtclock_now(), o->stream->sample_spec.rate);
2864 #endif
2865     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2866 
2867     if (o->callback) {
2868         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2869         cb(o->stream, success, o->userdata);
2870     }
2871 
2872 finish:
2873     pa_operation_done(o);
2874     pa_operation_unref(o);
2875 }
2876 
pa_stream_update_sample_rate(pa_stream * s,uint32_t rate,pa_stream_success_cb_t cb,void * userdata)2877 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2878     pa_operation *o;
2879     pa_tagstruct *t;
2880     uint32_t tag;
2881 
2882     pa_assert(s);
2883     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2884 
2885     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2886     PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_sample_rate_valid(rate), PA_ERR_INVALID);
2887     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2888     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2889     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2890     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2891 
2892     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2893     o->private = PA_UINT_TO_PTR(rate);
2894 
2895     t = pa_tagstruct_command(
2896             s->context,
2897             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2898             &tag);
2899     pa_tagstruct_putu32(t, s->channel);
2900     pa_tagstruct_putu32(t, rate);
2901 
2902     pa_pstream_send_tagstruct(s->context->pstream, t);
2903     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2904 
2905     return o;
2906 }
2907 
pa_stream_proplist_update(pa_stream * s,pa_update_mode_t mode,pa_proplist * p,pa_stream_success_cb_t cb,void * userdata)2908 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2909     pa_operation *o;
2910     pa_tagstruct *t;
2911     uint32_t tag;
2912 
2913     pa_assert(s);
2914     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2915 
2916     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2917     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2918     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2919     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2920     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2921 
2922     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2923 
2924     t = pa_tagstruct_command(
2925             s->context,
2926             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2927             &tag);
2928     pa_tagstruct_putu32(t, s->channel);
2929     pa_tagstruct_putu32(t, (uint32_t) mode);
2930     pa_tagstruct_put_proplist(t, p);
2931 
2932     pa_pstream_send_tagstruct(s->context->pstream, t);
2933     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2934 
2935     /* Please note that we don't update s->proplist here, because we
2936      * don't export that field */
2937 
2938     return o;
2939 }
2940 
pa_stream_proplist_remove(pa_stream * s,const char * const keys[],pa_stream_success_cb_t cb,void * userdata)2941 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2942     pa_operation *o;
2943     pa_tagstruct *t;
2944     uint32_t tag;
2945     const char * const*k;
2946 
2947     pa_assert(s);
2948     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2949 
2950     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2951     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2952     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2953     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2954     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2955 
2956     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2957 
2958     t = pa_tagstruct_command(
2959             s->context,
2960             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2961             &tag);
2962     pa_tagstruct_putu32(t, s->channel);
2963 
2964     for (k = keys; *k; k++)
2965         pa_tagstruct_puts(t, *k);
2966 
2967     pa_tagstruct_puts(t, NULL);
2968 
2969     pa_pstream_send_tagstruct(s->context->pstream, t);
2970     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2971 
2972     /* Please note that we don't update s->proplist here, because we
2973      * don't export that field */
2974 
2975     return o;
2976 }
2977 
pa_stream_set_monitor_stream(pa_stream * s,uint32_t sink_input_idx)2978 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2979     pa_assert(s);
2980     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2981 
2982     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2983     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2984     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2985     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2986 
2987     s->direct_on_input = sink_input_idx;
2988 
2989     return 0;
2990 }
2991 
pa_stream_get_monitor_stream(const pa_stream * s)2992 uint32_t pa_stream_get_monitor_stream(const pa_stream *s) {
2993     pa_assert(s);
2994     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2995 
2996     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2997     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2998     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2999 
3000     return s->direct_on_input;
3001 }
3002