1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2013 Alexander Couzens
5
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2.1 of the License,
9 or (at your option) any later version.
10
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23
24 #include "restart-module.h"
25
26 #include <pulse/context.h>
27 #include <pulse/timeval.h>
28 #include <pulse/xmalloc.h>
29 #include <pulse/stream.h>
30 #include <pulse/mainloop.h>
31 #include <pulse/introspect.h>
32 #include <pulse/error.h>
33
34 #include <pulsecore/core.h>
35 #include <pulsecore/core-util.h>
36 #include <pulsecore/i18n.h>
37 #include <pulsecore/sink.h>
38 #include <pulsecore/modargs.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/thread.h>
41 #include <pulsecore/thread-mq.h>
42 #include <pulsecore/poll.h>
43 #include <pulsecore/rtpoll.h>
44 #include <pulsecore/proplist-util.h>
45
46 PA_MODULE_AUTHOR("Alexander Couzens");
47 PA_MODULE_DESCRIPTION("Create a network sink which connects via a stream to a remote PulseAudio server");
48 PA_MODULE_VERSION(PACKAGE_VERSION);
49 PA_MODULE_LOAD_ONCE(false);
50 PA_MODULE_USAGE(
51 "server=<address> "
52 "sink=<name of the remote sink> "
53 "sink_name=<name for the local sink> "
54 "sink_properties=<properties for the local sink> "
55 "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
56 "format=<sample format> "
57 "channels=<number of channels> "
58 "rate=<sample rate> "
59 "channel_map=<channel map> "
60 "cookie=<cookie file path>"
61 );
62
63 #define MAX_LATENCY_USEC (200 * PA_USEC_PER_MSEC)
64 #define TUNNEL_THREAD_FAILED_MAINLOOP 1
65
66 static int do_init(pa_module *m);
67 static void do_done(pa_module *m);
68 static void stream_state_cb(pa_stream *stream, void *userdata);
69 static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata);
70 static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata);
71 static void context_state_cb(pa_context *c, void *userdata);
72 static void sink_update_requested_latency_cb(pa_sink *s);
73
74 struct tunnel_msg {
75 pa_msgobject parent;
76 };
77
78 typedef struct tunnel_msg tunnel_msg;
79 PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);
80
81 enum {
82 TUNNEL_MESSAGE_CREATE_SINK_REQUEST,
83 TUNNEL_MESSAGE_MAYBE_RESTART,
84 };
85
86 enum {
87 TUNNEL_MESSAGE_SINK_CREATED = PA_SINK_MESSAGE_MAX,
88 };
89
90 struct userdata {
91 pa_module *module;
92 pa_sink *sink;
93 pa_thread *thread;
94 pa_thread_mq *thread_mq;
95 pa_mainloop *thread_mainloop;
96 pa_mainloop_api *thread_mainloop_api;
97
98 pa_context *context;
99 pa_stream *stream;
100 pa_rtpoll *rtpoll;
101
102 bool update_stream_bufferattr_after_connect;
103
104 bool connected;
105 bool shutting_down;
106
107 char *cookie_file;
108 char *remote_server;
109 char *remote_sink_name;
110 char *sink_name;
111
112 pa_proplist *sink_proplist;
113 pa_sample_spec sample_spec;
114 pa_channel_map channel_map;
115
116 tunnel_msg *msg;
117
118 pa_usec_t reconnect_interval_us;
119 };
120
121 struct module_restart_data {
122 struct userdata *userdata;
123 pa_restart_data *restart_data;
124 };
125
/* Argument keys accepted by pa_modargs_new() in do_init(); listed in the
 * same order as in PA_MODULE_USAGE. The list is NULL-terminated. */
static const char* const valid_modargs[] = {
    "server",
    "sink",
    "sink_name",
    "sink_properties",
    "reconnect_interval_ms",
    "format",
    "channels",
    "rate",
    "channel_map",
    "cookie",
    NULL,
};
139
cork_stream(struct userdata * u,bool cork)140 static void cork_stream(struct userdata *u, bool cork) {
141 pa_operation *operation;
142
143 pa_assert(u);
144 pa_assert(u->stream);
145
146 if (cork) {
147 /* When the sink becomes suspended (which is the only case where we
148 * cork the stream), we don't want to keep any old data around, because
149 * the old data is most likely unrelated to the audio that will be
150 * played at the time when the sink starts running again. */
151 if ((operation = pa_stream_flush(u->stream, NULL, NULL)))
152 pa_operation_unref(operation);
153 }
154
155 if ((operation = pa_stream_cork(u->stream, cork, NULL, NULL)))
156 pa_operation_unref(operation);
157 }
158
/* Set every field of *bufferattr to (uint32_t) -1, the value libpulse
 * documents as "let the server choose". Callers override the fields they
 * care about afterwards. */
static void reset_bufferattr(pa_buffer_attr *bufferattr) {
    const uint32_t unset = (uint32_t) -1;

    pa_assert(bufferattr);

    bufferattr->maxlength = unset;
    bufferattr->tlength = unset;
    bufferattr->prebuf = unset;
    bufferattr->minreq = unset;
    bufferattr->fragsize = unset;
}
167
tunnel_new_proplist(struct userdata * u)168 static pa_proplist* tunnel_new_proplist(struct userdata *u) {
169 pa_proplist *proplist = pa_proplist_new();
170 pa_assert(proplist);
171 pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio");
172 pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
173 pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
174 pa_init_proplist(proplist);
175
176 return proplist;
177 }
178
thread_func(void * userdata)179 static void thread_func(void *userdata) {
180 struct userdata *u = userdata;
181 pa_proplist *proplist;
182
183 pa_assert(u);
184
185 pa_log_debug("Thread starting up");
186 pa_thread_mq_install(u->thread_mq);
187
188 proplist = tunnel_new_proplist(u);
189 u->context = pa_context_new_with_proplist(u->thread_mainloop_api,
190 "PulseAudio",
191 proplist);
192 pa_proplist_free(proplist);
193
194 if (!u->context) {
195 pa_log("Failed to create libpulse context");
196 goto fail;
197 }
198
199 if (u->cookie_file && pa_context_load_cookie_from_file(u->context, u->cookie_file) != 0) {
200 pa_log_error("Can not load cookie file!");
201 goto fail;
202 }
203
204 pa_context_set_state_callback(u->context, context_state_cb, u);
205 if (pa_context_connect(u->context,
206 u->remote_server,
207 PA_CONTEXT_NOAUTOSPAWN,
208 NULL) < 0) {
209 pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context)));
210 goto fail;
211 }
212
213 for (;;) {
214 int ret;
215
216 if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) {
217 if (ret == 0)
218 goto finish;
219 else
220 goto fail;
221 }
222
223 if (u->sink && PA_UNLIKELY(u->sink->thread_info.rewind_requested))
224 pa_sink_process_rewind(u->sink, 0);
225
226 if (u->connected &&
227 pa_stream_get_state(u->stream) == PA_STREAM_READY &&
228 PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
229 size_t writable;
230
231 writable = pa_stream_writable_size(u->stream);
232 if (writable > 0) {
233 pa_memchunk memchunk;
234 const void *p;
235
236 pa_sink_render_full(u->sink, writable, &memchunk);
237
238 pa_assert(memchunk.length > 0);
239
240 /* we have new data to write */
241 p = pa_memblock_acquire(memchunk.memblock);
242 /* TODO: Use pa_stream_begin_write() to reduce copying. */
243 ret = pa_stream_write(u->stream,
244 (uint8_t*) p + memchunk.index,
245 memchunk.length,
246 NULL, /**< A cleanup routine for the data or NULL to request an internal copy */
247 0, /** offset */
248 PA_SEEK_RELATIVE);
249 pa_memblock_release(memchunk.memblock);
250 pa_memblock_unref(memchunk.memblock);
251
252 if (ret != 0) {
253 pa_log_error("Could not write data into the stream ... ret = %i", ret);
254 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
255 }
256
257 }
258 }
259 }
260 fail:
261 /* send a message to the ctl thread to ask it to either terminate us, or
262 * restart us, but either way this thread will exit, so then wait for the
263 * shutdown message */
264 pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_MAYBE_RESTART, u, 0, NULL, NULL);
265 pa_asyncmsgq_wait_for(u->thread_mq->inq, PA_MESSAGE_SHUTDOWN);
266
267 finish:
268 if (u->stream) {
269 pa_stream_disconnect(u->stream);
270 pa_stream_unref(u->stream);
271 u->stream = NULL;
272 }
273
274 if (u->context) {
275 pa_context_disconnect(u->context);
276 pa_context_unref(u->context);
277 u->context = NULL;
278 }
279
280 pa_log_debug("Thread shutting down");
281 }
282
/* State callback of the remote stream.
 *
 * A failed stream stops the IO thread's mainloop with the failure code. A
 * stream that became ready is uncorked if the sink is opened, and the sink's
 * maximum request size is brought in line with the stream's buffer
 * attributes. */
static void stream_state_cb(pa_stream *stream, void *userdata) {
    struct userdata *u = userdata;

    pa_assert(u);

    switch (pa_stream_get_state(stream)) {
        case PA_STREAM_UNCONNECTED:
        case PA_STREAM_CREATING:
            break;

        case PA_STREAM_READY:
            if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
                cork_stream(u, false);

            /* Only push our own latency request if it changed while the
             * stream was being created; otherwise keep the initial tlength
             * chosen by the server and merely adopt it. */
            if (u->update_stream_bufferattr_after_connect)
                sink_update_requested_latency_cb(u->sink);
            else
                stream_changed_buffer_attr_cb(stream, userdata);
            break;

        case PA_STREAM_FAILED:
            pa_log_error("Stream failed.");
            u->connected = false;
            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
            break;

        case PA_STREAM_TERMINATED:
            pa_log_debug("Stream terminated.");
            break;
    }
}
314
315 /* called when remote server changes the stream buffer_attr */
stream_changed_buffer_attr_cb(pa_stream * stream,void * userdata)316 static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata) {
317 struct userdata *u = userdata;
318 const pa_buffer_attr *bufferattr;
319 pa_assert(u);
320
321 bufferattr = pa_stream_get_buffer_attr(u->stream);
322 pa_sink_set_max_request_within_thread(u->sink, bufferattr->tlength);
323
324 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu.",
325 (unsigned long) bufferattr->tlength);
326 }
327
328 /* called after we requested a change of the stream buffer_attr */
stream_set_buffer_attr_cb(pa_stream * stream,int success,void * userdata)329 static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata) {
330 stream_changed_buffer_attr_cb(stream, userdata);
331 }
332
333 /* called when the server experiences an underrun of our buffer */
stream_underflow_callback(pa_stream * stream,void * userdata)334 static void stream_underflow_callback(pa_stream *stream, void *userdata) {
335 pa_log_info("Server signalled buffer underrun.");
336 }
337
338 /* called when the server experiences an overrun of our buffer */
stream_overflow_callback(pa_stream * stream,void * userdata)339 static void stream_overflow_callback(pa_stream *stream, void *userdata) {
340 pa_log_info("Server signalled buffer overrun.");
341 }
342
343 /* Do a reinit of the module. Note that u will be freed as a result of this
344 * call. */
maybe_restart(struct module_restart_data * rd)345 static void maybe_restart(struct module_restart_data *rd) {
346 struct userdata *u = rd->userdata;
347
348 if (rd->restart_data) {
349 pa_log_debug("Restart already pending");
350 return;
351 }
352
353 if (u->reconnect_interval_us > 0) {
354 /* The handle returned here must be freed when do_init() finishes successfully
355 * and when the module exits. */
356 rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us);
357 } else {
358 /* exit the module */
359 pa_module_unload_request(u->module, true);
360 }
361 }
362
on_sink_created(struct userdata * u)363 static void on_sink_created(struct userdata *u) {
364 pa_proplist *proplist;
365 pa_buffer_attr bufferattr;
366 pa_usec_t requested_latency;
367 char *username = pa_get_user_name_malloc();
368 char *hostname = pa_get_host_name_malloc();
369 /* TODO: old tunnel put here the remote sink_name into stream name e.g. 'Null Output for lynxis@lazus' */
370 char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
371 pa_xfree(hostname);
372 pa_xfree(username);
373
374 pa_assert_io_context();
375
376 /* if we still don't have a sink, then sink creation failed, and we should
377 * kill this io thread */
378 if (!u->sink) {
379 pa_log_error("Could not create a sink.");
380 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
381 return;
382 }
383
384 proplist = tunnel_new_proplist(u);
385 u->stream = pa_stream_new_with_proplist(u->context,
386 stream_name,
387 &u->sink->sample_spec,
388 &u->sink->channel_map,
389 proplist);
390 pa_proplist_free(proplist);
391 pa_xfree(stream_name);
392
393 if (!u->stream) {
394 pa_log_error("Could not create a stream.");
395 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
396 return;
397 }
398
399 requested_latency = pa_sink_get_requested_latency_within_thread(u->sink);
400 if (requested_latency == (pa_usec_t) -1)
401 requested_latency = u->sink->thread_info.max_latency;
402
403 reset_bufferattr(&bufferattr);
404 bufferattr.tlength = pa_usec_to_bytes(requested_latency, &u->sink->sample_spec);
405
406 pa_log_debug("tlength requested at %lu.", (unsigned long) bufferattr.tlength);
407
408 pa_stream_set_state_callback(u->stream, stream_state_cb, u);
409 pa_stream_set_buffer_attr_callback(u->stream, stream_changed_buffer_attr_cb, u);
410 pa_stream_set_underflow_callback(u->stream, stream_underflow_callback, u);
411 pa_stream_set_overflow_callback(u->stream, stream_overflow_callback, u);
412 if (pa_stream_connect_playback(u->stream,
413 u->remote_sink_name,
414 &bufferattr,
415 PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_DONT_MOVE | PA_STREAM_START_CORKED | PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_ADJUST_LATENCY,
416 NULL,
417 NULL) < 0) {
418 pa_log_error("Could not connect stream.");
419 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
420 }
421 u->connected = true;
422 }
423
/* State callback of the libpulse context.
 *
 * Once the context is ready the control thread is asked to create the local
 * sink. A failed or terminated context stops the IO thread's mainloop with
 * the failure code. */
static void context_state_cb(pa_context *c, void *userdata) {
    struct userdata *u = userdata;

    pa_assert(u);

    switch (pa_context_get_state(c)) {
        case PA_CONTEXT_READY:
            /* We are connected now. Ask the control thread to create a sink
             * for us; the answer arrives as TUNNEL_MESSAGE_SINK_CREATED
             * (see sink_process_msg_cb()) and only then the stream is set
             * up. */
            pa_log_debug("Connection successful. Creating stream.");
            pa_assert(!u->stream);
            pa_assert(!u->sink);

            pa_log_debug("Asking ctl thread to create sink.");
            pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SINK_REQUEST, u, 0, NULL, NULL);
            break;

        case PA_CONTEXT_FAILED:
            pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context)));
            u->connected = false;
            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
            break;

        case PA_CONTEXT_TERMINATED:
            pa_log_debug("Context terminated.");
            u->connected = false;
            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
            break;

        case PA_CONTEXT_UNCONNECTED:
        case PA_CONTEXT_CONNECTING:
        case PA_CONTEXT_AUTHORIZING:
        case PA_CONTEXT_SETTING_NAME:
            /* Intermediate states, nothing to do. */
            break;
    }
}
458
/* update_requested_latency callback of the sink.
 *
 * Converts the requested latency (or the sink's maximum latency if none was
 * requested) into bytes, uses that as the sink's maximum request size and,
 * if the stream is ready and its tlength differs, asks the server for a
 * matching tlength. While the stream is still being created the request is
 * only remembered and replayed from stream_state_cb(). */
static void sink_update_requested_latency_cb(pa_sink *s) {
    struct userdata *u;
    pa_usec_t latency_usec;
    size_t wanted_bytes;
    pa_stream_state_t stream_state;
    pa_buffer_attr attr;
    pa_operation *op;

    pa_sink_assert_ref(s);
    pa_assert_se(u = s->userdata);

    latency_usec = pa_sink_get_requested_latency_within_thread(s);
    if (latency_usec == (pa_usec_t) -1)
        latency_usec = s->thread_info.max_latency;

    wanted_bytes = pa_usec_to_bytes(latency_usec, &s->sample_spec);
    pa_sink_set_max_request_within_thread(s, wanted_bytes);

    if (!u->stream)
        return;

    stream_state = pa_stream_get_state(u->stream);

    if (stream_state == PA_STREAM_CREATING) {
        /* we have to delay our request until stream is ready */
        u->update_stream_bufferattr_after_connect = true;
        return;
    }

    if (stream_state != PA_STREAM_READY)
        return;

    /* Nothing to do if the stream already uses the wanted length. */
    if (pa_stream_get_buffer_attr(u->stream)->tlength == wanted_bytes)
        return;

    pa_log_debug("Requesting new buffer attrs. tlength requested at %lu.",
                 (unsigned long) wanted_bytes);

    reset_bufferattr(&attr);
    attr.tlength = wanted_bytes;

    op = pa_stream_set_buffer_attr(u->stream, &attr, stream_set_buffer_attr_cb, u);
    if (op)
        pa_operation_unref(op);
}
499
/* process_msg handler of the sink.
 *
 * PA_SINK_MESSAGE_GET_LATENCY stores the latency reported by the remote
 * stream in the int64_t that data points to, or 0 if the sink is not linked,
 * the stream is missing or not ready, or the latency cannot be queried.
 * TUNNEL_MESSAGE_SINK_CREATED continues the connection setup in
 * on_sink_created(). Every other message goes to pa_sink_process_msg(). */
static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SINK(o)->userdata;

    switch (code) {
        case PA_SINK_MESSAGE_GET_LATENCY: {
            int negative;
            pa_usec_t remote_latency;
            int64_t latency = 0;

            /* The conditions are evaluated left to right, so the stream is
             * only queried when all preconditions hold. The negative flag
             * is not evaluated. */
            if (PA_SINK_IS_LINKED(u->sink->thread_info.state) &&
                    u->stream &&
                    pa_stream_get_state(u->stream) == PA_STREAM_READY &&
                    pa_stream_get_latency(u->stream, &remote_latency, &negative) >= 0)
                latency = remote_latency;

            *((int64_t*) data) = latency;
            return 0;
        }

        case TUNNEL_MESSAGE_SINK_CREATED:
            on_sink_created(u);
            return 0;
    }

    return pa_sink_process_msg(o, code, data, offset, chunk);
}
537
538 /* Called from the IO thread. */
sink_set_state_in_io_thread_cb(pa_sink * s,pa_sink_state_t new_state,pa_suspend_cause_t new_suspend_cause)539 static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
540 struct userdata *u;
541
542 pa_assert(s);
543 pa_assert_se(u = s->userdata);
544
545 /* It may be that only the suspend cause is changing, in which case there's
546 * nothing to do. */
547 if (new_state == s->thread_info.state)
548 return 0;
549
550 if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
551 return 0;
552
553 switch (new_state) {
554 case PA_SINK_SUSPENDED: {
555 cork_stream(u, true);
556 break;
557 }
558 case PA_SINK_IDLE:
559 case PA_SINK_RUNNING: {
560 cork_stream(u, false);
561 break;
562 }
563 case PA_SINK_INVALID_STATE:
564 case PA_SINK_INIT:
565 case PA_SINK_UNLINKED:
566 break;
567 }
568
569 return 0;
570 }
571
572 /* Creates a sink in the main thread.
573 *
574 * This method is called when we receive a message from the io thread that a
575 * connection has been established with the server. We defer creation of the
576 * sink until the connection is established, because we don't have a sink if
577 * the remote server isn't there.
578 */
create_sink(struct userdata * u)579 static void create_sink(struct userdata *u) {
580 pa_sink_new_data sink_data;
581
582 pa_assert_ctl_context();
583
584 /* Create sink */
585 pa_sink_new_data_init(&sink_data);
586 sink_data.driver = __FILE__;
587 sink_data.module = u->module;
588
589 pa_sink_new_data_set_name(&sink_data, u->sink_name);
590 pa_sink_new_data_set_sample_spec(&sink_data, &u->sample_spec);
591 pa_sink_new_data_set_channel_map(&sink_data, &u->channel_map);
592
593 pa_proplist_update(sink_data.proplist, PA_UPDATE_REPLACE, u->sink_proplist);
594
595 if (!(u->sink = pa_sink_new(u->module->core, &sink_data, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY | PA_SINK_NETWORK))) {
596 pa_log("Failed to create sink.");
597 goto finish;
598 }
599
600 u->sink->userdata = u;
601 u->sink->parent.process_msg = sink_process_msg_cb;
602 u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
603 u->sink->update_requested_latency = sink_update_requested_latency_cb;
604 pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC);
605
606 /* set thread message queue */
607 pa_sink_set_asyncmsgq(u->sink, u->thread_mq->inq);
608 pa_sink_set_rtpoll(u->sink, u->rtpoll);
609
610 pa_sink_put(u->sink);
611
612 finish:
613 pa_sink_new_data_done(&sink_data);
614
615 /* tell any interested io threads that the sink they asked for has now been
616 * created (even if we failed, we still notify the thread, so they can
617 * either handle or kill the thread, rather than deadlock waiting for a
618 * message that will never come */
619 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), TUNNEL_MESSAGE_SINK_CREATED, u, 0, NULL);
620 }
621
622 /* Runs in PA mainloop context */
tunnel_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)623 static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
624 struct userdata *u = (struct userdata *) data;
625
626 pa_assert(u);
627 pa_assert_ctl_context();
628
629 if (u->shutting_down)
630 return 0;
631
632 switch (code) {
633 case TUNNEL_MESSAGE_CREATE_SINK_REQUEST:
634 create_sink(u);
635 break;
636 case TUNNEL_MESSAGE_MAYBE_RESTART:
637 maybe_restart(u->module->userdata);
638 break;
639 }
640
641 return 0;
642 }
643
/* (Re-)initialise the tunnel for module m.
 *
 * m->userdata must already point to the module_restart_data. Parses the
 * module arguments, allocates a fresh struct userdata (published through
 * rd->userdata right away), sets up the thread mainloop and message queues
 * and finally starts the IO thread, which does the actual connecting.
 *
 * Returns 0 on success and -1 on failure. On failure whatever has been set
 * up so far stays attached to rd->userdata; releasing it is left to
 * do_done(). */
static int do_init(pa_module *m) {
    struct userdata *u = NULL;
    struct module_restart_data *rd;
    pa_modargs *ma = NULL;
    const char *remote_server = NULL;
    char *default_sink_name = NULL;
    uint32_t reconnect_interval_ms = 0;
    int ret = -1;

    pa_assert(m);
    pa_assert(m->userdata);

    rd = m->userdata;

    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
        pa_log("Failed to parse module arguments.");
        goto finish;
    }

    u = pa_xnew0(struct userdata, 1);
    u->module = m;
    rd->userdata = u;

    u->sample_spec = m->core->default_sample_spec;
    u->channel_map = m->core->default_channel_map;
    if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->sample_spec, &u->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
        pa_log("Invalid sample format specification or channel map");
        goto finish;
    }

    remote_server = pa_modargs_get_value(ma, "server", NULL);
    if (!remote_server) {
        pa_log("No server given!");
        goto finish;
    }

    u->remote_server = pa_xstrdup(remote_server);
    u->thread_mainloop = pa_mainloop_new();
    if (u->thread_mainloop == NULL) {
        pa_log("Failed to create mainloop");
        goto finish;
    }
    u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop);
    u->cookie_file = pa_xstrdup(pa_modargs_get_value(ma, "cookie", NULL));
    u->remote_sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));

    u->thread_mq = pa_xnew0(pa_thread_mq, 1);

    if (pa_thread_mq_init_thread_mainloop(u->thread_mq, m->core->mainloop, u->thread_mainloop_api) < 0) {
        pa_log("pa_thread_mq_init_thread_mainloop() failed.");
        goto finish;
    }

    u->msg = pa_msgobject_new(tunnel_msg);
    u->msg->parent.process_msg = tunnel_process_msg;

    /* The rtpoll created here is never run. It is only necessary to avoid crashes
     * when module-tunnel-sink-new is used together with module-loopback or
     * module-combine-sink. Both modules base their asyncmsq on the rtpoll provided
     * by the sink. module-loopback and combine-sink only work because they call
     * pa_asyncmsq_process_one() themselves. module_rtp_recv also uses the rtpoll,
     * but never calls pa_asyncmsq_process_one(), so it will not work in combination
     * with module-tunnel-sink-new. */
    u->rtpoll = pa_rtpoll_new();

    default_sink_name = pa_sprintf_malloc("tunnel-sink-new.%s", remote_server);
    u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", default_sink_name));

    u->sink_proplist = pa_proplist_new();
    pa_proplist_sets(u->sink_proplist, PA_PROP_DEVICE_CLASS, "sound");
    pa_proplist_setf(u->sink_proplist,
                     PA_PROP_DEVICE_DESCRIPTION,
                     _("Tunnel to %s/%s"),
                     remote_server,
                     pa_strempty(u->remote_sink_name));

    if (pa_modargs_get_proplist(ma, "sink_properties", u->sink_proplist, PA_UPDATE_REPLACE) < 0) {
        pa_log("Invalid properties");
        goto finish;
    }

    /* Reject an unparsable interval instead of silently running with
     * reconnecting disabled. */
    if (pa_modargs_get_value_u32(ma, "reconnect_interval_ms", &reconnect_interval_ms) < 0) {
        pa_log("Invalid reconnect_interval_ms specification");
        goto finish;
    }
    u->reconnect_interval_us = reconnect_interval_ms * PA_USEC_PER_MSEC;

    if (!(u->thread = pa_thread_new("tunnel-sink", thread_func, u))) {
        pa_log("Failed to create thread.");
        goto finish;
    }

    /* If the module is restarting and do_init() finishes successfully, the
     * restart data is no longer needed. If do_init() fails, don't touch the
     * restart data, because following restart attempts will continue to use
     * the same data. If restart_data is NULL, that means no restart is
     * currently pending. */
    if (rd->restart_data) {
        pa_restart_free(rd->restart_data);
        rd->restart_data = NULL;
    }

    ret = 0;

finish:
    /* Shared by the success and the failure path: only the temporaries of
     * this function are released here. */
    if (ma)
        pa_modargs_free(ma);

    pa_xfree(default_sink_name);

    return ret;
}
756
/* Tear down the current tunnel instance of module m, if there is one.
 *
 * Unlinks the sink, asks the IO thread to shut down and waits for it, then
 * releases everything do_init() and create_sink() attached to the userdata
 * and finally the userdata itself. The module_restart_data in m->userdata is
 * kept; only its userdata pointer is reset. Safe to call on a partially
 * initialised instance. */
static void do_done(pa_module *m) {
    struct module_restart_data *rd;
    struct userdata *u;

    pa_assert(m);

    rd = m->userdata;
    if (!rd)
        return;

    u = rd->userdata;
    if (!u)
        return;

    /* From now on tunnel_process_msg() drops messages of the IO thread. */
    u->shutting_down = true;

    if (u->sink)
        pa_sink_unlink(u->sink);

    if (u->thread) {
        pa_asyncmsgq_send(u->thread_mq->inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
        pa_thread_free(u->thread);
    }

    if (u->thread_mq) {
        pa_thread_mq_done(u->thread_mq);
        pa_xfree(u->thread_mq);
    }

    if (u->thread_mainloop)
        pa_mainloop_free(u->thread_mainloop);

    if (u->sink)
        pa_sink_unref(u->sink);

    if (u->rtpoll)
        pa_rtpoll_free(u->rtpoll);

    if (u->sink_proplist)
        pa_proplist_free(u->sink_proplist);

    /* pa_xfree() is called without a NULL check here, as it already was for
     * u->msg. */
    pa_xfree(u->cookie_file);
    pa_xfree(u->remote_sink_name);
    pa_xfree(u->remote_server);
    pa_xfree(u->sink_name);
    pa_xfree(u->msg);

    pa_xfree(u);

    rd->userdata = NULL;
}
813
/* Module entry point: allocates the module_restart_data that lives in
 * m->userdata for the lifetime of the module and performs the first
 * initialisation. If that fails everything is released again through
 * pa__done(). Returns the result of do_init(). */
int pa__init(pa_module *m) {
    int rc;

    pa_assert(m);

    m->userdata = pa_xnew0(struct module_restart_data, 1);

    rc = do_init(m);
    if (rc >= 0)
        return rc;

    pa__done(m);
    return rc;
}
828
/* Module exit point: tears down the current tunnel instance, cancels a
 * pending restart and releases the module_restart_data.
 *
 * m->userdata is reset to NULL afterwards so that no dangling pointer is
 * left behind; pa__init() calls this function on failure while m is still
 * alive. */
void pa__done(pa_module *m) {
    struct module_restart_data *rd;

    pa_assert(m);

    do_done(m);

    rd = m->userdata;
    if (!rd)
        return;

    if (rd->restart_data) {
        pa_restart_free(rd->restart_data);
        rd->restart_data = NULL;
    }

    pa_xfree(rd);
    m->userdata = NULL;
}
843