1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <unistd.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <stdio.h>
30 #include <stdlib.h>
31
32 #ifdef HAVE_X11
33 #include <xcb/xcb.h>
34 #endif
35
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38 #include <pulse/util.h>
39 #include <pulse/version.h>
40 #include <pulse/xmalloc.h>
41
42 #include <pulsecore/module.h>
43 #include <pulsecore/core-util.h>
44 #include <pulsecore/modargs.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-subscribe.h>
47 #include <pulsecore/pdispatch.h>
48 #include <pulsecore/pstream.h>
49 #include <pulsecore/pstream-util.h>
50 #include <pulsecore/socket-client.h>
51 #include <pulsecore/time-smoother.h>
52 #include <pulsecore/thread.h>
53 #include <pulsecore/thread-mq.h>
54 #include <pulsecore/core-rtclock.h>
55 #include <pulsecore/core-error.h>
56 #include <pulsecore/proplist-util.h>
57 #include <pulsecore/auth-cookie.h>
58 #include <pulsecore/mcalign.h>
59 #include <pulsecore/strlist.h>
60
61 #ifdef HAVE_X11
62 #include <pulsecore/x11prop.h>
63 #endif
64
65 #define ENV_DEFAULT_SINK "PULSE_SINK"
66 #define ENV_DEFAULT_SOURCE "PULSE_SOURCE"
67 #define ENV_DEFAULT_SERVER "PULSE_SERVER"
68 #define ENV_COOKIE_FILE "PULSE_COOKIE"
69
70 #ifdef TUNNEL_SINK
71 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
72 PA_MODULE_USAGE(
73 "sink_name=<name for the local sink> "
74 "sink_properties=<properties for the local sink> "
75 "auto=<determine server/sink/cookie automatically> "
76 "server=<address> "
77 "sink=<remote sink name> "
78 "cookie=<filename> "
79 "format=<sample format> "
80 "channels=<number of channels> "
81 "rate=<sample rate> "
82 "channel_map=<channel map>");
83 #else
84 PA_MODULE_DESCRIPTION("Tunnel module for sources");
85 PA_MODULE_USAGE(
86 "source_name=<name for the local source> "
87 "source_properties=<properties for the local source> "
88 "auto=<determine server/source/cookie automatically> "
89 "server=<address> "
90 "source=<remote source name> "
91 "cookie=<filename> "
92 "format=<sample format> "
93 "channels=<number of channels> "
94 "rate=<sample rate> "
95 "channel_map=<channel map>");
96 #endif
97
98 PA_MODULE_AUTHOR("Lennart Poettering");
99 PA_MODULE_VERSION(PACKAGE_VERSION);
100 PA_MODULE_LOAD_ONCE(false);
101
102 static const char* const valid_modargs[] = {
103 "auto",
104 "server",
105 "cookie",
106 "format",
107 "channels",
108 "rate",
109 #ifdef TUNNEL_SINK
110 "sink_name",
111 "sink_properties",
112 "sink",
113 #else
114 "source_name",
115 "source_properties",
116 "source",
117 #endif
118 "channel_map",
119 NULL,
120 };
121
122 #define DEFAULT_TIMEOUT 5
123
124 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
125
126 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
127
128 #ifdef TUNNEL_SINK
129
130 enum {
131 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
132 SINK_MESSAGE_REMOTE_SUSPEND,
133 SINK_MESSAGE_UPDATE_LATENCY,
134 SINK_MESSAGE_POST
135 };
136
137 #define DEFAULT_TLENGTH_MSEC 150
138 #define DEFAULT_MINREQ_MSEC 25
139
140 #else
141
142 enum {
143 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
144 SOURCE_MESSAGE_REMOTE_SUSPEND,
145 SOURCE_MESSAGE_UPDATE_LATENCY
146 };
147
148 #define DEFAULT_FRAGSIZE_MSEC 25
149
150 #endif
151
152 #ifdef TUNNEL_SINK
153 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
155 #endif
156 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
157 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
158 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
159 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
160 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
161 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
162 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
163
164 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
165 #ifdef TUNNEL_SINK
166 [PA_COMMAND_REQUEST] = command_request,
167 [PA_COMMAND_STARTED] = command_started,
168 #endif
169 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
170 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
171 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
172 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
173 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
174 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
175 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
176 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
177 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
178 [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
179 [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
180 [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
181 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
182 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
183 };
184
185 struct userdata {
186 pa_core *core;
187 pa_module *module;
188
189 pa_thread_mq thread_mq;
190 pa_rtpoll *rtpoll;
191 pa_thread *thread;
192
193 pa_socket_client *client;
194 pa_pstream *pstream;
195 pa_pdispatch *pdispatch;
196
197 char *server_name;
198 #ifdef TUNNEL_SINK
199 char *sink_name;
200 pa_sink *sink;
201 size_t requested_bytes;
202 #else
203 char *source_name;
204 pa_source *source;
205 pa_mcalign *mcalign;
206 #endif
207
208 pa_auth_cookie *auth_cookie;
209
210 uint32_t version;
211 uint32_t ctag;
212 uint32_t device_index;
213 uint32_t channel;
214
215 int64_t counter, counter_delta;
216
217 bool remote_corked:1;
218 bool remote_suspended:1;
219
220 pa_usec_t transport_usec; /* maintained in the main thread */
221 pa_usec_t thread_transport_usec; /* maintained in the IO thread */
222
223 uint32_t ignore_latency_before;
224
225 pa_time_event *time_event;
226
227 pa_smoother *smoother;
228
229 char *device_description;
230 char *server_fqdn;
231 char *user_name;
232
233 uint32_t maxlength;
234 #ifdef TUNNEL_SINK
235 uint32_t tlength;
236 uint32_t minreq;
237 uint32_t prebuf;
238 #else
239 uint32_t fragsize;
240 #endif
241 };
242
243 static void request_latency(struct userdata *u);
244
245 /* Called from main context */
command_stream_or_client_event(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)246 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
247 pa_log_debug("Got stream or client event.");
248 }
249
250 /* Called from main context */
command_stream_killed(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)251 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
252 struct userdata *u = userdata;
253
254 pa_assert(pd);
255 pa_assert(t);
256 pa_assert(u);
257 pa_assert(u->pdispatch == pd);
258
259 pa_log_warn("Stream killed");
260 pa_module_unload_request(u->module, true);
261 }
262
263 /* Called from main context */
command_overflow_or_underflow(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)264 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
265 struct userdata *u = userdata;
266
267 pa_assert(pd);
268 pa_assert(t);
269 pa_assert(u);
270 pa_assert(u->pdispatch == pd);
271
272 pa_log_info("Server signalled buffer overrun/underrun.");
273 request_latency(u);
274 }
275
276 /* Called from main context */
command_suspended(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)277 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
278 struct userdata *u = userdata;
279 uint32_t channel;
280 bool suspended;
281
282 pa_assert(pd);
283 pa_assert(t);
284 pa_assert(u);
285 pa_assert(u->pdispatch == pd);
286
287 if (pa_tagstruct_getu32(t, &channel) < 0 ||
288 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
289 !pa_tagstruct_eof(t)) {
290
291 pa_log("Invalid packet.");
292 pa_module_unload_request(u->module, true);
293 return;
294 }
295
296 pa_log_debug("Server reports device suspend.");
297
298 #ifdef TUNNEL_SINK
299 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
300 #else
301 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
302 #endif
303
304 request_latency(u);
305 }
306
307 /* Called from main context */
command_moved(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)308 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
309 struct userdata *u = userdata;
310 uint32_t channel, di;
311 const char *dn;
312 bool suspended;
313
314 pa_assert(pd);
315 pa_assert(t);
316 pa_assert(u);
317 pa_assert(u->pdispatch == pd);
318
319 if (pa_tagstruct_getu32(t, &channel) < 0 ||
320 pa_tagstruct_getu32(t, &di) < 0 ||
321 pa_tagstruct_gets(t, &dn) < 0 ||
322 pa_tagstruct_get_boolean(t, &suspended) < 0) {
323
324 pa_log_error("Invalid packet.");
325 pa_module_unload_request(u->module, true);
326 return;
327 }
328
329 pa_log_debug("Server reports a stream move.");
330
331 #ifdef TUNNEL_SINK
332 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
333 #else
334 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
335 #endif
336
337 request_latency(u);
338 }
339
command_stream_buffer_attr_changed(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)340 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
341 struct userdata *u = userdata;
342 uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
343 pa_usec_t usec;
344
345 pa_assert(pd);
346 pa_assert(t);
347 pa_assert(u);
348 pa_assert(u->pdispatch == pd);
349
350 if (pa_tagstruct_getu32(t, &channel) < 0 ||
351 pa_tagstruct_getu32(t, &maxlength) < 0) {
352
353 pa_log_error("Invalid packet.");
354 pa_module_unload_request(u->module, true);
355 return;
356 }
357
358 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
359 if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
360 pa_tagstruct_get_usec(t, &usec) < 0) {
361
362 pa_log_error("Invalid packet.");
363 pa_module_unload_request(u->module, true);
364 return;
365 }
366 } else {
367 if (pa_tagstruct_getu32(t, &tlength) < 0 ||
368 pa_tagstruct_getu32(t, &prebuf) < 0 ||
369 pa_tagstruct_getu32(t, &minreq) < 0 ||
370 pa_tagstruct_get_usec(t, &usec) < 0) {
371
372 pa_log_error("Invalid packet.");
373 pa_module_unload_request(u->module, true);
374 return;
375 }
376 }
377
378 #ifdef TUNNEL_SINK
379 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
380 #endif
381
382 request_latency(u);
383 }
384
385 #ifdef TUNNEL_SINK
386
387 /* Called from main context */
command_started(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)388 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
389 struct userdata *u = userdata;
390
391 pa_assert(pd);
392 pa_assert(t);
393 pa_assert(u);
394 pa_assert(u->pdispatch == pd);
395
396 pa_log_debug("Server reports playback started.");
397 request_latency(u);
398 }
399
400 #endif
401
402 /* Called from IO thread context */
check_smoother_status(struct userdata * u,bool past)403 static void check_smoother_status(struct userdata *u, bool past) {
404 pa_usec_t x;
405
406 pa_assert(u);
407
408 x = pa_rtclock_now();
409
410 /* Correct by the time the requested issued needs to travel to the
411 * other side. This is a valid thread-safe access, because the
412 * main thread is waiting for us */
413
414 if (past)
415 x -= u->thread_transport_usec;
416 else
417 x += u->thread_transport_usec;
418
419 if (u->remote_suspended || u->remote_corked)
420 pa_smoother_pause(u->smoother, x);
421 else
422 pa_smoother_resume(u->smoother, x, true);
423 }
424
425 /* Called from IO thread context */
stream_cork_within_thread(struct userdata * u,bool cork)426 static void stream_cork_within_thread(struct userdata *u, bool cork) {
427 pa_assert(u);
428
429 if (u->remote_corked == cork)
430 return;
431
432 u->remote_corked = cork;
433 check_smoother_status(u, false);
434 }
435
436 /* Called from main context */
stream_cork(struct userdata * u,bool cork)437 static void stream_cork(struct userdata *u, bool cork) {
438 pa_tagstruct *t;
439 pa_assert(u);
440
441 if (!u->pstream)
442 return;
443
444 t = pa_tagstruct_new();
445 #ifdef TUNNEL_SINK
446 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
447 #else
448 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
449 #endif
450 pa_tagstruct_putu32(t, u->ctag++);
451 pa_tagstruct_putu32(t, u->channel);
452 pa_tagstruct_put_boolean(t, cork);
453 pa_pstream_send_tagstruct(u->pstream, t);
454
455 request_latency(u);
456 }
457
458 /* Called from IO thread context */
stream_suspend_within_thread(struct userdata * u,bool suspend)459 static void stream_suspend_within_thread(struct userdata *u, bool suspend) {
460 pa_assert(u);
461
462 if (u->remote_suspended == suspend)
463 return;
464
465 u->remote_suspended = suspend;
466 check_smoother_status(u, true);
467 }
468
469 #ifdef TUNNEL_SINK
470
471 /* Called from IO thread context */
send_data(struct userdata * u)472 static void send_data(struct userdata *u) {
473 pa_assert(u);
474
475 while (u->requested_bytes > 0) {
476 pa_memchunk memchunk;
477
478 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
479 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
480 pa_memblock_unref(memchunk.memblock);
481
482 u->requested_bytes -= memchunk.length;
483
484 u->counter += (int64_t) memchunk.length;
485 }
486 }
487
488 /* This function is called from IO context -- except when it is not. */
sink_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)489 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
490 struct userdata *u = PA_SINK(o)->userdata;
491
492 switch (code) {
493
494 case PA_SINK_MESSAGE_SET_STATE: {
495 int r;
496
497 /* First, change the state, because otherwise pa_sink_render() would fail */
498 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
499
500 stream_cork_within_thread(u, u->sink->thread_info.state == PA_SINK_SUSPENDED);
501
502 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
503 send_data(u);
504 }
505
506 return r;
507 }
508
509 case PA_SINK_MESSAGE_GET_LATENCY: {
510 pa_usec_t yl, yr;
511 int64_t *usec = data;
512
513 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
514 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
515
516 *usec = (int64_t)yl - yr;
517 return 0;
518 }
519
520 case SINK_MESSAGE_REQUEST:
521
522 pa_assert(offset > 0);
523 u->requested_bytes += (size_t) offset;
524
525 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
526 send_data(u);
527
528 return 0;
529
530 case SINK_MESSAGE_REMOTE_SUSPEND:
531
532 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
533 return 0;
534
535 case SINK_MESSAGE_UPDATE_LATENCY: {
536 pa_usec_t y;
537
538 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
539
540 if (y > (pa_usec_t) offset)
541 y -= (pa_usec_t) offset;
542 else
543 y = 0;
544
545 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
546
547 /* We can access this freely here, since the main thread is waiting for us */
548 u->thread_transport_usec = u->transport_usec;
549
550 return 0;
551 }
552
553 case SINK_MESSAGE_POST:
554
555 /* OK, This might be a bit confusing. This message is
556 * delivered to us from the main context -- NOT from the
557 * IO thread context where the rest of the messages are
558 * dispatched. Yeah, ugly, but I am a lazy bastard. */
559
560 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
561
562 u->counter_delta += (int64_t) chunk->length;
563
564 return 0;
565 }
566
567 return pa_sink_process_msg(o, code, data, offset, chunk);
568 }
569
570 /* Called from main context */
sink_set_state_in_main_thread_cb(pa_sink * s,pa_sink_state_t state,pa_suspend_cause_t suspend_cause)571 static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
572 struct userdata *u;
573 pa_sink_assert_ref(s);
574 u = s->userdata;
575
576 /* It may be that only the suspend cause is changing, in which
577 * case there's nothing to do. */
578 if (state == s->state)
579 return 0;
580
581 switch ((pa_sink_state_t) state) {
582
583 case PA_SINK_SUSPENDED:
584 pa_assert(PA_SINK_IS_OPENED(s->state));
585 stream_cork(u, true);
586 break;
587
588 case PA_SINK_IDLE:
589 case PA_SINK_RUNNING:
590 if (s->state == PA_SINK_SUSPENDED)
591 stream_cork(u, false);
592 break;
593
594 case PA_SINK_UNLINKED:
595 case PA_SINK_INIT:
596 case PA_SINK_INVALID_STATE:
597 ;
598 }
599
600 return 0;
601 }
602
603 #else
604
605 /* This function is called from IO context -- except when it is not. */
source_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)606 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
607 struct userdata *u = PA_SOURCE(o)->userdata;
608
609 switch (code) {
610
611 case PA_SOURCE_MESSAGE_SET_STATE: {
612 int r;
613
614 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
615 stream_cork_within_thread(u, u->source->thread_info.state == PA_SOURCE_SUSPENDED);
616
617 return r;
618 }
619
620 case PA_SOURCE_MESSAGE_GET_LATENCY: {
621 pa_usec_t yr, yl;
622 int64_t *usec = data;
623
624 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
625 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
626
627 *usec = (int64_t)yr - yl;
628 return 0;
629 }
630
631 case SOURCE_MESSAGE_POST: {
632 pa_memchunk c;
633
634 pa_mcalign_push(u->mcalign, chunk);
635
636 while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
637
638 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
639 pa_source_post(u->source, &c);
640
641 pa_memblock_unref(c.memblock);
642
643 u->counter += (int64_t) c.length;
644 }
645
646 return 0;
647 }
648
649 case SOURCE_MESSAGE_REMOTE_SUSPEND:
650
651 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
652 return 0;
653
654 case SOURCE_MESSAGE_UPDATE_LATENCY: {
655 pa_usec_t y;
656
657 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
658 y += (pa_usec_t) offset;
659
660 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
661
662 /* We can access this freely here, since the main thread is waiting for us */
663 u->thread_transport_usec = u->transport_usec;
664
665 return 0;
666 }
667 }
668
669 return pa_source_process_msg(o, code, data, offset, chunk);
670 }
671
672 /* Called from main context */
source_set_state_in_main_thread_cb(pa_source * s,pa_source_state_t state,pa_suspend_cause_t suspend_cause)673 static int source_set_state_in_main_thread_cb(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) {
674 struct userdata *u;
675 pa_source_assert_ref(s);
676 u = s->userdata;
677
678 /* It may be that only the suspend cause is changing, in which
679 * case there's nothing to do. */
680 if (state == s->state)
681 return 0;
682
683 switch ((pa_source_state_t) state) {
684
685 case PA_SOURCE_SUSPENDED:
686 pa_assert(PA_SOURCE_IS_OPENED(s->state));
687 stream_cork(u, true);
688 break;
689
690 case PA_SOURCE_IDLE:
691 case PA_SOURCE_RUNNING:
692 if (s->state == PA_SOURCE_SUSPENDED)
693 stream_cork(u, false);
694 break;
695
696 case PA_SOURCE_UNLINKED:
697 case PA_SOURCE_INIT:
698 case PA_SOURCE_INVALID_STATE:
699 ;
700 }
701
702 return 0;
703 }
704
705 #endif
706
thread_func(void * userdata)707 static void thread_func(void *userdata) {
708 struct userdata *u = userdata;
709
710 pa_assert(u);
711
712 pa_log_debug("Thread starting up");
713
714 pa_thread_mq_install(&u->thread_mq);
715
716 for (;;) {
717 int ret;
718
719 #ifdef TUNNEL_SINK
720 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
721 pa_sink_process_rewind(u->sink, 0);
722 #endif
723
724 if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
725 goto fail;
726
727 if (ret == 0)
728 goto finish;
729 }
730
731 fail:
732 /* If this was no regular exit from the loop we have to continue
733 * processing messages until we received PA_MESSAGE_SHUTDOWN */
734 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
735 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
736
737 finish:
738 pa_log_debug("Thread shutting down");
739 }
740
741 #ifdef TUNNEL_SINK
742 /* Called from main context */
command_request(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)743 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
744 struct userdata *u = userdata;
745 uint32_t bytes, channel;
746
747 pa_assert(pd);
748 pa_assert(command == PA_COMMAND_REQUEST);
749 pa_assert(t);
750 pa_assert(u);
751 pa_assert(u->pdispatch == pd);
752
753 if (pa_tagstruct_getu32(t, &channel) < 0 ||
754 pa_tagstruct_getu32(t, &bytes) < 0) {
755 pa_log("Invalid protocol reply");
756 goto fail;
757 }
758
759 if (channel != u->channel) {
760 pa_log("Received data for invalid channel");
761 goto fail;
762 }
763
764 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
765 return;
766
767 fail:
768 pa_module_unload_request(u->module, true);
769 }
770
771 #endif
772
773 /* Called from main context */
stream_get_latency_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)774 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
775 struct userdata *u = userdata;
776 pa_usec_t sink_usec, source_usec;
777 bool playing;
778 int64_t write_index, read_index;
779 struct timeval local, remote, now;
780 pa_sample_spec *ss;
781 int64_t delay;
782
783 pa_assert(pd);
784 pa_assert(u);
785
786 if (command != PA_COMMAND_REPLY) {
787 if (command == PA_COMMAND_ERROR)
788 pa_log("Failed to get latency.");
789 else
790 pa_log("Protocol error.");
791 goto fail;
792 }
793
794 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
795 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
796 pa_tagstruct_get_boolean(t, &playing) < 0 ||
797 pa_tagstruct_get_timeval(t, &local) < 0 ||
798 pa_tagstruct_get_timeval(t, &remote) < 0 ||
799 pa_tagstruct_gets64(t, &write_index) < 0 ||
800 pa_tagstruct_gets64(t, &read_index) < 0) {
801 pa_log("Invalid reply.");
802 goto fail;
803 }
804
805 #ifdef TUNNEL_SINK
806 if (u->version >= 13) {
807 uint64_t underrun_for = 0, playing_for = 0;
808
809 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
810 pa_tagstruct_getu64(t, &playing_for) < 0) {
811 pa_log("Invalid reply.");
812 goto fail;
813 }
814 }
815 #endif
816
817 if (!pa_tagstruct_eof(t)) {
818 pa_log("Invalid reply.");
819 goto fail;
820 }
821
822 if (tag < u->ignore_latency_before) {
823 return;
824 }
825
826 pa_gettimeofday(&now);
827
828 /* Calculate transport usec */
829 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
830 /* local and remote seem to have synchronized clocks */
831 #ifdef TUNNEL_SINK
832 u->transport_usec = pa_timeval_diff(&remote, &local);
833 #else
834 u->transport_usec = pa_timeval_diff(&now, &remote);
835 #endif
836 } else
837 u->transport_usec = pa_timeval_diff(&now, &local)/2;
838
839 /* First, take the device's delay */
840 #ifdef TUNNEL_SINK
841 delay = (int64_t) sink_usec;
842 ss = &u->sink->sample_spec;
843 #else
844 delay = (int64_t) source_usec;
845 ss = &u->source->sample_spec;
846 #endif
847
848 /* Add the length of our server-side buffer */
849 if (write_index >= read_index)
850 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
851 else
852 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
853
854 /* Our measurements are already out of date, hence correct by the *
855 * transport latency */
856 #ifdef TUNNEL_SINK
857 delay -= (int64_t) u->transport_usec;
858 #else
859 delay += (int64_t) u->transport_usec;
860 #endif
861
862 /* Now correct by what we have have read/written since we requested the update */
863 #ifdef TUNNEL_SINK
864 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
865 #else
866 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
867 #endif
868
869 #ifdef TUNNEL_SINK
870 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
871 #else
872 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
873 #endif
874
875 return;
876
877 fail:
878
879 pa_module_unload_request(u->module, true);
880 }
881
882 /* Called from main context */
request_latency(struct userdata * u)883 static void request_latency(struct userdata *u) {
884 pa_tagstruct *t;
885 struct timeval now;
886 uint32_t tag;
887 pa_assert(u);
888
889 t = pa_tagstruct_new();
890 #ifdef TUNNEL_SINK
891 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
892 #else
893 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
894 #endif
895 pa_tagstruct_putu32(t, tag = u->ctag++);
896 pa_tagstruct_putu32(t, u->channel);
897
898 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
899
900 pa_pstream_send_tagstruct(u->pstream, t);
901 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
902
903 u->ignore_latency_before = tag;
904 u->counter_delta = 0;
905 }
906
907 /* Called from main context */
timeout_callback(pa_mainloop_api * m,pa_time_event * e,const struct timeval * t,void * userdata)908 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
909 struct userdata *u = userdata;
910
911 pa_assert(m);
912 pa_assert(e);
913 pa_assert(u);
914
915 request_latency(u);
916
917 pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
918 }
919
920 /* Called from main context */
update_description(struct userdata * u)921 static void update_description(struct userdata *u) {
922 char *d;
923 char un[128], hn[128];
924 pa_tagstruct *t;
925
926 pa_assert(u);
927
928 if (!u->server_fqdn || !u->user_name || !u->device_description)
929 return;
930
931 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
932
933 #ifdef TUNNEL_SINK
934 pa_sink_set_description(u->sink, d);
935 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
936 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
937 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
938 #else
939 pa_source_set_description(u->source, d);
940 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
941 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
942 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
943 #endif
944
945 pa_xfree(d);
946
947 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
948 pa_get_user_name(un, sizeof(un)),
949 pa_get_host_name(hn, sizeof(hn)));
950
951 t = pa_tagstruct_new();
952 #ifdef TUNNEL_SINK
953 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
954 #else
955 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
956 #endif
957 pa_tagstruct_putu32(t, u->ctag++);
958 pa_tagstruct_putu32(t, u->channel);
959 pa_tagstruct_puts(t, d);
960 pa_pstream_send_tagstruct(u->pstream, t);
961
962 pa_xfree(d);
963 }
964
965 /* Called from main context */
server_info_cb(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)966 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
967 struct userdata *u = userdata;
968 pa_sample_spec ss;
969 pa_channel_map cm;
970 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
971 uint32_t cookie;
972
973 pa_assert(pd);
974 pa_assert(u);
975
976 if (command != PA_COMMAND_REPLY) {
977 if (command == PA_COMMAND_ERROR)
978 pa_log("Failed to get info.");
979 else
980 pa_log("Protocol error.");
981 goto fail;
982 }
983
984 if (pa_tagstruct_gets(t, &server_name) < 0 ||
985 pa_tagstruct_gets(t, &server_version) < 0 ||
986 pa_tagstruct_gets(t, &user_name) < 0 ||
987 pa_tagstruct_gets(t, &host_name) < 0 ||
988 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
989 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
990 pa_tagstruct_gets(t, &default_source_name) < 0 ||
991 pa_tagstruct_getu32(t, &cookie) < 0 ||
992 (u->version >= 15 && pa_tagstruct_get_channel_map(t, &cm) < 0)) {
993
994 pa_log("Parse failure");
995 goto fail;
996 }
997
998 if (!pa_tagstruct_eof(t)) {
999 pa_log("Packet too long");
1000 goto fail;
1001 }
1002
1003 pa_xfree(u->server_fqdn);
1004 u->server_fqdn = pa_xstrdup(host_name);
1005
1006 pa_xfree(u->user_name);
1007 u->user_name = pa_xstrdup(user_name);
1008
1009 update_description(u);
1010
1011 return;
1012
1013 fail:
1014 pa_module_unload_request(u->module, true);
1015 }
1016
read_ports(struct userdata * u,pa_tagstruct * t)1017 static int read_ports(struct userdata *u, pa_tagstruct *t) {
1018 if (u->version >= 16) {
1019 uint32_t n_ports;
1020 const char *s;
1021
1022 if (pa_tagstruct_getu32(t, &n_ports)) {
1023 pa_log("Parse failure");
1024 return -PA_ERR_PROTOCOL;
1025 }
1026
1027 for (uint32_t j = 0; j < n_ports; j++) {
1028 uint32_t priority;
1029
1030 if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1031 pa_tagstruct_gets(t, &s) < 0 || /* description */
1032 pa_tagstruct_getu32(t, &priority) < 0) {
1033
1034 pa_log("Parse failure");
1035 return -PA_ERR_PROTOCOL;
1036 }
1037 if (u->version >= 24) {
1038 if (pa_tagstruct_getu32(t, &priority) < 0) { /* available */
1039 pa_log("Parse failure");
1040 return -PA_ERR_PROTOCOL;
1041 }
1042 if (u->version >= 34 &&
1043 (pa_tagstruct_gets(t, &s) < 0 || /* availability group */
1044 pa_tagstruct_getu32(t, &priority) < 0)) { /* device port type */
1045 pa_log("Parse failure");
1046 return -PA_ERR_PROTOCOL;
1047 }
1048 }
1049 }
1050
1051 if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1052 pa_log("Parse failure");
1053 return -PA_ERR_PROTOCOL;
1054 }
1055 }
1056 return 0;
1057 }
1058
read_formats(struct userdata * u,pa_tagstruct * t)1059 static int read_formats(struct userdata *u, pa_tagstruct *t) {
1060 uint8_t n_formats;
1061 pa_format_info *format;
1062
1063 if (pa_tagstruct_getu8(t, &n_formats) < 0) { /* no. of formats */
1064 pa_log("Parse failure");
1065 return -PA_ERR_PROTOCOL;
1066 }
1067
1068 for (uint8_t j = 0; j < n_formats; j++) {
1069 format = pa_format_info_new();
1070 if (pa_tagstruct_get_format_info(t, format)) { /* format info */
1071 pa_format_info_free(format);
1072 pa_log("Parse failure");
1073 return -PA_ERR_PROTOCOL;
1074 }
1075 pa_format_info_free(format);
1076 }
1077 return 0;
1078 }
1079
1080 #ifdef TUNNEL_SINK
1081
1082 /* Called from main context */
sink_info_cb(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1083 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1084 struct userdata *u = userdata;
1085 uint32_t idx, owner_module, monitor_source, flags;
1086 const char *name, *description, *monitor_source_name, *driver;
1087 pa_sample_spec ss;
1088 pa_channel_map cm;
1089 pa_cvolume volume;
1090 bool mute;
1091 pa_usec_t latency;
1092
1093 pa_assert(pd);
1094 pa_assert(u);
1095
1096 if (command != PA_COMMAND_REPLY) {
1097 if (command == PA_COMMAND_ERROR)
1098 pa_log("Failed to get info.");
1099 else
1100 pa_log("Protocol error.");
1101 goto fail;
1102 }
1103
1104 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1105 pa_tagstruct_gets(t, &name) < 0 ||
1106 pa_tagstruct_gets(t, &description) < 0 ||
1107 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1108 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1109 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1110 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1111 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1112 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1113 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1114 pa_tagstruct_get_usec(t, &latency) < 0 ||
1115 pa_tagstruct_gets(t, &driver) < 0 ||
1116 pa_tagstruct_getu32(t, &flags) < 0) {
1117
1118 pa_log("Parse failure");
1119 goto fail;
1120 }
1121
1122 if (u->version >= 13) {
1123 pa_usec_t configured_latency;
1124
1125 if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1126 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1127
1128 pa_log("Parse failure");
1129 goto fail;
1130 }
1131 }
1132
1133 if (u->version >= 15) {
1134 pa_volume_t base_volume;
1135 uint32_t state, n_volume_steps, card;
1136
1137 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1138 pa_tagstruct_getu32(t, &state) < 0 ||
1139 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1140 pa_tagstruct_getu32(t, &card) < 0) {
1141
1142 pa_log("Parse failure");
1143 goto fail;
1144 }
1145 }
1146
1147 if (read_ports(u, t) < 0)
1148 goto fail;
1149
1150 if (u->version >= 21 && read_formats(u, t) < 0)
1151 goto fail;
1152
1153 if (!pa_tagstruct_eof(t)) {
1154 pa_log("Packet too long");
1155 goto fail;
1156 }
1157
1158 if (!u->sink_name || !pa_streq(name, u->sink_name))
1159 return;
1160
1161 pa_xfree(u->device_description);
1162 u->device_description = pa_xstrdup(description);
1163
1164 update_description(u);
1165
1166 return;
1167
1168 fail:
1169 pa_module_unload_request(u->module, true);
1170 }
1171
1172 /* Called from main context */
sink_input_info_cb(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1173 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1174 struct userdata *u = userdata;
1175 uint32_t idx, owner_module, client, sink;
1176 pa_usec_t buffer_usec, sink_usec;
1177 const char *name, *driver, *resample_method;
1178 bool mute = false;
1179 pa_sample_spec sample_spec;
1180 pa_channel_map channel_map;
1181 pa_cvolume volume;
1182 bool b;
1183
1184 pa_assert(pd);
1185 pa_assert(u);
1186
1187 if (command != PA_COMMAND_REPLY) {
1188 if (command == PA_COMMAND_ERROR)
1189 pa_log("Failed to get info.");
1190 else
1191 pa_log("Protocol error.");
1192 goto fail;
1193 }
1194
1195 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1196 pa_tagstruct_gets(t, &name) < 0 ||
1197 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1198 pa_tagstruct_getu32(t, &client) < 0 ||
1199 pa_tagstruct_getu32(t, &sink) < 0 ||
1200 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1201 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1202 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1203 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1204 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1205 pa_tagstruct_gets(t, &resample_method) < 0 ||
1206 pa_tagstruct_gets(t, &driver) < 0) {
1207
1208 pa_log("Parse failure");
1209 goto fail;
1210 }
1211
1212 if (u->version >= 11) {
1213 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1214
1215 pa_log("Parse failure");
1216 goto fail;
1217 }
1218 }
1219
1220 if (u->version >= 13) {
1221 if (pa_tagstruct_get_proplist(t, NULL) < 0) {
1222
1223 pa_log("Parse failure");
1224 goto fail;
1225 }
1226 }
1227
1228 if (u->version >= 19) {
1229 if (pa_tagstruct_get_boolean(t, &b) < 0) {
1230
1231 pa_log("Parse failure");
1232 goto fail;
1233 }
1234 }
1235
1236 if (u->version >= 20) {
1237 if (pa_tagstruct_get_boolean(t, &b) < 0 ||
1238 pa_tagstruct_get_boolean(t, &b) < 0) {
1239
1240 pa_log("Parse failure");
1241 goto fail;
1242 }
1243 }
1244
1245 if (u->version >= 21) {
1246 pa_format_info *format = pa_format_info_new();
1247
1248 if (pa_tagstruct_get_format_info(t, format) < 0) {
1249 pa_format_info_free(format);
1250 pa_log("Parse failure");
1251 goto fail;
1252 }
1253 pa_format_info_free(format);
1254 }
1255
1256 if (!pa_tagstruct_eof(t)) {
1257 pa_log("Packet too long");
1258 goto fail;
1259 }
1260
1261 if (idx != u->device_index)
1262 return;
1263
1264 pa_assert(u->sink);
1265
1266 if ((u->version < 11 || mute == u->sink->muted) &&
1267 pa_cvolume_equal(&volume, &u->sink->real_volume))
1268 return;
1269
1270 pa_sink_volume_changed(u->sink, &volume);
1271
1272 if (u->version >= 11)
1273 pa_sink_mute_changed(u->sink, mute);
1274
1275 return;
1276
1277 fail:
1278 pa_module_unload_request(u->module, true);
1279 }
1280
1281 #else
1282
1283 /* Called from main context */
source_info_cb(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1284 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1285 struct userdata *u = userdata;
1286 uint32_t idx, owner_module, monitor_of_sink, flags;
1287 const char *name, *description, *monitor_of_sink_name, *driver;
1288 pa_sample_spec ss;
1289 pa_channel_map cm;
1290 pa_cvolume volume;
1291 bool mute;
1292 pa_usec_t latency, configured_latency;
1293
1294 pa_assert(pd);
1295 pa_assert(u);
1296
1297 if (command != PA_COMMAND_REPLY) {
1298 if (command == PA_COMMAND_ERROR)
1299 pa_log("Failed to get info.");
1300 else
1301 pa_log("Protocol error.");
1302 goto fail;
1303 }
1304
1305 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1306 pa_tagstruct_gets(t, &name) < 0 ||
1307 pa_tagstruct_gets(t, &description) < 0 ||
1308 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1309 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1310 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1311 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1312 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1313 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1314 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1315 pa_tagstruct_get_usec(t, &latency) < 0 ||
1316 pa_tagstruct_gets(t, &driver) < 0 ||
1317 pa_tagstruct_getu32(t, &flags) < 0) {
1318
1319 pa_log("Parse failure");
1320 goto fail;
1321 }
1322
1323 if (u->version >= 13) {
1324 if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1325 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1326
1327 pa_log("Parse failure");
1328 goto fail;
1329 }
1330 }
1331
1332 if (u->version >= 15) {
1333 pa_volume_t base_volume;
1334 uint32_t state, n_volume_steps, card;
1335
1336 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1337 pa_tagstruct_getu32(t, &state) < 0 ||
1338 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1339 pa_tagstruct_getu32(t, &card) < 0) {
1340
1341 pa_log("Parse failure");
1342 goto fail;
1343 }
1344 }
1345
1346 if (read_ports(u, t) < 0)
1347 goto fail;
1348
1349 if (u->version >= 22 && read_formats(u, t) < 0)
1350 goto fail;
1351
1352 if (!pa_tagstruct_eof(t)) {
1353 pa_log("Packet too long");
1354 goto fail;
1355 }
1356
1357 if (!u->source_name || !pa_streq(name, u->source_name))
1358 return;
1359
1360 pa_xfree(u->device_description);
1361 u->device_description = pa_xstrdup(description);
1362
1363 update_description(u);
1364
1365 return;
1366
1367 fail:
1368 pa_module_unload_request(u->module, true);
1369 }
1370
1371 #endif
1372
1373 /* Called from main context */
request_info(struct userdata * u)1374 static void request_info(struct userdata *u) {
1375 pa_tagstruct *t;
1376 uint32_t tag;
1377 pa_assert(u);
1378
1379 t = pa_tagstruct_new();
1380 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1381 pa_tagstruct_putu32(t, tag = u->ctag++);
1382 pa_pstream_send_tagstruct(u->pstream, t);
1383 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1384
1385 #ifdef TUNNEL_SINK
1386 t = pa_tagstruct_new();
1387 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1388 pa_tagstruct_putu32(t, tag = u->ctag++);
1389 pa_tagstruct_putu32(t, u->device_index);
1390 pa_pstream_send_tagstruct(u->pstream, t);
1391 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1392
1393 if (u->sink_name) {
1394 t = pa_tagstruct_new();
1395 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1396 pa_tagstruct_putu32(t, tag = u->ctag++);
1397 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1398 pa_tagstruct_puts(t, u->sink_name);
1399 pa_pstream_send_tagstruct(u->pstream, t);
1400 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1401 }
1402 #else
1403 if (u->source_name) {
1404 t = pa_tagstruct_new();
1405 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1406 pa_tagstruct_putu32(t, tag = u->ctag++);
1407 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1408 pa_tagstruct_puts(t, u->source_name);
1409 pa_pstream_send_tagstruct(u->pstream, t);
1410 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1411 }
1412 #endif
1413 }
1414
1415 /* Called from main context */
command_subscribe_event(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1416 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1417 struct userdata *u = userdata;
1418 pa_subscription_event_type_t e;
1419 uint32_t idx;
1420
1421 pa_assert(pd);
1422 pa_assert(t);
1423 pa_assert(u);
1424 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1425
1426 if (pa_tagstruct_getu32(t, &e) < 0 ||
1427 pa_tagstruct_getu32(t, &idx) < 0) {
1428 pa_log("Invalid protocol reply");
1429 pa_module_unload_request(u->module, true);
1430 return;
1431 }
1432
1433 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1434 #ifdef TUNNEL_SINK
1435 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1436 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1437 #else
1438 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1439 #endif
1440 )
1441 return;
1442
1443 request_info(u);
1444 }
1445
1446 /* Called from main context */
start_subscribe(struct userdata * u)1447 static void start_subscribe(struct userdata *u) {
1448 pa_tagstruct *t;
1449 pa_assert(u);
1450
1451 t = pa_tagstruct_new();
1452 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1453 pa_tagstruct_putu32(t, u->ctag++);
1454 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1455 #ifdef TUNNEL_SINK
1456 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1457 #else
1458 PA_SUBSCRIPTION_MASK_SOURCE
1459 #endif
1460 );
1461
1462 pa_pstream_send_tagstruct(u->pstream, t);
1463 }
1464
1465 /* Called from main context */
create_stream_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1466 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1467 struct userdata *u = userdata;
1468 #ifdef TUNNEL_SINK
1469 uint32_t bytes;
1470 #endif
1471
1472 pa_assert(pd);
1473 pa_assert(u);
1474 pa_assert(u->pdispatch == pd);
1475
1476 if (command != PA_COMMAND_REPLY) {
1477 if (command == PA_COMMAND_ERROR)
1478 pa_log("Failed to create stream.");
1479 else
1480 pa_log("Protocol error.");
1481 goto fail;
1482 }
1483
1484 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1485 pa_tagstruct_getu32(t, &u->device_index) < 0
1486 #ifdef TUNNEL_SINK
1487 || pa_tagstruct_getu32(t, &bytes) < 0
1488 #endif
1489 )
1490 goto parse_error;
1491
1492 if (u->version >= 9) {
1493 #ifdef TUNNEL_SINK
1494 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1495 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1496 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1497 pa_tagstruct_getu32(t, &u->minreq) < 0)
1498 goto parse_error;
1499 #else
1500 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1501 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1502 goto parse_error;
1503 #endif
1504 }
1505
1506 if (u->version >= 12) {
1507 pa_sample_spec ss;
1508 pa_channel_map cm;
1509 uint32_t device_index;
1510 const char *dn;
1511 bool suspended;
1512
1513 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1514 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1515 pa_tagstruct_getu32(t, &device_index) < 0 ||
1516 pa_tagstruct_gets(t, &dn) < 0 ||
1517 pa_tagstruct_get_boolean(t, &suspended) < 0)
1518 goto parse_error;
1519
1520 #ifdef TUNNEL_SINK
1521 pa_xfree(u->sink_name);
1522 u->sink_name = pa_xstrdup(dn);
1523 #else
1524 pa_xfree(u->source_name);
1525 u->source_name = pa_xstrdup(dn);
1526 #endif
1527 }
1528
1529 if (u->version >= 13) {
1530 pa_usec_t usec;
1531
1532 if (pa_tagstruct_get_usec(t, &usec) < 0)
1533 goto parse_error;
1534
1535 /* #ifdef TUNNEL_SINK */
1536 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1537 /* #else */
1538 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1539 /* #endif */
1540 }
1541
1542 if (u->version >= 21) {
1543 pa_format_info *format = pa_format_info_new();
1544
1545 if (pa_tagstruct_get_format_info(t, format) < 0) {
1546 pa_format_info_free(format);
1547 goto parse_error;
1548 }
1549
1550 pa_format_info_free(format);
1551 }
1552
1553 if (!pa_tagstruct_eof(t))
1554 goto parse_error;
1555
1556 start_subscribe(u);
1557 request_info(u);
1558
1559 pa_assert(!u->time_event);
1560 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1561
1562 request_latency(u);
1563
1564 pa_log_debug("Stream created.");
1565
1566 #ifdef TUNNEL_SINK
1567 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1568 #endif
1569
1570 return;
1571
1572 parse_error:
1573 pa_log("Invalid reply. (Create stream)");
1574
1575 fail:
1576 pa_module_unload_request(u->module, true);
1577
1578 }
1579
1580 /* Called from main context */
setup_complete_callback(pa_pdispatch * pd,uint32_t command,uint32_t tag,pa_tagstruct * t,void * userdata)1581 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1582 struct userdata *u = userdata;
1583 pa_tagstruct *reply;
1584 char name[256], un[128], hn[128];
1585 pa_cvolume volume;
1586
1587 pa_assert(pd);
1588 pa_assert(u);
1589 pa_assert(u->pdispatch == pd);
1590
1591 if (command != PA_COMMAND_REPLY ||
1592 pa_tagstruct_getu32(t, &u->version) < 0 ||
1593 !pa_tagstruct_eof(t)) {
1594
1595 if (command == PA_COMMAND_ERROR)
1596 pa_log("Failed to authenticate");
1597 else
1598 pa_log("Protocol error.");
1599
1600 goto fail;
1601 }
1602
1603 /* Minimum supported protocol version */
1604 if (u->version < 8) {
1605 pa_log("Incompatible protocol version");
1606 goto fail;
1607 }
1608
1609 /* Starting with protocol version 13 the MSB of the version tag
1610 reflects if shm is enabled for this connection or not. We don't
1611 support SHM here at all, so we just ignore this. */
1612
1613 if (u->version >= 13)
1614 u->version &= 0x7FFFFFFFU;
1615
1616 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1617
1618 #ifdef TUNNEL_SINK
1619 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1620 pa_sink_update_proplist(u->sink, 0, NULL);
1621
1622 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1623 u->sink_name,
1624 pa_get_user_name(un, sizeof(un)),
1625 pa_get_host_name(hn, sizeof(hn)));
1626 #else
1627 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1628 pa_source_update_proplist(u->source, 0, NULL);
1629
1630 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1631 u->source_name,
1632 pa_get_user_name(un, sizeof(un)),
1633 pa_get_host_name(hn, sizeof(hn)));
1634 #endif
1635
1636 reply = pa_tagstruct_new();
1637 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1638 pa_tagstruct_putu32(reply, u->ctag++);
1639
1640 if (u->version >= 13) {
1641 pa_proplist *pl;
1642 pl = pa_proplist_new();
1643 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1644 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1645 pa_init_proplist(pl);
1646 pa_tagstruct_put_proplist(reply, pl);
1647 pa_proplist_free(pl);
1648 } else
1649 pa_tagstruct_puts(reply, "PulseAudio");
1650
1651 pa_pstream_send_tagstruct(u->pstream, reply);
1652 /* We ignore the server's reply here */
1653
1654 reply = pa_tagstruct_new();
1655
1656 if (u->version < 13)
1657 /* Only for older PA versions we need to fill in the maxlength */
1658 u->maxlength = 4*1024*1024;
1659
1660 #ifdef TUNNEL_SINK
1661 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1662 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1663 u->prebuf = u->tlength;
1664 #else
1665 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1666 #endif
1667
1668 #ifdef TUNNEL_SINK
1669 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1670 pa_tagstruct_putu32(reply, tag = u->ctag++);
1671
1672 if (u->version < 13)
1673 pa_tagstruct_puts(reply, name);
1674
1675 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1676 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1677 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1678 pa_tagstruct_puts(reply, u->sink_name);
1679 pa_tagstruct_putu32(reply, u->maxlength);
1680 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(u->sink->state));
1681 pa_tagstruct_putu32(reply, u->tlength);
1682 pa_tagstruct_putu32(reply, u->prebuf);
1683 pa_tagstruct_putu32(reply, u->minreq);
1684 pa_tagstruct_putu32(reply, 0);
1685 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1686 pa_tagstruct_put_cvolume(reply, &volume);
1687 #else
1688 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1689 pa_tagstruct_putu32(reply, tag = u->ctag++);
1690
1691 if (u->version < 13)
1692 pa_tagstruct_puts(reply, name);
1693
1694 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1695 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1696 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1697 pa_tagstruct_puts(reply, u->source_name);
1698 pa_tagstruct_putu32(reply, u->maxlength);
1699 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(u->source->state));
1700 pa_tagstruct_putu32(reply, u->fragsize);
1701 #endif
1702
1703 if (u->version >= 12) {
1704 pa_tagstruct_put_boolean(reply, false); /* no_remap */
1705 pa_tagstruct_put_boolean(reply, false); /* no_remix */
1706 pa_tagstruct_put_boolean(reply, false); /* fix_format */
1707 pa_tagstruct_put_boolean(reply, false); /* fix_rate */
1708 pa_tagstruct_put_boolean(reply, false); /* fix_channels */
1709 pa_tagstruct_put_boolean(reply, true); /* no_move */
1710 pa_tagstruct_put_boolean(reply, false); /* variable_rate */
1711 }
1712
1713 if (u->version >= 13) {
1714 pa_proplist *pl;
1715
1716 pa_tagstruct_put_boolean(reply, false); /* start muted/peak detect*/
1717 pa_tagstruct_put_boolean(reply, true); /* adjust_latency */
1718
1719 pl = pa_proplist_new();
1720 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1721 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1722 pa_tagstruct_put_proplist(reply, pl);
1723 pa_proplist_free(pl);
1724
1725 #ifndef TUNNEL_SINK
1726 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1727 #endif
1728 }
1729
1730 if (u->version >= 14) {
1731 #ifdef TUNNEL_SINK
1732 pa_tagstruct_put_boolean(reply, false); /* volume_set */
1733 #endif
1734 pa_tagstruct_put_boolean(reply, true); /* early rquests */
1735 }
1736
1737 if (u->version >= 15) {
1738 #ifdef TUNNEL_SINK
1739 pa_tagstruct_put_boolean(reply, false); /* muted_set */
1740 #endif
1741 pa_tagstruct_put_boolean(reply, false); /* don't inhibit auto suspend */
1742 pa_tagstruct_put_boolean(reply, false); /* fail on suspend */
1743 }
1744
1745 #ifdef TUNNEL_SINK
1746 if (u->version >= 17)
1747 pa_tagstruct_put_boolean(reply, false); /* relative volume */
1748
1749 if (u->version >= 18)
1750 pa_tagstruct_put_boolean(reply, false); /* passthrough stream */
1751 #endif
1752
1753 #ifdef TUNNEL_SINK
1754 if (u->version >= 21) {
1755 /* We're not using the extended API, so n_formats = 0 and that's that */
1756 pa_tagstruct_putu8(reply, 0);
1757 }
1758 #else
1759 if (u->version >= 22) {
1760 /* We're not using the extended API, so n_formats = 0 and that's that */
1761 pa_tagstruct_putu8(reply, 0);
1762 pa_cvolume_reset(&volume, u->source->sample_spec.channels);
1763 pa_tagstruct_put_cvolume(reply, &volume);
1764 pa_tagstruct_put_boolean(reply, false); /* muted */
1765 pa_tagstruct_put_boolean(reply, false); /* volume_set */
1766 pa_tagstruct_put_boolean(reply, false); /* muted_set */
1767 pa_tagstruct_put_boolean(reply, false); /* relative volume */
1768 pa_tagstruct_put_boolean(reply, false); /* passthrough stream */
1769 }
1770 #endif
1771
1772 pa_pstream_send_tagstruct(u->pstream, reply);
1773 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1774
1775 pa_log_debug("Connection authenticated, creating stream ...");
1776
1777 return;
1778
1779 fail:
1780 pa_module_unload_request(u->module, true);
1781 }
1782
1783 /* Called from main context */
pstream_die_callback(pa_pstream * p,void * userdata)1784 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1785 struct userdata *u = userdata;
1786
1787 pa_assert(p);
1788 pa_assert(u);
1789
1790 pa_log_warn("Stream died.");
1791 pa_module_unload_request(u->module, true);
1792 }
1793
1794 /* Called from main context */
pstream_packet_callback(pa_pstream * p,pa_packet * packet,pa_cmsg_ancil_data * ancil_data,void * userdata)1795 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data, void *userdata) {
1796 struct userdata *u = userdata;
1797
1798 pa_assert(p);
1799 pa_assert(packet);
1800 pa_assert(u);
1801
1802 if (pa_pdispatch_run(u->pdispatch, packet, ancil_data, u) < 0) {
1803 pa_log("Invalid packet");
1804 pa_module_unload_request(u->module, true);
1805 return;
1806 }
1807 }
1808
1809 #ifndef TUNNEL_SINK
1810 /* Called from main context */
pstream_memblock_callback(pa_pstream * p,uint32_t channel,int64_t offset,pa_seek_mode_t seek,const pa_memchunk * chunk,void * userdata)1811 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1812 struct userdata *u = userdata;
1813
1814 pa_assert(p);
1815 pa_assert(chunk);
1816 pa_assert(u);
1817
1818 if (channel != u->channel) {
1819 pa_log("Received memory block on bad channel.");
1820 pa_module_unload_request(u->module, true);
1821 return;
1822 }
1823
1824 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1825
1826 u->counter_delta += (int64_t) chunk->length;
1827 }
1828 #endif
1829
1830 /* Called from main context */
on_connection(pa_socket_client * sc,pa_iochannel * io,void * userdata)1831 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1832 struct userdata *u = userdata;
1833 pa_tagstruct *t;
1834 uint32_t tag;
1835
1836 pa_assert(sc);
1837 pa_assert(u);
1838 pa_assert(u->client == sc);
1839
1840 pa_socket_client_unref(u->client);
1841 u->client = NULL;
1842
1843 if (!io) {
1844 pa_log("Connection failed: %s", pa_cstrerror(errno));
1845 pa_module_unload_request(u->module, true);
1846 return;
1847 }
1848
1849 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1850 u->pdispatch = pa_pdispatch_new(u->core->mainloop, true, command_table, PA_COMMAND_MAX);
1851
1852 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1853 pa_pstream_set_receive_packet_callback(u->pstream, pstream_packet_callback, u);
1854 #ifndef TUNNEL_SINK
1855 pa_pstream_set_receive_memblock_callback(u->pstream, pstream_memblock_callback, u);
1856 #endif
1857
1858 t = pa_tagstruct_new();
1859 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1860 pa_tagstruct_putu32(t, tag = u->ctag++);
1861 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1862
1863 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1864
1865 #ifdef HAVE_CREDS
1866 {
1867 pa_creds ucred;
1868
1869 if (pa_iochannel_creds_supported(io))
1870 pa_iochannel_creds_enable(io);
1871
1872 ucred.uid = getuid();
1873 ucred.gid = getgid();
1874
1875 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1876 }
1877 #else
1878 pa_pstream_send_tagstruct(u->pstream, t);
1879 #endif
1880
1881 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1882
1883 pa_log_debug("Connection established, authenticating ...");
1884 }
1885
1886 #ifdef TUNNEL_SINK
1887
1888 /* Called from main context */
sink_set_volume(pa_sink * sink)1889 static void sink_set_volume(pa_sink *sink) {
1890 struct userdata *u;
1891 pa_tagstruct *t;
1892
1893 pa_assert(sink);
1894 u = sink->userdata;
1895 pa_assert(u);
1896
1897 t = pa_tagstruct_new();
1898 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1899 pa_tagstruct_putu32(t, u->ctag++);
1900 pa_tagstruct_putu32(t, u->device_index);
1901 pa_tagstruct_put_cvolume(t, &sink->real_volume);
1902 pa_pstream_send_tagstruct(u->pstream, t);
1903 }
1904
1905 /* Called from main context */
sink_set_mute(pa_sink * sink)1906 static void sink_set_mute(pa_sink *sink) {
1907 struct userdata *u;
1908 pa_tagstruct *t;
1909
1910 pa_assert(sink);
1911 u = sink->userdata;
1912 pa_assert(u);
1913
1914 if (u->version < 11)
1915 return;
1916
1917 t = pa_tagstruct_new();
1918 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1919 pa_tagstruct_putu32(t, u->ctag++);
1920 pa_tagstruct_putu32(t, u->device_index);
1921 pa_tagstruct_put_boolean(t, sink->muted);
1922 pa_pstream_send_tagstruct(u->pstream, t);
1923 }
1924
1925 #endif
1926
pa__init(pa_module * m)1927 int pa__init(pa_module*m) {
1928 pa_modargs *ma = NULL;
1929 struct userdata *u = NULL;
1930 char *server = NULL;
1931 pa_strlist *server_list = NULL;
1932 pa_sample_spec ss;
1933 pa_channel_map map;
1934 char *dn = NULL;
1935 #ifdef TUNNEL_SINK
1936 pa_sink_new_data data;
1937 #else
1938 pa_source_new_data data;
1939 #endif
1940 bool automatic;
1941 #ifdef HAVE_X11
1942 xcb_connection_t *xcb = NULL;
1943 #endif
1944 const char *cookie_path;
1945
1946 pa_assert(m);
1947
1948 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1949 pa_log("Failed to parse module arguments");
1950 goto fail;
1951 }
1952
1953 m->userdata = u = pa_xnew0(struct userdata, 1);
1954 u->core = m->core;
1955 u->module = m;
1956 u->client = NULL;
1957 u->pdispatch = NULL;
1958 u->pstream = NULL;
1959 u->server_name = NULL;
1960 #ifdef TUNNEL_SINK
1961 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1962 u->sink = NULL;
1963 u->requested_bytes = 0;
1964 #else
1965 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1966 u->source = NULL;
1967 #endif
1968 u->smoother = pa_smoother_new(
1969 PA_USEC_PER_SEC,
1970 PA_USEC_PER_SEC*2,
1971 true,
1972 true,
1973 10,
1974 pa_rtclock_now(),
1975 false);
1976 u->ctag = 1;
1977 u->device_index = u->channel = PA_INVALID_INDEX;
1978 u->time_event = NULL;
1979 u->ignore_latency_before = 0;
1980 u->transport_usec = u->thread_transport_usec = 0;
1981 u->remote_suspended = u->remote_corked = false;
1982 u->counter = u->counter_delta = 0;
1983
1984 u->rtpoll = pa_rtpoll_new();
1985
1986 if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
1987 pa_log("pa_thread_mq_init() failed.");
1988 goto fail;
1989 }
1990
1991 if (pa_modargs_get_value_boolean(ma, "auto", &automatic) < 0) {
1992 pa_log("Failed to parse argument \"auto\".");
1993 goto fail;
1994 }
1995
1996 cookie_path = pa_modargs_get_value(ma, "cookie", NULL);
1997 server = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL));
1998
1999 if (automatic) {
2000 #ifdef HAVE_X11
2001 /* Need an X11 connection to get root properties */
2002 if (getenv("DISPLAY") != NULL) {
2003 if (!(xcb = xcb_connect(getenv("DISPLAY"), NULL)))
2004 pa_log("xcb_connect() failed");
2005 else {
2006 if (xcb_connection_has_error(xcb)) {
2007 pa_log("xcb_connection_has_error() returned true");
2008 xcb_disconnect(xcb);
2009 xcb = NULL;
2010 }
2011 }
2012 }
2013 #endif
2014
2015 /* Figure out the cookie the same way a normal client would */
2016 if (!cookie_path)
2017 cookie_path = getenv(ENV_COOKIE_FILE);
2018
2019 #ifdef HAVE_X11
2020 if (!cookie_path && xcb) {
2021 char t[1024];
2022 if (pa_x11_get_prop(xcb, 0, "PULSE_COOKIE", t, sizeof(t))) {
2023 uint8_t cookie[PA_NATIVE_COOKIE_LENGTH];
2024
2025 if (pa_parsehex(t, cookie, sizeof(cookie)) != sizeof(cookie))
2026 pa_log("Failed to parse cookie data");
2027 else {
2028 if (!(u->auth_cookie = pa_auth_cookie_create(u->core, cookie, sizeof(cookie))))
2029 goto fail;
2030 }
2031 }
2032 }
2033 #endif
2034
2035 /* Same thing for the server name */
2036 if (!server)
2037 server = pa_xstrdup(getenv(ENV_DEFAULT_SERVER));
2038
2039 #ifdef HAVE_X11
2040 if (!server && xcb) {
2041 char t[1024];
2042 if (pa_x11_get_prop(xcb, 0, "PULSE_SERVER", t, sizeof(t)))
2043 server = pa_xstrdup(t);
2044 }
2045 #endif
2046
2047 /* Also determine the default sink/source on the other server */
2048 #ifdef TUNNEL_SINK
2049 if (!u->sink_name)
2050 u->sink_name = pa_xstrdup(getenv(ENV_DEFAULT_SINK));
2051
2052 #ifdef HAVE_X11
2053 if (!u->sink_name && xcb) {
2054 char t[1024];
2055 if (pa_x11_get_prop(xcb, 0, "PULSE_SINK", t, sizeof(t)))
2056 u->sink_name = pa_xstrdup(t);
2057 }
2058 #endif
2059 #else
2060 if (!u->source_name)
2061 u->source_name = pa_xstrdup(getenv(ENV_DEFAULT_SOURCE));
2062
2063 #ifdef HAVE_X11
2064 if (!u->source_name && xcb) {
2065 char t[1024];
2066 if (pa_x11_get_prop(xcb, 0, "PULSE_SOURCE", t, sizeof(t)))
2067 u->source_name = pa_xstrdup(t);
2068 }
2069 #endif
2070 #endif
2071 }
2072
2073 if (!cookie_path && !u->auth_cookie)
2074 cookie_path = PA_NATIVE_COOKIE_FILE;
2075
2076 if (cookie_path) {
2077 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, cookie_path, true, PA_NATIVE_COOKIE_LENGTH)))
2078 goto fail;
2079 }
2080
2081 if (server) {
2082 if (!(server_list = pa_strlist_parse(server))) {
2083 pa_log("Invalid server specified.");
2084 goto fail;
2085 }
2086 } else {
2087 char *ufn;
2088
2089 if (!automatic) {
2090 pa_log("No server specified.");
2091 goto fail;
2092 }
2093
2094 pa_log("No server address found. Attempting default local sockets.");
2095
2096 /* The system wide instance via PF_LOCAL */
2097 server_list = pa_strlist_prepend(server_list, PA_SYSTEM_RUNTIME_PATH PA_PATH_SEP PA_NATIVE_DEFAULT_UNIX_SOCKET);
2098
2099 /* The user instance via PF_LOCAL */
2100 if ((ufn = pa_runtime_path(PA_NATIVE_DEFAULT_UNIX_SOCKET))) {
2101 server_list = pa_strlist_prepend(server_list, ufn);
2102 pa_xfree(ufn);
2103 }
2104 }
2105
2106 ss = m->core->default_sample_spec;
2107 map = m->core->default_channel_map;
2108 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
2109 pa_log("Invalid sample format specification");
2110 goto fail;
2111 }
2112
2113 for (;;) {
2114 server_list = pa_strlist_pop(server_list, &u->server_name);
2115
2116 if (!u->server_name) {
2117 pa_log("Failed to connect to server '%s'", server);
2118 goto fail;
2119 }
2120
2121 pa_log_debug("Trying to connect to %s...", u->server_name);
2122
2123 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, true, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
2124 pa_xfree(u->server_name);
2125 u->server_name = NULL;
2126 continue;
2127 }
2128
2129 break;
2130 }
2131
2132 pa_socket_client_set_callback(u->client, on_connection, u);
2133
2134 #ifdef TUNNEL_SINK
2135
2136 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
2137 dn = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);
2138
2139 pa_sink_new_data_init(&data);
2140 data.driver = __FILE__;
2141 data.module = m;
2142 data.namereg_fail = false;
2143 pa_sink_new_data_set_name(&data, dn);
2144 pa_sink_new_data_set_sample_spec(&data, &ss);
2145 pa_sink_new_data_set_channel_map(&data, &map);
2146 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
2147 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2148 if (u->sink_name)
2149 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
2150
2151 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
2152 pa_log("Invalid properties");
2153 pa_sink_new_data_done(&data);
2154 goto fail;
2155 }
2156
2157 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY);
2158 pa_sink_new_data_done(&data);
2159
2160 if (!u->sink) {
2161 pa_log("Failed to create sink.");
2162 goto fail;
2163 }
2164
2165 u->sink->parent.process_msg = sink_process_msg;
2166 u->sink->userdata = u;
2167 u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
2168 pa_sink_set_set_volume_callback(u->sink, sink_set_volume);
2169 pa_sink_set_set_mute_callback(u->sink, sink_set_mute);
2170
2171 u->sink->refresh_volume = u->sink->refresh_muted = false;
2172
2173 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2174
2175 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
2176 pa_sink_set_rtpoll(u->sink, u->rtpoll);
2177
2178 #else
2179
2180 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
2181 dn = pa_sprintf_malloc("tunnel-source.%s", u->server_name);
2182
2183 pa_source_new_data_init(&data);
2184 data.driver = __FILE__;
2185 data.module = m;
2186 data.namereg_fail = false;
2187 pa_source_new_data_set_name(&data, dn);
2188 pa_source_new_data_set_sample_spec(&data, &ss);
2189 pa_source_new_data_set_channel_map(&data, &map);
2190 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
2191 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2192 if (u->source_name)
2193 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
2194
2195 if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
2196 pa_log("Invalid properties");
2197 pa_source_new_data_done(&data);
2198 goto fail;
2199 }
2200
2201 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
2202 pa_source_new_data_done(&data);
2203
2204 if (!u->source) {
2205 pa_log("Failed to create source.");
2206 goto fail;
2207 }
2208
2209 u->source->parent.process_msg = source_process_msg;
2210 u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb;
2211 u->source->userdata = u;
2212
2213 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2214
2215 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
2216 pa_source_set_rtpoll(u->source, u->rtpoll);
2217
2218 u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
2219 #endif
2220
2221 u->time_event = NULL;
2222
2223 u->maxlength = (uint32_t) -1;
2224 #ifdef TUNNEL_SINK
2225 u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
2226 #else
2227 u->fragsize = (uint32_t) -1;
2228 #endif
2229
2230 if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) {
2231 pa_log("Failed to create thread.");
2232 goto fail;
2233 }
2234
2235 #ifdef TUNNEL_SINK
2236 pa_sink_put(u->sink);
2237 #else
2238 pa_source_put(u->source);
2239 #endif
2240
2241 pa_xfree(dn);
2242
2243 if (server)
2244 pa_xfree(server);
2245
2246 if (server_list)
2247 pa_strlist_free(server_list);
2248
2249 #ifdef HAVE_X11
2250 if (xcb)
2251 xcb_disconnect(xcb);
2252 #endif
2253
2254 pa_modargs_free(ma);
2255
2256 return 0;
2257
2258 fail:
2259 pa__done(m);
2260
2261 if (server)
2262 pa_xfree(server);
2263
2264 if (server_list)
2265 pa_strlist_free(server_list);
2266
2267 #ifdef HAVE_X11
2268 if (xcb)
2269 xcb_disconnect(xcb);
2270 #endif
2271
2272 if (ma)
2273 pa_modargs_free(ma);
2274
2275 pa_xfree(dn);
2276
2277 return -1;
2278 }
2279
pa__done(pa_module * m)2280 void pa__done(pa_module*m) {
2281 struct userdata* u;
2282
2283 pa_assert(m);
2284
2285 if (!(u = m->userdata))
2286 return;
2287
2288 #ifdef TUNNEL_SINK
2289 if (u->sink)
2290 pa_sink_unlink(u->sink);
2291 #else
2292 if (u->source)
2293 pa_source_unlink(u->source);
2294 #endif
2295
2296 if (u->thread) {
2297 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2298 pa_thread_free(u->thread);
2299 }
2300
2301 pa_thread_mq_done(&u->thread_mq);
2302
2303 #ifdef TUNNEL_SINK
2304 if (u->sink)
2305 pa_sink_unref(u->sink);
2306 #else
2307 if (u->source)
2308 pa_source_unref(u->source);
2309 #endif
2310
2311 if (u->rtpoll)
2312 pa_rtpoll_free(u->rtpoll);
2313
2314 if (u->pstream) {
2315 pa_pstream_unlink(u->pstream);
2316 pa_pstream_unref(u->pstream);
2317 }
2318
2319 if (u->pdispatch)
2320 pa_pdispatch_unref(u->pdispatch);
2321
2322 if (u->client)
2323 pa_socket_client_unref(u->client);
2324
2325 if (u->auth_cookie)
2326 pa_auth_cookie_unref(u->auth_cookie);
2327
2328 if (u->smoother)
2329 pa_smoother_free(u->smoother);
2330
2331 if (u->time_event)
2332 u->core->mainloop->time_free(u->time_event);
2333
2334 #ifndef TUNNEL_SINK
2335 if (u->mcalign)
2336 pa_mcalign_free(u->mcalign);
2337 #endif
2338
2339 #ifdef TUNNEL_SINK
2340 pa_xfree(u->sink_name);
2341 #else
2342 pa_xfree(u->source_name);
2343 #endif
2344 pa_xfree(u->server_name);
2345
2346 pa_xfree(u->device_description);
2347 pa_xfree(u->server_fqdn);
2348 pa_xfree(u->user_name);
2349
2350 pa_xfree(u);
2351 }
2352