• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 // GCC does not warn for unused *static inline* functions, but clang does.
17 #ifdef __clang__
18 #pragma clang diagnostic push
19 #pragma clang diagnostic ignored "-Wunused-function"
20 #endif
21 
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25 
26 #include <stdio.h>
27 
28 #include <pulse/xmalloc.h>
29 
30 #include <pulsecore/sink-input.h>
31 #include <pulsecore/module.h>
32 #include <pulsecore/modargs.h>
33 #include <pulsecore/namereg.h>
34 #include <pulsecore/log.h>
35 #include <pulsecore/core-util.h>
36 
37 #include <pulse/rtclock.h>
38 #include <pulse/timeval.h>
39 
40 #include "audio_log.h"
41 
42 PA_MODULE_AUTHOR("OpenHarmony");
43 PA_MODULE_DESCRIPTION(_("Loopback from source to sink"));
44 PA_MODULE_VERSION(PACKAGE_VERSION);
45 PA_MODULE_LOAD_ONCE(false);
46 PA_MODULE_USAGE(
47         "source=<source to connect to> "
48         "sink=<sink to connect to> ");
49 
/* Tuning constants of the loopback module. Values built from PA_USEC_PER_MSEC /
 * PA_USEC_PER_SEC are expressed in microseconds. */

/* NOTE(review): the next three constants are not referenced in the visible part of
 * this file; presumably default/maximum configured latency (ms) and the byte limit
 * of the loopback memblockq - confirm against the module init code. */
#define DEFAULT_LATENCY_MSEC 200
#define MAX_LATENCY_MSEC 30000
#define MEMBLOCKQ_MAXLENGTH (1024 * 1024 * 32)
/* Device latencies below 2.5 ms cause problems; the lower latency bound of source
 * and sink is raised to this value whenever the device range permits it */
#define MIN_DEVICE_LATENCY (2.5 * PA_USEC_PER_MSEC)
/* NOTE(review): the next four constants are not referenced in the visible part of
 * this file - confirm their use before changing them. */
#define DEFAULT_ADJUST_TIME_USEC (10 * PA_USEC_PER_SEC)
#define MAX_FAST_ADJUST_THRESHOLD 100
#define MIN_SAMPLE_RATE 4000
#define DEFAULT_SAMPLE_RATE 8000
/* Divisor used when the latency left after the sink minimum is split for the source */
#define MULTIPLE_FACTOR 2
/* The source output latency is requested as one third of the overall latency */
#define TRIPLE_FACTOR 3
/* Delay between (re)arming the adjust timer and the first TimeCallback() */
#define ADJUST_TIME_FOR_CHANGE (333 * PA_USEC_PER_MSEC)
/* Seconds per hour, used to convert the accumulated run time into hours */
#define TIME_HOUR_TO_SECOND_UNIT 3600
/* The latency is increased once the underrun counter exceeds this value */
#define MIN_VISIBLE_UNDERRUN_COUNTER 2
/* A timer interval longer than adjust_time times this factor is treated as a
 * system sleep and excluded from the real adjust time average */
#define SLEEP_ADJUST_TIME_MULTIPLE_TIMES 1.05
/* Maximum relative deviation (1%) of the controlled rate from the base rate */
#define LATENCY_OFFSET_ADJUST_TIMES 0.01
/* Step by which the target latency grows after too many underruns */
#define LATENCY_INCREMENT_FIVE_MSEC (5 * PA_USEC_PER_MSEC)
66 
67 typedef struct loopback_msg loopback_msg;
68 
/* Per-instance state of the loopback module. The structure is shared between the
 * main thread, the input (source output) thread and the output (sink input)
 * thread; the comments on the individual groups say which context uses them. */
struct userdata {
    pa_core *core;
    pa_module *module;

    /* Receiver object for the LOOPBACK_MESSAGE_* notifications */
    loopback_msg *msg;

    pa_sink_input *sink_input;
    pa_source_output *source_output;

    /* Queue used by the source output callbacks to post messages to the sink input */
    pa_asyncmsgq *asyncmsgq;
    /* Audio buffered between source output and sink input */
    pa_memblockq *memblockq;

    pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;

    /* Timer that drives TimeCallback(); NULL while rate adjustment is disabled */
    pa_time_event *time_event;

    /* Variables used to calculate the average time between
     * subsequent calls of AdjustRates() */
    pa_usec_t adjust_time_stamp;
    pa_usec_t real_adjust_time;
    pa_usec_t real_adjust_time_sum;

    /* Values from command line configuration */
    pa_usec_t latency;
    pa_usec_t max_latency;
    pa_usec_t adjust_time;
    pa_usec_t fast_adjust_threshold;

    /* Latency boundaries and current values */
    pa_usec_t min_source_latency;
    pa_usec_t max_source_latency;
    pa_usec_t min_sink_latency;
    pa_usec_t max_sink_latency;
    pa_usec_t configured_sink_latency;
    pa_usec_t configured_source_latency;
    int64_t source_latency_offset;
    int64_t sink_latency_offset;
    pa_usec_t minimum_latency;

    /* lower latency limit found by underruns */
    pa_usec_t underrun_latency_limit;

    /* Various counters */
    uint32_t iteration_counter;
    uint32_t underrun_counter;
    uint32_t adjust_counter;

    /* Set when the source has a fixed latency and its device API is "alsa" */
    bool fixed_alsa_source;
    /* While set, the next interval is excluded from the real adjust time average */
    bool source_sink_changed;

    pa_sample_spec ss;
    pa_channel_map map;

    /* Used for sink input and source output snapshots */
    struct {
        int64_t send_counter;
        int64_t source_latency;
        pa_usec_t source_timestamp;

        int64_t recv_counter;
        size_t loopback_memblockq_length;
        int64_t sink_latency;
        pa_usec_t sink_timestamp;
    } latency_snapshot;

    /* Input thread variable */
    int64_t send_counter;

    /* Output thread variables */
    struct {
        int64_t recv_counter;
        pa_usec_t effective_source_latency;

        /* Copied from main thread */
        pa_usec_t minimum_latency;

        /* Various booleans */
        bool in_pop;
        bool pop_called;
        bool pop_adjust;
        bool first_pop_done;
        bool push_called;
    } output_thread_info;
};
153 
/* Message object that carries a back pointer to the module state. The
 * LOOPBACK_MESSAGE_* notifications posted by the IO threads are addressed to it. */
struct loopback_msg {
    pa_msgobject parent;
    struct userdata *userdata;
};

/* NOTE(review): presumably generates loopback_msg_cast(), which the LOOPBACK_MSG()
 * helper below relies on - confirm against pulsecore/object.h */
PA_DEFINE_PRIVATE_CLASS(loopback_msg, pa_msgobject);
#define LOOPBACK_MSG(o) (loopback_msg_cast(o))
161 
/* NULL-terminated list of the module argument names; it mirrors the arguments
 * advertised in PA_MODULE_USAGE. */
static const char * const VALID_MODARGS[] = {
    "source",
    "sink",
    NULL,
};
167 
/* Private messages addressed to the sink input; numbering starts after the
 * message codes reserved by the core. */
enum {
    SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,   /* new chunk from SourceOutputPushCb() */
    SINK_INPUT_MESSAGE_REWIND,                             /* rewind request from SourceOutputProcessRewindCb() */
    SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT,                   /* sent by TimeCallback() */
    SINK_INPUT_MESSAGE_SOURCE_CHANGED,                     /* sent when the source gets suspended */
    SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY,       /* sent by UpdateEffectiveSourceLatency() */
    SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY,                 /* sent by UpdateMinimumLatency() */
    SINK_INPUT_MESSAGE_FAST_ADJUST,                        /* sent by CalculateAdjustTime() */
};

/* Private message addressed to the source output, handled in SourceOutputProcessMsgCb() */
enum {
    SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT = PA_SOURCE_OUTPUT_MESSAGE_MAX,
};

/* Notifications addressed to the loopback_msg object */
enum {
    LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED,   /* posted by UpdateSourceLatencyRangeCb() */
    LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED,
    LOOPBACK_MESSAGE_UNDERRUN,
};
187 
188 static void EnableAdjustTimer(struct userdata *u, bool enable);
189 
190 /* Called from main context */
TearDown(struct userdata * u)191 static void TearDown(struct userdata *u)
192 {
193     pa_assert(u);
194     pa_assert_ctl_context();
195 
196     u->adjust_time = 0;
197     EnableAdjustTimer(u, false);
198 
199     /* Handling the asyncmsgq between the source output and the sink input
200      * requires some care. When the source output is unlinked, nothing needs
201      * to be done for the asyncmsgq, because the source output is the sending
202      * end. But when the sink input is unlinked, we should ensure that the
203      * asyncmsgq is emptied, because the messages in the queue hold references
204      * to the sink input. Also, we need to ensure that new messages won't be
205      * written to the queue after we have emptied it.
206      *
207      * Emptying the queue can be done in the state_change() callback of the
208      * sink input, when the new state is "unlinked".
209      *
210      * Preventing new messages from being written to the queue can be achieved
211      * by unlinking the source output before unlinking the sink input. There
212      * are no other writers for that queue, so this is sufficient. */
213 
214     if (u->source_output) {
215         pa_source_output_unlink(u->source_output);
216         pa_source_output_unref(u->source_output);
217         u->source_output = NULL;
218     }
219 
220     if (u->sink_input) {
221         pa_sink_input_unlink(u->sink_input);
222         pa_sink_input_unref(u->sink_input);
223         u->sink_input = NULL;
224     }
225 }
226 
227 /* rate controller, called from main context
228  * - maximum deviation from base rate is less than 1%
229  * - can create audible artifacts by changing the rate too quickly
230  * - exhibits hunting with USB or Bluetooth sources
231  */
RateController(uint32_t baseRate,pa_usec_t adjustTime,int32_t latencyDifferenceUsec)232 static uint32_t RateController(uint32_t baseRate, pa_usec_t adjustTime, int32_t latencyDifferenceUsec)
233 {
234     uint32_t newRate;
235     double minCycles;
236 
237     /* Calculate best rate to correct the current latency offset, limit at
238      * slightly below 1% difference from base_rate */
239     minCycles = (double)abs(latencyDifferenceUsec) / adjustTime / LATENCY_OFFSET_ADJUST_TIMES + 1;
240     newRate = baseRate * (1.0 + (double)latencyDifferenceUsec / minCycles / adjustTime);
241 
242     return newRate;
243 }
244 
245 /* Called from main thread.
246  * It has been a matter of discussion how to correctly calculate the minimum
247  * latency that module-loopback can deliver with a given source and sink.
248  * The calculation has been placed in a separate function so that the definition
249  * can easily be changed. The resulting estimate is not very exact because it
250  * depends on the reported latency ranges. In cases were the lower bounds of
251  * source and sink latency are not reported correctly (USB) the result will
252  * be wrong. */
UpdateMinimumLatency(struct userdata * u,pa_sink * sink,bool printMsg)253 static void UpdateMinimumLatency(struct userdata *u, pa_sink *sink, bool printMsg)
254 {
255     if (u->underrun_latency_limit) {
256         /* If we already detected a real latency limit because of underruns, use it */
257         u->minimum_latency = u->underrun_latency_limit;
258     } else {
259         /* Calculate latency limit from latency ranges */
260 
261         u->minimum_latency = u->min_sink_latency;
262         if (u->fixed_alsa_source) {
263             /* If we are using an alsa source with fixed latency, we will get a wakeup when
264              * one fragment is filled, and then we empty the source buffer, so the source
265              * latency never grows much beyond one fragment (assuming that the CPU doesn't
266              * cause a bottleneck). */
267             u->minimum_latency += u->core->default_fragment_size_msec * PA_USEC_PER_MSEC;
268         } else {
269             /* In all other cases the source will deliver new data at latest after one source latency.
270              * Make sure there is enough data available that the sink can keep on playing until new
271              * data is pushed. */
272             u->minimum_latency += u->min_source_latency;
273         }
274 
275         /* Multiply by 1.1 as a safety margin for delays that are proportional to the buffer sizes */
276         u->minimum_latency *= 1.1;
277 
278         /* Add 1.5 ms as a safety margin for delays not related to the buffer sizes */
279         u->minimum_latency += 1.5 * PA_USEC_PER_MSEC;
280     }
281 
282     /* Add the latency offsets */
283     if (-(u->sink_latency_offset + u->source_latency_offset) <= (int64_t)u->minimum_latency) {
284         u->minimum_latency += u->sink_latency_offset + u->source_latency_offset;
285     } else {
286         u->minimum_latency = 0;
287     }
288 
289     /* If the sink is valid, send a message to update the minimum latency to
290      * the output thread, else set the variable directly */
291     if (sink) {
292         pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY,
293             NULL, u->minimum_latency, NULL);
294     } else {
295         u->output_thread_info.minimum_latency = u->minimum_latency;
296     }
297 
298     if (printMsg) {
299         AUDIO_INFO_LOG("Minimum possible end to end latency: %0.2f ms", (double)u->minimum_latency / PA_USEC_PER_MSEC);
300         if (u->latency < u->minimum_latency) {
301             AUDIO_WARNING_LOG("Configured latency of %0.2f ms is smaller than minimum latency, using minimum instead",
302                 (double)u->latency / PA_USEC_PER_MSEC);
303         }
304     }
305 }
306 
CalculateAdjustTime(struct userdata * u,uint32_t * baseRate,int32_t * latencyDifference)307 static int CalculateAdjustTime(struct userdata *u, uint32_t *baseRate, int32_t *latencyDifference)
308 {
309     size_t buffer;
310     uint32_t oldRate;
311     pa_usec_t currentBufferLatency, snapshotDelay;
312     int64_t currentSourceSinkLatency, currentLatency, latencyAtOptimumRate;
313     pa_usec_t finalLatency, now, timePassed;
314 
315     now = pa_rtclock_now();
316     timePassed = now - u->adjust_time_stamp;
317     if (!u->source_sink_changed && timePassed < u->adjust_time * SLEEP_ADJUST_TIME_MULTIPLE_TIMES) {
318         u->adjust_counter++;
319         u->real_adjust_time_sum += timePassed;
320         u->real_adjust_time = u->real_adjust_time_sum / u->adjust_counter;
321     }
322     u->adjust_time_stamp = now;
323 
324     /* Rates and latencies */
325     oldRate = u->sink_input->sample_spec.rate;
326     *baseRate = u->source_output->sample_spec.rate;
327 
328     buffer = u->latency_snapshot.loopback_memblockq_length;
329     if (u->latency_snapshot.recv_counter <= u->latency_snapshot.send_counter) {
330         buffer += (size_t) (u->latency_snapshot.send_counter - u->latency_snapshot.recv_counter);
331     } else {
332         buffer = PA_CLIP_SUB(buffer, (size_t) (u->latency_snapshot.recv_counter - u->latency_snapshot.send_counter));
333     }
334 
335     currentBufferLatency = pa_bytes_to_usec(buffer, &u->sink_input->sample_spec);
336     snapshotDelay = u->latency_snapshot.source_timestamp - u->latency_snapshot.sink_timestamp;
337     currentSourceSinkLatency = u->latency_snapshot.sink_latency +
338         u->latency_snapshot.source_latency - snapshotDelay;
339 
340     /* Current latency */
341     currentLatency = currentSourceSinkLatency + currentBufferLatency;
342 
343     /* Latency at base rate */
344     latencyAtOptimumRate = currentSourceSinkLatency + currentBufferLatency * oldRate / (*baseRate);
345 
346     finalLatency = PA_MAX(u->latency, u->minimum_latency);
347     *latencyDifference = (int32_t)(latencyAtOptimumRate - finalLatency);
348 
349     AUDIO_DEBUG_LOG("Loopback overall latency is %0.2f ms + %0.2f ms + %0.2f ms = %0.2f ms",
350         (double) u->latency_snapshot.sink_latency / PA_USEC_PER_MSEC,
351         (double) currentBufferLatency / PA_USEC_PER_MSEC,
352         (double) u->latency_snapshot.source_latency / PA_USEC_PER_MSEC,
353         (double) currentLatency / PA_USEC_PER_MSEC);
354 
355     AUDIO_DEBUG_LOG("Loopback latency at base rate is %0.2f ms", (double)latencyAtOptimumRate / PA_USEC_PER_MSEC);
356 
357     /* Drop or insert samples if fast_adjust_threshold_msec was specified and the latency difference is too large. */
358     if (u->fast_adjust_threshold > 0 && abs((*latencyDifference)) > u->fast_adjust_threshold) {
359         AUDIO_DEBUG_LOG ("Latency difference larger than %" PRIu64 " msec, skipping or inserting samples.",
360             u->fast_adjust_threshold / PA_USEC_PER_MSEC);
361 
362         pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input),
363             SINK_INPUT_MESSAGE_FAST_ADJUST, NULL, currentSourceSinkLatency, NULL);
364 
365         /* Skip real adjust time calculation on next iteration. */
366         u->source_sink_changed = true;
367         return -1;
368     }
369     return 0;
370 }
371 
372 /* Called from main context */
AdjustRates(struct userdata * u)373 static void AdjustRates(struct userdata *u)
374 {
375     uint32_t baseRate, newRate, runHours;
376     int32_t latencyDifference;
377 
378     pa_assert(u);
379     pa_assert_ctl_context();
380 
381     /* Runtime and counters since last change of source or sink
382      * or source/sink latency */
383     runHours = u->iteration_counter * u->real_adjust_time / PA_USEC_PER_SEC / TIME_HOUR_TO_SECOND_UNIT;
384     u->iteration_counter += 1;
385 
386     /* If we are seeing underruns then the latency is too small */
387     if (u->underrun_counter > MIN_VISIBLE_UNDERRUN_COUNTER) {
388         pa_usec_t target_latency;
389 
390         target_latency = PA_MAX(u->latency, u->minimum_latency) + LATENCY_INCREMENT_FIVE_MSEC;
391         if (u->max_latency == 0 || target_latency < u->max_latency) {
392             u->underrun_latency_limit =
393                 PA_CLIP_SUB((int64_t)target_latency, u->sink_latency_offset + u->source_latency_offset);
394             AUDIO_WARNING_LOG("Too many underruns, increasing latency to %0.2f ms",
395                 (double)target_latency / PA_USEC_PER_MSEC);
396         } else {
397             u->underrun_latency_limit = PA_CLIP_SUB((int64_t)u->max_latency,
398                 u->sink_latency_offset + u->source_latency_offset);
399             AUDIO_WARNING_LOG("Too many underruns, configured maximum latency of %0.2f ms is reached",
400                 (double)u->max_latency / PA_USEC_PER_MSEC);
401             AUDIO_WARNING_LOG("Consider increasing the max_latency_msec");
402         }
403 
404         UpdateMinimumLatency(u, u->sink_input->sink, false);
405         u->underrun_counter = 0;
406     }
407 
408     /* Allow one underrun per hour */
409     if (u->iteration_counter * u->real_adjust_time / PA_USEC_PER_SEC / TIME_HOUR_TO_SECOND_UNIT > runHours) {
410         u->underrun_counter = PA_CLIP_SUB(u->underrun_counter, 1u);
411         AUDIO_INFO_LOG("Underrun counter: %u", u->underrun_counter);
412     }
413 
414     /* Calculate real adjust time if source or sink did not change and if the system has
415      * not been suspended. If the time between two calls is more than 5% longer than the
416      * configured adjust time, we assume that the system has been sleeping and skip the
417      * calculation for this iteration. */
418     if (CalculateAdjustTime(u, &baseRate, &latencyDifference) != 0) {
419         return;
420     }
421 
422     /* Calculate new rate */
423     newRate = RateController(baseRate, u->real_adjust_time, latencyDifference);
424 
425     u->source_sink_changed = false;
426 
427     /* Set rate */
428     pa_sink_input_set_rate(u->sink_input, newRate);
429     AUDIO_DEBUG_LOG("[%s] Updated sampling rate to %lu Hz.", u->sink_input->sink->name, (unsigned long) newRate);
430 }
431 
432 /* Called from main context */
TimeCallback(pa_mainloop_api * a,pa_time_event * e,const struct timeval * t,void * userdata)433 static void TimeCallback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata)
434 {
435     struct userdata *u = userdata;
436 
437     pa_assert(u);
438     pa_assert(a);
439     pa_assert(u->time_event == e);
440 
441     /* Restart timer right away */
442     pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
443 
444     /* Get sink and source latency snapshot */
445     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq,
446         PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
447     pa_asyncmsgq_send(u->source_output->source->asyncmsgq,
448         PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
449 
450     AdjustRates(u);
451 }
452 
453 /* Called from main context
454  * When source or sink changes,
455  * give it a third of a second to settle down, then call AdjustRates for the first time */
EnableAdjustTimer(struct userdata * u,bool enable)456 static void EnableAdjustTimer(struct userdata *u, bool enable)
457 {
458     if (enable) {
459         if (!u->adjust_time) {
460             return;
461         }
462         if (u->time_event) {
463             u->core->mainloop->time_free(u->time_event);
464         }
465 
466         u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + ADJUST_TIME_FOR_CHANGE, TimeCallback, u);
467     } else {
468         if (!u->time_event) {
469             return;
470         }
471 
472         u->core->mainloop->time_free(u->time_event);
473         u->time_event = NULL;
474     }
475 }
476 
477 /* Called from main context */
UpdateAdjustTimer(struct userdata * u)478 static void UpdateAdjustTimer(struct userdata *u)
479 {
480     if (u->sink_input->state == PA_SINK_INPUT_CORKED || u->source_output->state == PA_SOURCE_OUTPUT_CORKED) {
481         EnableAdjustTimer(u, false);
482     } else {
483         EnableAdjustTimer(u, true);
484     }
485 }
486 
/* Uses the fixed latency of the source as both the lower and the upper source
 * latency bound and sets fixed_alsa_source if the device API property of the
 * source is "alsa". Does nothing when u or source is NULL. */
static void UpdateFixedLatency(struct userdata *u, pa_source *source)
{
    const char *deviceApi;

    if (u == NULL || source == NULL) {
        return;
    }

    u->min_source_latency = pa_source_get_fixed_latency(source);
    u->max_source_latency = u->min_source_latency;

    deviceApi = pa_proplist_gets(source->proplist, PA_PROP_DEVICE_API);
    if (deviceApi && pa_streq(deviceApi, "alsa")) {
        u->fixed_alsa_source = true;
    }
}
501 
502 /* Called from main thread
503  * Calculates minimum and maximum possible latency for source and sink */
UpdateLatencyBoundaries(struct userdata * u,pa_source * source,pa_sink * sink)504 static void UpdateLatencyBoundaries(struct userdata *u, pa_source *source, pa_sink *sink)
505 {
506     if (source) {
507         /* Source latencies */
508         u->fixed_alsa_source = false;
509         if (source->flags & PA_SOURCE_DYNAMIC_LATENCY) {
510             pa_source_get_latency_range(source, &u->min_source_latency, &u->max_source_latency);
511         } else {
512             UpdateFixedLatency(u, source);
513         }
514         /* Source offset */
515         u->source_latency_offset = source->port_latency_offset;
516 
517         /* Latencies below 2.5 ms cause problems, limit source latency if possible */
518         if (u->max_source_latency >= MIN_DEVICE_LATENCY) {
519             u->min_source_latency = PA_MAX(u->min_source_latency, MIN_DEVICE_LATENCY);
520         } else {
521             u->min_source_latency = u->max_source_latency;
522         }
523     }
524 
525     if (sink) {
526         /* Sink latencies */
527         if (sink->flags & PA_SINK_DYNAMIC_LATENCY) {
528             pa_sink_get_latency_range(sink, &u->min_sink_latency, &u->max_sink_latency);
529         } else {
530             u->min_sink_latency = pa_sink_get_fixed_latency(sink);
531             u->max_sink_latency = u->min_sink_latency;
532         }
533         /* Sink offset */
534         u->sink_latency_offset = sink->port_latency_offset;
535 
536         /* Latencies below 2.5 ms cause problems, limit sink latency if possible */
537         if (u->max_sink_latency >= MIN_DEVICE_LATENCY) {
538             u->min_sink_latency = PA_MAX(u->min_sink_latency, MIN_DEVICE_LATENCY);
539         } else {
540             u->min_sink_latency = u->max_sink_latency;
541         }
542     }
543 
544     UpdateMinimumLatency(u, sink, true);
545 }
546 
547 /* Called from output context
548  * Sets the memblockq to the configured latency corrected by latency_offset_usec */
MemblockqAdjust(struct userdata * u,int64_t latencyOffsetUsec,bool allowPush)549 static void MemblockqAdjust(struct userdata *u, int64_t latencyOffsetUsec, bool allowPush)
550 {
551     size_t currentMemblockqLength, requestedMemblockqLength, bufferCorrection;
552     int64_t requestedBufferLatency;
553     pa_usec_t finalLatency, requestedSinkLatency;
554 
555     finalLatency = PA_MAX(u->latency, u->output_thread_info.minimum_latency);
556 
557     /* If source or sink have some large negative latency offset, we might want to
558      * hold more than final_latency in the memblockq */
559     requestedBufferLatency = (int64_t)finalLatency - latencyOffsetUsec;
560 
561     /* Keep at least one sink latency in the queue to make sure that the sink
562      * never underruns initially */
563     requestedSinkLatency = pa_sink_get_requested_latency_within_thread(u->sink_input->sink);
564     if (requestedBufferLatency < (int64_t)requestedSinkLatency)
565         requestedBufferLatency = requestedSinkLatency;
566 
567     requestedMemblockqLength = pa_usec_to_bytes(requestedBufferLatency, &u->sink_input->sample_spec);
568     currentMemblockqLength = pa_memblockq_get_length(u->memblockq);
569     if (currentMemblockqLength > requestedMemblockqLength) {
570         /* Drop audio from queue */
571         bufferCorrection = currentMemblockqLength - requestedMemblockqLength;
572         AUDIO_INFO_LOG("Dropping %" PRIu64 " usec of audio from queue",
573             pa_bytes_to_usec(bufferCorrection, &u->sink_input->sample_spec));
574         pa_memblockq_drop(u->memblockq, bufferCorrection);
575     } else if (currentMemblockqLength < requestedMemblockqLength && allowPush) {
576         /* Add silence to queue */
577         bufferCorrection = requestedMemblockqLength - currentMemblockqLength;
578         AUDIO_INFO_LOG("Adding %" PRIu64 " usec of silence to queue",
579             pa_bytes_to_usec(bufferCorrection, &u->sink_input->sample_spec));
580         pa_memblockq_seek(u->memblockq, (int64_t)bufferCorrection, PA_SEEK_RELATIVE, true);
581     }
582 }
583 
584 /* Called from input thread context */
SourceOutputPushCb(pa_source_output * o,const pa_memchunk * chunk)585 static void SourceOutputPushCb(pa_source_output *o, const pa_memchunk *chunk)
586 {
587     struct userdata *u;
588     pa_usec_t pushTime;
589     int64_t currentSourceLatency;
590 
591     pa_source_output_assert_ref(o);
592     pa_source_output_assert_io_context(o);
593     pa_assert_se(u = o->userdata);
594 
595     /* Send current source latency and timestamp with the message */
596     pushTime = pa_rtclock_now();
597     currentSourceLatency = pa_source_get_latency_within_thread(u->source_output->source, true);
598 
599     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_POST,
600         PA_INT_TO_PTR(currentSourceLatency), pushTime, chunk, NULL);
601     u->send_counter += (int64_t) chunk->length;
602 }
603 
604 /* Called from input thread context */
SourceOutputProcessRewindCb(pa_source_output * o,size_t nbytes)605 static void SourceOutputProcessRewindCb(pa_source_output *o, size_t nbytes)
606 {
607     struct userdata *u;
608 
609     pa_source_output_assert_ref(o);
610     pa_source_output_assert_io_context(o);
611     pa_assert_se(u = o->userdata);
612 
613     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_REWIND,
614         NULL, (int64_t) nbytes, NULL, NULL);
615     u->send_counter -= (int64_t) nbytes;
616 }
617 
618 /* Called from input thread context */
SourceOutputProcessMsgCb(pa_msgobject * obj,int code,void * data,int64_t offset,pa_memchunk * chunk)619 static int SourceOutputProcessMsgCb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk)
620 {
621     struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
622 
623     if (code == SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT) {
624         size_t length;
625         length = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
626 
627         u->latency_snapshot.send_counter = u->send_counter;
628         /* Add content of delay memblockq to the source latency */
629         u->latency_snapshot.source_latency = pa_source_get_latency_within_thread(u->source_output->source, true) +
630             pa_bytes_to_usec(length, &u->source_output->source->sample_spec);
631         u->latency_snapshot.source_timestamp = pa_rtclock_now();
632 
633         return 0;
634     }
635     return pa_source_output_process_msg(obj, code, data, offset, chunk);
636 }
637 
638 /* Called from main thread.
639  * Get current effective latency of the source. If the source is in use with
640  * smaller latency than the configured latency, it will continue running with
641  * the smaller value when the source output is switched to the source. */
UpdateEffectiveSourceLatency(struct userdata * u,pa_source * source,pa_sink * sink)642 static void UpdateEffectiveSourceLatency(struct userdata *u, pa_source *source, pa_sink *sink)
643 {
644     pa_usec_t effectiveSourceLatency;
645 
646     effectiveSourceLatency = u->configured_source_latency;
647 
648     if (source) {
649         effectiveSourceLatency = pa_source_get_requested_latency(source);
650         if (effectiveSourceLatency == 0 || effectiveSourceLatency > u->configured_source_latency)
651             effectiveSourceLatency = u->configured_source_latency;
652     }
653 
654     /* If the sink is valid, send a message to the output thread, else set the variable directly */
655     if (sink) {
656         pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(u->sink_input),
657             SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY, NULL, (int64_t)effectiveSourceLatency, NULL);
658     } else {
659         u->output_thread_info.effective_source_latency = effectiveSourceLatency;
660     }
661 }
662 
663 /* Called from main thread.
664  * Set source output latency to one third of the overall latency if possible.
665  * The choice of one third is rather arbitrary somewhere between the minimum
666  * possible latency which would cause a lot of CPU load and half the configured
667  * latency which would quickly lead to underruns */
SetSourceOutputLatency(struct userdata * u,pa_source * source)668 static void SetSourceOutputLatency(struct userdata *u, pa_source *source)
669 {
670     pa_usec_t latency, requestedLatency;
671 
672     requestedLatency = u->latency / TRIPLE_FACTOR;
673 
674     /* Normally we try to configure sink and source latency equally. If the
675      * sink latency cannot match the requested source latency try to set the
676      * source latency to a smaller value to avoid underruns */
677     if (u->min_sink_latency > requestedLatency) {
678         latency = PA_MAX(u->latency, u->minimum_latency);
679         requestedLatency = (latency - u->min_sink_latency) / MULTIPLE_FACTOR;
680     }
681 
682     latency = PA_CLAMP(requestedLatency, u->min_source_latency, u->max_source_latency);
683     u->configured_source_latency = pa_source_output_set_requested_latency(u->source_output, latency);
684     if (u->configured_source_latency != requestedLatency)
685         AUDIO_WARNING_LOG("Cannot set requested source latency of %0.2f ms, adjusting to %0.2f ms",
686             (double)requestedLatency / PA_USEC_PER_MSEC, (double)u->configured_source_latency / PA_USEC_PER_MSEC);
687 }
688 
689 /* Called from input thread context */
SourceOutputAttachCb(pa_source_output * o)690 static void SourceOutputAttachCb(pa_source_output *o)
691 {
692     struct userdata *u;
693 
694     pa_source_output_assert_ref(o);
695     pa_source_output_assert_io_context(o);
696     pa_assert_se(u = o->userdata);
697 
698     u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
699             o->source->thread_info.rtpoll,
700             PA_RTPOLL_LATE,
701             u->asyncmsgq);
702 }
703 
704 /* Called from input thread context */
SourceOutputDetachCb(pa_source_output * o)705 static void SourceOutputDetachCb(pa_source_output *o)
706 {
707     struct userdata *u;
708 
709     pa_source_output_assert_ref(o);
710     pa_source_output_assert_io_context(o);
711     pa_assert_se(u = o->userdata);
712 
713     if (u->rtpoll_item_write) {
714         pa_rtpoll_item_free(u->rtpoll_item_write);
715         u->rtpoll_item_write = NULL;
716     }
717 }
718 
719 /* Called from main thread */
SourceOutputKillCb(pa_source_output * o)720 static void SourceOutputKillCb(pa_source_output *o)
721 {
722     struct userdata *u;
723 
724     pa_source_output_assert_ref(o);
725     pa_assert_ctl_context();
726     pa_assert_se(u = o->userdata);
727 
728     TearDown(u);
729     pa_module_unload_request(u->module, true);
730 }
731 
732 /* Called from main thread */
/*
 * Suspend callback of the loopback source output (control context).
 *
 * o               - the source output whose source changed state
 * oldState        - state of the source before the change
 * oldSuspendCause - previous suspend cause (not used here)
 *
 * Does nothing when the source state is unchanged. Otherwise the sink input
 * is corked or uncorked to follow the source, and the adjust timer is
 * refreshed.
 */
static void SourceOutputSuspendCb(pa_source_output *o, pa_source_state_t oldState,
    pa_suspend_cause_t oldSuspendCause)
{
    struct userdata *ud = NULL;
    bool nowSuspended;

    pa_source_output_assert_ref(o);
    pa_assert_ctl_context();
    ud = o->userdata;
    pa_assert_se(ud);

    /* No state transition, so there is nothing to react to */
    if (o->source->state == oldState) {
        return;
    }

    nowSuspended = (o->source->state == PA_SOURCE_SUSPENDED);

    if (!nowSuspended) {
        /* Source resumed: refresh the effective source latency */
        UpdateEffectiveSourceLatency(ud, ud->source_output->source, ud->sink_input->sink);
    } else if (ud->sink_input->sink) {
        /* A suspend is treated like a source change once the source resumes;
         * the flag is reset through a SOURCE_CHANGED message on the sink's queue */
        pa_asyncmsgq_send(ud->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ud->sink_input),
            SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL);
    } else {
        /* No sink attached: reset the flag directly */
        ud->output_thread_info.push_called = false;
    }

    pa_sink_input_cork(ud->sink_input, nowSuspended);

    UpdateAdjustTimer(ud);
}
768 
769 /* Called from input thread context */
/*
 * Latency-range / fixed-latency change callback of the source output
 * (I/O context).
 * Posts LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED to the loopback message
 * object through the current thread's outgoing queue.
 */
static void UpdateSourceLatencyRangeCb(pa_source_output *i)
{
    struct userdata *ud = NULL;

    pa_source_output_assert_ref(i);
    pa_source_output_assert_io_context(i);
    ud = i->userdata;
    pa_assert_se(ud);

    /* The source latency may be different now */
    pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(ud->msg),
        LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED, NULL, 0, NULL, NULL);
}
782 
783 /* Called from output thread context */
/*
 * Pop callback of the loopback sink input (output thread context).
 *
 * i      - the sink input being rendered
 * nbytes - maximum number of bytes to hand out
 * chunk  - receives the data taken from the loopback memblockq
 *
 * Returns 0 when a chunk was produced, -1 when the memblockq has nothing to
 * peek.
 */
static int SinkInputPopCb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk)
{
    struct userdata *ud = NULL;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    ud = i->userdata;
    pa_assert_se(ud);
    pa_assert(chunk);

    /* Outstanding push messages are drained here. The reason is not fully
     * understood, but without it low-latency configurations underrun.
     * in_pop marks that message handlers are running inside this callback. */
    ud->output_thread_info.in_pop = true;
    for (;;) {
        if (pa_asyncmsgq_process_one(ud->asyncmsgq) <= 0) {
            break;
        }
    }
    ud->output_thread_info.in_pop = false;

    /* Until pop is considered "called", SINK_INPUT_MESSAGE_POST keeps adjusting
     * the latency. That is switched off on the second pop, and the final
     * adjustment is requested for the next push (the source latency cannot be
     * obtained here). The second pop is used because the first one may happen
     * before the sink has really started. */
    if (ud->output_thread_info.first_pop_done && !ud->output_thread_info.pop_called) {
        ud->output_thread_info.pop_adjust = true;
        ud->output_thread_info.pop_called = true;
    }
    ud->output_thread_info.first_pop_done = true;

    if (pa_memblockq_peek(ud->memblockq, chunk) < 0) {
        return -1;
    }

    /* Never hand out more than was asked for */
    if (chunk->length > nbytes) {
        chunk->length = nbytes;
    }
    pa_memblockq_drop(ud->memblockq, chunk->length);

    /* Before the first push, keep enough data queued to avoid underruns */
    if (!ud->output_thread_info.push_called) {
        MemblockqAdjust(ud, 0, true);
    }

    return 0;
}
827 
828 /* Called from output thread context */
/*
 * Rewind callback of the loopback sink input (output thread context).
 * Rewinds the loopback memblockq by nbytes.
 */
static void SinkInputProcessRewindCb(pa_sink_input *i, size_t nbytes)
{
    struct userdata *ud = NULL;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    ud = i->userdata;
    pa_assert_se(ud);

    pa_memblockq_rewind(ud->memblockq, nbytes);
}
839 
/*
 * Handles SINK_INPUT_MESSAGE_POST: appends the posted chunk to the loopback
 * memblockq and applies the latency corrections that depend on whether push
 * and pop have already been seen.
 *
 * u      - module instance data
 * data   - source latency at push time, packed into the pointer value
 * offset - timestamp taken when push was called
 * chunk  - audio data to queue
 */
static void ProcessSinkInputMessagePost(struct userdata *u, void *data, int64_t offset, pa_memchunk *chunk)
{
    bool adjustNow;
    size_t rewindBytes = 0;

    pa_memblockq_push_align(u->memblockq, chunk);

    /* While push has not been seen, SinkInputPopCb() adjusts the latency.
     * That stops on the first push, where the memblockq is corrected here.
     * If pop has not been seen yet, wait until the pop callback asks for the
     * adjustment (pop_adjust). */
    adjustNow = u->output_thread_info.pop_called &&
        (!u->output_thread_info.push_called || u->output_thread_info.pop_adjust);
    if (adjustNow) {
        int64_t timeDelta;
        pa_usec_t chunkUsec;
        pa_usec_t sourceLatency;

        /* Source latency at the time push was called */
        timeDelta = PA_PTR_TO_INT(data);
        /* Plus the time that elapsed between push and post */
        timeDelta += pa_rtclock_now() - (pa_usec_t) offset;
        /* Plus the sink latency */
        timeDelta += pa_sink_get_latency_within_thread(u->sink_input->sink, true);

        /* The reported source latency includes the audio of this chunk, which
         * is already in the memblockq, so its duration is subtracted to avoid
         * counting it twice.
         *
         * The alsa source sometimes reports a latency that is far too low,
         * apparently after an overrun. As an overrun heuristic, a chunk longer
         * than the configured source latency is treated as if it had exactly
         * that length, which makes MemblockqAdjust() drop more data. If the
         * following push is oversized too, the resulting latency is wrong. */
        chunkUsec = pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec);
        sourceLatency = u->output_thread_info.effective_source_latency;
        timeDelta -= (int64_t) ((chunkUsec > sourceLatency) ? sourceLatency : chunkUsec);

        /* FIXME: silence may be pushed here to fix up the latency, which
         * might leave a gap in the stream */
        MemblockqAdjust(u, timeDelta, true);

        u->output_thread_info.pop_adjust = false;
        u->output_thread_info.push_called = true;
    }

    /* Before the first pop, only keep the latency from growing too much.
     * No silence is pushed because fresh data was just queued. */
    if (!u->output_thread_info.pop_called) {
        MemblockqAdjust(u, 0, false);
    }

    /* End of an underrun? Then get playback going right away */
    if (u->sink_input->sink->thread_info.state != PA_SINK_SUSPENDED &&
        u->sink_input->thread_info.underrun_for > 0 &&
        pa_memblockq_is_readable(u->memblockq)) {
        pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_UNDERRUN,
            NULL, 0, NULL, NULL);

        /* The rewind is skipped when running inside the pop callback */
        if (!u->output_thread_info.in_pop) {
            AUDIO_DEBUG_LOG("Requesting rewind due to end of underrun.");
            /* An underrun_for of (size_t) -1 results in a rewind request of 0 bytes */
            if (u->sink_input->thread_info.underrun_for != (size_t) -1) {
                rewindBytes = (size_t) u->sink_input->thread_info.underrun_for;
            }
            pa_sink_input_request_rewind(u->sink_input, rewindBytes, false, true, false);
        }
    }

    u->output_thread_info.recv_counter += (int64_t) chunk->length;
}
915 
916 /* Called from output thread context */
/*
 * Message handler of the loopback sink input (output thread context).
 *
 * Module-specific messages are handled here and return 0. The latency query
 * and every unknown code are passed on to pa_sink_input_process_msg(), whose
 * result is returned.
 */
static int SinkInputProcessMsgCb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk)
{
    struct userdata *ud = PA_SINK_INPUT(obj)->userdata;

    pa_sink_input_assert_io_context(ud->sink_input);

    switch (code) {
        case SINK_INPUT_MESSAGE_POST:
            ProcessSinkInputMessagePost(ud, data, offset, chunk);
            return 0;

        case SINK_INPUT_MESSAGE_REWIND:
            /* Seeking back only makes sense once data has been pushed */
            if (ud->output_thread_info.push_called) {
                pa_memblockq_seek(ud->memblockq, -offset, PA_SEEK_RELATIVE, true);
            }
            ud->output_thread_info.recv_counter -= offset;
            return 0;

        case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
            size_t renderLength = pa_memblockq_get_length(ud->sink_input->thread_info.render_memblockq);

            ud->latency_snapshot.recv_counter = ud->output_thread_info.recv_counter;
            ud->latency_snapshot.loopback_memblockq_length = pa_memblockq_get_length(ud->memblockq);
            /* The content of the render memblockq counts towards the sink latency */
            ud->latency_snapshot.sink_latency =
                pa_sink_get_latency_within_thread(ud->sink_input->sink, true) +
                pa_bytes_to_usec(renderLength, &ud->sink_input->sink->sample_spec);
            ud->latency_snapshot.sink_timestamp = pa_rtclock_now();
            return 0;
        }

        case SINK_INPUT_MESSAGE_SOURCE_CHANGED:
            ud->output_thread_info.push_called = false;
            return 0;

        case SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY:
            ud->output_thread_info.effective_source_latency = (pa_usec_t) offset;
            return 0;

        case SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY:
            ud->output_thread_info.minimum_latency = (pa_usec_t) offset;
            return 0;

        case SINK_INPUT_MESSAGE_FAST_ADJUST:
            MemblockqAdjust(ud, offset, true);
            return 0;

        case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
            pa_usec_t *latencyOut = data;

            *latencyOut = pa_bytes_to_usec(pa_memblockq_get_length(ud->memblockq), &ud->sink_input->sample_spec);
            /* Not returning here: the generic handler below runs as well and
             * adds the extra latency introduced by the resampler */
            break;
        }

        default:
            break;
    }

    return pa_sink_input_process_msg(obj, code, data, offset, chunk);
}
980 /* Called from main thread.
981  * Set sink input latency to one third of the overall latency if possible.
982  * The choice of one third is rather arbitrary somewhere between the minimum
983  * possible latency which would cause a lot of CPU load and half the configured
984  * latency which would quickly lead to underruns. */
/*
 * Requests a sink input latency of one third of the overall latency, clamped
 * to the sink latency limits, and records what was actually configured in
 * u->configured_sink_latency.
 *
 * u    - module instance data
 * sink - not used by this function
 */
static void SetSinkInputLatency(struct userdata *u, pa_sink *sink)
{
    pa_usec_t wanted = u->latency / TRIPLE_FACTOR;
    pa_usec_t clamped;

    /* Sink and source latency are normally configured equally. When the
     * source cannot go as low as the wanted sink latency, a smaller sink
     * latency is requested instead to avoid underruns. */
    if (u->min_source_latency > wanted) {
        pa_usec_t overall = PA_MAX(u->latency, u->minimum_latency);

        wanted = (overall - u->min_source_latency) / MULTIPLE_FACTOR;
    }

    clamped = PA_CLAMP(wanted, u->min_sink_latency, u->max_sink_latency);
    u->configured_sink_latency = pa_sink_input_set_requested_latency(u->sink_input, clamped);

    if (u->configured_sink_latency == wanted) {
        return;
    }

    AUDIO_WARNING_LOG("Cannot set requested sink latency of %0.2f ms, adjusting to %0.2f ms",
        (double)wanted / PA_USEC_PER_MSEC, (double)u->configured_sink_latency / PA_USEC_PER_MSEC);
}
1006 
1007 /* Called from output thread context */
/*
 * Attach callback of the loopback sink input (output thread context).
 * Creates the read-side rtpoll item for the shared asyncmsgq on the sink's
 * rtpoll, then sizes the memblockq prebuf (max request times MULTIPLE_FACTOR)
 * and its maximum rewind from the sink input.
 */
static void SinkInputAttachCb(pa_sink_input *i)
{
    struct userdata *ud = NULL;
    size_t prebufBytes;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    ud = i->userdata;
    pa_assert_se(ud);

    ud->rtpoll_item_read =
        pa_rtpoll_item_new_asyncmsgq_read(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, ud->asyncmsgq);

    prebufBytes = pa_sink_input_get_max_request(i) * MULTIPLE_FACTOR;
    pa_memblockq_set_prebuf(ud->memblockq, prebufBytes);
    pa_memblockq_set_maxrewind(ud->memblockq, pa_sink_input_get_max_rewind(i));
}
1024 
1025 /* Called from output thread context */
/*
 * Detach callback of the loopback sink input (output thread context).
 * Releases the read-side rtpoll item, if one exists, and clears the stored
 * pointer.
 */
static void SinkInputDetachCb(pa_sink_input *i)
{
    struct userdata *ud = NULL;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    ud = i->userdata;
    pa_assert_se(ud);

    /* Nothing to release when no item is currently registered */
    if (!ud->rtpoll_item_read) {
        return;
    }

    pa_rtpoll_item_free(ud->rtpoll_item_read);
    ud->rtpoll_item_read = NULL;
}
1039 
1040 /* Called from output thread context */
/*
 * Max-rewind update callback of the loopback sink input (output thread
 * context). Applies the new limit of nbytes to the loopback memblockq.
 */
static void SinkInputUpdateMaxRewindCb(pa_sink_input *i, size_t nbytes)
{
    struct userdata *ud = NULL;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    ud = i->userdata;
    pa_assert_se(ud);

    pa_memblockq_set_maxrewind(ud->memblockq, nbytes);
}
1051 
1052 /* Called from output thread context */
/*
 * Max-request update callback of the loopback sink input (output thread
 * context). Sets the memblockq prebuf to nbytes times MULTIPLE_FACTOR and
 * logs the change.
 */
static void SinkInputUpdateMaxRequestCb(pa_sink_input *i, size_t nbytes)
{
    struct userdata *ud = NULL;
    size_t prebufBytes;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    ud = i->userdata;
    pa_assert_se(ud);

    prebufBytes = nbytes * MULTIPLE_FACTOR;
    pa_memblockq_set_prebuf(ud->memblockq, prebufBytes);
    AUDIO_INFO_LOG("Max request changed");
}
1064 
1065 /* Called from main thread */
/*
 * Kill callback of the loopback sink input (control context).
 * Tears the loopback down and then requests that the owning module be
 * unloaded.
 */
static void SinkInputKillCb(pa_sink_input *i)
{
    struct userdata *ud = NULL;

    pa_sink_input_assert_ref(i);
    pa_assert_ctl_context();
    ud = i->userdata;
    pa_assert_se(ud);

    TearDown(ud);
    pa_module_unload_request(ud->module, true);
}
1077 
1078 /* Called from the output thread context */
/*
 * State-change callback of the loopback sink input.
 * When the sink input becomes unlinked, the shared asyncmsgq is flushed
 * (second argument false); every other state is ignored.
 */
static void SinkInputStateChangeCb(pa_sink_input *i, pa_sink_input_state_t state)
{
    struct userdata *ud = NULL;

    pa_sink_input_assert_ref(i);
    ud = i->userdata;
    pa_assert_se(ud);

    if (state != PA_SINK_INPUT_UNLINKED) {
        return;
    }

    pa_asyncmsgq_flush(ud->asyncmsgq, false);
}
1090 
1091 /* Called from main thread */
/*
 * Suspend callback of the loopback sink input (control context).
 *
 * i               - the sink input whose sink changed state
 * oldState        - state of the sink before the change
 * oldSuspendCause - previous suspend cause (not used here)
 *
 * Does nothing when the sink state is unchanged. Otherwise the source output
 * is corked or uncorked to follow the sink, and the adjust timer is
 * refreshed.
 */
static void SinkInputSuspendCb(pa_sink_input *i, pa_sink_state_t oldState, pa_suspend_cause_t oldSuspendCause)
{
    struct userdata *ud = NULL;
    bool nowSuspended;

    pa_sink_input_assert_ref(i);
    pa_assert_ctl_context();
    ud = i->userdata;
    pa_assert_se(ud);

    /* No state transition, so there is nothing to react to */
    if (i->sink->state == oldState) {
        return;
    }

    nowSuspended = (i->sink->state == PA_SINK_SUSPENDED);

    if (!nowSuspended) {
        /* Sink resumed: refresh the effective source latency */
        UpdateEffectiveSourceLatency(ud, ud->source_output->source, ud->sink_input->sink);
    } else {
        /* A suspend is treated like a sink change once the sink resumes.
         * The sink is suspended, so the flags can be written directly. */
        ud->output_thread_info.pop_called = false;
        ud->output_thread_info.first_pop_done = false;
    }

    pa_source_output_cork(ud->source_output, nowSuspended);

    UpdateAdjustTimer(ud);
}
1123 
1124 /* Called from output thread context */
/*
 * Latency-range / fixed-latency change callback of the sink input (output
 * thread context).
 * Posts LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED to the loopback message
 * object through the current thread's outgoing queue.
 */
static void UpdateSinkLatencyRangeCb(pa_sink_input *i)
{
    struct userdata *ud = NULL;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    ud = i->userdata;
    pa_assert_se(ud);

    /* The sink latency may be different now */
    pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(ud->msg),
        LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED, NULL, 0, NULL, NULL);
}
1137 
1138 /* Called from main context */
/*
 * Message handler of the loopback message object (control context).
 *
 * Reacts to latency-range changes reported by the I/O threads and counts
 * underruns. The userdata, offset and chunk arguments are not used; the
 * module data is taken from the message object itself. Always returns 0.
 */
static int LoopbackProcessMsgCb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk)
{
    struct loopback_msg *lbMsg = NULL;
    struct userdata *ud = NULL;
    pa_usec_t reported;

    pa_assert(o);
    pa_assert_ctl_context();

    lbMsg = LOOPBACK_MSG(o);
    ud = lbMsg->userdata;
    pa_assert_se(ud);

    if (code == LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED) {
        UpdateEffectiveSourceLatency(ud, ud->source_output->source, ud->sink_input->sink);
        reported = pa_source_get_requested_latency(ud->source_output->source);
        if (reported <= ud->configured_source_latency) {
            return 0;
        }
        /* The minimum latency grew beyond the configured value, so the source
         * latency was raised. A later decrease is not handled because the
         * current source implementations never do that. */
        AUDIO_WARNING_LOG("Source minimum latency increased to %0.2f ms",
            (double)reported / PA_USEC_PER_MSEC);
        ud->configured_source_latency = reported;
        UpdateLatencyBoundaries(ud, ud->source_output->source, ud->sink_input->sink);
        /* Counting starts over after a latency change */
        ud->iteration_counter = 0;
        ud->underrun_counter = 0;
    } else if (code == LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED) {
        reported = pa_sink_get_requested_latency(ud->sink_input->sink);
        if (reported <= ud->configured_sink_latency) {
            return 0;
        }
        /* The minimum latency grew beyond the configured value, so the sink
         * latency was raised. A later decrease is not handled because the
         * current sink implementations never do that. */
        AUDIO_WARNING_LOG("Sink minimum latency increased to %0.2f ms",
            (double)reported / PA_USEC_PER_MSEC);
        ud->configured_sink_latency = reported;
        UpdateLatencyBoundaries(ud, ud->source_output->source, ud->sink_input->sink);
        /* Counting starts over after a latency change */
        ud->iteration_counter = 0;
        ud->underrun_counter = 0;
    } else if (code == LOOPBACK_MESSAGE_UNDERRUN) {
        ud->underrun_counter++;
        AUDIO_DEBUG_LOG("Underrun detected, counter incremented to %u", ud->underrun_counter);
    }

    /* Unknown codes are silently accepted */
    return 0;
}
1197 
SinkPortLatencyOffsetChangedCb(pa_core * core,pa_sink * sink,struct userdata * u)1198 static pa_hook_result_t SinkPortLatencyOffsetChangedCb(pa_core *core, pa_sink *sink, struct userdata *u)
1199 {
1200     if (sink != u->sink_input->sink) {
1201         return PA_HOOK_OK;
1202     }
1203 
1204     u->sink_latency_offset = sink->port_latency_offset;
1205     UpdateMinimumLatency(u, sink, true);
1206 
1207     return PA_HOOK_OK;
1208 }
1209 
SourcePortLatencyOffsetChangedCb(pa_core * core,pa_source * source,struct userdata * u)1210 static pa_hook_result_t SourcePortLatencyOffsetChangedCb(pa_core *core, pa_source *source, struct userdata *u)
1211 {
1212     if (source != u->source_output->source) {
1213         return PA_HOOK_OK;
1214     }
1215 
1216     u->source_latency_offset = source->port_latency_offset;
1217     UpdateMinimumLatency(u, u->sink_input->sink, true);
1218 
1219     return PA_HOOK_OK;
1220 }
1221 
/*
 * Common failure path of module initialisation.
 * Logs the failure, frees the parsed module arguments when present, runs
 * pa__done() on the module and returns -1 so callers can return the result
 * directly.
 */
int InitFailed(pa_module *m, pa_modargs *ma)
{
    AUDIO_ERR_LOG("Init Loopback failed.");

    if (ma != NULL) {
        pa_modargs_free(ma);
    }
    pa__done(m);

    return -1;
}
1233 
/*
 * Chooses the initial sample spec and channel map of the loopback streams.
 *
 * The defaults come from the source when one is given, otherwise from the
 * sink, otherwise from a dummy mono U8 format. Module arguments may then
 * override them.
 *
 * ma     - parsed module arguments
 * u      - module instance data; u->ss and u->map are written
 * sink   - sink to loop to, may be NULL
 * source - source to loop from, may be NULL
 *
 * Returns 0 on success, -1 when the arguments are invalid or the resulting
 * rate is outside [MIN_SAMPLE_RATE, PA_RATE_MAX].
 */
static int InitSampleSpecAndChannelMap(pa_modargs *ma, struct userdata *u, pa_sink *sink, pa_source *source)
{
    if (source) {
        u->ss = source->sample_spec;
        u->map = source->channel_map;
    } else if (sink) {
        u->ss = sink->sample_spec;
        u->map = sink->channel_map;
    } else {
        /* FIXME: Dummy stream format, needed because pa_sink_input_new()
         * requires valid sample spec and channel map even when all the FIX_*
         * stream flags are specified. pa_sink_input_new() should be changed
         * to ignore the sample spec and channel map when the FIX_* flags are
         * present. */
        u->ss.format = PA_SAMPLE_U8;
        u->ss.rate = DEFAULT_SAMPLE_RATE;
        u->ss.channels = 1;
        u->map.channels = 1;
        u->map.map[0] = PA_CHANNEL_POSITION_MONO;
    }

    if (pa_modargs_get_sample_spec_and_channel_map(ma, &(u->ss), &(u->map), PA_CHANNEL_MAP_DEFAULT) < 0) {
        AUDIO_ERR_LOG("Invalid sample format specification or channel map");
        return -1;
    }

    if (u->ss.rate < MIN_SAMPLE_RATE || u->ss.rate > PA_RATE_MAX) {
        /* Both bounds are printed from the macros that are actually checked,
         * so the message cannot drift from the test above. PA_RATE_MAX is an
         * unsigned constant, hence %u. */
        AUDIO_ERR_LOG("Invalid rate specification, valid range is %d Hz to %u Hz",
            MIN_SAMPLE_RATE, PA_RATE_MAX);
        return -1;
    }

    return 0;
}
1267 
InitUserData(struct userdata * u)1268 static void InitUserData(struct userdata *u)
1269 {
1270     uint32_t latencyMsec;
1271     uint32_t maxLatencyMsec;
1272     uint32_t fastAdjustThreshold;
1273     uint32_t adjustTimeSec;
1274     latencyMsec = DEFAULT_LATENCY_MSEC;
1275     fastAdjustThreshold = 0;
1276     maxLatencyMsec = 0;
1277 
1278     if (maxLatencyMsec < latencyMsec) {
1279         AUDIO_WARNING_LOG("Configured maximum latency is smaller than latency, using latency instead");
1280         maxLatencyMsec = latencyMsec;
1281     }
1282 
1283     u->latency = (pa_usec_t) latencyMsec * PA_USEC_PER_MSEC;
1284     u->max_latency = (pa_usec_t) maxLatencyMsec * PA_USEC_PER_MSEC;
1285     u->output_thread_info.pop_called = false;
1286     u->output_thread_info.pop_adjust = false;
1287     u->output_thread_info.push_called = false;
1288     u->iteration_counter = 0;
1289     u->underrun_counter = 0;
1290     u->underrun_latency_limit = 0;
1291     u->source_sink_changed = true;
1292     u->real_adjust_time_sum = 0;
1293     u->adjust_counter = 0;
1294     u->fast_adjust_threshold = fastAdjustThreshold * PA_USEC_PER_MSEC;
1295 
1296     adjustTimeSec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1297     if (adjustTimeSec != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC) {
1298         u->adjust_time = adjustTimeSec * PA_USEC_PER_SEC;
1299     } else {
1300         u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1301     }
1302 
1303     u->real_adjust_time = u->adjust_time;
1304 }
1305 
/*
 * Creates the loopback sink input and stores it in u->sink_input.
 *
 * m    - owning module
 * ma   - parsed module arguments (source of "sink_input_properties")
 * u    - module instance data; u->ss and u->map give the stream format
 * sink - sink to connect to, may be NULL
 *
 * The stream is created corked, with variable rate and moving disabled.
 * Returns 0 on success, -1 when the properties cannot be parsed or the sink
 * input cannot be created.
 */
static int CreateSinkInput(pa_module *m, pa_modargs *ma, struct userdata *u, pa_sink *sink)
{
    pa_sink_input_new_data newData;
    int propsResult;

    pa_sink_input_new_data_init(&newData);
    newData.driver = __FILE__;
    newData.module = m;

    if (sink != NULL) {
        pa_sink_input_new_data_set_sink(&newData, sink, false, true);
    }

    propsResult = pa_modargs_get_proplist(ma, "sink_input_properties", newData.proplist, PA_UPDATE_REPLACE);
    if (propsResult < 0) {
        AUDIO_ERR_LOG("Failed to parse the sink_input_properties value.");
        pa_sink_input_new_data_done(&newData);
        return -1;
    }

    /* Supply a media role only when the caller did not provide one */
    if (!pa_proplist_contains(newData.proplist, PA_PROP_MEDIA_ROLE)) {
        pa_proplist_sets(newData.proplist, PA_PROP_MEDIA_ROLE, "abstract");
    }

    pa_sink_input_new_data_set_sample_spec(&newData, &(u->ss));
    pa_sink_input_new_data_set_channel_map(&newData, &(u->map));
    newData.flags = PA_SINK_INPUT_VARIABLE_RATE | PA_SINK_INPUT_START_CORKED | PA_SINK_INPUT_DONT_MOVE;

    pa_sink_input_new(&u->sink_input, m->core, &newData);
    pa_sink_input_new_data_done(&newData);

    if (u->sink_input == NULL) {
        AUDIO_ERR_LOG("Create sink input failed");
        return -1;
    }

    return 0;
}
1341 
/*
 * Finishes the set-up of the already created sink input.
 *
 * Installs the sink input callbacks, configures the sink and source
 * latencies, creates the loopback memblockq (pre-filled with silence for
 * u->latency) and the asyncmsgq, and fills in missing media name / icon
 * properties on both streams.
 *
 * u      - module instance data; sink_input and source_output must exist
 * source - source given on the command line, may be NULL; only consulted for
 *          the sink input icon name
 *
 * Returns 0 on success, -1 when the asyncmsgq cannot be created.
 */
static int ConfigSinkInput(struct userdata *u, pa_source *source)
{
    pa_sink_input *in = u->sink_input;
    const char *iconName = NULL;
    pa_memchunk silenceChunk;
    size_t fillBytes;

    /* Callbacks of the sink input */
    in->parent.process_msg = SinkInputProcessMsgCb;
    in->pop = SinkInputPopCb;
    in->process_rewind = SinkInputProcessRewindCb;
    in->kill = SinkInputKillCb;
    in->state_change = SinkInputStateChangeCb;
    in->attach = SinkInputAttachCb;
    in->detach = SinkInputDetachCb;
    in->update_max_rewind = SinkInputUpdateMaxRewindCb;
    in->update_max_request = SinkInputUpdateMaxRequestCb;
    in->suspend = SinkInputSuspendCb;
    in->update_sink_latency_range = UpdateSinkLatencyRangeCb;
    in->update_sink_fixed_latency = UpdateSinkLatencyRangeCb;
    in->userdata = u;

    UpdateLatencyBoundaries(u, u->source_output->source, u->sink_input->sink);
    SetSinkInputLatency(u, u->sink_input->sink);
    SetSourceOutputLatency(u, u->source_output->source);

    pa_sink_input_get_silence(u->sink_input, &silenceChunk);
    u->memblockq = pa_memblockq_new(
        "module-loopback memblockq",
        0,                      /* idx */
        MEMBLOCKQ_MAXLENGTH,    /* maxlength */
        MEMBLOCKQ_MAXLENGTH,    /* tlength */
        &(u->ss),               /* sample_spec */
        0,                      /* prebuf */
        0,                      /* minreq */
        0,                      /* maxrewind */
        &silenceChunk);         /* silence frame */
    pa_memblock_unref(silenceChunk.memblock);

    /* Start with u->latency worth of silence in the queue */
    fillBytes = pa_usec_to_bytes(u->latency, &u->sink_input->sample_spec);
    pa_memblockq_seek(u->memblockq, fillBytes, PA_SEEK_RELATIVE, true);

    u->asyncmsgq = pa_asyncmsgq_new(0);
    if (u->asyncmsgq == NULL) {
        AUDIO_ERR_LOG("pa_asyncmsgq_new() failed.");
        return -1;
    }

    /* Source output: media name and icon default to those of the sink */
    if (!pa_proplist_contains(u->source_output->proplist, PA_PROP_MEDIA_NAME)) {
        pa_proplist_setf(u->source_output->proplist, PA_PROP_MEDIA_NAME, "Loopback to %s",
            pa_strnull(pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION)));
    }

    if (!pa_proplist_contains(u->source_output->proplist, PA_PROP_MEDIA_ICON_NAME)) {
        iconName = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_ICON_NAME);
        if (iconName) {
            pa_proplist_sets(u->source_output->proplist, PA_PROP_MEDIA_ICON_NAME, iconName);
        }
    }

    /* Sink input: media name and icon default to those of the source */
    if (!pa_proplist_contains(u->sink_input->proplist, PA_PROP_MEDIA_NAME)) {
        pa_proplist_setf(u->sink_input->proplist, PA_PROP_MEDIA_NAME, "Loopback from %s",
            pa_strnull(pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION)));
    }

    if (source && !pa_proplist_contains(u->sink_input->proplist, PA_PROP_MEDIA_ICON_NAME)) {
        iconName = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_ICON_NAME);
        if (iconName) {
            pa_proplist_sets(u->sink_input->proplist, PA_PROP_MEDIA_ICON_NAME, iconName);
        }
    }

    return 0;
}
1408 
/*
 * Creates the loopback source output and stores it in u->source_output.
 *
 * m      - owning module
 * ma     - parsed module arguments (source of "source_output_properties")
 * u      - module instance data; u->ss and u->map give the stream format
 * source - source to connect to, may be NULL
 *
 * The stream is created corked with moving disabled.
 * Returns 0 on success, -1 when the properties cannot be parsed or the
 * source output cannot be created.
 */
static int CreateSourceOutput(pa_module *m, pa_modargs *ma, struct userdata *u, pa_source *source)
{
    pa_source_output_new_data sourceOutputData;

    pa_source_output_new_data_init(&sourceOutputData);
    sourceOutputData.driver = __FILE__;
    sourceOutputData.module = m;
    if (source) {
        pa_source_output_new_data_set_source(&sourceOutputData, source, false, true);
    }

    if (pa_modargs_get_proplist(ma, "source_output_properties", sourceOutputData.proplist, PA_UPDATE_REPLACE) < 0) {
        AUDIO_ERR_LOG("Failed to parse the source_output_properties value.");
        pa_source_output_new_data_done(&sourceOutputData);
        return -1;
    }

    /* Supply a media role only when the caller did not provide one */
    if (!pa_proplist_contains(sourceOutputData.proplist, PA_PROP_MEDIA_ROLE)) {
        pa_proplist_sets(sourceOutputData.proplist, PA_PROP_MEDIA_ROLE, "abstract");
    }

    /* The setters copy the values, so the module's spec and map can be
     * passed directly, as CreateSinkInput() does */
    pa_source_output_new_data_set_sample_spec(&sourceOutputData, &(u->ss));
    pa_source_output_new_data_set_channel_map(&sourceOutputData, &(u->map));
    sourceOutputData.flags = PA_SOURCE_OUTPUT_START_CORKED | PA_SOURCE_OUTPUT_DONT_MOVE;

    /* Every argument needs a well-formed %{public} privacy specifier */
    AUDIO_INFO_LOG("sourceOutputData.driver:%{public}s, module:%{public}s, source_output addr:%{public}p",
        sourceOutputData.driver, sourceOutputData.module->name, &u->source_output);

    pa_source_output_new(&u->source_output, m->core, &sourceOutputData);
    AUDIO_INFO_LOG("pa_source_output_new DONE");
    pa_source_output_new_data_done(&sourceOutputData);

    if (!u->source_output) {
        AUDIO_ERR_LOG("Create source output failed");
        return -1;
    }
    return 0;
}
1448 
ConfigSourceOutput(struct userdata * u)1449 static void ConfigSourceOutput(struct userdata *u)
1450 {
1451     u->source_output->parent.process_msg = SourceOutputProcessMsgCb;
1452     u->source_output->push = SourceOutputPushCb;
1453     u->source_output->process_rewind = SourceOutputProcessRewindCb;
1454     u->source_output->kill = SourceOutputKillCb;
1455     u->source_output->attach = SourceOutputAttachCb;
1456     u->source_output->detach = SourceOutputDetachCb;
1457     u->source_output->suspend = SourceOutputSuspendCb;
1458     u->source_output->update_source_latency_range = UpdateSourceLatencyRangeCb;
1459     u->source_output->update_source_fixed_latency = UpdateSourceLatencyRangeCb;
1460     u->source_output->userdata = u;
1461 
1462     /* If format, rate or channels were originally unset, they are set now
1463      * after the pa_source_output_new() call. */
1464     u->ss = u->source_output->sample_spec;
1465     u->map = u->source_output->channel_map;
1466 }
1467 
/*
 * Last stage of the loopback set-up: connects the latency-offset hooks,
 * creates the main-thread message object, puts both streams and uncorks each
 * one whose peer device is not suspended.
 *
 * m - owning module
 * u - module instance data; sink_input and source_output must exist
 */
static void ConfigLoopback(pa_module *m, struct userdata *u)
{
    pa_core *core = m->core;

    /* Follow changes of the port latency offsets */
    pa_module_hook_connect(m, &core->hooks[PA_CORE_HOOK_SINK_PORT_LATENCY_OFFSET_CHANGED],
        PA_HOOK_NORMAL, (pa_hook_cb_t) SinkPortLatencyOffsetChangedCb, u);
    pa_module_hook_connect(m, &core->hooks[PA_CORE_HOOK_SOURCE_PORT_LATENCY_OFFSET_CHANGED],
        PA_HOOK_NORMAL, (pa_hook_cb_t) SourcePortLatencyOffsetChangedCb, u);

    /* Message object handled in the main thread */
    u->msg = pa_msgobject_new(loopback_msg);
    u->msg->parent.process_msg = LoopbackProcessMsgCb;
    u->msg->userdata = u;

    /* The output thread is not running yet, so effective_source_latency is
     * set directly (no sink is passed) */
    UpdateEffectiveSourceLatency(u, u->source_output->source, NULL);

    pa_sink_input_put(u->sink_input);
    pa_source_output_put(u->source_output);

    /* The sink input follows the source state ... */
    if (u->source_output->source->state != PA_SOURCE_SUSPENDED) {
        pa_sink_input_cork(u->sink_input, false);
    }

    /* ... and the source output follows the sink state */
    if (u->sink_input->sink->state != PA_SINK_SUSPENDED) {
        pa_source_output_cork(u->source_output, false);
    }
}
1495 
/*
 * Module entry point.
 *
 * Parses the module arguments, resolves the optional "source" and "sink"
 * names through the name registry, allocates the module userdata, and then
 * creates and configures the source output and the sink input before
 * starting the loopback via ConfigLoopback().
 *
 * Returns 0 on success. On every failure path the result of
 * InitFailed(m, ma) is returned instead.
 */
int pa__init(pa_module *m)
{
    pa_assert(m);

    pa_modargs *ma = pa_modargs_new(m->argument, VALID_MODARGS);
    if (ma == NULL) {
        AUDIO_ERR_LOG("Failed to parse module arguments");
        return InitFailed(m, ma);
    }

    /* "source" is optional: only a name that was given but cannot be
     * resolved is an error. */
    pa_source *source = NULL;
    const char *sourceName = pa_modargs_get_value(ma, "source", NULL);
    if (sourceName != NULL) {
        source = pa_namereg_get(m->core, sourceName, PA_NAMEREG_SOURCE);
        if (source == NULL) {
            AUDIO_ERR_LOG("No such source.");
            return InitFailed(m, ma);
        }
    }

    /* Same rule for the optional "sink". */
    pa_sink *sink = NULL;
    const char *sinkName = pa_modargs_get_value(ma, "sink", NULL);
    if (sinkName != NULL) {
        sink = pa_namereg_get(m->core, sinkName, PA_NAMEREG_SINK);
        if (sink == NULL) {
            AUDIO_ERR_LOG("No such sink.");
            return InitFailed(m, ma);
        }
    }

    /* Zero-initialised userdata, published on the module before any step
     * that may fail. */
    struct userdata *u = pa_xnew0(struct userdata, 1);
    m->userdata = u;
    u->module = m;
    u->core = m->core;

    if (InitSampleSpecAndChannelMap(ma, u, sink, source) != 0) {
        return InitFailed(m, ma);
    }
    InitUserData(u);

    /* Capture side. */
    if (CreateSourceOutput(m, ma, u, source) != 0) {
        return InitFailed(m, ma);
    }
    ConfigSourceOutput(u);

    /* Playback side; configuration is attempted only after creation
     * succeeded. */
    if (CreateSinkInput(m, ma, u, sink) != 0) {
        return InitFailed(m, ma);
    }
    if (ConfigSinkInput(u, source) != 0) {
        return InitFailed(m, ma);
    }

    ConfigLoopback(m, u);

    pa_modargs_free(ma);
    return 0;
}
1546 
/*
 * Module exit point: releases what the module's userdata owns.
 *
 * Does nothing when m->userdata is not set. Otherwise runs TearDown(),
 * frees the memblockq and drops the asyncmsgq reference when they exist,
 * frees the userdata and clears m->userdata.
 */
void pa__done(pa_module *m)
{
    struct userdata *u;

    pa_assert(m);

    u = m->userdata;
    if (u == NULL) {
        return;
    }

    TearDown(u);

    if (u->memblockq) {
        pa_memblockq_free(u->memblockq);
    }
    if (u->asyncmsgq) {
        pa_asyncmsgq_unref(u->asyncmsgq);
    }

    /* NOTE(review): u->msg is allocated in ConfigLoopback() and is not
     * released in this function; confirm that TearDown() drops it. */

    pa_xfree(u);

    /* Do not leave a dangling pointer behind: the guard at the top of this
     * function can only protect a repeated call if userdata reads as NULL
     * once it has been freed. */
    m->userdata = NULL;
}