1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include "restart-module.h"
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #ifdef HAVE_X11
35 #include <xcb/xcb.h>
36 #endif
37
38 #include <pulse/rtclock.h>
39 #include <pulse/timeval.h>
40 #include <pulse/util.h>
41 #include <pulse/version.h>
42 #include <pulse/xmalloc.h>
43
44 #include <pulsecore/module.h>
45 #include <pulsecore/core-util.h>
46 #include <pulsecore/modargs.h>
47 #include <pulsecore/log.h>
48 #include <pulsecore/core-subscribe.h>
49 #include <pulsecore/pdispatch.h>
50 #include <pulsecore/pstream.h>
51 #include <pulsecore/pstream-util.h>
52 #include <pulsecore/socket-client.h>
53
54 #ifdef USE_SMOOTHER_2
55 #include <pulsecore/time-smoother_2.h>
56 #else
57 #include <pulsecore/time-smoother.h>
58 #endif
59
60 #include <pulsecore/thread.h>
61 #include <pulsecore/thread-mq.h>
62 #include <pulsecore/core-rtclock.h>
63 #include <pulsecore/core-error.h>
64 #include <pulsecore/proplist-util.h>
65 #include <pulsecore/auth-cookie.h>
66 #include <pulsecore/mcalign.h>
67 #include <pulsecore/strlist.h>
68
69 #ifdef HAVE_X11
70 #include <pulsecore/x11prop.h>
71 #endif
72
/* Names of environment variables related to connection defaults (default
 * sink, default source, server address and cookie file).
 * NOTE(review): the code that reads them is not in this part of the file;
 * presumably they are consulted when the "auto" argument is set -- confirm. */
#define ENV_DEFAULT_SINK "PULSE_SINK"
#define ENV_DEFAULT_SOURCE "PULSE_SOURCE"
#define ENV_DEFAULT_SERVER "PULSE_SERVER"
#define ENV_COOKIE_FILE "PULSE_COOKIE"
77
/* Module metadata. This file is built in two flavours: with TUNNEL_SINK
 * defined it describes the sink tunnel, without it the source tunnel. The
 * usage strings list the same arguments as valid_modargs[] below. */
#ifdef TUNNEL_SINK
PA_MODULE_DESCRIPTION("Tunnel module for sinks");
PA_MODULE_USAGE(
        "sink_name=<name for the local sink> "
        "sink_properties=<properties for the local sink> "
        "auto=<determine server/sink/cookie automatically> "
        "server=<address> "
        "sink=<remote sink name> "
        "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
        "cookie=<filename> "
        "format=<sample format> "
        "channels=<number of channels> "
        "rate=<sample rate> "
        "latency_msec=<fixed latency in ms> "
        "channel_map=<channel map>");
#else
PA_MODULE_DESCRIPTION("Tunnel module for sources");
PA_MODULE_USAGE(
        "source_name=<name for the local source> "
        "source_properties=<properties for the local source> "
        "auto=<determine server/source/cookie automatically> "
        "server=<address> "
        "source=<remote source name> "
        "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
        "cookie=<filename> "
        "format=<sample format> "
        "channels=<number of channels> "
        "rate=<sample rate> "
        "latency_msec=<fixed latency in ms> "
        "channel_map=<channel map>");
#endif

PA_MODULE_AUTHOR("Lennart Poettering");
PA_MODULE_VERSION(PACKAGE_VERSION);
/* The module is not restricted to a single instance. */
PA_MODULE_LOAD_ONCE(false);
113
/* NULL-terminated list of the module argument names this module knows about.
 * The sink/source specific names are selected at compile time by TUNNEL_SINK.
 * NOTE(review): presumably handed to the modargs parser in do_init() -- the
 * call is outside this part of the file. */
static const char* const valid_modargs[] = {
    "auto",
    "server",
    "cookie",
    "format",
    "channels",
    "rate",
    "latency_msec",
    "reconnect_interval_ms",
#ifdef TUNNEL_SINK
    "sink_name",
    "sink_properties",
    "sink",
#else
    "source_name",
    "source_properties",
    "source",
#endif
    "channel_map",
    NULL,
};
135
/* Timeout passed to pa_pdispatch_register_reply() when waiting for a reply
 * to a latency request (presumably seconds -- TODO confirm against
 * pdispatch.h). */
#define DEFAULT_TIMEOUT 5

/* Period after which timeout_callback() re-arms itself to request the
 * remote latency again. */
#define LATENCY_INTERVAL (1*PA_USEC_PER_SEC)

/* NOTE(review): not referenced in this part of the file; by its name a lower
 * bound for the assumed network latency -- confirm at the point of use. */
#define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)

#ifdef TUNNEL_SINK

/* Private message codes handled by sink_process_msg(); numbered after the
 * generic sink messages. */
enum {
    SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX, /* server asked for more data; byte count travels in 'offset' */
    SINK_MESSAGE_REMOTE_SUSPEND,                /* remote device suspend state changed; flag travels in 'data' */
    SINK_MESSAGE_UPDATE_LATENCY,                /* new delay measurement (usec) in 'offset' for the smoother */
    SINK_MESSAGE_GET_LATENCY_SNAPSHOT,          /* copy u->counter into the int64_t pointed to by 'data' */
    SINK_MESSAGE_POST,                          /* rendered chunk to be written to the pstream */
};

#define DEFAULT_LATENCY_MSEC 100

#else

/* Private message codes handled by source_process_msg(); numbered after the
 * generic source messages. */
enum {
    SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX, /* received chunk to be aligned and posted to the source */
    SOURCE_MESSAGE_REMOTE_SUSPEND,               /* remote device suspend state changed; flag travels in 'data' */
    SOURCE_MESSAGE_UPDATE_LATENCY,               /* new delay measurement (usec) in 'offset' for the smoother */
    SOURCE_MESSAGE_GET_LATENCY_SNAPSHOT,         /* copy u->counter into the int64_t pointed to by 'data' */
};

#define DEFAULT_LATENCY_MSEC 25

#endif
166
/* Minimal message object; it carries no state of its own beyond the
 * pa_msgobject parent. thread_func() uses it as the target of
 * TUNNEL_MESSAGE_MAYBE_RESTART. */
struct tunnel_msg {
    pa_msgobject parent;
};

typedef struct tunnel_msg tunnel_msg;
PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);

/* Message codes understood by the tunnel_msg object. */
enum {
    TUNNEL_MESSAGE_MAYBE_RESTART, /* posted by the IO thread when pa_rtpoll_run() failed */
};
177
/* Module init/teardown entry points; handed to pa_restart_module_reinit()
 * by unload_module(). */
static int do_init(pa_module *m);
static void do_done(pa_module *m);

/* Forward declarations of the protocol command handlers referenced by
 * command_table[] below. */
#ifdef TUNNEL_SINK
static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
#endif
static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);

/* Maps protocol command codes to their handlers. Entries that are not
 * listed stay NULL. The playback and record variants of a command share one
 * handler; the request/started handlers exist only in the sink build. */
static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
#ifdef TUNNEL_SINK
    [PA_COMMAND_REQUEST] = command_request,
    [PA_COMMAND_STARTED] = command_started,
#endif
    [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
    [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
    [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
    [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
    [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
    [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
    [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
    [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
    [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
    [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
    [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
    [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
    [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
    [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
    [PA_COMMAND_UNDERFLOW_OHOS] = command_overflow_or_underflow,
};
214
/* Per-instance state of the tunnel. It is shared between the main thread
 * (protocol handling) and the IO thread (thread_func() and the sink/source
 * message handlers). */
struct userdata {
    pa_core *core;
    pa_module *module;          /* module->userdata is what unload_module() is called with */

    pa_thread_mq thread_mq;     /* installed by thread_func(); outq carries messages to the main thread */
    pa_rtpoll *rtpoll;          /* run by thread_func() */
    pa_thread *thread;

    pa_socket_client *client;
    pa_pstream *pstream;        /* connection used to send tagstructs and memblocks to the server */
    pa_pdispatch *pdispatch;    /* dispatcher the command handlers are invoked by; also used to register replies */

    char *server_name;
#ifdef TUNNEL_SINK
    char *sink_name;
    char *configured_sink_name;
    pa_sink *sink;
    size_t requested_bytes;     /* bytes the server asked for that send_data() has not rendered yet */
#else
    char *source_name;
    char *configured_source_name;
    pa_source *source;
    pa_mcalign *mcalign;        /* re-aligns received chunks before they are posted to the source */
#endif

    pa_auth_cookie *auth_cookie;

    uint32_t version;           /* checked (>= 13) when parsing latency replies in the sink build */
    uint32_t ctag;              /* next command tag; post-incremented for every request sent */
    uint32_t device_index;
    uint32_t channel;           /* stream channel sent with every stream command */
    uint32_t latency;

    int64_t counter;            /* bytes rendered (sink) or posted (source); updated by the IO-thread code */
    uint64_t receive_counter;   /* sink build: bytes handed to the pstream in SINK_MESSAGE_POST */
    uint64_t receive_snapshot;  /* receive_counter as it was when the last latency request was sent */

    bool remote_corked:1;       /* set by stream_cork_within_thread() */
    bool remote_suspended:1;    /* set by stream_suspend_within_thread() */
    bool shutting_down:1;

    pa_usec_t transport_usec; /* maintained in the main thread */
    pa_usec_t thread_transport_usec; /* maintained in the IO thread */

    uint32_t ignore_latency_before; /* tag of the newest latency request; replies with a smaller tag are dropped */

    pa_time_event *time_event;

    /* Smoother fed by the *_MESSAGE_UPDATE_LATENCY handlers and queried by
     * the GET_LATENCY handlers. */
#ifdef USE_SMOOTHER_2
    pa_smoother_2 *smoother;
#else
    pa_smoother *smoother;
#endif

    /* Strings update_description() combines into the device description. */
    char *device_description;
    char *server_fqdn;
    char *user_name;

    uint32_t maxlength;
#ifdef TUNNEL_SINK
    uint32_t tlength;           /* logged as the "before" value when the server reports changed buffer attrs */
    uint32_t minreq;
    uint32_t prebuf;

    pa_proplist *sink_proplist;
#else
    uint32_t fragsize;

    pa_proplist *source_proplist;
#endif

    pa_sample_spec sample_spec;
    pa_channel_map channel_map;

    tunnel_msg *msg;            /* target object of TUNNEL_MESSAGE_MAYBE_RESTART */

    pa_iochannel *io;

    pa_usec_t reconnect_interval_us; /* > 0: unload_module() re-initialises the module instead of unloading it */
    pa_usec_t snapshot_time;    /* timestamp taken just before *_MESSAGE_UPDATE_LATENCY is sent */
};
296
/* The object unload_module() operates on (callers pass module->userdata):
 * the tunnel state plus the handle of a pending restart, if any. */
struct module_restart_data {
    struct userdata *userdata;
    pa_restart_data *restart_data; /* non-NULL while a restart is pending */
};

/* Forward declarations for functions that are used before their definition. */
static void request_latency(struct userdata *u);
#ifdef TUNNEL_SINK
static void create_sink(struct userdata *u);
static void on_sink_created(struct userdata *u);
#else
static void create_source(struct userdata *u);
static void on_source_created(struct userdata *u);
#endif
310
311 /* Do a reinit of the module. Note that u will be freed as a result of this
312 * call. */
unload_module(struct module_restart_data * rd)313 static void unload_module(struct module_restart_data *rd) {
314 struct userdata *u = rd->userdata;
315
316 if (rd->restart_data) {
317 pa_log_debug("Restart already pending");
318 return;
319 }
320
321 if (u->reconnect_interval_us > 0) {
322 /* The handle returned here must be freed when do_init() was successful and when the
323 * module exits. */
324 rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us);
325 } else
326 pa_module_unload_request(u->module, true);
327 }
328
329 /* Called from main context */
command_stream_or_client_event(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)330 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
331 pa_log_debug("Got stream or client event.");
332 }
333
334 /* Called from main context */
command_stream_killed(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)335 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
336 struct userdata *u = userdata;
337
338 pa_assert(pd);
339 pa_assert(t);
340 pa_assert(u);
341 pa_assert(u->pdispatch == pd);
342
343 pa_log_warn("Stream killed");
344 unload_module(u->module->userdata);
345 }
346
347 /* Called from main context */
command_overflow_or_underflow(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)348 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
349 struct userdata *u = userdata;
350
351 pa_assert(pd);
352 pa_assert(t);
353 pa_assert(u);
354 pa_assert(u->pdispatch == pd);
355
356 pa_log_info("Server signalled buffer overrun/underrun.");
357 request_latency(u);
358 }
359
360 /* Called from main context */
command_suspended(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)361 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
362 struct userdata *u = userdata;
363 uint32_t channel;
364 bool suspended;
365
366 pa_assert(pd);
367 pa_assert(t);
368 pa_assert(u);
369 pa_assert(u->pdispatch == pd);
370
371 if (pa_tagstruct_getu32(t, &channel) < 0 ||
372 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
373 !pa_tagstruct_eof(t)) {
374
375 pa_log("Invalid packet.");
376 unload_module(u->module->userdata);
377 return;
378 }
379
380 pa_log_debug("Server reports device suspend.");
381
382 #ifdef TUNNEL_SINK
383 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
384 #else
385 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
386 #endif
387
388 request_latency(u);
389 }
390
391 /* Called from main context */
command_moved(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)392 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
393 struct userdata *u = userdata;
394 uint32_t channel, di;
395 const char *dn;
396 bool suspended;
397
398 pa_assert(pd);
399 pa_assert(t);
400 pa_assert(u);
401 pa_assert(u->pdispatch == pd);
402
403 if (pa_tagstruct_getu32(t, &channel) < 0 ||
404 pa_tagstruct_getu32(t, &di) < 0 ||
405 pa_tagstruct_gets(t, &dn) < 0 ||
406 pa_tagstruct_get_boolean(t, &suspended) < 0) {
407
408 pa_log_error("Invalid packet.");
409 unload_module(u->module->userdata);
410 return;
411 }
412
413 pa_log_debug("Server reports a stream move.");
414
415 #ifdef TUNNEL_SINK
416 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
417 #else
418 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
419 #endif
420
421 request_latency(u);
422 }
423
/* The server reported new buffer attributes for our stream.
 *
 * The packet layout depends on the command: record streams carry
 * channel, maxlength, fragsize and a latency; playback streams carry
 * channel, maxlength, tlength, prebuf, minreq and a latency. The values are
 * only parsed for validation (and, in the sink build, to log tlength); the
 * stored attributes in u are not updated here. A malformed packet triggers
 * unload_module(); otherwise a new latency measurement is requested. */
static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    struct userdata *u = userdata;
    uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
    pa_usec_t usec;

    pa_assert(pd);
    pa_assert(t);
    pa_assert(u);
    pa_assert(u->pdispatch == pd);

    /* Fields common to both stream directions */
    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_getu32(t, &maxlength) < 0)
        goto invalid;

    if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
        if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0)
            goto invalid;
    } else {
        if (pa_tagstruct_getu32(t, &tlength) < 0 ||
            pa_tagstruct_getu32(t, &prebuf) < 0 ||
            pa_tagstruct_getu32(t, &minreq) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0)
            goto invalid;
    }

#ifdef TUNNEL_SINK
    pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
#endif

    request_latency(u);
    return;

invalid:
    pa_log_error("Invalid packet.");
    unload_module(u->module->userdata);
}
468
469 #ifdef TUNNEL_SINK
470
471 /* Called from main context */
command_started(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)472 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
473 struct userdata *u = userdata;
474
475 pa_assert(pd);
476 pa_assert(t);
477 pa_assert(u);
478 pa_assert(u->pdispatch == pd);
479
480 pa_log_debug("Server reports playback started.");
481 request_latency(u);
482 }
483
484 #endif
485
486 /* Called from IO thread context */
check_smoother_status(struct userdata * u,bool past)487 static void check_smoother_status(struct userdata *u, bool past) {
488 pa_usec_t x;
489
490 pa_assert(u);
491
492 x = pa_rtclock_now();
493
494 /* Correct by the time the requested issued needs to travel to the
495 * other side. This is a valid thread-safe access, because the
496 * main thread is waiting for us */
497
498 if (past)
499 x -= u->thread_transport_usec;
500 else
501 x += u->thread_transport_usec;
502
503 if (u->remote_suspended || u->remote_corked)
504 #ifdef USE_SMOOTHER_2
505 pa_smoother_2_pause(u->smoother, x);
506 else
507 pa_smoother_2_resume(u->smoother, x);
508 #else
509 pa_smoother_pause(u->smoother, x);
510 else
511 pa_smoother_resume(u->smoother, x, true);
512 #endif
513 }
514
515 /* Called from IO thread context */
stream_cork_within_thread(struct userdata * u,bool cork)516 static void stream_cork_within_thread(struct userdata *u, bool cork) {
517 pa_assert(u);
518
519 if (u->remote_corked == cork)
520 return;
521
522 u->remote_corked = cork;
523 check_smoother_status(u, false);
524 }
525
526 /* Called from main context */
stream_cork(struct userdata * u,bool cork)527 static void stream_cork(struct userdata *u, bool cork) {
528 pa_tagstruct *t;
529 pa_assert(u);
530
531 if (!u->pstream)
532 return;
533
534 t = pa_tagstruct_new();
535 #ifdef TUNNEL_SINK
536 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
537 #else
538 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
539 #endif
540 pa_tagstruct_putu32(t, u->ctag++);
541 pa_tagstruct_putu32(t, u->channel);
542 pa_tagstruct_put_boolean(t, cork);
543 pa_pstream_send_tagstruct(u->pstream, t);
544
545 request_latency(u);
546 }
547
548 /* Called from IO thread context */
stream_suspend_within_thread(struct userdata * u,bool suspend)549 static void stream_suspend_within_thread(struct userdata *u, bool suspend) {
550 pa_assert(u);
551
552 if (u->remote_suspended == suspend)
553 return;
554
555 u->remote_suspended = suspend;
556 check_smoother_status(u, true);
557 }
558
559 #ifdef TUNNEL_SINK
560
561 /* Called from IO thread context */
send_data(struct userdata * u)562 static void send_data(struct userdata *u) {
563 pa_assert(u);
564
565 while (u->requested_bytes > 0) {
566 pa_memchunk memchunk;
567
568 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
569 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
570 pa_memblock_unref(memchunk.memblock);
571
572 u->requested_bytes -= memchunk.length;
573
574 u->counter += (int64_t) memchunk.length;
575 }
576 }
577
578 /* This function is called from IO context -- except when it is not. */
sink_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)579 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
580 struct userdata *u = PA_SINK(o)->userdata;
581
582 switch (code) {
583
584 case PA_SINK_MESSAGE_SET_STATE: {
585 int r;
586
587 /* First, change the state, because otherwise pa_sink_render() would fail */
588 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
589
590 stream_cork_within_thread(u, u->sink->thread_info.state == PA_SINK_SUSPENDED);
591
592 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
593 send_data(u);
594 }
595
596 return r;
597 }
598
599 case PA_SINK_MESSAGE_GET_LATENCY: {
600 int64_t *usec = data;
601
602 #ifdef USE_SMOOTHER_2
603 *usec = pa_smoother_2_get_delay(u->smoother, pa_rtclock_now(), u->counter);
604 #else
605 pa_usec_t yl, yr;
606
607 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
608 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
609
610 *usec = (int64_t)yl - yr;
611 #endif
612 return 0;
613 }
614
615 case SINK_MESSAGE_GET_LATENCY_SNAPSHOT: {
616 int64_t *send_counter = data;
617
618 *send_counter = u->counter;
619 return 0;
620 }
621
622 case SINK_MESSAGE_REQUEST:
623
624 pa_assert(offset > 0);
625 u->requested_bytes += (size_t) offset;
626
627 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
628 send_data(u);
629
630 return 0;
631
632 case SINK_MESSAGE_REMOTE_SUSPEND:
633
634 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
635 return 0;
636
637 case SINK_MESSAGE_UPDATE_LATENCY: {
638 #ifdef USE_SMOOTHER_2
639 int64_t bytes;
640
641 if (offset < 0)
642 bytes = - pa_usec_to_bytes(- offset, &u->sink->sample_spec);
643 else
644 bytes = pa_usec_to_bytes(offset, &u->sink->sample_spec);
645
646 if (u->counter > bytes)
647 bytes = u->counter - bytes;
648 else
649 bytes = 0;
650
651 /* We may use u->snapshot time because the main thread is waiting */
652 pa_smoother_2_put(u->smoother, u->snapshot_time, bytes);
653 #else
654 pa_usec_t y;
655
656 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
657
658 if (y > (pa_usec_t) offset)
659 y -= (pa_usec_t) offset;
660 else
661 y = 0;
662
663 /* We may use u->snapshot time because the main thread is waiting */
664 pa_smoother_put(u->smoother, u->snapshot_time, y);
665 #endif
666
667 /* We can access this freely here, since the main thread is waiting for us */
668 u->thread_transport_usec = u->transport_usec;
669
670 return 0;
671 }
672
673 case SINK_MESSAGE_POST:
674
675 /* OK, This might be a bit confusing. This message is
676 * delivered to us from the main context -- NOT from the
677 * IO thread context where the rest of the messages are
678 * dispatched. Yeah, ugly, but I am a lazy bastard. */
679
680 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
681
682 u->receive_counter += chunk->length;
683
684 return 0;
685 }
686
687 return pa_sink_process_msg(o, code, data, offset, chunk);
688 }
689
690 /* Called from main context */
sink_set_state_in_main_thread_cb(pa_sink * s,pa_sink_state_t state,pa_suspend_cause_t suspend_cause)691 static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
692 struct userdata *u;
693 pa_sink_assert_ref(s);
694 u = s->userdata;
695
696 /* It may be that only the suspend cause is changing, in which
697 * case there's nothing to do. */
698 if (state == s->state)
699 return 0;
700
701 switch ((pa_sink_state_t) state) {
702
703 case PA_SINK_SUSPENDED:
704 pa_assert(PA_SINK_IS_OPENED(s->state));
705 stream_cork(u, true);
706 break;
707
708 case PA_SINK_IDLE:
709 case PA_SINK_RUNNING:
710 if (s->state == PA_SINK_SUSPENDED)
711 stream_cork(u, false);
712 break;
713
714 case PA_SINK_UNLINKED:
715 case PA_SINK_INIT:
716 case PA_SINK_INVALID_STATE:
717 ;
718 }
719
720 return 0;
721 }
722
723 #else
724
725 /* This function is called from IO context -- except when it is not. */
source_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)726 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
727 struct userdata *u = PA_SOURCE(o)->userdata;
728
729 switch (code) {
730
731 case PA_SOURCE_MESSAGE_SET_STATE: {
732 int r;
733
734 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
735 stream_cork_within_thread(u, u->source->thread_info.state == PA_SOURCE_SUSPENDED);
736
737 return r;
738 }
739
740 case PA_SOURCE_MESSAGE_GET_LATENCY: {
741 int64_t *usec = data;
742
743 #ifdef USE_SMOOTHER_2
744 *usec = - pa_smoother_2_get_delay(u->smoother, pa_rtclock_now(), u->counter);
745 #else
746 pa_usec_t yr, yl;
747
748 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
749 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
750
751 *usec = (int64_t)yr - yl;
752 #endif
753 return 0;
754 }
755
756 case SOURCE_MESSAGE_GET_LATENCY_SNAPSHOT: {
757 int64_t *send_counter = data;
758
759 *send_counter = u->counter;
760 return 0;
761 }
762
763 case SOURCE_MESSAGE_POST: {
764 pa_memchunk c;
765
766 pa_mcalign_push(u->mcalign, chunk);
767
768 while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
769
770 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
771 pa_source_post(u->source, &c);
772
773 pa_memblock_unref(c.memblock);
774
775 u->counter += (int64_t) c.length;
776 }
777
778 return 0;
779 }
780
781 case SOURCE_MESSAGE_REMOTE_SUSPEND:
782
783 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
784 return 0;
785
786 case SOURCE_MESSAGE_UPDATE_LATENCY: {
787 #ifdef USE_SMOOTHER_2
788 int64_t bytes;
789
790 if (offset < 0)
791 bytes = - pa_usec_to_bytes(- offset, &u->source->sample_spec);
792 else
793 bytes = pa_usec_to_bytes(offset, &u->source->sample_spec);
794
795 bytes += u->counter;
796
797 /* We may use u->snapshot time because the main thread is waiting */
798 pa_smoother_2_put(u->smoother, u->snapshot_time, bytes);
799 #else
800 pa_usec_t y;
801
802 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
803 y += offset;
804
805 /* We may use u->snapshot time because the main thread is waiting */
806 pa_smoother_put(u->smoother, u->snapshot_time, y);
807 #endif
808
809 /* We can access this freely here, since the main thread is waiting for us */
810 u->thread_transport_usec = u->transport_usec;
811
812 return 0;
813 }
814 }
815
816 return pa_source_process_msg(o, code, data, offset, chunk);
817 }
818
819 /* Called from main context */
source_set_state_in_main_thread_cb(pa_source * s,pa_source_state_t state,pa_suspend_cause_t suspend_cause)820 static int source_set_state_in_main_thread_cb(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) {
821 struct userdata *u;
822 pa_source_assert_ref(s);
823 u = s->userdata;
824
825 /* It may be that only the suspend cause is changing, in which
826 * case there's nothing to do. */
827 if (state == s->state)
828 return 0;
829
830 switch ((pa_source_state_t) state) {
831
832 case PA_SOURCE_SUSPENDED:
833 pa_assert(PA_SOURCE_IS_OPENED(s->state));
834 stream_cork(u, true);
835 break;
836
837 case PA_SOURCE_IDLE:
838 case PA_SOURCE_RUNNING:
839 if (s->state == PA_SOURCE_SUSPENDED)
840 stream_cork(u, false);
841 break;
842
843 case PA_SOURCE_UNLINKED:
844 case PA_SOURCE_INIT:
845 case PA_SOURCE_INVALID_STATE:
846 ;
847 }
848
849 return 0;
850 }
851
852 #endif
853
thread_func(void * userdata)854 static void thread_func(void *userdata) {
855 struct userdata *u = userdata;
856
857 pa_assert(u);
858
859 pa_log_debug("Thread starting up");
860
861 pa_thread_mq_install(&u->thread_mq);
862
863 for (;;) {
864 int ret;
865
866 #ifdef TUNNEL_SINK
867 if (u->sink && PA_UNLIKELY(u->sink->thread_info.rewind_requested))
868 pa_sink_process_rewind(u->sink, 0);
869 #endif
870
871 if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
872 goto fail;
873
874 if (ret == 0)
875 goto finish;
876 }
877
878 fail:
879 /* If this was no regular exit from the loop we have to continue
880 * processing messages until we received PA_MESSAGE_SHUTDOWN */
881 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_MAYBE_RESTART, u, 0, NULL, NULL);
882 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
883
884 finish:
885 pa_log_debug("Thread shutting down");
886 }
887
888 #ifdef TUNNEL_SINK
889 /* Called from main context */
command_request(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)890 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
891 struct userdata *u = userdata;
892 uint32_t bytes, channel;
893
894 pa_assert(pd);
895 pa_assert(command == PA_COMMAND_REQUEST);
896 pa_assert(t);
897 pa_assert(u);
898 pa_assert(u->pdispatch == pd);
899
900 if (pa_tagstruct_getu32(t, &channel) < 0 ||
901 pa_tagstruct_getu32(t, &bytes) < 0) {
902 pa_log("Invalid protocol reply");
903 goto fail;
904 }
905
906 if (channel != u->channel) {
907 pa_log("Received data for invalid channel");
908 goto fail;
909 }
910
911 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
912 return;
913
914 fail:
915 unload_module(u->module->userdata);
916 }
917
918 #endif
919
920 /* Called from main context */
stream_get_latency_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)921 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
922 struct userdata *u = userdata;
923 pa_usec_t sink_usec, source_usec;
924 bool playing;
925 int64_t write_index, read_index;
926 struct timeval local, remote, now;
927 pa_sample_spec *ss;
928 int64_t delay;
929 #ifdef TUNNEL_SINK
930 uint64_t send_counter;
931 #endif
932
933 pa_assert(pd);
934 pa_assert(u);
935
936 if (command != PA_COMMAND_REPLY) {
937 if (command == PA_COMMAND_ERROR)
938 pa_log("Failed to get latency.");
939 else
940 pa_log("Protocol error.");
941 goto fail;
942 }
943
944 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
945 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
946 pa_tagstruct_get_boolean(t, &playing) < 0 ||
947 pa_tagstruct_get_timeval(t, &local) < 0 ||
948 pa_tagstruct_get_timeval(t, &remote) < 0 ||
949 pa_tagstruct_gets64(t, &write_index) < 0 ||
950 pa_tagstruct_gets64(t, &read_index) < 0) {
951 pa_log("Invalid reply.");
952 goto fail;
953 }
954
955 #ifdef TUNNEL_SINK
956 if (u->version >= 13) {
957 uint64_t underrun_for = 0, playing_for = 0;
958
959 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
960 pa_tagstruct_getu64(t, &playing_for) < 0) {
961 pa_log("Invalid reply.");
962 goto fail;
963 }
964 }
965 #endif
966
967 if (!pa_tagstruct_eof(t)) {
968 pa_log("Invalid reply.");
969 goto fail;
970 }
971
972 if (tag < u->ignore_latency_before) {
973 return;
974 }
975
976 pa_gettimeofday(&now);
977
978 /* Calculate transport usec */
979 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now) < 0) {
980 /* local and remote seem to have synchronized clocks */
981 #ifdef TUNNEL_SINK
982 u->transport_usec = pa_timeval_diff(&remote, &local);
983 #else
984 u->transport_usec = pa_timeval_diff(&now, &remote);
985 #endif
986 } else
987 u->transport_usec = pa_timeval_diff(&now, &local)/2;
988
989 /* First, take the device's delay */
990 #ifdef TUNNEL_SINK
991 delay = (int64_t) sink_usec;
992 ss = &u->sink->sample_spec;
993 #else
994 delay = (int64_t) source_usec;
995 ss = &u->source->sample_spec;
996 #endif
997
998 /* Add the length of our server-side buffer */
999 if (write_index >= read_index)
1000 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
1001 else
1002 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
1003
1004 /* Our measurements are already out of date, hence correct by the *
1005 * transport latency */
1006 #ifdef TUNNEL_SINK
1007 delay -= (int64_t) u->transport_usec;
1008 #else
1009 delay += (int64_t) u->transport_usec;
1010 #endif
1011
1012 /* Now correct by what we have have written since we requested the update. This
1013 * is not necessary for the source, because if data is received between request
1014 * and reply, it was already posted before we requested the source latency. */
1015 #ifdef TUNNEL_SINK
1016 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_GET_LATENCY_SNAPSHOT, &send_counter, 0, NULL);
1017 delay += (int64_t) pa_bytes_to_usec(send_counter - u->receive_snapshot, ss);
1018 #endif
1019
1020 /* It may take some time before the async message is executed, so we take a timestamp here */
1021 u->snapshot_time = pa_rtclock_now();
1022
1023 #ifdef TUNNEL_SINK
1024 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
1025 #else
1026 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
1027 #endif
1028
1029 return;
1030
1031 fail:
1032
1033 unload_module(u->module->userdata);
1034 }
1035
1036 /* Called from main context */
request_latency(struct userdata * u)1037 static void request_latency(struct userdata *u) {
1038 pa_tagstruct *t;
1039 struct timeval now;
1040 uint32_t tag;
1041 pa_assert(u);
1042
1043 t = pa_tagstruct_new();
1044 #ifdef TUNNEL_SINK
1045 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
1046 #else
1047 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
1048 #endif
1049 pa_tagstruct_putu32(t, tag = u->ctag++);
1050 pa_tagstruct_putu32(t, u->channel);
1051
1052 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1053
1054 pa_pstream_send_tagstruct(u->pstream, t);
1055 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
1056
1057 u->ignore_latency_before = tag;
1058 u->receive_snapshot = u->receive_counter;
1059 }
1060
1061 /* Called from main context */
timeout_callback(pa_mainloop_api * m,pa_time_event * e,const struct timeval * t,void * userdata)1062 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
1063 struct userdata *u = userdata;
1064
1065 pa_assert(m);
1066 pa_assert(e);
1067 pa_assert(u);
1068
1069 request_latency(u);
1070
1071 pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
1072 }
1073
1074 /* Called from main context */
update_description(struct userdata * u)1075 static void update_description(struct userdata *u) {
1076 char *d;
1077 char un[128], hn[128];
1078 pa_tagstruct *t;
1079
1080 pa_assert(u);
1081
1082 if (!u->server_fqdn || !u->user_name || !u->device_description)
1083 return;
1084
1085 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
1086
1087 #ifdef TUNNEL_SINK
1088 pa_sink_set_description(u->sink, d);
1089 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
1090 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
1091 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
1092 #else
1093 pa_source_set_description(u->source, d);
1094 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
1095 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
1096 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
1097 #endif
1098
1099 pa_xfree(d);
1100
1101 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
1102 pa_get_user_name(un, sizeof(un)),
1103 pa_get_host_name(hn, sizeof(hn)));
1104
1105 t = pa_tagstruct_new();
1106 #ifdef TUNNEL_SINK
1107 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
1108 #else
1109 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
1110 #endif
1111 pa_tagstruct_putu32(t, u->ctag++);
1112 pa_tagstruct_putu32(t, u->channel);
1113 pa_tagstruct_puts(t, d);
1114 pa_pstream_send_tagstruct(u->pstream, t);
1115
1116 pa_xfree(d);
1117 }
1118
1119 /* Called from main context */
server_info_cb(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1120 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1121 struct userdata *u = userdata;
1122 pa_sample_spec ss;
1123 pa_channel_map cm;
1124 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
1125 uint32_t cookie;
1126
1127 pa_assert(pd);
1128 pa_assert(u);
1129
1130 if (command != PA_COMMAND_REPLY) {
1131 if (command == PA_COMMAND_ERROR)
1132 pa_log("Failed to get info.");
1133 else
1134 pa_log("Protocol error.");
1135 goto fail;
1136 }
1137
1138 if (pa_tagstruct_gets(t, &server_name) < 0 ||
1139 pa_tagstruct_gets(t, &server_version) < 0 ||
1140 pa_tagstruct_gets(t, &user_name) < 0 ||
1141 pa_tagstruct_gets(t, &host_name) < 0 ||
1142 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1143 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
1144 pa_tagstruct_gets(t, &default_source_name) < 0 ||
1145 pa_tagstruct_getu32(t, &cookie) < 0 ||
1146 (u->version >= 15 && pa_tagstruct_get_channel_map(t, &cm) < 0)) {
1147
1148 pa_log("Parse failure");
1149 goto fail;
1150 }
1151
1152 if (!pa_tagstruct_eof(t)) {
1153 pa_log("Packet too long");
1154 goto fail;
1155 }
1156
1157 pa_xfree(u->server_fqdn);
1158 u->server_fqdn = pa_xstrdup(host_name);
1159
1160 pa_xfree(u->user_name);
1161 u->user_name = pa_xstrdup(user_name);
1162
1163 update_description(u);
1164
1165 return;
1166
1167 fail:
1168 unload_module(u->module->userdata);
1169 }
1170
/* Skips over the port list of a sink/source info reply.
 *
 * Nothing is read for protocol versions below 16. Otherwise the port count,
 * every port entry (with the extra fields added in versions 24 and 34) and
 * the active port name are consumed and discarded.
 *
 * Returns 0 on success and -PA_ERR_PROTOCOL if the reply cannot be parsed. */
static int read_ports(struct userdata *u, pa_tagstruct *t) {
    uint32_t port_count, idx, scratch;
    const char *text;

    if (u->version < 16)
        return 0;

    if (pa_tagstruct_getu32(t, &port_count) != 0)
        goto parse_failure;

    for (idx = 0; idx < port_count; idx++) {

        if (pa_tagstruct_gets(t, &text) < 0 ||        /* name */
            pa_tagstruct_gets(t, &text) < 0 ||        /* description */
            pa_tagstruct_getu32(t, &scratch) < 0)     /* priority */
            goto parse_failure;

        if (u->version < 24)
            continue;

        if (pa_tagstruct_getu32(t, &scratch) < 0)     /* available */
            goto parse_failure;

        if (u->version >= 34 &&
            (pa_tagstruct_gets(t, &text) < 0 ||       /* availability group */
             pa_tagstruct_getu32(t, &scratch) < 0))   /* device port type */
            goto parse_failure;
    }

    if (pa_tagstruct_gets(t, &text) < 0)              /* active port */
        goto parse_failure;

    return 0;

parse_failure:
    pa_log("Parse failure");
    return -PA_ERR_PROTOCOL;
}
1212
/* Skips over the format list of a sink/source info reply: reads the 8 bit
 * format count followed by that many format info entries, each of which is
 * parsed into a temporary object and thrown away again.
 *
 * Returns 0 on success and -PA_ERR_PROTOCOL if the reply cannot be parsed. */
static int read_formats(struct userdata *u, pa_tagstruct *t) {
    uint8_t remaining;

    if (pa_tagstruct_getu8(t, &remaining) < 0) { /* no. of formats */
        pa_log("Parse failure");
        return -PA_ERR_PROTOCOL;
    }

    while (remaining-- > 0) {
        pa_format_info *info = pa_format_info_new();
        bool failed = pa_tagstruct_get_format_info(t, info) != 0; /* format info */

        /* The temporary is released on both the success and the error path */
        pa_format_info_free(info);

        if (failed) {
            pa_log("Parse failure");
            return -PA_ERR_PROTOCOL;
        }
    }

    return 0;
}
1233
1234 #ifdef TUNNEL_SINK
1235
1236 /* Called from main context */
sink_info_cb(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1237 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1238 struct userdata *u = userdata;
1239 uint32_t idx, owner_module, monitor_source, flags;
1240 const char *name, *description, *monitor_source_name, *driver;
1241 pa_sample_spec ss;
1242 pa_channel_map cm;
1243 pa_cvolume volume;
1244 bool mute;
1245 pa_usec_t latency;
1246
1247 pa_assert(pd);
1248 pa_assert(u);
1249
1250 if (command != PA_COMMAND_REPLY) {
1251 if (command == PA_COMMAND_ERROR)
1252 pa_log("Failed to get info.");
1253 else
1254 pa_log("Protocol error.");
1255 goto fail;
1256 }
1257
1258 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1259 pa_tagstruct_gets(t, &name) < 0 ||
1260 pa_tagstruct_gets(t, &description) < 0 ||
1261 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1262 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1263 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1264 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1265 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1266 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1267 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1268 pa_tagstruct_get_usec(t, &latency) < 0 ||
1269 pa_tagstruct_gets(t, &driver) < 0 ||
1270 pa_tagstruct_getu32(t, &flags) < 0) {
1271
1272 pa_log("Parse failure");
1273 goto fail;
1274 }
1275
1276 if (u->version >= 13) {
1277 pa_usec_t configured_latency;
1278
1279 if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1280 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1281
1282 pa_log("Parse failure");
1283 goto fail;
1284 }
1285 }
1286
1287 if (u->version >= 15) {
1288 pa_volume_t base_volume;
1289 uint32_t state, n_volume_steps, card;
1290
1291 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1292 pa_tagstruct_getu32(t, &state) < 0 ||
1293 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1294 pa_tagstruct_getu32(t, &card) < 0) {
1295
1296 pa_log("Parse failure");
1297 goto fail;
1298 }
1299 }
1300
1301 if (read_ports(u, t) < 0)
1302 goto fail;
1303
1304 if (u->version >= 21 && read_formats(u, t) < 0)
1305 goto fail;
1306
1307 if (!pa_tagstruct_eof(t)) {
1308 pa_log("Packet too long");
1309 goto fail;
1310 }
1311
1312 if (!u->sink_name || !pa_streq(name, u->sink_name))
1313 return;
1314
1315 pa_xfree(u->device_description);
1316 u->device_description = pa_xstrdup(description);
1317
1318 update_description(u);
1319
1320 return;
1321
1322 fail:
1323 unload_module(u->module->userdata);
1324 }
1325
1326 /* Called from main context */
sink_input_info_cb(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1327 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1328 struct userdata *u = userdata;
1329 uint32_t idx, owner_module, client, sink;
1330 pa_usec_t buffer_usec, sink_usec;
1331 const char *name, *driver, *resample_method;
1332 bool mute = false;
1333 pa_sample_spec sample_spec;
1334 pa_channel_map channel_map;
1335 pa_cvolume volume;
1336 bool b;
1337
1338 pa_assert(pd);
1339 pa_assert(u);
1340
1341 if (command != PA_COMMAND_REPLY) {
1342 if (command == PA_COMMAND_ERROR)
1343 pa_log("Failed to get info.");
1344 else
1345 pa_log("Protocol error.");
1346 goto fail;
1347 }
1348
1349 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1350 pa_tagstruct_gets(t, &name) < 0 ||
1351 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1352 pa_tagstruct_getu32(t, &client) < 0 ||
1353 pa_tagstruct_getu32(t, &sink) < 0 ||
1354 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1355 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1356 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1357 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1358 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1359 pa_tagstruct_gets(t, &resample_method) < 0 ||
1360 pa_tagstruct_gets(t, &driver) < 0) {
1361
1362 pa_log("Parse failure");
1363 goto fail;
1364 }
1365
1366 if (u->version >= 11) {
1367 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1368
1369 pa_log("Parse failure");
1370 goto fail;
1371 }
1372 }
1373
1374 if (u->version >= 13) {
1375 if (pa_tagstruct_get_proplist(t, NULL) < 0) {
1376
1377 pa_log("Parse failure");
1378 goto fail;
1379 }
1380 }
1381
1382 if (u->version >= 19) {
1383 if (pa_tagstruct_get_boolean(t, &b) < 0) {
1384
1385 pa_log("Parse failure");
1386 goto fail;
1387 }
1388 }
1389
1390 if (u->version >= 20) {
1391 if (pa_tagstruct_get_boolean(t, &b) < 0 ||
1392 pa_tagstruct_get_boolean(t, &b) < 0) {
1393
1394 pa_log("Parse failure");
1395 goto fail;
1396 }
1397 }
1398
1399 if (u->version >= 21) {
1400 pa_format_info *format = pa_format_info_new();
1401
1402 if (pa_tagstruct_get_format_info(t, format) < 0) {
1403 pa_format_info_free(format);
1404 pa_log("Parse failure");
1405 goto fail;
1406 }
1407 pa_format_info_free(format);
1408 }
1409
1410 if (!pa_tagstruct_eof(t)) {
1411 pa_log("Packet too long");
1412 goto fail;
1413 }
1414
1415 if (idx != u->device_index)
1416 return;
1417
1418 pa_assert(u->sink);
1419
1420 if ((u->version < 11 || mute == u->sink->muted) &&
1421 pa_cvolume_equal(&volume, &u->sink->real_volume))
1422 return;
1423
1424 pa_sink_volume_changed(u->sink, &volume);
1425
1426 if (u->version >= 11)
1427 pa_sink_mute_changed(u->sink, mute);
1428
1429 return;
1430
1431 fail:
1432 unload_module(u->module->userdata);
1433 }
1434
1435 #else
1436
1437 /* Called from main context */
source_info_cb(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1438 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1439 struct userdata *u = userdata;
1440 uint32_t idx, owner_module, monitor_of_sink, flags;
1441 const char *name, *description, *monitor_of_sink_name, *driver;
1442 pa_sample_spec ss;
1443 pa_channel_map cm;
1444 pa_cvolume volume;
1445 bool mute;
1446 pa_usec_t latency, configured_latency;
1447
1448 pa_assert(pd);
1449 pa_assert(u);
1450
1451 if (command != PA_COMMAND_REPLY) {
1452 if (command == PA_COMMAND_ERROR)
1453 pa_log("Failed to get info.");
1454 else
1455 pa_log("Protocol error.");
1456 goto fail;
1457 }
1458
1459 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1460 pa_tagstruct_gets(t, &name) < 0 ||
1461 pa_tagstruct_gets(t, &description) < 0 ||
1462 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1463 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1464 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1465 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1466 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1467 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1468 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1469 pa_tagstruct_get_usec(t, &latency) < 0 ||
1470 pa_tagstruct_gets(t, &driver) < 0 ||
1471 pa_tagstruct_getu32(t, &flags) < 0) {
1472
1473 pa_log("Parse failure");
1474 goto fail;
1475 }
1476
1477 if (u->version >= 13) {
1478 if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1479 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1480
1481 pa_log("Parse failure");
1482 goto fail;
1483 }
1484 }
1485
1486 if (u->version >= 15) {
1487 pa_volume_t base_volume;
1488 uint32_t state, n_volume_steps, card;
1489
1490 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1491 pa_tagstruct_getu32(t, &state) < 0 ||
1492 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1493 pa_tagstruct_getu32(t, &card) < 0) {
1494
1495 pa_log("Parse failure");
1496 goto fail;
1497 }
1498 }
1499
1500 if (read_ports(u, t) < 0)
1501 goto fail;
1502
1503 if (u->version >= 22 && read_formats(u, t) < 0)
1504 goto fail;
1505
1506 if (!pa_tagstruct_eof(t)) {
1507 pa_log("Packet too long");
1508 goto fail;
1509 }
1510
1511 if (!u->source_name || !pa_streq(name, u->source_name))
1512 return;
1513
1514 pa_xfree(u->device_description);
1515 u->device_description = pa_xstrdup(description);
1516
1517 update_description(u);
1518
1519 return;
1520
1521 fail:
1522 unload_module(u->module->userdata);
1523 }
1524
1525 #endif
1526
1527 /* Called from main context */
request_info(struct userdata * u)1528 static void request_info(struct userdata *u) {
1529 pa_tagstruct *t;
1530 uint32_t tag;
1531 pa_assert(u);
1532
1533 t = pa_tagstruct_new();
1534 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1535 pa_tagstruct_putu32(t, tag = u->ctag++);
1536 pa_pstream_send_tagstruct(u->pstream, t);
1537 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1538
1539 #ifdef TUNNEL_SINK
1540 t = pa_tagstruct_new();
1541 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1542 pa_tagstruct_putu32(t, tag = u->ctag++);
1543 pa_tagstruct_putu32(t, u->device_index);
1544 pa_pstream_send_tagstruct(u->pstream, t);
1545 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1546
1547 if (u->sink_name) {
1548 t = pa_tagstruct_new();
1549 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1550 pa_tagstruct_putu32(t, tag = u->ctag++);
1551 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1552 pa_tagstruct_puts(t, u->sink_name);
1553 pa_pstream_send_tagstruct(u->pstream, t);
1554 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1555 }
1556 #else
1557 if (u->source_name) {
1558 t = pa_tagstruct_new();
1559 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1560 pa_tagstruct_putu32(t, tag = u->ctag++);
1561 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1562 pa_tagstruct_puts(t, u->source_name);
1563 pa_pstream_send_tagstruct(u->pstream, t);
1564 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1565 }
1566 #endif
1567 }
1568
1569 /* Called from main context */
command_subscribe_event(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1570 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1571 struct userdata *u = userdata;
1572 pa_subscription_event_type_t e;
1573 uint32_t idx;
1574
1575 pa_assert(pd);
1576 pa_assert(t);
1577 pa_assert(u);
1578 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1579
1580 if (pa_tagstruct_getu32(t, &e) < 0 ||
1581 pa_tagstruct_getu32(t, &idx) < 0) {
1582 pa_log("Invalid protocol reply");
1583 unload_module(u->module->userdata);
1584 return;
1585 }
1586
1587 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1588 #ifdef TUNNEL_SINK
1589 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1590 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1591 #else
1592 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1593 #endif
1594 )
1595 return;
1596
1597 request_info(u);
1598 }
1599
1600 /* Called from main context */
start_subscribe(struct userdata * u)1601 static void start_subscribe(struct userdata *u) {
1602 pa_tagstruct *t;
1603 pa_assert(u);
1604
1605 t = pa_tagstruct_new();
1606 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1607 pa_tagstruct_putu32(t, u->ctag++);
1608 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1609 #ifdef TUNNEL_SINK
1610 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1611 #else
1612 PA_SUBSCRIPTION_MASK_SOURCE
1613 #endif
1614 );
1615
1616 pa_pstream_send_tagstruct(u->pstream, t);
1617 }
1618
1619 /* Called from main context */
create_stream_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1620 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1621 struct userdata *u = userdata;
1622 #ifdef TUNNEL_SINK
1623 uint32_t bytes;
1624 #endif
1625
1626 pa_assert(pd);
1627 pa_assert(u);
1628 pa_assert(u->pdispatch == pd);
1629
1630 if (command != PA_COMMAND_REPLY) {
1631 if (command == PA_COMMAND_ERROR)
1632 pa_log("Failed to create stream.");
1633 else
1634 pa_log("Protocol error.");
1635 goto fail;
1636 }
1637
1638 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1639 pa_tagstruct_getu32(t, &u->device_index) < 0
1640 #ifdef TUNNEL_SINK
1641 || pa_tagstruct_getu32(t, &bytes) < 0
1642 #endif
1643 )
1644 goto parse_error;
1645
1646 if (u->version >= 9) {
1647 #ifdef TUNNEL_SINK
1648 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1649 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1650 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1651 pa_tagstruct_getu32(t, &u->minreq) < 0)
1652 goto parse_error;
1653 #else
1654 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1655 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1656 goto parse_error;
1657 #endif
1658 }
1659
1660 if (u->version >= 12) {
1661 pa_sample_spec ss;
1662 pa_channel_map cm;
1663 uint32_t device_index;
1664 const char *dn;
1665 bool suspended;
1666
1667 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1668 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1669 pa_tagstruct_getu32(t, &device_index) < 0 ||
1670 pa_tagstruct_gets(t, &dn) < 0 ||
1671 pa_tagstruct_get_boolean(t, &suspended) < 0)
1672 goto parse_error;
1673
1674 #ifdef TUNNEL_SINK
1675 pa_xfree(u->sink_name);
1676 u->sink_name = pa_xstrdup(dn);
1677 #else
1678 pa_xfree(u->source_name);
1679 u->source_name = pa_xstrdup(dn);
1680 #endif
1681 }
1682
1683 if (u->version >= 13) {
1684 pa_usec_t usec;
1685
1686 if (pa_tagstruct_get_usec(t, &usec) < 0)
1687 goto parse_error;
1688
1689 /* #ifdef TUNNEL_SINK */
1690 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1691 /* #else */
1692 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1693 /* #endif */
1694 }
1695
1696 if (u->version >= 21) {
1697 pa_format_info *format = pa_format_info_new();
1698
1699 if (pa_tagstruct_get_format_info(t, format) < 0) {
1700 pa_format_info_free(format);
1701 goto parse_error;
1702 }
1703
1704 pa_format_info_free(format);
1705 }
1706
1707 if (!pa_tagstruct_eof(t))
1708 goto parse_error;
1709
1710 start_subscribe(u);
1711 request_info(u);
1712
1713 pa_assert(!u->time_event);
1714 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1715
1716 request_latency(u);
1717
1718 pa_log_debug("Stream created.");
1719
1720 #ifdef TUNNEL_SINK
1721 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1722 #endif
1723
1724 return;
1725
1726 parse_error:
1727 pa_log("Invalid reply. (Create stream)");
1728
1729 fail:
1730 unload_module(u->module->userdata);
1731
1732 }
1733
1734 /* Called from main context */
setup_complete_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1735 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1736 struct userdata *u = userdata;
1737 pa_tagstruct *reply;
1738 char name[256], un[128], hn[128];
1739 pa_cvolume volume;
1740
1741 pa_assert(pd);
1742 pa_assert(u);
1743 pa_assert(u->pdispatch == pd);
1744
1745 if (command != PA_COMMAND_REPLY ||
1746 pa_tagstruct_getu32(t, &u->version) < 0 ||
1747 !pa_tagstruct_eof(t)) {
1748
1749 if (command == PA_COMMAND_ERROR)
1750 pa_log("Failed to authenticate");
1751 else
1752 pa_log("Protocol error.");
1753
1754 goto fail;
1755 }
1756
1757 /* Minimum supported protocol version */
1758 if (u->version < 8) {
1759 pa_log("Incompatible protocol version");
1760 goto fail;
1761 }
1762
1763 /* Starting with protocol version 13 the MSB of the version tag
1764 reflects if shm is enabled for this connection or not. We don't
1765 support SHM here at all, so we just ignore this. */
1766
1767 if (u->version >= 13)
1768 u->version &= 0x7FFFFFFFU;
1769
1770 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1771
1772 #ifdef TUNNEL_SINK
1773 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1774 pa_sink_update_proplist(u->sink, 0, NULL);
1775
1776 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1777 u->sink_name,
1778 pa_get_user_name(un, sizeof(un)),
1779 pa_get_host_name(hn, sizeof(hn)));
1780 #else
1781 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1782 pa_source_update_proplist(u->source, 0, NULL);
1783
1784 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1785 u->source_name,
1786 pa_get_user_name(un, sizeof(un)),
1787 pa_get_host_name(hn, sizeof(hn)));
1788 #endif
1789
1790 reply = pa_tagstruct_new();
1791 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1792 pa_tagstruct_putu32(reply, u->ctag++);
1793
1794 if (u->version >= 13) {
1795 pa_proplist *pl;
1796 pl = pa_proplist_new();
1797 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1798 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1799 pa_init_proplist(pl);
1800 pa_tagstruct_put_proplist(reply, pl);
1801 pa_proplist_free(pl);
1802 } else
1803 pa_tagstruct_puts(reply, "PulseAudio");
1804
1805 pa_pstream_send_tagstruct(u->pstream, reply);
1806 /* We ignore the server's reply here */
1807
1808 reply = pa_tagstruct_new();
1809
1810 if (u->version < 13)
1811 /* Only for older PA versions we need to fill in the maxlength */
1812 u->maxlength = 4*1024*1024;
1813
1814 #ifdef TUNNEL_SINK
1815 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * u->latency, &u->sink->sample_spec);
1816 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * u->latency / 4, &u->sink->sample_spec);
1817 u->prebuf = u->tlength;
1818 #else
1819 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * u->latency, &u->source->sample_spec);
1820 #endif
1821
1822 #ifdef TUNNEL_SINK
1823 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1824 pa_tagstruct_putu32(reply, tag = u->ctag++);
1825
1826 if (u->version < 13)
1827 pa_tagstruct_puts(reply, name);
1828
1829 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1830 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1831 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1832 pa_tagstruct_puts(reply, u->sink_name);
1833 pa_tagstruct_putu32(reply, u->maxlength);
1834 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(u->sink->state));
1835 pa_tagstruct_putu32(reply, u->tlength);
1836 pa_tagstruct_putu32(reply, u->prebuf);
1837 pa_tagstruct_putu32(reply, u->minreq);
1838 pa_tagstruct_putu32(reply, 0);
1839 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1840 pa_tagstruct_put_cvolume(reply, &volume);
1841 #else
1842 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1843 pa_tagstruct_putu32(reply, tag = u->ctag++);
1844
1845 if (u->version < 13)
1846 pa_tagstruct_puts(reply, name);
1847
1848 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1849 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1850 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1851 pa_tagstruct_puts(reply, u->source_name);
1852 pa_tagstruct_putu32(reply, u->maxlength);
1853 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(u->source->state));
1854 pa_tagstruct_putu32(reply, u->fragsize);
1855 #endif
1856
1857 if (u->version >= 12) {
1858 pa_tagstruct_put_boolean(reply, false); /* no_remap */
1859 pa_tagstruct_put_boolean(reply, false); /* no_remix */
1860 pa_tagstruct_put_boolean(reply, false); /* fix_format */
1861 pa_tagstruct_put_boolean(reply, false); /* fix_rate */
1862 pa_tagstruct_put_boolean(reply, false); /* fix_channels */
1863 pa_tagstruct_put_boolean(reply, true); /* no_move */
1864 pa_tagstruct_put_boolean(reply, false); /* variable_rate */
1865 }
1866
1867 if (u->version >= 13) {
1868 pa_proplist *pl;
1869
1870 pa_tagstruct_put_boolean(reply, false); /* start muted/peak detect*/
1871 pa_tagstruct_put_boolean(reply, true); /* adjust_latency */
1872
1873 pl = pa_proplist_new();
1874 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1875 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1876 pa_tagstruct_put_proplist(reply, pl);
1877 pa_proplist_free(pl);
1878
1879 #ifndef TUNNEL_SINK
1880 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1881 #endif
1882 }
1883
1884 if (u->version >= 14) {
1885 #ifdef TUNNEL_SINK
1886 pa_tagstruct_put_boolean(reply, false); /* volume_set */
1887 #endif
1888 pa_tagstruct_put_boolean(reply, true); /* early rquests */
1889 }
1890
1891 if (u->version >= 15) {
1892 #ifdef TUNNEL_SINK
1893 pa_tagstruct_put_boolean(reply, false); /* muted_set */
1894 #endif
1895 pa_tagstruct_put_boolean(reply, false); /* don't inhibit auto suspend */
1896 pa_tagstruct_put_boolean(reply, false); /* fail on suspend */
1897 }
1898
1899 #ifdef TUNNEL_SINK
1900 if (u->version >= 17)
1901 pa_tagstruct_put_boolean(reply, false); /* relative volume */
1902
1903 if (u->version >= 18)
1904 pa_tagstruct_put_boolean(reply, false); /* passthrough stream */
1905 #endif
1906
1907 #ifdef TUNNEL_SINK
1908 if (u->version >= 21) {
1909 /* We're not using the extended API, so n_formats = 0 and that's that */
1910 pa_tagstruct_putu8(reply, 0);
1911 }
1912 #else
1913 if (u->version >= 22) {
1914 /* We're not using the extended API, so n_formats = 0 and that's that */
1915 pa_tagstruct_putu8(reply, 0);
1916 pa_cvolume_reset(&volume, u->source->sample_spec.channels);
1917 pa_tagstruct_put_cvolume(reply, &volume);
1918 pa_tagstruct_put_boolean(reply, false); /* muted */
1919 pa_tagstruct_put_boolean(reply, false); /* volume_set */
1920 pa_tagstruct_put_boolean(reply, false); /* muted_set */
1921 pa_tagstruct_put_boolean(reply, false); /* relative volume */
1922 pa_tagstruct_put_boolean(reply, false); /* passthrough stream */
1923 }
1924 #endif
1925
1926 pa_pstream_send_tagstruct(u->pstream, reply);
1927 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1928
1929 pa_log_debug("Connection authenticated, creating stream ...");
1930
1931 return;
1932
1933 fail:
1934 unload_module(u->module->userdata);
1935 }
1936
1937 /* Called from main context */
pstream_die_callback(pa_pstream * p,void * userdata)1938 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1939 struct userdata *u = userdata;
1940
1941 pa_assert(p);
1942 pa_assert(u);
1943
1944 pa_log_warn("Stream died.");
1945 unload_module(u->module->userdata);
1946 }
1947
1948 /* Called from main context */
pstream_packet_callback(pa_pstream * p,pa_packet * packet,pa_cmsg_ancil_data * ancil_data,void * userdata)1949 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data, void *userdata) {
1950 struct userdata *u = userdata;
1951
1952 pa_assert(p);
1953 pa_assert(packet);
1954 pa_assert(u);
1955
1956 if (pa_pdispatch_run(u->pdispatch, packet, ancil_data, u) < 0) {
1957 pa_log("Invalid packet");
1958 unload_module(u->module->userdata);
1959 return;
1960 }
1961 }
1962
1963 #ifndef TUNNEL_SINK
1964 /* Called from main context */
pstream_memblock_callback(pa_pstream * p,uint32_t channel,int64_t offset,pa_seek_mode_t seek,const pa_memchunk * chunk,void * userdata)1965 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1966 struct userdata *u = userdata;
1967
1968 pa_assert(p);
1969 pa_assert(chunk);
1970 pa_assert(u);
1971
1972 if (channel != u->channel) {
1973 pa_log("Received memory block on bad channel.");
1974 unload_module(u->module->userdata);
1975 return;
1976 }
1977
1978 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1979
1980 u->receive_counter += chunk->length;
1981 }
1982 #endif
1983
1984 /* Called from main context */
on_connection(pa_socket_client * sc,pa_iochannel * io,void * userdata)1985 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1986 struct userdata *u = userdata;
1987
1988 pa_assert_ctl_context();
1989
1990 pa_assert(sc);
1991 pa_assert(u);
1992 pa_assert(u->client == sc);
1993
1994 pa_socket_client_unref(u->client);
1995 u->client = NULL;
1996
1997 if (!io) {
1998 pa_log("Connection failed: %s", pa_cstrerror(errno));
1999 unload_module(u->module->userdata);
2000 return;
2001 }
2002
2003 u->io = io;
2004
2005 #ifdef TUNNEL_SINK
2006 create_sink(u);
2007 if (!u->sink) {
2008 unload_module(u->module->userdata);
2009 return;
2010 }
2011 on_sink_created(u);
2012 #else
2013 create_source(u);
2014 if (!u->source) {
2015 unload_module(u->module->userdata);
2016 return;
2017 }
2018 on_source_created(u);
2019 #endif
2020 }
2021
2022 #ifdef TUNNEL_SINK
on_sink_created(struct userdata * u)2023 static void on_sink_created(struct userdata *u)
2024 #else
2025 static void on_source_created(struct userdata *u)
2026 #endif
2027 {
2028 pa_tagstruct *t;
2029 uint32_t tag;
2030
2031 u->pstream = pa_pstream_new(u->core->mainloop, u->io, u->core->mempool);
2032 u->pdispatch = pa_pdispatch_new(u->core->mainloop, true, command_table, PA_COMMAND_MAX);
2033
2034 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
2035 pa_pstream_set_receive_packet_callback(u->pstream, pstream_packet_callback, u);
2036 #ifndef TUNNEL_SINK
2037 pa_pstream_set_receive_memblock_callback(u->pstream, pstream_memblock_callback, u);
2038 #endif
2039
2040 t = pa_tagstruct_new();
2041 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
2042 pa_tagstruct_putu32(t, tag = u->ctag++);
2043 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
2044
2045 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
2046
2047 #ifdef HAVE_CREDS
2048 {
2049 pa_creds ucred;
2050
2051 if (pa_iochannel_creds_supported(u->io))
2052 pa_iochannel_creds_enable(u->io);
2053
2054 ucred.uid = getuid();
2055 ucred.gid = getgid();
2056
2057 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
2058 }
2059 #else
2060 pa_pstream_send_tagstruct(u->pstream, t);
2061 #endif
2062
2063 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
2064
2065 pa_log_debug("Connection established, authenticating ...");
2066 }
2067
2068 #ifdef TUNNEL_SINK
2069
2070 /* Called from main context */
sink_set_volume(pa_sink * sink)2071 static void sink_set_volume(pa_sink *sink) {
2072 struct userdata *u;
2073 pa_tagstruct *t;
2074
2075 pa_assert(sink);
2076 u = sink->userdata;
2077 pa_assert(u);
2078
2079 t = pa_tagstruct_new();
2080 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
2081 pa_tagstruct_putu32(t, u->ctag++);
2082 pa_tagstruct_putu32(t, u->device_index);
2083 pa_tagstruct_put_cvolume(t, &sink->real_volume);
2084 pa_pstream_send_tagstruct(u->pstream, t);
2085 }
2086
2087 /* Called from main context */
sink_set_mute(pa_sink * sink)2088 static void sink_set_mute(pa_sink *sink) {
2089 struct userdata *u;
2090 pa_tagstruct *t;
2091
2092 pa_assert(sink);
2093 u = sink->userdata;
2094 pa_assert(u);
2095
2096 if (u->version < 11)
2097 return;
2098
2099 t = pa_tagstruct_new();
2100 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
2101 pa_tagstruct_putu32(t, u->ctag++);
2102 pa_tagstruct_putu32(t, u->device_index);
2103 pa_tagstruct_put_boolean(t, sink->muted);
2104 pa_pstream_send_tagstruct(u->pstream, t);
2105 }
2106
2107 #endif
2108
2109 #ifdef TUNNEL_SINK
create_sink(struct userdata * u)2110 static void create_sink(struct userdata *u) {
2111 pa_sink_new_data data;
2112 char *data_name = NULL;
2113
2114 if (!(data_name = pa_xstrdup(u->configured_sink_name)))
2115 data_name = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);
2116
2117 pa_sink_new_data_init(&data);
2118 data.driver = __FILE__;
2119 data.module = u->module;
2120 data.namereg_fail = false;
2121 pa_sink_new_data_set_name(&data, data_name);
2122 pa_sink_new_data_set_sample_spec(&data, &u->sample_spec);
2123 pa_sink_new_data_set_channel_map(&data, &u->channel_map);
2124 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
2125 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2126 if (u->sink_name)
2127 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
2128
2129 pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, u->sink_proplist);
2130
2131 u->sink = pa_sink_new(u->module->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY);
2132
2133 if (!u->sink) {
2134 pa_log("Failed to create sink.");
2135 goto finish;
2136 }
2137
2138 u->sink->parent.process_msg = sink_process_msg;
2139 u->sink->userdata = u;
2140 u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
2141 pa_sink_set_set_volume_callback(u->sink, sink_set_volume);
2142 pa_sink_set_set_mute_callback(u->sink, sink_set_mute);
2143
2144 u->sink->refresh_volume = u->sink->refresh_muted = false;
2145
2146 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2147
2148 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
2149 pa_sink_set_rtpoll(u->sink, u->rtpoll);
2150 pa_sink_set_fixed_latency(u->sink, u->latency * PA_USEC_PER_MSEC);
2151
2152 pa_sink_put(u->sink);
2153
2154 finish:
2155 pa_sink_new_data_done(&data);
2156 pa_xfree(data_name);
2157 }
2158 #else
create_source(struct userdata * u)2159 static void create_source(struct userdata *u) {
2160 pa_source_new_data data;
2161 char *data_name = NULL;
2162
2163 if (!(data_name = pa_xstrdup(u->configured_source_name)))
2164 data_name = pa_sprintf_malloc("tunnel-source.%s", u->server_name);
2165
2166 pa_source_new_data_init(&data);
2167 data.driver = __FILE__;
2168 data.module = u->module;
2169 data.namereg_fail = false;
2170 pa_source_new_data_set_name(&data, data_name);
2171 pa_source_new_data_set_sample_spec(&data, &u->sample_spec);
2172 pa_source_new_data_set_channel_map(&data, &u->channel_map);
2173 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
2174 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2175 if (u->source_name)
2176 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
2177
2178 pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, u->source_proplist);
2179
2180 u->source = pa_source_new(u->module->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
2181
2182 if (!u->source) {
2183 pa_log("Failed to create source.");
2184 goto finish;
2185 }
2186
2187 u->source->parent.process_msg = source_process_msg;
2188 u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb;
2189 u->source->userdata = u;
2190
2191 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2192
2193 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
2194 pa_source_set_rtpoll(u->source, u->rtpoll);
2195 pa_source_set_fixed_latency(u->source, u->latency * PA_USEC_PER_MSEC);
2196
2197 u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
2198
2199 pa_source_put(u->source);
2200
2201 finish:
2202 pa_source_new_data_done(&data);
2203 pa_xfree(data_name);
2204 }
2205 #endif
2206
2207 /* Runs in PA mainloop context */
tunnel_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)2208 static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
2209 struct userdata *u = (struct userdata *) data;
2210
2211 pa_assert(u);
2212 pa_assert_ctl_context();
2213
2214 if (u->shutting_down)
2215 return 0;
2216
2217 switch (code) {
2218
2219 case TUNNEL_MESSAGE_MAYBE_RESTART:
2220 unload_module(u->module->userdata);
2221 break;
2222 }
2223
2224 return 0;
2225 }
2226
start_connect(struct userdata * u,char * server,bool automatic)2227 static int start_connect(struct userdata *u, char *server, bool automatic) {
2228 pa_strlist *server_list = NULL;
2229 int rc = 0;
2230
2231 if (server) {
2232 if (!(server_list = pa_strlist_parse(server))) {
2233 pa_log("Invalid server specified.");
2234 rc = -1;
2235 goto done;
2236 }
2237 } else {
2238 char *ufn;
2239
2240 if (!automatic) {
2241 pa_log("No server specified.");
2242 rc = -1;
2243 goto done;
2244 }
2245
2246 pa_log("No server address found. Attempting default local sockets.");
2247
2248 /* The system wide instance via PF_LOCAL */
2249 server_list = pa_strlist_prepend(server_list, PA_SYSTEM_RUNTIME_PATH PA_PATH_SEP PA_NATIVE_DEFAULT_UNIX_SOCKET);
2250
2251 /* The user instance via PF_LOCAL */
2252 if ((ufn = pa_runtime_path(PA_NATIVE_DEFAULT_UNIX_SOCKET))) {
2253 server_list = pa_strlist_prepend(server_list, ufn);
2254 pa_xfree(ufn);
2255 }
2256 }
2257
2258 for (;;) {
2259 server_list = pa_strlist_pop(server_list, &u->server_name);
2260
2261 if (!u->server_name) {
2262 if (server)
2263 pa_log("Failed to connect to server '%s'", server);
2264 else
2265 pa_log("Failed to connect");
2266 rc = -1;
2267 goto done;
2268 }
2269
2270 pa_log_debug("Trying to connect to %s...", u->server_name);
2271
2272 if (!(u->client = pa_socket_client_new_string(u->module->core->mainloop, true, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
2273 pa_xfree(u->server_name);
2274 u->server_name = NULL;
2275 continue;
2276 }
2277
2278 break;
2279 }
2280
2281 if (u->client)
2282 pa_socket_client_set_callback(u->client, on_connection, u);
2283
2284 done:
2285 pa_strlist_free(server_list);
2286
2287 return rc;
2288 }
2289
/* Performs one initialization attempt of the tunnel module.
 *
 * m->userdata must already point to the module_restart_data. A fresh
 * struct userdata is allocated, stored in rd->userdata, filled in from the
 * module arguments (and, with auto=yes, from the environment and the X11 root
 * window properties), the connection attempt is started and the IO thread is
 * created.
 *
 * Returns 0 on success. On failure -1 is returned and the partially
 * initialized userdata stays in rd->userdata; it is not released here, that
 * is left to do_done(). Pending restart data is released only on success. */
static int do_init(pa_module *m) {
    pa_modargs *ma = NULL;
    struct userdata *u = NULL;
    struct module_restart_data *rd;
    char *server = NULL;
    uint32_t latency_msec;
    bool automatic;
#ifdef HAVE_X11
    xcb_connection_t *xcb = NULL;
#endif
    const char *cookie_path;
    uint32_t reconnect_interval_ms = 0;

    pa_assert(m);
    pa_assert(m->userdata);

    rd = m->userdata;

    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
        pa_log("Failed to parse module arguments");
        goto fail;
    }

    rd->userdata = u = pa_xnew0(struct userdata, 1);
    u->core = m->core;
    u->module = m;
    u->client = NULL;
    u->pdispatch = NULL;
    u->pstream = NULL;
    u->server_name = NULL;
#ifdef TUNNEL_SINK
    u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
    u->configured_sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL));
    u->sink = NULL;
    u->requested_bytes = 0;
    /* Create the property list before the first failure exit below: the
     * teardown path releases it, and pa_proplist_free() does not accept
     * NULL. It is filled from the module arguments further down. */
    u->sink_proplist = pa_proplist_new();
#else
    u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
    u->configured_source_name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL));
    u->source = NULL;
    /* Create the property list before the first failure exit below: the
     * teardown path releases it, and pa_proplist_free() does not accept
     * NULL. It is filled from the module arguments further down. */
    u->source_proplist = pa_proplist_new();
#endif
#ifndef USE_SMOOTHER_2
    u->smoother = pa_smoother_new(
            PA_USEC_PER_SEC,
            PA_USEC_PER_SEC*2,
            true,
            true,
            10,
            pa_rtclock_now(),
            false);
#endif
    u->ctag = 1;
    u->device_index = u->channel = PA_INVALID_INDEX;
    u->time_event = NULL;
    u->ignore_latency_before = 0;
    u->transport_usec = u->thread_transport_usec = 0;
    u->remote_suspended = u->remote_corked = false;
    u->counter = 0;
    u->receive_snapshot = 0;
    u->receive_counter = 0;

    u->msg = pa_msgobject_new(tunnel_msg);
    u->msg->parent.process_msg = tunnel_process_msg;

    u->rtpoll = pa_rtpoll_new();

    if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
        pa_log("pa_thread_mq_init() failed.");
        goto fail;
    }

    automatic = false;
    if (pa_modargs_get_value_boolean(ma, "auto", &automatic) < 0) {
        pa_log("Failed to parse argument \"auto\".");
        goto fail;
    }

    /* Allow latencies between 5ms and 500ms */
    latency_msec = DEFAULT_LATENCY_MSEC;
    if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 5 || latency_msec > 500) {
        pa_log("Invalid latency specification");
        goto fail;
    }

    u->latency = latency_msec;

    cookie_path = pa_modargs_get_value(ma, "cookie", NULL);
    server = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL));

    if (automatic) {
#ifdef HAVE_X11
        /* Need an X11 connection to get root properties */
        const char *display = getenv("DISPLAY");

        if (display != NULL) {
            if (!(xcb = xcb_connect(display, NULL)))
                pa_log("xcb_connect() failed");
            else {
                if (xcb_connection_has_error(xcb)) {
                    pa_log("xcb_connection_has_error() returned true");
                    xcb_disconnect(xcb);
                    xcb = NULL;
                }
            }
        }
#endif

        /* Figure out the cookie the same way a normal client would */
        if (!cookie_path)
            cookie_path = getenv(ENV_COOKIE_FILE);

#ifdef HAVE_X11
        if (!cookie_path && xcb) {
            char t[1024];
            if (pa_x11_get_prop(xcb, 0, "PULSE_COOKIE", t, sizeof(t))) {
                uint8_t cookie[PA_NATIVE_COOKIE_LENGTH];

                if (pa_parsehex(t, cookie, sizeof(cookie)) != sizeof(cookie))
                    pa_log("Failed to parse cookie data");
                else {
                    if (!(u->auth_cookie = pa_auth_cookie_create(u->core, cookie, sizeof(cookie))))
                        goto fail;
                }
            }
        }
#endif

        /* Same thing for the server name */
        if (!server)
            server = pa_xstrdup(getenv(ENV_DEFAULT_SERVER));

#ifdef HAVE_X11
        if (!server && xcb) {
            char t[1024];
            if (pa_x11_get_prop(xcb, 0, "PULSE_SERVER", t, sizeof(t)))
                server = pa_xstrdup(t);
        }
#endif

        /* Also determine the default sink/source on the other server */
#ifdef TUNNEL_SINK
        if (!u->sink_name)
            u->sink_name = pa_xstrdup(getenv(ENV_DEFAULT_SINK));

#ifdef HAVE_X11
        if (!u->sink_name && xcb) {
            char t[1024];
            if (pa_x11_get_prop(xcb, 0, "PULSE_SINK", t, sizeof(t)))
                u->sink_name = pa_xstrdup(t);
        }
#endif
#else
        if (!u->source_name)
            u->source_name = pa_xstrdup(getenv(ENV_DEFAULT_SOURCE));

#ifdef HAVE_X11
        if (!u->source_name && xcb) {
            char t[1024];
            if (pa_x11_get_prop(xcb, 0, "PULSE_SOURCE", t, sizeof(t)))
                u->source_name = pa_xstrdup(t);
        }
#endif
#endif
    }

    /* Fall back to the default cookie file unless a cookie was already
     * obtained from the X11 root window above. */
    if (!cookie_path && !u->auth_cookie)
        cookie_path = PA_NATIVE_COOKIE_FILE;

    if (cookie_path) {
        if (!(u->auth_cookie = pa_auth_cookie_get(u->core, cookie_path, true, PA_NATIVE_COOKIE_LENGTH)))
            goto fail;
    }

    u->sample_spec = m->core->default_sample_spec;
    u->channel_map = m->core->default_channel_map;
    if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->sample_spec, &u->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
        pa_log("Invalid sample format specification");
        goto fail;
    }

#ifdef USE_SMOOTHER_2
    /* Smoother window must be larger than time between updates. */
    u->smoother = pa_smoother_2_new(LATENCY_INTERVAL + 5*PA_USEC_PER_SEC, pa_rtclock_now(), pa_frame_size(&u->sample_spec), u->sample_spec.rate);
#endif

    /* Reject an unparsable interval like the other numeric arguments instead
     * of silently continuing with the default of 0. */
    if (pa_modargs_get_value_u32(ma, "reconnect_interval_ms", &reconnect_interval_ms) < 0) {
        pa_log("Failed to parse argument \"reconnect_interval_ms\".");
        goto fail;
    }
    u->reconnect_interval_us = reconnect_interval_ms * PA_USEC_PER_MSEC;

#ifdef TUNNEL_SINK

    if (pa_modargs_get_proplist(ma, "sink_properties", u->sink_proplist, PA_UPDATE_REPLACE) < 0) {
        pa_log("Invalid properties");
        goto fail;
    }

#else

    if (pa_modargs_get_proplist(ma, "source_properties", u->source_proplist, PA_UPDATE_REPLACE) < 0) {
        pa_log("Invalid properties");
        goto fail;
    }

#endif

    u->maxlength = (uint32_t) -1;
#ifdef TUNNEL_SINK
    u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
#else
    u->fragsize = (uint32_t) -1;
#endif

    if (start_connect(u, server, automatic) < 0) {
        goto fail;
    }

    if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) {
        pa_log("Failed to create thread.");
        goto fail;
    }

    pa_xfree(server);

#ifdef HAVE_X11
    if (xcb)
        xcb_disconnect(xcb);
#endif

    /* If the module is restarting and do_init() finishes successfully, the
     * restart data is no longer needed. If do_init() fails, don't touch the
     * restart data, because following restart attempts will continue to use
     * the same data. If restart_data is NULL, that means no restart is
     * currently pending. */
    if (rd->restart_data) {
        pa_restart_free(rd->restart_data);
        rd->restart_data = NULL;
    }

    pa_modargs_free(ma);

    return 0;

fail:
    /* Only the locals are released here; u stays in rd->userdata. */
    pa_xfree(server);

#ifdef HAVE_X11
    if (xcb)
        xcb_disconnect(xcb);
#endif

    if (ma)
        pa_modargs_free(ma);

    return -1;
}
2547
/* Tears down the struct userdata stored in the module's restart data.
 *
 * Does nothing when m->userdata or rd->userdata is NULL. Otherwise the
 * sink/source is unlinked, PA_MESSAGE_SHUTDOWN is sent to the IO thread and
 * the thread is freed, and every resource hanging off the userdata is
 * released before the userdata itself. rd->userdata is reset to NULL at the
 * end; the restart data itself is left alone.
 *
 * This also runs on a userdata that was only partially initialized (do_init()
 * leaves it in place when it fails), so each member is released only if it
 * was actually set up. */
static void do_done(pa_module *m) {
    struct userdata *u = NULL;
    struct module_restart_data *rd;

    pa_assert(m);

    if (!(rd = m->userdata))
        return;
    if (!(u = rd->userdata))
        return;

    /* Makes tunnel_process_msg() ignore anything that still arrives. */
    u->shutting_down = true;

#ifdef TUNNEL_SINK
    if (u->sink)
        pa_sink_unlink(u->sink);
#else
    if (u->source)
        pa_source_unlink(u->source);
#endif

    if (u->thread) {
        pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
        pa_thread_free(u->thread);
    }

    pa_thread_mq_done(&u->thread_mq);

#ifdef TUNNEL_SINK
    if (u->sink)
        pa_sink_unref(u->sink);
#else
    if (u->source)
        pa_source_unref(u->source);
#endif

    if (u->rtpoll)
        pa_rtpoll_free(u->rtpoll);

    if (u->pstream) {
        pa_pstream_unlink(u->pstream);
        pa_pstream_unref(u->pstream);
    }

    if (u->pdispatch)
        pa_pdispatch_unref(u->pdispatch);

    if (u->client)
        pa_socket_client_unref(u->client);

    if (u->auth_cookie)
        pa_auth_cookie_unref(u->auth_cookie);

    if (u->smoother) {
#ifdef USE_SMOOTHER_2
        pa_smoother_2_free(u->smoother);
#else
        pa_smoother_free(u->smoother);
#endif
    }

    if (u->time_event)
        u->core->mainloop->time_free(u->time_event);

#ifndef TUNNEL_SINK
    if (u->mcalign)
        pa_mcalign_free(u->mcalign);
#endif

    /* The property list may be NULL if initialization did not get as far as
     * creating it; pa_proplist_free() must not be called with NULL. */
#ifdef TUNNEL_SINK
    pa_xfree(u->sink_name);
    pa_xfree(u->configured_sink_name);
    if (u->sink_proplist)
        pa_proplist_free(u->sink_proplist);
#else
    pa_xfree(u->source_name);
    pa_xfree(u->configured_source_name);
    if (u->source_proplist)
        pa_proplist_free(u->source_proplist);
#endif
    pa_xfree(u->server_name);

    pa_xfree(u->device_description);
    pa_xfree(u->server_fqdn);
    pa_xfree(u->user_name);

    pa_xfree(u->msg);

    pa_xfree(u);

    rd->userdata = NULL;
}
2637
/* Module entry point.
 *
 * Allocates the zeroed module_restart_data that lives in m->userdata and runs
 * the first initialization attempt. When that attempt fails, everything is
 * torn down again through pa__done() before the error is reported.
 *
 * Returns the result of do_init(): 0 on success, negative on failure. */
int pa__init(pa_module *m) {
    int status;

    pa_assert(m);

    m->userdata = pa_xnew0(struct module_restart_data, 1);

    status = do_init(m);
    if (status >= 0)
        return status;

    /* Initialization failed: release whatever do_init() left behind. */
    pa__done(m);

    return status;
}
2652
/* Module exit point.
 *
 * Tears down the tunnel state via do_done(), then releases the
 * module_restart_data held in m->userdata together with any restart data
 * still pending in it. m->userdata is cleared afterwards so that no dangling
 * pointer is left in the module and a repeated call is harmless. */
void pa__done(pa_module *m) {
    struct module_restart_data *rd;

    pa_assert(m);

    do_done(m);

    rd = m->userdata;
    if (!rd)
        return;

    if (rd->restart_data)
        pa_restart_free(rd->restart_data);

    pa_xfree(rd);
    m->userdata = NULL;
}
2667