1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 // GCC does not warn for unused *static inline* functions, but clang does.
17 #ifdef __clang__
18 #pragma clang diagnostic push
19 #pragma clang diagnostic ignored "-Wunused-function"
20 #endif
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <stdio.h>
27
28 #include <pulse/xmalloc.h>
29
30 #include <pulsecore/sink-input.h>
31 #include <pulsecore/module.h>
32 #include <pulsecore/modargs.h>
33 #include <pulsecore/namereg.h>
34 #include <pulsecore/log.h>
35 #include <pulsecore/core-util.h>
36
37 #include <pulse/rtclock.h>
38 #include <pulse/timeval.h>
39
40 #include "audio_log.h"
41
42 PA_MODULE_AUTHOR("OpenHarmony");
43 PA_MODULE_DESCRIPTION(_("Loopback from source to sink"));
44 PA_MODULE_VERSION(PACKAGE_VERSION);
45 PA_MODULE_LOAD_ONCE(false);
46 PA_MODULE_USAGE(
47 "source=<source to connect to> "
48 "sink=<sink to connect to> ");
49
50 #define DEFAULT_LATENCY_MSEC 200
51 #define MAX_LATENCY_MSEC 30000
52 #define MEMBLOCKQ_MAXLENGTH (1024 * 1024 * 32)
53 #define MIN_DEVICE_LATENCY (2.5 * PA_USEC_PER_MSEC)
54 #define DEFAULT_ADJUST_TIME_USEC (10 * PA_USEC_PER_SEC)
55 #define MAX_FAST_ADJUST_THRESHOLD 100
56 #define MIN_SAMPLE_RATE 4000
57 #define DEFAULT_SAMPLE_RATE 8000
58 #define MULTIPLE_FACTOR 2
59 #define TRIPLE_FACTOR 3
60 #define ADJUST_TIME_FOR_CHANGE (333 * PA_USEC_PER_MSEC)
61 #define TIME_HOUR_TO_SECOND_UNIT 3600
62 #define MIN_VISIBLE_UNDERRUN_COUNTER 2
63 #define SLEEP_ADJUST_TIME_MULTIPLE_TIMES 1.05
64 #define LATENCY_OFFSET_ADJUST_TIMES 0.01
65 #define LATENCY_INCREMENT_FIVE_MSEC (5 * PA_USEC_PER_MSEC)
66
67 typedef struct loopback_msg loopback_msg;
68
69 struct userdata {
70 pa_core *core;
71 pa_module *module;
72
73 loopback_msg *msg;
74
75 pa_sink_input *sink_input;
76 pa_source_output *source_output;
77
78 pa_asyncmsgq *asyncmsgq;
79 pa_memblockq *memblockq;
80
81 pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;
82
83 pa_time_event *time_event;
84
85 /* Variables used to calculate the average time between
86 * subsequent calls of AdjustRates() */
87 pa_usec_t adjust_time_stamp;
88 pa_usec_t real_adjust_time;
89 pa_usec_t real_adjust_time_sum;
90
91 /* Values from command line configuration */
92 pa_usec_t latency;
93 pa_usec_t max_latency;
94 pa_usec_t adjust_time;
95 pa_usec_t fast_adjust_threshold;
96
97 /* Latency boundaries and current values */
98 pa_usec_t min_source_latency;
99 pa_usec_t max_source_latency;
100 pa_usec_t min_sink_latency;
101 pa_usec_t max_sink_latency;
102 pa_usec_t configured_sink_latency;
103 pa_usec_t configured_source_latency;
104 int64_t source_latency_offset;
105 int64_t sink_latency_offset;
106 pa_usec_t minimum_latency;
107
108 /* lower latency limit found by underruns */
109 pa_usec_t underrun_latency_limit;
110
111 /* Various counters */
112 uint32_t iteration_counter;
113 uint32_t underrun_counter;
114 uint32_t adjust_counter;
115
116 bool fixed_alsa_source;
117 bool source_sink_changed;
118
119 pa_sample_spec ss;
120 pa_channel_map map;
121
122 /* Used for sink input and source output snapshots */
123 struct {
124 int64_t send_counter;
125 int64_t source_latency;
126 pa_usec_t source_timestamp;
127
128 int64_t recv_counter;
129 size_t loopback_memblockq_length;
130 int64_t sink_latency;
131 pa_usec_t sink_timestamp;
132 } latency_snapshot;
133
134 /* Input thread variable */
135 int64_t send_counter;
136
137 /* Output thread variables */
138 struct {
139 int64_t recv_counter;
140 pa_usec_t effective_source_latency;
141
142 /* Copied from main thread */
143 pa_usec_t minimum_latency;
144
145 /* Various booleans */
146 bool in_pop;
147 bool pop_called;
148 bool pop_adjust;
149 bool first_pop_done;
150 bool push_called;
151 } output_thread_info;
152 };
153
154 struct loopback_msg {
155 pa_msgobject parent;
156 struct userdata *userdata;
157 };
158
159 PA_DEFINE_PRIVATE_CLASS(loopback_msg, pa_msgobject);
160 #define LOOPBACK_MSG(o) (loopback_msg_cast(o))
161
162 static const char * const VALID_MODARGS[] = {
163 "source",
164 "sink",
165 NULL,
166 };
167
168 enum {
169 SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
170 SINK_INPUT_MESSAGE_REWIND,
171 SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT,
172 SINK_INPUT_MESSAGE_SOURCE_CHANGED,
173 SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY,
174 SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY,
175 SINK_INPUT_MESSAGE_FAST_ADJUST,
176 };
177
178 enum {
179 SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT = PA_SOURCE_OUTPUT_MESSAGE_MAX,
180 };
181
182 enum {
183 LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED,
184 LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED,
185 LOOPBACK_MESSAGE_UNDERRUN,
186 };
187
188 static void EnableAdjustTimer(struct userdata *u, bool enable);
189
190 /* Called from main context */
TearDown(struct userdata * u)191 static void TearDown(struct userdata *u)
192 {
193 pa_assert(u);
194 pa_assert_ctl_context();
195
196 u->adjust_time = 0;
197 EnableAdjustTimer(u, false);
198
199 /* Handling the asyncmsgq between the source output and the sink input
200 * requires some care. When the source output is unlinked, nothing needs
201 * to be done for the asyncmsgq, because the source output is the sending
202 * end. But when the sink input is unlinked, we should ensure that the
203 * asyncmsgq is emptied, because the messages in the queue hold references
204 * to the sink input. Also, we need to ensure that new messages won't be
205 * written to the queue after we have emptied it.
206 *
207 * Emptying the queue can be done in the state_change() callback of the
208 * sink input, when the new state is "unlinked".
209 *
210 * Preventing new messages from being written to the queue can be achieved
211 * by unlinking the source output before unlinking the sink input. There
212 * are no other writers for that queue, so this is sufficient. */
213
214 if (u->source_output) {
215 pa_source_output_unlink(u->source_output);
216 pa_source_output_unref(u->source_output);
217 u->source_output = NULL;
218 }
219
220 if (u->sink_input) {
221 pa_sink_input_unlink(u->sink_input);
222 pa_sink_input_unref(u->sink_input);
223 u->sink_input = NULL;
224 }
225 }
226
227 /* rate controller, called from main context
228 * - maximum deviation from base rate is less than 1%
229 * - can create audible artifacts by changing the rate too quickly
230 * - exhibits hunting with USB or Bluetooth sources
231 */
RateController(uint32_t baseRate,pa_usec_t adjustTime,int32_t latencyDifferenceUsec)232 static uint32_t RateController(uint32_t baseRate, pa_usec_t adjustTime, int32_t latencyDifferenceUsec)
233 {
234 uint32_t newRate;
235 double minCycles;
236
237 /* Calculate best rate to correct the current latency offset, limit at
238 * slightly below 1% difference from base_rate */
239 minCycles = (double)abs(latencyDifferenceUsec) / adjustTime / LATENCY_OFFSET_ADJUST_TIMES + 1;
240 newRate = baseRate * (1.0 + (double)latencyDifferenceUsec / minCycles / adjustTime);
241
242 return newRate;
243 }
244
245 /* Called from main thread.
246 * It has been a matter of discussion how to correctly calculate the minimum
247 * latency that module-loopback can deliver with a given source and sink.
248 * The calculation has been placed in a separate function so that the definition
249 * can easily be changed. The resulting estimate is not very exact because it
250 * depends on the reported latency ranges. In cases were the lower bounds of
251 * source and sink latency are not reported correctly (USB) the result will
252 * be wrong. */
UpdateMinimumLatency(struct userdata * u,pa_sink * sink,bool printMsg)253 static void UpdateMinimumLatency(struct userdata *u, pa_sink *sink, bool printMsg)
254 {
255 if (u->underrun_latency_limit) {
256 /* If we already detected a real latency limit because of underruns, use it */
257 u->minimum_latency = u->underrun_latency_limit;
258 } else {
259 /* Calculate latency limit from latency ranges */
260
261 u->minimum_latency = u->min_sink_latency;
262 if (u->fixed_alsa_source) {
263 /* If we are using an alsa source with fixed latency, we will get a wakeup when
264 * one fragment is filled, and then we empty the source buffer, so the source
265 * latency never grows much beyond one fragment (assuming that the CPU doesn't
266 * cause a bottleneck). */
267 u->minimum_latency += u->core->default_fragment_size_msec * PA_USEC_PER_MSEC;
268 } else {
269 /* In all other cases the source will deliver new data at latest after one source latency.
270 * Make sure there is enough data available that the sink can keep on playing until new
271 * data is pushed. */
272 u->minimum_latency += u->min_source_latency;
273 }
274
275 /* Multiply by 1.1 as a safety margin for delays that are proportional to the buffer sizes */
276 u->minimum_latency *= 1.1;
277
278 /* Add 1.5 ms as a safety margin for delays not related to the buffer sizes */
279 u->minimum_latency += 1.5 * PA_USEC_PER_MSEC;
280 }
281
282 /* Add the latency offsets */
283 if (-(u->sink_latency_offset + u->source_latency_offset) <= (int64_t)u->minimum_latency) {
284 u->minimum_latency += u->sink_latency_offset + u->source_latency_offset;
285 } else {
286 u->minimum_latency = 0;
287 }
288
289 /* If the sink is valid, send a message to update the minimum latency to
290 * the output thread, else set the variable directly */
291 if (sink) {
292 pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY,
293 NULL, u->minimum_latency, NULL);
294 } else {
295 u->output_thread_info.minimum_latency = u->minimum_latency;
296 }
297
298 if (printMsg) {
299 AUDIO_INFO_LOG("Minimum possible end to end latency: %0.2f ms", (double)u->minimum_latency / PA_USEC_PER_MSEC);
300 if (u->latency < u->minimum_latency) {
301 AUDIO_WARNING_LOG("Configured latency of %0.2f ms is smaller than minimum latency, using minimum instead",
302 (double)u->latency / PA_USEC_PER_MSEC);
303 }
304 }
305 }
306
CalculateAdjustTime(struct userdata * u,uint32_t * baseRate,int32_t * latencyDifference)307 static int CalculateAdjustTime(struct userdata *u, uint32_t *baseRate, int32_t *latencyDifference)
308 {
309 size_t buffer;
310 uint32_t oldRate;
311 pa_usec_t currentBufferLatency, snapshotDelay;
312 int64_t currentSourceSinkLatency, currentLatency, latencyAtOptimumRate;
313 pa_usec_t finalLatency, now, timePassed;
314
315 now = pa_rtclock_now();
316 timePassed = now - u->adjust_time_stamp;
317 if (!u->source_sink_changed && timePassed < u->adjust_time * SLEEP_ADJUST_TIME_MULTIPLE_TIMES) {
318 u->adjust_counter++;
319 u->real_adjust_time_sum += timePassed;
320 u->real_adjust_time = u->real_adjust_time_sum / u->adjust_counter;
321 }
322 u->adjust_time_stamp = now;
323
324 /* Rates and latencies */
325 oldRate = u->sink_input->sample_spec.rate;
326 *baseRate = u->source_output->sample_spec.rate;
327
328 buffer = u->latency_snapshot.loopback_memblockq_length;
329 if (u->latency_snapshot.recv_counter <= u->latency_snapshot.send_counter) {
330 buffer += (size_t) (u->latency_snapshot.send_counter - u->latency_snapshot.recv_counter);
331 } else {
332 buffer = PA_CLIP_SUB(buffer, (size_t) (u->latency_snapshot.recv_counter - u->latency_snapshot.send_counter));
333 }
334
335 currentBufferLatency = pa_bytes_to_usec(buffer, &u->sink_input->sample_spec);
336 snapshotDelay = u->latency_snapshot.source_timestamp - u->latency_snapshot.sink_timestamp;
337 currentSourceSinkLatency = u->latency_snapshot.sink_latency +
338 u->latency_snapshot.source_latency - snapshotDelay;
339
340 /* Current latency */
341 currentLatency = currentSourceSinkLatency + currentBufferLatency;
342
343 /* Latency at base rate */
344 latencyAtOptimumRate = currentSourceSinkLatency + currentBufferLatency * oldRate / (*baseRate);
345
346 finalLatency = PA_MAX(u->latency, u->minimum_latency);
347 *latencyDifference = (int32_t)(latencyAtOptimumRate - finalLatency);
348
349 AUDIO_DEBUG_LOG("Loopback overall latency is %0.2f ms + %0.2f ms + %0.2f ms = %0.2f ms",
350 (double) u->latency_snapshot.sink_latency / PA_USEC_PER_MSEC,
351 (double) currentBufferLatency / PA_USEC_PER_MSEC,
352 (double) u->latency_snapshot.source_latency / PA_USEC_PER_MSEC,
353 (double) currentLatency / PA_USEC_PER_MSEC);
354
355 AUDIO_DEBUG_LOG("Loopback latency at base rate is %0.2f ms", (double)latencyAtOptimumRate / PA_USEC_PER_MSEC);
356
357 /* Drop or insert samples if fast_adjust_threshold_msec was specified and the latency difference is too large. */
358 if (u->fast_adjust_threshold > 0 && abs((*latencyDifference)) > u->fast_adjust_threshold) {
359 AUDIO_DEBUG_LOG ("Latency difference larger than %" PRIu64 " msec, skipping or inserting samples.",
360 u->fast_adjust_threshold / PA_USEC_PER_MSEC);
361
362 pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input),
363 SINK_INPUT_MESSAGE_FAST_ADJUST, NULL, currentSourceSinkLatency, NULL);
364
365 /* Skip real adjust time calculation on next iteration. */
366 u->source_sink_changed = true;
367 return -1;
368 }
369 return 0;
370 }
371
372 /* Called from main context */
AdjustRates(struct userdata * u)373 static void AdjustRates(struct userdata *u)
374 {
375 uint32_t baseRate, newRate, runHours;
376 int32_t latencyDifference;
377
378 pa_assert(u);
379 pa_assert_ctl_context();
380
381 /* Runtime and counters since last change of source or sink
382 * or source/sink latency */
383 runHours = u->iteration_counter * u->real_adjust_time / PA_USEC_PER_SEC / TIME_HOUR_TO_SECOND_UNIT;
384 u->iteration_counter += 1;
385
386 /* If we are seeing underruns then the latency is too small */
387 if (u->underrun_counter > MIN_VISIBLE_UNDERRUN_COUNTER) {
388 pa_usec_t target_latency;
389
390 target_latency = PA_MAX(u->latency, u->minimum_latency) + LATENCY_INCREMENT_FIVE_MSEC;
391 if (u->max_latency == 0 || target_latency < u->max_latency) {
392 u->underrun_latency_limit =
393 PA_CLIP_SUB((int64_t)target_latency, u->sink_latency_offset + u->source_latency_offset);
394 AUDIO_WARNING_LOG("Too many underruns, increasing latency to %0.2f ms",
395 (double)target_latency / PA_USEC_PER_MSEC);
396 } else {
397 u->underrun_latency_limit = PA_CLIP_SUB((int64_t)u->max_latency,
398 u->sink_latency_offset + u->source_latency_offset);
399 AUDIO_WARNING_LOG("Too many underruns, configured maximum latency of %0.2f ms is reached",
400 (double)u->max_latency / PA_USEC_PER_MSEC);
401 AUDIO_WARNING_LOG("Consider increasing the max_latency_msec");
402 }
403
404 UpdateMinimumLatency(u, u->sink_input->sink, false);
405 u->underrun_counter = 0;
406 }
407
408 /* Allow one underrun per hour */
409 if (u->iteration_counter * u->real_adjust_time / PA_USEC_PER_SEC / TIME_HOUR_TO_SECOND_UNIT > runHours) {
410 u->underrun_counter = PA_CLIP_SUB(u->underrun_counter, 1u);
411 AUDIO_INFO_LOG("Underrun counter: %u", u->underrun_counter);
412 }
413
414 /* Calculate real adjust time if source or sink did not change and if the system has
415 * not been suspended. If the time between two calls is more than 5% longer than the
416 * configured adjust time, we assume that the system has been sleeping and skip the
417 * calculation for this iteration. */
418 if (CalculateAdjustTime(u, &baseRate, &latencyDifference) != 0) {
419 return;
420 }
421
422 /* Calculate new rate */
423 newRate = RateController(baseRate, u->real_adjust_time, latencyDifference);
424
425 u->source_sink_changed = false;
426
427 /* Set rate */
428 pa_sink_input_set_rate(u->sink_input, newRate);
429 AUDIO_DEBUG_LOG("[%s] Updated sampling rate to %lu Hz.", u->sink_input->sink->name, (unsigned long) newRate);
430 }
431
432 /* Called from main context */
TimeCallback(pa_mainloop_api * a,pa_time_event * e,const struct timeval * t,void * userdata)433 static void TimeCallback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata)
434 {
435 struct userdata *u = userdata;
436
437 pa_assert(u);
438 pa_assert(a);
439 pa_assert(u->time_event == e);
440
441 /* Restart timer right away */
442 pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
443
444 /* Get sink and source latency snapshot */
445 pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq,
446 PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
447 pa_asyncmsgq_send(u->source_output->source->asyncmsgq,
448 PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
449
450 AdjustRates(u);
451 }
452
453 /* Called from main context
454 * When source or sink changes,
455 * give it a third of a second to settle down, then call AdjustRates for the first time */
EnableAdjustTimer(struct userdata * u,bool enable)456 static void EnableAdjustTimer(struct userdata *u, bool enable)
457 {
458 if (enable) {
459 if (!u->adjust_time) {
460 return;
461 }
462 if (u->time_event) {
463 u->core->mainloop->time_free(u->time_event);
464 }
465
466 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + ADJUST_TIME_FOR_CHANGE, TimeCallback, u);
467 } else {
468 if (!u->time_event) {
469 return;
470 }
471
472 u->core->mainloop->time_free(u->time_event);
473 u->time_event = NULL;
474 }
475 }
476
477 /* Called from main context */
UpdateAdjustTimer(struct userdata * u)478 static void UpdateAdjustTimer(struct userdata *u)
479 {
480 if (u->sink_input->state == PA_SINK_INPUT_CORKED || u->source_output->state == PA_SOURCE_OUTPUT_CORKED) {
481 EnableAdjustTimer(u, false);
482 } else {
483 EnableAdjustTimer(u, true);
484 }
485 }
486
UpdateFixedLatency(struct userdata * u,pa_source * source)487 static void UpdateFixedLatency(struct userdata *u, pa_source *source)
488 {
489 if (u == NULL || source == NULL) {
490 return;
491 }
492 u->min_source_latency = pa_source_get_fixed_latency(source);
493 u->max_source_latency = u->min_source_latency;
494 const char *s;
495 if ((s = pa_proplist_gets(source->proplist, PA_PROP_DEVICE_API))) {
496 if (pa_streq(s, "alsa")) {
497 u->fixed_alsa_source = true;
498 }
499 }
500 }
501
502 /* Called from main thread
503 * Calculates minimum and maximum possible latency for source and sink */
UpdateLatencyBoundaries(struct userdata * u,pa_source * source,pa_sink * sink)504 static void UpdateLatencyBoundaries(struct userdata *u, pa_source *source, pa_sink *sink)
505 {
506 if (source) {
507 /* Source latencies */
508 u->fixed_alsa_source = false;
509 if (source->flags & PA_SOURCE_DYNAMIC_LATENCY) {
510 pa_source_get_latency_range(source, &u->min_source_latency, &u->max_source_latency);
511 } else {
512 UpdateFixedLatency(u, source);
513 }
514 /* Source offset */
515 u->source_latency_offset = source->port_latency_offset;
516
517 /* Latencies below 2.5 ms cause problems, limit source latency if possible */
518 if (u->max_source_latency >= MIN_DEVICE_LATENCY) {
519 u->min_source_latency = PA_MAX(u->min_source_latency, MIN_DEVICE_LATENCY);
520 } else {
521 u->min_source_latency = u->max_source_latency;
522 }
523 }
524
525 if (sink) {
526 /* Sink latencies */
527 if (sink->flags & PA_SINK_DYNAMIC_LATENCY) {
528 pa_sink_get_latency_range(sink, &u->min_sink_latency, &u->max_sink_latency);
529 } else {
530 u->min_sink_latency = pa_sink_get_fixed_latency(sink);
531 u->max_sink_latency = u->min_sink_latency;
532 }
533 /* Sink offset */
534 u->sink_latency_offset = sink->port_latency_offset;
535
536 /* Latencies below 2.5 ms cause problems, limit sink latency if possible */
537 if (u->max_sink_latency >= MIN_DEVICE_LATENCY) {
538 u->min_sink_latency = PA_MAX(u->min_sink_latency, MIN_DEVICE_LATENCY);
539 } else {
540 u->min_sink_latency = u->max_sink_latency;
541 }
542 }
543
544 UpdateMinimumLatency(u, sink, true);
545 }
546
547 /* Called from output context
548 * Sets the memblockq to the configured latency corrected by latency_offset_usec */
MemblockqAdjust(struct userdata * u,int64_t latencyOffsetUsec,bool allowPush)549 static void MemblockqAdjust(struct userdata *u, int64_t latencyOffsetUsec, bool allowPush)
550 {
551 size_t currentMemblockqLength, requestedMemblockqLength, bufferCorrection;
552 int64_t requestedBufferLatency;
553 pa_usec_t finalLatency, requestedSinkLatency;
554
555 finalLatency = PA_MAX(u->latency, u->output_thread_info.minimum_latency);
556
557 /* If source or sink have some large negative latency offset, we might want to
558 * hold more than final_latency in the memblockq */
559 requestedBufferLatency = (int64_t)finalLatency - latencyOffsetUsec;
560
561 /* Keep at least one sink latency in the queue to make sure that the sink
562 * never underruns initially */
563 requestedSinkLatency = pa_sink_get_requested_latency_within_thread(u->sink_input->sink);
564 if (requestedBufferLatency < (int64_t)requestedSinkLatency)
565 requestedBufferLatency = requestedSinkLatency;
566
567 requestedMemblockqLength = pa_usec_to_bytes(requestedBufferLatency, &u->sink_input->sample_spec);
568 currentMemblockqLength = pa_memblockq_get_length(u->memblockq);
569 if (currentMemblockqLength > requestedMemblockqLength) {
570 /* Drop audio from queue */
571 bufferCorrection = currentMemblockqLength - requestedMemblockqLength;
572 AUDIO_INFO_LOG("Dropping %" PRIu64 " usec of audio from queue",
573 pa_bytes_to_usec(bufferCorrection, &u->sink_input->sample_spec));
574 pa_memblockq_drop(u->memblockq, bufferCorrection);
575 } else if (currentMemblockqLength < requestedMemblockqLength && allowPush) {
576 /* Add silence to queue */
577 bufferCorrection = requestedMemblockqLength - currentMemblockqLength;
578 AUDIO_INFO_LOG("Adding %" PRIu64 " usec of silence to queue",
579 pa_bytes_to_usec(bufferCorrection, &u->sink_input->sample_spec));
580 pa_memblockq_seek(u->memblockq, (int64_t)bufferCorrection, PA_SEEK_RELATIVE, true);
581 }
582 }
583
584 /* Called from input thread context */
SourceOutputPushCb(pa_source_output * o,const pa_memchunk * chunk)585 static void SourceOutputPushCb(pa_source_output *o, const pa_memchunk *chunk)
586 {
587 struct userdata *u;
588 pa_usec_t pushTime;
589 int64_t currentSourceLatency;
590
591 pa_source_output_assert_ref(o);
592 pa_source_output_assert_io_context(o);
593 pa_assert_se(u = o->userdata);
594
595 /* Send current source latency and timestamp with the message */
596 pushTime = pa_rtclock_now();
597 currentSourceLatency = pa_source_get_latency_within_thread(u->source_output->source, true);
598
599 pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_POST,
600 PA_INT_TO_PTR(currentSourceLatency), pushTime, chunk, NULL);
601 u->send_counter += (int64_t) chunk->length;
602 }
603
604 /* Called from input thread context */
SourceOutputProcessRewindCb(pa_source_output * o,size_t nbytes)605 static void SourceOutputProcessRewindCb(pa_source_output *o, size_t nbytes)
606 {
607 struct userdata *u;
608
609 pa_source_output_assert_ref(o);
610 pa_source_output_assert_io_context(o);
611 pa_assert_se(u = o->userdata);
612
613 pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_REWIND,
614 NULL, (int64_t) nbytes, NULL, NULL);
615 u->send_counter -= (int64_t) nbytes;
616 }
617
618 /* Called from input thread context */
SourceOutputProcessMsgCb(pa_msgobject * obj,int code,void * data,int64_t offset,pa_memchunk * chunk)619 static int SourceOutputProcessMsgCb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk)
620 {
621 struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
622
623 if (code == SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT) {
624 size_t length;
625 length = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
626
627 u->latency_snapshot.send_counter = u->send_counter;
628 /* Add content of delay memblockq to the source latency */
629 u->latency_snapshot.source_latency = pa_source_get_latency_within_thread(u->source_output->source, true) +
630 pa_bytes_to_usec(length, &u->source_output->source->sample_spec);
631 u->latency_snapshot.source_timestamp = pa_rtclock_now();
632
633 return 0;
634 }
635 return pa_source_output_process_msg(obj, code, data, offset, chunk);
636 }
637
638 /* Called from main thread.
639 * Get current effective latency of the source. If the source is in use with
640 * smaller latency than the configured latency, it will continue running with
641 * the smaller value when the source output is switched to the source. */
UpdateEffectiveSourceLatency(struct userdata * u,pa_source * source,pa_sink * sink)642 static void UpdateEffectiveSourceLatency(struct userdata *u, pa_source *source, pa_sink *sink)
643 {
644 pa_usec_t effectiveSourceLatency;
645
646 effectiveSourceLatency = u->configured_source_latency;
647
648 if (source) {
649 effectiveSourceLatency = pa_source_get_requested_latency(source);
650 if (effectiveSourceLatency == 0 || effectiveSourceLatency > u->configured_source_latency)
651 effectiveSourceLatency = u->configured_source_latency;
652 }
653
654 /* If the sink is valid, send a message to the output thread, else set the variable directly */
655 if (sink) {
656 pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(u->sink_input),
657 SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY, NULL, (int64_t)effectiveSourceLatency, NULL);
658 } else {
659 u->output_thread_info.effective_source_latency = effectiveSourceLatency;
660 }
661 }
662
663 /* Called from main thread.
664 * Set source output latency to one third of the overall latency if possible.
665 * The choice of one third is rather arbitrary somewhere between the minimum
666 * possible latency which would cause a lot of CPU load and half the configured
667 * latency which would quickly lead to underruns */
SetSourceOutputLatency(struct userdata * u,pa_source * source)668 static void SetSourceOutputLatency(struct userdata *u, pa_source *source)
669 {
670 pa_usec_t latency, requestedLatency;
671
672 requestedLatency = u->latency / TRIPLE_FACTOR;
673
674 /* Normally we try to configure sink and source latency equally. If the
675 * sink latency cannot match the requested source latency try to set the
676 * source latency to a smaller value to avoid underruns */
677 if (u->min_sink_latency > requestedLatency) {
678 latency = PA_MAX(u->latency, u->minimum_latency);
679 requestedLatency = (latency - u->min_sink_latency) / MULTIPLE_FACTOR;
680 }
681
682 latency = PA_CLAMP(requestedLatency, u->min_source_latency, u->max_source_latency);
683 u->configured_source_latency = pa_source_output_set_requested_latency(u->source_output, latency);
684 if (u->configured_source_latency != requestedLatency)
685 AUDIO_WARNING_LOG("Cannot set requested source latency of %0.2f ms, adjusting to %0.2f ms",
686 (double)requestedLatency / PA_USEC_PER_MSEC, (double)u->configured_source_latency / PA_USEC_PER_MSEC);
687 }
688
689 /* Called from input thread context */
SourceOutputAttachCb(pa_source_output * o)690 static void SourceOutputAttachCb(pa_source_output *o)
691 {
692 struct userdata *u;
693
694 pa_source_output_assert_ref(o);
695 pa_source_output_assert_io_context(o);
696 pa_assert_se(u = o->userdata);
697
698 u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
699 o->source->thread_info.rtpoll,
700 PA_RTPOLL_LATE,
701 u->asyncmsgq);
702 }
703
704 /* Called from input thread context */
SourceOutputDetachCb(pa_source_output * o)705 static void SourceOutputDetachCb(pa_source_output *o)
706 {
707 struct userdata *u;
708
709 pa_source_output_assert_ref(o);
710 pa_source_output_assert_io_context(o);
711 pa_assert_se(u = o->userdata);
712
713 if (u->rtpoll_item_write) {
714 pa_rtpoll_item_free(u->rtpoll_item_write);
715 u->rtpoll_item_write = NULL;
716 }
717 }
718
719 /* Called from main thread */
SourceOutputKillCb(pa_source_output * o)720 static void SourceOutputKillCb(pa_source_output *o)
721 {
722 struct userdata *u;
723
724 pa_source_output_assert_ref(o);
725 pa_assert_ctl_context();
726 pa_assert_se(u = o->userdata);
727
728 TearDown(u);
729 pa_module_unload_request(u->module, true);
730 }
731
732 /* Called from main thread */
SourceOutputSuspendCb(pa_source_output * o,pa_source_state_t oldState,pa_suspend_cause_t oldSuspendCause)733 static void SourceOutputSuspendCb(pa_source_output *o, pa_source_state_t oldState,
734 pa_suspend_cause_t oldSuspendCause)
735 {
736 struct userdata *u;
737 bool suspended;
738
739 pa_source_output_assert_ref(o);
740 pa_assert_ctl_context();
741 pa_assert_se(u = o->userdata);
742
743 /* State has not changed, nothing to do */
744 if (oldState == o->source->state) {
745 return;
746 }
747
748 suspended = (o->source->state == PA_SOURCE_SUSPENDED);
749
750 /* If the source has been suspended, we need to handle this like
751 * a source change when the source is resumed */
752 if (suspended) {
753 if (u->sink_input->sink) {
754 pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input),
755 SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL);
756 } else {
757 u->output_thread_info.push_called = false;
758 }
759 } else {
760 /* Get effective source latency on unsuspend */
761 UpdateEffectiveSourceLatency(u, u->source_output->source, u->sink_input->sink);
762 }
763
764 pa_sink_input_cork(u->sink_input, suspended);
765
766 UpdateAdjustTimer(u);
767 }
768
769 /* Called from input thread context */
UpdateSourceLatencyRangeCb(pa_source_output * i)770 static void UpdateSourceLatencyRangeCb(pa_source_output *i)
771 {
772 struct userdata *u;
773
774 pa_source_output_assert_ref(i);
775 pa_source_output_assert_io_context(i);
776 pa_assert_se(u = i->userdata);
777
778 /* Source latency may have changed */
779 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg),
780 LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED, NULL, 0, NULL, NULL);
781 }
782
783 /* Called from output thread context */
SinkInputPopCb(pa_sink_input * i,size_t nbytes,pa_memchunk * chunk)784 static int SinkInputPopCb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk)
785 {
786 struct userdata *u;
787
788 pa_sink_input_assert_ref(i);
789 pa_sink_input_assert_io_context(i);
790 pa_assert_se(u = i->userdata);
791 pa_assert(chunk);
792
793 /* It seems necessary to handle outstanding push messages here, though it is not clear
794 * why. Removing this part leads to underruns when low latencies are configured. */
795 u->output_thread_info.in_pop = true;
796 while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0) {
797 ;
798 }
799 u->output_thread_info.in_pop = false;
800
801 /* While pop has not been called, latency adjustments in SINK_INPUT_MESSAGE_POST are
802 * enabled. Disable them on second pop and enable the final adjustment during the
803 * next push. The adjustment must be done on the next push, because there is no way
804 * to retrieve the source latency here. We are waiting for the second pop, because
805 * the first pop may be called before the sink is actually started. */
806 if (!u->output_thread_info.pop_called && u->output_thread_info.first_pop_done) {
807 u->output_thread_info.pop_adjust = true;
808 u->output_thread_info.pop_called = true;
809 }
810 u->output_thread_info.first_pop_done = true;
811
812 if (pa_memblockq_peek(u->memblockq, chunk) < 0) {
813 return -1;
814 }
815
816 chunk->length = PA_MIN(chunk->length, nbytes);
817 pa_memblockq_drop(u->memblockq, chunk->length);
818
819 /* Adjust the memblockq to ensure that there is
820 * enough data in the queue to avoid underruns. */
821 if (!u->output_thread_info.push_called) {
822 MemblockqAdjust(u, 0, true);
823 }
824
825 return 0;
826 }
827
828 /* Called from output thread context */
SinkInputProcessRewindCb(pa_sink_input * i,size_t nbytes)829 static void SinkInputProcessRewindCb(pa_sink_input *i, size_t nbytes)
830 {
831 struct userdata *u;
832
833 pa_sink_input_assert_ref(i);
834 pa_sink_input_assert_io_context(i);
835 pa_assert_se(u = i->userdata);
836
837 pa_memblockq_rewind(u->memblockq, nbytes);
838 }
839
ProcessSinkInputMessagePost(struct userdata * u,void * data,int64_t offset,pa_memchunk * chunk)840 static void ProcessSinkInputMessagePost(struct userdata *u, void *data, int64_t offset, pa_memchunk *chunk)
841 {
842 pa_memblockq_push_align(u->memblockq, chunk);
843
844 /* If push has not been called yet, latency adjustments in SinkInputPopCb()
845 * are enabled. Disable them on first push and correct the memblockq. If pop
846 * has not been called yet, wait until the pop_cb() requests the adjustment */
847 if (u->output_thread_info.pop_called && (!u->output_thread_info.push_called ||
848 u->output_thread_info.pop_adjust)) {
849 int64_t time_delta;
850
851 /* This is the source latency at the time push was called */
852 time_delta = PA_PTR_TO_INT(data);
853 /* Add the time between push and post */
854 time_delta += pa_rtclock_now() - (pa_usec_t) offset;
855 /* Add the sink latency */
856 time_delta += pa_sink_get_latency_within_thread(u->sink_input->sink, true);
857
858 /* The source latency report includes the audio in the chunk,
859 * but since we already pushed the chunk to the memblockq, we need
860 * to subtract the chunk size from the source latency so that it
861 * won't be counted towards both the memblockq latency and the
862 * source latency.
863 *
864 * Sometimes the alsa source reports way too low latency (might
865 * be a bug in the alsa source code). This seems to happen when
866 * there's an overrun. As an attempt to detect overruns, we
867 * check if the chunk size is larger than the configured source
868 * latency. If so, we assume that the source should have pushed
869 * a chunk whose size equals the configured latency, so we
870 * modify time_delta only by that amount, which makes
871 * MemblockqAdjust() drop more data than it would otherwise.
872 * This seems to work quite well, but it's possible that the
873 * next push also contains too much data, and in that case the
874 * resulting latency will be wrong. */
875 if (pa_bytes_to_usec(chunk->length,
876 &u->sink_input->sample_spec) > u->output_thread_info.effective_source_latency) {
877 time_delta -= (int64_t)u->output_thread_info.effective_source_latency;
878 } else {
879 time_delta -= (int64_t)pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec);
880 }
881
882 /* FIXME: We allow pushing silence here to fix up the latency. This
883 * might lead to a gap in the stream */
884 MemblockqAdjust(u, time_delta, true);
885
886 u->output_thread_info.pop_adjust = false;
887 u->output_thread_info.push_called = true;
888 }
889
890 /* If pop has not been called yet, make sure the latency does not grow too much.
891 * Don't push any silence here, because we already have new data in the queue */
892 if (!u->output_thread_info.pop_called) {
893 MemblockqAdjust(u, 0, false);
894 }
895
896 /* Is this the end of an underrun? Then let's start things
897 * right-away */
898 if (u->sink_input->sink->thread_info.state != PA_SINK_SUSPENDED &&
899 u->sink_input->thread_info.underrun_for > 0 &&
900 pa_memblockq_is_readable(u->memblockq)) {
901 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_UNDERRUN,
902 NULL, 0, NULL, NULL);
903 /* If called from within the pop callback skip the rewind */
904 if (!u->output_thread_info.in_pop) {
905 AUDIO_DEBUG_LOG("Requesting rewind due to end of underrun.");
906 pa_sink_input_request_rewind(u->sink_input,
907 (size_t) (u->sink_input->thread_info.underrun_for ==
908 (size_t) -1 ? 0 : u->sink_input->thread_info.underrun_for),
909 false, true, false);
910 }
911 }
912
913 u->output_thread_info.recv_counter += (int64_t) chunk->length;
914 }
915
916 /* Called from output thread context */
SinkInputProcessMsgCb(pa_msgobject * obj,int code,void * data,int64_t offset,pa_memchunk * chunk)917 static int SinkInputProcessMsgCb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk)
918 {
919 struct userdata *u = PA_SINK_INPUT(obj)->userdata;
920
921 pa_sink_input_assert_io_context(u->sink_input);
922
923 switch (code) {
924 case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
925 pa_usec_t *r = data;
926 *r = pa_bytes_to_usec(pa_memblockq_get_length(u->memblockq), &u->sink_input->sample_spec);
927
928 /* Fall through, the default handler will add in the extra
929 * latency added by the resampler */
930 break;
931 }
932
933 case SINK_INPUT_MESSAGE_POST:
934 ProcessSinkInputMessagePost(u, data, offset, chunk);
935 return 0;
936
937 case SINK_INPUT_MESSAGE_REWIND:
938 /* Do not try to rewind if no data was pushed yet */
939 if (u->output_thread_info.push_called) {
940 pa_memblockq_seek(u->memblockq, -offset, PA_SEEK_RELATIVE, true);
941 }
942 u->output_thread_info.recv_counter -= offset;
943 return 0;
944
945 case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
946 size_t length;
947 length = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
948
949 u->latency_snapshot.recv_counter = u->output_thread_info.recv_counter;
950 u->latency_snapshot.loopback_memblockq_length = pa_memblockq_get_length(u->memblockq);
951 /* Add content of render memblockq to sink latency */
952 u->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(u->sink_input->sink, true) +
953 pa_bytes_to_usec(length, &u->sink_input->sink->sample_spec);
954 u->latency_snapshot.sink_timestamp = pa_rtclock_now();
955 return 0;
956 }
957
958 case SINK_INPUT_MESSAGE_SOURCE_CHANGED:
959 u->output_thread_info.push_called = false;
960 return 0;
961
962 case SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY:
963 u->output_thread_info.effective_source_latency = (pa_usec_t)offset;
964 return 0;
965
966 case SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY:
967 u->output_thread_info.minimum_latency = (pa_usec_t)offset;
968 return 0;
969
970 case SINK_INPUT_MESSAGE_FAST_ADJUST:
971 MemblockqAdjust(u, offset, true);
972 return 0;
973
974 default:
975 break;
976 }
977
978 return pa_sink_input_process_msg(obj, code, data, offset, chunk);
979 }
980 /* Called from main thread.
981 * Set sink input latency to one third of the overall latency if possible.
982 * The choice of one third is rather arbitrary somewhere between the minimum
983 * possible latency which would cause a lot of CPU load and half the configured
984 * latency which would quickly lead to underruns. */
SetSinkInputLatency(struct userdata * u,pa_sink * sink)985 static void SetSinkInputLatency(struct userdata *u, pa_sink *sink)
986 {
987 pa_usec_t latency, requestedLatency;
988
989 requestedLatency = u->latency / TRIPLE_FACTOR;
990
991 /* Normally we try to configure sink and source latency equally. If the
992 * source latency cannot match the requested sink latency try to set the
993 * sink latency to a smaller value to avoid underruns */
994 if (u->min_source_latency > requestedLatency) {
995 latency = PA_MAX(u->latency, u->minimum_latency);
996 requestedLatency = (latency - u->min_source_latency) / MULTIPLE_FACTOR;
997 }
998
999 latency = PA_CLAMP(requestedLatency, u->min_sink_latency, u->max_sink_latency);
1000 u->configured_sink_latency = pa_sink_input_set_requested_latency(u->sink_input, latency);
1001 if (u->configured_sink_latency != requestedLatency) {
1002 AUDIO_WARNING_LOG("Cannot set requested sink latency of %0.2f ms, adjusting to %0.2f ms",
1003 (double)requestedLatency / PA_USEC_PER_MSEC, (double)u->configured_sink_latency / PA_USEC_PER_MSEC);
1004 }
1005 }
1006
1007 /* Called from output thread context */
SinkInputAttachCb(pa_sink_input * i)1008 static void SinkInputAttachCb(pa_sink_input *i)
1009 {
1010 struct userdata *u;
1011
1012 pa_sink_input_assert_ref(i);
1013 pa_sink_input_assert_io_context(i);
1014 pa_assert_se(u = i->userdata);
1015
1016 u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
1017 i->sink->thread_info.rtpoll,
1018 PA_RTPOLL_LATE,
1019 u->asyncmsgq);
1020
1021 pa_memblockq_set_prebuf(u->memblockq, pa_sink_input_get_max_request(i) * MULTIPLE_FACTOR);
1022 pa_memblockq_set_maxrewind(u->memblockq, pa_sink_input_get_max_rewind(i));
1023 }
1024
1025 /* Called from output thread context */
SinkInputDetachCb(pa_sink_input * i)1026 static void SinkInputDetachCb(pa_sink_input *i)
1027 {
1028 struct userdata *u;
1029
1030 pa_sink_input_assert_ref(i);
1031 pa_sink_input_assert_io_context(i);
1032 pa_assert_se(u = i->userdata);
1033
1034 if (u->rtpoll_item_read) {
1035 pa_rtpoll_item_free(u->rtpoll_item_read);
1036 u->rtpoll_item_read = NULL;
1037 }
1038 }
1039
1040 /* Called from output thread context */
SinkInputUpdateMaxRewindCb(pa_sink_input * i,size_t nbytes)1041 static void SinkInputUpdateMaxRewindCb(pa_sink_input *i, size_t nbytes)
1042 {
1043 struct userdata *u;
1044
1045 pa_sink_input_assert_ref(i);
1046 pa_sink_input_assert_io_context(i);
1047 pa_assert_se(u = i->userdata);
1048
1049 pa_memblockq_set_maxrewind(u->memblockq, nbytes);
1050 }
1051
1052 /* Called from output thread context */
SinkInputUpdateMaxRequestCb(pa_sink_input * i,size_t nbytes)1053 static void SinkInputUpdateMaxRequestCb(pa_sink_input *i, size_t nbytes)
1054 {
1055 struct userdata *u;
1056
1057 pa_sink_input_assert_ref(i);
1058 pa_sink_input_assert_io_context(i);
1059 pa_assert_se(u = i->userdata);
1060
1061 pa_memblockq_set_prebuf(u->memblockq, nbytes * MULTIPLE_FACTOR);
1062 AUDIO_INFO_LOG("Max request changed");
1063 }
1064
1065 /* Called from main thread */
SinkInputKillCb(pa_sink_input * i)1066 static void SinkInputKillCb(pa_sink_input *i)
1067 {
1068 struct userdata *u;
1069
1070 pa_sink_input_assert_ref(i);
1071 pa_assert_ctl_context();
1072 pa_assert_se(u = i->userdata);
1073
1074 TearDown(u);
1075 pa_module_unload_request(u->module, true);
1076 }
1077
1078 /* Called from the output thread context */
SinkInputStateChangeCb(pa_sink_input * i,pa_sink_input_state_t state)1079 static void SinkInputStateChangeCb(pa_sink_input *i, pa_sink_input_state_t state)
1080 {
1081 struct userdata *u;
1082
1083 pa_sink_input_assert_ref(i);
1084 pa_assert_se(u = i->userdata);
1085
1086 if (state == PA_SINK_INPUT_UNLINKED) {
1087 pa_asyncmsgq_flush(u->asyncmsgq, false);
1088 }
1089 }
1090
1091 /* Called from main thread */
SinkInputSuspendCb(pa_sink_input * i,pa_sink_state_t oldState,pa_suspend_cause_t oldSuspendCause)1092 static void SinkInputSuspendCb(pa_sink_input *i, pa_sink_state_t oldState, pa_suspend_cause_t oldSuspendCause)
1093 {
1094 struct userdata *u;
1095 bool suspended;
1096
1097 pa_sink_input_assert_ref(i);
1098 pa_assert_ctl_context();
1099 pa_assert_se(u = i->userdata);
1100
1101 /* State has not changed, nothing to do */
1102 if (oldState == i->sink->state) {
1103 return;
1104 }
1105
1106 suspended = (i->sink->state == PA_SINK_SUSPENDED);
1107
1108 /* If the sink has been suspended, we need to handle this like
1109 * a sink change when the sink is resumed. Because the sink
1110 * is suspended, we can set the variables directly. */
1111 if (suspended) {
1112 u->output_thread_info.pop_called = false;
1113 u->output_thread_info.first_pop_done = false;
1114 } else {
1115 /* Set effective source latency on unsuspend */
1116 UpdateEffectiveSourceLatency(u, u->source_output->source, u->sink_input->sink);
1117 }
1118
1119 pa_source_output_cork(u->source_output, suspended);
1120
1121 UpdateAdjustTimer(u);
1122 }
1123
1124 /* Called from output thread context */
UpdateSinkLatencyRangeCb(pa_sink_input * i)1125 static void UpdateSinkLatencyRangeCb(pa_sink_input *i)
1126 {
1127 struct userdata *u;
1128
1129 pa_sink_input_assert_ref(i);
1130 pa_sink_input_assert_io_context(i);
1131 pa_assert_se(u = i->userdata);
1132
1133 /* Sink latency may have changed */
1134 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg),
1135 LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED, NULL, 0, NULL, NULL);
1136 }
1137
1138 /* Called from main context */
LoopbackProcessMsgCb(pa_msgobject * o,int code,void * userdata,int64_t offset,pa_memchunk * chunk)1139 static int LoopbackProcessMsgCb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk)
1140 {
1141 struct loopback_msg *msg;
1142 struct userdata *u;
1143 pa_usec_t currentLatency;
1144
1145 pa_assert(o);
1146 pa_assert_ctl_context();
1147
1148 msg = LOOPBACK_MSG(o);
1149 pa_assert_se(u = msg->userdata);
1150
1151 switch (code) {
1152 case LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED:
1153 UpdateEffectiveSourceLatency(u, u->source_output->source, u->sink_input->sink);
1154 currentLatency = pa_source_get_requested_latency(u->source_output->source);
1155 if (currentLatency > u->configured_source_latency) {
1156 /* The minimum latency has changed to a value larger than the configured latency, so
1157 * the source latency has been increased. The case that the minimum latency changes
1158 * back to a smaller value is not handled because this never happens with the current
1159 * source implementations. */
1160 AUDIO_WARNING_LOG("Source minimum latency increased to %0.2f ms",
1161 (double)currentLatency / PA_USEC_PER_MSEC);
1162 u->configured_source_latency = currentLatency;
1163 UpdateLatencyBoundaries(u, u->source_output->source, u->sink_input->sink);
1164 /* We re-start counting when the latency has changed */
1165 u->iteration_counter = 0;
1166 u->underrun_counter = 0;
1167 }
1168 return 0;
1169
1170 case LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED:
1171 currentLatency = pa_sink_get_requested_latency(u->sink_input->sink);
1172 if (currentLatency > u->configured_sink_latency) {
1173 /* The minimum latency has changed to a value larger than the configured latency, so
1174 * the sink latency has been increased. The case that the minimum latency changes back
1175 * to a smaller value is not handled because this never happens with the current sink
1176 * implementations. */
1177 AUDIO_WARNING_LOG("Sink minimum latency increased to %0.2f ms",
1178 (double)currentLatency / PA_USEC_PER_MSEC);
1179 u->configured_sink_latency = currentLatency;
1180 UpdateLatencyBoundaries(u, u->source_output->source, u->sink_input->sink);
1181 /* We re-start counting when the latency has changed */
1182 u->iteration_counter = 0;
1183 u->underrun_counter = 0;
1184 }
1185 return 0;
1186
1187 case LOOPBACK_MESSAGE_UNDERRUN:
1188 u->underrun_counter++;
1189 AUDIO_DEBUG_LOG("Underrun detected, counter incremented to %u", u->underrun_counter);
1190 return 0;
1191 default:
1192 break;
1193 }
1194
1195 return 0;
1196 }
1197
SinkPortLatencyOffsetChangedCb(pa_core * core,pa_sink * sink,struct userdata * u)1198 static pa_hook_result_t SinkPortLatencyOffsetChangedCb(pa_core *core, pa_sink *sink, struct userdata *u)
1199 {
1200 if (sink != u->sink_input->sink) {
1201 return PA_HOOK_OK;
1202 }
1203
1204 u->sink_latency_offset = sink->port_latency_offset;
1205 UpdateMinimumLatency(u, sink, true);
1206
1207 return PA_HOOK_OK;
1208 }
1209
SourcePortLatencyOffsetChangedCb(pa_core * core,pa_source * source,struct userdata * u)1210 static pa_hook_result_t SourcePortLatencyOffsetChangedCb(pa_core *core, pa_source *source, struct userdata *u)
1211 {
1212 if (source != u->source_output->source) {
1213 return PA_HOOK_OK;
1214 }
1215
1216 u->source_latency_offset = source->port_latency_offset;
1217 UpdateMinimumLatency(u, u->sink_input->sink, true);
1218
1219 return PA_HOOK_OK;
1220 }
1221
InitFailed(pa_module * m,pa_modargs * ma)1222 int InitFailed(pa_module *m, pa_modargs *ma)
1223 {
1224 AUDIO_ERR_LOG("Init Loopback failed.");
1225 if (ma) {
1226 pa_modargs_free(ma);
1227 }
1228
1229 pa__done(m);
1230
1231 return -1;
1232 }
1233
InitSampleSpecAndChannelMap(pa_modargs * ma,struct userdata * u,pa_sink * sink,pa_source * source)1234 static int InitSampleSpecAndChannelMap(pa_modargs *ma, struct userdata *u, pa_sink *sink, pa_source *source)
1235 {
1236 if (source) {
1237 u->ss = source->sample_spec;
1238 u->map = source->channel_map;
1239 } else if (sink) {
1240 u->ss = sink->sample_spec;
1241 u->map = sink->channel_map;
1242 } else {
1243 /* FIXME: Dummy stream format, needed because pa_sink_input_new()
1244 * requires valid sample spec and channel map even when all the FIX_*
1245 * stream flags are specified. pa_sink_input_new() should be changed
1246 * to ignore the sample spec and channel map when the FIX_* flags are
1247 * present. */
1248 u->ss.format = PA_SAMPLE_U8;
1249 u->ss.rate = DEFAULT_SAMPLE_RATE;
1250 u->ss.channels = 1;
1251 u->map.channels = 1;
1252 u->map.map[0] = PA_CHANNEL_POSITION_MONO;
1253 }
1254
1255 if (pa_modargs_get_sample_spec_and_channel_map(ma, &(u->ss), &(u->map), PA_CHANNEL_MAP_DEFAULT) < 0) {
1256 AUDIO_ERR_LOG("Invalid sample format specification or channel map");
1257 return -1;
1258 }
1259
1260 if (u->ss.rate < MIN_SAMPLE_RATE || u->ss.rate > PA_RATE_MAX) {
1261 AUDIO_ERR_LOG("Invalid rate specification, valid range is 4000 Hz to %i Hz", PA_RATE_MAX);
1262 return -1;
1263 }
1264
1265 return 0;
1266 }
1267
InitUserData(struct userdata * u)1268 static void InitUserData(struct userdata *u)
1269 {
1270 uint32_t latencyMsec;
1271 uint32_t maxLatencyMsec;
1272 uint32_t fastAdjustThreshold;
1273 uint32_t adjustTimeSec;
1274 latencyMsec = DEFAULT_LATENCY_MSEC;
1275 fastAdjustThreshold = 0;
1276 maxLatencyMsec = 0;
1277
1278 if (maxLatencyMsec < latencyMsec) {
1279 AUDIO_WARNING_LOG("Configured maximum latency is smaller than latency, using latency instead");
1280 maxLatencyMsec = latencyMsec;
1281 }
1282
1283 u->latency = (pa_usec_t) latencyMsec * PA_USEC_PER_MSEC;
1284 u->max_latency = (pa_usec_t) maxLatencyMsec * PA_USEC_PER_MSEC;
1285 u->output_thread_info.pop_called = false;
1286 u->output_thread_info.pop_adjust = false;
1287 u->output_thread_info.push_called = false;
1288 u->iteration_counter = 0;
1289 u->underrun_counter = 0;
1290 u->underrun_latency_limit = 0;
1291 u->source_sink_changed = true;
1292 u->real_adjust_time_sum = 0;
1293 u->adjust_counter = 0;
1294 u->fast_adjust_threshold = fastAdjustThreshold * PA_USEC_PER_MSEC;
1295
1296 adjustTimeSec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1297 if (adjustTimeSec != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC) {
1298 u->adjust_time = adjustTimeSec * PA_USEC_PER_SEC;
1299 } else {
1300 u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1301 }
1302
1303 u->real_adjust_time = u->adjust_time;
1304 }
1305
CreateSinkInput(pa_module * m,pa_modargs * ma,struct userdata * u,pa_sink * sink)1306 static int CreateSinkInput(pa_module *m, pa_modargs *ma, struct userdata *u, pa_sink *sink)
1307 {
1308 pa_sink_input_new_data sinkInputData;
1309
1310 pa_sink_input_new_data_init(&sinkInputData);
1311 sinkInputData.driver = __FILE__;
1312 sinkInputData.module = m;
1313
1314 if (sink) {
1315 pa_sink_input_new_data_set_sink(&sinkInputData, sink, false, true);
1316 }
1317
1318 if (pa_modargs_get_proplist(ma, "sink_input_properties", sinkInputData.proplist, PA_UPDATE_REPLACE) < 0) {
1319 AUDIO_ERR_LOG("Failed to parse the sink_input_properties value.");
1320 pa_sink_input_new_data_done(&sinkInputData);
1321 return -1;
1322 }
1323
1324 if (!pa_proplist_contains(sinkInputData.proplist, PA_PROP_MEDIA_ROLE)) {
1325 pa_proplist_sets(sinkInputData.proplist, PA_PROP_MEDIA_ROLE, "abstract");
1326 }
1327
1328 pa_sink_input_new_data_set_sample_spec(&sinkInputData, &(u->ss));
1329 pa_sink_input_new_data_set_channel_map(&sinkInputData, &(u->map));
1330 sinkInputData.flags = PA_SINK_INPUT_VARIABLE_RATE | PA_SINK_INPUT_START_CORKED | PA_SINK_INPUT_DONT_MOVE;
1331
1332 pa_sink_input_new(&u->sink_input, m->core, &sinkInputData);
1333 pa_sink_input_new_data_done(&sinkInputData);
1334
1335 if (!u->sink_input) {
1336 AUDIO_ERR_LOG("Create sink input failed");
1337 return -1;
1338 }
1339 return 0;
1340 }
1341
ConfigSinkInput(struct userdata * u,pa_source * source)1342 static int ConfigSinkInput(struct userdata *u, pa_source *source)
1343 {
1344 const char *n;
1345 pa_memchunk silence;
1346
1347 u->sink_input->parent.process_msg = SinkInputProcessMsgCb;
1348 u->sink_input->pop = SinkInputPopCb;
1349 u->sink_input->process_rewind = SinkInputProcessRewindCb;
1350 u->sink_input->kill = SinkInputKillCb;
1351 u->sink_input->state_change = SinkInputStateChangeCb;
1352 u->sink_input->attach = SinkInputAttachCb;
1353 u->sink_input->detach = SinkInputDetachCb;
1354 u->sink_input->update_max_rewind = SinkInputUpdateMaxRewindCb;
1355 u->sink_input->update_max_request = SinkInputUpdateMaxRequestCb;
1356 u->sink_input->suspend = SinkInputSuspendCb;
1357 u->sink_input->update_sink_latency_range = UpdateSinkLatencyRangeCb;
1358 u->sink_input->update_sink_fixed_latency = UpdateSinkLatencyRangeCb;
1359 u->sink_input->userdata = u;
1360
1361 UpdateLatencyBoundaries(u, u->source_output->source, u->sink_input->sink);
1362 SetSinkInputLatency(u, u->sink_input->sink);
1363 SetSourceOutputLatency(u, u->source_output->source);
1364
1365 pa_sink_input_get_silence(u->sink_input, &silence);
1366 u->memblockq = pa_memblockq_new(
1367 "module-loopback memblockq",
1368 0, /* idx */
1369 MEMBLOCKQ_MAXLENGTH, /* maxlength */
1370 MEMBLOCKQ_MAXLENGTH, /* tlength */
1371 &(u->ss), /* sample_spec */
1372 0, /* prebuf */
1373 0, /* minreq */
1374 0, /* maxrewind */
1375 &silence); /* silence frame */
1376 pa_memblock_unref(silence.memblock);
1377 /* Fill the memblockq with silence */
1378 pa_memblockq_seek(u->memblockq, pa_usec_to_bytes(u->latency, &u->sink_input->sample_spec), PA_SEEK_RELATIVE, true);
1379
1380 u->asyncmsgq = pa_asyncmsgq_new(0);
1381 if (!u->asyncmsgq) {
1382 AUDIO_ERR_LOG("pa_asyncmsgq_new() failed.");
1383 return -1;
1384 }
1385
1386 if (!pa_proplist_contains(u->source_output->proplist, PA_PROP_MEDIA_NAME)) {
1387 pa_proplist_setf(u->source_output->proplist, PA_PROP_MEDIA_NAME, "Loopback to %s",
1388 pa_strnull(pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1389 }
1390
1391 if (!pa_proplist_contains(u->source_output->proplist, PA_PROP_MEDIA_ICON_NAME) &&
1392 (n = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_ICON_NAME))) {
1393 pa_proplist_sets(u->source_output->proplist, PA_PROP_MEDIA_ICON_NAME, n);
1394 }
1395
1396 if (!pa_proplist_contains(u->sink_input->proplist, PA_PROP_MEDIA_NAME)) {
1397 pa_proplist_setf(u->sink_input->proplist, PA_PROP_MEDIA_NAME, "Loopback from %s",
1398 pa_strnull(pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1399 }
1400
1401 if (source && !pa_proplist_contains(u->sink_input->proplist, PA_PROP_MEDIA_ICON_NAME) &&
1402 (n = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_ICON_NAME))) {
1403 pa_proplist_sets(u->sink_input->proplist, PA_PROP_MEDIA_ICON_NAME, n);
1404 }
1405
1406 return 0;
1407 }
1408
CreateSourceOutput(pa_module * m,pa_modargs * ma,struct userdata * u,pa_source * source)1409 static int CreateSourceOutput(pa_module *m, pa_modargs *ma, struct userdata *u, pa_source *source)
1410 {
1411 pa_source_output_new_data sourceOutputData;
1412 pa_sample_spec ss = u->ss;
1413 pa_channel_map map = u->map;
1414
1415 pa_source_output_new_data_init(&sourceOutputData);
1416 sourceOutputData.driver = __FILE__;
1417 sourceOutputData.module = m;
1418 if (source) {
1419 pa_source_output_new_data_set_source(&sourceOutputData, source, false, true);
1420 }
1421
1422 if (pa_modargs_get_proplist(ma, "source_output_properties", sourceOutputData.proplist, PA_UPDATE_REPLACE) < 0) {
1423 AUDIO_ERR_LOG("Failed to parse the source_output_properties value.");
1424 pa_source_output_new_data_done(&sourceOutputData);
1425 return -1;
1426 }
1427
1428 if (!pa_proplist_contains(sourceOutputData.proplist, PA_PROP_MEDIA_ROLE))
1429 pa_proplist_sets(sourceOutputData.proplist, PA_PROP_MEDIA_ROLE, "abstract");
1430
1431 pa_source_output_new_data_set_sample_spec(&sourceOutputData, &ss);
1432 pa_source_output_new_data_set_channel_map(&sourceOutputData, &map);
1433 sourceOutputData.flags = PA_SOURCE_OUTPUT_START_CORKED | PA_SOURCE_OUTPUT_DONT_MOVE;
1434
1435 AUDIO_INFO_LOG("sourceOutputData.driver:%{public}s, module:%{pulic}s, source_output addr:%{public}p",
1436 sourceOutputData.driver, sourceOutputData.module->name, &u->source_output);
1437
1438 pa_source_output_new(&u->source_output, m->core, &sourceOutputData);
1439 AUDIO_INFO_LOG("pa_source_output_new DONE");
1440 pa_source_output_new_data_done(&sourceOutputData);
1441
1442 if (!u->source_output) {
1443 AUDIO_ERR_LOG("Create source output failed");
1444 return -1;
1445 }
1446 return 0;
1447 }
1448
ConfigSourceOutput(struct userdata * u)1449 static void ConfigSourceOutput(struct userdata *u)
1450 {
1451 u->source_output->parent.process_msg = SourceOutputProcessMsgCb;
1452 u->source_output->push = SourceOutputPushCb;
1453 u->source_output->process_rewind = SourceOutputProcessRewindCb;
1454 u->source_output->kill = SourceOutputKillCb;
1455 u->source_output->attach = SourceOutputAttachCb;
1456 u->source_output->detach = SourceOutputDetachCb;
1457 u->source_output->suspend = SourceOutputSuspendCb;
1458 u->source_output->update_source_latency_range = UpdateSourceLatencyRangeCb;
1459 u->source_output->update_source_fixed_latency = UpdateSourceLatencyRangeCb;
1460 u->source_output->userdata = u;
1461
1462 /* If format, rate or channels were originally unset, they are set now
1463 * after the pa_source_output_new() call. */
1464 u->ss = u->source_output->sample_spec;
1465 u->map = u->source_output->channel_map;
1466 }
1467
ConfigLoopback(pa_module * m,struct userdata * u)1468 static void ConfigLoopback(pa_module *m, struct userdata *u)
1469 {
1470 /* Hooks to track changes of latency offsets */
1471 pa_module_hook_connect(m, &m->core->hooks[PA_CORE_HOOK_SINK_PORT_LATENCY_OFFSET_CHANGED],
1472 PA_HOOK_NORMAL, (pa_hook_cb_t) SinkPortLatencyOffsetChangedCb, u);
1473 pa_module_hook_connect(m, &m->core->hooks[PA_CORE_HOOK_SOURCE_PORT_LATENCY_OFFSET_CHANGED],
1474 PA_HOOK_NORMAL, (pa_hook_cb_t) SourcePortLatencyOffsetChangedCb, u);
1475
1476 /* Setup message handler for main thread */
1477 u->msg = pa_msgobject_new(loopback_msg);
1478 u->msg->parent.process_msg = LoopbackProcessMsgCb;
1479 u->msg->userdata = u;
1480
1481 /* The output thread is not yet running, set effective_source_latency directly */
1482 UpdateEffectiveSourceLatency(u, u->source_output->source, NULL);
1483
1484 pa_sink_input_put(u->sink_input);
1485 pa_source_output_put(u->source_output);
1486
1487 if (u->source_output->source->state != PA_SOURCE_SUSPENDED) {
1488 pa_sink_input_cork(u->sink_input, false);
1489 }
1490
1491 if (u->sink_input->sink->state != PA_SINK_SUSPENDED) {
1492 pa_source_output_cork(u->source_output, false);
1493 }
1494 }
1495
pa__init(pa_module * m)1496 int pa__init(pa_module *m)
1497 {
1498 pa_modargs *ma = NULL;
1499 struct userdata *u;
1500 pa_sink *sink = NULL;
1501 pa_source *source = NULL;
1502 const char *n;
1503
1504 pa_assert(m);
1505
1506 if (!(ma = pa_modargs_new(m->argument, VALID_MODARGS))) {
1507 AUDIO_ERR_LOG("Failed to parse module arguments");
1508 return InitFailed(m, ma);
1509 }
1510
1511 n = pa_modargs_get_value(ma, "source", NULL);
1512 if (n && !(source = pa_namereg_get(m->core, n, PA_NAMEREG_SOURCE))) {
1513 AUDIO_ERR_LOG("No such source.");
1514 return InitFailed(m, ma);
1515 }
1516
1517 n = pa_modargs_get_value(ma, "sink", NULL);
1518 if (n && !(sink = pa_namereg_get(m->core, n, PA_NAMEREG_SINK))) {
1519 AUDIO_ERR_LOG("No such sink.");
1520 return InitFailed(m, ma);
1521 }
1522
1523 m->userdata = u = pa_xnew0(struct userdata, 1);
1524 u->core = m->core;
1525 u->module = m;
1526
1527 if (InitSampleSpecAndChannelMap(ma, u, sink, source) != 0) {
1528 return InitFailed(m, ma);
1529 }
1530 InitUserData(u);
1531
1532 if (CreateSourceOutput(m, ma, u, source) != 0) {
1533 return InitFailed(m, ma);
1534 }
1535 ConfigSourceOutput(u);
1536
1537 if (CreateSinkInput(m, ma, u, sink) != 0 || ConfigSinkInput(u, source) != 0) {
1538 return InitFailed(m, ma);
1539 }
1540
1541 ConfigLoopback(m, u);
1542
1543 pa_modargs_free(ma);
1544 return 0;
1545 }
1546
pa__done(pa_module * m)1547 void pa__done(pa_module*m)
1548 {
1549 struct userdata *u;
1550 pa_assert(m);
1551
1552 if (!(u = m->userdata)) {
1553 return;
1554 }
1555
1556 TearDown(u);
1557
1558 if (u->memblockq) {
1559 pa_memblockq_free(u->memblockq);
1560 }
1561 if (u->asyncmsgq) {
1562 pa_asyncmsgq_unref(u->asyncmsgq);
1563 }
1564
1565 pa_xfree(u);
1566 }