1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <string.h>
26 #include <stdio.h>
27 #include <string.h>
28
29 #include <pulse/def.h>
30 #include <pulse/timeval.h>
31 #include <pulse/rtclock.h>
32 #include <pulse/xmalloc.h>
33 #include <pulse/fork-detect.h>
34
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/sample-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41 #include <pulsecore/core-util.h>
42
43 #include "internal.h"
44 #include "stream.h"
45
46 /* #define STREAM_DEBUG */
47
48 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
49 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
50
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #ifndef USE_SMOOTHER_2
53 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
54 #define SMOOTHER_MIN_HISTORY (4)
55 #endif
56
pa_stream_new(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map)57 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
58 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
59 }
60
reset_callbacks(pa_stream * s)61 static void reset_callbacks(pa_stream *s) {
62 s->read_callback = NULL;
63 s->read_userdata = NULL;
64 s->write_callback = NULL;
65 s->write_userdata = NULL;
66 s->state_callback = NULL;
67 s->state_userdata = NULL;
68 s->overflow_callback = NULL;
69 s->overflow_userdata = NULL;
70 s->underflow_callback = NULL;
71 s->underflow_userdata = NULL;
72 s->latency_update_callback = NULL;
73 s->latency_update_userdata = NULL;
74 s->moved_callback = NULL;
75 s->moved_userdata = NULL;
76 s->suspended_callback = NULL;
77 s->suspended_userdata = NULL;
78 s->started_callback = NULL;
79 s->started_userdata = NULL;
80 s->event_callback = NULL;
81 s->event_userdata = NULL;
82 s->buffer_attr_callback = NULL;
83 s->buffer_attr_userdata = NULL;
84 s->underflow_ohos_callback = NULL;
85 s->underflow_ohos_userdata = NULL;
86 }
87
pa_stream_new_with_proplist_internal(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map,pa_format_info * const * formats,unsigned int n_formats,pa_proplist * p)88 static pa_stream *pa_stream_new_with_proplist_internal(
89 pa_context *c,
90 const char *name,
91 const pa_sample_spec *ss,
92 const pa_channel_map *map,
93 pa_format_info * const *formats,
94 unsigned int n_formats,
95 pa_proplist *p) {
96
97 pa_stream *s;
98 unsigned int i;
99
100 pa_assert(c);
101 pa_assert(PA_REFCNT_VALUE(c) >= 1);
102 pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
103 pa_assert(n_formats < PA_MAX_FORMATS);
104
105 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
106 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
107
108 s = pa_xnew(pa_stream, 1);
109 PA_REFCNT_INIT(s);
110 s->context = c;
111 s->mainloop = c->mainloop;
112
113 s->direction = PA_STREAM_NODIRECTION;
114 s->state = PA_STREAM_UNCONNECTED;
115 s->flags = 0;
116
117 if (ss)
118 s->sample_spec = *ss;
119 else
120 pa_sample_spec_init(&s->sample_spec);
121
122 if (map)
123 s->channel_map = *map;
124 else
125 pa_channel_map_init(&s->channel_map);
126
127 s->n_formats = 0;
128 if (formats) {
129 s->n_formats = n_formats;
130 for (i = 0; i < n_formats; i++)
131 s->req_formats[i] = pa_format_info_copy(formats[i]);
132 }
133
134 /* We'll get the final negotiated format after connecting */
135 s->format = NULL;
136
137 s->direct_on_input = PA_INVALID_INDEX;
138
139 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
140 if (name)
141 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
142
143 s->channel = 0;
144 s->channel_valid = false;
145 s->syncid = c->csyncid++;
146 s->stream_index = PA_INVALID_INDEX;
147
148 s->requested_bytes = 0;
149 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
150
151 /* We initialize the target length here, so that if the user
152 * passes no explicit buffering metrics the default is similar to
153 * what older PA versions provided. */
154
155 s->buffer_attr.maxlength = (uint32_t) -1;
156 if (ss)
157 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
158 else {
159 /* FIXME: We assume a worst-case compressed format corresponding to
160 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
161 pa_sample_spec tmp_ss = {
162 .format = PA_SAMPLE_S16NE,
163 .rate = 48000,
164 .channels = 2,
165 };
166 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
167 }
168 s->buffer_attr.minreq = (uint32_t) -1;
169 s->buffer_attr.prebuf = (uint32_t) -1;
170 s->buffer_attr.fragsize = (uint32_t) -1;
171
172 s->device_index = PA_INVALID_INDEX;
173 s->device_name = NULL;
174 s->suspended = false;
175 s->corked = false;
176
177 s->write_memblock = NULL;
178 s->write_data = NULL;
179
180 pa_memchunk_reset(&s->peek_memchunk);
181 s->peek_data = NULL;
182 s->record_memblockq = NULL;
183
184 memset(&s->timing_info, 0, sizeof(s->timing_info));
185 s->timing_info_valid = false;
186
187 s->previous_time = 0;
188 s->latest_underrun_at_index = -1;
189
190 s->read_index_not_before = 0;
191 s->write_index_not_before = 0;
192 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
193 s->write_index_corrections[i].valid = 0;
194 s->current_write_index_correction = 0;
195
196 s->auto_timing_update_event = NULL;
197 s->auto_timing_update_requested = false;
198 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
199
200 reset_callbacks(s);
201
202 s->smoother = NULL;
203
204 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
205 PA_LLIST_PREPEND(pa_stream, c->streams, s);
206 pa_stream_ref(s);
207
208 return s;
209 }
210
pa_stream_new_with_proplist(pa_context * c,const char * name,const pa_sample_spec * ss,const pa_channel_map * map,pa_proplist * p)211 pa_stream *pa_stream_new_with_proplist(
212 pa_context *c,
213 const char *name,
214 const pa_sample_spec *ss,
215 const pa_channel_map *map,
216 pa_proplist *p) {
217
218 pa_channel_map tmap;
219
220 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
221 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
222 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
223 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
224 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
225
226 if (!map)
227 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
228
229 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
230 }
231
pa_stream_new_extended(pa_context * c,const char * name,pa_format_info * const * formats,unsigned int n_formats,pa_proplist * p)232 pa_stream *pa_stream_new_extended(
233 pa_context *c,
234 const char *name,
235 pa_format_info * const *formats,
236 unsigned int n_formats,
237 pa_proplist *p) {
238
239 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
240
241 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
242 }
243
stream_unlink(pa_stream * s)244 static void stream_unlink(pa_stream *s) {
245 pa_operation *o, *n;
246 pa_assert(s);
247
248 if (!s->context)
249 return;
250
251 /* Detach from context */
252
253 /* Unref all operation objects that point to us */
254 for (o = s->context->operations; o; o = n) {
255 n = o->next;
256
257 if (o->stream == s)
258 pa_operation_cancel(o);
259 }
260
261 /* Drop all outstanding replies for this stream */
262 if (s->context->pdispatch)
263 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
264
265 if (s->channel_valid) {
266 pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
267 s->channel = 0;
268 s->channel_valid = false;
269 }
270
271 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
272 pa_stream_unref(s);
273
274 s->context = NULL;
275
276 if (s->auto_timing_update_event) {
277 pa_assert(s->mainloop);
278 s->mainloop->time_free(s->auto_timing_update_event);
279 }
280
281 reset_callbacks(s);
282 }
283
stream_free(pa_stream * s)284 static void stream_free(pa_stream *s) {
285 unsigned int i;
286
287 pa_assert(s);
288
289 stream_unlink(s);
290
291 if (s->write_memblock) {
292 if (s->write_data)
293 pa_memblock_release(s->write_memblock);
294 pa_memblock_unref(s->write_memblock);
295 }
296
297 if (s->peek_memchunk.memblock) {
298 if (s->peek_data)
299 pa_memblock_release(s->peek_memchunk.memblock);
300 pa_memblock_unref(s->peek_memchunk.memblock);
301 }
302
303 if (s->record_memblockq)
304 pa_memblockq_free(s->record_memblockq);
305
306 if (s->proplist)
307 pa_proplist_free(s->proplist);
308
309 if (s->smoother)
310 #ifdef USE_SMOOTHER_2
311 pa_smoother_2_free(s->smoother);
312 #else
313 pa_smoother_free(s->smoother);
314 #endif
315
316 for (i = 0; i < s->n_formats; i++)
317 pa_format_info_free(s->req_formats[i]);
318
319 if (s->format)
320 pa_format_info_free(s->format);
321
322 pa_xfree(s->device_name);
323 pa_xfree(s);
324 }
325
pa_stream_unref(pa_stream * s)326 void pa_stream_unref(pa_stream *s) {
327 pa_assert(s);
328 pa_assert(PA_REFCNT_VALUE(s) >= 1);
329
330 if (PA_REFCNT_DEC(s) <= 0)
331 stream_free(s);
332 }
333
pa_stream_ref(pa_stream * s)334 pa_stream* pa_stream_ref(pa_stream *s) {
335 pa_assert(s);
336 pa_assert(PA_REFCNT_VALUE(s) >= 1);
337
338 PA_REFCNT_INC(s);
339 return s;
340 }
341
pa_stream_get_state(const pa_stream * s)342 pa_stream_state_t pa_stream_get_state(const pa_stream *s) {
343 pa_assert(s);
344 pa_assert(PA_REFCNT_VALUE(s) >= 1);
345
346 return s->state;
347 }
348
pa_stream_get_context(const pa_stream * s)349 pa_context* pa_stream_get_context(const pa_stream *s) {
350 pa_assert(s);
351 pa_assert(PA_REFCNT_VALUE(s) >= 1);
352
353 return s->context;
354 }
355
pa_stream_get_index(const pa_stream * s)356 uint32_t pa_stream_get_index(const pa_stream *s) {
357 pa_assert(s);
358 pa_assert(PA_REFCNT_VALUE(s) >= 1);
359
360 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
361 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
362
363 return s->stream_index;
364 }
365
pa_stream_set_state(pa_stream * s,pa_stream_state_t st)366 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
367 pa_assert(s);
368 pa_assert(PA_REFCNT_VALUE(s) >= 1);
369
370 if (s->state == st)
371 return;
372
373 pa_stream_ref(s);
374
375 s->state = st;
376
377 if (s->state_callback)
378 s->state_callback(s, s->state_userdata);
379
380 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
381 stream_unlink(s);
382
383 pa_stream_unref(s);
384 }
385
request_auto_timing_update(pa_stream * s,bool force)386 static void request_auto_timing_update(pa_stream *s, bool force) {
387 pa_assert(s);
388 pa_assert(PA_REFCNT_VALUE(s) >= 1);
389
390 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
391 return;
392
393 if (s->state == PA_STREAM_READY &&
394 (force || !s->auto_timing_update_requested)) {
395 pa_operation *o;
396
397 #ifdef STREAM_DEBUG
398 pa_log_debug("Automatically requesting new timing data");
399 #endif
400
401 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
402 pa_operation_unref(o);
403 s->auto_timing_update_requested = true;
404 }
405 }
406
407 if (s->auto_timing_update_event) {
408 if (s->suspended && !force) {
409 pa_assert(s->mainloop);
410 s->mainloop->time_free(s->auto_timing_update_event);
411 s->auto_timing_update_event = NULL;
412 } else {
413 if (force)
414 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
415
416 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
417
418 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
419 }
420 }
421 }
422
pa_command_stream_killed(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)423 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
424 pa_context *c = userdata;
425 pa_stream *s;
426 uint32_t channel;
427
428 pa_assert(pd);
429 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
430 pa_assert(t);
431 pa_assert(c);
432 pa_assert(PA_REFCNT_VALUE(c) >= 1);
433
434 pa_context_ref(c);
435
436 if (pa_tagstruct_getu32(t, &channel) < 0 ||
437 !pa_tagstruct_eof(t)) {
438 pa_context_fail(c, PA_ERR_PROTOCOL);
439 goto finish;
440 }
441
442 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
443 goto finish;
444
445 if (s->state != PA_STREAM_READY)
446 goto finish;
447
448 pa_context_set_error(c, PA_ERR_KILLED);
449 pa_stream_set_state(s, PA_STREAM_FAILED);
450
451 finish:
452 pa_context_unref(c);
453 }
454
check_smoother_status(pa_stream * s,bool aposteriori,bool force_start,bool force_stop)455 static void check_smoother_status(pa_stream *s, bool aposteriori, bool force_start, bool force_stop) {
456 pa_usec_t x;
457
458 pa_assert(s);
459 pa_assert(!force_start || !force_stop);
460
461 if (!s->smoother)
462 return;
463
464 x = pa_rtclock_now();
465
466 if (s->timing_info_valid) {
467 if (aposteriori)
468 x -= s->timing_info.transport_usec;
469 else
470 x += s->timing_info.transport_usec;
471 }
472
473 if (s->suspended || s->corked || force_stop)
474 #ifdef USE_SMOOTHER_2
475 pa_smoother_2_pause(s->smoother, x);
476 #else
477 pa_smoother_pause(s->smoother, x);
478 #endif
479 else if (force_start || s->buffer_attr.prebuf == 0) {
480
481 if (!s->timing_info_valid &&
482 !aposteriori &&
483 !force_start &&
484 !force_stop &&
485 s->context->version >= 13) {
486
487 /* If the server supports STARTED events we take them as
488 * indications when audio really starts/stops playing, if
489 * we don't have any timing info yet -- instead of trying
490 * to be smart and guessing the server time. Otherwise the
491 * unknown transport delay adds too much noise to our time
492 * calculations. */
493
494 return;
495 }
496
497 #ifdef USE_SMOOTHER_2
498 pa_smoother_2_resume(s->smoother, x);
499 #else
500 pa_smoother_resume(s->smoother, x, true);
501 #endif
502 }
503
504 /* Please note that we have no idea if playback actually started
505 * if prebuf is non-zero! */
506 }
507
508 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
509
pa_command_stream_moved(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)510 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
511 pa_context *c = userdata;
512 pa_stream *s;
513 uint32_t channel;
514 const char *dn;
515 bool suspended;
516 uint32_t di;
517 pa_usec_t usec = 0;
518 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
519
520 pa_assert(pd);
521 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
522 pa_assert(t);
523 pa_assert(c);
524 pa_assert(PA_REFCNT_VALUE(c) >= 1);
525
526 pa_context_ref(c);
527
528 if (c->version < 12) {
529 pa_context_fail(c, PA_ERR_PROTOCOL);
530 goto finish;
531 }
532
533 if (pa_tagstruct_getu32(t, &channel) < 0 ||
534 pa_tagstruct_getu32(t, &di) < 0 ||
535 pa_tagstruct_gets(t, &dn) < 0 ||
536 pa_tagstruct_get_boolean(t, &suspended) < 0) {
537 pa_context_fail(c, PA_ERR_PROTOCOL);
538 goto finish;
539 }
540
541 if (c->version >= 13) {
542
543 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
544 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
545 pa_tagstruct_getu32(t, &fragsize) < 0 ||
546 pa_tagstruct_get_usec(t, &usec) < 0) {
547 pa_context_fail(c, PA_ERR_PROTOCOL);
548 goto finish;
549 }
550 } else {
551 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
552 pa_tagstruct_getu32(t, &tlength) < 0 ||
553 pa_tagstruct_getu32(t, &prebuf) < 0 ||
554 pa_tagstruct_getu32(t, &minreq) < 0 ||
555 pa_tagstruct_get_usec(t, &usec) < 0) {
556 pa_context_fail(c, PA_ERR_PROTOCOL);
557 goto finish;
558 }
559 }
560 }
561
562 if (!pa_tagstruct_eof(t)) {
563 pa_context_fail(c, PA_ERR_PROTOCOL);
564 goto finish;
565 }
566
567 if (!dn || di == PA_INVALID_INDEX) {
568 pa_context_fail(c, PA_ERR_PROTOCOL);
569 goto finish;
570 }
571
572 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
573 goto finish;
574
575 if (s->state != PA_STREAM_READY)
576 goto finish;
577
578 if (c->version >= 13) {
579 if (s->direction == PA_STREAM_RECORD)
580 s->timing_info.configured_source_usec = usec;
581 else
582 s->timing_info.configured_sink_usec = usec;
583
584 s->buffer_attr.maxlength = maxlength;
585 s->buffer_attr.fragsize = fragsize;
586 s->buffer_attr.tlength = tlength;
587 s->buffer_attr.prebuf = prebuf;
588 s->buffer_attr.minreq = minreq;
589 }
590
591 pa_xfree(s->device_name);
592 s->device_name = pa_xstrdup(dn);
593 s->device_index = di;
594
595 s->suspended = suspended;
596
597 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
598 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
599 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
600 request_auto_timing_update(s, true);
601 }
602
603 check_smoother_status(s, true, false, false);
604 request_auto_timing_update(s, true);
605
606 if (s->moved_callback)
607 s->moved_callback(s, s->moved_userdata);
608
609 finish:
610 pa_context_unref(c);
611 }
612
pa_command_stream_buffer_attr(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)613 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
614 pa_context *c = userdata;
615 pa_stream *s;
616 uint32_t channel;
617 pa_usec_t usec = 0;
618 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
619
620 pa_assert(pd);
621 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
622 pa_assert(t);
623 pa_assert(c);
624 pa_assert(PA_REFCNT_VALUE(c) >= 1);
625
626 pa_context_ref(c);
627
628 if (c->version < 15) {
629 pa_context_fail(c, PA_ERR_PROTOCOL);
630 goto finish;
631 }
632
633 if (pa_tagstruct_getu32(t, &channel) < 0) {
634 pa_context_fail(c, PA_ERR_PROTOCOL);
635 goto finish;
636 }
637
638 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
639 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
640 pa_tagstruct_getu32(t, &fragsize) < 0 ||
641 pa_tagstruct_get_usec(t, &usec) < 0) {
642 pa_context_fail(c, PA_ERR_PROTOCOL);
643 goto finish;
644 }
645 } else {
646 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
647 pa_tagstruct_getu32(t, &tlength) < 0 ||
648 pa_tagstruct_getu32(t, &prebuf) < 0 ||
649 pa_tagstruct_getu32(t, &minreq) < 0 ||
650 pa_tagstruct_get_usec(t, &usec) < 0) {
651 pa_context_fail(c, PA_ERR_PROTOCOL);
652 goto finish;
653 }
654 }
655
656 if (!pa_tagstruct_eof(t)) {
657 pa_context_fail(c, PA_ERR_PROTOCOL);
658 goto finish;
659 }
660
661 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
662 goto finish;
663
664 if (s->state != PA_STREAM_READY)
665 goto finish;
666
667 if (s->direction == PA_STREAM_RECORD)
668 s->timing_info.configured_source_usec = usec;
669 else
670 s->timing_info.configured_sink_usec = usec;
671
672 s->buffer_attr.maxlength = maxlength;
673 s->buffer_attr.fragsize = fragsize;
674 s->buffer_attr.tlength = tlength;
675 s->buffer_attr.prebuf = prebuf;
676 s->buffer_attr.minreq = minreq;
677
678 request_auto_timing_update(s, true);
679
680 if (s->buffer_attr_callback)
681 s->buffer_attr_callback(s, s->buffer_attr_userdata);
682
683 finish:
684 pa_context_unref(c);
685 }
686
pa_command_stream_suspended(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)687 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
688 pa_context *c = userdata;
689 pa_stream *s;
690 uint32_t channel;
691 bool suspended;
692
693 pa_assert(pd);
694 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
695 pa_assert(t);
696 pa_assert(c);
697 pa_assert(PA_REFCNT_VALUE(c) >= 1);
698
699 pa_context_ref(c);
700
701 if (c->version < 12) {
702 pa_context_fail(c, PA_ERR_PROTOCOL);
703 goto finish;
704 }
705
706 if (pa_tagstruct_getu32(t, &channel) < 0 ||
707 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
708 !pa_tagstruct_eof(t)) {
709 pa_context_fail(c, PA_ERR_PROTOCOL);
710 goto finish;
711 }
712
713 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
714 goto finish;
715
716 if (s->state != PA_STREAM_READY)
717 goto finish;
718
719 s->suspended = suspended;
720
721 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
722 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
723 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
724 request_auto_timing_update(s, true);
725 }
726
727 check_smoother_status(s, true, false, false);
728 request_auto_timing_update(s, true);
729
730 if (s->suspended_callback)
731 s->suspended_callback(s, s->suspended_userdata);
732
733 finish:
734 pa_context_unref(c);
735 }
736
pa_command_stream_started(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)737 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
738 pa_context *c = userdata;
739 pa_stream *s;
740 uint32_t channel;
741
742 pa_assert(pd);
743 pa_assert(command == PA_COMMAND_STARTED);
744 pa_assert(t);
745 pa_assert(c);
746 pa_assert(PA_REFCNT_VALUE(c) >= 1);
747
748 pa_context_ref(c);
749
750 if (c->version < 13) {
751 pa_context_fail(c, PA_ERR_PROTOCOL);
752 goto finish;
753 }
754
755 if (pa_tagstruct_getu32(t, &channel) < 0 ||
756 !pa_tagstruct_eof(t)) {
757 pa_context_fail(c, PA_ERR_PROTOCOL);
758 goto finish;
759 }
760
761 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
762 goto finish;
763
764 if (s->state != PA_STREAM_READY)
765 goto finish;
766
767 check_smoother_status(s, true, true, false);
768 request_auto_timing_update(s, true);
769
770 if (s->started_callback)
771 s->started_callback(s, s->started_userdata);
772
773 finish:
774 pa_context_unref(c);
775 }
776
pa_command_stream_event(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)777 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
778 pa_context *c = userdata;
779 pa_stream *s;
780 uint32_t channel;
781 pa_proplist *pl = NULL;
782 const char *event;
783
784 pa_assert(pd);
785 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
786 pa_assert(t);
787 pa_assert(c);
788 pa_assert(PA_REFCNT_VALUE(c) >= 1);
789
790 pa_context_ref(c);
791
792 if (c->version < 15) {
793 pa_context_fail(c, PA_ERR_PROTOCOL);
794 goto finish;
795 }
796
797 pl = pa_proplist_new();
798
799 if (pa_tagstruct_getu32(t, &channel) < 0 ||
800 pa_tagstruct_gets(t, &event) < 0 ||
801 pa_tagstruct_get_proplist(t, pl) < 0 ||
802 !pa_tagstruct_eof(t) || !event) {
803 pa_context_fail(c, PA_ERR_PROTOCOL);
804 goto finish;
805 }
806
807 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
808 goto finish;
809
810 if (s->state != PA_STREAM_READY)
811 goto finish;
812
813 if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
814 /* Let client know what the running time was when the stream had to be killed */
815 pa_usec_t stream_time;
816 if (pa_stream_get_time(s, &stream_time) == 0)
817 pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
818 }
819
820 if (s->event_callback)
821 s->event_callback(s, event, pl, s->event_userdata);
822
823 finish:
824 pa_context_unref(c);
825
826 if (pl)
827 pa_proplist_free(pl);
828 }
829
pa_command_request(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)830 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
831 pa_stream *s;
832 pa_context *c = userdata;
833 uint32_t bytes, channel;
834
835 pa_assert(pd);
836 pa_assert(command == PA_COMMAND_REQUEST);
837 pa_assert(t);
838 pa_assert(c);
839 pa_assert(PA_REFCNT_VALUE(c) >= 1);
840
841 pa_context_ref(c);
842
843 if (pa_tagstruct_getu32(t, &channel) < 0 ||
844 pa_tagstruct_getu32(t, &bytes) < 0 ||
845 !pa_tagstruct_eof(t)) {
846 pa_context_fail(c, PA_ERR_PROTOCOL);
847 goto finish;
848 }
849
850 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
851 goto finish;
852
853 if (s->state != PA_STREAM_READY)
854 goto finish;
855
856 s->requested_bytes += bytes;
857
858 #ifdef STREAM_DEBUG
859 pa_log_debug("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes);
860 #endif
861
862 if (s->requested_bytes > 0 && s->write_callback)
863 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
864
865 finish:
866 pa_context_unref(c);
867 }
868
pa_stream_get_underflow_index(const pa_stream * p)869 int64_t pa_stream_get_underflow_index(const pa_stream *p) {
870 pa_assert(p);
871 return p->latest_underrun_at_index;
872 }
873
pa_command_overflow_or_underflow(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)874 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
875 pa_stream *s;
876 pa_context *c = userdata;
877 uint32_t channel;
878 int64_t offset = -1;
879
880 pa_assert(pd);
881 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW
882 || command == PA_COMMAND_UNDERFLOW_OHOS);
883 pa_assert(t);
884 pa_assert(c);
885 pa_assert(PA_REFCNT_VALUE(c) >= 1);
886
887 pa_context_ref(c);
888
889 if (pa_tagstruct_getu32(t, &channel) < 0) {
890 pa_context_fail(c, PA_ERR_PROTOCOL);
891 goto finish;
892 }
893
894 if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW) {
895 if (pa_tagstruct_gets64(t, &offset) < 0) {
896 pa_context_fail(c, PA_ERR_PROTOCOL);
897 goto finish;
898 }
899 }
900
901 if (!pa_tagstruct_eof(t)) {
902 pa_context_fail(c, PA_ERR_PROTOCOL);
903 goto finish;
904 }
905
906 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
907 goto finish;
908
909 if (s->state != PA_STREAM_READY)
910 goto finish;
911
912 if (command == PA_COMMAND_UNDERFLOW_OHOS) {
913 if (s->underflow_ohos_callback) {
914 s->underflow_ohos_callback(s, s->underflow_ohos_userdata);
915 }
916 goto finish;
917 }
918
919 if (offset != -1)
920 s->latest_underrun_at_index = offset;
921
922 if (s->buffer_attr.prebuf > 0)
923 check_smoother_status(s, true, false, true);
924
925 request_auto_timing_update(s, true);
926
927 if (command == PA_COMMAND_OVERFLOW) {
928 if (s->overflow_callback)
929 s->overflow_callback(s, s->overflow_userdata);
930 } else if (command == PA_COMMAND_UNDERFLOW) {
931 if (s->underflow_callback)
932 s->underflow_callback(s, s->underflow_userdata);
933 }
934
935 finish:
936 pa_context_unref(c);
937 }
938
invalidate_indexes(pa_stream * s,bool r,bool w)939 static void invalidate_indexes(pa_stream *s, bool r, bool w) {
940 pa_assert(s);
941 pa_assert(PA_REFCNT_VALUE(s) >= 1);
942
943 #ifdef STREAM_DEBUG
944 pa_log_debug("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag);
945 #endif
946
947 if (s->state != PA_STREAM_READY)
948 return;
949
950 if (w) {
951 s->write_index_not_before = s->context->ctag;
952
953 if (s->timing_info_valid)
954 s->timing_info.write_index_corrupt = true;
955
956 #ifdef STREAM_DEBUG
957 pa_log_debug("write_index invalidated");
958 #endif
959 }
960
961 if (r) {
962 s->read_index_not_before = s->context->ctag;
963
964 if (s->timing_info_valid)
965 s->timing_info.read_index_corrupt = true;
966
967 #ifdef STREAM_DEBUG
968 pa_log_debug("read_index invalidated");
969 #endif
970 }
971
972 request_auto_timing_update(s, true);
973 }
974
auto_timing_update_callback(pa_mainloop_api * m,pa_time_event * e,const struct timeval * t,void * userdata)975 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
976 pa_stream *s = userdata;
977
978 pa_assert(s);
979 pa_assert(PA_REFCNT_VALUE(s) >= 1);
980
981 pa_stream_ref(s);
982 request_auto_timing_update(s, false);
983 pa_stream_unref(s);
984 }
985
create_stream_complete(pa_stream * s)986 static void create_stream_complete(pa_stream *s) {
987 pa_assert(s);
988 pa_assert(PA_REFCNT_VALUE(s) >= 1);
989 pa_assert(s->state == PA_STREAM_CREATING);
990
991 pa_stream_set_state(s, PA_STREAM_READY);
992
993 if (s->requested_bytes > 0 && s->write_callback)
994 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
995
996 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
997 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
998 pa_assert(!s->auto_timing_update_event);
999 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
1000
1001 request_auto_timing_update(s, true);
1002 }
1003
1004 check_smoother_status(s, true, false, false);
1005 }
1006
patch_buffer_attr(pa_stream * s,pa_buffer_attr * attr,pa_stream_flags_t * flags)1007 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
1008 const char *e;
1009
1010 pa_assert(s);
1011 pa_assert(attr);
1012
1013 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
1014 uint32_t ms;
1015 pa_sample_spec ss;
1016
1017 pa_sample_spec_init(&ss);
1018
1019 if (pa_sample_spec_valid(&s->sample_spec))
1020 ss = s->sample_spec;
1021 else if (s->n_formats == 1)
1022 pa_format_info_to_sample_spec(s->req_formats[0], &ss, NULL);
1023
1024 if (pa_atou(e, &ms) < 0 || ms <= 0)
1025 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
1026 else if (!pa_sample_spec_valid(&s->sample_spec))
1027 pa_log_debug("Ignoring $PULSE_LATENCY_MSEC: %s (invalid sample spec)", e);
1028 else {
1029 attr->maxlength = (uint32_t) -1;
1030 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &ss);
1031 attr->minreq = (uint32_t) -1;
1032 attr->prebuf = (uint32_t) -1;
1033 attr->fragsize = attr->tlength;
1034
1035 if (flags)
1036 *flags |= PA_STREAM_ADJUST_LATENCY;
1037 }
1038 }
1039
1040 if (s->context->version >= 13)
1041 return;
1042
1043 /* Version older than 0.9.10 didn't do server side buffer_attr
1044 * selection, hence we have to fake it on the client side. */
1045
1046 /* We choose fairly conservative values here, to not confuse
1047 * old clients with extremely large playback buffers */
1048
1049 if (attr->maxlength == (uint32_t) -1)
1050 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
1051
1052 if (attr->tlength == (uint32_t) -1)
1053 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
1054
1055 if (attr->minreq == (uint32_t) -1)
1056 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
1057
1058 if (attr->prebuf == (uint32_t) -1)
1059 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
1060
1061 if (attr->fragsize == (uint32_t) -1)
1062 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
1063 }
1064
pa_create_stream_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1065 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1066 pa_stream *s = userdata;
1067 uint32_t requested_bytes = 0;
1068
1069 pa_assert(pd);
1070 pa_assert(s);
1071 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1072 pa_assert(s->state == PA_STREAM_CREATING);
1073
1074 pa_stream_ref(s);
1075
1076 if (command != PA_COMMAND_REPLY) {
1077 if (pa_context_handle_error(s->context, command, t, false) < 0)
1078 goto finish;
1079
1080 pa_stream_set_state(s, PA_STREAM_FAILED);
1081 goto finish;
1082 }
1083
1084 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1085 s->channel == PA_INVALID_INDEX ||
1086 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1087 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1088 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1089 goto finish;
1090 }
1091
1092 s->requested_bytes = (int64_t) requested_bytes;
1093
1094 if (s->context->version >= 9) {
1095 if (s->direction == PA_STREAM_PLAYBACK) {
1096 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1097 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1098 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1099 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1100 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1101 goto finish;
1102 }
1103 } else if (s->direction == PA_STREAM_RECORD) {
1104 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1105 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1106 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1107 goto finish;
1108 }
1109 }
1110 }
1111
1112 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1113 pa_sample_spec ss;
1114 pa_channel_map cm;
1115 const char *dn = NULL;
1116 bool suspended;
1117
1118 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1119 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1120 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1121 pa_tagstruct_gets(t, &dn) < 0 ||
1122 pa_tagstruct_get_boolean(t, &suspended) < 0) {
1123 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1124 goto finish;
1125 }
1126
1127 if (!dn || s->device_index == PA_INVALID_INDEX ||
1128 ss.channels != cm.channels ||
1129 !pa_channel_map_valid(&cm) ||
1130 !pa_sample_spec_valid(&ss) ||
1131 (s->n_formats == 0 && (
1132 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1133 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1134 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1135 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1136 goto finish;
1137 }
1138
1139 pa_xfree(s->device_name);
1140 s->device_name = pa_xstrdup(dn);
1141 s->suspended = suspended;
1142
1143 s->channel_map = cm;
1144 s->sample_spec = ss;
1145 }
1146
1147 #ifdef USE_SMOOTHER_2
1148 if (s->flags & PA_STREAM_INTERPOLATE_TIMING)
1149 pa_smoother_2_set_sample_spec(s->smoother, pa_rtclock_now(), &s->sample_spec);
1150 #endif
1151
1152 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1153 pa_usec_t usec;
1154
1155 if (pa_tagstruct_get_usec(t, &usec) < 0) {
1156 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1157 goto finish;
1158 }
1159
1160 if (s->direction == PA_STREAM_RECORD)
1161 s->timing_info.configured_source_usec = usec;
1162 else
1163 s->timing_info.configured_sink_usec = usec;
1164 }
1165
1166 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1167 || s->context->version >= 22) {
1168
1169 pa_format_info *f = pa_format_info_new();
1170
1171 if (pa_tagstruct_get_format_info(t, f) < 0 || !pa_format_info_valid(f)) {
1172 pa_format_info_free(f);
1173 if (s->n_formats > 0) {
1174 /* We used the extended API, so we should have got back a proper format */
1175 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1176 goto finish;
1177 }
1178 } else
1179 s->format = f;
1180 }
1181
1182 if (!pa_tagstruct_eof(t)) {
1183 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1184 goto finish;
1185 }
1186
1187 if (s->direction == PA_STREAM_RECORD) {
1188 pa_assert(!s->record_memblockq);
1189
1190 s->record_memblockq = pa_memblockq_new(
1191 "client side record memblockq",
1192 0,
1193 s->buffer_attr.maxlength,
1194 0,
1195 &s->sample_spec,
1196 1,
1197 0,
1198 0,
1199 NULL);
1200 }
1201
1202 s->channel_valid = true;
1203 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1204
1205 create_stream_complete(s);
1206
1207 finish:
1208 pa_stream_unref(s);
1209 }
1210
create_stream(pa_stream_direction_t direction,pa_stream * s,const char * dev,const pa_buffer_attr * attr,pa_stream_flags_t flags,const pa_cvolume * volume,pa_stream * sync_stream)1211 static int create_stream(
1212 pa_stream_direction_t direction,
1213 pa_stream *s,
1214 const char *dev,
1215 const pa_buffer_attr *attr,
1216 pa_stream_flags_t flags,
1217 const pa_cvolume *volume,
1218 pa_stream *sync_stream) {
1219
1220 pa_tagstruct *t;
1221 uint32_t tag;
1222 bool volume_set = !!volume;
1223 pa_cvolume cv;
1224 uint32_t i;
1225
1226 pa_assert(s);
1227 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1228 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1229
1230 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1231 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1232 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1233 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1234 PA_STREAM_INTERPOLATE_TIMING|
1235 PA_STREAM_NOT_MONOTONIC|
1236 PA_STREAM_AUTO_TIMING_UPDATE|
1237 PA_STREAM_NO_REMAP_CHANNELS|
1238 PA_STREAM_NO_REMIX_CHANNELS|
1239 PA_STREAM_FIX_FORMAT|
1240 PA_STREAM_FIX_RATE|
1241 PA_STREAM_FIX_CHANNELS|
1242 PA_STREAM_DONT_MOVE|
1243 PA_STREAM_VARIABLE_RATE|
1244 PA_STREAM_PEAK_DETECT|
1245 PA_STREAM_START_MUTED|
1246 PA_STREAM_ADJUST_LATENCY|
1247 PA_STREAM_EARLY_REQUESTS|
1248 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1249 PA_STREAM_START_UNMUTED|
1250 PA_STREAM_FAIL_ON_SUSPEND|
1251 PA_STREAM_RELATIVE_VOLUME|
1252 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1253
1254 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1255 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1256 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1257 /* Although some of the other flags are not supported on older
1258 * version, we don't check for them here, because it doesn't hurt
1259 * when they are passed but actually not supported. This makes
1260 * client development easier */
1261
1262 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1263 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1264 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1265
1266 pa_stream_ref(s);
1267
1268 s->direction = direction;
1269
1270 if (sync_stream)
1271 s->syncid = sync_stream->syncid;
1272
1273 if (attr)
1274 s->buffer_attr = *attr;
1275 patch_buffer_attr(s, &s->buffer_attr, &flags);
1276
1277 s->flags = flags;
1278 s->corked = !!(flags & PA_STREAM_START_CORKED);
1279
1280 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1281 pa_usec_t x;
1282
1283 x = pa_rtclock_now();
1284
1285 pa_assert(!s->smoother);
1286 #ifdef USE_SMOOTHER_2
1287 s->smoother = pa_smoother_2_new(SMOOTHER_HISTORY_TIME, x, 0, 0);
1288 #else
1289 s->smoother = pa_smoother_new(
1290 SMOOTHER_ADJUST_TIME,
1291 SMOOTHER_HISTORY_TIME,
1292 !(flags & PA_STREAM_NOT_MONOTONIC),
1293 true,
1294 SMOOTHER_MIN_HISTORY,
1295 x,
1296 true);
1297 #endif
1298 }
1299
1300 if (!dev)
1301 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1302
1303 t = pa_tagstruct_command(
1304 s->context,
1305 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1306 &tag);
1307
1308 if (s->context->version < 13)
1309 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1310
1311 pa_tagstruct_put(
1312 t,
1313 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1314 PA_TAG_CHANNEL_MAP, &s->channel_map,
1315 PA_TAG_U32, PA_INVALID_INDEX,
1316 PA_TAG_STRING, dev,
1317 PA_TAG_U32, s->buffer_attr.maxlength,
1318 PA_TAG_BOOLEAN, s->corked,
1319 PA_TAG_INVALID);
1320
1321 if (!volume) {
1322 if (pa_sample_spec_valid(&s->sample_spec))
1323 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1324 else {
1325 /* This is not really relevant, since no volume was set, and
1326 * the real number of channels is embedded in the format_info
1327 * structure */
1328 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1329 }
1330 }
1331
1332 if (s->direction == PA_STREAM_PLAYBACK) {
1333 pa_tagstruct_put(
1334 t,
1335 PA_TAG_U32, s->buffer_attr.tlength,
1336 PA_TAG_U32, s->buffer_attr.prebuf,
1337 PA_TAG_U32, s->buffer_attr.minreq,
1338 PA_TAG_U32, s->syncid,
1339 PA_TAG_INVALID);
1340
1341 pa_tagstruct_put_cvolume(t, volume);
1342 } else
1343 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1344
1345 if (s->context->version >= 12) {
1346 pa_tagstruct_put(
1347 t,
1348 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1349 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1350 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1351 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1352 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1353 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1354 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1355 PA_TAG_INVALID);
1356 }
1357
1358 if (s->context->version >= 13) {
1359
1360 if (s->direction == PA_STREAM_PLAYBACK)
1361 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1362 else
1363 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1364
1365 pa_tagstruct_put(
1366 t,
1367 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1368 PA_TAG_PROPLIST, s->proplist,
1369 PA_TAG_INVALID);
1370
1371 if (s->direction == PA_STREAM_RECORD)
1372 pa_tagstruct_putu32(t, s->direct_on_input);
1373 }
1374
1375 if (s->context->version >= 14) {
1376
1377 if (s->direction == PA_STREAM_PLAYBACK)
1378 pa_tagstruct_put_boolean(t, volume_set);
1379
1380 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1381 }
1382
1383 if (s->context->version >= 15) {
1384
1385 if (s->direction == PA_STREAM_PLAYBACK)
1386 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1387
1388 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1389 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1390 }
1391
1392 if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1393 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1394
1395 if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1396 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1397
1398 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1399 || s->context->version >= 22) {
1400
1401 pa_tagstruct_putu8(t, s->n_formats);
1402 for (i = 0; i < s->n_formats; i++)
1403 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1404 }
1405
1406 if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1407 pa_tagstruct_put_cvolume(t, volume);
1408 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1409 pa_tagstruct_put_boolean(t, volume_set);
1410 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1411 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1412 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1413 }
1414
1415 pa_pstream_send_tagstruct(s->context->pstream, t);
1416 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1417
1418 pa_stream_set_state(s, PA_STREAM_CREATING);
1419
1420 pa_stream_unref(s);
1421 return 0;
1422 }
1423
pa_stream_connect_playback(pa_stream * s,const char * dev,const pa_buffer_attr * attr,pa_stream_flags_t flags,const pa_cvolume * volume,pa_stream * sync_stream)1424 int pa_stream_connect_playback(
1425 pa_stream *s,
1426 const char *dev,
1427 const pa_buffer_attr *attr,
1428 pa_stream_flags_t flags,
1429 const pa_cvolume *volume,
1430 pa_stream *sync_stream) {
1431
1432 pa_assert(s);
1433 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1434
1435 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1436 }
1437
pa_stream_connect_record(pa_stream * s,const char * dev,const pa_buffer_attr * attr,pa_stream_flags_t flags)1438 int pa_stream_connect_record(
1439 pa_stream *s,
1440 const char *dev,
1441 const pa_buffer_attr *attr,
1442 pa_stream_flags_t flags) {
1443
1444 pa_assert(s);
1445 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1446
1447 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1448 }
1449
pa_stream_begin_write(pa_stream * s,void ** data,size_t * nbytes)1450 int pa_stream_begin_write(
1451 pa_stream *s,
1452 void **data,
1453 size_t *nbytes) {
1454
1455 pa_assert(s);
1456 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1457
1458 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1459 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1460 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1461 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1462 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1463
1464 if (*nbytes != (size_t) -1) {
1465 size_t m, fs;
1466
1467 m = pa_mempool_block_size_max(s->context->mempool);
1468 fs = pa_frame_size(&s->sample_spec);
1469
1470 m = (m / fs) * fs;
1471 if (*nbytes > m)
1472 *nbytes = m;
1473 }
1474
1475 if (!s->write_memblock) {
1476 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1477 s->write_data = pa_memblock_acquire(s->write_memblock);
1478 }
1479
1480 *data = s->write_data;
1481 *nbytes = pa_memblock_get_length(s->write_memblock);
1482
1483 return 0;
1484 }
1485
pa_stream_cancel_write(pa_stream * s)1486 int pa_stream_cancel_write(
1487 pa_stream *s) {
1488
1489 pa_assert(s);
1490 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1491
1492 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1493 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1494 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1495 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1496
1497 pa_assert(s->write_data);
1498
1499 pa_memblock_release(s->write_memblock);
1500 pa_memblock_unref(s->write_memblock);
1501 s->write_memblock = NULL;
1502 s->write_data = NULL;
1503
1504 return 0;
1505 }
1506
pa_stream_write_ext_free(pa_stream * s,const void * data,size_t length,pa_free_cb_t free_cb,void * free_cb_data,int64_t offset,pa_seek_mode_t seek)1507 int pa_stream_write_ext_free(
1508 pa_stream *s,
1509 const void *data,
1510 size_t length,
1511 pa_free_cb_t free_cb,
1512 void *free_cb_data,
1513 int64_t offset,
1514 pa_seek_mode_t seek) {
1515
1516 pa_assert(s);
1517 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1518 pa_assert(data);
1519
1520 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1521 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1522 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1523 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1524 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1525 PA_CHECK_VALIDITY(s->context,
1526 !s->write_memblock ||
1527 ((data >= s->write_data) &&
1528 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1529 PA_ERR_INVALID);
1530 PA_CHECK_VALIDITY(s->context, offset % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
1531 PA_CHECK_VALIDITY(s->context, length % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
1532 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1533
1534 if (s->write_memblock) {
1535 pa_memchunk chunk;
1536
1537 /* pa_stream_write_begin() was called before */
1538
1539 pa_memblock_release(s->write_memblock);
1540
1541 chunk.memblock = s->write_memblock;
1542 chunk.index = (const char *) data - (const char *) s->write_data;
1543 chunk.length = length;
1544
1545 s->write_memblock = NULL;
1546 s->write_data = NULL;
1547
1548 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1549 pa_memblock_unref(chunk.memblock);
1550
1551 } else {
1552 pa_seek_mode_t t_seek = seek;
1553 int64_t t_offset = offset;
1554 size_t t_length = length;
1555 const void *t_data = data;
1556
1557 /* pa_stream_write_begin() was not called before */
1558
1559 while (t_length > 0) {
1560 pa_memchunk chunk;
1561
1562 chunk.index = 0;
1563
1564 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1565 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, free_cb_data, 1);
1566 chunk.length = t_length;
1567 } else {
1568 void *d;
1569 size_t blk_size_max;
1570
1571 /* Break large audio streams into _aligned_ blocks or the
1572 * other endpoint will happily discard them upon arrival. */
1573 blk_size_max = pa_frame_align(pa_mempool_block_size_max(s->context->mempool), &s->sample_spec);
1574 chunk.length = PA_MIN(t_length, blk_size_max);
1575 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1576
1577 d = pa_memblock_acquire(chunk.memblock);
1578 memcpy(d, t_data, chunk.length);
1579 pa_memblock_release(chunk.memblock);
1580 }
1581
1582 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1583
1584 t_offset = 0;
1585 t_seek = PA_SEEK_RELATIVE;
1586
1587 t_data = (const uint8_t*) t_data + chunk.length;
1588 t_length -= chunk.length;
1589
1590 pa_memblock_unref(chunk.memblock);
1591 }
1592
1593 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1594 free_cb(free_cb_data);
1595 }
1596
1597 /* This is obviously wrong since we ignore the seeking index . But
1598 * that's OK, the server side applies the same error */
1599 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1600
1601 #ifdef STREAM_DEBUG
1602 pa_log_debug("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes);
1603 #endif
1604
1605 if (s->direction == PA_STREAM_PLAYBACK) {
1606
1607 /* Update latency request correction */
1608 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1609
1610 if (seek == PA_SEEK_ABSOLUTE) {
1611 s->write_index_corrections[s->current_write_index_correction].corrupt = false;
1612 s->write_index_corrections[s->current_write_index_correction].absolute = true;
1613 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1614 } else if (seek == PA_SEEK_RELATIVE) {
1615 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1616 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1617 } else
1618 s->write_index_corrections[s->current_write_index_correction].corrupt = true;
1619 }
1620
1621 /* Update the write index in the already available latency data */
1622 if (s->timing_info_valid) {
1623
1624 if (seek == PA_SEEK_ABSOLUTE) {
1625 s->timing_info.write_index_corrupt = false;
1626 s->timing_info.write_index = offset + (int64_t) length;
1627 } else if (seek == PA_SEEK_RELATIVE) {
1628 if (!s->timing_info.write_index_corrupt)
1629 s->timing_info.write_index += offset + (int64_t) length;
1630 } else
1631 s->timing_info.write_index_corrupt = true;
1632 }
1633
1634 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1635 request_auto_timing_update(s, true);
1636 }
1637
1638 return 0;
1639 }
1640
pa_stream_write(pa_stream * s,const void * data,size_t length,pa_free_cb_t free_cb,int64_t offset,pa_seek_mode_t seek)1641 int pa_stream_write(
1642 pa_stream *s,
1643 const void *data,
1644 size_t length,
1645 pa_free_cb_t free_cb,
1646 int64_t offset,
1647 pa_seek_mode_t seek) {
1648
1649 return pa_stream_write_ext_free(s, data, length, free_cb, (void*) data, offset, seek);
1650 }
1651
pa_stream_peek(pa_stream * s,const void ** data,size_t * length)1652 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1653 pa_assert(s);
1654 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1655 pa_assert(data);
1656 pa_assert(length);
1657
1658 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1659 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1660 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1661
1662 if (!s->peek_memchunk.memblock) {
1663
1664 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1665 /* record_memblockq is empty. */
1666 *data = NULL;
1667 *length = 0;
1668 return 0;
1669
1670 } else if (!s->peek_memchunk.memblock) {
1671 /* record_memblockq isn't empty, but it doesn't have any data at
1672 * the current read index. */
1673 *data = NULL;
1674 *length = s->peek_memchunk.length;
1675 return 0;
1676 }
1677
1678 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1679 }
1680
1681 pa_assert(s->peek_data);
1682 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1683 *length = s->peek_memchunk.length;
1684 return 0;
1685 }
1686
pa_stream_drop(pa_stream * s)1687 int pa_stream_drop(pa_stream *s) {
1688 pa_assert(s);
1689 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1690
1691 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1692 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1693 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1694 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE);
1695
1696 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1697
1698 /* Fix the simulated local read index */
1699 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1700 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1701
1702 if (s->peek_memchunk.memblock) {
1703 pa_assert(s->peek_data);
1704 s->peek_data = NULL;
1705 pa_memblock_release(s->peek_memchunk.memblock);
1706 pa_memblock_unref(s->peek_memchunk.memblock);
1707 }
1708
1709 pa_memchunk_reset(&s->peek_memchunk);
1710
1711 return 0;
1712 }
1713
pa_stream_writable_size(const pa_stream * s)1714 size_t pa_stream_writable_size(const pa_stream *s) {
1715 pa_assert(s);
1716 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1717
1718 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1719 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1720 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1721
1722 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1723 }
1724
pa_stream_readable_size(const pa_stream * s)1725 size_t pa_stream_readable_size(const pa_stream *s) {
1726 pa_assert(s);
1727 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1728
1729 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1730 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1731 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1732
1733 return pa_memblockq_get_length(s->record_memblockq);
1734 }
1735
pa_stream_drain(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)1736 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1737 pa_operation *o;
1738 pa_tagstruct *t;
1739 uint32_t tag;
1740
1741 pa_assert(s);
1742 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1743
1744 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1745 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1746 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1747
1748 /* Ask for a timing update before we cork/uncork to get the best
1749 * accuracy for the transport latency suitable for the
1750 * check_smoother_status() call in the started callback */
1751 request_auto_timing_update(s, true);
1752
1753 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1754
1755 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1756 pa_tagstruct_putu32(t, s->channel);
1757 pa_pstream_send_tagstruct(s->context->pstream, t);
1758 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1759
1760 /* This might cause the read index to continue again, hence
1761 * let's request a timing update */
1762 request_auto_timing_update(s, true);
1763
1764 return o;
1765 }
1766
calc_time(const pa_stream * s,bool ignore_transport)1767 static pa_usec_t calc_time(const pa_stream *s, bool ignore_transport) {
1768 pa_usec_t usec;
1769
1770 pa_assert(s);
1771 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1772 pa_assert(s->state == PA_STREAM_READY);
1773 pa_assert(s->direction != PA_STREAM_UPLOAD);
1774 pa_assert(s->timing_info_valid);
1775 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1776 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1777
1778 if (s->direction == PA_STREAM_PLAYBACK) {
1779 /* The last byte that was written into the output device
1780 * had this time value associated */
1781 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1782
1783 if (!s->corked && !s->suspended) {
1784
1785 if (!ignore_transport)
1786 /* Because the latency info took a little time to come
1787 * to us, we assume that the real output time is actually
1788 * a little ahead */
1789 usec += s->timing_info.transport_usec;
1790
1791 /* However, the output device usually maintains a buffer
1792 too, hence the real sample currently played is a little
1793 back */
1794 if (s->timing_info.sink_usec >= usec)
1795 usec = 0;
1796 else
1797 usec -= s->timing_info.sink_usec;
1798 }
1799
1800 } else {
1801 pa_assert(s->direction == PA_STREAM_RECORD);
1802
1803 /* The last byte written into the server side queue had
1804 * this time value associated */
1805 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1806
1807 if (!s->corked && !s->suspended) {
1808
1809 if (!ignore_transport)
1810 /* Add transport latency */
1811 usec += s->timing_info.transport_usec;
1812
1813 /* Add latency of data in device buffer */
1814 usec += s->timing_info.source_usec;
1815
1816 /* If this is a monitor source, we need to correct the
1817 * time by the playback device buffer */
1818 if (s->timing_info.sink_usec >= usec)
1819 usec = 0;
1820 else
1821 usec -= s->timing_info.sink_usec;
1822 }
1823 }
1824
1825 return usec;
1826 }
1827
1828 #ifdef USE_SMOOTHER_2
calc_bytes(pa_stream * s,bool ignore_transport)1829 static inline uint64_t calc_bytes(pa_stream *s, bool ignore_transport) {
1830 return (uint64_t)(calc_time(s, ignore_transport) * s->sample_spec.rate / PA_USEC_PER_SEC * pa_frame_size(&s->sample_spec));
1831 }
1832 #endif
1833
stream_get_timing_info_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1834 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1835 pa_operation *o = userdata;
1836 struct timeval local, remote, now;
1837 pa_timing_info *i;
1838 bool playing = false;
1839 uint64_t underrun_for = 0, playing_for = 0;
1840
1841 pa_assert(pd);
1842 pa_assert(o);
1843 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1844
1845 if (!o->context || !o->stream)
1846 goto finish;
1847
1848 i = &o->stream->timing_info;
1849
1850 o->stream->timing_info_valid = false;
1851 i->write_index_corrupt = true;
1852 i->read_index_corrupt = true;
1853
1854 if (command != PA_COMMAND_REPLY) {
1855 if (pa_context_handle_error(o->context, command, t, false) < 0)
1856 goto finish;
1857
1858 } else {
1859
1860 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1861 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1862 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1863 pa_tagstruct_get_timeval(t, &local) < 0 ||
1864 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1865 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1866 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1867
1868 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1869 goto finish;
1870 }
1871
1872 if (o->context->version >= 13 &&
1873 o->stream->direction == PA_STREAM_PLAYBACK)
1874 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1875 pa_tagstruct_getu64(t, &playing_for) < 0) {
1876
1877 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1878 goto finish;
1879 }
1880
1881 if (!pa_tagstruct_eof(t)) {
1882 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1883 goto finish;
1884 }
1885 o->stream->timing_info_valid = true;
1886 i->write_index_corrupt = false;
1887 i->read_index_corrupt = false;
1888
1889 i->playing = (int) playing;
1890 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1891
1892 pa_gettimeofday(&now);
1893
1894 /* Calculate timestamps */
1895 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1896 /* local and remote seem to have synchronized clocks */
1897
1898 if (o->stream->direction == PA_STREAM_PLAYBACK)
1899 i->transport_usec = pa_timeval_diff(&remote, &local);
1900 else
1901 i->transport_usec = pa_timeval_diff(&now, &remote);
1902
1903 i->synchronized_clocks = true;
1904 i->timestamp = remote;
1905 } else {
1906 /* clocks are not synchronized, let's estimate latency then */
1907 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1908 i->synchronized_clocks = false;
1909 i->timestamp = local;
1910 pa_timeval_add(&i->timestamp, i->transport_usec);
1911 }
1912
1913 /* Invalidate read and write indexes if necessary */
1914 if (tag < o->stream->read_index_not_before)
1915 i->read_index_corrupt = true;
1916
1917 if (tag < o->stream->write_index_not_before)
1918 i->write_index_corrupt = true;
1919
1920 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1921 /* Write index correction */
1922
1923 int n, j;
1924 uint32_t ctag = tag;
1925
1926 /* Go through the saved correction values and add up the
1927 * total correction.*/
1928 for (n = 0, j = o->stream->current_write_index_correction+1;
1929 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1930 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1931
1932 /* Step over invalid data or out-of-date data */
1933 if (!o->stream->write_index_corrections[j].valid ||
1934 o->stream->write_index_corrections[j].tag < ctag)
1935 continue;
1936
1937 /* Make sure that everything is in order */
1938 ctag = o->stream->write_index_corrections[j].tag+1;
1939
1940 /* Now fix the write index */
1941 if (o->stream->write_index_corrections[j].corrupt) {
1942 /* A corrupting seek was made */
1943 i->write_index_corrupt = true;
1944 } else if (o->stream->write_index_corrections[j].absolute) {
1945 /* An absolute seek was made */
1946 i->write_index = o->stream->write_index_corrections[j].value;
1947 i->write_index_corrupt = false;
1948 } else if (!i->write_index_corrupt) {
1949 /* A relative seek was made */
1950 i->write_index += o->stream->write_index_corrections[j].value;
1951 }
1952 }
1953
1954 /* Clear old correction entries */
1955 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1956 if (!o->stream->write_index_corrections[n].valid)
1957 continue;
1958
1959 if (o->stream->write_index_corrections[n].tag <= tag)
1960 o->stream->write_index_corrections[n].valid = false;
1961 }
1962 }
1963
1964 if (o->stream->direction == PA_STREAM_RECORD) {
1965 /* Read index correction */
1966
1967 if (!i->read_index_corrupt)
1968 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1969 }
1970
1971 /* Update smoother if we're not corked */
1972 if (o->stream->smoother && !o->stream->corked) {
1973 pa_usec_t u, x;
1974
1975 u = x = pa_rtclock_now() - i->transport_usec;
1976
1977 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1978 pa_usec_t su;
1979
1980 /* If we weren't playing then it will take some time
1981 * until the audio will actually come out through the
1982 * speakers. Since we follow that timing here, we need
1983 * to try to fix this up */
1984
1985 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1986
1987 if (su < i->sink_usec)
1988 x += i->sink_usec - su;
1989 }
1990
1991 if (!i->playing)
1992 #ifdef USE_SMOOTHER_2
1993 pa_smoother_2_pause(o->stream->smoother, x);
1994 #else
1995 pa_smoother_pause(o->stream->smoother, x);
1996 #endif
1997
1998 /* Update the smoother */
1999 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
2000 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
2001 #ifdef USE_SMOOTHER_2
2002 pa_smoother_2_put(o->stream->smoother, u, calc_bytes(o->stream, true));
2003 #else
2004 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, true));
2005 #endif
2006
2007 if (i->playing)
2008 #ifdef USE_SMOOTHER_2
2009 pa_smoother_2_resume(o->stream->smoother, x);
2010 #else
2011 pa_smoother_resume(o->stream->smoother, x, true);
2012 #endif
2013 }
2014 }
2015
2016 o->stream->auto_timing_update_requested = false;
2017
2018 if (o->stream->latency_update_callback)
2019 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
2020
2021 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
2022 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2023 cb(o->stream, o->stream->timing_info_valid, o->userdata);
2024 }
2025
2026 finish:
2027
2028 pa_operation_done(o);
2029 pa_operation_unref(o);
2030 }
2031
pa_stream_update_timing_info(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2032 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2033 uint32_t tag;
2034 pa_operation *o;
2035 pa_tagstruct *t;
2036 struct timeval now;
2037 int cidx = 0;
2038
2039 pa_assert(s);
2040 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2041
2042 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2043 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2044 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2045
2046 if (s->direction == PA_STREAM_PLAYBACK) {
2047 /* Find a place to store the write_index correction data for this entry */
2048 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
2049
2050 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
2051 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
2052 }
2053 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2054
2055 t = pa_tagstruct_command(
2056 s->context,
2057 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
2058 &tag);
2059 pa_tagstruct_putu32(t, s->channel);
2060 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
2061
2062 pa_pstream_send_tagstruct(s->context->pstream, t);
2063 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2064
2065 if (s->direction == PA_STREAM_PLAYBACK) {
2066 /* Fill in initial correction data */
2067
2068 s->current_write_index_correction = cidx;
2069
2070 s->write_index_corrections[cidx].valid = true;
2071 s->write_index_corrections[cidx].absolute = false;
2072 s->write_index_corrections[cidx].corrupt = false;
2073 s->write_index_corrections[cidx].tag = tag;
2074 s->write_index_corrections[cidx].value = 0;
2075 }
2076
2077 return o;
2078 }
2079
pa_stream_disconnect_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)2080 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2081 pa_stream *s = userdata;
2082
2083 pa_assert(pd);
2084 pa_assert(s);
2085 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2086
2087 pa_stream_ref(s);
2088
2089 if (command != PA_COMMAND_REPLY) {
2090 if (pa_context_handle_error(s->context, command, t, false) < 0)
2091 goto finish;
2092
2093 pa_stream_set_state(s, PA_STREAM_FAILED);
2094 goto finish;
2095 } else if (!pa_tagstruct_eof(t)) {
2096 pa_context_fail(s->context, PA_ERR_PROTOCOL);
2097 goto finish;
2098 }
2099
2100 pa_stream_set_state(s, PA_STREAM_TERMINATED);
2101
2102 finish:
2103 pa_stream_unref(s);
2104 }
2105
pa_stream_disconnect(pa_stream * s)2106 int pa_stream_disconnect(pa_stream *s) {
2107 pa_tagstruct *t;
2108 uint32_t tag;
2109
2110 pa_assert(s);
2111 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2112
2113 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2114 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
2115 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
2116
2117 pa_stream_ref(s);
2118
2119 t = pa_tagstruct_command(
2120 s->context,
2121 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2122 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2123 &tag);
2124 pa_tagstruct_putu32(t, s->channel);
2125 pa_pstream_send_tagstruct(s->context->pstream, t);
2126 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2127
2128 pa_stream_unref(s);
2129 return 0;
2130 }
2131
pa_stream_set_read_callback(pa_stream * s,pa_stream_request_cb_t cb,void * userdata)2132 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2133 pa_assert(s);
2134 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2135
2136 if (pa_detect_fork())
2137 return;
2138
2139 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2140 return;
2141
2142 s->read_callback = cb;
2143 s->read_userdata = userdata;
2144 }
2145
pa_stream_set_write_callback(pa_stream * s,pa_stream_request_cb_t cb,void * userdata)2146 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2147 pa_assert(s);
2148 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2149
2150 if (pa_detect_fork())
2151 return;
2152
2153 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2154 return;
2155
2156 s->write_callback = cb;
2157 s->write_userdata = userdata;
2158 }
2159
pa_stream_set_state_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2160 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2161 pa_assert(s);
2162 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2163
2164 if (pa_detect_fork())
2165 return;
2166
2167 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2168 return;
2169
2170 s->state_callback = cb;
2171 s->state_userdata = userdata;
2172 }
2173
pa_stream_set_overflow_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2174 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2175 pa_assert(s);
2176 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2177
2178 if (pa_detect_fork())
2179 return;
2180
2181 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2182 return;
2183
2184 s->overflow_callback = cb;
2185 s->overflow_userdata = userdata;
2186 }
2187
pa_stream_set_underflow_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2188 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2189 pa_assert(s);
2190 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2191
2192 if (pa_detect_fork())
2193 return;
2194
2195 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2196 return;
2197
2198 s->underflow_callback = cb;
2199 s->underflow_userdata = userdata;
2200 }
2201
pa_stream_set_underflow_ohos_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2202 void pa_stream_set_underflow_ohos_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2203 pa_assert(s);
2204 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2205
2206 if (pa_detect_fork())
2207 return;
2208
2209 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2210 return;
2211
2212 s->underflow_ohos_callback = cb;
2213 s->underflow_ohos_userdata = userdata;
2214 }
2215
pa_stream_set_latency_update_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2216 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2217 pa_assert(s);
2218 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2219
2220 if (pa_detect_fork())
2221 return;
2222
2223 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2224 return;
2225
2226 s->latency_update_callback = cb;
2227 s->latency_update_userdata = userdata;
2228 }
2229
pa_stream_set_moved_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2230 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2231 pa_assert(s);
2232 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2233
2234 if (pa_detect_fork())
2235 return;
2236
2237 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2238 return;
2239
2240 s->moved_callback = cb;
2241 s->moved_userdata = userdata;
2242 }
2243
pa_stream_set_suspended_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2244 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2245 pa_assert(s);
2246 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2247
2248 if (pa_detect_fork())
2249 return;
2250
2251 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2252 return;
2253
2254 s->suspended_callback = cb;
2255 s->suspended_userdata = userdata;
2256 }
2257
pa_stream_set_started_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2258 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2259 pa_assert(s);
2260 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2261
2262 if (pa_detect_fork())
2263 return;
2264
2265 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2266 return;
2267
2268 s->started_callback = cb;
2269 s->started_userdata = userdata;
2270 }
2271
pa_stream_set_event_callback(pa_stream * s,pa_stream_event_cb_t cb,void * userdata)2272 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2273 pa_assert(s);
2274 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2275
2276 if (pa_detect_fork())
2277 return;
2278
2279 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2280 return;
2281
2282 s->event_callback = cb;
2283 s->event_userdata = userdata;
2284 }
2285
pa_stream_set_buffer_attr_callback(pa_stream * s,pa_stream_notify_cb_t cb,void * userdata)2286 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2287 pa_assert(s);
2288 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2289
2290 if (pa_detect_fork())
2291 return;
2292
2293 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2294 return;
2295
2296 s->buffer_attr_callback = cb;
2297 s->buffer_attr_userdata = userdata;
2298 }
2299
pa_stream_simple_ack_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)2300 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2301 pa_operation *o = userdata;
2302 int success = 1;
2303
2304 pa_assert(pd);
2305 pa_assert(o);
2306 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2307
2308 if (!o->context)
2309 goto finish;
2310
2311 if (command != PA_COMMAND_REPLY) {
2312 if (pa_context_handle_error(o->context, command, t, false) < 0)
2313 goto finish;
2314
2315 success = 0;
2316 } else if (!pa_tagstruct_eof(t)) {
2317 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2318 goto finish;
2319 }
2320
2321 if (o->callback) {
2322 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2323 cb(o->stream, success, o->userdata);
2324 }
2325
2326 finish:
2327 pa_operation_done(o);
2328 pa_operation_unref(o);
2329 }
2330
pa_stream_cork(pa_stream * s,int b,pa_stream_success_cb_t cb,void * userdata)2331 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2332 pa_operation *o;
2333 pa_tagstruct *t;
2334 uint32_t tag;
2335
2336 pa_assert(s);
2337 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2338
2339 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2340 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2341 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2342
2343 /* Ask for a timing update before we cork/uncork to get the best
2344 * accuracy for the transport latency suitable for the
2345 * check_smoother_status() call in the started callback */
2346 request_auto_timing_update(s, true);
2347
2348 s->corked = b;
2349
2350 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2351
2352 t = pa_tagstruct_command(
2353 s->context,
2354 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2355 &tag);
2356 pa_tagstruct_putu32(t, s->channel);
2357 pa_tagstruct_put_boolean(t, !!b);
2358 pa_pstream_send_tagstruct(s->context->pstream, t);
2359 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2360
2361 check_smoother_status(s, false, false, false);
2362
2363 /* This might cause the indexes to hang/start again, hence let's
2364 * request a timing update, after the cork/uncork, too */
2365 request_auto_timing_update(s, true);
2366
2367 return o;
2368 }
2369
stream_send_simple_command(pa_stream * s,uint32_t command,pa_stream_success_cb_t cb,void * userdata)2370 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2371 pa_tagstruct *t;
2372 pa_operation *o;
2373 uint32_t tag;
2374
2375 pa_assert(s);
2376 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2377
2378 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2379 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2380
2381 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2382
2383 t = pa_tagstruct_command(s->context, command, &tag);
2384 pa_tagstruct_putu32(t, s->channel);
2385 pa_pstream_send_tagstruct(s->context->pstream, t);
2386 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2387
2388 return o;
2389 }
2390
pa_stream_flush(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2391 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2392 pa_operation *o;
2393
2394 pa_assert(s);
2395 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2396
2397 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2398 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2399 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2400
2401 /* Ask for a timing update *before* the flush, so that the
2402 * transport usec is as up to date as possible when we get the
2403 * underflow message and update the smoother status*/
2404 request_auto_timing_update(s, true);
2405
2406 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2407 return NULL;
2408
2409 if (s->direction == PA_STREAM_PLAYBACK) {
2410
2411 if (s->write_index_corrections[s->current_write_index_correction].valid)
2412 s->write_index_corrections[s->current_write_index_correction].corrupt = true;
2413
2414 if (s->buffer_attr.prebuf > 0)
2415 check_smoother_status(s, false, false, true);
2416
2417 /* This will change the write index, but leave the
2418 * read index untouched. */
2419 invalidate_indexes(s, false, true);
2420
2421 } else
2422 /* For record streams this has no influence on the write
2423 * index, but the read index might jump. */
2424 invalidate_indexes(s, true, false);
2425
2426 /* Note that we do not update requested_bytes here. This is
2427 * because we cannot really know how data actually was dropped
2428 * from the write index due to this. This 'error' will be applied
2429 * by both client and server and hence we should be fine. */
2430
2431 return o;
2432 }
2433
pa_stream_prebuf(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2434 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2435 pa_operation *o;
2436
2437 pa_assert(s);
2438 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2439
2440 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2441 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2442 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2443 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2444
2445 /* Ask for a timing update before we cork/uncork to get the best
2446 * accuracy for the transport latency suitable for the
2447 * check_smoother_status() call in the started callback */
2448 request_auto_timing_update(s, true);
2449
2450 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2451 return NULL;
2452
2453 /* This might cause the read index to hang again, hence
2454 * let's request a timing update */
2455 request_auto_timing_update(s, true);
2456
2457 return o;
2458 }
2459
pa_stream_trigger(pa_stream * s,pa_stream_success_cb_t cb,void * userdata)2460 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2461 pa_operation *o;
2462
2463 pa_assert(s);
2464 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2465
2466 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2467 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2468 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2469 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2470
2471 /* Ask for a timing update before we cork/uncork to get the best
2472 * accuracy for the transport latency suitable for the
2473 * check_smoother_status() call in the started callback */
2474 request_auto_timing_update(s, true);
2475
2476 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2477 return NULL;
2478
2479 /* This might cause the read index to start moving again, hence
2480 * let's request a timing update */
2481 request_auto_timing_update(s, true);
2482
2483 return o;
2484 }
2485
pa_stream_set_name(pa_stream * s,const char * name,pa_stream_success_cb_t cb,void * userdata)2486 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2487 pa_operation *o;
2488
2489 pa_assert(s);
2490 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2491 pa_assert(name);
2492
2493 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2494 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2495 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2496
2497 if (s->context->version >= 13) {
2498 pa_proplist *p = pa_proplist_new();
2499
2500 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2501 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2502 pa_proplist_free(p);
2503 } else {
2504 pa_tagstruct *t;
2505 uint32_t tag;
2506
2507 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2508 t = pa_tagstruct_command(
2509 s->context,
2510 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2511 &tag);
2512 pa_tagstruct_putu32(t, s->channel);
2513 pa_tagstruct_puts(t, name);
2514 pa_pstream_send_tagstruct(s->context->pstream, t);
2515 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2516 }
2517
2518 return o;
2519 }
2520
pa_stream_get_time(pa_stream * s,pa_usec_t * r_usec)2521 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2522 pa_usec_t usec;
2523
2524 pa_assert(s);
2525 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2526
2527 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2528 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2529 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2530 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2531 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2532 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2533
2534 if (s->smoother)
2535 #ifdef USE_SMOOTHER_2
2536 usec = pa_smoother_2_get(s->smoother, pa_rtclock_now());
2537 #else
2538 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2539 #endif
2540
2541 else
2542 usec = calc_time(s, false);
2543
2544 /* Make sure the time runs monotonically */
2545 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2546 if (usec < s->previous_time)
2547 usec = s->previous_time;
2548 else
2549 s->previous_time = usec;
2550 }
2551
2552 if (r_usec)
2553 *r_usec = usec;
2554
2555 return 0;
2556 }
2557
time_counter_diff(const pa_stream * s,pa_usec_t a,pa_usec_t b,int * negative)2558 static pa_usec_t time_counter_diff(const pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2559 pa_assert(s);
2560 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2561
2562 if (negative)
2563 *negative = 0;
2564
2565 if (a >= b)
2566 return a-b;
2567 else {
2568 if (negative && s->direction == PA_STREAM_RECORD) {
2569 *negative = 1;
2570 return b-a;
2571 } else
2572 return 0;
2573 }
2574 }
2575
pa_stream_get_latency(pa_stream * s,pa_usec_t * r_usec,int * negative)2576 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2577 pa_usec_t t, c;
2578 int r;
2579 int64_t cindex;
2580
2581 pa_assert(s);
2582 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2583 pa_assert(r_usec);
2584
2585 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2586 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2587 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2588 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2589 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2590 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2591
2592 if ((r = pa_stream_get_time(s, &t)) < 0)
2593 return r;
2594
2595 if (s->direction == PA_STREAM_PLAYBACK)
2596 cindex = s->timing_info.write_index;
2597 else
2598 cindex = s->timing_info.read_index;
2599
2600 if (cindex < 0)
2601 cindex = 0;
2602
2603 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2604
2605 if (s->direction == PA_STREAM_PLAYBACK)
2606 *r_usec = time_counter_diff(s, c, t, negative);
2607 else
2608 *r_usec = time_counter_diff(s, t, c, negative);
2609
2610 return 0;
2611 }
2612
pa_stream_get_timing_info(pa_stream * s)2613 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2614 pa_assert(s);
2615 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2616
2617 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2618 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2619 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2620 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2621
2622 return &s->timing_info;
2623 }
2624
pa_stream_get_sample_spec(pa_stream * s)2625 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2626 pa_assert(s);
2627 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2628
2629 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2630
2631 return &s->sample_spec;
2632 }
2633
pa_stream_get_channel_map(pa_stream * s)2634 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2635 pa_assert(s);
2636 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2637
2638 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2639
2640 return &s->channel_map;
2641 }
2642
pa_stream_get_format_info(const pa_stream * s)2643 const pa_format_info* pa_stream_get_format_info(const pa_stream *s) {
2644 pa_assert(s);
2645 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2646
2647 /* We don't have the format till routing is done */
2648 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2649 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2650
2651 return s->format;
2652 }
pa_stream_get_buffer_attr(pa_stream * s)2653 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2654 pa_assert(s);
2655 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2656
2657 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2658 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2659 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2660
2661 return &s->buffer_attr;
2662 }
2663
stream_set_buffer_attr_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)2664 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2665 pa_operation *o = userdata;
2666 int success = 1;
2667
2668 pa_assert(pd);
2669 pa_assert(o);
2670 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2671
2672 if (!o->context)
2673 goto finish;
2674
2675 if (command != PA_COMMAND_REPLY) {
2676 if (pa_context_handle_error(o->context, command, t, false) < 0)
2677 goto finish;
2678
2679 success = 0;
2680 } else {
2681 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2682 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2683 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2684 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2685 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2686 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2687 goto finish;
2688 }
2689 } else if (o->stream->direction == PA_STREAM_RECORD) {
2690 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2691 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2692 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2693 goto finish;
2694 }
2695 }
2696
2697 if (o->stream->context->version >= 13) {
2698 pa_usec_t usec;
2699
2700 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2701 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2702 goto finish;
2703 }
2704
2705 if (o->stream->direction == PA_STREAM_RECORD)
2706 o->stream->timing_info.configured_source_usec = usec;
2707 else
2708 o->stream->timing_info.configured_sink_usec = usec;
2709 }
2710
2711 if (!pa_tagstruct_eof(t)) {
2712 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2713 goto finish;
2714 }
2715 }
2716
2717 if (o->callback) {
2718 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2719 cb(o->stream, success, o->userdata);
2720 }
2721
2722 finish:
2723 pa_operation_done(o);
2724 pa_operation_unref(o);
2725 }
2726
pa_stream_set_buffer_attr(pa_stream * s,const pa_buffer_attr * attr,pa_stream_success_cb_t cb,void * userdata)2727 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2728 pa_operation *o;
2729 pa_tagstruct *t;
2730 uint32_t tag;
2731 pa_buffer_attr copy;
2732
2733 pa_assert(s);
2734 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2735 pa_assert(attr);
2736
2737 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2738 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2739 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2740 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2741
2742 /* Ask for a timing update before we cork/uncork to get the best
2743 * accuracy for the transport latency suitable for the
2744 * check_smoother_status() call in the started callback */
2745 request_auto_timing_update(s, true);
2746
2747 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2748
2749 t = pa_tagstruct_command(
2750 s->context,
2751 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2752 &tag);
2753 pa_tagstruct_putu32(t, s->channel);
2754
2755 copy = *attr;
2756 patch_buffer_attr(s, ©, NULL);
2757 attr = ©
2758
2759 pa_tagstruct_putu32(t, attr->maxlength);
2760
2761 if (s->direction == PA_STREAM_PLAYBACK)
2762 pa_tagstruct_put(
2763 t,
2764 PA_TAG_U32, attr->tlength,
2765 PA_TAG_U32, attr->prebuf,
2766 PA_TAG_U32, attr->minreq,
2767 PA_TAG_INVALID);
2768 else
2769 pa_tagstruct_putu32(t, attr->fragsize);
2770
2771 if (s->context->version >= 13)
2772 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2773
2774 if (s->context->version >= 14)
2775 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2776
2777 pa_pstream_send_tagstruct(s->context->pstream, t);
2778 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2779
2780 /* This might cause changes in the read/write index, hence let's
2781 * request a timing update */
2782 request_auto_timing_update(s, true);
2783
2784 return o;
2785 }
2786
pa_stream_get_device_index(const pa_stream * s)2787 uint32_t pa_stream_get_device_index(const pa_stream *s) {
2788 pa_assert(s);
2789 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2790
2791 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2792 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2793 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2794 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2795 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2796
2797 return s->device_index;
2798 }
2799
pa_stream_get_device_name(const pa_stream * s)2800 const char *pa_stream_get_device_name(const pa_stream *s) {
2801 pa_assert(s);
2802 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2803
2804 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2805 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2806 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2807 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2808 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2809
2810 return s->device_name;
2811 }
2812
pa_stream_is_suspended(const pa_stream * s)2813 int pa_stream_is_suspended(const pa_stream *s) {
2814 pa_assert(s);
2815 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2816
2817 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2818 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2819 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2820 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2821
2822 return s->suspended;
2823 }
2824
pa_stream_is_corked(const pa_stream * s)2825 int pa_stream_is_corked(const pa_stream *s) {
2826 pa_assert(s);
2827 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2828
2829 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2830 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2831 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2832
2833 return s->corked;
2834 }
2835
stream_update_sample_rate_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)2836 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2837 pa_operation *o = userdata;
2838 int success = 1;
2839
2840 pa_assert(pd);
2841 pa_assert(o);
2842 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2843
2844 if (!o->context)
2845 goto finish;
2846
2847 if (command != PA_COMMAND_REPLY) {
2848 if (pa_context_handle_error(o->context, command, t, false) < 0)
2849 goto finish;
2850
2851 success = 0;
2852 } else {
2853
2854 if (!pa_tagstruct_eof(t)) {
2855 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2856 goto finish;
2857 }
2858 }
2859
2860 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2861 #ifdef USE_SMOOTHER_2
2862 if (o->stream->smoother)
2863 pa_smoother_2_set_rate(o->stream->smoother, pa_rtclock_now(), o->stream->sample_spec.rate);
2864 #endif
2865 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2866
2867 if (o->callback) {
2868 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2869 cb(o->stream, success, o->userdata);
2870 }
2871
2872 finish:
2873 pa_operation_done(o);
2874 pa_operation_unref(o);
2875 }
2876
pa_stream_update_sample_rate(pa_stream * s,uint32_t rate,pa_stream_success_cb_t cb,void * userdata)2877 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2878 pa_operation *o;
2879 pa_tagstruct *t;
2880 uint32_t tag;
2881
2882 pa_assert(s);
2883 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2884
2885 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2886 PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_sample_rate_valid(rate), PA_ERR_INVALID);
2887 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2888 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2889 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2890 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2891
2892 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2893 o->private = PA_UINT_TO_PTR(rate);
2894
2895 t = pa_tagstruct_command(
2896 s->context,
2897 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2898 &tag);
2899 pa_tagstruct_putu32(t, s->channel);
2900 pa_tagstruct_putu32(t, rate);
2901
2902 pa_pstream_send_tagstruct(s->context->pstream, t);
2903 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2904
2905 return o;
2906 }
2907
pa_stream_proplist_update(pa_stream * s,pa_update_mode_t mode,pa_proplist * p,pa_stream_success_cb_t cb,void * userdata)2908 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2909 pa_operation *o;
2910 pa_tagstruct *t;
2911 uint32_t tag;
2912
2913 pa_assert(s);
2914 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2915
2916 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2917 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2918 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2919 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2920 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2921
2922 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2923
2924 t = pa_tagstruct_command(
2925 s->context,
2926 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2927 &tag);
2928 pa_tagstruct_putu32(t, s->channel);
2929 pa_tagstruct_putu32(t, (uint32_t) mode);
2930 pa_tagstruct_put_proplist(t, p);
2931
2932 pa_pstream_send_tagstruct(s->context->pstream, t);
2933 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2934
2935 /* Please note that we don't update s->proplist here, because we
2936 * don't export that field */
2937
2938 return o;
2939 }
2940
pa_stream_proplist_remove(pa_stream * s,const char * const keys[],pa_stream_success_cb_t cb,void * userdata)2941 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2942 pa_operation *o;
2943 pa_tagstruct *t;
2944 uint32_t tag;
2945 const char * const*k;
2946
2947 pa_assert(s);
2948 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2949
2950 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2951 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2952 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2953 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2954 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2955
2956 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2957
2958 t = pa_tagstruct_command(
2959 s->context,
2960 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2961 &tag);
2962 pa_tagstruct_putu32(t, s->channel);
2963
2964 for (k = keys; *k; k++)
2965 pa_tagstruct_puts(t, *k);
2966
2967 pa_tagstruct_puts(t, NULL);
2968
2969 pa_pstream_send_tagstruct(s->context->pstream, t);
2970 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2971
2972 /* Please note that we don't update s->proplist here, because we
2973 * don't export that field */
2974
2975 return o;
2976 }
2977
pa_stream_set_monitor_stream(pa_stream * s,uint32_t sink_input_idx)2978 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2979 pa_assert(s);
2980 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2981
2982 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2983 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2984 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2985 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2986
2987 s->direct_on_input = sink_input_idx;
2988
2989 return 0;
2990 }
2991
pa_stream_get_monitor_stream(const pa_stream * s)2992 uint32_t pa_stream_get_monitor_stream(const pa_stream *s) {
2993 pa_assert(s);
2994 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2995
2996 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2997 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2998 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2999
3000 return s->direct_on_input;
3001 }
3002