• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6 
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11 
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16 
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20 
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24 
25 #include "restart-module.h"
26 
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33 
34 #ifdef HAVE_X11
35 #include <xcb/xcb.h>
36 #endif
37 
38 #include <pulse/rtclock.h>
39 #include <pulse/timeval.h>
40 #include <pulse/util.h>
41 #include <pulse/version.h>
42 #include <pulse/xmalloc.h>
43 
44 #include <pulsecore/module.h>
45 #include <pulsecore/core-util.h>
46 #include <pulsecore/modargs.h>
47 #include <pulsecore/log.h>
48 #include <pulsecore/core-subscribe.h>
49 #include <pulsecore/pdispatch.h>
50 #include <pulsecore/pstream.h>
51 #include <pulsecore/pstream-util.h>
52 #include <pulsecore/socket-client.h>
53 
54 #ifdef USE_SMOOTHER_2
55 #include <pulsecore/time-smoother_2.h>
56 #else
57 #include <pulsecore/time-smoother.h>
58 #endif
59 
60 #include <pulsecore/thread.h>
61 #include <pulsecore/thread-mq.h>
62 #include <pulsecore/core-rtclock.h>
63 #include <pulsecore/core-error.h>
64 #include <pulsecore/proplist-util.h>
65 #include <pulsecore/auth-cookie.h>
66 #include <pulsecore/mcalign.h>
67 #include <pulsecore/strlist.h>
68 
69 #ifdef HAVE_X11
70 #include <pulsecore/x11prop.h>
71 #endif
72 
73 #define ENV_DEFAULT_SINK "PULSE_SINK"
74 #define ENV_DEFAULT_SOURCE "PULSE_SOURCE"
75 #define ENV_DEFAULT_SERVER "PULSE_SERVER"
76 #define ENV_COOKIE_FILE "PULSE_COOKIE"
77 
78 #ifdef TUNNEL_SINK
79 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
80 PA_MODULE_USAGE(
81         "sink_name=<name for the local sink> "
82         "sink_properties=<properties for the local sink> "
83         "auto=<determine server/sink/cookie automatically> "
84         "server=<address> "
85         "sink=<remote sink name> "
86         "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
87         "cookie=<filename> "
88         "format=<sample format> "
89         "channels=<number of channels> "
90         "rate=<sample rate> "
91         "latency_msec=<fixed latency in ms> "
92         "channel_map=<channel map>");
93 #else
94 PA_MODULE_DESCRIPTION("Tunnel module for sources");
95 PA_MODULE_USAGE(
96         "source_name=<name for the local source> "
97         "source_properties=<properties for the local source> "
98         "auto=<determine server/source/cookie automatically> "
99         "server=<address> "
100         "source=<remote source name> "
101         "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
102         "cookie=<filename> "
103         "format=<sample format> "
104         "channels=<number of channels> "
105         "rate=<sample rate> "
106         "latency_msec=<fixed latency in ms> "
107         "channel_map=<channel map>");
108 #endif
109 
110 PA_MODULE_AUTHOR("Lennart Poettering");
111 PA_MODULE_VERSION(PACKAGE_VERSION);
112 PA_MODULE_LOAD_ONCE(false);
113 
114 static const char* const valid_modargs[] = {
115     "auto",
116     "server",
117     "cookie",
118     "format",
119     "channels",
120     "rate",
121     "latency_msec",
122     "reconnect_interval_ms",
123 #ifdef TUNNEL_SINK
124     "sink_name",
125     "sink_properties",
126     "sink",
127 #else
128     "source_name",
129     "source_properties",
130     "source",
131 #endif
132     "channel_map",
133     NULL,
134 };
135 
136 #define DEFAULT_TIMEOUT 5
137 
138 #define LATENCY_INTERVAL (1*PA_USEC_PER_SEC)
139 
140 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
141 
142 #ifdef TUNNEL_SINK
143 
144 enum {
145     SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
146     SINK_MESSAGE_REMOTE_SUSPEND,
147     SINK_MESSAGE_UPDATE_LATENCY,
148     SINK_MESSAGE_GET_LATENCY_SNAPSHOT,
149     SINK_MESSAGE_POST,
150 };
151 
152 #define DEFAULT_LATENCY_MSEC 100
153 
154 #else
155 
156 enum {
157     SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
158     SOURCE_MESSAGE_REMOTE_SUSPEND,
159     SOURCE_MESSAGE_UPDATE_LATENCY,
160     SOURCE_MESSAGE_GET_LATENCY_SNAPSHOT,
161 };
162 
163 #define DEFAULT_LATENCY_MSEC 25
164 
165 #endif
166 
167 struct tunnel_msg {
168     pa_msgobject parent;
169 };
170 
171 typedef struct tunnel_msg tunnel_msg;
172 PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);
173 
174 enum {
175     TUNNEL_MESSAGE_MAYBE_RESTART,
176 };
177 
178 static int do_init(pa_module *m);
179 static void do_done(pa_module *m);
180 
181 #ifdef TUNNEL_SINK
182 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
183 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
184 #endif
185 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
186 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
187 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
188 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
189 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
190 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
191 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
192 
193 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
194 #ifdef TUNNEL_SINK
195     [PA_COMMAND_REQUEST] = command_request,
196     [PA_COMMAND_STARTED] = command_started,
197 #endif
198     [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
199     [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
200     [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
201     [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
202     [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
203     [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
204     [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
205     [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
206     [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
207     [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
208     [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
209     [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
210     [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
211     [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
212     [PA_COMMAND_UNDERFLOW_OHOS] = command_overflow_or_underflow,
213 };
214 
215 struct userdata {
216     pa_core *core;
217     pa_module *module;
218 
219     pa_thread_mq thread_mq;
220     pa_rtpoll *rtpoll;
221     pa_thread *thread;
222 
223     pa_socket_client *client;
224     pa_pstream *pstream;
225     pa_pdispatch *pdispatch;
226 
227     char *server_name;
228 #ifdef TUNNEL_SINK
229     char *sink_name;
230     char *configured_sink_name;
231     pa_sink *sink;
232     size_t requested_bytes;
233 #else
234     char *source_name;
235     char *configured_source_name;
236     pa_source *source;
237     pa_mcalign *mcalign;
238 #endif
239 
240     pa_auth_cookie *auth_cookie;
241 
242     uint32_t version;
243     uint32_t ctag;
244     uint32_t device_index;
245     uint32_t channel;
246     uint32_t latency;
247 
248     int64_t counter;
249     uint64_t receive_counter;
250     uint64_t receive_snapshot;
251 
252     bool remote_corked:1;
253     bool remote_suspended:1;
254     bool shutting_down:1;
255 
256     pa_usec_t transport_usec; /* maintained in the main thread */
257     pa_usec_t thread_transport_usec; /* maintained in the IO thread */
258 
259     uint32_t ignore_latency_before;
260 
261     pa_time_event *time_event;
262 
263 #ifdef USE_SMOOTHER_2
264     pa_smoother_2 *smoother;
265 #else
266     pa_smoother *smoother;
267 #endif
268 
269     char *device_description;
270     char *server_fqdn;
271     char *user_name;
272 
273     uint32_t maxlength;
274 #ifdef TUNNEL_SINK
275     uint32_t tlength;
276     uint32_t minreq;
277     uint32_t prebuf;
278 
279     pa_proplist *sink_proplist;
280 #else
281     uint32_t fragsize;
282 
283     pa_proplist *source_proplist;
284 #endif
285 
286     pa_sample_spec sample_spec;
287     pa_channel_map channel_map;
288 
289     tunnel_msg *msg;
290 
291     pa_iochannel *io;
292 
293     pa_usec_t reconnect_interval_us;
294     pa_usec_t snapshot_time;
295 };
296 
297 struct module_restart_data {
298     struct userdata *userdata;
299     pa_restart_data *restart_data;
300 };
301 
302 static void request_latency(struct userdata *u);
303 #ifdef TUNNEL_SINK
304 static void create_sink(struct userdata *u);
305 static void on_sink_created(struct userdata *u);
306 #else
307 static void create_source(struct userdata *u);
308 static void on_source_created(struct userdata *u);
309 #endif
310 
311 /* Do a reinit of the module.  Note that u will be freed as a result of this
312  * call. */
unload_module(struct module_restart_data * rd)313 static void unload_module(struct module_restart_data *rd) {
314     struct userdata *u = rd->userdata;
315 
316     if (rd->restart_data) {
317         pa_log_debug("Restart already pending");
318         return;
319     }
320 
321     if (u->reconnect_interval_us > 0) {
322         /* The handle returned here must be freed when do_init() was successful and when the
323          * module exits. */
324         rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us);
325     } else
326         pa_module_unload_request(u->module, true);
327 }
328 
329 /* Called from main context */
command_stream_or_client_event(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)330 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
331     pa_log_debug("Got stream or client event.");
332 }
333 
334 /* Called from main context */
command_stream_killed(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)335 static void command_stream_killed(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
336     struct userdata *u = userdata;
337 
338     pa_assert(pd);
339     pa_assert(t);
340     pa_assert(u);
341     pa_assert(u->pdispatch == pd);
342 
343     pa_log_warn("Stream killed");
344     unload_module(u->module->userdata);
345 }
346 
347 /* Called from main context */
command_overflow_or_underflow(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)348 static void command_overflow_or_underflow(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
349     struct userdata *u = userdata;
350 
351     pa_assert(pd);
352     pa_assert(t);
353     pa_assert(u);
354     pa_assert(u->pdispatch == pd);
355 
356     pa_log_info("Server signalled buffer overrun/underrun.");
357     request_latency(u);
358 }
359 
360 /* Called from main context */
command_suspended(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)361 static void command_suspended(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
362     struct userdata *u = userdata;
363     uint32_t channel;
364     bool suspended;
365 
366     pa_assert(pd);
367     pa_assert(t);
368     pa_assert(u);
369     pa_assert(u->pdispatch == pd);
370 
371     if (pa_tagstruct_getu32(t, &channel) < 0 ||
372         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
373         !pa_tagstruct_eof(t)) {
374 
375         pa_log("Invalid packet.");
376         unload_module(u->module->userdata);
377         return;
378     }
379 
380     pa_log_debug("Server reports device suspend.");
381 
382 #ifdef TUNNEL_SINK
383     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
384 #else
385     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
386 #endif
387 
388     request_latency(u);
389 }
390 
391 /* Called from main context */
command_moved(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)392 static void command_moved(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
393     struct userdata *u = userdata;
394     uint32_t channel, di;
395     const char *dn;
396     bool suspended;
397 
398     pa_assert(pd);
399     pa_assert(t);
400     pa_assert(u);
401     pa_assert(u->pdispatch == pd);
402 
403     if (pa_tagstruct_getu32(t, &channel) < 0 ||
404         pa_tagstruct_getu32(t, &di) < 0 ||
405         pa_tagstruct_gets(t, &dn) < 0 ||
406         pa_tagstruct_get_boolean(t, &suspended) < 0) {
407 
408         pa_log_error("Invalid packet.");
409         unload_module(u->module->userdata);
410         return;
411     }
412 
413     pa_log_debug("Server reports a stream move.");
414 
415 #ifdef TUNNEL_SINK
416     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
417 #else
418     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
419 #endif
420 
421     request_latency(u);
422 }
423 
command_stream_buffer_attr_changed(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)424 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
425     struct userdata *u = userdata;
426     uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
427     pa_usec_t usec;
428 
429     pa_assert(pd);
430     pa_assert(t);
431     pa_assert(u);
432     pa_assert(u->pdispatch == pd);
433 
434     if (pa_tagstruct_getu32(t, &channel) < 0 ||
435         pa_tagstruct_getu32(t, &maxlength) < 0) {
436 
437         pa_log_error("Invalid packet.");
438         unload_module(u->module->userdata);
439         return;
440     }
441 
442     if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
443         if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
444             pa_tagstruct_get_usec(t, &usec) < 0) {
445 
446             pa_log_error("Invalid packet.");
447             unload_module(u->module->userdata);
448             return;
449         }
450     } else {
451         if (pa_tagstruct_getu32(t, &tlength) < 0 ||
452             pa_tagstruct_getu32(t, &prebuf) < 0 ||
453             pa_tagstruct_getu32(t, &minreq) < 0 ||
454             pa_tagstruct_get_usec(t, &usec) < 0) {
455 
456             pa_log_error("Invalid packet.");
457             unload_module(u->module->userdata);
458             return;
459         }
460     }
461 
462 #ifdef TUNNEL_SINK
463     pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
464 #endif
465 
466     request_latency(u);
467 }
468 
469 #ifdef TUNNEL_SINK
470 
471 /* Called from main context */
command_started(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)472 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
473     struct userdata *u = userdata;
474 
475     pa_assert(pd);
476     pa_assert(t);
477     pa_assert(u);
478     pa_assert(u->pdispatch == pd);
479 
480     pa_log_debug("Server reports playback started.");
481     request_latency(u);
482 }
483 
484 #endif
485 
486 /* Called from IO thread context */
check_smoother_status(struct userdata * u,bool past)487 static void check_smoother_status(struct userdata *u, bool past) {
488     pa_usec_t x;
489 
490     pa_assert(u);
491 
492     x = pa_rtclock_now();
493 
494     /* Correct by the time the requested issued needs to travel to the
495      * other side.  This is a valid thread-safe access, because the
496      * main thread is waiting for us */
497 
498     if (past)
499         x -= u->thread_transport_usec;
500     else
501         x += u->thread_transport_usec;
502 
503     if (u->remote_suspended || u->remote_corked)
504 #ifdef USE_SMOOTHER_2
505         pa_smoother_2_pause(u->smoother, x);
506     else
507         pa_smoother_2_resume(u->smoother, x);
508 #else
509         pa_smoother_pause(u->smoother, x);
510     else
511         pa_smoother_resume(u->smoother, x, true);
512 #endif
513 }
514 
515 /* Called from IO thread context */
stream_cork_within_thread(struct userdata * u,bool cork)516 static void stream_cork_within_thread(struct userdata *u, bool cork) {
517     pa_assert(u);
518 
519     if (u->remote_corked == cork)
520         return;
521 
522     u->remote_corked = cork;
523     check_smoother_status(u, false);
524 }
525 
526 /* Called from main context */
stream_cork(struct userdata * u,bool cork)527 static void stream_cork(struct userdata *u, bool cork) {
528     pa_tagstruct *t;
529     pa_assert(u);
530 
531     if (!u->pstream)
532         return;
533 
534     t = pa_tagstruct_new();
535 #ifdef TUNNEL_SINK
536     pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
537 #else
538     pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
539 #endif
540     pa_tagstruct_putu32(t, u->ctag++);
541     pa_tagstruct_putu32(t, u->channel);
542     pa_tagstruct_put_boolean(t, cork);
543     pa_pstream_send_tagstruct(u->pstream, t);
544 
545     request_latency(u);
546 }
547 
548 /* Called from IO thread context */
stream_suspend_within_thread(struct userdata * u,bool suspend)549 static void stream_suspend_within_thread(struct userdata *u, bool suspend) {
550     pa_assert(u);
551 
552     if (u->remote_suspended == suspend)
553         return;
554 
555     u->remote_suspended = suspend;
556     check_smoother_status(u, true);
557 }
558 
559 #ifdef TUNNEL_SINK
560 
561 /* Called from IO thread context */
send_data(struct userdata * u)562 static void send_data(struct userdata *u) {
563     pa_assert(u);
564 
565     while (u->requested_bytes > 0) {
566         pa_memchunk memchunk;
567 
568         pa_sink_render(u->sink, u->requested_bytes, &memchunk);
569         pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
570         pa_memblock_unref(memchunk.memblock);
571 
572         u->requested_bytes -= memchunk.length;
573 
574         u->counter += (int64_t) memchunk.length;
575     }
576 }
577 
578 /* This function is called from IO context -- except when it is not. */
sink_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)579 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
580     struct userdata *u = PA_SINK(o)->userdata;
581 
582     switch (code) {
583 
584         case PA_SINK_MESSAGE_SET_STATE: {
585             int r;
586 
587             /* First, change the state, because otherwise pa_sink_render() would fail */
588             if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
589 
590                 stream_cork_within_thread(u, u->sink->thread_info.state == PA_SINK_SUSPENDED);
591 
592                 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
593                     send_data(u);
594             }
595 
596             return r;
597         }
598 
599         case PA_SINK_MESSAGE_GET_LATENCY: {
600             int64_t *usec = data;
601 
602 #ifdef USE_SMOOTHER_2
603             *usec = pa_smoother_2_get_delay(u->smoother, pa_rtclock_now(), u->counter);
604 #else
605             pa_usec_t yl, yr;
606 
607             yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
608             yr = pa_smoother_get(u->smoother, pa_rtclock_now());
609 
610             *usec = (int64_t)yl - yr;
611 #endif
612             return 0;
613         }
614 
615         case SINK_MESSAGE_GET_LATENCY_SNAPSHOT: {
616             int64_t *send_counter = data;
617 
618             *send_counter = u->counter;
619             return 0;
620         }
621 
622         case SINK_MESSAGE_REQUEST:
623 
624             pa_assert(offset > 0);
625             u->requested_bytes += (size_t) offset;
626 
627             if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
628                 send_data(u);
629 
630             return 0;
631 
632         case SINK_MESSAGE_REMOTE_SUSPEND:
633 
634             stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
635             return 0;
636 
637         case SINK_MESSAGE_UPDATE_LATENCY: {
638 #ifdef USE_SMOOTHER_2
639             int64_t bytes;
640 
641             if (offset < 0)
642                 bytes = - pa_usec_to_bytes(- offset, &u->sink->sample_spec);
643             else
644                 bytes = pa_usec_to_bytes(offset, &u->sink->sample_spec);
645 
646             if (u->counter > bytes)
647                 bytes = u->counter - bytes;
648             else
649                 bytes = 0;
650 
651             /* We may use u->snapshot time because the main thread is waiting */
652              pa_smoother_2_put(u->smoother, u->snapshot_time, bytes);
653 #else
654             pa_usec_t y;
655 
656             y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
657 
658             if (y > (pa_usec_t) offset)
659                 y -= (pa_usec_t) offset;
660             else
661                 y = 0;
662 
663             /* We may use u->snapshot time because the main thread is waiting */
664             pa_smoother_put(u->smoother, u->snapshot_time, y);
665 #endif
666 
667             /* We can access this freely here, since the main thread is waiting for us */
668             u->thread_transport_usec = u->transport_usec;
669 
670             return 0;
671         }
672 
673         case SINK_MESSAGE_POST:
674 
675             /* OK, This might be a bit confusing. This message is
676              * delivered to us from the main context -- NOT from the
677              * IO thread context where the rest of the messages are
678              * dispatched. Yeah, ugly, but I am a lazy bastard. */
679 
680             pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
681 
682             u->receive_counter += chunk->length;
683 
684             return 0;
685     }
686 
687     return pa_sink_process_msg(o, code, data, offset, chunk);
688 }
689 
690 /* Called from main context */
sink_set_state_in_main_thread_cb(pa_sink * s,pa_sink_state_t state,pa_suspend_cause_t suspend_cause)691 static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
692     struct userdata *u;
693     pa_sink_assert_ref(s);
694     u = s->userdata;
695 
696     /* It may be that only the suspend cause is changing, in which
697      * case there's nothing to do. */
698     if (state == s->state)
699         return 0;
700 
701     switch ((pa_sink_state_t) state) {
702 
703         case PA_SINK_SUSPENDED:
704             pa_assert(PA_SINK_IS_OPENED(s->state));
705             stream_cork(u, true);
706             break;
707 
708         case PA_SINK_IDLE:
709         case PA_SINK_RUNNING:
710             if (s->state == PA_SINK_SUSPENDED)
711                 stream_cork(u, false);
712             break;
713 
714         case PA_SINK_UNLINKED:
715         case PA_SINK_INIT:
716         case PA_SINK_INVALID_STATE:
717             ;
718     }
719 
720     return 0;
721 }
722 
723 #else
724 
725 /* This function is called from IO context -- except when it is not. */
source_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)726 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
727     struct userdata *u = PA_SOURCE(o)->userdata;
728 
729     switch (code) {
730 
731         case PA_SOURCE_MESSAGE_SET_STATE: {
732             int r;
733 
734             if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
735                 stream_cork_within_thread(u, u->source->thread_info.state == PA_SOURCE_SUSPENDED);
736 
737             return r;
738         }
739 
740         case PA_SOURCE_MESSAGE_GET_LATENCY: {
741             int64_t *usec = data;
742 
743 #ifdef USE_SMOOTHER_2
744             *usec = - pa_smoother_2_get_delay(u->smoother, pa_rtclock_now(), u->counter);
745 #else
746             pa_usec_t yr, yl;
747 
748             yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
749             yr = pa_smoother_get(u->smoother, pa_rtclock_now());
750 
751             *usec = (int64_t)yr - yl;
752 #endif
753             return 0;
754         }
755 
756         case SOURCE_MESSAGE_GET_LATENCY_SNAPSHOT: {
757             int64_t *send_counter = data;
758 
759             *send_counter = u->counter;
760             return 0;
761         }
762 
763         case SOURCE_MESSAGE_POST: {
764             pa_memchunk c;
765 
766             pa_mcalign_push(u->mcalign, chunk);
767 
768             while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
769 
770                 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
771                     pa_source_post(u->source, &c);
772 
773                 pa_memblock_unref(c.memblock);
774 
775                 u->counter += (int64_t) c.length;
776             }
777 
778             return 0;
779         }
780 
781         case SOURCE_MESSAGE_REMOTE_SUSPEND:
782 
783             stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
784             return 0;
785 
786         case SOURCE_MESSAGE_UPDATE_LATENCY: {
787 #ifdef USE_SMOOTHER_2
788             int64_t bytes;
789 
790             if (offset < 0)
791                 bytes = - pa_usec_to_bytes(- offset, &u->source->sample_spec);
792             else
793                 bytes = pa_usec_to_bytes(offset, &u->source->sample_spec);
794 
795             bytes += u->counter;
796 
797             /* We may use u->snapshot time because the main thread is waiting */
798             pa_smoother_2_put(u->smoother, u->snapshot_time, bytes);
799 #else
800             pa_usec_t y;
801 
802             y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
803             y += offset;
804 
805             /* We may use u->snapshot time because the main thread is waiting */
806             pa_smoother_put(u->smoother, u->snapshot_time, y);
807 #endif
808 
809             /* We can access this freely here, since the main thread is waiting for us */
810             u->thread_transport_usec = u->transport_usec;
811 
812             return 0;
813         }
814     }
815 
816     return pa_source_process_msg(o, code, data, offset, chunk);
817 }
818 
819 /* Called from main context */
source_set_state_in_main_thread_cb(pa_source * s,pa_source_state_t state,pa_suspend_cause_t suspend_cause)820 static int source_set_state_in_main_thread_cb(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) {
821     struct userdata *u;
822     pa_source_assert_ref(s);
823     u = s->userdata;
824 
825     /* It may be that only the suspend cause is changing, in which
826      * case there's nothing to do. */
827     if (state == s->state)
828         return 0;
829 
830     switch ((pa_source_state_t) state) {
831 
832         case PA_SOURCE_SUSPENDED:
833             pa_assert(PA_SOURCE_IS_OPENED(s->state));
834             stream_cork(u, true);
835             break;
836 
837         case PA_SOURCE_IDLE:
838         case PA_SOURCE_RUNNING:
839             if (s->state == PA_SOURCE_SUSPENDED)
840                 stream_cork(u, false);
841             break;
842 
843         case PA_SOURCE_UNLINKED:
844         case PA_SOURCE_INIT:
845         case PA_SOURCE_INVALID_STATE:
846             ;
847     }
848 
849     return 0;
850 }
851 
852 #endif
853 
thread_func(void * userdata)854 static void thread_func(void *userdata) {
855     struct userdata *u = userdata;
856 
857     pa_assert(u);
858 
859     pa_log_debug("Thread starting up");
860 
861     pa_thread_mq_install(&u->thread_mq);
862 
863     for (;;) {
864         int ret;
865 
866 #ifdef TUNNEL_SINK
867         if (u->sink && PA_UNLIKELY(u->sink->thread_info.rewind_requested))
868             pa_sink_process_rewind(u->sink, 0);
869 #endif
870 
871         if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
872             goto fail;
873 
874         if (ret == 0)
875             goto finish;
876     }
877 
878 fail:
879     /* If this was no regular exit from the loop we have to continue
880      * processing messages until we received PA_MESSAGE_SHUTDOWN */
881     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_MAYBE_RESTART, u, 0, NULL, NULL);
882     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
883 
884 finish:
885     pa_log_debug("Thread shutting down");
886 }
887 
888 #ifdef TUNNEL_SINK
889 /* Called from main context */
command_request(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)890 static void command_request(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
891     struct userdata *u = userdata;
892     uint32_t bytes, channel;
893 
894     pa_assert(pd);
895     pa_assert(command == PA_COMMAND_REQUEST);
896     pa_assert(t);
897     pa_assert(u);
898     pa_assert(u->pdispatch == pd);
899 
900     if (pa_tagstruct_getu32(t, &channel) < 0 ||
901         pa_tagstruct_getu32(t, &bytes) < 0) {
902         pa_log("Invalid protocol reply");
903         goto fail;
904     }
905 
906     if (channel != u->channel) {
907         pa_log("Received data for invalid channel");
908         goto fail;
909     }
910 
911     pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
912     return;
913 
914 fail:
915     unload_module(u->module->userdata);
916 }
917 
918 #endif
919 
920 /* Called from main context */
stream_get_latency_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)921 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
922     struct userdata *u = userdata;
923     pa_usec_t sink_usec, source_usec;
924     bool playing;
925     int64_t write_index, read_index;
926     struct timeval local, remote, now;
927     pa_sample_spec *ss;
928     int64_t delay;
929 #ifdef TUNNEL_SINK
930     uint64_t send_counter;
931 #endif
932 
933     pa_assert(pd);
934     pa_assert(u);
935 
936     if (command != PA_COMMAND_REPLY) {
937         if (command == PA_COMMAND_ERROR)
938             pa_log("Failed to get latency.");
939         else
940             pa_log("Protocol error.");
941         goto fail;
942     }
943 
944     if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
945         pa_tagstruct_get_usec(t, &source_usec) < 0 ||
946         pa_tagstruct_get_boolean(t, &playing) < 0 ||
947         pa_tagstruct_get_timeval(t, &local) < 0 ||
948         pa_tagstruct_get_timeval(t, &remote) < 0 ||
949         pa_tagstruct_gets64(t, &write_index) < 0 ||
950         pa_tagstruct_gets64(t, &read_index) < 0) {
951         pa_log("Invalid reply.");
952         goto fail;
953     }
954 
955 #ifdef TUNNEL_SINK
956     if (u->version >= 13) {
957         uint64_t underrun_for = 0, playing_for = 0;
958 
959         if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
960             pa_tagstruct_getu64(t, &playing_for) < 0) {
961             pa_log("Invalid reply.");
962             goto fail;
963         }
964     }
965 #endif
966 
967     if (!pa_tagstruct_eof(t)) {
968         pa_log("Invalid reply.");
969         goto fail;
970     }
971 
972     if (tag < u->ignore_latency_before) {
973         return;
974     }
975 
976     pa_gettimeofday(&now);
977 
978     /* Calculate transport usec */
979     if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now) < 0) {
980         /* local and remote seem to have synchronized clocks */
981 #ifdef TUNNEL_SINK
982         u->transport_usec = pa_timeval_diff(&remote, &local);
983 #else
984         u->transport_usec = pa_timeval_diff(&now, &remote);
985 #endif
986     } else
987         u->transport_usec = pa_timeval_diff(&now, &local)/2;
988 
989     /* First, take the device's delay */
990 #ifdef TUNNEL_SINK
991     delay = (int64_t) sink_usec;
992     ss = &u->sink->sample_spec;
993 #else
994     delay = (int64_t) source_usec;
995     ss = &u->source->sample_spec;
996 #endif
997 
998     /* Add the length of our server-side buffer */
999     if (write_index >= read_index)
1000         delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
1001     else
1002         delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
1003 
1004     /* Our measurements are already out of date, hence correct by the     *
1005      * transport latency */
1006 #ifdef TUNNEL_SINK
1007     delay -= (int64_t) u->transport_usec;
1008 #else
1009     delay += (int64_t) u->transport_usec;
1010 #endif
1011 
1012     /* Now correct by what we have have written since we requested the update. This
1013      * is not necessary for the source, because if data is received between request
1014      * and reply, it was already posted before we requested the source latency. */
1015 #ifdef TUNNEL_SINK
1016     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_GET_LATENCY_SNAPSHOT, &send_counter, 0, NULL);
1017     delay += (int64_t) pa_bytes_to_usec(send_counter - u->receive_snapshot, ss);
1018 #endif
1019 
1020     /* It may take some time before the async message is executed, so we take a timestamp here */
1021     u->snapshot_time = pa_rtclock_now();
1022 
1023 #ifdef TUNNEL_SINK
1024     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
1025 #else
1026     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
1027 #endif
1028 
1029     return;
1030 
1031 fail:
1032 
1033     unload_module(u->module->userdata);
1034 }
1035 
1036 /* Called from main context */
request_latency(struct userdata * u)1037 static void request_latency(struct userdata *u) {
1038     pa_tagstruct *t;
1039     struct timeval now;
1040     uint32_t tag;
1041     pa_assert(u);
1042 
1043     t = pa_tagstruct_new();
1044 #ifdef TUNNEL_SINK
1045     pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
1046 #else
1047     pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
1048 #endif
1049     pa_tagstruct_putu32(t, tag = u->ctag++);
1050     pa_tagstruct_putu32(t, u->channel);
1051 
1052     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1053 
1054     pa_pstream_send_tagstruct(u->pstream, t);
1055     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
1056 
1057     u->ignore_latency_before = tag;
1058     u->receive_snapshot = u->receive_counter;
1059 }
1060 
1061 /* Called from main context */
timeout_callback(pa_mainloop_api * m,pa_time_event * e,const struct timeval * t,void * userdata)1062 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
1063     struct userdata *u = userdata;
1064 
1065     pa_assert(m);
1066     pa_assert(e);
1067     pa_assert(u);
1068 
1069     request_latency(u);
1070 
1071     pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
1072 }
1073 
1074 /* Called from main context */
update_description(struct userdata * u)1075 static void update_description(struct userdata *u) {
1076     char *d;
1077     char un[128], hn[128];
1078     pa_tagstruct *t;
1079 
1080     pa_assert(u);
1081 
1082     if (!u->server_fqdn || !u->user_name || !u->device_description)
1083         return;
1084 
1085     d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
1086 
1087 #ifdef TUNNEL_SINK
1088     pa_sink_set_description(u->sink, d);
1089     pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
1090     pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
1091     pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
1092 #else
1093     pa_source_set_description(u->source, d);
1094     pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
1095     pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
1096     pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
1097 #endif
1098 
1099     pa_xfree(d);
1100 
1101     d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
1102                           pa_get_user_name(un, sizeof(un)),
1103                           pa_get_host_name(hn, sizeof(hn)));
1104 
1105     t = pa_tagstruct_new();
1106 #ifdef TUNNEL_SINK
1107     pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
1108 #else
1109     pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
1110 #endif
1111     pa_tagstruct_putu32(t, u->ctag++);
1112     pa_tagstruct_putu32(t, u->channel);
1113     pa_tagstruct_puts(t, d);
1114     pa_pstream_send_tagstruct(u->pstream, t);
1115 
1116     pa_xfree(d);
1117 }
1118 
1119 /* Called from main context */
server_info_cb(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1120 static void server_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1121     struct userdata *u = userdata;
1122     pa_sample_spec ss;
1123     pa_channel_map cm;
1124     const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
1125     uint32_t cookie;
1126 
1127     pa_assert(pd);
1128     pa_assert(u);
1129 
1130     if (command != PA_COMMAND_REPLY) {
1131         if (command == PA_COMMAND_ERROR)
1132             pa_log("Failed to get info.");
1133         else
1134             pa_log("Protocol error.");
1135         goto fail;
1136     }
1137 
1138     if (pa_tagstruct_gets(t, &server_name) < 0 ||
1139         pa_tagstruct_gets(t, &server_version) < 0 ||
1140         pa_tagstruct_gets(t, &user_name) < 0 ||
1141         pa_tagstruct_gets(t, &host_name) < 0 ||
1142         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1143         pa_tagstruct_gets(t, &default_sink_name) < 0 ||
1144         pa_tagstruct_gets(t, &default_source_name) < 0 ||
1145         pa_tagstruct_getu32(t, &cookie) < 0 ||
1146         (u->version >= 15 && pa_tagstruct_get_channel_map(t, &cm) < 0)) {
1147 
1148         pa_log("Parse failure");
1149         goto fail;
1150     }
1151 
1152     if (!pa_tagstruct_eof(t)) {
1153         pa_log("Packet too long");
1154         goto fail;
1155     }
1156 
1157     pa_xfree(u->server_fqdn);
1158     u->server_fqdn = pa_xstrdup(host_name);
1159 
1160     pa_xfree(u->user_name);
1161     u->user_name = pa_xstrdup(user_name);
1162 
1163     update_description(u);
1164 
1165     return;
1166 
1167 fail:
1168     unload_module(u->module->userdata);
1169 }
1170 
read_ports(struct userdata * u,pa_tagstruct * t)1171 static int read_ports(struct userdata *u, pa_tagstruct *t) {
1172     if (u->version >= 16) {
1173         uint32_t n_ports;
1174         const char *s;
1175 
1176         if (pa_tagstruct_getu32(t, &n_ports)) {
1177             pa_log("Parse failure");
1178             return -PA_ERR_PROTOCOL;
1179         }
1180 
1181         for (uint32_t j = 0; j < n_ports; j++) {
1182             uint32_t priority;
1183 
1184             if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1185                 pa_tagstruct_gets(t, &s) < 0 || /* description */
1186                 pa_tagstruct_getu32(t, &priority) < 0) {
1187 
1188                 pa_log("Parse failure");
1189                 return -PA_ERR_PROTOCOL;
1190             }
1191             if (u->version >= 24) {
1192                 if (pa_tagstruct_getu32(t, &priority) < 0) { /* available */
1193                     pa_log("Parse failure");
1194                     return -PA_ERR_PROTOCOL;
1195                 }
1196                 if (u->version >= 34 &&
1197                     (pa_tagstruct_gets(t, &s) < 0 || /* availability group */
1198                      pa_tagstruct_getu32(t, &priority) < 0)) { /* device port type */
1199                     pa_log("Parse failure");
1200                     return -PA_ERR_PROTOCOL;
1201                 }
1202             }
1203         }
1204 
1205         if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1206             pa_log("Parse failure");
1207             return -PA_ERR_PROTOCOL;
1208         }
1209     }
1210     return 0;
1211 }
1212 
read_formats(struct userdata * u,pa_tagstruct * t)1213 static int read_formats(struct userdata *u, pa_tagstruct *t) {
1214     uint8_t n_formats;
1215     pa_format_info *format;
1216 
1217     if (pa_tagstruct_getu8(t, &n_formats) < 0) { /* no. of formats */
1218         pa_log("Parse failure");
1219         return -PA_ERR_PROTOCOL;
1220     }
1221 
1222     for (uint8_t j = 0; j < n_formats; j++) {
1223         format = pa_format_info_new();
1224         if (pa_tagstruct_get_format_info(t, format)) { /* format info */
1225             pa_format_info_free(format);
1226             pa_log("Parse failure");
1227             return -PA_ERR_PROTOCOL;
1228         }
1229         pa_format_info_free(format);
1230     }
1231     return 0;
1232 }
1233 
1234 #ifdef TUNNEL_SINK
1235 
1236 /* Called from main context */
sink_info_cb(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1237 static void sink_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1238     struct userdata *u = userdata;
1239     uint32_t idx, owner_module, monitor_source, flags;
1240     const char *name, *description, *monitor_source_name, *driver;
1241     pa_sample_spec ss;
1242     pa_channel_map cm;
1243     pa_cvolume volume;
1244     bool mute;
1245     pa_usec_t latency;
1246 
1247     pa_assert(pd);
1248     pa_assert(u);
1249 
1250     if (command != PA_COMMAND_REPLY) {
1251         if (command == PA_COMMAND_ERROR)
1252             pa_log("Failed to get info.");
1253         else
1254             pa_log("Protocol error.");
1255         goto fail;
1256     }
1257 
1258     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1259         pa_tagstruct_gets(t, &name) < 0 ||
1260         pa_tagstruct_gets(t, &description) < 0 ||
1261         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1262         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1263         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1264         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1265         pa_tagstruct_get_boolean(t, &mute) < 0 ||
1266         pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1267         pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1268         pa_tagstruct_get_usec(t, &latency) < 0 ||
1269         pa_tagstruct_gets(t, &driver) < 0 ||
1270         pa_tagstruct_getu32(t, &flags) < 0) {
1271 
1272         pa_log("Parse failure");
1273         goto fail;
1274     }
1275 
1276     if (u->version >= 13) {
1277         pa_usec_t configured_latency;
1278 
1279         if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1280             pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1281 
1282             pa_log("Parse failure");
1283             goto fail;
1284         }
1285     }
1286 
1287     if (u->version >= 15) {
1288         pa_volume_t base_volume;
1289         uint32_t state, n_volume_steps, card;
1290 
1291         if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1292             pa_tagstruct_getu32(t, &state) < 0 ||
1293             pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1294             pa_tagstruct_getu32(t, &card) < 0) {
1295 
1296             pa_log("Parse failure");
1297             goto fail;
1298         }
1299     }
1300 
1301     if (read_ports(u, t) < 0)
1302         goto fail;
1303 
1304     if (u->version >= 21 && read_formats(u, t) < 0)
1305         goto fail;
1306 
1307     if (!pa_tagstruct_eof(t)) {
1308         pa_log("Packet too long");
1309         goto fail;
1310     }
1311 
1312     if (!u->sink_name || !pa_streq(name, u->sink_name))
1313         return;
1314 
1315     pa_xfree(u->device_description);
1316     u->device_description = pa_xstrdup(description);
1317 
1318     update_description(u);
1319 
1320     return;
1321 
1322 fail:
1323     unload_module(u->module->userdata);
1324 }
1325 
1326 /* Called from main context */
sink_input_info_cb(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1327 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1328     struct userdata *u = userdata;
1329     uint32_t idx, owner_module, client, sink;
1330     pa_usec_t buffer_usec, sink_usec;
1331     const char *name, *driver, *resample_method;
1332     bool mute = false;
1333     pa_sample_spec sample_spec;
1334     pa_channel_map channel_map;
1335     pa_cvolume volume;
1336     bool b;
1337 
1338     pa_assert(pd);
1339     pa_assert(u);
1340 
1341     if (command != PA_COMMAND_REPLY) {
1342         if (command == PA_COMMAND_ERROR)
1343             pa_log("Failed to get info.");
1344         else
1345             pa_log("Protocol error.");
1346         goto fail;
1347     }
1348 
1349     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1350         pa_tagstruct_gets(t, &name) < 0 ||
1351         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1352         pa_tagstruct_getu32(t, &client) < 0 ||
1353         pa_tagstruct_getu32(t, &sink) < 0 ||
1354         pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1355         pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1356         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1357         pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1358         pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1359         pa_tagstruct_gets(t, &resample_method) < 0 ||
1360         pa_tagstruct_gets(t, &driver) < 0) {
1361 
1362         pa_log("Parse failure");
1363         goto fail;
1364     }
1365 
1366     if (u->version >= 11) {
1367         if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1368 
1369             pa_log("Parse failure");
1370             goto fail;
1371         }
1372     }
1373 
1374     if (u->version >= 13) {
1375         if (pa_tagstruct_get_proplist(t, NULL) < 0) {
1376 
1377             pa_log("Parse failure");
1378             goto fail;
1379         }
1380     }
1381 
1382     if (u->version >= 19) {
1383         if (pa_tagstruct_get_boolean(t, &b) < 0) {
1384 
1385             pa_log("Parse failure");
1386             goto fail;
1387         }
1388     }
1389 
1390     if (u->version >= 20) {
1391         if (pa_tagstruct_get_boolean(t, &b) < 0 ||
1392             pa_tagstruct_get_boolean(t, &b) < 0) {
1393 
1394             pa_log("Parse failure");
1395             goto fail;
1396         }
1397     }
1398 
1399     if (u->version >= 21) {
1400         pa_format_info *format = pa_format_info_new();
1401 
1402         if (pa_tagstruct_get_format_info(t, format) < 0) {
1403             pa_format_info_free(format);
1404             pa_log("Parse failure");
1405             goto fail;
1406         }
1407         pa_format_info_free(format);
1408     }
1409 
1410     if (!pa_tagstruct_eof(t)) {
1411         pa_log("Packet too long");
1412         goto fail;
1413     }
1414 
1415     if (idx != u->device_index)
1416         return;
1417 
1418     pa_assert(u->sink);
1419 
1420     if ((u->version < 11 || mute == u->sink->muted) &&
1421         pa_cvolume_equal(&volume, &u->sink->real_volume))
1422         return;
1423 
1424     pa_sink_volume_changed(u->sink, &volume);
1425 
1426     if (u->version >= 11)
1427         pa_sink_mute_changed(u->sink, mute);
1428 
1429     return;
1430 
1431 fail:
1432     unload_module(u->module->userdata);
1433 }
1434 
1435 #else
1436 
1437 /* Called from main context */
source_info_cb(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1438 static void source_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1439     struct userdata *u = userdata;
1440     uint32_t idx, owner_module, monitor_of_sink, flags;
1441     const char *name, *description, *monitor_of_sink_name, *driver;
1442     pa_sample_spec ss;
1443     pa_channel_map cm;
1444     pa_cvolume volume;
1445     bool mute;
1446     pa_usec_t latency, configured_latency;
1447 
1448     pa_assert(pd);
1449     pa_assert(u);
1450 
1451     if (command != PA_COMMAND_REPLY) {
1452         if (command == PA_COMMAND_ERROR)
1453             pa_log("Failed to get info.");
1454         else
1455             pa_log("Protocol error.");
1456         goto fail;
1457     }
1458 
1459     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1460         pa_tagstruct_gets(t, &name) < 0 ||
1461         pa_tagstruct_gets(t, &description) < 0 ||
1462         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1463         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1464         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1465         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1466         pa_tagstruct_get_boolean(t, &mute) < 0 ||
1467         pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1468         pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1469         pa_tagstruct_get_usec(t, &latency) < 0 ||
1470         pa_tagstruct_gets(t, &driver) < 0 ||
1471         pa_tagstruct_getu32(t, &flags) < 0) {
1472 
1473         pa_log("Parse failure");
1474         goto fail;
1475     }
1476 
1477     if (u->version >= 13) {
1478         if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1479             pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1480 
1481             pa_log("Parse failure");
1482             goto fail;
1483         }
1484     }
1485 
1486     if (u->version >= 15) {
1487         pa_volume_t base_volume;
1488         uint32_t state, n_volume_steps, card;
1489 
1490         if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1491             pa_tagstruct_getu32(t, &state) < 0 ||
1492             pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1493             pa_tagstruct_getu32(t, &card) < 0) {
1494 
1495             pa_log("Parse failure");
1496             goto fail;
1497         }
1498     }
1499 
1500     if (read_ports(u, t) < 0)
1501         goto fail;
1502 
1503     if (u->version >= 22 && read_formats(u, t) < 0)
1504         goto fail;
1505 
1506     if (!pa_tagstruct_eof(t)) {
1507         pa_log("Packet too long");
1508         goto fail;
1509     }
1510 
1511     if (!u->source_name || !pa_streq(name, u->source_name))
1512         return;
1513 
1514     pa_xfree(u->device_description);
1515     u->device_description = pa_xstrdup(description);
1516 
1517     update_description(u);
1518 
1519     return;
1520 
1521 fail:
1522     unload_module(u->module->userdata);
1523 }
1524 
1525 #endif
1526 
1527 /* Called from main context */
request_info(struct userdata * u)1528 static void request_info(struct userdata *u) {
1529     pa_tagstruct *t;
1530     uint32_t tag;
1531     pa_assert(u);
1532 
1533     t = pa_tagstruct_new();
1534     pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1535     pa_tagstruct_putu32(t, tag = u->ctag++);
1536     pa_pstream_send_tagstruct(u->pstream, t);
1537     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1538 
1539 #ifdef TUNNEL_SINK
1540     t = pa_tagstruct_new();
1541     pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1542     pa_tagstruct_putu32(t, tag = u->ctag++);
1543     pa_tagstruct_putu32(t, u->device_index);
1544     pa_pstream_send_tagstruct(u->pstream, t);
1545     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1546 
1547     if (u->sink_name) {
1548         t = pa_tagstruct_new();
1549         pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1550         pa_tagstruct_putu32(t, tag = u->ctag++);
1551         pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1552         pa_tagstruct_puts(t, u->sink_name);
1553         pa_pstream_send_tagstruct(u->pstream, t);
1554         pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1555     }
1556 #else
1557     if (u->source_name) {
1558         t = pa_tagstruct_new();
1559         pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1560         pa_tagstruct_putu32(t, tag = u->ctag++);
1561         pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1562         pa_tagstruct_puts(t, u->source_name);
1563         pa_pstream_send_tagstruct(u->pstream, t);
1564         pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1565     }
1566 #endif
1567 }
1568 
1569 /* Called from main context */
command_subscribe_event(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1570 static void command_subscribe_event(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1571     struct userdata *u = userdata;
1572     pa_subscription_event_type_t e;
1573     uint32_t idx;
1574 
1575     pa_assert(pd);
1576     pa_assert(t);
1577     pa_assert(u);
1578     pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1579 
1580     if (pa_tagstruct_getu32(t, &e) < 0 ||
1581         pa_tagstruct_getu32(t, &idx) < 0) {
1582         pa_log("Invalid protocol reply");
1583         unload_module(u->module->userdata);
1584         return;
1585     }
1586 
1587     if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1588 #ifdef TUNNEL_SINK
1589         e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1590         e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1591 #else
1592         e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1593 #endif
1594         )
1595         return;
1596 
1597     request_info(u);
1598 }
1599 
1600 /* Called from main context */
start_subscribe(struct userdata * u)1601 static void start_subscribe(struct userdata *u) {
1602     pa_tagstruct *t;
1603     pa_assert(u);
1604 
1605     t = pa_tagstruct_new();
1606     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1607     pa_tagstruct_putu32(t, u->ctag++);
1608     pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1609 #ifdef TUNNEL_SINK
1610                         PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1611 #else
1612                         PA_SUBSCRIPTION_MASK_SOURCE
1613 #endif
1614                         );
1615 
1616     pa_pstream_send_tagstruct(u->pstream, t);
1617 }
1618 
1619 /* Called from main context */
create_stream_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1620 static void create_stream_callback(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1621     struct userdata *u = userdata;
1622 #ifdef TUNNEL_SINK
1623     uint32_t bytes;
1624 #endif
1625 
1626     pa_assert(pd);
1627     pa_assert(u);
1628     pa_assert(u->pdispatch == pd);
1629 
1630     if (command != PA_COMMAND_REPLY) {
1631         if (command == PA_COMMAND_ERROR)
1632             pa_log("Failed to create stream.");
1633         else
1634             pa_log("Protocol error.");
1635         goto fail;
1636     }
1637 
1638     if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1639         pa_tagstruct_getu32(t, &u->device_index) < 0
1640 #ifdef TUNNEL_SINK
1641         || pa_tagstruct_getu32(t, &bytes) < 0
1642 #endif
1643         )
1644         goto parse_error;
1645 
1646     if (u->version >= 9) {
1647 #ifdef TUNNEL_SINK
1648         if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1649             pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1650             pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1651             pa_tagstruct_getu32(t, &u->minreq) < 0)
1652             goto parse_error;
1653 #else
1654         if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1655             pa_tagstruct_getu32(t, &u->fragsize) < 0)
1656             goto parse_error;
1657 #endif
1658     }
1659 
1660     if (u->version >= 12) {
1661         pa_sample_spec ss;
1662         pa_channel_map cm;
1663         uint32_t device_index;
1664         const char *dn;
1665         bool suspended;
1666 
1667         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1668             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1669             pa_tagstruct_getu32(t, &device_index) < 0 ||
1670             pa_tagstruct_gets(t, &dn) < 0 ||
1671             pa_tagstruct_get_boolean(t, &suspended) < 0)
1672             goto parse_error;
1673 
1674 #ifdef TUNNEL_SINK
1675         pa_xfree(u->sink_name);
1676         u->sink_name = pa_xstrdup(dn);
1677 #else
1678         pa_xfree(u->source_name);
1679         u->source_name = pa_xstrdup(dn);
1680 #endif
1681     }
1682 
1683     if (u->version >= 13) {
1684         pa_usec_t usec;
1685 
1686         if (pa_tagstruct_get_usec(t, &usec) < 0)
1687             goto parse_error;
1688 
1689 /* #ifdef TUNNEL_SINK */
1690 /*         pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1691 /* #else */
1692 /*         pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1693 /* #endif */
1694     }
1695 
1696     if (u->version >= 21) {
1697         pa_format_info *format = pa_format_info_new();
1698 
1699         if (pa_tagstruct_get_format_info(t, format) < 0) {
1700             pa_format_info_free(format);
1701             goto parse_error;
1702         }
1703 
1704         pa_format_info_free(format);
1705     }
1706 
1707     if (!pa_tagstruct_eof(t))
1708         goto parse_error;
1709 
1710     start_subscribe(u);
1711     request_info(u);
1712 
1713     pa_assert(!u->time_event);
1714     u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1715 
1716     request_latency(u);
1717 
1718     pa_log_debug("Stream created.");
1719 
1720 #ifdef TUNNEL_SINK
1721     pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1722 #endif
1723 
1724     return;
1725 
1726 parse_error:
1727     pa_log("Invalid reply. (Create stream)");
1728 
1729 fail:
1730     unload_module(u->module->userdata);
1731 
1732 }
1733 
1734 /* Called from main context */
setup_complete_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1735 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1736     struct userdata *u = userdata;
1737     pa_tagstruct *reply;
1738     char name[256], un[128], hn[128];
1739     pa_cvolume volume;
1740 
1741     pa_assert(pd);
1742     pa_assert(u);
1743     pa_assert(u->pdispatch == pd);
1744 
1745     if (command != PA_COMMAND_REPLY ||
1746         pa_tagstruct_getu32(t, &u->version) < 0 ||
1747         !pa_tagstruct_eof(t)) {
1748 
1749         if (command == PA_COMMAND_ERROR)
1750             pa_log("Failed to authenticate");
1751         else
1752             pa_log("Protocol error.");
1753 
1754         goto fail;
1755     }
1756 
1757     /* Minimum supported protocol version */
1758     if (u->version < 8) {
1759         pa_log("Incompatible protocol version");
1760         goto fail;
1761     }
1762 
1763     /* Starting with protocol version 13 the MSB of the version tag
1764     reflects if shm is enabled for this connection or not. We don't
1765     support SHM here at all, so we just ignore this. */
1766 
1767     if (u->version >= 13)
1768         u->version &= 0x7FFFFFFFU;
1769 
1770     pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1771 
1772 #ifdef TUNNEL_SINK
1773     pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1774     pa_sink_update_proplist(u->sink, 0, NULL);
1775 
1776     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1777                 u->sink_name,
1778                 pa_get_user_name(un, sizeof(un)),
1779                 pa_get_host_name(hn, sizeof(hn)));
1780 #else
1781     pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1782     pa_source_update_proplist(u->source, 0, NULL);
1783 
1784     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1785                 u->source_name,
1786                 pa_get_user_name(un, sizeof(un)),
1787                 pa_get_host_name(hn, sizeof(hn)));
1788 #endif
1789 
1790     reply = pa_tagstruct_new();
1791     pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1792     pa_tagstruct_putu32(reply, u->ctag++);
1793 
1794     if (u->version >= 13) {
1795         pa_proplist *pl;
1796         pl = pa_proplist_new();
1797         pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1798         pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1799         pa_init_proplist(pl);
1800         pa_tagstruct_put_proplist(reply, pl);
1801         pa_proplist_free(pl);
1802     } else
1803         pa_tagstruct_puts(reply, "PulseAudio");
1804 
1805     pa_pstream_send_tagstruct(u->pstream, reply);
1806     /* We ignore the server's reply here */
1807 
1808     reply = pa_tagstruct_new();
1809 
1810     if (u->version < 13)
1811         /* Only for older PA versions we need to fill in the maxlength */
1812         u->maxlength = 4*1024*1024;
1813 
1814 #ifdef TUNNEL_SINK
1815     u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * u->latency, &u->sink->sample_spec);
1816     u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * u->latency / 4, &u->sink->sample_spec);
1817     u->prebuf = u->tlength;
1818 #else
1819     u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * u->latency, &u->source->sample_spec);
1820 #endif
1821 
1822 #ifdef TUNNEL_SINK
1823     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1824     pa_tagstruct_putu32(reply, tag = u->ctag++);
1825 
1826     if (u->version < 13)
1827         pa_tagstruct_puts(reply, name);
1828 
1829     pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1830     pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1831     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1832     pa_tagstruct_puts(reply, u->sink_name);
1833     pa_tagstruct_putu32(reply, u->maxlength);
1834     pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(u->sink->state));
1835     pa_tagstruct_putu32(reply, u->tlength);
1836     pa_tagstruct_putu32(reply, u->prebuf);
1837     pa_tagstruct_putu32(reply, u->minreq);
1838     pa_tagstruct_putu32(reply, 0);
1839     pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1840     pa_tagstruct_put_cvolume(reply, &volume);
1841 #else
1842     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1843     pa_tagstruct_putu32(reply, tag = u->ctag++);
1844 
1845     if (u->version < 13)
1846         pa_tagstruct_puts(reply, name);
1847 
1848     pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1849     pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1850     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1851     pa_tagstruct_puts(reply, u->source_name);
1852     pa_tagstruct_putu32(reply, u->maxlength);
1853     pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(u->source->state));
1854     pa_tagstruct_putu32(reply, u->fragsize);
1855 #endif
1856 
1857     if (u->version >= 12) {
1858         pa_tagstruct_put_boolean(reply, false); /* no_remap */
1859         pa_tagstruct_put_boolean(reply, false); /* no_remix */
1860         pa_tagstruct_put_boolean(reply, false); /* fix_format */
1861         pa_tagstruct_put_boolean(reply, false); /* fix_rate */
1862         pa_tagstruct_put_boolean(reply, false); /* fix_channels */
1863         pa_tagstruct_put_boolean(reply, true); /* no_move */
1864         pa_tagstruct_put_boolean(reply, false); /* variable_rate */
1865     }
1866 
1867     if (u->version >= 13) {
1868         pa_proplist *pl;
1869 
1870         pa_tagstruct_put_boolean(reply, false); /* start muted/peak detect*/
1871         pa_tagstruct_put_boolean(reply, true); /* adjust_latency */
1872 
1873         pl = pa_proplist_new();
1874         pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1875         pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1876         pa_tagstruct_put_proplist(reply, pl);
1877         pa_proplist_free(pl);
1878 
1879 #ifndef TUNNEL_SINK
1880         pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1881 #endif
1882     }
1883 
1884     if (u->version >= 14) {
1885 #ifdef TUNNEL_SINK
1886         pa_tagstruct_put_boolean(reply, false); /* volume_set */
1887 #endif
1888         pa_tagstruct_put_boolean(reply, true); /* early rquests */
1889     }
1890 
1891     if (u->version >= 15) {
1892 #ifdef TUNNEL_SINK
1893         pa_tagstruct_put_boolean(reply, false); /* muted_set */
1894 #endif
1895         pa_tagstruct_put_boolean(reply, false); /* don't inhibit auto suspend */
1896         pa_tagstruct_put_boolean(reply, false); /* fail on suspend */
1897     }
1898 
1899 #ifdef TUNNEL_SINK
1900     if (u->version >= 17)
1901         pa_tagstruct_put_boolean(reply, false); /* relative volume */
1902 
1903     if (u->version >= 18)
1904         pa_tagstruct_put_boolean(reply, false); /* passthrough stream */
1905 #endif
1906 
1907 #ifdef TUNNEL_SINK
1908     if (u->version >= 21) {
1909         /* We're not using the extended API, so n_formats = 0 and that's that */
1910         pa_tagstruct_putu8(reply, 0);
1911     }
1912 #else
1913     if (u->version >= 22) {
1914         /* We're not using the extended API, so n_formats = 0 and that's that */
1915         pa_tagstruct_putu8(reply, 0);
1916         pa_cvolume_reset(&volume, u->source->sample_spec.channels);
1917         pa_tagstruct_put_cvolume(reply, &volume);
1918         pa_tagstruct_put_boolean(reply, false); /* muted */
1919         pa_tagstruct_put_boolean(reply, false); /* volume_set */
1920         pa_tagstruct_put_boolean(reply, false); /* muted_set */
1921         pa_tagstruct_put_boolean(reply, false); /* relative volume */
1922         pa_tagstruct_put_boolean(reply, false); /* passthrough stream */
1923     }
1924 #endif
1925 
1926     pa_pstream_send_tagstruct(u->pstream, reply);
1927     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1928 
1929     pa_log_debug("Connection authenticated, creating stream ...");
1930 
1931     return;
1932 
1933 fail:
1934     unload_module(u->module->userdata);
1935 }
1936 
1937 /* Called from main context */
pstream_die_callback(pa_pstream * p,void * userdata)1938 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1939     struct userdata *u = userdata;
1940 
1941     pa_assert(p);
1942     pa_assert(u);
1943 
1944     pa_log_warn("Stream died.");
1945     unload_module(u->module->userdata);
1946 }
1947 
1948 /* Called from main context */
pstream_packet_callback(pa_pstream * p,pa_packet * packet,pa_cmsg_ancil_data * ancil_data,void * userdata)1949 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data, void *userdata) {
1950     struct userdata *u = userdata;
1951 
1952     pa_assert(p);
1953     pa_assert(packet);
1954     pa_assert(u);
1955 
1956     if (pa_pdispatch_run(u->pdispatch, packet, ancil_data, u) < 0) {
1957         pa_log("Invalid packet");
1958         unload_module(u->module->userdata);
1959         return;
1960     }
1961 }
1962 
1963 #ifndef TUNNEL_SINK
1964 /* Called from main context */
pstream_memblock_callback(pa_pstream * p,uint32_t channel,int64_t offset,pa_seek_mode_t seek,const pa_memchunk * chunk,void * userdata)1965 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1966     struct userdata *u = userdata;
1967 
1968     pa_assert(p);
1969     pa_assert(chunk);
1970     pa_assert(u);
1971 
1972     if (channel != u->channel) {
1973         pa_log("Received memory block on bad channel.");
1974         unload_module(u->module->userdata);
1975         return;
1976     }
1977 
1978     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1979 
1980     u->receive_counter += chunk->length;
1981 }
1982 #endif
1983 
1984 /* Called from main context */
on_connection(pa_socket_client * sc,pa_iochannel * io,void * userdata)1985 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1986     struct userdata *u = userdata;
1987 
1988     pa_assert_ctl_context();
1989 
1990     pa_assert(sc);
1991     pa_assert(u);
1992     pa_assert(u->client == sc);
1993 
1994     pa_socket_client_unref(u->client);
1995     u->client = NULL;
1996 
1997     if (!io) {
1998         pa_log("Connection failed: %s", pa_cstrerror(errno));
1999         unload_module(u->module->userdata);
2000         return;
2001     }
2002 
2003     u->io = io;
2004 
2005 #ifdef TUNNEL_SINK
2006     create_sink(u);
2007     if (!u->sink) {
2008         unload_module(u->module->userdata);
2009         return;
2010     }
2011     on_sink_created(u);
2012 #else
2013     create_source(u);
2014     if (!u->source) {
2015         unload_module(u->module->userdata);
2016         return;
2017     }
2018     on_source_created(u);
2019 #endif
2020 }
2021 
2022 #ifdef TUNNEL_SINK
on_sink_created(struct userdata * u)2023 static void on_sink_created(struct userdata *u)
2024 #else
2025 static void on_source_created(struct userdata *u)
2026 #endif
2027 {
2028     pa_tagstruct *t;
2029     uint32_t tag;
2030 
2031     u->pstream = pa_pstream_new(u->core->mainloop, u->io, u->core->mempool);
2032     u->pdispatch = pa_pdispatch_new(u->core->mainloop, true, command_table, PA_COMMAND_MAX);
2033 
2034     pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
2035     pa_pstream_set_receive_packet_callback(u->pstream, pstream_packet_callback, u);
2036 #ifndef TUNNEL_SINK
2037     pa_pstream_set_receive_memblock_callback(u->pstream, pstream_memblock_callback, u);
2038 #endif
2039 
2040     t = pa_tagstruct_new();
2041     pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
2042     pa_tagstruct_putu32(t, tag = u->ctag++);
2043     pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
2044 
2045     pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
2046 
2047 #ifdef HAVE_CREDS
2048 {
2049     pa_creds ucred;
2050 
2051     if (pa_iochannel_creds_supported(u->io))
2052         pa_iochannel_creds_enable(u->io);
2053 
2054     ucred.uid = getuid();
2055     ucred.gid = getgid();
2056 
2057     pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
2058 }
2059 #else
2060     pa_pstream_send_tagstruct(u->pstream, t);
2061 #endif
2062 
2063     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
2064 
2065     pa_log_debug("Connection established, authenticating ...");
2066 }
2067 
2068 #ifdef TUNNEL_SINK
2069 
2070 /* Called from main context */
sink_set_volume(pa_sink * sink)2071 static void sink_set_volume(pa_sink *sink) {
2072     struct userdata *u;
2073     pa_tagstruct *t;
2074 
2075     pa_assert(sink);
2076     u = sink->userdata;
2077     pa_assert(u);
2078 
2079     t = pa_tagstruct_new();
2080     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
2081     pa_tagstruct_putu32(t, u->ctag++);
2082     pa_tagstruct_putu32(t, u->device_index);
2083     pa_tagstruct_put_cvolume(t, &sink->real_volume);
2084     pa_pstream_send_tagstruct(u->pstream, t);
2085 }
2086 
2087 /* Called from main context */
sink_set_mute(pa_sink * sink)2088 static void sink_set_mute(pa_sink *sink) {
2089     struct userdata *u;
2090     pa_tagstruct *t;
2091 
2092     pa_assert(sink);
2093     u = sink->userdata;
2094     pa_assert(u);
2095 
2096     if (u->version < 11)
2097         return;
2098 
2099     t = pa_tagstruct_new();
2100     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
2101     pa_tagstruct_putu32(t, u->ctag++);
2102     pa_tagstruct_putu32(t, u->device_index);
2103     pa_tagstruct_put_boolean(t, sink->muted);
2104     pa_pstream_send_tagstruct(u->pstream, t);
2105 }
2106 
2107 #endif
2108 
2109 #ifdef TUNNEL_SINK
create_sink(struct userdata * u)2110 static void create_sink(struct userdata *u) {
2111     pa_sink_new_data data;
2112     char *data_name = NULL;
2113 
2114     if (!(data_name = pa_xstrdup(u->configured_sink_name)))
2115         data_name = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);
2116 
2117     pa_sink_new_data_init(&data);
2118     data.driver = __FILE__;
2119     data.module = u->module;
2120     data.namereg_fail = false;
2121     pa_sink_new_data_set_name(&data, data_name);
2122     pa_sink_new_data_set_sample_spec(&data, &u->sample_spec);
2123     pa_sink_new_data_set_channel_map(&data, &u->channel_map);
2124     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
2125     pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2126     if (u->sink_name)
2127         pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
2128 
2129     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, u->sink_proplist);
2130 
2131     u->sink = pa_sink_new(u->module->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY);
2132 
2133     if (!u->sink) {
2134         pa_log("Failed to create sink.");
2135         goto finish;
2136     }
2137 
2138     u->sink->parent.process_msg = sink_process_msg;
2139     u->sink->userdata = u;
2140     u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
2141     pa_sink_set_set_volume_callback(u->sink, sink_set_volume);
2142     pa_sink_set_set_mute_callback(u->sink, sink_set_mute);
2143 
2144     u->sink->refresh_volume = u->sink->refresh_muted = false;
2145 
2146 /*     pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2147 
2148     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
2149     pa_sink_set_rtpoll(u->sink, u->rtpoll);
2150     pa_sink_set_fixed_latency(u->sink, u->latency * PA_USEC_PER_MSEC);
2151 
2152     pa_sink_put(u->sink);
2153 
2154 finish:
2155     pa_sink_new_data_done(&data);
2156     pa_xfree(data_name);
2157 }
2158 #else
create_source(struct userdata * u)2159 static void create_source(struct userdata *u) {
2160     pa_source_new_data data;
2161     char *data_name = NULL;
2162 
2163     if (!(data_name = pa_xstrdup(u->configured_source_name)))
2164         data_name = pa_sprintf_malloc("tunnel-source.%s", u->server_name);
2165 
2166     pa_source_new_data_init(&data);
2167     data.driver = __FILE__;
2168     data.module = u->module;
2169     data.namereg_fail = false;
2170     pa_source_new_data_set_name(&data, data_name);
2171     pa_source_new_data_set_sample_spec(&data, &u->sample_spec);
2172     pa_source_new_data_set_channel_map(&data, &u->channel_map);
2173     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
2174     pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2175     if (u->source_name)
2176         pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
2177 
2178     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, u->source_proplist);
2179 
2180     u->source = pa_source_new(u->module->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
2181 
2182     if (!u->source) {
2183         pa_log("Failed to create source.");
2184         goto finish;
2185     }
2186 
2187     u->source->parent.process_msg = source_process_msg;
2188     u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb;
2189     u->source->userdata = u;
2190 
2191 /*     pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2192 
2193     pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
2194     pa_source_set_rtpoll(u->source, u->rtpoll);
2195     pa_source_set_fixed_latency(u->source, u->latency * PA_USEC_PER_MSEC);
2196 
2197     u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
2198 
2199     pa_source_put(u->source);
2200 
2201 finish:
2202     pa_source_new_data_done(&data);
2203     pa_xfree(data_name);
2204 }
2205 #endif
2206 
2207 /* Runs in PA mainloop context */
tunnel_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)2208 static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
2209     struct userdata *u = (struct userdata *) data;
2210 
2211     pa_assert(u);
2212     pa_assert_ctl_context();
2213 
2214     if (u->shutting_down)
2215         return 0;
2216 
2217     switch (code) {
2218 
2219         case TUNNEL_MESSAGE_MAYBE_RESTART:
2220             unload_module(u->module->userdata);
2221             break;
2222     }
2223 
2224     return 0;
2225 }
2226 
start_connect(struct userdata * u,char * server,bool automatic)2227 static int start_connect(struct userdata *u, char *server, bool automatic) {
2228     pa_strlist *server_list = NULL;
2229     int rc = 0;
2230 
2231     if (server) {
2232         if (!(server_list = pa_strlist_parse(server))) {
2233             pa_log("Invalid server specified.");
2234             rc = -1;
2235             goto done;
2236         }
2237     } else {
2238         char *ufn;
2239 
2240         if (!automatic) {
2241             pa_log("No server specified.");
2242             rc = -1;
2243             goto done;
2244         }
2245 
2246         pa_log("No server address found. Attempting default local sockets.");
2247 
2248         /* The system wide instance via PF_LOCAL */
2249         server_list = pa_strlist_prepend(server_list, PA_SYSTEM_RUNTIME_PATH PA_PATH_SEP PA_NATIVE_DEFAULT_UNIX_SOCKET);
2250 
2251         /* The user instance via PF_LOCAL */
2252         if ((ufn = pa_runtime_path(PA_NATIVE_DEFAULT_UNIX_SOCKET))) {
2253             server_list = pa_strlist_prepend(server_list, ufn);
2254             pa_xfree(ufn);
2255         }
2256     }
2257 
2258     for (;;) {
2259         server_list = pa_strlist_pop(server_list, &u->server_name);
2260 
2261         if (!u->server_name) {
2262             if (server)
2263                 pa_log("Failed to connect to server '%s'", server);
2264             else
2265                 pa_log("Failed to connect");
2266             rc = -1;
2267             goto done;
2268         }
2269 
2270         pa_log_debug("Trying to connect to %s...", u->server_name);
2271 
2272         if (!(u->client = pa_socket_client_new_string(u->module->core->mainloop, true, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
2273             pa_xfree(u->server_name);
2274             u->server_name = NULL;
2275             continue;
2276         }
2277 
2278         break;
2279     }
2280 
2281     if (u->client)
2282         pa_socket_client_set_callback(u->client, on_connection, u);
2283 
2284 done:
2285     pa_strlist_free(server_list);
2286 
2287     return rc;
2288 }
2289 
do_init(pa_module * m)2290 static int do_init(pa_module *m) {
2291     pa_modargs *ma = NULL;
2292     struct userdata *u = NULL;
2293     struct module_restart_data *rd;
2294     char *server = NULL;
2295     uint32_t latency_msec;
2296     bool automatic;
2297 #ifdef HAVE_X11
2298     xcb_connection_t *xcb = NULL;
2299 #endif
2300     const char *cookie_path;
2301     uint32_t reconnect_interval_ms = 0;
2302 
2303     pa_assert(m);
2304     pa_assert(m->userdata);
2305 
2306     rd = m->userdata;
2307 
2308     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
2309         pa_log("Failed to parse module arguments");
2310         goto fail;
2311     }
2312 
2313     rd->userdata = u = pa_xnew0(struct userdata, 1);
2314     u->core = m->core;
2315     u->module = m;
2316     u->client = NULL;
2317     u->pdispatch = NULL;
2318     u->pstream = NULL;
2319     u->server_name = NULL;
2320 #ifdef TUNNEL_SINK
2321     u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
2322     u->configured_sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL));
2323     u->sink = NULL;
2324     u->requested_bytes = 0;
2325 #else
2326     u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
2327     u->configured_source_name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL));
2328     u->source = NULL;
2329 #endif
2330 #ifndef USE_SMOOTHER_2
2331     u->smoother = pa_smoother_new(
2332             PA_USEC_PER_SEC,
2333             PA_USEC_PER_SEC*2,
2334             true,
2335             true,
2336             10,
2337             pa_rtclock_now(),
2338             false);
2339 #endif
2340     u->ctag = 1;
2341     u->device_index = u->channel = PA_INVALID_INDEX;
2342     u->time_event = NULL;
2343     u->ignore_latency_before = 0;
2344     u->transport_usec = u->thread_transport_usec = 0;
2345     u->remote_suspended = u->remote_corked = false;
2346     u->counter = 0;
2347     u->receive_snapshot = 0;
2348     u->receive_counter = 0;
2349 
2350     u->msg = pa_msgobject_new(tunnel_msg);
2351     u->msg->parent.process_msg = tunnel_process_msg;
2352 
2353     u->rtpoll = pa_rtpoll_new();
2354 
2355     if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
2356         pa_log("pa_thread_mq_init() failed.");
2357         goto fail;
2358     }
2359 
2360     automatic = false;
2361     if (pa_modargs_get_value_boolean(ma, "auto", &automatic) < 0) {
2362         pa_log("Failed to parse argument \"auto\".");
2363         goto fail;
2364     }
2365 
2366     /* Allow latencies between 5ms and 500ms */
2367     latency_msec = DEFAULT_LATENCY_MSEC;
2368     if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 5 || latency_msec > 500) {
2369         pa_log("Invalid latency specification");
2370         goto fail;
2371     }
2372 
2373     u->latency = latency_msec;
2374 
2375     cookie_path = pa_modargs_get_value(ma, "cookie", NULL);
2376     server = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL));
2377 
2378     if (automatic) {
2379 #ifdef HAVE_X11
2380         /* Need an X11 connection to get root properties */
2381         if (getenv("DISPLAY") != NULL) {
2382             if (!(xcb = xcb_connect(getenv("DISPLAY"), NULL)))
2383                 pa_log("xcb_connect() failed");
2384             else {
2385                 if (xcb_connection_has_error(xcb)) {
2386                     pa_log("xcb_connection_has_error() returned true");
2387                     xcb_disconnect(xcb);
2388                     xcb = NULL;
2389                 }
2390             }
2391         }
2392 #endif
2393 
2394         /* Figure out the cookie the same way a normal client would */
2395         if (!cookie_path)
2396             cookie_path = getenv(ENV_COOKIE_FILE);
2397 
2398 #ifdef HAVE_X11
2399         if (!cookie_path && xcb) {
2400             char t[1024];
2401             if (pa_x11_get_prop(xcb, 0, "PULSE_COOKIE", t, sizeof(t))) {
2402                 uint8_t cookie[PA_NATIVE_COOKIE_LENGTH];
2403 
2404                 if (pa_parsehex(t, cookie, sizeof(cookie)) != sizeof(cookie))
2405                     pa_log("Failed to parse cookie data");
2406                 else {
2407                     if (!(u->auth_cookie = pa_auth_cookie_create(u->core, cookie, sizeof(cookie))))
2408                         goto fail;
2409                 }
2410             }
2411         }
2412 #endif
2413 
2414         /* Same thing for the server name */
2415         if (!server)
2416             server = pa_xstrdup(getenv(ENV_DEFAULT_SERVER));
2417 
2418 #ifdef HAVE_X11
2419         if (!server && xcb) {
2420             char t[1024];
2421             if (pa_x11_get_prop(xcb, 0, "PULSE_SERVER", t, sizeof(t)))
2422                 server = pa_xstrdup(t);
2423         }
2424 #endif
2425 
2426         /* Also determine the default sink/source on the other server */
2427 #ifdef TUNNEL_SINK
2428         if (!u->sink_name)
2429             u->sink_name = pa_xstrdup(getenv(ENV_DEFAULT_SINK));
2430 
2431 #ifdef HAVE_X11
2432         if (!u->sink_name && xcb) {
2433             char t[1024];
2434             if (pa_x11_get_prop(xcb, 0, "PULSE_SINK", t, sizeof(t)))
2435                 u->sink_name = pa_xstrdup(t);
2436         }
2437 #endif
2438 #else
2439         if (!u->source_name)
2440             u->source_name = pa_xstrdup(getenv(ENV_DEFAULT_SOURCE));
2441 
2442 #ifdef HAVE_X11
2443         if (!u->source_name && xcb) {
2444             char t[1024];
2445             if (pa_x11_get_prop(xcb, 0, "PULSE_SOURCE", t, sizeof(t)))
2446                 u->source_name = pa_xstrdup(t);
2447         }
2448 #endif
2449 #endif
2450     }
2451 
2452     if (!cookie_path && !u->auth_cookie)
2453         cookie_path = PA_NATIVE_COOKIE_FILE;
2454 
2455     if (cookie_path) {
2456         if (!(u->auth_cookie = pa_auth_cookie_get(u->core, cookie_path, true, PA_NATIVE_COOKIE_LENGTH)))
2457             goto fail;
2458     }
2459 
2460     u->sample_spec = m->core->default_sample_spec;
2461     u->channel_map = m->core->default_channel_map;
2462     if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->sample_spec, &u->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
2463         pa_log("Invalid sample format specification");
2464         goto fail;
2465     }
2466 
2467 #ifdef USE_SMOOTHER_2
2468     /* Smoother window must be larger than time between updates. */
2469     u->smoother = pa_smoother_2_new(LATENCY_INTERVAL + 5*PA_USEC_PER_SEC, pa_rtclock_now(), pa_frame_size(&u->sample_spec), u->sample_spec.rate);
2470 #endif
2471 
2472     pa_modargs_get_value_u32(ma, "reconnect_interval_ms", &reconnect_interval_ms);
2473     u->reconnect_interval_us = reconnect_interval_ms * PA_USEC_PER_MSEC;
2474 
2475 #ifdef TUNNEL_SINK
2476 
2477     u->sink_proplist = pa_proplist_new();
2478     if (pa_modargs_get_proplist(ma, "sink_properties", u->sink_proplist, PA_UPDATE_REPLACE) < 0) {
2479         pa_log("Invalid properties");
2480         goto fail;
2481     }
2482 
2483 #else
2484 
2485     u->source_proplist = pa_proplist_new();
2486     if (pa_modargs_get_proplist(ma, "source_properties", u->source_proplist, PA_UPDATE_REPLACE) < 0) {
2487         pa_log("Invalid properties");
2488         goto fail;
2489     }
2490 
2491 #endif
2492 
2493     u->time_event = NULL;
2494 
2495     u->maxlength = (uint32_t) -1;
2496 #ifdef TUNNEL_SINK
2497     u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
2498 #else
2499     u->fragsize = (uint32_t) -1;
2500 #endif
2501 
2502     if (start_connect(u, server, automatic) < 0) {
2503         goto fail;
2504     }
2505 
2506     if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) {
2507         pa_log("Failed to create thread.");
2508         goto fail;
2509     }
2510 
2511     if (server)
2512         pa_xfree(server);
2513 
2514 #ifdef HAVE_X11
2515     if (xcb)
2516         xcb_disconnect(xcb);
2517 #endif
2518 
2519     /* If the module is restarting and do_init() finishes successfully, the
2520      * restart data is no longer needed. If do_init() fails, don't touch the
2521      * restart data, because following restart attempts will continue to use
2522      * the same data. If restart_data is NULL, that means no restart is
2523      * currently pending. */
2524     if (rd->restart_data) {
2525         pa_restart_free(rd->restart_data);
2526         rd->restart_data = NULL;
2527     }
2528 
2529     pa_modargs_free(ma);
2530 
2531     return 0;
2532 
2533 fail:
2534     if (server)
2535         pa_xfree(server);
2536 
2537 #ifdef HAVE_X11
2538     if (xcb)
2539         xcb_disconnect(xcb);
2540 #endif
2541 
2542     if (ma)
2543         pa_modargs_free(ma);
2544 
2545     return -1;
2546 }
2547 
do_done(pa_module * m)2548 static void do_done(pa_module *m) {
2549     struct userdata *u = NULL;
2550     struct module_restart_data *rd;
2551 
2552     pa_assert(m);
2553 
2554     if (!(rd = m->userdata))
2555         return;
2556     if (!(u = rd->userdata))
2557         return;
2558 
2559     u->shutting_down = true;
2560 
2561 #ifdef TUNNEL_SINK
2562     if (u->sink)
2563         pa_sink_unlink(u->sink);
2564 #else
2565     if (u->source)
2566         pa_source_unlink(u->source);
2567 #endif
2568 
2569     if (u->thread) {
2570         pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2571         pa_thread_free(u->thread);
2572     }
2573 
2574     pa_thread_mq_done(&u->thread_mq);
2575 
2576 #ifdef TUNNEL_SINK
2577     if (u->sink)
2578         pa_sink_unref(u->sink);
2579 #else
2580     if (u->source)
2581         pa_source_unref(u->source);
2582 #endif
2583 
2584     if (u->rtpoll)
2585         pa_rtpoll_free(u->rtpoll);
2586 
2587     if (u->pstream) {
2588         pa_pstream_unlink(u->pstream);
2589         pa_pstream_unref(u->pstream);
2590     }
2591 
2592     if (u->pdispatch)
2593         pa_pdispatch_unref(u->pdispatch);
2594 
2595     if (u->client)
2596         pa_socket_client_unref(u->client);
2597 
2598     if (u->auth_cookie)
2599         pa_auth_cookie_unref(u->auth_cookie);
2600 
2601     if (u->smoother)
2602 #ifdef USE_SMOOTHER_2
2603         pa_smoother_2_free(u->smoother);
2604 #else
2605         pa_smoother_free(u->smoother);
2606 #endif
2607 
2608     if (u->time_event)
2609         u->core->mainloop->time_free(u->time_event);
2610 
2611 #ifndef TUNNEL_SINK
2612     if (u->mcalign)
2613         pa_mcalign_free(u->mcalign);
2614 #endif
2615 
2616 #ifdef TUNNEL_SINK
2617     pa_xfree(u->sink_name);
2618     pa_xfree(u->configured_sink_name);
2619     pa_proplist_free(u->sink_proplist);
2620 #else
2621     pa_xfree(u->source_name);
2622     pa_xfree(u->configured_source_name);
2623     pa_proplist_free(u->source_proplist);
2624 #endif
2625     pa_xfree(u->server_name);
2626 
2627     pa_xfree(u->device_description);
2628     pa_xfree(u->server_fqdn);
2629     pa_xfree(u->user_name);
2630 
2631     pa_xfree(u->msg);
2632 
2633     pa_xfree(u);
2634 
2635     rd->userdata = NULL;
2636 }
2637 
pa__init(pa_module * m)2638 int pa__init(pa_module *m) {
2639     int ret;
2640 
2641     pa_assert(m);
2642 
2643     m->userdata = pa_xnew0(struct module_restart_data, 1);
2644 
2645     ret = do_init(m);
2646 
2647     if (ret < 0)
2648         pa__done(m);
2649 
2650     return ret;
2651 }
2652 
pa__done(pa_module * m)2653 void pa__done(pa_module *m) {
2654     pa_assert(m);
2655 
2656     do_done(m);
2657 
2658     if (m->userdata) {
2659         struct module_restart_data *rd = m->userdata;
2660 
2661         if (rd->restart_data)
2662             pa_restart_free(rd->restart_data);
2663 
2664         pa_xfree(m->userdata);
2665     }
2666 }
2667