• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6 
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11 
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16 
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20 
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24 
25 #include <unistd.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <stdio.h>
30 #include <stdlib.h>
31 
32 #ifdef HAVE_X11
33 #include <xcb/xcb.h>
34 #endif
35 
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38 #include <pulse/util.h>
39 #include <pulse/version.h>
40 #include <pulse/xmalloc.h>
41 
42 #include <pulsecore/module.h>
43 #include <pulsecore/core-util.h>
44 #include <pulsecore/modargs.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-subscribe.h>
47 #include <pulsecore/pdispatch.h>
48 #include <pulsecore/pstream.h>
49 #include <pulsecore/pstream-util.h>
50 #include <pulsecore/socket-client.h>
51 #include <pulsecore/time-smoother.h>
52 #include <pulsecore/thread.h>
53 #include <pulsecore/thread-mq.h>
54 #include <pulsecore/core-rtclock.h>
55 #include <pulsecore/core-error.h>
56 #include <pulsecore/proplist-util.h>
57 #include <pulsecore/auth-cookie.h>
58 #include <pulsecore/mcalign.h>
59 #include <pulsecore/strlist.h>
60 
61 #ifdef HAVE_X11
62 #include <pulsecore/x11prop.h>
63 #endif
64 
65 #define ENV_DEFAULT_SINK "PULSE_SINK"
66 #define ENV_DEFAULT_SOURCE "PULSE_SOURCE"
67 #define ENV_DEFAULT_SERVER "PULSE_SERVER"
68 #define ENV_COOKIE_FILE "PULSE_COOKIE"
69 
70 #ifdef TUNNEL_SINK
71 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
72 PA_MODULE_USAGE(
73         "sink_name=<name for the local sink> "
74         "sink_properties=<properties for the local sink> "
75         "auto=<determine server/sink/cookie automatically> "
76         "server=<address> "
77         "sink=<remote sink name> "
78         "cookie=<filename> "
79         "format=<sample format> "
80         "channels=<number of channels> "
81         "rate=<sample rate> "
82         "channel_map=<channel map>");
83 #else
84 PA_MODULE_DESCRIPTION("Tunnel module for sources");
85 PA_MODULE_USAGE(
86         "source_name=<name for the local source> "
87         "source_properties=<properties for the local source> "
88         "auto=<determine server/source/cookie automatically> "
89         "server=<address> "
90         "source=<remote source name> "
91         "cookie=<filename> "
92         "format=<sample format> "
93         "channels=<number of channels> "
94         "rate=<sample rate> "
95         "channel_map=<channel map>");
96 #endif
97 
98 PA_MODULE_AUTHOR("Lennart Poettering");
99 PA_MODULE_VERSION(PACKAGE_VERSION);
100 PA_MODULE_LOAD_ONCE(false);
101 
/* NULL-terminated list of module argument names. The entries mirror the keys
 * advertised in PA_MODULE_USAGE above; the sink_* or source_* variants are
 * selected at compile time by TUNNEL_SINK. */
102 static const char* const valid_modargs[] = {
103     "auto",
104     "server",
105     "cookie",
106     "format",
107     "channels",
108     "rate",
109 #ifdef TUNNEL_SINK
110     "sink_name",
111     "sink_properties",
112     "sink",
113 #else
114     "source_name",
115     "source_properties",
116     "source",
117 #endif
118     "channel_map",
119     NULL,
120 };
121 
122 #define DEFAULT_TIMEOUT 5
123 
124 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
125 
126 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
127 
128 #ifdef TUNNEL_SINK
129 
/* Module-private message codes for the tunnel sink. Numbering starts at
 * PA_SINK_MESSAGE_MAX; the codes are handled in sink_process_msg(). */
130 enum {
131     SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
132     SINK_MESSAGE_REMOTE_SUSPEND,
133     SINK_MESSAGE_UPDATE_LATENCY,
134     SINK_MESSAGE_POST
135 };
136 
/* Default playback buffer metrics, in milliseconds (per the _MSEC suffix) */
137 #define DEFAULT_TLENGTH_MSEC 150
138 #define DEFAULT_MINREQ_MSEC 25
139 
140 #else
141 
/* Module-private message codes for the tunnel source. Numbering starts at
 * PA_SOURCE_MESSAGE_MAX; the codes are handled in source_process_msg(). */
142 enum {
143     SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
144     SOURCE_MESSAGE_REMOTE_SUSPEND,
145     SOURCE_MESSAGE_UPDATE_LATENCY
146 };
147 
/* Default record fragment size, in milliseconds (per the _MSEC suffix) */
148 #define DEFAULT_FRAGSIZE_MSEC 25
149 
150 #endif
151 
152 #ifdef TUNNEL_SINK
153 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
155 #endif
156 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
157 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
158 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
159 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
160 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
161 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
162 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
163 
/* Maps protocol command codes to the handlers defined in this file. The
 * playback and record flavours of a notification share one handler, and the
 * REQUEST/STARTED handlers exist only in TUNNEL_SINK builds. Slots that are
 * not named here are left zero-initialized (NULL). */
164 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
165 #ifdef TUNNEL_SINK
166     [PA_COMMAND_REQUEST] = command_request,
167     [PA_COMMAND_STARTED] = command_started,
168 #endif
169     [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
170     [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
171     [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
172     [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
173     [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
174     [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
175     [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
176     [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
177     [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
178     [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
179     [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
180     [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
181     [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
182     [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
183 };
184 
185 struct userdata {
186     pa_core *core;
187     pa_module *module;
188 
189     pa_thread_mq thread_mq;
190     pa_rtpoll *rtpoll;
191     pa_thread *thread;
192 
193     pa_socket_client *client;
194     pa_pstream *pstream;
195     pa_pdispatch *pdispatch;
196 
197     char *server_name;
198 #ifdef TUNNEL_SINK
199     char *sink_name;
200     pa_sink *sink;
201     size_t requested_bytes;
202 #else
203     char *source_name;
204     pa_source *source;
205     pa_mcalign *mcalign;
206 #endif
207 
208     pa_auth_cookie *auth_cookie;
209 
210     uint32_t version;
211     uint32_t ctag;
212     uint32_t device_index;
213     uint32_t channel;
214 
215     int64_t counter, counter_delta;
216 
217     bool remote_corked:1;
218     bool remote_suspended:1;
219 
220     pa_usec_t transport_usec; /* maintained in the main thread */
221     pa_usec_t thread_transport_usec; /* maintained in the IO thread */
222 
223     uint32_t ignore_latency_before;
224 
225     pa_time_event *time_event;
226 
227     pa_smoother *smoother;
228 
229     char *device_description;
230     char *server_fqdn;
231     char *user_name;
232 
233     uint32_t maxlength;
234 #ifdef TUNNEL_SINK
235     uint32_t tlength;
236     uint32_t minreq;
237     uint32_t prebuf;
238 #else
239     uint32_t fragsize;
240 #endif
241 };
242 
243 static void request_latency(struct userdata *u);
244 
245 /* Called from main context */
command_stream_or_client_event(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)246 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
247     pa_log_debug("Got stream or client event.");
248 }
249 
250 /* Called from main context */
command_stream_killed(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)251 static void command_stream_killed(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
252     struct userdata *u = userdata;
253 
254     pa_assert(pd);
255     pa_assert(t);
256     pa_assert(u);
257     pa_assert(u->pdispatch == pd);
258 
259     pa_log_warn("Stream killed");
260     pa_module_unload_request(u->module, true);
261 }
262 
263 /* Called from main context */
command_overflow_or_underflow(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)264 static void command_overflow_or_underflow(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
265     struct userdata *u = userdata;
266 
267     pa_assert(pd);
268     pa_assert(t);
269     pa_assert(u);
270     pa_assert(u->pdispatch == pd);
271 
272     pa_log_info("Server signalled buffer overrun/underrun.");
273     request_latency(u);
274 }
275 
276 /* Called from main context */
command_suspended(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)277 static void command_suspended(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
278     struct userdata *u = userdata;
279     uint32_t channel;
280     bool suspended;
281 
282     pa_assert(pd);
283     pa_assert(t);
284     pa_assert(u);
285     pa_assert(u->pdispatch == pd);
286 
287     if (pa_tagstruct_getu32(t, &channel) < 0 ||
288         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
289         !pa_tagstruct_eof(t)) {
290 
291         pa_log("Invalid packet.");
292         pa_module_unload_request(u->module, true);
293         return;
294     }
295 
296     pa_log_debug("Server reports device suspend.");
297 
298 #ifdef TUNNEL_SINK
299     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
300 #else
301     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
302 #endif
303 
304     request_latency(u);
305 }
306 
307 /* Called from main context */
command_moved(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)308 static void command_moved(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
309     struct userdata *u = userdata;
310     uint32_t channel, di;
311     const char *dn;
312     bool suspended;
313 
314     pa_assert(pd);
315     pa_assert(t);
316     pa_assert(u);
317     pa_assert(u->pdispatch == pd);
318 
319     if (pa_tagstruct_getu32(t, &channel) < 0 ||
320         pa_tagstruct_getu32(t, &di) < 0 ||
321         pa_tagstruct_gets(t, &dn) < 0 ||
322         pa_tagstruct_get_boolean(t, &suspended) < 0) {
323 
324         pa_log_error("Invalid packet.");
325         pa_module_unload_request(u->module, true);
326         return;
327     }
328 
329     pa_log_debug("Server reports a stream move.");
330 
331 #ifdef TUNNEL_SINK
332     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
333 #else
334     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
335 #endif
336 
337     request_latency(u);
338 }
339 
/* Handler for the PLAYBACK/RECORD_BUFFER_ATTR_CHANGED notifications.
 *
 * Parses the new buffer metrics (the field set depends on whether this is the
 * record or the playback flavour of the command). The values are only used
 * for a debug message in TUNNEL_SINK builds; nothing is stored. Afterwards a
 * new latency measurement is requested. A malformed packet makes us request
 * unloading of the module. */
static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    struct userdata *u = userdata;
    uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
    pa_usec_t usec;

    pa_assert(pd);
    pa_assert(t);
    pa_assert(u);
    pa_assert(u->pdispatch == pd);

    /* Fields common to both flavours */
    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_getu32(t, &maxlength) < 0)
        goto invalid;

    if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {

        if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0)
            goto invalid;

    } else {

        if (pa_tagstruct_getu32(t, &tlength) < 0 ||
            pa_tagstruct_getu32(t, &prebuf) < 0 ||
            pa_tagstruct_getu32(t, &minreq) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0)
            goto invalid;
    }

#ifdef TUNNEL_SINK
    pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
#endif

    request_latency(u);
    return;

invalid:
    pa_log_error("Invalid packet.");
    pa_module_unload_request(u->module, true);
}
384 
385 #ifdef TUNNEL_SINK
386 
387 /* Called from main context */
command_started(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)388 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
389     struct userdata *u = userdata;
390 
391     pa_assert(pd);
392     pa_assert(t);
393     pa_assert(u);
394     pa_assert(u->pdispatch == pd);
395 
396     pa_log_debug("Server reports playback started.");
397     request_latency(u);
398 }
399 
400 #endif
401 
402 /* Called from IO thread context */
check_smoother_status(struct userdata * u,bool past)403 static void check_smoother_status(struct userdata *u, bool past) {
404     pa_usec_t x;
405 
406     pa_assert(u);
407 
408     x = pa_rtclock_now();
409 
410     /* Correct by the time the requested issued needs to travel to the
411      * other side.  This is a valid thread-safe access, because the
412      * main thread is waiting for us */
413 
414     if (past)
415         x -= u->thread_transport_usec;
416     else
417         x += u->thread_transport_usec;
418 
419     if (u->remote_suspended || u->remote_corked)
420         pa_smoother_pause(u->smoother, x);
421     else
422         pa_smoother_resume(u->smoother, x, true);
423 }
424 
425 /* Called from IO thread context */
stream_cork_within_thread(struct userdata * u,bool cork)426 static void stream_cork_within_thread(struct userdata *u, bool cork) {
427     pa_assert(u);
428 
429     if (u->remote_corked == cork)
430         return;
431 
432     u->remote_corked = cork;
433     check_smoother_status(u, false);
434 }
435 
436 /* Called from main context */
stream_cork(struct userdata * u,bool cork)437 static void stream_cork(struct userdata *u, bool cork) {
438     pa_tagstruct *t;
439     pa_assert(u);
440 
441     if (!u->pstream)
442         return;
443 
444     t = pa_tagstruct_new();
445 #ifdef TUNNEL_SINK
446     pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
447 #else
448     pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
449 #endif
450     pa_tagstruct_putu32(t, u->ctag++);
451     pa_tagstruct_putu32(t, u->channel);
452     pa_tagstruct_put_boolean(t, cork);
453     pa_pstream_send_tagstruct(u->pstream, t);
454 
455     request_latency(u);
456 }
457 
458 /* Called from IO thread context */
stream_suspend_within_thread(struct userdata * u,bool suspend)459 static void stream_suspend_within_thread(struct userdata *u, bool suspend) {
460     pa_assert(u);
461 
462     if (u->remote_suspended == suspend)
463         return;
464 
465     u->remote_suspended = suspend;
466     check_smoother_status(u, true);
467 }
468 
469 #ifdef TUNNEL_SINK
470 
471 /* Called from IO thread context */
send_data(struct userdata * u)472 static void send_data(struct userdata *u) {
473     pa_assert(u);
474 
475     while (u->requested_bytes > 0) {
476         pa_memchunk memchunk;
477 
478         pa_sink_render(u->sink, u->requested_bytes, &memchunk);
479         pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
480         pa_memblock_unref(memchunk.memblock);
481 
482         u->requested_bytes -= memchunk.length;
483 
484         u->counter += (int64_t) memchunk.length;
485     }
486 }
487 
488 /* This function is called from IO context -- except when it is not. */
sink_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)489 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
490     struct userdata *u = PA_SINK(o)->userdata;
491 
492     switch (code) {
493 
494         case PA_SINK_MESSAGE_SET_STATE: {
495             int r;
496 
497             /* First, change the state, because otherwise pa_sink_render() would fail */
498             if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
499 
500                 stream_cork_within_thread(u, u->sink->thread_info.state == PA_SINK_SUSPENDED);
501 
502                 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
503                     send_data(u);
504             }
505 
506             return r;
507         }
508 
509         case PA_SINK_MESSAGE_GET_LATENCY: {
510             pa_usec_t yl, yr;
511             int64_t *usec = data;
512 
513             yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
514             yr = pa_smoother_get(u->smoother, pa_rtclock_now());
515 
516             *usec = (int64_t)yl - yr;
517             return 0;
518         }
519 
520         case SINK_MESSAGE_REQUEST:
521 
522             pa_assert(offset > 0);
523             u->requested_bytes += (size_t) offset;
524 
525             if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
526                 send_data(u);
527 
528             return 0;
529 
530         case SINK_MESSAGE_REMOTE_SUSPEND:
531 
532             stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
533             return 0;
534 
535         case SINK_MESSAGE_UPDATE_LATENCY: {
536             pa_usec_t y;
537 
538             y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
539 
540             if (y > (pa_usec_t) offset)
541                 y -= (pa_usec_t) offset;
542             else
543                 y = 0;
544 
545             pa_smoother_put(u->smoother, pa_rtclock_now(), y);
546 
547             /* We can access this freely here, since the main thread is waiting for us */
548             u->thread_transport_usec = u->transport_usec;
549 
550             return 0;
551         }
552 
553         case SINK_MESSAGE_POST:
554 
555             /* OK, This might be a bit confusing. This message is
556              * delivered to us from the main context -- NOT from the
557              * IO thread context where the rest of the messages are
558              * dispatched. Yeah, ugly, but I am a lazy bastard. */
559 
560             pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
561 
562             u->counter_delta += (int64_t) chunk->length;
563 
564             return 0;
565     }
566 
567     return pa_sink_process_msg(o, code, data, offset, chunk);
568 }
569 
570 /* Called from main context */
sink_set_state_in_main_thread_cb(pa_sink * s,pa_sink_state_t state,pa_suspend_cause_t suspend_cause)571 static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
572     struct userdata *u;
573     pa_sink_assert_ref(s);
574     u = s->userdata;
575 
576     /* It may be that only the suspend cause is changing, in which
577      * case there's nothing to do. */
578     if (state == s->state)
579         return 0;
580 
581     switch ((pa_sink_state_t) state) {
582 
583         case PA_SINK_SUSPENDED:
584             pa_assert(PA_SINK_IS_OPENED(s->state));
585             stream_cork(u, true);
586             break;
587 
588         case PA_SINK_IDLE:
589         case PA_SINK_RUNNING:
590             if (s->state == PA_SINK_SUSPENDED)
591                 stream_cork(u, false);
592             break;
593 
594         case PA_SINK_UNLINKED:
595         case PA_SINK_INIT:
596         case PA_SINK_INVALID_STATE:
597             ;
598     }
599 
600     return 0;
601 }
602 
603 #else
604 
605 /* This function is called from IO context -- except when it is not. */
source_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)606 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
607     struct userdata *u = PA_SOURCE(o)->userdata;
608 
609     switch (code) {
610 
611         case PA_SOURCE_MESSAGE_SET_STATE: {
612             int r;
613 
614             if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
615                 stream_cork_within_thread(u, u->source->thread_info.state == PA_SOURCE_SUSPENDED);
616 
617             return r;
618         }
619 
620         case PA_SOURCE_MESSAGE_GET_LATENCY: {
621             pa_usec_t yr, yl;
622             int64_t *usec = data;
623 
624             yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
625             yr = pa_smoother_get(u->smoother, pa_rtclock_now());
626 
627             *usec = (int64_t)yr - yl;
628             return 0;
629         }
630 
631         case SOURCE_MESSAGE_POST: {
632             pa_memchunk c;
633 
634             pa_mcalign_push(u->mcalign, chunk);
635 
636             while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
637 
638                 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
639                     pa_source_post(u->source, &c);
640 
641                 pa_memblock_unref(c.memblock);
642 
643                 u->counter += (int64_t) c.length;
644             }
645 
646             return 0;
647         }
648 
649         case SOURCE_MESSAGE_REMOTE_SUSPEND:
650 
651             stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
652             return 0;
653 
654         case SOURCE_MESSAGE_UPDATE_LATENCY: {
655             pa_usec_t y;
656 
657             y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
658             y += (pa_usec_t) offset;
659 
660             pa_smoother_put(u->smoother, pa_rtclock_now(), y);
661 
662             /* We can access this freely here, since the main thread is waiting for us */
663             u->thread_transport_usec = u->transport_usec;
664 
665             return 0;
666         }
667     }
668 
669     return pa_source_process_msg(o, code, data, offset, chunk);
670 }
671 
672 /* Called from main context */
source_set_state_in_main_thread_cb(pa_source * s,pa_source_state_t state,pa_suspend_cause_t suspend_cause)673 static int source_set_state_in_main_thread_cb(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) {
674     struct userdata *u;
675     pa_source_assert_ref(s);
676     u = s->userdata;
677 
678     /* It may be that only the suspend cause is changing, in which
679      * case there's nothing to do. */
680     if (state == s->state)
681         return 0;
682 
683     switch ((pa_source_state_t) state) {
684 
685         case PA_SOURCE_SUSPENDED:
686             pa_assert(PA_SOURCE_IS_OPENED(s->state));
687             stream_cork(u, true);
688             break;
689 
690         case PA_SOURCE_IDLE:
691         case PA_SOURCE_RUNNING:
692             if (s->state == PA_SOURCE_SUSPENDED)
693                 stream_cork(u, false);
694             break;
695 
696         case PA_SOURCE_UNLINKED:
697         case PA_SOURCE_INIT:
698         case PA_SOURCE_INVALID_STATE:
699             ;
700     }
701 
702     return 0;
703 }
704 
705 #endif
706 
thread_func(void * userdata)707 static void thread_func(void *userdata) {
708     struct userdata *u = userdata;
709 
710     pa_assert(u);
711 
712     pa_log_debug("Thread starting up");
713 
714     pa_thread_mq_install(&u->thread_mq);
715 
716     for (;;) {
717         int ret;
718 
719 #ifdef TUNNEL_SINK
720         if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
721             pa_sink_process_rewind(u->sink, 0);
722 #endif
723 
724         if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
725             goto fail;
726 
727         if (ret == 0)
728             goto finish;
729     }
730 
731 fail:
732     /* If this was no regular exit from the loop we have to continue
733      * processing messages until we received PA_MESSAGE_SHUTDOWN */
734     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
735     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
736 
737 finish:
738     pa_log_debug("Thread shutting down");
739 }
740 
741 #ifdef TUNNEL_SINK
742 /* Called from main context */
command_request(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)743 static void command_request(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
744     struct userdata *u = userdata;
745     uint32_t bytes, channel;
746 
747     pa_assert(pd);
748     pa_assert(command == PA_COMMAND_REQUEST);
749     pa_assert(t);
750     pa_assert(u);
751     pa_assert(u->pdispatch == pd);
752 
753     if (pa_tagstruct_getu32(t, &channel) < 0 ||
754         pa_tagstruct_getu32(t, &bytes) < 0) {
755         pa_log("Invalid protocol reply");
756         goto fail;
757     }
758 
759     if (channel != u->channel) {
760         pa_log("Received data for invalid channel");
761         goto fail;
762     }
763 
764     pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
765     return;
766 
767 fail:
768     pa_module_unload_request(u->module, true);
769 }
770 
771 #endif
772 
773 /* Called from main context */
stream_get_latency_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)774 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
775     struct userdata *u = userdata;
776     pa_usec_t sink_usec, source_usec;
777     bool playing;
778     int64_t write_index, read_index;
779     struct timeval local, remote, now;
780     pa_sample_spec *ss;
781     int64_t delay;
782 
783     pa_assert(pd);
784     pa_assert(u);
785 
786     if (command != PA_COMMAND_REPLY) {
787         if (command == PA_COMMAND_ERROR)
788             pa_log("Failed to get latency.");
789         else
790             pa_log("Protocol error.");
791         goto fail;
792     }
793 
794     if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
795         pa_tagstruct_get_usec(t, &source_usec) < 0 ||
796         pa_tagstruct_get_boolean(t, &playing) < 0 ||
797         pa_tagstruct_get_timeval(t, &local) < 0 ||
798         pa_tagstruct_get_timeval(t, &remote) < 0 ||
799         pa_tagstruct_gets64(t, &write_index) < 0 ||
800         pa_tagstruct_gets64(t, &read_index) < 0) {
801         pa_log("Invalid reply.");
802         goto fail;
803     }
804 
805 #ifdef TUNNEL_SINK
806     if (u->version >= 13) {
807         uint64_t underrun_for = 0, playing_for = 0;
808 
809         if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
810             pa_tagstruct_getu64(t, &playing_for) < 0) {
811             pa_log("Invalid reply.");
812             goto fail;
813         }
814     }
815 #endif
816 
817     if (!pa_tagstruct_eof(t)) {
818         pa_log("Invalid reply.");
819         goto fail;
820     }
821 
822     if (tag < u->ignore_latency_before) {
823         return;
824     }
825 
826     pa_gettimeofday(&now);
827 
828     /* Calculate transport usec */
829     if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
830         /* local and remote seem to have synchronized clocks */
831 #ifdef TUNNEL_SINK
832         u->transport_usec = pa_timeval_diff(&remote, &local);
833 #else
834         u->transport_usec = pa_timeval_diff(&now, &remote);
835 #endif
836     } else
837         u->transport_usec = pa_timeval_diff(&now, &local)/2;
838 
839     /* First, take the device's delay */
840 #ifdef TUNNEL_SINK
841     delay = (int64_t) sink_usec;
842     ss = &u->sink->sample_spec;
843 #else
844     delay = (int64_t) source_usec;
845     ss = &u->source->sample_spec;
846 #endif
847 
848     /* Add the length of our server-side buffer */
849     if (write_index >= read_index)
850         delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
851     else
852         delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
853 
854     /* Our measurements are already out of date, hence correct by the     *
855      * transport latency */
856 #ifdef TUNNEL_SINK
857     delay -= (int64_t) u->transport_usec;
858 #else
859     delay += (int64_t) u->transport_usec;
860 #endif
861 
862     /* Now correct by what we have have read/written since we requested the update */
863 #ifdef TUNNEL_SINK
864     delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
865 #else
866     delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
867 #endif
868 
869 #ifdef TUNNEL_SINK
870     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
871 #else
872     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
873 #endif
874 
875     return;
876 
877 fail:
878 
879     pa_module_unload_request(u->module, true);
880 }
881 
882 /* Called from main context */
request_latency(struct userdata * u)883 static void request_latency(struct userdata *u) {
884     pa_tagstruct *t;
885     struct timeval now;
886     uint32_t tag;
887     pa_assert(u);
888 
889     t = pa_tagstruct_new();
890 #ifdef TUNNEL_SINK
891     pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
892 #else
893     pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
894 #endif
895     pa_tagstruct_putu32(t, tag = u->ctag++);
896     pa_tagstruct_putu32(t, u->channel);
897 
898     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
899 
900     pa_pstream_send_tagstruct(u->pstream, t);
901     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
902 
903     u->ignore_latency_before = tag;
904     u->counter_delta = 0;
905 }
906 
907 /* Called from main context */
timeout_callback(pa_mainloop_api * m,pa_time_event * e,const struct timeval * t,void * userdata)908 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
909     struct userdata *u = userdata;
910 
911     pa_assert(m);
912     pa_assert(e);
913     pa_assert(u);
914 
915     request_latency(u);
916 
917     pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
918 }
919 
920 /* Called from main context */
update_description(struct userdata * u)921 static void update_description(struct userdata *u) {
922     char *d;
923     char un[128], hn[128];
924     pa_tagstruct *t;
925 
926     pa_assert(u);
927 
928     if (!u->server_fqdn || !u->user_name || !u->device_description)
929         return;
930 
931     d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
932 
933 #ifdef TUNNEL_SINK
934     pa_sink_set_description(u->sink, d);
935     pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
936     pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
937     pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
938 #else
939     pa_source_set_description(u->source, d);
940     pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
941     pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
942     pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
943 #endif
944 
945     pa_xfree(d);
946 
947     d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
948                           pa_get_user_name(un, sizeof(un)),
949                           pa_get_host_name(hn, sizeof(hn)));
950 
951     t = pa_tagstruct_new();
952 #ifdef TUNNEL_SINK
953     pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
954 #else
955     pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
956 #endif
957     pa_tagstruct_putu32(t, u->ctag++);
958     pa_tagstruct_putu32(t, u->channel);
959     pa_tagstruct_puts(t, d);
960     pa_pstream_send_tagstruct(u->pstream, t);
961 
962     pa_xfree(d);
963 }
964 
965 /* Called from main context */
server_info_cb(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)966 static void server_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
967     struct userdata *u = userdata;
968     pa_sample_spec ss;
969     pa_channel_map cm;
970     const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
971     uint32_t cookie;
972 
973     pa_assert(pd);
974     pa_assert(u);
975 
976     if (command != PA_COMMAND_REPLY) {
977         if (command == PA_COMMAND_ERROR)
978             pa_log("Failed to get info.");
979         else
980             pa_log("Protocol error.");
981         goto fail;
982     }
983 
984     if (pa_tagstruct_gets(t, &server_name) < 0 ||
985         pa_tagstruct_gets(t, &server_version) < 0 ||
986         pa_tagstruct_gets(t, &user_name) < 0 ||
987         pa_tagstruct_gets(t, &host_name) < 0 ||
988         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
989         pa_tagstruct_gets(t, &default_sink_name) < 0 ||
990         pa_tagstruct_gets(t, &default_source_name) < 0 ||
991         pa_tagstruct_getu32(t, &cookie) < 0 ||
992         (u->version >= 15 && pa_tagstruct_get_channel_map(t, &cm) < 0)) {
993 
994         pa_log("Parse failure");
995         goto fail;
996     }
997 
998     if (!pa_tagstruct_eof(t)) {
999         pa_log("Packet too long");
1000         goto fail;
1001     }
1002 
1003     pa_xfree(u->server_fqdn);
1004     u->server_fqdn = pa_xstrdup(host_name);
1005 
1006     pa_xfree(u->user_name);
1007     u->user_name = pa_xstrdup(user_name);
1008 
1009     update_description(u);
1010 
1011     return;
1012 
1013 fail:
1014     pa_module_unload_request(u->module, true);
1015 }
1016 
/* Skips over the port section of a sink/source info reply.
 *
 * Nothing is stored; the fields are read only to advance the parser.
 * Replies from servers older than protocol version 16 are not read at
 * all. Newer versions append extra per-port fields (from version 24 and
 * from version 34 on).
 *
 * Returns 0 on success, or -PA_ERR_PROTOCOL (after logging) when a field
 * cannot be read. */
static int read_ports(struct userdata *u, pa_tagstruct *t) {
    uint32_t port_count, i;
    uint32_t skipped_u32;
    const char *skipped_str;

    if (u->version < 16)
        return 0;

    if (pa_tagstruct_getu32(t, &port_count) != 0)
        goto parse_failure;

    for (i = 0; i < port_count; i++) {

        if (pa_tagstruct_gets(t, &skipped_str) < 0 || /* name */
            pa_tagstruct_gets(t, &skipped_str) < 0 || /* description */
            pa_tagstruct_getu32(t, &skipped_u32) < 0) /* priority */
            goto parse_failure;

        if (u->version < 24)
            continue;

        if (pa_tagstruct_getu32(t, &skipped_u32) < 0) /* available */
            goto parse_failure;

        if (u->version >= 34 &&
            (pa_tagstruct_gets(t, &skipped_str) < 0 || /* availability group */
             pa_tagstruct_getu32(t, &skipped_u32) < 0)) /* device port type */
            goto parse_failure;
    }

    if (pa_tagstruct_gets(t, &skipped_str) < 0) /* active port */
        goto parse_failure;

    return 0;

parse_failure:
    pa_log("Parse failure");
    return -PA_ERR_PROTOCOL;
}
1058 
/* Skips over the format list of a sink/source info reply.
 *
 * Reads the 8-bit format count followed by that many format-info
 * entries. Each entry is parsed into a scratch object that is freed
 * again right away. The u argument is unused.
 *
 * Returns 0 on success, or -PA_ERR_PROTOCOL (after logging) when the
 * count or an entry cannot be read. */
static int read_formats(struct userdata *u, pa_tagstruct *t) {
    uint8_t remaining;

    if (pa_tagstruct_getu8(t, &remaining) < 0) { /* no. of formats */
        pa_log("Parse failure");
        return -PA_ERR_PROTOCOL;
    }

    while (remaining > 0) {
        pa_format_info *scratch = pa_format_info_new();
        int rc = pa_tagstruct_get_format_info(t, scratch); /* format info */

        /* The scratch object is released on both the success and the
         * failure path before anything else happens. */
        pa_format_info_free(scratch);

        if (rc != 0) {
            pa_log("Parse failure");
            return -PA_ERR_PROTOCOL;
        }

        remaining--;
    }

    return 0;
}
1079 
1080 #ifdef TUNNEL_SINK
1081 
1082 /* Called from main context */
sink_info_cb(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1083 static void sink_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1084     struct userdata *u = userdata;
1085     uint32_t idx, owner_module, monitor_source, flags;
1086     const char *name, *description, *monitor_source_name, *driver;
1087     pa_sample_spec ss;
1088     pa_channel_map cm;
1089     pa_cvolume volume;
1090     bool mute;
1091     pa_usec_t latency;
1092 
1093     pa_assert(pd);
1094     pa_assert(u);
1095 
1096     if (command != PA_COMMAND_REPLY) {
1097         if (command == PA_COMMAND_ERROR)
1098             pa_log("Failed to get info.");
1099         else
1100             pa_log("Protocol error.");
1101         goto fail;
1102     }
1103 
1104     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1105         pa_tagstruct_gets(t, &name) < 0 ||
1106         pa_tagstruct_gets(t, &description) < 0 ||
1107         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1108         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1109         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1110         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1111         pa_tagstruct_get_boolean(t, &mute) < 0 ||
1112         pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1113         pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1114         pa_tagstruct_get_usec(t, &latency) < 0 ||
1115         pa_tagstruct_gets(t, &driver) < 0 ||
1116         pa_tagstruct_getu32(t, &flags) < 0) {
1117 
1118         pa_log("Parse failure");
1119         goto fail;
1120     }
1121 
1122     if (u->version >= 13) {
1123         pa_usec_t configured_latency;
1124 
1125         if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1126             pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1127 
1128             pa_log("Parse failure");
1129             goto fail;
1130         }
1131     }
1132 
1133     if (u->version >= 15) {
1134         pa_volume_t base_volume;
1135         uint32_t state, n_volume_steps, card;
1136 
1137         if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1138             pa_tagstruct_getu32(t, &state) < 0 ||
1139             pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1140             pa_tagstruct_getu32(t, &card) < 0) {
1141 
1142             pa_log("Parse failure");
1143             goto fail;
1144         }
1145     }
1146 
1147     if (read_ports(u, t) < 0)
1148         goto fail;
1149 
1150     if (u->version >= 21 && read_formats(u, t) < 0)
1151         goto fail;
1152 
1153     if (!pa_tagstruct_eof(t)) {
1154         pa_log("Packet too long");
1155         goto fail;
1156     }
1157 
1158     if (!u->sink_name || !pa_streq(name, u->sink_name))
1159         return;
1160 
1161     pa_xfree(u->device_description);
1162     u->device_description = pa_xstrdup(description);
1163 
1164     update_description(u);
1165 
1166     return;
1167 
1168 fail:
1169     pa_module_unload_request(u->module, true);
1170 }
1171 
1172 /* Called from main context */
sink_input_info_cb(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1173 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1174     struct userdata *u = userdata;
1175     uint32_t idx, owner_module, client, sink;
1176     pa_usec_t buffer_usec, sink_usec;
1177     const char *name, *driver, *resample_method;
1178     bool mute = false;
1179     pa_sample_spec sample_spec;
1180     pa_channel_map channel_map;
1181     pa_cvolume volume;
1182     bool b;
1183 
1184     pa_assert(pd);
1185     pa_assert(u);
1186 
1187     if (command != PA_COMMAND_REPLY) {
1188         if (command == PA_COMMAND_ERROR)
1189             pa_log("Failed to get info.");
1190         else
1191             pa_log("Protocol error.");
1192         goto fail;
1193     }
1194 
1195     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1196         pa_tagstruct_gets(t, &name) < 0 ||
1197         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1198         pa_tagstruct_getu32(t, &client) < 0 ||
1199         pa_tagstruct_getu32(t, &sink) < 0 ||
1200         pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1201         pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1202         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1203         pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1204         pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1205         pa_tagstruct_gets(t, &resample_method) < 0 ||
1206         pa_tagstruct_gets(t, &driver) < 0) {
1207 
1208         pa_log("Parse failure");
1209         goto fail;
1210     }
1211 
1212     if (u->version >= 11) {
1213         if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1214 
1215             pa_log("Parse failure");
1216             goto fail;
1217         }
1218     }
1219 
1220     if (u->version >= 13) {
1221         if (pa_tagstruct_get_proplist(t, NULL) < 0) {
1222 
1223             pa_log("Parse failure");
1224             goto fail;
1225         }
1226     }
1227 
1228     if (u->version >= 19) {
1229         if (pa_tagstruct_get_boolean(t, &b) < 0) {
1230 
1231             pa_log("Parse failure");
1232             goto fail;
1233         }
1234     }
1235 
1236     if (u->version >= 20) {
1237         if (pa_tagstruct_get_boolean(t, &b) < 0 ||
1238             pa_tagstruct_get_boolean(t, &b) < 0) {
1239 
1240             pa_log("Parse failure");
1241             goto fail;
1242         }
1243     }
1244 
1245     if (u->version >= 21) {
1246         pa_format_info *format = pa_format_info_new();
1247 
1248         if (pa_tagstruct_get_format_info(t, format) < 0) {
1249             pa_format_info_free(format);
1250             pa_log("Parse failure");
1251             goto fail;
1252         }
1253         pa_format_info_free(format);
1254     }
1255 
1256     if (!pa_tagstruct_eof(t)) {
1257         pa_log("Packet too long");
1258         goto fail;
1259     }
1260 
1261     if (idx != u->device_index)
1262         return;
1263 
1264     pa_assert(u->sink);
1265 
1266     if ((u->version < 11 || mute == u->sink->muted) &&
1267         pa_cvolume_equal(&volume, &u->sink->real_volume))
1268         return;
1269 
1270     pa_sink_volume_changed(u->sink, &volume);
1271 
1272     if (u->version >= 11)
1273         pa_sink_mute_changed(u->sink, mute);
1274 
1275     return;
1276 
1277 fail:
1278     pa_module_unload_request(u->module, true);
1279 }
1280 
1281 #else
1282 
1283 /* Called from main context */
source_info_cb(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1284 static void source_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1285     struct userdata *u = userdata;
1286     uint32_t idx, owner_module, monitor_of_sink, flags;
1287     const char *name, *description, *monitor_of_sink_name, *driver;
1288     pa_sample_spec ss;
1289     pa_channel_map cm;
1290     pa_cvolume volume;
1291     bool mute;
1292     pa_usec_t latency, configured_latency;
1293 
1294     pa_assert(pd);
1295     pa_assert(u);
1296 
1297     if (command != PA_COMMAND_REPLY) {
1298         if (command == PA_COMMAND_ERROR)
1299             pa_log("Failed to get info.");
1300         else
1301             pa_log("Protocol error.");
1302         goto fail;
1303     }
1304 
1305     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1306         pa_tagstruct_gets(t, &name) < 0 ||
1307         pa_tagstruct_gets(t, &description) < 0 ||
1308         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1309         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1310         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1311         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1312         pa_tagstruct_get_boolean(t, &mute) < 0 ||
1313         pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1314         pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1315         pa_tagstruct_get_usec(t, &latency) < 0 ||
1316         pa_tagstruct_gets(t, &driver) < 0 ||
1317         pa_tagstruct_getu32(t, &flags) < 0) {
1318 
1319         pa_log("Parse failure");
1320         goto fail;
1321     }
1322 
1323     if (u->version >= 13) {
1324         if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1325             pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1326 
1327             pa_log("Parse failure");
1328             goto fail;
1329         }
1330     }
1331 
1332     if (u->version >= 15) {
1333         pa_volume_t base_volume;
1334         uint32_t state, n_volume_steps, card;
1335 
1336         if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1337             pa_tagstruct_getu32(t, &state) < 0 ||
1338             pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1339             pa_tagstruct_getu32(t, &card) < 0) {
1340 
1341             pa_log("Parse failure");
1342             goto fail;
1343         }
1344     }
1345 
1346     if (read_ports(u, t) < 0)
1347         goto fail;
1348 
1349     if (u->version >= 22 && read_formats(u, t) < 0)
1350         goto fail;
1351 
1352     if (!pa_tagstruct_eof(t)) {
1353         pa_log("Packet too long");
1354         goto fail;
1355     }
1356 
1357     if (!u->source_name || !pa_streq(name, u->source_name))
1358         return;
1359 
1360     pa_xfree(u->device_description);
1361     u->device_description = pa_xstrdup(description);
1362 
1363     update_description(u);
1364 
1365     return;
1366 
1367 fail:
1368     pa_module_unload_request(u->module, true);
1369 }
1370 
1371 #endif
1372 
1373 /* Called from main context */
request_info(struct userdata * u)1374 static void request_info(struct userdata *u) {
1375     pa_tagstruct *t;
1376     uint32_t tag;
1377     pa_assert(u);
1378 
1379     t = pa_tagstruct_new();
1380     pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1381     pa_tagstruct_putu32(t, tag = u->ctag++);
1382     pa_pstream_send_tagstruct(u->pstream, t);
1383     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1384 
1385 #ifdef TUNNEL_SINK
1386     t = pa_tagstruct_new();
1387     pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1388     pa_tagstruct_putu32(t, tag = u->ctag++);
1389     pa_tagstruct_putu32(t, u->device_index);
1390     pa_pstream_send_tagstruct(u->pstream, t);
1391     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1392 
1393     if (u->sink_name) {
1394         t = pa_tagstruct_new();
1395         pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1396         pa_tagstruct_putu32(t, tag = u->ctag++);
1397         pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1398         pa_tagstruct_puts(t, u->sink_name);
1399         pa_pstream_send_tagstruct(u->pstream, t);
1400         pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1401     }
1402 #else
1403     if (u->source_name) {
1404         t = pa_tagstruct_new();
1405         pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1406         pa_tagstruct_putu32(t, tag = u->ctag++);
1407         pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1408         pa_tagstruct_puts(t, u->source_name);
1409         pa_pstream_send_tagstruct(u->pstream, t);
1410         pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1411     }
1412 #endif
1413 }
1414 
1415 /* Called from main context */
command_subscribe_event(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1416 static void command_subscribe_event(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1417     struct userdata *u = userdata;
1418     pa_subscription_event_type_t e;
1419     uint32_t idx;
1420 
1421     pa_assert(pd);
1422     pa_assert(t);
1423     pa_assert(u);
1424     pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1425 
1426     if (pa_tagstruct_getu32(t, &e) < 0 ||
1427         pa_tagstruct_getu32(t, &idx) < 0) {
1428         pa_log("Invalid protocol reply");
1429         pa_module_unload_request(u->module, true);
1430         return;
1431     }
1432 
1433     if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1434 #ifdef TUNNEL_SINK
1435         e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1436         e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1437 #else
1438         e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1439 #endif
1440         )
1441         return;
1442 
1443     request_info(u);
1444 }
1445 
1446 /* Called from main context */
start_subscribe(struct userdata * u)1447 static void start_subscribe(struct userdata *u) {
1448     pa_tagstruct *t;
1449     pa_assert(u);
1450 
1451     t = pa_tagstruct_new();
1452     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1453     pa_tagstruct_putu32(t, u->ctag++);
1454     pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1455 #ifdef TUNNEL_SINK
1456                         PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1457 #else
1458                         PA_SUBSCRIPTION_MASK_SOURCE
1459 #endif
1460                         );
1461 
1462     pa_pstream_send_tagstruct(u->pstream, t);
1463 }
1464 
1465 /* Called from main context */
create_stream_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1466 static void create_stream_callback(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1467     struct userdata *u = userdata;
1468 #ifdef TUNNEL_SINK
1469     uint32_t bytes;
1470 #endif
1471 
1472     pa_assert(pd);
1473     pa_assert(u);
1474     pa_assert(u->pdispatch == pd);
1475 
1476     if (command != PA_COMMAND_REPLY) {
1477         if (command == PA_COMMAND_ERROR)
1478             pa_log("Failed to create stream.");
1479         else
1480             pa_log("Protocol error.");
1481         goto fail;
1482     }
1483 
1484     if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1485         pa_tagstruct_getu32(t, &u->device_index) < 0
1486 #ifdef TUNNEL_SINK
1487         || pa_tagstruct_getu32(t, &bytes) < 0
1488 #endif
1489         )
1490         goto parse_error;
1491 
1492     if (u->version >= 9) {
1493 #ifdef TUNNEL_SINK
1494         if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1495             pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1496             pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1497             pa_tagstruct_getu32(t, &u->minreq) < 0)
1498             goto parse_error;
1499 #else
1500         if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1501             pa_tagstruct_getu32(t, &u->fragsize) < 0)
1502             goto parse_error;
1503 #endif
1504     }
1505 
1506     if (u->version >= 12) {
1507         pa_sample_spec ss;
1508         pa_channel_map cm;
1509         uint32_t device_index;
1510         const char *dn;
1511         bool suspended;
1512 
1513         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1514             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1515             pa_tagstruct_getu32(t, &device_index) < 0 ||
1516             pa_tagstruct_gets(t, &dn) < 0 ||
1517             pa_tagstruct_get_boolean(t, &suspended) < 0)
1518             goto parse_error;
1519 
1520 #ifdef TUNNEL_SINK
1521         pa_xfree(u->sink_name);
1522         u->sink_name = pa_xstrdup(dn);
1523 #else
1524         pa_xfree(u->source_name);
1525         u->source_name = pa_xstrdup(dn);
1526 #endif
1527     }
1528 
1529     if (u->version >= 13) {
1530         pa_usec_t usec;
1531 
1532         if (pa_tagstruct_get_usec(t, &usec) < 0)
1533             goto parse_error;
1534 
1535 /* #ifdef TUNNEL_SINK */
1536 /*         pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1537 /* #else */
1538 /*         pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1539 /* #endif */
1540     }
1541 
1542     if (u->version >= 21) {
1543         pa_format_info *format = pa_format_info_new();
1544 
1545         if (pa_tagstruct_get_format_info(t, format) < 0) {
1546             pa_format_info_free(format);
1547             goto parse_error;
1548         }
1549 
1550         pa_format_info_free(format);
1551     }
1552 
1553     if (!pa_tagstruct_eof(t))
1554         goto parse_error;
1555 
1556     start_subscribe(u);
1557     request_info(u);
1558 
1559     pa_assert(!u->time_event);
1560     u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1561 
1562     request_latency(u);
1563 
1564     pa_log_debug("Stream created.");
1565 
1566 #ifdef TUNNEL_SINK
1567     pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1568 #endif
1569 
1570     return;
1571 
1572 parse_error:
1573     pa_log("Invalid reply. (Create stream)");
1574 
1575 fail:
1576     pa_module_unload_request(u->module, true);
1577 
1578 }
1579 
1580 /* Called from main context */
setup_complete_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1581 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1582     struct userdata *u = userdata;
1583     pa_tagstruct *reply;
1584     char name[256], un[128], hn[128];
1585     pa_cvolume volume;
1586 
1587     pa_assert(pd);
1588     pa_assert(u);
1589     pa_assert(u->pdispatch == pd);
1590 
1591     if (command != PA_COMMAND_REPLY ||
1592         pa_tagstruct_getu32(t, &u->version) < 0 ||
1593         !pa_tagstruct_eof(t)) {
1594 
1595         if (command == PA_COMMAND_ERROR)
1596             pa_log("Failed to authenticate");
1597         else
1598             pa_log("Protocol error.");
1599 
1600         goto fail;
1601     }
1602 
1603     /* Minimum supported protocol version */
1604     if (u->version < 8) {
1605         pa_log("Incompatible protocol version");
1606         goto fail;
1607     }
1608 
1609     /* Starting with protocol version 13 the MSB of the version tag
1610     reflects if shm is enabled for this connection or not. We don't
1611     support SHM here at all, so we just ignore this. */
1612 
1613     if (u->version >= 13)
1614         u->version &= 0x7FFFFFFFU;
1615 
1616     pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1617 
1618 #ifdef TUNNEL_SINK
1619     pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1620     pa_sink_update_proplist(u->sink, 0, NULL);
1621 
1622     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1623                 u->sink_name,
1624                 pa_get_user_name(un, sizeof(un)),
1625                 pa_get_host_name(hn, sizeof(hn)));
1626 #else
1627     pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1628     pa_source_update_proplist(u->source, 0, NULL);
1629 
1630     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1631                 u->source_name,
1632                 pa_get_user_name(un, sizeof(un)),
1633                 pa_get_host_name(hn, sizeof(hn)));
1634 #endif
1635 
1636     reply = pa_tagstruct_new();
1637     pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1638     pa_tagstruct_putu32(reply, u->ctag++);
1639 
1640     if (u->version >= 13) {
1641         pa_proplist *pl;
1642         pl = pa_proplist_new();
1643         pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1644         pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1645         pa_init_proplist(pl);
1646         pa_tagstruct_put_proplist(reply, pl);
1647         pa_proplist_free(pl);
1648     } else
1649         pa_tagstruct_puts(reply, "PulseAudio");
1650 
1651     pa_pstream_send_tagstruct(u->pstream, reply);
1652     /* We ignore the server's reply here */
1653 
1654     reply = pa_tagstruct_new();
1655 
1656     if (u->version < 13)
1657         /* Only for older PA versions we need to fill in the maxlength */
1658         u->maxlength = 4*1024*1024;
1659 
1660 #ifdef TUNNEL_SINK
1661     u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1662     u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1663     u->prebuf = u->tlength;
1664 #else
1665     u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1666 #endif
1667 
1668 #ifdef TUNNEL_SINK
1669     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1670     pa_tagstruct_putu32(reply, tag = u->ctag++);
1671 
1672     if (u->version < 13)
1673         pa_tagstruct_puts(reply, name);
1674 
1675     pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1676     pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1677     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1678     pa_tagstruct_puts(reply, u->sink_name);
1679     pa_tagstruct_putu32(reply, u->maxlength);
1680     pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(u->sink->state));
1681     pa_tagstruct_putu32(reply, u->tlength);
1682     pa_tagstruct_putu32(reply, u->prebuf);
1683     pa_tagstruct_putu32(reply, u->minreq);
1684     pa_tagstruct_putu32(reply, 0);
1685     pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1686     pa_tagstruct_put_cvolume(reply, &volume);
1687 #else
1688     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1689     pa_tagstruct_putu32(reply, tag = u->ctag++);
1690 
1691     if (u->version < 13)
1692         pa_tagstruct_puts(reply, name);
1693 
1694     pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1695     pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1696     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1697     pa_tagstruct_puts(reply, u->source_name);
1698     pa_tagstruct_putu32(reply, u->maxlength);
1699     pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(u->source->state));
1700     pa_tagstruct_putu32(reply, u->fragsize);
1701 #endif
1702 
1703     if (u->version >= 12) {
1704         pa_tagstruct_put_boolean(reply, false); /* no_remap */
1705         pa_tagstruct_put_boolean(reply, false); /* no_remix */
1706         pa_tagstruct_put_boolean(reply, false); /* fix_format */
1707         pa_tagstruct_put_boolean(reply, false); /* fix_rate */
1708         pa_tagstruct_put_boolean(reply, false); /* fix_channels */
1709         pa_tagstruct_put_boolean(reply, true); /* no_move */
1710         pa_tagstruct_put_boolean(reply, false); /* variable_rate */
1711     }
1712 
1713     if (u->version >= 13) {
1714         pa_proplist *pl;
1715 
1716         pa_tagstruct_put_boolean(reply, false); /* start muted/peak detect*/
1717         pa_tagstruct_put_boolean(reply, true); /* adjust_latency */
1718 
1719         pl = pa_proplist_new();
1720         pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1721         pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1722         pa_tagstruct_put_proplist(reply, pl);
1723         pa_proplist_free(pl);
1724 
1725 #ifndef TUNNEL_SINK
1726         pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1727 #endif
1728     }
1729 
1730     if (u->version >= 14) {
1731 #ifdef TUNNEL_SINK
1732         pa_tagstruct_put_boolean(reply, false); /* volume_set */
1733 #endif
1734         pa_tagstruct_put_boolean(reply, true); /* early rquests */
1735     }
1736 
1737     if (u->version >= 15) {
1738 #ifdef TUNNEL_SINK
1739         pa_tagstruct_put_boolean(reply, false); /* muted_set */
1740 #endif
1741         pa_tagstruct_put_boolean(reply, false); /* don't inhibit auto suspend */
1742         pa_tagstruct_put_boolean(reply, false); /* fail on suspend */
1743     }
1744 
1745 #ifdef TUNNEL_SINK
1746     if (u->version >= 17)
1747         pa_tagstruct_put_boolean(reply, false); /* relative volume */
1748 
1749     if (u->version >= 18)
1750         pa_tagstruct_put_boolean(reply, false); /* passthrough stream */
1751 #endif
1752 
1753 #ifdef TUNNEL_SINK
1754     if (u->version >= 21) {
1755         /* We're not using the extended API, so n_formats = 0 and that's that */
1756         pa_tagstruct_putu8(reply, 0);
1757     }
1758 #else
1759     if (u->version >= 22) {
1760         /* We're not using the extended API, so n_formats = 0 and that's that */
1761         pa_tagstruct_putu8(reply, 0);
1762         pa_cvolume_reset(&volume, u->source->sample_spec.channels);
1763         pa_tagstruct_put_cvolume(reply, &volume);
1764         pa_tagstruct_put_boolean(reply, false); /* muted */
1765         pa_tagstruct_put_boolean(reply, false); /* volume_set */
1766         pa_tagstruct_put_boolean(reply, false); /* muted_set */
1767         pa_tagstruct_put_boolean(reply, false); /* relative volume */
1768         pa_tagstruct_put_boolean(reply, false); /* passthrough stream */
1769     }
1770 #endif
1771 
1772     pa_pstream_send_tagstruct(u->pstream, reply);
1773     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1774 
1775     pa_log_debug("Connection authenticated, creating stream ...");
1776 
1777     return;
1778 
1779 fail:
1780     pa_module_unload_request(u->module, true);
1781 }
1782 
1783 /* Called from main context */
pstream_die_callback(pa_pstream * p,void * userdata)1784 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1785     struct userdata *u = userdata;
1786 
1787     pa_assert(p);
1788     pa_assert(u);
1789 
1790     pa_log_warn("Stream died.");
1791     pa_module_unload_request(u->module, true);
1792 }
1793 
1794 /* Called from main context */
pstream_packet_callback(pa_pstream * p,pa_packet * packet,pa_cmsg_ancil_data * ancil_data,void * userdata)1795 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data, void *userdata) {
1796     struct userdata *u = userdata;
1797 
1798     pa_assert(p);
1799     pa_assert(packet);
1800     pa_assert(u);
1801 
1802     if (pa_pdispatch_run(u->pdispatch, packet, ancil_data, u) < 0) {
1803         pa_log("Invalid packet");
1804         pa_module_unload_request(u->module, true);
1805         return;
1806     }
1807 }
1808 
1809 #ifndef TUNNEL_SINK
1810 /* Called from main context */
pstream_memblock_callback(pa_pstream * p,uint32_t channel,int64_t offset,pa_seek_mode_t seek,const pa_memchunk * chunk,void * userdata)1811 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1812     struct userdata *u = userdata;
1813 
1814     pa_assert(p);
1815     pa_assert(chunk);
1816     pa_assert(u);
1817 
1818     if (channel != u->channel) {
1819         pa_log("Received memory block on bad channel.");
1820         pa_module_unload_request(u->module, true);
1821         return;
1822     }
1823 
1824     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1825 
1826     u->counter_delta += (int64_t) chunk->length;
1827 }
1828 #endif
1829 
1830 /* Called from main context */
on_connection(pa_socket_client * sc,pa_iochannel * io,void * userdata)1831 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1832     struct userdata *u = userdata;
1833     pa_tagstruct *t;
1834     uint32_t tag;
1835 
1836     pa_assert(sc);
1837     pa_assert(u);
1838     pa_assert(u->client == sc);
1839 
1840     pa_socket_client_unref(u->client);
1841     u->client = NULL;
1842 
1843     if (!io) {
1844         pa_log("Connection failed: %s", pa_cstrerror(errno));
1845         pa_module_unload_request(u->module, true);
1846         return;
1847     }
1848 
1849     u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1850     u->pdispatch = pa_pdispatch_new(u->core->mainloop, true, command_table, PA_COMMAND_MAX);
1851 
1852     pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1853     pa_pstream_set_receive_packet_callback(u->pstream, pstream_packet_callback, u);
1854 #ifndef TUNNEL_SINK
1855     pa_pstream_set_receive_memblock_callback(u->pstream, pstream_memblock_callback, u);
1856 #endif
1857 
1858     t = pa_tagstruct_new();
1859     pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1860     pa_tagstruct_putu32(t, tag = u->ctag++);
1861     pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1862 
1863     pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1864 
1865 #ifdef HAVE_CREDS
1866 {
1867     pa_creds ucred;
1868 
1869     if (pa_iochannel_creds_supported(io))
1870         pa_iochannel_creds_enable(io);
1871 
1872     ucred.uid = getuid();
1873     ucred.gid = getgid();
1874 
1875     pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1876 }
1877 #else
1878     pa_pstream_send_tagstruct(u->pstream, t);
1879 #endif
1880 
1881     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1882 
1883     pa_log_debug("Connection established, authenticating ...");
1884 }
1885 
1886 #ifdef TUNNEL_SINK
1887 
1888 /* Called from main context */
sink_set_volume(pa_sink * sink)1889 static void sink_set_volume(pa_sink *sink) {
1890     struct userdata *u;
1891     pa_tagstruct *t;
1892 
1893     pa_assert(sink);
1894     u = sink->userdata;
1895     pa_assert(u);
1896 
1897     t = pa_tagstruct_new();
1898     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1899     pa_tagstruct_putu32(t, u->ctag++);
1900     pa_tagstruct_putu32(t, u->device_index);
1901     pa_tagstruct_put_cvolume(t, &sink->real_volume);
1902     pa_pstream_send_tagstruct(u->pstream, t);
1903 }
1904 
1905 /* Called from main context */
sink_set_mute(pa_sink * sink)1906 static void sink_set_mute(pa_sink *sink) {
1907     struct userdata *u;
1908     pa_tagstruct *t;
1909 
1910     pa_assert(sink);
1911     u = sink->userdata;
1912     pa_assert(u);
1913 
1914     if (u->version < 11)
1915         return;
1916 
1917     t = pa_tagstruct_new();
1918     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1919     pa_tagstruct_putu32(t, u->ctag++);
1920     pa_tagstruct_putu32(t, u->device_index);
1921     pa_tagstruct_put_boolean(t, sink->muted);
1922     pa_pstream_send_tagstruct(u->pstream, t);
1923 }
1924 
1925 #endif
1926 
/* Module entry point: sets up a tunnel sink (TUNNEL_SINK) or tunnel source.
 *
 * Steps, in order:
 *   1. parse the module arguments and allocate/initialise the userdata;
 *   2. determine cookie, server and remote sink/source name; with auto=yes
 *      the environment and (with HAVE_X11) X11 properties are consulted for
 *      anything not given as a module argument;
 *   3. load the authentication cookie and build the list of servers to try;
 *   4. start a connection to the first server for which a socket client can
 *      be created (on_connection() continues once it is established);
 *   5. create the local sink/source, start the IO thread and put the
 *      sink/source.
 *
 * Returns 0 on success. On failure pa__done() is called to tear down
 * whatever was set up so far and -1 is returned. */
int pa__init(pa_module*m) {
    pa_modargs *ma = NULL;
    struct userdata *u = NULL;
    char *server = NULL;
    pa_strlist *server_list = NULL;
    pa_sample_spec ss;
    pa_channel_map map;
    char *dn = NULL;
#ifdef TUNNEL_SINK
    pa_sink_new_data data;
#else
    pa_source_new_data data;
#endif
    /* Explicit default: presumably pa_modargs_get_value_boolean() leaves the
     * value untouched when "auto" is not given (TODO confirm against
     * modargs), in which case it would otherwise be read uninitialised. */
    bool automatic = false;
#ifdef HAVE_X11
    xcb_connection_t *xcb = NULL;
#endif
    const char *cookie_path;

    pa_assert(m);

    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
        pa_log("Failed to parse module arguments");
        goto fail;
    }

    m->userdata = u = pa_xnew0(struct userdata, 1);
    u->core = m->core;
    u->module = m;
    u->client = NULL;
    u->pdispatch = NULL;
    u->pstream = NULL;
    u->server_name = NULL;
#ifdef TUNNEL_SINK
    u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
    u->sink = NULL;
    u->requested_bytes = 0;
#else
    u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
    u->source = NULL;
#endif
    u->smoother = pa_smoother_new(
            PA_USEC_PER_SEC,
            PA_USEC_PER_SEC*2,
            true,
            true,
            10,
            pa_rtclock_now(),
            false);
    u->ctag = 1;
    u->device_index = u->channel = PA_INVALID_INDEX;
    u->time_event = NULL;
    u->ignore_latency_before = 0;
    u->transport_usec = u->thread_transport_usec = 0;
    u->remote_suspended = u->remote_corked = false;
    u->counter = u->counter_delta = 0;

    u->rtpoll = pa_rtpoll_new();

    if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
        pa_log("pa_thread_mq_init() failed.");
        goto fail;
    }

    if (pa_modargs_get_value_boolean(ma, "auto", &automatic) < 0) {
        pa_log("Failed to parse argument \"auto\".");
        goto fail;
    }

    cookie_path = pa_modargs_get_value(ma, "cookie", NULL);
    server = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL));

    if (automatic) {
#ifdef HAVE_X11
        /* Need an X11 connection to get root properties */
        if (getenv("DISPLAY") != NULL) {
            if (!(xcb = xcb_connect(getenv("DISPLAY"), NULL)))
                pa_log("xcb_connect() failed");
            else {
                if (xcb_connection_has_error(xcb)) {
                    pa_log("xcb_connection_has_error() returned true");
                    xcb_disconnect(xcb);
                    xcb = NULL;
                }
            }
        }
#endif

        /* Figure out the cookie the same way a normal client would */
        if (!cookie_path)
            cookie_path = getenv(ENV_COOKIE_FILE);

#ifdef HAVE_X11
        if (!cookie_path && xcb) {
            char t[1024];
            if (pa_x11_get_prop(xcb, 0, "PULSE_COOKIE", t, sizeof(t))) {
                uint8_t cookie[PA_NATIVE_COOKIE_LENGTH];

                if (pa_parsehex(t, cookie, sizeof(cookie)) != sizeof(cookie))
                    pa_log("Failed to parse cookie data");
                else {
                    if (!(u->auth_cookie = pa_auth_cookie_create(u->core, cookie, sizeof(cookie))))
                        goto fail;
                }
            }
        }
#endif

        /* Same thing for the server name */
        if (!server)
            server = pa_xstrdup(getenv(ENV_DEFAULT_SERVER));

#ifdef HAVE_X11
        if (!server && xcb) {
            char t[1024];
            if (pa_x11_get_prop(xcb, 0, "PULSE_SERVER", t, sizeof(t)))
                server = pa_xstrdup(t);
        }
#endif

        /* Also determine the default sink/source on the other server */
#ifdef TUNNEL_SINK
        if (!u->sink_name)
            u->sink_name = pa_xstrdup(getenv(ENV_DEFAULT_SINK));

#ifdef HAVE_X11
        if (!u->sink_name && xcb) {
            char t[1024];
            if (pa_x11_get_prop(xcb, 0, "PULSE_SINK", t, sizeof(t)))
                u->sink_name = pa_xstrdup(t);
        }
#endif
#else
        if (!u->source_name)
            u->source_name = pa_xstrdup(getenv(ENV_DEFAULT_SOURCE));

#ifdef HAVE_X11
        if (!u->source_name && xcb) {
            char t[1024];
            if (pa_x11_get_prop(xcb, 0, "PULSE_SOURCE", t, sizeof(t)))
                u->source_name = pa_xstrdup(t);
        }
#endif
#endif
    }

    /* Fall back to the default cookie file unless a cookie was already
     * created from the X11 property above */
    if (!cookie_path && !u->auth_cookie)
        cookie_path = PA_NATIVE_COOKIE_FILE;

    if (cookie_path) {
        if (!(u->auth_cookie = pa_auth_cookie_get(u->core, cookie_path, true, PA_NATIVE_COOKIE_LENGTH)))
            goto fail;
    }

    if (server) {
        if (!(server_list = pa_strlist_parse(server))) {
            pa_log("Invalid server specified.");
            goto fail;
        }
    } else {
        char *ufn;

        if (!automatic) {
            pa_log("No server specified.");
            goto fail;
        }

        pa_log("No server address found. Attempting default local sockets.");

        /* The system wide instance via PF_LOCAL */
        server_list = pa_strlist_prepend(server_list, PA_SYSTEM_RUNTIME_PATH PA_PATH_SEP PA_NATIVE_DEFAULT_UNIX_SOCKET);

        /* The user instance via PF_LOCAL */
        if ((ufn = pa_runtime_path(PA_NATIVE_DEFAULT_UNIX_SOCKET))) {
            server_list = pa_strlist_prepend(server_list, ufn);
            pa_xfree(ufn);
        }
    }

    ss = m->core->default_sample_spec;
    map = m->core->default_channel_map;
    if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
        pa_log("Invalid sample format specification");
        goto fail;
    }

    /* Walk the server list until a socket client could be created */
    for (;;) {
        server_list = pa_strlist_pop(server_list, &u->server_name);

        if (!u->server_name) {
            /* 'server' is NULL when the default local sockets were tried;
             * never hand a NULL pointer to %s. */
            pa_log("Failed to connect to server '%s'", server ? server : "(default local sockets)");
            goto fail;
        }

        pa_log_debug("Trying to connect to %s...", u->server_name);

        if (!(u->client = pa_socket_client_new_string(m->core->mainloop, true, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
            pa_xfree(u->server_name);
            u->server_name = NULL;
            continue;
        }

        break;
    }

    pa_socket_client_set_callback(u->client, on_connection, u);

#ifdef TUNNEL_SINK

    /* Local sink name: the "sink_name" argument, else derived from the server */
    if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
        dn = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);

    pa_sink_new_data_init(&data);
    data.driver = __FILE__;
    data.module = m;
    data.namereg_fail = false;
    pa_sink_new_data_set_name(&data, dn);
    pa_sink_new_data_set_sample_spec(&data, &ss);
    pa_sink_new_data_set_channel_map(&data, &map);
    pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
    pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
    if (u->sink_name)
        pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);

    if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
        pa_log("Invalid properties");
        pa_sink_new_data_done(&data);
        goto fail;
    }

    u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY);
    pa_sink_new_data_done(&data);

    if (!u->sink) {
        pa_log("Failed to create sink.");
        goto fail;
    }

    u->sink->parent.process_msg = sink_process_msg;
    u->sink->userdata = u;
    u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
    pa_sink_set_set_volume_callback(u->sink, sink_set_volume);
    pa_sink_set_set_mute_callback(u->sink, sink_set_mute);

    u->sink->refresh_volume = u->sink->refresh_muted = false;

/*     pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */

    pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
    pa_sink_set_rtpoll(u->sink, u->rtpoll);

#else

    /* Local source name: the "source_name" argument, else derived from the server */
    if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
        dn = pa_sprintf_malloc("tunnel-source.%s", u->server_name);

    pa_source_new_data_init(&data);
    data.driver = __FILE__;
    data.module = m;
    data.namereg_fail = false;
    pa_source_new_data_set_name(&data, dn);
    pa_source_new_data_set_sample_spec(&data, &ss);
    pa_source_new_data_set_channel_map(&data, &map);
    pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
    pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
    if (u->source_name)
        pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);

    if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
        pa_log("Invalid properties");
        pa_source_new_data_done(&data);
        goto fail;
    }

    u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
    pa_source_new_data_done(&data);

    if (!u->source) {
        pa_log("Failed to create source.");
        goto fail;
    }

    u->source->parent.process_msg = source_process_msg;
    u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb;
    u->source->userdata = u;

/*     pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */

    pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
    pa_source_set_rtpoll(u->source, u->rtpoll);

    u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
#endif

    u->time_event = NULL;

    u->maxlength = (uint32_t) -1;
#ifdef TUNNEL_SINK
    u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
#else
    u->fragsize = (uint32_t) -1;
#endif

    if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) {
        pa_log("Failed to create thread.");
        goto fail;
    }

#ifdef TUNNEL_SINK
    pa_sink_put(u->sink);
#else
    pa_source_put(u->source);
#endif

    /* Release the temporaries that were only needed during setup */
    pa_xfree(dn);
    pa_xfree(server);

    if (server_list)
        pa_strlist_free(server_list);

#ifdef HAVE_X11
    if (xcb)
        xcb_disconnect(xcb);
#endif

    pa_modargs_free(ma);

    return 0;

fail:
    /* pa__done() returns immediately if the userdata was never allocated */
    pa__done(m);

    pa_xfree(server);

    if (server_list)
        pa_strlist_free(server_list);

#ifdef HAVE_X11
    if (xcb)
        xcb_disconnect(xcb);
#endif

    if (ma)
        pa_modargs_free(ma);

    pa_xfree(dn);

    return -1;
}
2279 
/* Module teardown: releases everything referenced from the module's
 * userdata and finally the userdata itself.
 *
 * Returns immediately when m->userdata is NULL. m->userdata is cleared
 * before the teardown starts, so the module never keeps a pointer to freed
 * memory and a repeated call is a no-op. This matters because the failure
 * path of pa__init() calls this function as well. */
void pa__done(pa_module*m) {
    struct userdata* u;

    pa_assert(m);

    if (!(u = m->userdata))
        return;

    /* Detach the userdata from the module first; 'u' stays valid until the
     * final pa_xfree() below. */
    m->userdata = NULL;

#ifdef TUNNEL_SINK
    if (u->sink)
        pa_sink_unlink(u->sink);
#else
    if (u->source)
        pa_source_unlink(u->source);
#endif

    /* Ask the IO thread to shut down, then free the thread object */
    if (u->thread) {
        pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
        pa_thread_free(u->thread);
    }

    pa_thread_mq_done(&u->thread_mq);

#ifdef TUNNEL_SINK
    if (u->sink)
        pa_sink_unref(u->sink);
#else
    if (u->source)
        pa_source_unref(u->source);
#endif

    if (u->rtpoll)
        pa_rtpoll_free(u->rtpoll);

    if (u->pstream) {
        pa_pstream_unlink(u->pstream);
        pa_pstream_unref(u->pstream);
    }

    if (u->pdispatch)
        pa_pdispatch_unref(u->pdispatch);

    if (u->client)
        pa_socket_client_unref(u->client);

    if (u->auth_cookie)
        pa_auth_cookie_unref(u->auth_cookie);

    if (u->smoother)
        pa_smoother_free(u->smoother);

    if (u->time_event)
        u->core->mainloop->time_free(u->time_event);

#ifndef TUNNEL_SINK
    if (u->mcalign)
        pa_mcalign_free(u->mcalign);
#endif

#ifdef TUNNEL_SINK
    pa_xfree(u->sink_name);
#else
    pa_xfree(u->source_name);
#endif
    pa_xfree(u->server_name);

    pa_xfree(u->device_description);
    pa_xfree(u->server_fqdn);
    pa_xfree(u->user_name);

    pa_xfree(u);
}
2352