1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2013 Alexander Couzens
5
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2.1 of the License,
9 or (at your option) any later version.
10
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23
24 #include "restart-module.h"
25
26 #include <pulse/context.h>
27 #include <pulse/timeval.h>
28 #include <pulse/xmalloc.h>
29 #include <pulse/stream.h>
30 #include <pulse/mainloop.h>
31 #include <pulse/introspect.h>
32 #include <pulse/error.h>
33
34 #include <pulsecore/core.h>
35 #include <pulsecore/core-util.h>
36 #include <pulsecore/i18n.h>
37 #include <pulsecore/source.h>
38 #include <pulsecore/modargs.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/thread.h>
41 #include <pulsecore/thread-mq.h>
42 #include <pulsecore/poll.h>
43 #include <pulsecore/rtpoll.h>
44 #include <pulsecore/proplist-util.h>
45
/* Module metadata declared through the PulseAudio module macros. */
PA_MODULE_AUTHOR("Alexander Couzens");
PA_MODULE_DESCRIPTION("Create a network source which connects via a stream to a remote PulseAudio server");
PA_MODULE_VERSION(PACKAGE_VERSION);
/* NOTE(review): 'false' presumably permits loading several instances of this
 * module (e.g. one per remote server) -- confirm against the macro docs. */
PA_MODULE_LOAD_ONCE(false);
/* Usage string shown to the user. Every argument named here is also listed
 * in valid_modargs[] further down in this file. */
PA_MODULE_USAGE(
        "server=<address> "
        "source=<name of the remote source> "
        "source_name=<name for the local source> "
        "source_properties=<properties for the local source> "
        "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
        "format=<sample format> "
        "channels=<number of channels> "
        "rate=<sample rate> "
        "channel_map=<channel map> "
        "cookie=<cookie file path>"
        );
62
/* Quit value handed to the IO thread's mainloop when the tunnel hits an
 * error. thread_func() treats a quit value of 0 as a clean shutdown and any
 * other value as a failure. */
#define TUNNEL_THREAD_FAILED_MAINLOOP 1

/* Forward declarations of functions that are referenced before they are
 * defined in this file. */
static int do_init(pa_module *m);
static void do_done(pa_module *m);
static void stream_state_cb(pa_stream *stream, void *userdata);
static void stream_read_cb(pa_stream *s, size_t length, void *userdata);
static void context_state_cb(pa_context *c, void *userdata);
static void source_update_requested_latency_cb(pa_source *s);
71
/* Message object used as the target of the messages the IO thread posts on
 * u->thread_mq->outq; its process_msg handler is tunnel_process_msg(). */
struct tunnel_msg {
    pa_msgobject parent;
};

typedef struct tunnel_msg tunnel_msg;
PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);

/* Message codes handled by tunnel_process_msg(). */
enum {
    /* Posted by context_state_cb() once the context is ready. */
    TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST,
    /* Posted by thread_func() when the IO thread has failed. */
    TUNNEL_MESSAGE_MAYBE_RESTART,
};

/* Message code handled by source_process_msg_cb(). It starts at
 * PA_SOURCE_MESSAGE_MAX, presumably so that it cannot collide with the
 * standard source message codes. */
enum {
    TUNNEL_MESSAGE_SOURCE_CREATED = PA_SOURCE_MESSAGE_MAX,
};
87
/* Per-instance state of the tunnel. Allocated in do_init() and released in
 * do_done(). */
struct userdata {
    pa_module *module;                      /* module this state belongs to */
    pa_source *source;                      /* local source; NULL until create_source() succeeds */
    pa_thread *thread;                      /* thread running thread_func() */
    pa_thread_mq *thread_mq;                /* message queues between the main loop and the thread */
    pa_mainloop *thread_mainloop;           /* mainloop iterated by thread_func() */
    pa_mainloop_api *thread_mainloop_api;   /* API vtable of thread_mainloop */

    pa_context *context;                    /* connection to the remote server, created in thread_func() */
    pa_stream *stream;                      /* record stream, created in on_source_created() */
    pa_rtpoll *rtpoll;                      /* never run; see the comment in do_init() */

    /* Set by source_update_requested_latency_cb() while the stream is still
     * being created; checked by stream_state_cb() once it becomes ready. */
    bool update_stream_bufferattr_after_connect;
    bool connected;                         /* set in on_source_created(), cleared on stream/context failure */
    bool shutting_down;                     /* set by do_done(); tunnel_process_msg() ignores messages afterwards */
    bool new_data;                          /* set by stream_read_cb(), cleared by read_new_samples() */

    /* Copies of module arguments, made in do_init(). */
    char *cookie_file;
    char *remote_server;
    char *remote_source_name;
    char *source_name;

    pa_proplist *source_proplist;           /* properties applied to the local source */
    pa_sample_spec sample_spec;             /* sample spec used for the local source */
    pa_channel_map channel_map;             /* channel map used for the local source */

    tunnel_msg *msg;                        /* target object of the messages sent to the main thread */

    pa_usec_t reconnect_interval_us;        /* 0 means no reconnect; see maybe_restart() */
};
118
/* Stored in m->userdata by pa__init(). It outlives a single do_init() /
 * do_done() cycle: do_done() only resets the userdata pointer, while this
 * structure itself is freed in pa__done(). */
struct module_restart_data {
    struct userdata *userdata;      /* current instance state, or NULL after do_done() */
    pa_restart_data *restart_data;  /* handle of a pending restart, or NULL if none is pending */
};
123
/* NULL-terminated list of the argument names accepted by this module; passed
 * to pa_modargs_new() in do_init(). */
static const char* const valid_modargs[] = {
    "source_name",
    "source_properties",
    "server",
    "source",
    "format",
    "channels",
    "rate",
    "channel_map",
    "cookie",
    "reconnect_interval_ms",
    NULL,
};
137
cork_stream(struct userdata * u,bool cork)138 static void cork_stream(struct userdata *u, bool cork) {
139 pa_operation *operation;
140
141 pa_assert(u);
142 pa_assert(u->stream);
143
144 if ((operation = pa_stream_cork(u->stream, cork, NULL, NULL)))
145 pa_operation_unref(operation);
146 }
147
/* Set every field of *bufferattr to (uint32_t) -1, the value this file uses
 * for "no explicit request" before filling in the fields it cares about. */
static void reset_bufferattr(pa_buffer_attr *bufferattr) {
    const uint32_t unset = (uint32_t) -1;

    pa_assert(bufferattr);

    bufferattr->fragsize = unset;
    bufferattr->minreq = unset;
    bufferattr->maxlength = unset;
    bufferattr->prebuf = unset;
    bufferattr->tlength = unset;
}
156
tunnel_new_proplist(struct userdata * u)157 static pa_proplist* tunnel_new_proplist(struct userdata *u) {
158 pa_proplist *proplist = pa_proplist_new();
159 pa_assert(proplist);
160 pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio");
161 pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
162 pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
163 pa_init_proplist(proplist);
164
165 return proplist;
166 }
167
/* Read callback of the record stream. It only flags that data is pending;
 * the actual reading is done by read_new_samples(), which thread_func()
 * calls after the mainloop iteration when the flag is set. */
static void stream_read_cb(pa_stream *s, size_t length, void *userdata) {
    struct userdata *state = userdata;

    state->new_data = true;
}
172
173 /* called from io context to read samples from the stream into our source */
read_new_samples(struct userdata * u)174 static void read_new_samples(struct userdata *u) {
175 const void *p;
176 size_t readable = 0;
177 pa_memchunk memchunk;
178
179 pa_assert(u);
180 u->new_data = false;
181
182 pa_memchunk_reset(&memchunk);
183
184 if (PA_UNLIKELY(!u->connected || pa_stream_get_state(u->stream) != PA_STREAM_READY))
185 return;
186
187 readable = pa_stream_readable_size(u->stream);
188 while (readable > 0) {
189 size_t nbytes = 0;
190 if (PA_UNLIKELY(pa_stream_peek(u->stream, &p, &nbytes) != 0)) {
191 pa_log("pa_stream_peek() failed: %s", pa_strerror(pa_context_errno(u->context)));
192 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
193 return;
194 }
195
196 if (PA_LIKELY(p)) {
197 /* we have valid data */
198 memchunk.memblock = pa_memblock_new_fixed(u->module->core->mempool, (void *) p, nbytes, true);
199 memchunk.length = nbytes;
200 memchunk.index = 0;
201
202 pa_source_post(u->source, &memchunk);
203 pa_memblock_unref_fixed(memchunk.memblock);
204 } else {
205 size_t bytes_to_generate = nbytes;
206
207 /* we have a hole. generate silence */
208 memchunk = u->source->silence;
209 pa_memblock_ref(memchunk.memblock);
210
211 while (bytes_to_generate > 0) {
212 if (bytes_to_generate < memchunk.length)
213 memchunk.length = bytes_to_generate;
214
215 pa_source_post(u->source, &memchunk);
216 bytes_to_generate -= memchunk.length;
217 }
218
219 pa_memblock_unref(memchunk.memblock);
220 }
221
222 pa_stream_drop(u->stream);
223 readable -= nbytes;
224 }
225 }
226
thread_func(void * userdata)227 static void thread_func(void *userdata) {
228 struct userdata *u = userdata;
229 pa_proplist *proplist;
230
231 pa_assert(u);
232
233 pa_log_debug("Thread starting up");
234 pa_thread_mq_install(u->thread_mq);
235
236 proplist = tunnel_new_proplist(u);
237 u->context = pa_context_new_with_proplist(u->thread_mainloop_api,
238 "PulseAudio",
239 proplist);
240 pa_proplist_free(proplist);
241
242 if (!u->context) {
243 pa_log("Failed to create libpulse context");
244 goto fail;
245 }
246
247 if (u->cookie_file && pa_context_load_cookie_from_file(u->context, u->cookie_file) != 0) {
248 pa_log_error("Can not load cookie file!");
249 goto fail;
250 }
251
252 pa_context_set_state_callback(u->context, context_state_cb, u);
253 if (pa_context_connect(u->context,
254 u->remote_server,
255 PA_CONTEXT_NOAUTOSPAWN,
256 NULL) < 0) {
257 pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context)));
258 goto fail;
259 }
260
261 for (;;) {
262 int ret;
263
264 if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) {
265 if (ret == 0)
266 goto finish;
267 else
268 goto fail;
269 }
270
271 if (u->new_data)
272 read_new_samples(u);
273 }
274 fail:
275 /* send a message to the ctl thread to ask it to either terminate us, or
276 * restart us, but either way this thread will exit, so then wait for the
277 * shutdown message */
278 pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_MAYBE_RESTART, u, 0, NULL, NULL);
279 pa_asyncmsgq_wait_for(u->thread_mq->inq, PA_MESSAGE_SHUTDOWN);
280
281 finish:
282 if (u->stream) {
283 pa_stream_disconnect(u->stream);
284 pa_stream_unref(u->stream);
285 u->stream = NULL;
286 }
287
288 if (u->context) {
289 pa_context_disconnect(u->context);
290 pa_context_unref(u->context);
291 u->context = NULL;
292 }
293
294 pa_log_debug("Thread shutting down");
295 }
296
/* State callback of the record stream.
 *
 * A failed stream marks the tunnel as disconnected and makes the thread
 * mainloop quit with TUNNEL_THREAD_FAILED_MAINLOOP. When the stream becomes
 * ready it is uncorked if the source is opened, and a latency update that was
 * requested while the stream was still being created is applied. */
static void stream_state_cb(pa_stream *stream, void *userdata) {
    struct userdata *u = userdata;

    pa_assert(u);

    switch (pa_stream_get_state(stream)) {
        case PA_STREAM_FAILED:
            pa_log_error("Stream failed: %s", pa_strerror(pa_context_errno(u->context)));
            u->connected = false;
            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
            break;
        case PA_STREAM_TERMINATED:
            pa_log_debug("Stream terminated.");
            break;
        case PA_STREAM_READY:
            if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
                cork_stream(u, false);

            /* Only call our requested_latency_cb when requested_latency
             * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because
             * we don't want to override the initial fragsize set by the server
             * without a good reason. */
            if (u->update_stream_bufferattr_after_connect)
                source_update_requested_latency_cb(u->source);
            /* explicit break: do not rely on falling through into the
             * no-op cases below */
            break;
        case PA_STREAM_UNCONNECTED:
        case PA_STREAM_CREATING:
            break;
    }
}
326
327 /* Do a reinit of the module. Note that u will be freed as a result of this
328 * call. */
maybe_restart(struct module_restart_data * rd)329 static void maybe_restart(struct module_restart_data *rd) {
330 struct userdata *u = rd->userdata;
331
332 if (rd->restart_data) {
333 pa_log_debug("Restart already pending");
334 return;
335 }
336
337 if (u->reconnect_interval_us > 0) {
338 /* The handle returned here must be freed when do_init() finishes successfully
339 * and when the module exits. */
340 rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us);
341 } else {
342 /* exit the module */
343 pa_module_unload_request(u->module, true);
344 }
345 }
346
on_source_created(struct userdata * u)347 static void on_source_created(struct userdata *u) {
348 pa_proplist *proplist;
349 pa_buffer_attr bufferattr;
350 pa_usec_t requested_latency;
351 char *username = pa_get_user_name_malloc();
352 char *hostname = pa_get_host_name_malloc();
353 /* TODO: old tunnel put here the remote source_name into stream name e.g. 'Null Output for lynxis@lazus' */
354 char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
355 pa_xfree(username);
356 pa_xfree(hostname);
357
358 pa_assert_io_context();
359
360 /* if we still don't have a source, then source creation failed, and we
361 * should kill this io thread */
362 if (!u->source) {
363 pa_log_error("Could not create a source.");
364 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
365 return;
366 }
367
368 proplist = tunnel_new_proplist(u);
369 u->stream = pa_stream_new_with_proplist(u->context,
370 stream_name,
371 &u->source->sample_spec,
372 &u->source->channel_map,
373 proplist);
374 pa_proplist_free(proplist);
375 pa_xfree(stream_name);
376
377 if (!u->stream) {
378 pa_log_error("Could not create a stream: %s", pa_strerror(pa_context_errno(u->context)));
379 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
380 return;
381 }
382
383 requested_latency = pa_source_get_requested_latency_within_thread(u->source);
384 if (requested_latency == (uint32_t) -1)
385 requested_latency = u->source->thread_info.max_latency;
386
387 reset_bufferattr(&bufferattr);
388 bufferattr.fragsize = pa_usec_to_bytes(requested_latency, &u->source->sample_spec);
389
390 pa_stream_set_state_callback(u->stream, stream_state_cb, u);
391 pa_stream_set_read_callback(u->stream, stream_read_cb, u);
392 if (pa_stream_connect_record(u->stream,
393 u->remote_source_name,
394 &bufferattr,
395 PA_STREAM_INTERPOLATE_TIMING|PA_STREAM_DONT_MOVE|PA_STREAM_AUTO_TIMING_UPDATE|PA_STREAM_START_CORKED|PA_STREAM_ADJUST_LATENCY) < 0) {
396 pa_log_debug("Could not create stream: %s", pa_strerror(pa_context_errno(u->context)));
397 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
398 }
399 u->connected = true;
400 }
401
/* State callback of the libpulse context.
 *
 * Once the context is ready, the main thread is asked to create the local
 * source. A failed or terminated context marks the tunnel as disconnected and
 * makes the thread mainloop quit with TUNNEL_THREAD_FAILED_MAINLOOP. All
 * other states are ignored. */
static void context_state_cb(pa_context *c, void *userdata) {
    struct userdata *u = userdata;

    pa_assert(u);

    switch (pa_context_get_state(c)) {
        case PA_CONTEXT_READY:
            pa_log_debug("Connection successful. Creating stream.");
            pa_assert(!u->stream);
            pa_assert(!u->source);

            pa_log_debug("Asking ctl thread to create source.");
            pa_asyncmsgq_post(u->thread_mq->outq,
                              PA_MSGOBJECT(u->msg),
                              TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST,
                              u, 0, NULL, NULL);
            break;

        case PA_CONTEXT_FAILED:
            pa_log_debug("Context failed with err %s.", pa_strerror(pa_context_errno(u->context)));
            u->connected = false;
            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
            break;

        case PA_CONTEXT_TERMINATED:
            pa_log_debug("Context terminated.");
            u->connected = false;
            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
            break;

        case PA_CONTEXT_UNCONNECTED:
        case PA_CONTEXT_CONNECTING:
        case PA_CONTEXT_AUTHORIZING:
        case PA_CONTEXT_SETTING_NAME:
            /* intermediate states: nothing to do */
            break;
    }
}
432
/* update_requested_latency hook of the local source.
 *
 * Converts the source's requested latency (or its maximum latency when none
 * is requested) into a fragment size. If the stream is ready and uses a
 * different fragment size, new buffer attributes are sent to the server. If
 * the stream is still being created, the request is remembered so that
 * stream_state_cb() can apply it later. */
static void source_update_requested_latency_cb(pa_source *s) {
    struct userdata *u;
    pa_usec_t latency_usec;
    size_t wanted_fragsize;

    pa_source_assert_ref(s);
    pa_assert_se(u = s->userdata);

    latency_usec = pa_source_get_requested_latency_within_thread(s);
    if (latency_usec == (pa_usec_t) -1)
        latency_usec = s->thread_info.max_latency;

    wanted_fragsize = pa_usec_to_bytes(latency_usec, &s->sample_spec);

    if (!u->stream)
        return;

    switch (pa_stream_get_state(u->stream)) {
        case PA_STREAM_READY:
            if (pa_stream_get_buffer_attr(u->stream)->fragsize != wanted_fragsize) {
                pa_buffer_attr attr;
                pa_operation *op;

                reset_bufferattr(&attr);
                attr.fragsize = wanted_fragsize;

                op = pa_stream_set_buffer_attr(u->stream, &attr, NULL, NULL);
                if (op)
                    pa_operation_unref(op);
            }
            break;

        case PA_STREAM_CREATING:
            /* we have to delay our request until stream is ready */
            u->update_stream_bufferattr_after_connect = true;
            break;

        default:
            break;
    }
}
469
/* Message handler of the local source.
 *
 * PA_SOURCE_MESSAGE_GET_LATENCY: stores the latency of the remote stream as
 * an int64_t in *data (negated when the stream reports it as negative), or 0
 * when the source is not linked, there is no ready stream, or the latency
 * cannot be queried.
 * TUNNEL_MESSAGE_SOURCE_CREATED: continues the setup in on_source_created().
 * Everything else is passed on to pa_source_process_msg(). */
static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SOURCE(o)->userdata;

    switch (code) {
        case PA_SOURCE_MESSAGE_GET_LATENCY: {
            int64_t *latency_out = data;
            int negative;
            pa_usec_t remote_latency;

            /* default answer for every case in which no latency is known */
            *latency_out = 0;

            if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
                !u->stream ||
                pa_stream_get_state(u->stream) != PA_STREAM_READY ||
                pa_stream_get_latency(u->stream, &remote_latency, &negative) < 0)
                return 0;

            *latency_out = negative ? - (int64_t) remote_latency : (int64_t) remote_latency;
            return 0;
        }

        case TUNNEL_MESSAGE_SOURCE_CREATED:
            on_source_created(u);
            return 0;
    }

    return pa_source_process_msg(o, code, data, offset, chunk);
}
511
512 /* Called from the IO thread. */
source_set_state_in_io_thread_cb(pa_source * s,pa_source_state_t new_state,pa_suspend_cause_t new_suspend_cause)513 static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
514 struct userdata *u;
515
516 pa_assert(s);
517 pa_assert_se(u = s->userdata);
518
519 /* It may be that only the suspend cause is changing, in which case there's
520 * nothing to do. */
521 if (new_state == s->thread_info.state)
522 return 0;
523
524 if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
525 return 0;
526
527 switch (new_state) {
528 case PA_SOURCE_SUSPENDED: {
529 cork_stream(u, true);
530 break;
531 }
532 case PA_SOURCE_IDLE:
533 case PA_SOURCE_RUNNING: {
534 cork_stream(u, false);
535 break;
536 }
537 case PA_SOURCE_INVALID_STATE:
538 case PA_SOURCE_INIT:
539 case PA_SOURCE_UNLINKED:
540 break;
541 }
542
543 return 0;
544 }
545
546 /* Creates a source in the main thread.
547 *
548 * This method is called when we receive a message from the io thread that a
549 * connection has been established with the server. We defer creation of the
550 * source until the connection is established, because we don't have a source
551 * if the remote server isn't there.
552 */
create_source(struct userdata * u)553 static void create_source(struct userdata *u) {
554 pa_source_new_data source_data;
555
556 pa_assert_ctl_context();
557
558 /* Create source */
559 pa_source_new_data_init(&source_data);
560 source_data.driver = __FILE__;
561 source_data.module = u->module;
562
563 pa_source_new_data_set_name(&source_data, u->source_name);
564 pa_source_new_data_set_sample_spec(&source_data, &u->sample_spec);
565 pa_source_new_data_set_channel_map(&source_data, &u->channel_map);
566
567 pa_proplist_update(source_data.proplist, PA_UPDATE_REPLACE, u->source_proplist);
568
569 if (!(u->source = pa_source_new(u->module->core, &source_data, PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY | PA_SOURCE_NETWORK))) {
570 pa_log("Failed to create source.");
571 goto finish;
572 }
573
574 u->source->userdata = u;
575 u->source->parent.process_msg = source_process_msg_cb;
576 u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
577 u->source->update_requested_latency = source_update_requested_latency_cb;
578
579 pa_source_set_asyncmsgq(u->source, u->thread_mq->inq);
580 pa_source_set_rtpoll(u->source, u->rtpoll);
581
582 pa_source_put(u->source);
583
584 finish:
585 pa_source_new_data_done(&source_data);
586
587 /* tell any interested io threads that the sink they asked for has now been
588 * created (even if we failed, we still notify the thread, so they can
589 * either handle or kill the thread, rather than deadlock waiting for a
590 * message that will never come */
591 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), TUNNEL_MESSAGE_SOURCE_CREATED, u, 0, NULL);
592 }
593
594 /* Runs in PA mainloop context */
tunnel_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)595 static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
596 struct userdata *u = (struct userdata *) data;
597
598 pa_assert(u);
599 pa_assert_ctl_context();
600
601 if (u->shutting_down)
602 return 0;
603
604 switch (code) {
605 case TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST:
606 create_source(u);
607 break;
608 case TUNNEL_MESSAGE_MAYBE_RESTART:
609 maybe_restart(u->module->userdata);
610 break;
611 }
612
613 return 0;
614 }
615
/* (Re-)initialise one tunnel instance.
 *
 * Parses the module arguments, allocates a fresh struct userdata, stores it
 * in the module_restart_data found in m->userdata and starts the IO thread.
 * Called from pa__init() and passed to pa_restart_module_reinit() as the
 * init callback.
 *
 * Returns 0 on success and -1 on failure. On failure the partially set up
 * userdata stays reachable through rd->userdata so that do_done() can
 * release it; a pending restart handle is left untouched. */
static int do_init(pa_module *m) {
    struct userdata *u = NULL;
    struct module_restart_data *rd;
    pa_modargs *ma = NULL;
    const char *remote_server = NULL;
    char *default_source_name = NULL;
    uint32_t reconnect_interval_ms = 0;

    pa_assert(m);
    pa_assert(m->userdata);

    rd = m->userdata;

    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
        pa_log("Failed to parse module arguments.");
        goto fail;
    }

    u = pa_xnew0(struct userdata, 1);
    u->module = m;
    rd->userdata = u;

    u->sample_spec = m->core->default_sample_spec;
    u->channel_map = m->core->default_channel_map;
    if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->sample_spec, &u->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
        pa_log("Invalid sample format specification or channel map");
        goto fail;
    }

    remote_server = pa_modargs_get_value(ma, "server", NULL);
    if (!remote_server) {
        pa_log("No server given!");
        goto fail;
    }

    u->remote_server = pa_xstrdup(remote_server);
    u->thread_mainloop = pa_mainloop_new();
    if (u->thread_mainloop == NULL) {
        pa_log("Failed to create mainloop");
        goto fail;
    }
    u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop);
    u->cookie_file = pa_xstrdup(pa_modargs_get_value(ma, "cookie", NULL));
    u->remote_source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));

    u->thread_mq = pa_xnew0(pa_thread_mq, 1);

    if (pa_thread_mq_init_thread_mainloop(u->thread_mq, m->core->mainloop, u->thread_mainloop_api) < 0) {
        pa_log("pa_thread_mq_init_thread_mainloop() failed.");
        goto fail;
    }

    u->msg = pa_msgobject_new(tunnel_msg);
    u->msg->parent.process_msg = tunnel_process_msg;

    /* The rtpoll created here is never run. It is only necessary to avoid crashes
     * when module-tunnel-source-new is used together with module-loopback.
     * module-loopback bases the asyncmsq on the rtpoll provided by the source and
     * only works because it calls pa_asyncmsq_process_one(). */
    u->rtpoll = pa_rtpoll_new();

    default_source_name = pa_sprintf_malloc("tunnel-source-new.%s", remote_server);
    u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", default_source_name));

    u->source_proplist = pa_proplist_new();
    pa_proplist_sets(u->source_proplist, PA_PROP_DEVICE_CLASS, "sound");
    pa_proplist_setf(u->source_proplist,
                     PA_PROP_DEVICE_DESCRIPTION,
                     _("Tunnel to %s/%s"),
                     remote_server,
                     pa_strempty(u->remote_source_name));

    if (pa_modargs_get_proplist(ma, "source_properties", u->source_proplist, PA_UPDATE_REPLACE) < 0) {
        pa_log("Invalid properties");
        goto fail;
    }

    /* Reject a malformed interval instead of silently running with
     * reconnects disabled; an omitted argument keeps the default of 0. */
    if (pa_modargs_get_value_u32(ma, "reconnect_interval_ms", &reconnect_interval_ms) < 0) {
        pa_log("Failed to parse reconnect_interval_ms argument.");
        goto fail;
    }
    u->reconnect_interval_us = reconnect_interval_ms * PA_USEC_PER_MSEC;

    if (!(u->thread = pa_thread_new("tunnel-source", thread_func, u))) {
        pa_log("Failed to create thread.");
        goto fail;
    }

    /* If the module is restarting and do_init() finishes successfully, the
     * restart data is no longer needed. If do_init() fails, don't touch the
     * restart data, because following restart attempts will continue to use
     * the same data. If restart_data is NULL, that means no restart is
     * currently pending. */
    if (rd->restart_data) {
        pa_restart_free(rd->restart_data);
        rd->restart_data = NULL;
    }

    pa_modargs_free(ma);
    pa_xfree(default_source_name);

    return 0;

fail:
    if (ma)
        pa_modargs_free(ma);

    pa_xfree(default_source_name);

    return -1;
}
725
/* Tear down the tunnel instance created by do_init().
 *
 * Flags the instance as shutting down, unlinks the source, stops the IO
 * thread and releases everything struct userdata owns. The
 * module_restart_data in m->userdata is kept; only its userdata pointer is
 * reset. Does nothing if there is no instance. */
static void do_done(pa_module *m) {
    struct module_restart_data *rd;
    struct userdata *u;

    pa_assert(m);

    rd = m->userdata;
    if (!rd)
        return;

    u = rd->userdata;
    if (!u)
        return;

    u->shutting_down = true;

    if (u->source)
        pa_source_unlink(u->source);

    if (u->thread) {
        /* ask the IO thread to leave its mainloop, then reap it */
        pa_asyncmsgq_send(u->thread_mq->inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
        pa_thread_free(u->thread);
    }

    if (u->thread_mq) {
        pa_thread_mq_done(u->thread_mq);
        pa_xfree(u->thread_mq);
    }

    if (u->thread_mainloop)
        pa_mainloop_free(u->thread_mainloop);

    /* pa_xfree() accepts NULL, so the plain strings need no guard */
    pa_xfree(u->cookie_file);
    pa_xfree(u->remote_source_name);
    pa_xfree(u->remote_server);

    if (u->source)
        pa_source_unref(u->source);

    if (u->rtpoll)
        pa_rtpoll_free(u->rtpoll);

    if (u->source_proplist)
        pa_proplist_free(u->source_proplist);

    pa_xfree(u->source_name);
    pa_xfree(u->msg);
    pa_xfree(u);

    rd->userdata = NULL;
}
782
/* Module entry point. Allocates the module_restart_data that lives in
 * m->userdata and runs the first do_init(). If that fails, everything is
 * released again through pa__done(). Returns the result of do_init(). */
int pa__init(pa_module *m) {
    int rc;

    pa_assert(m);

    m->userdata = pa_xnew0(struct module_restart_data, 1);

    rc = do_init(m);
    if (rc >= 0)
        return rc;

    pa__done(m);
    return rc;
}
797
/* Module exit point. Tears down the current instance via do_done(), frees a
 * still pending restart handle and finally the module_restart_data itself.
 * m->userdata is reset afterwards so that no dangling pointer is left behind
 * and a repeated call is harmless. */
void pa__done(pa_module *m) {
    struct module_restart_data *rd;

    pa_assert(m);

    do_done(m);

    rd = m->userdata;
    if (!rd)
        return;

    if (rd->restart_data)
        pa_restart_free(rd->restart_data);

    pa_xfree(rd);
    m->userdata = NULL;
}
812