• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2     This file is part of PulseAudio.
3 
4     Copyright 2013 Alexander Couzens
5 
6     PulseAudio is free software; you can redistribute it and/or modify
7     it under the terms of the GNU Lesser General Public License as published
8     by the Free Software Foundation; either version 2.1 of the License,
9     or (at your option) any later version.
10 
11     PulseAudio is distributed in the hope that it will be useful, but
12     WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14     General Public License for more details.
15 
16     You should have received a copy of the GNU Lesser General Public License
17     along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19 
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 
24 #include "restart-module.h"
25 
26 #include <pulse/context.h>
27 #include <pulse/timeval.h>
28 #include <pulse/xmalloc.h>
29 #include <pulse/stream.h>
30 #include <pulse/mainloop.h>
31 #include <pulse/introspect.h>
32 #include <pulse/error.h>
33 
34 #include <pulsecore/core.h>
35 #include <pulsecore/core-util.h>
36 #include <pulsecore/i18n.h>
37 #include <pulsecore/source.h>
38 #include <pulsecore/modargs.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/thread.h>
41 #include <pulsecore/thread-mq.h>
42 #include <pulsecore/poll.h>
43 #include <pulsecore/rtpoll.h>
44 #include <pulsecore/proplist-util.h>
45 
46 PA_MODULE_AUTHOR("Alexander Couzens");
47 PA_MODULE_DESCRIPTION("Create a network source which connects via a stream to a remote PulseAudio server");
48 PA_MODULE_VERSION(PACKAGE_VERSION);
49 PA_MODULE_LOAD_ONCE(false);
50 PA_MODULE_USAGE(
51         "server=<address> "
52         "source=<name of the remote source> "
53         "source_name=<name for the local source> "
54         "source_properties=<properties for the local source> "
55         "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
56         "format=<sample format> "
57         "channels=<number of channels> "
58         "rate=<sample rate> "
59         "channel_map=<channel map> "
60         "cookie=<cookie file path>"
61         );
62 
63 #define TUNNEL_THREAD_FAILED_MAINLOOP 1
64 
65 static int do_init(pa_module *m);
66 static void do_done(pa_module *m);
67 static void stream_state_cb(pa_stream *stream, void *userdata);
68 static void stream_read_cb(pa_stream *s, size_t length, void *userdata);
69 static void context_state_cb(pa_context *c, void *userdata);
70 static void source_update_requested_latency_cb(pa_source *s);
71 
72 struct tunnel_msg {
73     pa_msgobject parent;
74 };
75 
76 typedef struct tunnel_msg tunnel_msg;
77 PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);
78 
79 enum {
80     TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST,
81     TUNNEL_MESSAGE_MAYBE_RESTART,
82 };
83 
84 enum {
85     TUNNEL_MESSAGE_SOURCE_CREATED = PA_SOURCE_MESSAGE_MAX,
86 };
87 
88 struct userdata {
89     pa_module *module;
90     pa_source *source;
91     pa_thread *thread;
92     pa_thread_mq *thread_mq;
93     pa_mainloop *thread_mainloop;
94     pa_mainloop_api *thread_mainloop_api;
95 
96     pa_context *context;
97     pa_stream *stream;
98     pa_rtpoll *rtpoll;
99 
100     bool update_stream_bufferattr_after_connect;
101     bool connected;
102     bool shutting_down;
103     bool new_data;
104 
105     char *cookie_file;
106     char *remote_server;
107     char *remote_source_name;
108     char *source_name;
109 
110     pa_proplist *source_proplist;
111     pa_sample_spec sample_spec;
112     pa_channel_map channel_map;
113 
114     tunnel_msg *msg;
115 
116     pa_usec_t reconnect_interval_us;
117 };
118 
119 struct module_restart_data {
120     struct userdata *userdata;
121     pa_restart_data *restart_data;
122 };
123 
124 static const char* const valid_modargs[] = {
125     "source_name",
126     "source_properties",
127     "server",
128     "source",
129     "format",
130     "channels",
131     "rate",
132     "channel_map",
133     "cookie",
134     "reconnect_interval_ms",
135     NULL,
136 };
137 
cork_stream(struct userdata * u,bool cork)138 static void cork_stream(struct userdata *u, bool cork) {
139     pa_operation *operation;
140 
141     pa_assert(u);
142     pa_assert(u->stream);
143 
144     if ((operation = pa_stream_cork(u->stream, cork, NULL, NULL)))
145         pa_operation_unref(operation);
146 }
147 
reset_bufferattr(pa_buffer_attr * bufferattr)148 static void reset_bufferattr(pa_buffer_attr *bufferattr) {
149     pa_assert(bufferattr);
150     bufferattr->fragsize = (uint32_t) -1;
151     bufferattr->minreq = (uint32_t) -1;
152     bufferattr->maxlength = (uint32_t) -1;
153     bufferattr->prebuf = (uint32_t) -1;
154     bufferattr->tlength = (uint32_t) -1;
155 }
156 
tunnel_new_proplist(struct userdata * u)157 static pa_proplist* tunnel_new_proplist(struct userdata *u) {
158     pa_proplist *proplist = pa_proplist_new();
159     pa_assert(proplist);
160     pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio");
161     pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
162     pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
163     pa_init_proplist(proplist);
164 
165     return proplist;
166 }
167 
stream_read_cb(pa_stream * s,size_t length,void * userdata)168 static void stream_read_cb(pa_stream *s, size_t length, void *userdata) {
169     struct userdata *u = userdata;
170     u->new_data = true;
171 }
172 
173 /* called from io context to read samples from the stream into our source */
read_new_samples(struct userdata * u)174 static void read_new_samples(struct userdata *u) {
175     const void *p;
176     size_t readable = 0;
177     pa_memchunk memchunk;
178 
179     pa_assert(u);
180     u->new_data = false;
181 
182     pa_memchunk_reset(&memchunk);
183 
184     if (PA_UNLIKELY(!u->connected || pa_stream_get_state(u->stream) != PA_STREAM_READY))
185         return;
186 
187     readable = pa_stream_readable_size(u->stream);
188     while (readable > 0) {
189         size_t nbytes = 0;
190         if (PA_UNLIKELY(pa_stream_peek(u->stream, &p, &nbytes) != 0)) {
191             pa_log("pa_stream_peek() failed: %s", pa_strerror(pa_context_errno(u->context)));
192             u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
193             return;
194         }
195 
196         if (PA_LIKELY(p)) {
197             /* we have valid data */
198             memchunk.memblock = pa_memblock_new_fixed(u->module->core->mempool, (void *) p, nbytes, true);
199             memchunk.length = nbytes;
200             memchunk.index = 0;
201 
202             pa_source_post(u->source, &memchunk);
203             pa_memblock_unref_fixed(memchunk.memblock);
204         } else {
205             size_t bytes_to_generate = nbytes;
206 
207             /* we have a hole. generate silence */
208             memchunk = u->source->silence;
209             pa_memblock_ref(memchunk.memblock);
210 
211             while (bytes_to_generate > 0) {
212                 if (bytes_to_generate < memchunk.length)
213                     memchunk.length = bytes_to_generate;
214 
215                 pa_source_post(u->source, &memchunk);
216                 bytes_to_generate -= memchunk.length;
217             }
218 
219             pa_memblock_unref(memchunk.memblock);
220         }
221 
222         pa_stream_drop(u->stream);
223         readable -= nbytes;
224     }
225 }
226 
thread_func(void * userdata)227 static void thread_func(void *userdata) {
228     struct userdata *u = userdata;
229     pa_proplist *proplist;
230 
231     pa_assert(u);
232 
233     pa_log_debug("Thread starting up");
234     pa_thread_mq_install(u->thread_mq);
235 
236     proplist = tunnel_new_proplist(u);
237     u->context = pa_context_new_with_proplist(u->thread_mainloop_api,
238                                               "PulseAudio",
239                                               proplist);
240     pa_proplist_free(proplist);
241 
242     if (!u->context) {
243         pa_log("Failed to create libpulse context");
244         goto fail;
245     }
246 
247     if (u->cookie_file && pa_context_load_cookie_from_file(u->context, u->cookie_file) != 0) {
248         pa_log_error("Can not load cookie file!");
249         goto fail;
250     }
251 
252     pa_context_set_state_callback(u->context, context_state_cb, u);
253     if (pa_context_connect(u->context,
254                            u->remote_server,
255                            PA_CONTEXT_NOAUTOSPAWN,
256                            NULL) < 0) {
257         pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context)));
258         goto fail;
259     }
260 
261     for (;;) {
262         int ret;
263 
264         if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) {
265             if (ret == 0)
266                 goto finish;
267             else
268                 goto fail;
269         }
270 
271         if (u->new_data)
272             read_new_samples(u);
273     }
274 fail:
275     /* send a message to the ctl thread to ask it to either terminate us, or
276      * restart us, but either way this thread will exit, so then wait for the
277      * shutdown message */
278     pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_MAYBE_RESTART, u, 0, NULL, NULL);
279     pa_asyncmsgq_wait_for(u->thread_mq->inq, PA_MESSAGE_SHUTDOWN);
280 
281 finish:
282     if (u->stream) {
283         pa_stream_disconnect(u->stream);
284         pa_stream_unref(u->stream);
285         u->stream = NULL;
286     }
287 
288     if (u->context) {
289         pa_context_disconnect(u->context);
290         pa_context_unref(u->context);
291         u->context = NULL;
292     }
293 
294     pa_log_debug("Thread shutting down");
295 }
296 
stream_state_cb(pa_stream * stream,void * userdata)297 static void stream_state_cb(pa_stream *stream, void *userdata) {
298     struct userdata *u = userdata;
299 
300     pa_assert(u);
301 
302     switch (pa_stream_get_state(stream)) {
303         case PA_STREAM_FAILED:
304             pa_log_error("Stream failed: %s", pa_strerror(pa_context_errno(u->context)));
305             u->connected = false;
306             u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
307             break;
308         case PA_STREAM_TERMINATED:
309             pa_log_debug("Stream terminated.");
310             break;
311         case PA_STREAM_READY:
312             if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
313                 cork_stream(u, false);
314 
315             /* Only call our requested_latency_cb when requested_latency
316              * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because
317              * we don't want to override the initial fragsize set by the server
318              * without a good reason. */
319             if (u->update_stream_bufferattr_after_connect)
320                 source_update_requested_latency_cb(u->source);
321         case PA_STREAM_UNCONNECTED:
322         case PA_STREAM_CREATING:
323             break;
324     }
325 }
326 
327 /* Do a reinit of the module.  Note that u will be freed as a result of this
328  * call. */
maybe_restart(struct module_restart_data * rd)329 static void maybe_restart(struct module_restart_data *rd) {
330     struct userdata *u = rd->userdata;
331 
332     if (rd->restart_data) {
333         pa_log_debug("Restart already pending");
334         return;
335     }
336 
337     if (u->reconnect_interval_us > 0) {
338         /* The handle returned here must be freed when do_init() finishes successfully
339          * and when the module exits. */
340         rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us);
341     } else {
342         /* exit the module */
343         pa_module_unload_request(u->module, true);
344     }
345 }
346 
on_source_created(struct userdata * u)347 static void on_source_created(struct userdata *u) {
348     pa_proplist *proplist;
349     pa_buffer_attr bufferattr;
350     pa_usec_t requested_latency;
351     char *username = pa_get_user_name_malloc();
352     char *hostname = pa_get_host_name_malloc();
353     /* TODO: old tunnel put here the remote source_name into stream name e.g. 'Null Output for lynxis@lazus' */
354     char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
355     pa_xfree(username);
356     pa_xfree(hostname);
357 
358     pa_assert_io_context();
359 
360     /* if we still don't have a source, then source creation failed, and we
361      * should kill this io thread */
362     if (!u->source) {
363         pa_log_error("Could not create a source.");
364         u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
365         return;
366     }
367 
368     proplist = tunnel_new_proplist(u);
369     u->stream = pa_stream_new_with_proplist(u->context,
370                                             stream_name,
371                                             &u->source->sample_spec,
372                                             &u->source->channel_map,
373                                             proplist);
374     pa_proplist_free(proplist);
375     pa_xfree(stream_name);
376 
377     if (!u->stream) {
378         pa_log_error("Could not create a stream: %s", pa_strerror(pa_context_errno(u->context)));
379         u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
380         return;
381     }
382 
383     requested_latency = pa_source_get_requested_latency_within_thread(u->source);
384     if (requested_latency == (uint32_t) -1)
385         requested_latency = u->source->thread_info.max_latency;
386 
387     reset_bufferattr(&bufferattr);
388     bufferattr.fragsize = pa_usec_to_bytes(requested_latency, &u->source->sample_spec);
389 
390     pa_stream_set_state_callback(u->stream, stream_state_cb, u);
391     pa_stream_set_read_callback(u->stream, stream_read_cb, u);
392     if (pa_stream_connect_record(u->stream,
393                                  u->remote_source_name,
394                                  &bufferattr,
395                                  PA_STREAM_INTERPOLATE_TIMING|PA_STREAM_DONT_MOVE|PA_STREAM_AUTO_TIMING_UPDATE|PA_STREAM_START_CORKED|PA_STREAM_ADJUST_LATENCY) < 0) {
396         pa_log_debug("Could not create stream: %s", pa_strerror(pa_context_errno(u->context)));
397         u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
398     }
399     u->connected = true;
400 }
401 
context_state_cb(pa_context * c,void * userdata)402 static void context_state_cb(pa_context *c, void *userdata) {
403     struct userdata *u = userdata;
404     pa_assert(u);
405 
406     switch (pa_context_get_state(c)) {
407         case PA_CONTEXT_UNCONNECTED:
408         case PA_CONTEXT_CONNECTING:
409         case PA_CONTEXT_AUTHORIZING:
410         case PA_CONTEXT_SETTING_NAME:
411             break;
412         case PA_CONTEXT_READY:
413             pa_log_debug("Connection successful. Creating stream.");
414             pa_assert(!u->stream);
415             pa_assert(!u->source);
416 
417             pa_log_debug("Asking ctl thread to create source.");
418             pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST, u, 0, NULL, NULL);
419             break;
420         case PA_CONTEXT_FAILED:
421             pa_log_debug("Context failed with err %s.", pa_strerror(pa_context_errno(u->context)));
422             u->connected = false;
423             u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
424             break;
425         case PA_CONTEXT_TERMINATED:
426             pa_log_debug("Context terminated.");
427             u->connected = false;
428             u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
429             break;
430     }
431 }
432 
source_update_requested_latency_cb(pa_source * s)433 static void source_update_requested_latency_cb(pa_source *s) {
434     struct userdata *u;
435     pa_operation *operation;
436     size_t nbytes;
437     pa_usec_t block_usec;
438     pa_buffer_attr bufferattr;
439 
440     pa_source_assert_ref(s);
441     pa_assert_se(u = s->userdata);
442 
443     block_usec = pa_source_get_requested_latency_within_thread(s);
444     if (block_usec == (pa_usec_t) -1)
445         block_usec = s->thread_info.max_latency;
446 
447     nbytes = pa_usec_to_bytes(block_usec, &s->sample_spec);
448 
449     if (u->stream) {
450         switch (pa_stream_get_state(u->stream)) {
451             case PA_STREAM_READY:
452                 if (pa_stream_get_buffer_attr(u->stream)->fragsize == nbytes)
453                     break;
454 
455                 reset_bufferattr(&bufferattr);
456                 bufferattr.fragsize = nbytes;
457                 if ((operation = pa_stream_set_buffer_attr(u->stream, &bufferattr, NULL, NULL)))
458                     pa_operation_unref(operation);
459                 break;
460             case PA_STREAM_CREATING:
461                 /* we have to delay our request until stream is ready */
462                 u->update_stream_bufferattr_after_connect = true;
463                 break;
464             default:
465                 break;
466         }
467     }
468 }
469 
source_process_msg_cb(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)470 static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
471     struct userdata *u = PA_SOURCE(o)->userdata;
472 
473     switch (code) {
474         case PA_SOURCE_MESSAGE_GET_LATENCY: {
475             int negative;
476             pa_usec_t remote_latency;
477 
478             if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state)) {
479                 *((int64_t*) data) = 0;
480                 return 0;
481             }
482 
483             if (!u->stream) {
484                 *((int64_t*) data) = 0;
485                 return 0;
486             }
487 
488             if (pa_stream_get_state(u->stream) != PA_STREAM_READY) {
489                 *((int64_t*) data) = 0;
490                 return 0;
491             }
492 
493             if (pa_stream_get_latency(u->stream, &remote_latency, &negative) < 0) {
494                 *((int64_t*) data) = 0;
495                 return 0;
496             }
497 
498             if (negative)
499                 *((int64_t*) data) = - (int64_t)remote_latency;
500             else
501                 *((int64_t*) data) = remote_latency;
502 
503             return 0;
504         }
505         case TUNNEL_MESSAGE_SOURCE_CREATED:
506             on_source_created(u);
507             return 0;
508     }
509     return pa_source_process_msg(o, code, data, offset, chunk);
510 }
511 
512 /* Called from the IO thread. */
source_set_state_in_io_thread_cb(pa_source * s,pa_source_state_t new_state,pa_suspend_cause_t new_suspend_cause)513 static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
514     struct userdata *u;
515 
516     pa_assert(s);
517     pa_assert_se(u = s->userdata);
518 
519     /* It may be that only the suspend cause is changing, in which case there's
520      * nothing to do. */
521     if (new_state == s->thread_info.state)
522         return 0;
523 
524     if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
525         return 0;
526 
527     switch (new_state) {
528         case PA_SOURCE_SUSPENDED: {
529             cork_stream(u, true);
530             break;
531         }
532         case PA_SOURCE_IDLE:
533         case PA_SOURCE_RUNNING: {
534             cork_stream(u, false);
535             break;
536         }
537         case PA_SOURCE_INVALID_STATE:
538         case PA_SOURCE_INIT:
539         case PA_SOURCE_UNLINKED:
540             break;
541     }
542 
543     return 0;
544 }
545 
546 /* Creates a source in the main thread.
547  *
548  * This method is called when we receive a message from the io thread that a
549  * connection has been established with the server.  We defer creation of the
550  * source until the connection is established, because we don't have a source
551  * if the remote server isn't there.
552  */
create_source(struct userdata * u)553 static void create_source(struct userdata *u) {
554     pa_source_new_data source_data;
555 
556     pa_assert_ctl_context();
557 
558     /* Create source */
559     pa_source_new_data_init(&source_data);
560     source_data.driver = __FILE__;
561     source_data.module = u->module;
562 
563     pa_source_new_data_set_name(&source_data, u->source_name);
564     pa_source_new_data_set_sample_spec(&source_data, &u->sample_spec);
565     pa_source_new_data_set_channel_map(&source_data, &u->channel_map);
566 
567     pa_proplist_update(source_data.proplist, PA_UPDATE_REPLACE, u->source_proplist);
568 
569     if (!(u->source = pa_source_new(u->module->core, &source_data, PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY | PA_SOURCE_NETWORK))) {
570         pa_log("Failed to create source.");
571         goto finish;
572     }
573 
574     u->source->userdata = u;
575     u->source->parent.process_msg = source_process_msg_cb;
576     u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
577     u->source->update_requested_latency = source_update_requested_latency_cb;
578 
579     pa_source_set_asyncmsgq(u->source, u->thread_mq->inq);
580     pa_source_set_rtpoll(u->source, u->rtpoll);
581 
582     pa_source_put(u->source);
583 
584 finish:
585     pa_source_new_data_done(&source_data);
586 
587     /* tell any interested io threads that the sink they asked for has now been
588      * created (even if we failed, we still notify the thread, so they can
589      * either handle or kill the thread, rather than deadlock waiting for a
590      * message that will never come */
591     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), TUNNEL_MESSAGE_SOURCE_CREATED, u, 0, NULL);
592 }
593 
594 /* Runs in PA mainloop context */
tunnel_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)595 static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
596     struct userdata *u = (struct userdata *) data;
597 
598     pa_assert(u);
599     pa_assert_ctl_context();
600 
601     if (u->shutting_down)
602         return 0;
603 
604     switch (code) {
605         case TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST:
606             create_source(u);
607             break;
608         case TUNNEL_MESSAGE_MAYBE_RESTART:
609             maybe_restart(u->module->userdata);
610             break;
611     }
612 
613     return 0;
614 }
615 
do_init(pa_module * m)616 static int do_init(pa_module *m) {
617     struct userdata *u = NULL;
618     struct module_restart_data *rd;
619     pa_modargs *ma = NULL;
620     const char *remote_server = NULL;
621     char *default_source_name = NULL;
622     uint32_t reconnect_interval_ms = 0;
623 
624     pa_assert(m);
625     pa_assert(m->userdata);
626 
627     rd = m->userdata;
628 
629     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
630         pa_log("Failed to parse module arguments.");
631         goto fail;
632     }
633 
634     u = pa_xnew0(struct userdata, 1);
635     u->module = m;
636     rd->userdata = u;
637 
638     u->sample_spec = m->core->default_sample_spec;
639     u->channel_map = m->core->default_channel_map;
640     if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->sample_spec, &u->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
641         pa_log("Invalid sample format specification or channel map");
642         goto fail;
643     }
644 
645     remote_server = pa_modargs_get_value(ma, "server", NULL);
646     if (!remote_server) {
647         pa_log("No server given!");
648         goto fail;
649     }
650 
651     u->remote_server = pa_xstrdup(remote_server);
652     u->thread_mainloop = pa_mainloop_new();
653     if (u->thread_mainloop == NULL) {
654         pa_log("Failed to create mainloop");
655         goto fail;
656     }
657     u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop);
658     u->cookie_file = pa_xstrdup(pa_modargs_get_value(ma, "cookie", NULL));
659     u->remote_source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
660 
661     u->thread_mq = pa_xnew0(pa_thread_mq, 1);
662 
663     if (pa_thread_mq_init_thread_mainloop(u->thread_mq, m->core->mainloop, u->thread_mainloop_api) < 0) {
664         pa_log("pa_thread_mq_init_thread_mainloop() failed.");
665         goto fail;
666     }
667 
668     u->msg = pa_msgobject_new(tunnel_msg);
669     u->msg->parent.process_msg = tunnel_process_msg;
670 
671     /* The rtpoll created here is never run. It is only necessary to avoid crashes
672      * when module-tunnel-source-new is used together with module-loopback.
673      * module-loopback bases the asyncmsq on the rtpoll provided by the source and
674      * only works because it calls pa_asyncmsq_process_one(). */
675     u->rtpoll = pa_rtpoll_new();
676 
677     default_source_name = pa_sprintf_malloc("tunnel-source-new.%s", remote_server);
678     u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", default_source_name));
679 
680     u->source_proplist = pa_proplist_new();
681     pa_proplist_sets(u->source_proplist, PA_PROP_DEVICE_CLASS, "sound");
682     pa_proplist_setf(u->source_proplist,
683                      PA_PROP_DEVICE_DESCRIPTION,
684                      _("Tunnel to %s/%s"),
685                      remote_server,
686                      pa_strempty(u->remote_source_name));
687 
688     if (pa_modargs_get_proplist(ma, "source_properties", u->source_proplist, PA_UPDATE_REPLACE) < 0) {
689         pa_log("Invalid properties");
690         goto fail;
691     }
692 
693     pa_modargs_get_value_u32(ma, "reconnect_interval_ms", &reconnect_interval_ms);
694     u->reconnect_interval_us = reconnect_interval_ms * PA_USEC_PER_MSEC;
695 
696     if (!(u->thread = pa_thread_new("tunnel-source", thread_func, u))) {
697         pa_log("Failed to create thread.");
698         goto fail;
699     }
700 
701     /* If the module is restarting and do_init() finishes successfully, the
702      * restart data is no longer needed. If do_init() fails, don't touch the
703      * restart data, because following restart attempts will continue to use
704      * the same data. If restart_data is NULL, that means no restart is
705      * currently pending. */
706     if (rd->restart_data) {
707         pa_restart_free(rd->restart_data);
708         rd->restart_data = NULL;
709     }
710 
711     pa_modargs_free(ma);
712     pa_xfree(default_source_name);
713 
714     return 0;
715 
716 fail:
717     if (ma)
718         pa_modargs_free(ma);
719 
720     if (default_source_name)
721         pa_xfree(default_source_name);
722 
723     return -1;
724 }
725 
do_done(pa_module * m)726 static void do_done(pa_module *m) {
727     struct userdata *u = NULL;
728     struct module_restart_data *rd;
729 
730     pa_assert(m);
731 
732     if (!(rd = m->userdata))
733         return;
734     if (!(u = rd->userdata))
735         return;
736 
737     u->shutting_down = true;
738 
739     if (u->source)
740         pa_source_unlink(u->source);
741 
742     if (u->thread) {
743         pa_asyncmsgq_send(u->thread_mq->inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
744         pa_thread_free(u->thread);
745     }
746 
747     if (u->thread_mq) {
748         pa_thread_mq_done(u->thread_mq);
749         pa_xfree(u->thread_mq);
750     }
751 
752     if (u->thread_mainloop)
753         pa_mainloop_free(u->thread_mainloop);
754 
755     if (u->cookie_file)
756         pa_xfree(u->cookie_file);
757 
758     if (u->remote_source_name)
759         pa_xfree(u->remote_source_name);
760 
761     if (u->remote_server)
762         pa_xfree(u->remote_server);
763 
764     if (u->source)
765         pa_source_unref(u->source);
766 
767     if (u->rtpoll)
768         pa_rtpoll_free(u->rtpoll);
769 
770     if (u->source_proplist)
771         pa_proplist_free(u->source_proplist);
772 
773     if (u->source_name)
774         pa_xfree(u->source_name);
775 
776     pa_xfree(u->msg);
777 
778     pa_xfree(u);
779 
780     rd->userdata = NULL;
781 }
782 
pa__init(pa_module * m)783 int pa__init(pa_module *m) {
784     int ret;
785 
786     pa_assert(m);
787 
788     m->userdata = pa_xnew0(struct module_restart_data, 1);
789 
790     ret = do_init(m);
791 
792     if (ret < 0)
793         pa__done(m);
794 
795     return ret;
796 }
797 
pa__done(pa_module * m)798 void pa__done(pa_module *m) {
799     pa_assert(m);
800 
801     do_done(m);
802 
803     if (m->userdata) {
804         struct module_restart_data *rd = m->userdata;
805 
806         if (rd->restart_data)
807             pa_restart_free(rd->restart_data);
808 
809         pa_xfree(m->userdata);
810     }
811 }
812