1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2009 Intel Corporation
5 Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <stdio.h>
26
27 #include <pulse/xmalloc.h>
28
29 #include <pulsecore/sink-input.h>
30 #include <pulsecore/module.h>
31 #include <pulsecore/modargs.h>
32 #include <pulsecore/namereg.h>
33 #include <pulsecore/log.h>
34 #include <pulsecore/core-util.h>
35
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38
39 PA_MODULE_AUTHOR("Pierre-Louis Bossart");
40 PA_MODULE_DESCRIPTION("Loopback from source to sink");
41 PA_MODULE_VERSION(PACKAGE_VERSION);
42 PA_MODULE_LOAD_ONCE(false);
43 PA_MODULE_USAGE(
44 "source=<source to connect to> "
45 "sink=<sink to connect to> "
46 "adjust_time=<how often to readjust rates in s> "
47 "latency_msec=<latency in ms> "
48 "max_latency_msec=<maximum latency in ms> "
49 "fast_adjust_threshold_msec=<threshold for fast adjust in ms> "
50 "format=<sample format> "
51 "rate=<sample rate> "
52 "channels=<number of channels> "
53 "channel_map=<channel map> "
54 "sink_input_properties=<proplist> "
55 "source_output_properties=<proplist> "
56 "source_dont_move=<boolean> "
57 "sink_dont_move=<boolean> "
58 "remix=<remix channels?> ");
59
60 #define DEFAULT_LATENCY_MSEC 200
61
62 #define MEMBLOCKQ_MAXLENGTH (1024*1024*32)
63
64 #define MIN_DEVICE_LATENCY (2.5*PA_USEC_PER_MSEC)
65
66 #define DEFAULT_ADJUST_TIME_USEC (10*PA_USEC_PER_SEC)
67
68 typedef struct loopback_msg loopback_msg;
69
70 struct userdata {
71 pa_core *core;
72 pa_module *module;
73
74 loopback_msg *msg;
75
76 pa_sink_input *sink_input;
77 pa_source_output *source_output;
78
79 pa_asyncmsgq *asyncmsgq;
80 pa_memblockq *memblockq;
81
82 pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;
83
84 pa_time_event *time_event;
85
86 /* Variables used to calculate the average time between
87 * subsequent calls of adjust_rates() */
88 pa_usec_t adjust_time_stamp;
89 pa_usec_t real_adjust_time;
90 pa_usec_t real_adjust_time_sum;
91
92 /* Values from command line configuration */
93 pa_usec_t latency;
94 pa_usec_t max_latency;
95 pa_usec_t adjust_time;
96 pa_usec_t fast_adjust_threshold;
97
98 /* Latency boundaries and current values */
99 pa_usec_t min_source_latency;
100 pa_usec_t max_source_latency;
101 pa_usec_t min_sink_latency;
102 pa_usec_t max_sink_latency;
103 pa_usec_t configured_sink_latency;
104 pa_usec_t configured_source_latency;
105 int64_t source_latency_offset;
106 int64_t sink_latency_offset;
107 pa_usec_t minimum_latency;
108
109 /* lower latency limit found by underruns */
110 pa_usec_t underrun_latency_limit;
111
112 /* Various counters */
113 uint32_t iteration_counter;
114 uint32_t underrun_counter;
115 uint32_t adjust_counter;
116
117 bool fixed_alsa_source;
118 bool source_sink_changed;
119
120 /* Used for sink input and source output snapshots */
121 struct {
122 int64_t send_counter;
123 int64_t source_latency;
124 pa_usec_t source_timestamp;
125
126 int64_t recv_counter;
127 size_t loopback_memblockq_length;
128 int64_t sink_latency;
129 pa_usec_t sink_timestamp;
130 } latency_snapshot;
131
132 /* Input thread variable */
133 int64_t send_counter;
134
135 /* Output thread variables */
136 struct {
137 int64_t recv_counter;
138 pa_usec_t effective_source_latency;
139
140 /* Copied from main thread */
141 pa_usec_t minimum_latency;
142
143 /* Various booleans */
144 bool in_pop;
145 bool pop_called;
146 bool pop_adjust;
147 bool first_pop_done;
148 bool push_called;
149 } output_thread_info;
150 };
151
152 struct loopback_msg {
153 pa_msgobject parent;
154 struct userdata *userdata;
155 };
156
157 PA_DEFINE_PRIVATE_CLASS(loopback_msg, pa_msgobject);
158 #define LOOPBACK_MSG(o) (loopback_msg_cast(o))
159
160 static const char* const valid_modargs[] = {
161 "source",
162 "sink",
163 "adjust_time",
164 "latency_msec",
165 "max_latency_msec",
166 "fast_adjust_threshold_msec",
167 "format",
168 "rate",
169 "channels",
170 "channel_map",
171 "sink_input_properties",
172 "source_output_properties",
173 "source_dont_move",
174 "sink_dont_move",
175 "remix",
176 NULL,
177 };
178
179 enum {
180 SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
181 SINK_INPUT_MESSAGE_REWIND,
182 SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT,
183 SINK_INPUT_MESSAGE_SOURCE_CHANGED,
184 SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY,
185 SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY,
186 SINK_INPUT_MESSAGE_FAST_ADJUST,
187 };
188
189 enum {
190 SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT = PA_SOURCE_OUTPUT_MESSAGE_MAX,
191 };
192
193 enum {
194 LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED,
195 LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED,
196 LOOPBACK_MESSAGE_UNDERRUN,
197 };
198
199 static void enable_adjust_timer(struct userdata *u, bool enable);
200
201 /* Called from main context */
teardown(struct userdata * u)202 static void teardown(struct userdata *u) {
203 pa_assert(u);
204 pa_assert_ctl_context();
205
206 u->adjust_time = 0;
207 enable_adjust_timer(u, false);
208
209 /* Handling the asyncmsgq between the source output and the sink input
210 * requires some care. When the source output is unlinked, nothing needs
211 * to be done for the asyncmsgq, because the source output is the sending
212 * end. But when the sink input is unlinked, we should ensure that the
213 * asyncmsgq is emptied, because the messages in the queue hold references
214 * to the sink input. Also, we need to ensure that new messages won't be
215 * written to the queue after we have emptied it.
216 *
217 * Emptying the queue can be done in the state_change() callback of the
218 * sink input, when the new state is "unlinked".
219 *
220 * Preventing new messages from being written to the queue can be achieved
221 * by unlinking the source output before unlinking the sink input. There
222 * are no other writers for that queue, so this is sufficient. */
223
224 if (u->source_output) {
225 pa_source_output_unlink(u->source_output);
226 pa_source_output_unref(u->source_output);
227 u->source_output = NULL;
228 }
229
230 if (u->sink_input) {
231 pa_sink_input_unlink(u->sink_input);
232 pa_sink_input_unref(u->sink_input);
233 u->sink_input = NULL;
234 }
235 }
236
237 /* rate controller, called from main context
238 * - maximum deviation from base rate is less than 1%
239 * - can create audible artifacts by changing the rate too quickly
240 * - exhibits hunting with USB or Bluetooth sources
241 */
rate_controller(uint32_t base_rate,pa_usec_t adjust_time,int32_t latency_difference_usec)242 static uint32_t rate_controller(
243 uint32_t base_rate,
244 pa_usec_t adjust_time,
245 int32_t latency_difference_usec) {
246
247 uint32_t new_rate;
248 double min_cycles;
249
250 /* Calculate best rate to correct the current latency offset, limit at
251 * slightly below 1% difference from base_rate */
252 min_cycles = (double)abs(latency_difference_usec) / adjust_time / 0.01 + 1;
253 new_rate = base_rate * (1.0 + (double)latency_difference_usec / min_cycles / adjust_time);
254
255 return new_rate;
256 }
257
258 /* Called from main thread.
259 * It has been a matter of discussion how to correctly calculate the minimum
260 * latency that module-loopback can deliver with a given source and sink.
261 * The calculation has been placed in a separate function so that the definition
262 * can easily be changed. The resulting estimate is not very exact because it
263 * depends on the reported latency ranges. In cases were the lower bounds of
264 * source and sink latency are not reported correctly (USB) the result will
265 * be wrong. */
update_minimum_latency(struct userdata * u,pa_sink * sink,bool print_msg)266 static void update_minimum_latency(struct userdata *u, pa_sink *sink, bool print_msg) {
267
268 if (u->underrun_latency_limit)
269 /* If we already detected a real latency limit because of underruns, use it */
270 u->minimum_latency = u->underrun_latency_limit;
271
272 else {
273 /* Calculate latency limit from latency ranges */
274
275 u->minimum_latency = u->min_sink_latency;
276 if (u->fixed_alsa_source)
277 /* If we are using an alsa source with fixed latency, we will get a wakeup when
278 * one fragment is filled, and then we empty the source buffer, so the source
279 * latency never grows much beyond one fragment (assuming that the CPU doesn't
280 * cause a bottleneck). */
281 u->minimum_latency += u->core->default_fragment_size_msec * PA_USEC_PER_MSEC;
282
283 else
284 /* In all other cases the source will deliver new data at latest after one source latency.
285 * Make sure there is enough data available that the sink can keep on playing until new
286 * data is pushed. */
287 u->minimum_latency += u->min_source_latency;
288
289 /* Multiply by 1.1 as a safety margin for delays that are proportional to the buffer sizes */
290 u->minimum_latency *= 1.1;
291
292 /* Add 1.5 ms as a safety margin for delays not related to the buffer sizes */
293 u->minimum_latency += 1.5 * PA_USEC_PER_MSEC;
294 }
295
296 /* Add the latency offsets */
297 if (-(u->sink_latency_offset + u->source_latency_offset) <= (int64_t)u->minimum_latency)
298 u->minimum_latency += u->sink_latency_offset + u->source_latency_offset;
299 else
300 u->minimum_latency = 0;
301
302 /* If the sink is valid, send a message to update the minimum latency to
303 * the output thread, else set the variable directly */
304 if (sink)
305 pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY, NULL, u->minimum_latency, NULL);
306 else
307 u->output_thread_info.minimum_latency = u->minimum_latency;
308
309 if (print_msg) {
310 pa_log_info("Minimum possible end to end latency: %0.2f ms", (double)u->minimum_latency / PA_USEC_PER_MSEC);
311 if (u->latency < u->minimum_latency)
312 pa_log_warn("Configured latency of %0.2f ms is smaller than minimum latency, using minimum instead", (double)u->latency / PA_USEC_PER_MSEC);
313 }
314 }
315
316 /* Called from main context */
adjust_rates(struct userdata * u)317 static void adjust_rates(struct userdata *u) {
318 size_t buffer;
319 uint32_t old_rate, base_rate, new_rate, run_hours;
320 int32_t latency_difference;
321 pa_usec_t current_buffer_latency, snapshot_delay;
322 int64_t current_source_sink_latency, current_latency, latency_at_optimum_rate;
323 pa_usec_t final_latency, now, time_passed;
324
325 pa_assert(u);
326 pa_assert_ctl_context();
327
328 /* Runtime and counters since last change of source or sink
329 * or source/sink latency */
330 run_hours = u->iteration_counter * u->real_adjust_time / PA_USEC_PER_SEC / 3600;
331 u->iteration_counter +=1;
332
333 /* If we are seeing underruns then the latency is too small */
334 if (u->underrun_counter > 2) {
335 pa_usec_t target_latency;
336
337 target_latency = PA_MAX(u->latency, u->minimum_latency) + 5 * PA_USEC_PER_MSEC;
338
339 if (u->max_latency == 0 || target_latency < u->max_latency) {
340 u->underrun_latency_limit = PA_CLIP_SUB((int64_t)target_latency, u->sink_latency_offset + u->source_latency_offset);
341 pa_log_warn("Too many underruns, increasing latency to %0.2f ms", (double)target_latency / PA_USEC_PER_MSEC);
342 } else {
343 u->underrun_latency_limit = PA_CLIP_SUB((int64_t)u->max_latency, u->sink_latency_offset + u->source_latency_offset);
344 pa_log_warn("Too many underruns, configured maximum latency of %0.2f ms is reached", (double)u->max_latency / PA_USEC_PER_MSEC);
345 pa_log_warn("Consider increasing the max_latency_msec");
346 }
347
348 update_minimum_latency(u, u->sink_input->sink, false);
349 u->underrun_counter = 0;
350 }
351
352 /* Allow one underrun per hour */
353 if (u->iteration_counter * u->real_adjust_time / PA_USEC_PER_SEC / 3600 > run_hours) {
354 u->underrun_counter = PA_CLIP_SUB(u->underrun_counter, 1u);
355 pa_log_info("Underrun counter: %u", u->underrun_counter);
356 }
357
358 /* Calculate real adjust time if source or sink did not change and if the system has
359 * not been suspended. If the time between two calls is more than 5% longer than the
360 * configured adjust time, we assume that the system has been sleeping and skip the
361 * calculation for this iteration. */
362 now = pa_rtclock_now();
363 time_passed = now - u->adjust_time_stamp;
364 if (!u->source_sink_changed && time_passed < u->adjust_time * 1.05) {
365 u->adjust_counter++;
366 u->real_adjust_time_sum += time_passed;
367 u->real_adjust_time = u->real_adjust_time_sum / u->adjust_counter;
368 }
369 u->adjust_time_stamp = now;
370
371 /* Rates and latencies */
372 old_rate = u->sink_input->sample_spec.rate;
373 base_rate = u->source_output->sample_spec.rate;
374
375 buffer = u->latency_snapshot.loopback_memblockq_length;
376 if (u->latency_snapshot.recv_counter <= u->latency_snapshot.send_counter)
377 buffer += (size_t) (u->latency_snapshot.send_counter - u->latency_snapshot.recv_counter);
378 else
379 buffer = PA_CLIP_SUB(buffer, (size_t) (u->latency_snapshot.recv_counter - u->latency_snapshot.send_counter));
380
381 current_buffer_latency = pa_bytes_to_usec(buffer, &u->sink_input->sample_spec);
382 snapshot_delay = u->latency_snapshot.source_timestamp - u->latency_snapshot.sink_timestamp;
383 current_source_sink_latency = u->latency_snapshot.sink_latency + u->latency_snapshot.source_latency - snapshot_delay;
384
385 /* Current latency */
386 current_latency = current_source_sink_latency + current_buffer_latency;
387
388 /* Latency at base rate */
389 latency_at_optimum_rate = current_source_sink_latency + current_buffer_latency * old_rate / base_rate;
390
391 final_latency = PA_MAX(u->latency, u->minimum_latency);
392 latency_difference = (int32_t)(latency_at_optimum_rate - final_latency);
393
394 pa_log_debug("Loopback overall latency is %0.2f ms + %0.2f ms + %0.2f ms = %0.2f ms",
395 (double) u->latency_snapshot.sink_latency / PA_USEC_PER_MSEC,
396 (double) current_buffer_latency / PA_USEC_PER_MSEC,
397 (double) u->latency_snapshot.source_latency / PA_USEC_PER_MSEC,
398 (double) current_latency / PA_USEC_PER_MSEC);
399
400 pa_log_debug("Loopback latency at base rate is %0.2f ms", (double)latency_at_optimum_rate / PA_USEC_PER_MSEC);
401
402 /* Drop or insert samples if fast_adjust_threshold_msec was specified and the latency difference is too large. */
403 if (u->fast_adjust_threshold > 0 && abs(latency_difference) > u->fast_adjust_threshold) {
404 pa_log_debug ("Latency difference larger than %" PRIu64 " msec, skipping or inserting samples.", u->fast_adjust_threshold / PA_USEC_PER_MSEC);
405
406 pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_FAST_ADJUST, NULL, current_source_sink_latency, NULL);
407
408 /* Skip real adjust time calculation on next iteration. */
409 u->source_sink_changed = true;
410 return;
411 }
412
413 /* Calculate new rate */
414 new_rate = rate_controller(base_rate, u->real_adjust_time, latency_difference);
415
416 u->source_sink_changed = false;
417
418 /* Set rate */
419 pa_sink_input_set_rate(u->sink_input, new_rate);
420 pa_log_debug("[%s] Updated sampling rate to %lu Hz.", u->sink_input->sink->name, (unsigned long) new_rate);
421 }
422
423 /* Called from main context */
time_callback(pa_mainloop_api * a,pa_time_event * e,const struct timeval * t,void * userdata)424 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
425 struct userdata *u = userdata;
426
427 pa_assert(u);
428 pa_assert(a);
429 pa_assert(u->time_event == e);
430
431 /* Restart timer right away */
432 pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
433
434 /* Get sink and source latency snapshot */
435 pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
436 pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
437
438 adjust_rates(u);
439 }
440
441 /* Called from main context
442 * When source or sink changes, give it a third of a second to settle down, then call adjust_rates for the first time */
enable_adjust_timer(struct userdata * u,bool enable)443 static void enable_adjust_timer(struct userdata *u, bool enable) {
444 if (enable) {
445 if (!u->adjust_time)
446 return;
447 if (u->time_event)
448 u->core->mainloop->time_free(u->time_event);
449
450 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + 333 * PA_USEC_PER_MSEC, time_callback, u);
451 } else {
452 if (!u->time_event)
453 return;
454
455 u->core->mainloop->time_free(u->time_event);
456 u->time_event = NULL;
457 }
458 }
459
460 /* Called from main context */
update_adjust_timer(struct userdata * u)461 static void update_adjust_timer(struct userdata *u) {
462 if (u->sink_input->state == PA_SINK_INPUT_CORKED || u->source_output->state == PA_SOURCE_OUTPUT_CORKED)
463 enable_adjust_timer(u, false);
464 else
465 enable_adjust_timer(u, true);
466 }
467
468 /* Called from main thread
469 * Calculates minimum and maximum possible latency for source and sink */
update_latency_boundaries(struct userdata * u,pa_source * source,pa_sink * sink)470 static void update_latency_boundaries(struct userdata *u, pa_source *source, pa_sink *sink) {
471 const char *s;
472
473 if (source) {
474 /* Source latencies */
475 u->fixed_alsa_source = false;
476 if (source->flags & PA_SOURCE_DYNAMIC_LATENCY)
477 pa_source_get_latency_range(source, &u->min_source_latency, &u->max_source_latency);
478 else {
479 u->min_source_latency = pa_source_get_fixed_latency(source);
480 u->max_source_latency = u->min_source_latency;
481 if ((s = pa_proplist_gets(source->proplist, PA_PROP_DEVICE_API))) {
482 if (pa_streq(s, "alsa"))
483 u->fixed_alsa_source = true;
484 }
485 }
486 /* Source offset */
487 u->source_latency_offset = source->port_latency_offset;
488
489 /* Latencies below 2.5 ms cause problems, limit source latency if possible */
490 if (u->max_source_latency >= MIN_DEVICE_LATENCY)
491 u->min_source_latency = PA_MAX(u->min_source_latency, MIN_DEVICE_LATENCY);
492 else
493 u->min_source_latency = u->max_source_latency;
494 }
495
496 if (sink) {
497 /* Sink latencies */
498 if (sink->flags & PA_SINK_DYNAMIC_LATENCY)
499 pa_sink_get_latency_range(sink, &u->min_sink_latency, &u->max_sink_latency);
500 else {
501 u->min_sink_latency = pa_sink_get_fixed_latency(sink);
502 u->max_sink_latency = u->min_sink_latency;
503 }
504 /* Sink offset */
505 u->sink_latency_offset = sink->port_latency_offset;
506
507 /* Latencies below 2.5 ms cause problems, limit sink latency if possible */
508 if (u->max_sink_latency >= MIN_DEVICE_LATENCY)
509 u->min_sink_latency = PA_MAX(u->min_sink_latency, MIN_DEVICE_LATENCY);
510 else
511 u->min_sink_latency = u->max_sink_latency;
512 }
513
514 update_minimum_latency(u, sink, true);
515 }
516
517 /* Called from output context
518 * Sets the memblockq to the configured latency corrected by latency_offset_usec */
memblockq_adjust(struct userdata * u,int64_t latency_offset_usec,bool allow_push)519 static void memblockq_adjust(struct userdata *u, int64_t latency_offset_usec, bool allow_push) {
520 size_t current_memblockq_length, requested_memblockq_length, buffer_correction;
521 int64_t requested_buffer_latency;
522 pa_usec_t final_latency, requested_sink_latency;
523
524 final_latency = PA_MAX(u->latency, u->output_thread_info.minimum_latency);
525
526 /* If source or sink have some large negative latency offset, we might want to
527 * hold more than final_latency in the memblockq */
528 requested_buffer_latency = (int64_t)final_latency - latency_offset_usec;
529
530 /* Keep at least one sink latency in the queue to make sure that the sink
531 * never underruns initially */
532 requested_sink_latency = pa_sink_get_requested_latency_within_thread(u->sink_input->sink);
533 if (requested_buffer_latency < (int64_t)requested_sink_latency)
534 requested_buffer_latency = requested_sink_latency;
535
536 requested_memblockq_length = pa_usec_to_bytes(requested_buffer_latency, &u->sink_input->sample_spec);
537 current_memblockq_length = pa_memblockq_get_length(u->memblockq);
538
539 if (current_memblockq_length > requested_memblockq_length) {
540 /* Drop audio from queue */
541 buffer_correction = current_memblockq_length - requested_memblockq_length;
542 pa_log_info("Dropping %" PRIu64 " usec of audio from queue", pa_bytes_to_usec(buffer_correction, &u->sink_input->sample_spec));
543 pa_memblockq_drop(u->memblockq, buffer_correction);
544
545 } else if (current_memblockq_length < requested_memblockq_length && allow_push) {
546 /* Add silence to queue */
547 buffer_correction = requested_memblockq_length - current_memblockq_length;
548 pa_log_info("Adding %" PRIu64 " usec of silence to queue", pa_bytes_to_usec(buffer_correction, &u->sink_input->sample_spec));
549 pa_memblockq_seek(u->memblockq, (int64_t)buffer_correction, PA_SEEK_RELATIVE, true);
550 }
551 }
552
553 /* Called from input thread context */
source_output_push_cb(pa_source_output * o,const pa_memchunk * chunk)554 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
555 struct userdata *u;
556 pa_usec_t push_time;
557 int64_t current_source_latency;
558
559 pa_source_output_assert_ref(o);
560 pa_source_output_assert_io_context(o);
561 pa_assert_se(u = o->userdata);
562
563 /* Send current source latency and timestamp with the message */
564 push_time = pa_rtclock_now();
565 current_source_latency = pa_source_get_latency_within_thread(u->source_output->source, true);
566
567 pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_POST, PA_INT_TO_PTR(current_source_latency), push_time, chunk, NULL);
568 u->send_counter += (int64_t) chunk->length;
569 }
570
571 /* Called from input thread context */
source_output_process_rewind_cb(pa_source_output * o,size_t nbytes)572 static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
573 struct userdata *u;
574
575 pa_source_output_assert_ref(o);
576 pa_source_output_assert_io_context(o);
577 pa_assert_se(u = o->userdata);
578
579 pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
580 u->send_counter -= (int64_t) nbytes;
581 }
582
583 /* Called from input thread context */
source_output_process_msg_cb(pa_msgobject * obj,int code,void * data,int64_t offset,pa_memchunk * chunk)584 static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
585 struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
586
587 switch (code) {
588
589 case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
590 size_t length;
591
592 length = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
593
594 u->latency_snapshot.send_counter = u->send_counter;
595 /* Add content of delay memblockq to the source latency */
596 u->latency_snapshot.source_latency = pa_source_get_latency_within_thread(u->source_output->source, true) +
597 pa_bytes_to_usec(length, &u->source_output->source->sample_spec);
598 u->latency_snapshot.source_timestamp = pa_rtclock_now();
599
600 return 0;
601 }
602 }
603
604 return pa_source_output_process_msg(obj, code, data, offset, chunk);
605 }
606
607 /* Called from main thread.
608 * Get current effective latency of the source. If the source is in use with
609 * smaller latency than the configured latency, it will continue running with
610 * the smaller value when the source output is switched to the source. */
update_effective_source_latency(struct userdata * u,pa_source * source,pa_sink * sink)611 static void update_effective_source_latency(struct userdata *u, pa_source *source, pa_sink *sink) {
612 pa_usec_t effective_source_latency;
613
614 effective_source_latency = u->configured_source_latency;
615
616 if (source) {
617 effective_source_latency = pa_source_get_requested_latency(source);
618 if (effective_source_latency == 0 || effective_source_latency > u->configured_source_latency)
619 effective_source_latency = u->configured_source_latency;
620 }
621
622 /* If the sink is valid, send a message to the output thread, else set the variable directly */
623 if (sink)
624 pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY, NULL, (int64_t)effective_source_latency, NULL);
625 else
626 u->output_thread_info.effective_source_latency = effective_source_latency;
627 }
628
629 /* Called from main thread.
630 * Set source output latency to one third of the overall latency if possible.
631 * The choice of one third is rather arbitrary somewhere between the minimum
632 * possible latency which would cause a lot of CPU load and half the configured
633 * latency which would quickly lead to underruns */
set_source_output_latency(struct userdata * u,pa_source * source)634 static void set_source_output_latency(struct userdata *u, pa_source *source) {
635 pa_usec_t latency, requested_latency;
636
637 requested_latency = u->latency / 3;
638
639 /* Normally we try to configure sink and source latency equally. If the
640 * sink latency cannot match the requested source latency try to set the
641 * source latency to a smaller value to avoid underruns */
642 if (u->min_sink_latency > requested_latency) {
643 latency = PA_MAX(u->latency, u->minimum_latency);
644 requested_latency = (latency - u->min_sink_latency) / 2;
645 }
646
647 latency = PA_CLAMP(requested_latency , u->min_source_latency, u->max_source_latency);
648 u->configured_source_latency = pa_source_output_set_requested_latency(u->source_output, latency);
649 if (u->configured_source_latency != requested_latency)
650 pa_log_warn("Cannot set requested source latency of %0.2f ms, adjusting to %0.2f ms", (double)requested_latency / PA_USEC_PER_MSEC, (double)u->configured_source_latency / PA_USEC_PER_MSEC);
651 }
652
653 /* Called from input thread context */
source_output_attach_cb(pa_source_output * o)654 static void source_output_attach_cb(pa_source_output *o) {
655 struct userdata *u;
656
657 pa_source_output_assert_ref(o);
658 pa_source_output_assert_io_context(o);
659 pa_assert_se(u = o->userdata);
660
661 u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
662 o->source->thread_info.rtpoll,
663 PA_RTPOLL_LATE,
664 u->asyncmsgq);
665 }
666
667 /* Called from input thread context */
source_output_detach_cb(pa_source_output * o)668 static void source_output_detach_cb(pa_source_output *o) {
669 struct userdata *u;
670
671 pa_source_output_assert_ref(o);
672 pa_source_output_assert_io_context(o);
673 pa_assert_se(u = o->userdata);
674
675 if (u->rtpoll_item_write) {
676 pa_rtpoll_item_free(u->rtpoll_item_write);
677 u->rtpoll_item_write = NULL;
678 }
679 }
680
681 /* Called from main thread */
source_output_kill_cb(pa_source_output * o)682 static void source_output_kill_cb(pa_source_output *o) {
683 struct userdata *u;
684
685 pa_source_output_assert_ref(o);
686 pa_assert_ctl_context();
687 pa_assert_se(u = o->userdata);
688
689 teardown(u);
690 pa_module_unload_request(u->module, true);
691 }
692
693 /* Called from main thread */
source_output_may_move_to_cb(pa_source_output * o,pa_source * dest)694 static bool source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
695 struct userdata *u;
696
697 pa_source_output_assert_ref(o);
698 pa_assert_ctl_context();
699 pa_assert_se(u = o->userdata);
700
701 if (!u->sink_input || !u->sink_input->sink)
702 return true;
703
704 return dest != u->sink_input->sink->monitor_source;
705 }
706
707 /* Called from main thread */
source_output_moving_cb(pa_source_output * o,pa_source * dest)708 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
709 struct userdata *u;
710 char *input_description;
711 const char *n;
712
713 if (!dest)
714 return;
715
716 pa_source_output_assert_ref(o);
717 pa_assert_ctl_context();
718 pa_assert_se(u = o->userdata);
719
720 input_description = pa_sprintf_malloc("Loopback of %s",
721 pa_strnull(pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION)));
722 pa_sink_input_set_property(u->sink_input, PA_PROP_MEDIA_NAME, input_description);
723 pa_xfree(input_description);
724
725 if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME)))
726 pa_sink_input_set_property(u->sink_input, PA_PROP_DEVICE_ICON_NAME, n);
727
728 /* Set latency and calculate latency limits */
729 u->underrun_latency_limit = 0;
730 update_latency_boundaries(u, dest, u->sink_input->sink);
731 set_source_output_latency(u, dest);
732 update_effective_source_latency(u, dest, u->sink_input->sink);
733
734 /* Uncork the sink input unless the destination is suspended for other
735 * reasons than idle. */
736 if (dest->state == PA_SOURCE_SUSPENDED)
737 pa_sink_input_cork(u->sink_input, (dest->suspend_cause != PA_SUSPEND_IDLE));
738 else
739 pa_sink_input_cork(u->sink_input, false);
740
741 update_adjust_timer(u);
742
743 /* Reset counters */
744 u->iteration_counter = 0;
745 u->underrun_counter = 0;
746
747 u->source_sink_changed = true;
748
749 /* Send a mesage to the output thread that the source has changed.
750 * If the sink is invalid here during a profile switching situation
751 * we can safely set push_called to false directly. */
752 if (u->sink_input->sink)
753 pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL);
754 else
755 u->output_thread_info.push_called = false;
756
757 /* The sampling rate may be far away from the default rate if we are still
758 * recovering from a previous source or sink change, so reset rate to
759 * default before moving the source. */
760 pa_sink_input_set_rate(u->sink_input, u->source_output->sample_spec.rate);
761 }
762
763 /* Called from main thread */
source_output_suspend_cb(pa_source_output * o,pa_source_state_t old_state,pa_suspend_cause_t old_suspend_cause)764 static void source_output_suspend_cb(pa_source_output *o, pa_source_state_t old_state, pa_suspend_cause_t old_suspend_cause) {
765 struct userdata *u;
766 bool suspended;
767
768 pa_source_output_assert_ref(o);
769 pa_assert_ctl_context();
770 pa_assert_se(u = o->userdata);
771
772 /* State has not changed, nothing to do */
773 if (old_state == o->source->state)
774 return;
775
776 suspended = (o->source->state == PA_SOURCE_SUSPENDED);
777
778 /* If the source has been suspended, we need to handle this like
779 * a source change when the source is resumed */
780 if (suspended) {
781 if (u->sink_input->sink)
782 pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL);
783 else
784 u->output_thread_info.push_called = false;
785
786 } else
787 /* Get effective source latency on unsuspend */
788 update_effective_source_latency(u, u->source_output->source, u->sink_input->sink);
789
790 pa_sink_input_cork(u->sink_input, suspended);
791
792 update_adjust_timer(u);
793 }
794
795 /* Called from input thread context */
update_source_latency_range_cb(pa_source_output * i)796 static void update_source_latency_range_cb(pa_source_output *i) {
797 struct userdata *u;
798
799 pa_source_output_assert_ref(i);
800 pa_source_output_assert_io_context(i);
801 pa_assert_se(u = i->userdata);
802
803 /* Source latency may have changed */
804 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED, NULL, 0, NULL, NULL);
805 }
806
807 /* Called from output thread context */
sink_input_pop_cb(pa_sink_input * i,size_t nbytes,pa_memchunk * chunk)808 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
809 struct userdata *u;
810
811 pa_sink_input_assert_ref(i);
812 pa_sink_input_assert_io_context(i);
813 pa_assert_se(u = i->userdata);
814 pa_assert(chunk);
815
816 /* It seems necessary to handle outstanding push messages here, though it is not clear
817 * why. Removing this part leads to underruns when low latencies are configured. */
818 u->output_thread_info.in_pop = true;
819 while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
820 ;
821 u->output_thread_info.in_pop = false;
822
823 /* While pop has not been called, latency adjustments in SINK_INPUT_MESSAGE_POST are
824 * enabled. Disable them on second pop and enable the final adjustment during the
825 * next push. The adjustment must be done on the next push, because there is no way
826 * to retrieve the source latency here. We are waiting for the second pop, because
827 * the first pop may be called before the sink is actually started. */
828 if (!u->output_thread_info.pop_called && u->output_thread_info.first_pop_done) {
829 u->output_thread_info.pop_adjust = true;
830 u->output_thread_info.pop_called = true;
831 }
832 u->output_thread_info.first_pop_done = true;
833
834 if (pa_memblockq_peek(u->memblockq, chunk) < 0) {
835 pa_log_info("Could not peek into queue");
836 return -1;
837 }
838
839 chunk->length = PA_MIN(chunk->length, nbytes);
840 pa_memblockq_drop(u->memblockq, chunk->length);
841
842 /* Adjust the memblockq to ensure that there is
843 * enough data in the queue to avoid underruns. */
844 if (!u->output_thread_info.push_called)
845 memblockq_adjust(u, 0, true);
846
847 return 0;
848 }
849
850 /* Called from output thread context */
sink_input_process_rewind_cb(pa_sink_input * i,size_t nbytes)851 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
852 struct userdata *u;
853
854 pa_sink_input_assert_ref(i);
855 pa_sink_input_assert_io_context(i);
856 pa_assert_se(u = i->userdata);
857
858 pa_memblockq_rewind(u->memblockq, nbytes);
859 }
860
861 /* Called from output thread context */
sink_input_process_msg_cb(pa_msgobject * obj,int code,void * data,int64_t offset,pa_memchunk * chunk)862 static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
863 struct userdata *u = PA_SINK_INPUT(obj)->userdata;
864
865 pa_sink_input_assert_io_context(u->sink_input);
866
867 switch (code) {
868
869 case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
870 pa_usec_t *r = data;
871
872 *r = pa_bytes_to_usec(pa_memblockq_get_length(u->memblockq), &u->sink_input->sample_spec);
873
874 /* Fall through, the default handler will add in the extra
875 * latency added by the resampler */
876 break;
877 }
878
879 case SINK_INPUT_MESSAGE_POST:
880
881 pa_memblockq_push_align(u->memblockq, chunk);
882
883 /* If push has not been called yet, latency adjustments in sink_input_pop_cb()
884 * are enabled. Disable them on first push and correct the memblockq. If pop
885 * has not been called yet, wait until the pop_cb() requests the adjustment */
886 if (u->output_thread_info.pop_called && (!u->output_thread_info.push_called || u->output_thread_info.pop_adjust)) {
887 int64_t time_delta;
888
889 /* This is the source latency at the time push was called */
890 time_delta = PA_PTR_TO_INT(data);
891 /* Add the time between push and post */
892 time_delta += pa_rtclock_now() - (pa_usec_t) offset;
893 /* Add the sink latency */
894 time_delta += pa_sink_get_latency_within_thread(u->sink_input->sink, true);
895
896 /* The source latency report includes the audio in the chunk,
897 * but since we already pushed the chunk to the memblockq, we need
898 * to subtract the chunk size from the source latency so that it
899 * won't be counted towards both the memblockq latency and the
900 * source latency.
901 *
902 * Sometimes the alsa source reports way too low latency (might
903 * be a bug in the alsa source code). This seems to happen when
904 * there's an overrun. As an attempt to detect overruns, we
905 * check if the chunk size is larger than the configured source
906 * latency. If so, we assume that the source should have pushed
907 * a chunk whose size equals the configured latency, so we
908 * modify time_delta only by that amount, which makes
909 * memblockq_adjust() drop more data than it would otherwise.
910 * This seems to work quite well, but it's possible that the
911 * next push also contains too much data, and in that case the
912 * resulting latency will be wrong. */
913 if (pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec) > u->output_thread_info.effective_source_latency)
914 time_delta -= (int64_t)u->output_thread_info.effective_source_latency;
915 else
916 time_delta -= (int64_t)pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec);
917
918 /* FIXME: We allow pushing silence here to fix up the latency. This
919 * might lead to a gap in the stream */
920 memblockq_adjust(u, time_delta, true);
921
922 u->output_thread_info.pop_adjust = false;
923 u->output_thread_info.push_called = true;
924 }
925
926 /* If pop has not been called yet, make sure the latency does not grow too much.
927 * Don't push any silence here, because we already have new data in the queue */
928 if (!u->output_thread_info.pop_called)
929 memblockq_adjust(u, 0, false);
930
931 /* Is this the end of an underrun? Then let's start things
932 * right-away */
933 if (u->sink_input->sink->thread_info.state != PA_SINK_SUSPENDED &&
934 u->sink_input->thread_info.underrun_for > 0 &&
935 pa_memblockq_is_readable(u->memblockq)) {
936
937 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_UNDERRUN, NULL, 0, NULL, NULL);
938 /* If called from within the pop callback skip the rewind */
939 if (!u->output_thread_info.in_pop) {
940 pa_log_debug("Requesting rewind due to end of underrun.");
941 pa_sink_input_request_rewind(u->sink_input,
942 (size_t) (u->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : u->sink_input->thread_info.underrun_for),
943 false, true, false);
944 }
945 }
946
947 u->output_thread_info.recv_counter += (int64_t) chunk->length;
948
949 return 0;
950
951 case SINK_INPUT_MESSAGE_REWIND:
952
953 /* Do not try to rewind if no data was pushed yet */
954 if (u->output_thread_info.push_called)
955 pa_memblockq_seek(u->memblockq, -offset, PA_SEEK_RELATIVE, true);
956
957 u->output_thread_info.recv_counter -= offset;
958
959 return 0;
960
961 case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
962 size_t length;
963
964 length = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
965
966 u->latency_snapshot.recv_counter = u->output_thread_info.recv_counter;
967 u->latency_snapshot.loopback_memblockq_length = pa_memblockq_get_length(u->memblockq);
968 /* Add content of render memblockq to sink latency */
969 u->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(u->sink_input->sink, true) +
970 pa_bytes_to_usec(length, &u->sink_input->sink->sample_spec);
971 u->latency_snapshot.sink_timestamp = pa_rtclock_now();
972
973 return 0;
974 }
975
976 case SINK_INPUT_MESSAGE_SOURCE_CHANGED:
977
978 u->output_thread_info.push_called = false;
979
980 return 0;
981
982 case SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY:
983
984 u->output_thread_info.effective_source_latency = (pa_usec_t)offset;
985
986 return 0;
987
988 case SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY:
989
990 u->output_thread_info.minimum_latency = (pa_usec_t)offset;
991
992 return 0;
993
994 case SINK_INPUT_MESSAGE_FAST_ADJUST:
995
996 memblockq_adjust(u, offset, true);
997
998 return 0;
999 }
1000
1001 return pa_sink_input_process_msg(obj, code, data, offset, chunk);
1002 }
1003 /* Called from main thread.
1004 * Set sink input latency to one third of the overall latency if possible.
1005 * The choice of one third is rather arbitrary somewhere between the minimum
1006 * possible latency which would cause a lot of CPU load and half the configured
1007 * latency which would quickly lead to underruns. */
set_sink_input_latency(struct userdata * u,pa_sink * sink)1008 static void set_sink_input_latency(struct userdata *u, pa_sink *sink) {
1009 pa_usec_t latency, requested_latency;
1010
1011 requested_latency = u->latency / 3;
1012
1013 /* Normally we try to configure sink and source latency equally. If the
1014 * source latency cannot match the requested sink latency try to set the
1015 * sink latency to a smaller value to avoid underruns */
1016 if (u->min_source_latency > requested_latency) {
1017 latency = PA_MAX(u->latency, u->minimum_latency);
1018 requested_latency = (latency - u->min_source_latency) / 2;
1019 }
1020
1021 latency = PA_CLAMP(requested_latency , u->min_sink_latency, u->max_sink_latency);
1022 u->configured_sink_latency = pa_sink_input_set_requested_latency(u->sink_input, latency);
1023 if (u->configured_sink_latency != requested_latency)
1024 pa_log_warn("Cannot set requested sink latency of %0.2f ms, adjusting to %0.2f ms", (double)requested_latency / PA_USEC_PER_MSEC, (double)u->configured_sink_latency / PA_USEC_PER_MSEC);
1025 }
1026
1027 /* Called from output thread context */
sink_input_attach_cb(pa_sink_input * i)1028 static void sink_input_attach_cb(pa_sink_input *i) {
1029 struct userdata *u;
1030
1031 pa_sink_input_assert_ref(i);
1032 pa_sink_input_assert_io_context(i);
1033 pa_assert_se(u = i->userdata);
1034
1035 u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
1036 i->sink->thread_info.rtpoll,
1037 PA_RTPOLL_LATE,
1038 u->asyncmsgq);
1039
1040 pa_memblockq_set_prebuf(u->memblockq, pa_sink_input_get_max_request(i)*2);
1041 pa_memblockq_set_maxrewind(u->memblockq, pa_sink_input_get_max_rewind(i));
1042 }
1043
1044 /* Called from output thread context */
sink_input_detach_cb(pa_sink_input * i)1045 static void sink_input_detach_cb(pa_sink_input *i) {
1046 struct userdata *u;
1047
1048 pa_sink_input_assert_ref(i);
1049 pa_sink_input_assert_io_context(i);
1050 pa_assert_se(u = i->userdata);
1051
1052 if (u->rtpoll_item_read) {
1053 pa_rtpoll_item_free(u->rtpoll_item_read);
1054 u->rtpoll_item_read = NULL;
1055 }
1056 }
1057
1058 /* Called from output thread context */
sink_input_update_max_rewind_cb(pa_sink_input * i,size_t nbytes)1059 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1060 struct userdata *u;
1061
1062 pa_sink_input_assert_ref(i);
1063 pa_sink_input_assert_io_context(i);
1064 pa_assert_se(u = i->userdata);
1065
1066 pa_memblockq_set_maxrewind(u->memblockq, nbytes);
1067 }
1068
1069 /* Called from output thread context */
sink_input_update_max_request_cb(pa_sink_input * i,size_t nbytes)1070 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1071 struct userdata *u;
1072
1073 pa_sink_input_assert_ref(i);
1074 pa_sink_input_assert_io_context(i);
1075 pa_assert_se(u = i->userdata);
1076
1077 pa_memblockq_set_prebuf(u->memblockq, nbytes*2);
1078 pa_log_info("Max request changed");
1079 }
1080
1081 /* Called from main thread */
sink_input_kill_cb(pa_sink_input * i)1082 static void sink_input_kill_cb(pa_sink_input *i) {
1083 struct userdata *u;
1084
1085 pa_sink_input_assert_ref(i);
1086 pa_assert_ctl_context();
1087 pa_assert_se(u = i->userdata);
1088
1089 teardown(u);
1090 pa_module_unload_request(u->module, true);
1091 }
1092
1093 /* Called from the output thread context */
sink_input_state_change_cb(pa_sink_input * i,pa_sink_input_state_t state)1094 static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
1095 struct userdata *u;
1096
1097 pa_sink_input_assert_ref(i);
1098 pa_assert_se(u = i->userdata);
1099
1100 if (state == PA_SINK_INPUT_UNLINKED)
1101 pa_asyncmsgq_flush(u->asyncmsgq, false);
1102 }
1103
1104 /* Called from main thread */
sink_input_moving_cb(pa_sink_input * i,pa_sink * dest)1105 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1106 struct userdata *u;
1107 char *output_description;
1108 const char *n;
1109
1110 if (!dest)
1111 return;
1112
1113 pa_sink_input_assert_ref(i);
1114 pa_assert_ctl_context();
1115 pa_assert_se(u = i->userdata);
1116
1117 output_description = pa_sprintf_malloc("Loopback to %s",
1118 pa_strnull(pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1119 pa_source_output_set_property(u->source_output, PA_PROP_MEDIA_NAME, output_description);
1120 pa_xfree(output_description);
1121
1122 if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME)))
1123 pa_source_output_set_property(u->source_output, PA_PROP_MEDIA_ICON_NAME, n);
1124
1125 /* Set latency and calculate latency limits */
1126 u->underrun_latency_limit = 0;
1127 update_latency_boundaries(u, NULL, dest);
1128 set_sink_input_latency(u, dest);
1129 update_effective_source_latency(u, u->source_output->source, dest);
1130
1131 /* Uncork the source output unless the destination is suspended for other
1132 * reasons than idle */
1133 if (dest->state == PA_SINK_SUSPENDED)
1134 pa_source_output_cork(u->source_output, (dest->suspend_cause != PA_SUSPEND_IDLE));
1135 else
1136 pa_source_output_cork(u->source_output, false);
1137
1138 update_adjust_timer(u);
1139
1140 /* Reset counters */
1141 u->iteration_counter = 0;
1142 u->underrun_counter = 0;
1143
1144 u->source_sink_changed = true;
1145
1146 u->output_thread_info.pop_called = false;
1147 u->output_thread_info.first_pop_done = false;
1148
1149 /* Sample rate may be far away from the default rate if we are still
1150 * recovering from a previous source or sink change, so reset rate to
1151 * default before moving the sink. */
1152 pa_sink_input_set_rate(u->sink_input, u->source_output->sample_spec.rate);
1153 }
1154
1155 /* Called from main thread */
sink_input_may_move_to_cb(pa_sink_input * i,pa_sink * dest)1156 static bool sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1157 struct userdata *u;
1158
1159 pa_sink_input_assert_ref(i);
1160 pa_assert_ctl_context();
1161 pa_assert_se(u = i->userdata);
1162
1163 if (!u->source_output || !u->source_output->source)
1164 return true;
1165
1166 return dest != u->source_output->source->monitor_of;
1167 }
1168
1169 /* Called from main thread */
sink_input_suspend_cb(pa_sink_input * i,pa_sink_state_t old_state,pa_suspend_cause_t old_suspend_cause)1170 static void sink_input_suspend_cb(pa_sink_input *i, pa_sink_state_t old_state, pa_suspend_cause_t old_suspend_cause) {
1171 struct userdata *u;
1172 bool suspended;
1173
1174 pa_sink_input_assert_ref(i);
1175 pa_assert_ctl_context();
1176 pa_assert_se(u = i->userdata);
1177
1178 /* State has not changed, nothing to do */
1179 if (old_state == i->sink->state)
1180 return;
1181
1182 suspended = (i->sink->state == PA_SINK_SUSPENDED);
1183
1184 /* If the sink has been suspended, we need to handle this like
1185 * a sink change when the sink is resumed. Because the sink
1186 * is suspended, we can set the variables directly. */
1187 if (suspended) {
1188 u->output_thread_info.pop_called = false;
1189 u->output_thread_info.first_pop_done = false;
1190
1191 } else
1192 /* Set effective source latency on unsuspend */
1193 update_effective_source_latency(u, u->source_output->source, u->sink_input->sink);
1194
1195 pa_source_output_cork(u->source_output, suspended);
1196
1197 update_adjust_timer(u);
1198 }
1199
1200 /* Called from output thread context */
update_sink_latency_range_cb(pa_sink_input * i)1201 static void update_sink_latency_range_cb(pa_sink_input *i) {
1202 struct userdata *u;
1203
1204 pa_sink_input_assert_ref(i);
1205 pa_sink_input_assert_io_context(i);
1206 pa_assert_se(u = i->userdata);
1207
1208 /* Sink latency may have changed */
1209 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED, NULL, 0, NULL, NULL);
1210 }
1211
1212 /* Called from main context */
loopback_process_msg_cb(pa_msgobject * o,int code,void * userdata,int64_t offset,pa_memchunk * chunk)1213 static int loopback_process_msg_cb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1214 struct loopback_msg *msg;
1215 struct userdata *u;
1216 pa_usec_t current_latency;
1217
1218 pa_assert(o);
1219 pa_assert_ctl_context();
1220
1221 msg = LOOPBACK_MSG(o);
1222 pa_assert_se(u = msg->userdata);
1223
1224 switch (code) {
1225
1226 case LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED:
1227
1228 update_effective_source_latency(u, u->source_output->source, u->sink_input->sink);
1229 current_latency = pa_source_get_requested_latency(u->source_output->source);
1230 if (current_latency > u->configured_source_latency) {
1231 /* The minimum latency has changed to a value larger than the configured latency, so
1232 * the source latency has been increased. The case that the minimum latency changes
1233 * back to a smaller value is not handled because this never happens with the current
1234 * source implementations. */
1235 pa_log_warn("Source minimum latency increased to %0.2f ms", (double)current_latency / PA_USEC_PER_MSEC);
1236 u->configured_source_latency = current_latency;
1237 update_latency_boundaries(u, u->source_output->source, u->sink_input->sink);
1238 /* We re-start counting when the latency has changed */
1239 u->iteration_counter = 0;
1240 u->underrun_counter = 0;
1241 }
1242
1243 return 0;
1244
1245 case LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED:
1246
1247 current_latency = pa_sink_get_requested_latency(u->sink_input->sink);
1248 if (current_latency > u->configured_sink_latency) {
1249 /* The minimum latency has changed to a value larger than the configured latency, so
1250 * the sink latency has been increased. The case that the minimum latency changes back
1251 * to a smaller value is not handled because this never happens with the current sink
1252 * implementations. */
1253 pa_log_warn("Sink minimum latency increased to %0.2f ms", (double)current_latency / PA_USEC_PER_MSEC);
1254 u->configured_sink_latency = current_latency;
1255 update_latency_boundaries(u, u->source_output->source, u->sink_input->sink);
1256 /* We re-start counting when the latency has changed */
1257 u->iteration_counter = 0;
1258 u->underrun_counter = 0;
1259 }
1260
1261 return 0;
1262
1263 case LOOPBACK_MESSAGE_UNDERRUN:
1264
1265 u->underrun_counter++;
1266 pa_log_debug("Underrun detected, counter incremented to %u", u->underrun_counter);
1267
1268 return 0;
1269
1270 }
1271
1272 return 0;
1273 }
1274
sink_port_latency_offset_changed_cb(pa_core * core,pa_sink * sink,struct userdata * u)1275 static pa_hook_result_t sink_port_latency_offset_changed_cb(pa_core *core, pa_sink *sink, struct userdata *u) {
1276
1277 if (sink != u->sink_input->sink)
1278 return PA_HOOK_OK;
1279
1280 u->sink_latency_offset = sink->port_latency_offset;
1281 update_minimum_latency(u, sink, true);
1282
1283 return PA_HOOK_OK;
1284 }
1285
source_port_latency_offset_changed_cb(pa_core * core,pa_source * source,struct userdata * u)1286 static pa_hook_result_t source_port_latency_offset_changed_cb(pa_core *core, pa_source *source, struct userdata *u) {
1287
1288 if (source != u->source_output->source)
1289 return PA_HOOK_OK;
1290
1291 u->source_latency_offset = source->port_latency_offset;
1292 update_minimum_latency(u, u->sink_input->sink, true);
1293
1294 return PA_HOOK_OK;
1295 }
1296
pa__init(pa_module * m)1297 int pa__init(pa_module *m) {
1298 pa_modargs *ma = NULL;
1299 struct userdata *u;
1300 pa_sink *sink = NULL;
1301 pa_sink_input_new_data sink_input_data;
1302 bool sink_dont_move;
1303 pa_source *source = NULL;
1304 pa_source_output_new_data source_output_data;
1305 bool source_dont_move;
1306 uint32_t latency_msec;
1307 uint32_t max_latency_msec;
1308 uint32_t fast_adjust_threshold;
1309 pa_sample_spec ss;
1310 pa_channel_map map;
1311 bool format_set = false;
1312 bool rate_set = false;
1313 bool channels_set = false;
1314 pa_memchunk silence;
1315 uint32_t adjust_time_sec;
1316 const char *n;
1317 bool remix = true;
1318
1319 pa_assert(m);
1320
1321 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1322 pa_log("Failed to parse module arguments");
1323 goto fail;
1324 }
1325
1326 n = pa_modargs_get_value(ma, "source", NULL);
1327 if (n && !(source = pa_namereg_get(m->core, n, PA_NAMEREG_SOURCE))) {
1328 pa_log("No such source.");
1329 goto fail;
1330 }
1331
1332 n = pa_modargs_get_value(ma, "sink", NULL);
1333 if (n && !(sink = pa_namereg_get(m->core, n, PA_NAMEREG_SINK))) {
1334 pa_log("No such sink.");
1335 goto fail;
1336 }
1337
1338 if (pa_modargs_get_value_boolean(ma, "remix", &remix) < 0) {
1339 pa_log("Invalid boolean remix parameter");
1340 goto fail;
1341 }
1342
1343 if (source) {
1344 ss = source->sample_spec;
1345 map = source->channel_map;
1346 format_set = true;
1347 rate_set = true;
1348 channels_set = true;
1349 } else if (sink) {
1350 ss = sink->sample_spec;
1351 map = sink->channel_map;
1352 format_set = true;
1353 rate_set = true;
1354 channels_set = true;
1355 } else {
1356 /* FIXME: Dummy stream format, needed because pa_sink_input_new()
1357 * requires valid sample spec and channel map even when all the FIX_*
1358 * stream flags are specified. pa_sink_input_new() should be changed
1359 * to ignore the sample spec and channel map when the FIX_* flags are
1360 * present. */
1361 ss.format = PA_SAMPLE_U8;
1362 ss.rate = 8000;
1363 ss.channels = 1;
1364 map.channels = 1;
1365 map.map[0] = PA_CHANNEL_POSITION_MONO;
1366 }
1367
1368 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1369 pa_log("Invalid sample format specification or channel map");
1370 goto fail;
1371 }
1372
1373 if (ss.rate < 4000 || ss.rate > PA_RATE_MAX) {
1374 pa_log("Invalid rate specification, valid range is 4000 Hz to %i Hz", PA_RATE_MAX);
1375 goto fail;
1376 }
1377
1378 if (pa_modargs_get_value(ma, "format", NULL))
1379 format_set = true;
1380
1381 if (pa_modargs_get_value(ma, "rate", NULL))
1382 rate_set = true;
1383
1384 if (pa_modargs_get_value(ma, "channels", NULL) || pa_modargs_get_value(ma, "channel_map", NULL))
1385 channels_set = true;
1386
1387 latency_msec = DEFAULT_LATENCY_MSEC;
1388 if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 30000) {
1389 pa_log("Invalid latency specification");
1390 goto fail;
1391 }
1392
1393 fast_adjust_threshold = 0;
1394 if (pa_modargs_get_value_u32(ma, "fast_adjust_threshold_msec", &fast_adjust_threshold) < 0 || (fast_adjust_threshold != 0 && fast_adjust_threshold < 100)) {
1395 pa_log("Invalid fast adjust threshold specification");
1396 goto fail;
1397 }
1398
1399 max_latency_msec = 0;
1400 if (pa_modargs_get_value_u32(ma, "max_latency_msec", &max_latency_msec) < 0) {
1401 pa_log("Invalid maximum latency specification");
1402 goto fail;
1403 }
1404
1405 if (max_latency_msec > 0 && max_latency_msec < latency_msec) {
1406 pa_log_warn("Configured maximum latency is smaller than latency, using latency instead");
1407 max_latency_msec = latency_msec;
1408 }
1409
1410 m->userdata = u = pa_xnew0(struct userdata, 1);
1411 u->core = m->core;
1412 u->module = m;
1413 u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;
1414 u->max_latency = (pa_usec_t) max_latency_msec * PA_USEC_PER_MSEC;
1415 u->output_thread_info.pop_called = false;
1416 u->output_thread_info.pop_adjust = false;
1417 u->output_thread_info.push_called = false;
1418 u->iteration_counter = 0;
1419 u->underrun_counter = 0;
1420 u->underrun_latency_limit = 0;
1421 u->source_sink_changed = true;
1422 u->real_adjust_time_sum = 0;
1423 u->adjust_counter = 0;
1424 u->fast_adjust_threshold = fast_adjust_threshold * PA_USEC_PER_MSEC;
1425
1426 adjust_time_sec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1427 if (pa_modargs_get_value_u32(ma, "adjust_time", &adjust_time_sec) < 0) {
1428 pa_log("Failed to parse adjust_time value");
1429 goto fail;
1430 }
1431
1432 if (adjust_time_sec != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1433 u->adjust_time = adjust_time_sec * PA_USEC_PER_SEC;
1434 else
1435 u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1436
1437 u->real_adjust_time = u->adjust_time;
1438
1439 pa_source_output_new_data_init(&source_output_data);
1440 source_output_data.driver = __FILE__;
1441 source_output_data.module = m;
1442 if (source)
1443 pa_source_output_new_data_set_source(&source_output_data, source, false, true);
1444
1445 if (pa_modargs_get_proplist(ma, "source_output_properties", source_output_data.proplist, PA_UPDATE_REPLACE) < 0) {
1446 pa_log("Failed to parse the source_output_properties value.");
1447 pa_source_output_new_data_done(&source_output_data);
1448 goto fail;
1449 }
1450
1451 if (!pa_proplist_contains(source_output_data.proplist, PA_PROP_MEDIA_ROLE))
1452 pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "abstract");
1453
1454 pa_source_output_new_data_set_sample_spec(&source_output_data, &ss);
1455 pa_source_output_new_data_set_channel_map(&source_output_data, &map);
1456 source_output_data.flags = PA_SOURCE_OUTPUT_START_CORKED;
1457
1458 if (!remix)
1459 source_output_data.flags |= PA_SOURCE_OUTPUT_NO_REMIX;
1460
1461 if (!format_set)
1462 source_output_data.flags |= PA_SOURCE_OUTPUT_FIX_FORMAT;
1463
1464 if (!rate_set)
1465 source_output_data.flags |= PA_SOURCE_OUTPUT_FIX_RATE;
1466
1467 if (!channels_set)
1468 source_output_data.flags |= PA_SOURCE_OUTPUT_FIX_CHANNELS;
1469
1470 source_dont_move = false;
1471 if (pa_modargs_get_value_boolean(ma, "source_dont_move", &source_dont_move) < 0) {
1472 pa_log("source_dont_move= expects a boolean argument.");
1473 goto fail;
1474 }
1475
1476 if (source_dont_move)
1477 source_output_data.flags |= PA_SOURCE_OUTPUT_DONT_MOVE;
1478
1479 pa_source_output_new(&u->source_output, m->core, &source_output_data);
1480 pa_source_output_new_data_done(&source_output_data);
1481
1482 if (!u->source_output)
1483 goto fail;
1484
1485 u->source_output->parent.process_msg = source_output_process_msg_cb;
1486 u->source_output->push = source_output_push_cb;
1487 u->source_output->process_rewind = source_output_process_rewind_cb;
1488 u->source_output->kill = source_output_kill_cb;
1489 u->source_output->attach = source_output_attach_cb;
1490 u->source_output->detach = source_output_detach_cb;
1491 u->source_output->may_move_to = source_output_may_move_to_cb;
1492 u->source_output->moving = source_output_moving_cb;
1493 u->source_output->suspend = source_output_suspend_cb;
1494 u->source_output->update_source_latency_range = update_source_latency_range_cb;
1495 u->source_output->update_source_fixed_latency = update_source_latency_range_cb;
1496 u->source_output->userdata = u;
1497
1498 /* If format, rate or channels were originally unset, they are set now
1499 * after the pa_source_output_new() call. */
1500 ss = u->source_output->sample_spec;
1501 map = u->source_output->channel_map;
1502
1503 pa_sink_input_new_data_init(&sink_input_data);
1504 sink_input_data.driver = __FILE__;
1505 sink_input_data.module = m;
1506
1507 if (sink)
1508 pa_sink_input_new_data_set_sink(&sink_input_data, sink, false, true);
1509
1510 if (pa_modargs_get_proplist(ma, "sink_input_properties", sink_input_data.proplist, PA_UPDATE_REPLACE) < 0) {
1511 pa_log("Failed to parse the sink_input_properties value.");
1512 pa_sink_input_new_data_done(&sink_input_data);
1513 goto fail;
1514 }
1515
1516 if (!pa_proplist_contains(sink_input_data.proplist, PA_PROP_MEDIA_ROLE))
1517 pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "abstract");
1518
1519 pa_sink_input_new_data_set_sample_spec(&sink_input_data, &ss);
1520 pa_sink_input_new_data_set_channel_map(&sink_input_data, &map);
1521 sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE | PA_SINK_INPUT_START_CORKED;
1522
1523 if (!remix)
1524 sink_input_data.flags |= PA_SINK_INPUT_NO_REMIX;
1525
1526 sink_dont_move = false;
1527 if (pa_modargs_get_value_boolean(ma, "sink_dont_move", &sink_dont_move) < 0) {
1528 pa_log("sink_dont_move= expects a boolean argument.");
1529 goto fail;
1530 }
1531
1532 if (sink_dont_move)
1533 sink_input_data.flags |= PA_SINK_INPUT_DONT_MOVE;
1534
1535 pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
1536 pa_sink_input_new_data_done(&sink_input_data);
1537
1538 if (!u->sink_input)
1539 goto fail;
1540
1541 u->sink_input->parent.process_msg = sink_input_process_msg_cb;
1542 u->sink_input->pop = sink_input_pop_cb;
1543 u->sink_input->process_rewind = sink_input_process_rewind_cb;
1544 u->sink_input->kill = sink_input_kill_cb;
1545 u->sink_input->state_change = sink_input_state_change_cb;
1546 u->sink_input->attach = sink_input_attach_cb;
1547 u->sink_input->detach = sink_input_detach_cb;
1548 u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1549 u->sink_input->update_max_request = sink_input_update_max_request_cb;
1550 u->sink_input->may_move_to = sink_input_may_move_to_cb;
1551 u->sink_input->moving = sink_input_moving_cb;
1552 u->sink_input->suspend = sink_input_suspend_cb;
1553 u->sink_input->update_sink_latency_range = update_sink_latency_range_cb;
1554 u->sink_input->update_sink_fixed_latency = update_sink_latency_range_cb;
1555 u->sink_input->userdata = u;
1556
1557 update_latency_boundaries(u, u->source_output->source, u->sink_input->sink);
1558 set_sink_input_latency(u, u->sink_input->sink);
1559 set_source_output_latency(u, u->source_output->source);
1560
1561 pa_sink_input_get_silence(u->sink_input, &silence);
1562 u->memblockq = pa_memblockq_new(
1563 "module-loopback memblockq",
1564 0, /* idx */
1565 MEMBLOCKQ_MAXLENGTH, /* maxlength */
1566 MEMBLOCKQ_MAXLENGTH, /* tlength */
1567 &ss, /* sample_spec */
1568 0, /* prebuf */
1569 0, /* minreq */
1570 0, /* maxrewind */
1571 &silence); /* silence frame */
1572 pa_memblock_unref(silence.memblock);
1573 /* Fill the memblockq with silence */
1574 pa_memblockq_seek(u->memblockq, pa_usec_to_bytes(u->latency, &u->sink_input->sample_spec), PA_SEEK_RELATIVE, true);
1575
1576 u->asyncmsgq = pa_asyncmsgq_new(0);
1577 if (!u->asyncmsgq) {
1578 pa_log("pa_asyncmsgq_new() failed.");
1579 goto fail;
1580 }
1581
1582 if (!pa_proplist_contains(u->source_output->proplist, PA_PROP_MEDIA_NAME))
1583 pa_proplist_setf(u->source_output->proplist, PA_PROP_MEDIA_NAME, "Loopback to %s",
1584 pa_strnull(pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1585
1586 if (!pa_proplist_contains(u->source_output->proplist, PA_PROP_MEDIA_ICON_NAME)
1587 && (n = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_ICON_NAME)))
1588 pa_proplist_sets(u->source_output->proplist, PA_PROP_MEDIA_ICON_NAME, n);
1589
1590 if (!pa_proplist_contains(u->sink_input->proplist, PA_PROP_MEDIA_NAME))
1591 pa_proplist_setf(u->sink_input->proplist, PA_PROP_MEDIA_NAME, "Loopback from %s",
1592 pa_strnull(pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1593
1594 if (source && !pa_proplist_contains(u->sink_input->proplist, PA_PROP_MEDIA_ICON_NAME)
1595 && (n = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_ICON_NAME)))
1596 pa_proplist_sets(u->sink_input->proplist, PA_PROP_MEDIA_ICON_NAME, n);
1597
1598 /* Hooks to track changes of latency offsets */
1599 pa_module_hook_connect(m, &m->core->hooks[PA_CORE_HOOK_SINK_PORT_LATENCY_OFFSET_CHANGED],
1600 PA_HOOK_NORMAL, (pa_hook_cb_t) sink_port_latency_offset_changed_cb, u);
1601 pa_module_hook_connect(m, &m->core->hooks[PA_CORE_HOOK_SOURCE_PORT_LATENCY_OFFSET_CHANGED],
1602 PA_HOOK_NORMAL, (pa_hook_cb_t) source_port_latency_offset_changed_cb, u);
1603
1604 /* Setup message handler for main thread */
1605 u->msg = pa_msgobject_new(loopback_msg);
1606 u->msg->parent.process_msg = loopback_process_msg_cb;
1607 u->msg->userdata = u;
1608
1609 /* The output thread is not yet running, set effective_source_latency directly */
1610 update_effective_source_latency(u, u->source_output->source, NULL);
1611
1612 pa_sink_input_put(u->sink_input);
1613 pa_source_output_put(u->source_output);
1614
1615 if (u->source_output->source->state != PA_SOURCE_SUSPENDED)
1616 pa_sink_input_cork(u->sink_input, false);
1617
1618 if (u->sink_input->sink->state != PA_SINK_SUSPENDED)
1619 pa_source_output_cork(u->source_output, false);
1620
1621 update_adjust_timer(u);
1622
1623 pa_modargs_free(ma);
1624 return 0;
1625
1626 fail:
1627 if (ma)
1628 pa_modargs_free(ma);
1629
1630 pa__done(m);
1631
1632 return -1;
1633 }
1634
pa__done(pa_module * m)1635 void pa__done(pa_module*m) {
1636 struct userdata *u;
1637
1638 pa_assert(m);
1639
1640 if (!(u = m->userdata))
1641 return;
1642
1643 teardown(u);
1644
1645 if (u->memblockq)
1646 pa_memblockq_free(u->memblockq);
1647
1648 if (u->asyncmsgq)
1649 pa_asyncmsgq_unref(u->asyncmsgq);
1650
1651 pa_xfree(u);
1652 }
1653