1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2013 Alexander Couzens
5
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2.1 of the License,
9 or (at your option) any later version.
10
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23
24 #include "restart-module.h"
25
26 #include <pulse/context.h>
27 #include <pulse/timeval.h>
28 #include <pulse/xmalloc.h>
29 #include <pulse/stream.h>
30 #include <pulse/mainloop.h>
31 #include <pulse/introspect.h>
32 #include <pulse/error.h>
33
34 #include <pulsecore/core.h>
35 #include <pulsecore/core-util.h>
36 #include <pulsecore/i18n.h>
37 #include <pulsecore/source.h>
38 #include <pulsecore/modargs.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/thread.h>
41 #include <pulsecore/thread-mq.h>
42 #include <pulsecore/poll.h>
43 #include <pulsecore/rtpoll.h>
44 #include <pulsecore/proplist-util.h>
45
46 PA_MODULE_AUTHOR("Alexander Couzens");
47 PA_MODULE_DESCRIPTION("Create a network source which connects via a stream to a remote PulseAudio server");
48 PA_MODULE_VERSION(PACKAGE_VERSION);
49 PA_MODULE_LOAD_ONCE(false);
50 PA_MODULE_USAGE(
51 "server=<address> "
52 "source=<name of the remote source> "
53 "source_name=<name for the local source> "
54 "source_properties=<properties for the local source> "
55 "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
56 "format=<sample format> "
57 "channels=<number of channels> "
58 "rate=<sample rate> "
59 "channel_map=<channel map> "
60 "cookie=<cookie file path>"
61 );
62
63 #define TUNNEL_THREAD_FAILED_MAINLOOP 1
64
65 static int do_init(pa_module *m);
66 static void do_done(pa_module *m);
67 static void stream_state_cb(pa_stream *stream, void *userdata);
68 static void stream_read_cb(pa_stream *s, size_t length, void *userdata);
69 static void context_state_cb(pa_context *c, void *userdata);
70 static void source_update_requested_latency_cb(pa_source *s);
71
72 struct tunnel_msg {
73 pa_msgobject parent;
74 };
75
76 typedef struct tunnel_msg tunnel_msg;
77 PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);
78
79 enum {
80 TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST,
81 TUNNEL_MESSAGE_MAYBE_RESTART,
82 };
83
84 enum {
85 TUNNEL_MESSAGE_SOURCE_CREATED = PA_SOURCE_MESSAGE_MAX,
86 };
87
88 struct userdata {
89 pa_module *module;
90 pa_source *source;
91 pa_thread *thread;
92 pa_thread_mq *thread_mq;
93 pa_mainloop *thread_mainloop;
94 pa_mainloop_api *thread_mainloop_api;
95
96 pa_context *context;
97 pa_stream *stream;
98 pa_rtpoll *rtpoll;
99
100 bool update_stream_bufferattr_after_connect;
101 bool connected;
102 bool shutting_down;
103 bool new_data;
104
105 char *cookie_file;
106 char *remote_server;
107 char *remote_source_name;
108 char *source_name;
109
110 pa_proplist *source_proplist;
111 pa_sample_spec sample_spec;
112 pa_channel_map channel_map;
113
114 tunnel_msg *msg;
115
116 pa_usec_t reconnect_interval_us;
117 };
118
119 struct module_restart_data {
120 struct userdata *userdata;
121 pa_restart_data *restart_data;
122 };
123
124 static const char* const valid_modargs[] = {
125 "source_name",
126 "source_properties",
127 "server",
128 "source",
129 "format",
130 "channels",
131 "rate",
132 "channel_map",
133 "cookie",
134 "reconnect_interval_ms",
135 NULL,
136 };
137
cork_stream(struct userdata * u,bool cork)138 static void cork_stream(struct userdata *u, bool cork) {
139 pa_operation *operation;
140
141 pa_assert(u);
142 pa_assert(u->stream);
143
144 if ((operation = pa_stream_cork(u->stream, cork, NULL, NULL)))
145 pa_operation_unref(operation);
146 }
147
reset_bufferattr(pa_buffer_attr * bufferattr)148 static void reset_bufferattr(pa_buffer_attr *bufferattr) {
149 pa_assert(bufferattr);
150 bufferattr->fragsize = (uint32_t) -1;
151 bufferattr->minreq = (uint32_t) -1;
152 bufferattr->maxlength = (uint32_t) -1;
153 bufferattr->prebuf = (uint32_t) -1;
154 bufferattr->tlength = (uint32_t) -1;
155 }
156
tunnel_new_proplist(struct userdata * u)157 static pa_proplist* tunnel_new_proplist(struct userdata *u) {
158 pa_proplist *proplist = pa_proplist_new();
159 pa_assert(proplist);
160 pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio");
161 pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
162 pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
163 pa_init_proplist(proplist);
164
165 return proplist;
166 }
167
stream_read_cb(pa_stream * s,size_t length,void * userdata)168 static void stream_read_cb(pa_stream *s, size_t length, void *userdata) {
169 struct userdata *u = userdata;
170 u->new_data = true;
171 }
172
173 /* called from io context to read samples from the stream into our source */
read_new_samples(struct userdata * u)174 static void read_new_samples(struct userdata *u) {
175 const void *p;
176 size_t readable = 0;
177 pa_memchunk memchunk;
178
179 pa_assert(u);
180 u->new_data = false;
181
182 pa_memchunk_reset(&memchunk);
183
184 if (PA_UNLIKELY(!u->connected || pa_stream_get_state(u->stream) != PA_STREAM_READY))
185 return;
186
187 readable = pa_stream_readable_size(u->stream);
188 while (readable > 0) {
189 size_t nbytes = 0;
190 if (PA_UNLIKELY(pa_stream_peek(u->stream, &p, &nbytes) != 0)) {
191 pa_log("pa_stream_peek() failed: %s", pa_strerror(pa_context_errno(u->context)));
192 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
193 return;
194 }
195
196 if (PA_LIKELY(p)) {
197 /* we have valid data */
198 memchunk.memblock = pa_memblock_new_fixed(u->module->core->mempool, (void *) p, nbytes, true);
199 memchunk.length = nbytes;
200 memchunk.index = 0;
201
202 pa_source_post(u->source, &memchunk);
203 pa_memblock_unref_fixed(memchunk.memblock);
204 } else {
205 size_t bytes_to_generate = nbytes;
206
207 /* we have a hole. generate silence */
208 memchunk = u->source->silence;
209 pa_memblock_ref(memchunk.memblock);
210
211 while (bytes_to_generate > 0) {
212 if (bytes_to_generate < memchunk.length)
213 memchunk.length = bytes_to_generate;
214
215 pa_source_post(u->source, &memchunk);
216 bytes_to_generate -= memchunk.length;
217 }
218
219 pa_memblock_unref(memchunk.memblock);
220 }
221
222 pa_stream_drop(u->stream);
223 readable -= nbytes;
224 }
225 }
226
thread_func(void * userdata)227 static void thread_func(void *userdata) {
228 struct userdata *u = userdata;
229 pa_proplist *proplist;
230
231 pa_assert(u);
232
233 pa_log_debug("Thread starting up");
234 pa_thread_mq_install(u->thread_mq);
235
236 proplist = tunnel_new_proplist(u);
237 u->context = pa_context_new_with_proplist(u->thread_mainloop_api,
238 "PulseAudio",
239 proplist);
240 pa_proplist_free(proplist);
241
242 if (!u->context) {
243 pa_log("Failed to create libpulse context");
244 goto fail;
245 }
246
247 if (u->cookie_file && pa_context_load_cookie_from_file(u->context, u->cookie_file) != 0) {
248 pa_log_error("Can not load cookie file!");
249 goto fail;
250 }
251
252 pa_context_set_state_callback(u->context, context_state_cb, u);
253 if (pa_context_connect(u->context,
254 u->remote_server,
255 PA_CONTEXT_NOAUTOSPAWN,
256 NULL) < 0) {
257 pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context)));
258 goto fail;
259 }
260
261 for (;;) {
262 int ret;
263
264 if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) {
265 if (ret == 0)
266 goto finish;
267 else
268 goto fail;
269 }
270
271 if (u->new_data)
272 read_new_samples(u);
273
274 /* Run the rtpoll to process messages that other modules may have placed in the queue. */
275 pa_rtpoll_set_timer_relative(u->rtpoll, 0);
276 if (pa_rtpoll_run(u->rtpoll) < 0)
277 goto fail;
278 }
279 fail:
280 /* send a message to the ctl thread to ask it to either terminate us, or
281 * restart us, but either way this thread will exit, so then wait for the
282 * shutdown message */
283 pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_MAYBE_RESTART, u, 0, NULL, NULL);
284 pa_asyncmsgq_wait_for(u->thread_mq->inq, PA_MESSAGE_SHUTDOWN);
285
286 finish:
287 if (u->stream) {
288 pa_stream_disconnect(u->stream);
289 pa_stream_unref(u->stream);
290 u->stream = NULL;
291 }
292
293 if (u->context) {
294 pa_context_disconnect(u->context);
295 pa_context_unref(u->context);
296 u->context = NULL;
297 }
298
299 pa_log_debug("Thread shutting down");
300 }
301
stream_state_cb(pa_stream * stream,void * userdata)302 static void stream_state_cb(pa_stream *stream, void *userdata) {
303 struct userdata *u = userdata;
304
305 pa_assert(u);
306
307 switch (pa_stream_get_state(stream)) {
308 case PA_STREAM_FAILED:
309 pa_log_error("Stream failed: %s", pa_strerror(pa_context_errno(u->context)));
310 u->connected = false;
311 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
312 break;
313 case PA_STREAM_TERMINATED:
314 pa_log_debug("Stream terminated.");
315 break;
316 case PA_STREAM_READY:
317 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
318 cork_stream(u, false);
319
320 /* Only call our requested_latency_cb when requested_latency
321 * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because
322 * we don't want to override the initial fragsize set by the server
323 * without a good reason. */
324 if (u->update_stream_bufferattr_after_connect)
325 source_update_requested_latency_cb(u->source);
326 case PA_STREAM_UNCONNECTED:
327 case PA_STREAM_CREATING:
328 break;
329 }
330 }
331
332 /* Do a reinit of the module. Note that u will be freed as a result of this
333 * call. */
maybe_restart(struct module_restart_data * rd)334 static void maybe_restart(struct module_restart_data *rd) {
335 struct userdata *u = rd->userdata;
336
337 if (rd->restart_data) {
338 pa_log_debug("Restart already pending");
339 return;
340 }
341
342 if (u->reconnect_interval_us > 0) {
343 /* The handle returned here must be freed when do_init() finishes successfully
344 * and when the module exits. */
345 rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us);
346 } else {
347 /* exit the module */
348 pa_module_unload_request(u->module, true);
349 }
350 }
351
on_source_created(struct userdata * u)352 static void on_source_created(struct userdata *u) {
353 pa_proplist *proplist;
354 pa_buffer_attr bufferattr;
355 pa_usec_t requested_latency;
356 char *username = pa_get_user_name_malloc();
357 char *hostname = pa_get_host_name_malloc();
358 /* TODO: old tunnel put here the remote source_name into stream name e.g. 'Null Output for lynxis@lazus' */
359 char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
360 pa_xfree(username);
361 pa_xfree(hostname);
362
363 pa_assert_io_context();
364
365 /* if we still don't have a source, then source creation failed, and we
366 * should kill this io thread */
367 if (!u->source) {
368 pa_log_error("Could not create a source.");
369 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
370 return;
371 }
372
373 proplist = tunnel_new_proplist(u);
374 u->stream = pa_stream_new_with_proplist(u->context,
375 stream_name,
376 &u->source->sample_spec,
377 &u->source->channel_map,
378 proplist);
379 pa_proplist_free(proplist);
380 pa_xfree(stream_name);
381
382 if (!u->stream) {
383 pa_log_error("Could not create a stream: %s", pa_strerror(pa_context_errno(u->context)));
384 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
385 return;
386 }
387
388 requested_latency = pa_source_get_requested_latency_within_thread(u->source);
389 if (requested_latency == (uint32_t) -1)
390 requested_latency = u->source->thread_info.max_latency;
391
392 reset_bufferattr(&bufferattr);
393 bufferattr.fragsize = pa_usec_to_bytes(requested_latency, &u->source->sample_spec);
394
395 pa_stream_set_state_callback(u->stream, stream_state_cb, u);
396 pa_stream_set_read_callback(u->stream, stream_read_cb, u);
397 if (pa_stream_connect_record(u->stream,
398 u->remote_source_name,
399 &bufferattr,
400 PA_STREAM_INTERPOLATE_TIMING|PA_STREAM_DONT_MOVE|PA_STREAM_AUTO_TIMING_UPDATE|PA_STREAM_START_CORKED|PA_STREAM_ADJUST_LATENCY) < 0) {
401 pa_log_debug("Could not create stream: %s", pa_strerror(pa_context_errno(u->context)));
402 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
403 }
404 u->connected = true;
405 }
406
context_state_cb(pa_context * c,void * userdata)407 static void context_state_cb(pa_context *c, void *userdata) {
408 struct userdata *u = userdata;
409 pa_assert(u);
410
411 switch (pa_context_get_state(c)) {
412 case PA_CONTEXT_UNCONNECTED:
413 case PA_CONTEXT_CONNECTING:
414 case PA_CONTEXT_AUTHORIZING:
415 case PA_CONTEXT_SETTING_NAME:
416 break;
417 case PA_CONTEXT_READY:
418 pa_log_debug("Connection successful. Creating stream.");
419 pa_assert(!u->stream);
420 pa_assert(!u->source);
421
422 pa_log_debug("Asking ctl thread to create source.");
423 pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST, u, 0, NULL, NULL);
424 break;
425 case PA_CONTEXT_FAILED:
426 pa_log_debug("Context failed with err %s.", pa_strerror(pa_context_errno(u->context)));
427 u->connected = false;
428 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
429 break;
430 case PA_CONTEXT_TERMINATED:
431 pa_log_debug("Context terminated.");
432 u->connected = false;
433 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
434 break;
435 }
436 }
437
source_update_requested_latency_cb(pa_source * s)438 static void source_update_requested_latency_cb(pa_source *s) {
439 struct userdata *u;
440 pa_operation *operation;
441 size_t nbytes;
442 pa_usec_t block_usec;
443 pa_buffer_attr bufferattr;
444
445 pa_source_assert_ref(s);
446 pa_assert_se(u = s->userdata);
447
448 block_usec = pa_source_get_requested_latency_within_thread(s);
449 if (block_usec == (pa_usec_t) -1)
450 block_usec = s->thread_info.max_latency;
451
452 nbytes = pa_usec_to_bytes(block_usec, &s->sample_spec);
453
454 if (u->stream) {
455 switch (pa_stream_get_state(u->stream)) {
456 case PA_STREAM_READY:
457 if (pa_stream_get_buffer_attr(u->stream)->fragsize == nbytes)
458 break;
459
460 reset_bufferattr(&bufferattr);
461 bufferattr.fragsize = nbytes;
462 if ((operation = pa_stream_set_buffer_attr(u->stream, &bufferattr, NULL, NULL)))
463 pa_operation_unref(operation);
464 break;
465 case PA_STREAM_CREATING:
466 /* we have to delay our request until stream is ready */
467 u->update_stream_bufferattr_after_connect = true;
468 break;
469 default:
470 break;
471 }
472 }
473 }
474
source_process_msg_cb(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)475 static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
476 struct userdata *u = PA_SOURCE(o)->userdata;
477
478 switch (code) {
479 case PA_SOURCE_MESSAGE_GET_LATENCY: {
480 int negative;
481 pa_usec_t remote_latency;
482
483 if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state)) {
484 *((int64_t*) data) = 0;
485 return 0;
486 }
487
488 if (!u->stream) {
489 *((int64_t*) data) = 0;
490 return 0;
491 }
492
493 if (pa_stream_get_state(u->stream) != PA_STREAM_READY) {
494 *((int64_t*) data) = 0;
495 return 0;
496 }
497
498 if (pa_stream_get_latency(u->stream, &remote_latency, &negative) < 0) {
499 *((int64_t*) data) = 0;
500 return 0;
501 }
502
503 if (negative)
504 *((int64_t*) data) = - (int64_t)remote_latency;
505 else
506 *((int64_t*) data) = remote_latency;
507
508 return 0;
509 }
510 case TUNNEL_MESSAGE_SOURCE_CREATED:
511 on_source_created(u);
512 return 0;
513 }
514 return pa_source_process_msg(o, code, data, offset, chunk);
515 }
516
517 /* Called from the IO thread. */
source_set_state_in_io_thread_cb(pa_source * s,pa_source_state_t new_state,pa_suspend_cause_t new_suspend_cause)518 static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
519 struct userdata *u;
520
521 pa_assert(s);
522 pa_assert_se(u = s->userdata);
523
524 /* It may be that only the suspend cause is changing, in which case there's
525 * nothing to do. */
526 if (new_state == s->thread_info.state)
527 return 0;
528
529 if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
530 return 0;
531
532 switch (new_state) {
533 case PA_SOURCE_SUSPENDED: {
534 cork_stream(u, true);
535 break;
536 }
537 case PA_SOURCE_IDLE:
538 case PA_SOURCE_RUNNING: {
539 cork_stream(u, false);
540 break;
541 }
542 case PA_SOURCE_INVALID_STATE:
543 case PA_SOURCE_INIT:
544 case PA_SOURCE_UNLINKED:
545 break;
546 }
547
548 return 0;
549 }
550
551 /* Creates a source in the main thread.
552 *
553 * This method is called when we receive a message from the io thread that a
554 * connection has been established with the server. We defer creation of the
555 * source until the connection is established, because we don't have a source
556 * if the remote server isn't there.
557 */
create_source(struct userdata * u)558 static void create_source(struct userdata *u) {
559 pa_source_new_data source_data;
560
561 pa_assert_ctl_context();
562
563 /* Create source */
564 pa_source_new_data_init(&source_data);
565 source_data.driver = __FILE__;
566 source_data.module = u->module;
567
568 pa_source_new_data_set_name(&source_data, u->source_name);
569 pa_source_new_data_set_sample_spec(&source_data, &u->sample_spec);
570 pa_source_new_data_set_channel_map(&source_data, &u->channel_map);
571
572 pa_proplist_update(source_data.proplist, PA_UPDATE_REPLACE, u->source_proplist);
573
574 if (!(u->source = pa_source_new(u->module->core, &source_data, PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY | PA_SOURCE_NETWORK))) {
575 pa_log("Failed to create source.");
576 goto finish;
577 }
578
579 u->source->userdata = u;
580 u->source->parent.process_msg = source_process_msg_cb;
581 u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
582 u->source->update_requested_latency = source_update_requested_latency_cb;
583
584 pa_source_set_asyncmsgq(u->source, u->thread_mq->inq);
585 pa_source_set_rtpoll(u->source, u->rtpoll);
586
587 pa_source_put(u->source);
588
589 finish:
590 pa_source_new_data_done(&source_data);
591
592 /* tell any interested io threads that the sink they asked for has now been
593 * created (even if we failed, we still notify the thread, so they can
594 * either handle or kill the thread, rather than deadlock waiting for a
595 * message that will never come */
596 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), TUNNEL_MESSAGE_SOURCE_CREATED, u, 0, NULL);
597 }
598
599 /* Runs in PA mainloop context */
tunnel_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)600 static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
601 struct userdata *u = (struct userdata *) data;
602
603 pa_assert(u);
604 pa_assert_ctl_context();
605
606 if (u->shutting_down)
607 return 0;
608
609 switch (code) {
610 case TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST:
611 create_source(u);
612 break;
613 case TUNNEL_MESSAGE_MAYBE_RESTART:
614 maybe_restart(u->module->userdata);
615 break;
616 }
617
618 return 0;
619 }
620
do_init(pa_module * m)621 static int do_init(pa_module *m) {
622 struct userdata *u = NULL;
623 struct module_restart_data *rd;
624 pa_modargs *ma = NULL;
625 const char *remote_server = NULL;
626 char *default_source_name = NULL;
627 uint32_t reconnect_interval_ms = 0;
628
629 pa_assert(m);
630 pa_assert(m->userdata);
631
632 rd = m->userdata;
633
634 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
635 pa_log("Failed to parse module arguments.");
636 goto fail;
637 }
638
639 u = pa_xnew0(struct userdata, 1);
640 u->module = m;
641 rd->userdata = u;
642
643 u->sample_spec = m->core->default_sample_spec;
644 u->channel_map = m->core->default_channel_map;
645 if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->sample_spec, &u->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
646 pa_log("Invalid sample format specification or channel map");
647 goto fail;
648 }
649
650 remote_server = pa_modargs_get_value(ma, "server", NULL);
651 if (!remote_server) {
652 pa_log("No server given!");
653 goto fail;
654 }
655
656 u->remote_server = pa_xstrdup(remote_server);
657 u->thread_mainloop = pa_mainloop_new();
658 if (u->thread_mainloop == NULL) {
659 pa_log("Failed to create mainloop");
660 goto fail;
661 }
662 u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop);
663 u->cookie_file = pa_xstrdup(pa_modargs_get_value(ma, "cookie", NULL));
664 u->remote_source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
665
666 u->thread_mq = pa_xnew0(pa_thread_mq, 1);
667
668 if (pa_thread_mq_init_thread_mainloop(u->thread_mq, m->core->mainloop, u->thread_mainloop_api) < 0) {
669 pa_log("pa_thread_mq_init_thread_mainloop() failed.");
670 goto fail;
671 }
672
673 u->msg = pa_msgobject_new(tunnel_msg);
674 u->msg->parent.process_msg = tunnel_process_msg;
675
676 /* The rtpoll created here must curently only exist to avoid crashes when
677 * the module is used together with module-loopback. Because module-loopback
678 * runs pa_asyncmsgq_process_one() from the pop callback, the rtpoll need not
679 * be run. We will do so anyway for potential modules similar to
680 * module-combine-sink that use the rtpoll of the underlying source for
681 * message exchange. */
682 u->rtpoll = pa_rtpoll_new();
683
684 default_source_name = pa_sprintf_malloc("tunnel-source-new.%s", remote_server);
685 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", default_source_name));
686
687 u->source_proplist = pa_proplist_new();
688 pa_proplist_sets(u->source_proplist, PA_PROP_DEVICE_CLASS, "sound");
689 pa_proplist_setf(u->source_proplist,
690 PA_PROP_DEVICE_DESCRIPTION,
691 _("Tunnel to %s/%s"),
692 remote_server,
693 pa_strempty(u->remote_source_name));
694
695 if (pa_modargs_get_proplist(ma, "source_properties", u->source_proplist, PA_UPDATE_REPLACE) < 0) {
696 pa_log("Invalid properties");
697 goto fail;
698 }
699
700 pa_modargs_get_value_u32(ma, "reconnect_interval_ms", &reconnect_interval_ms);
701 u->reconnect_interval_us = reconnect_interval_ms * PA_USEC_PER_MSEC;
702
703 if (!(u->thread = pa_thread_new("tunnel-source", thread_func, u))) {
704 pa_log("Failed to create thread.");
705 goto fail;
706 }
707
708 /* If the module is restarting and do_init() finishes successfully, the
709 * restart data is no longer needed. If do_init() fails, don't touch the
710 * restart data, because following restart attempts will continue to use
711 * the same data. If restart_data is NULL, that means no restart is
712 * currently pending. */
713 if (rd->restart_data) {
714 pa_restart_free(rd->restart_data);
715 rd->restart_data = NULL;
716 }
717
718 pa_modargs_free(ma);
719 pa_xfree(default_source_name);
720
721 return 0;
722
723 fail:
724 if (ma)
725 pa_modargs_free(ma);
726
727 if (default_source_name)
728 pa_xfree(default_source_name);
729
730 return -1;
731 }
732
do_done(pa_module * m)733 static void do_done(pa_module *m) {
734 struct userdata *u = NULL;
735 struct module_restart_data *rd;
736
737 pa_assert(m);
738
739 if (!(rd = m->userdata))
740 return;
741 if (!(u = rd->userdata))
742 return;
743
744 u->shutting_down = true;
745
746 if (u->source)
747 pa_source_unlink(u->source);
748
749 if (u->thread) {
750 pa_asyncmsgq_send(u->thread_mq->inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
751 pa_thread_free(u->thread);
752 }
753
754 if (u->thread_mq) {
755 pa_thread_mq_done(u->thread_mq);
756 pa_xfree(u->thread_mq);
757 }
758
759 if (u->thread_mainloop)
760 pa_mainloop_free(u->thread_mainloop);
761
762 if (u->cookie_file)
763 pa_xfree(u->cookie_file);
764
765 if (u->remote_source_name)
766 pa_xfree(u->remote_source_name);
767
768 if (u->remote_server)
769 pa_xfree(u->remote_server);
770
771 if (u->source)
772 pa_source_unref(u->source);
773
774 if (u->rtpoll)
775 pa_rtpoll_free(u->rtpoll);
776
777 if (u->source_proplist)
778 pa_proplist_free(u->source_proplist);
779
780 if (u->source_name)
781 pa_xfree(u->source_name);
782
783 pa_xfree(u->msg);
784
785 pa_xfree(u);
786
787 rd->userdata = NULL;
788 }
789
pa__init(pa_module * m)790 int pa__init(pa_module *m) {
791 int ret;
792
793 pa_assert(m);
794
795 m->userdata = pa_xnew0(struct module_restart_data, 1);
796
797 ret = do_init(m);
798
799 if (ret < 0)
800 pa__done(m);
801
802 return ret;
803 }
804
pa__done(pa_module * m)805 void pa__done(pa_module *m) {
806 pa_assert(m);
807
808 do_done(m);
809
810 if (m->userdata) {
811 struct module_restart_data *rd = m->userdata;
812
813 if (rd->restart_data)
814 pa_restart_free(rd->restart_data);
815
816 pa_xfree(m->userdata);
817 }
818 }
819