• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2     This file is part of PulseAudio.
3 
4     Copyright 2013 Alexander Couzens
5 
6     PulseAudio is free software; you can redistribute it and/or modify
7     it under the terms of the GNU Lesser General Public License as published
8     by the Free Software Foundation; either version 2.1 of the License,
9     or (at your option) any later version.
10 
11     PulseAudio is distributed in the hope that it will be useful, but
12     WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14     General Public License for more details.
15 
16     You should have received a copy of the GNU Lesser General Public License
17     along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19 
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 
24 #include "restart-module.h"
25 
26 #include <pulse/context.h>
27 #include <pulse/timeval.h>
28 #include <pulse/xmalloc.h>
29 #include <pulse/stream.h>
30 #include <pulse/mainloop.h>
31 #include <pulse/introspect.h>
32 #include <pulse/error.h>
33 
34 #include <pulsecore/core.h>
35 #include <pulsecore/core-util.h>
36 #include <pulsecore/i18n.h>
37 #include <pulsecore/source.h>
38 #include <pulsecore/modargs.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/thread.h>
41 #include <pulsecore/thread-mq.h>
42 #include <pulsecore/poll.h>
43 #include <pulsecore/rtpoll.h>
44 #include <pulsecore/proplist-util.h>
45 
46 PA_MODULE_AUTHOR("Alexander Couzens");
47 PA_MODULE_DESCRIPTION("Create a network source which connects via a stream to a remote PulseAudio server");
48 PA_MODULE_VERSION(PACKAGE_VERSION);
49 PA_MODULE_LOAD_ONCE(false);
50 PA_MODULE_USAGE(
51         "server=<address> "
52         "source=<name of the remote source> "
53         "source_name=<name for the local source> "
54         "source_properties=<properties for the local source> "
55         "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
56         "format=<sample format> "
57         "channels=<number of channels> "
58         "rate=<sample rate> "
59         "channel_map=<channel map> "
60         "cookie=<cookie file path>"
61         );
62 
63 #define TUNNEL_THREAD_FAILED_MAINLOOP 1
64 
65 static int do_init(pa_module *m);
66 static void do_done(pa_module *m);
67 static void stream_state_cb(pa_stream *stream, void *userdata);
68 static void stream_read_cb(pa_stream *s, size_t length, void *userdata);
69 static void context_state_cb(pa_context *c, void *userdata);
70 static void source_update_requested_latency_cb(pa_source *s);
71 
72 struct tunnel_msg {
73     pa_msgobject parent;
74 };
75 
76 typedef struct tunnel_msg tunnel_msg;
77 PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);
78 
79 enum {
80     TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST,
81     TUNNEL_MESSAGE_MAYBE_RESTART,
82 };
83 
84 enum {
85     TUNNEL_MESSAGE_SOURCE_CREATED = PA_SOURCE_MESSAGE_MAX,
86 };
87 
88 struct userdata {
89     pa_module *module;
90     pa_source *source;
91     pa_thread *thread;
92     pa_thread_mq *thread_mq;
93     pa_mainloop *thread_mainloop;
94     pa_mainloop_api *thread_mainloop_api;
95 
96     pa_context *context;
97     pa_stream *stream;
98     pa_rtpoll *rtpoll;
99 
100     bool update_stream_bufferattr_after_connect;
101     bool connected;
102     bool shutting_down;
103     bool new_data;
104 
105     char *cookie_file;
106     char *remote_server;
107     char *remote_source_name;
108     char *source_name;
109 
110     pa_proplist *source_proplist;
111     pa_sample_spec sample_spec;
112     pa_channel_map channel_map;
113 
114     tunnel_msg *msg;
115 
116     pa_usec_t reconnect_interval_us;
117 };
118 
119 struct module_restart_data {
120     struct userdata *userdata;
121     pa_restart_data *restart_data;
122 };
123 
/* NULL-terminated list of argument names passed to pa_modargs_new() in
 * do_init(). */
static const char* const valid_modargs[] = {
    /* local source */
    "source_name",
    "source_properties",
    /* remote end */
    "server",
    "source",
    /* sample specification */
    "format",
    "channels",
    "rate",
    "channel_map",
    /* connection handling */
    "cookie",
    "reconnect_interval_ms",
    NULL,
};
137 
cork_stream(struct userdata * u,bool cork)138 static void cork_stream(struct userdata *u, bool cork) {
139     pa_operation *operation;
140 
141     pa_assert(u);
142     pa_assert(u->stream);
143 
144     if ((operation = pa_stream_cork(u->stream, cork, NULL, NULL)))
145         pa_operation_unref(operation);
146 }
147 
/* Set every field of the buffer attributes to (uint32_t) -1. */
static void reset_bufferattr(pa_buffer_attr *bufferattr) {
    const uint32_t unset = (uint32_t) -1;

    pa_assert(bufferattr);

    bufferattr->maxlength = unset;
    bufferattr->tlength = unset;
    bufferattr->prebuf = unset;
    bufferattr->minreq = unset;
    bufferattr->fragsize = unset;
}
156 
tunnel_new_proplist(struct userdata * u)157 static pa_proplist* tunnel_new_proplist(struct userdata *u) {
158     pa_proplist *proplist = pa_proplist_new();
159     pa_assert(proplist);
160     pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio");
161     pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
162     pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
163     pa_init_proplist(proplist);
164 
165     return proplist;
166 }
167 
/* Read callback of the record stream. It only flags that data is available;
 * the actual reading is done by read_new_samples(), which thread_func() calls
 * when the flag is set. */
static void stream_read_cb(pa_stream *s, size_t length, void *userdata) {
    struct userdata *tunnel = userdata;

    tunnel->new_data = true;
}
172 
173 /* called from io context to read samples from the stream into our source */
read_new_samples(struct userdata * u)174 static void read_new_samples(struct userdata *u) {
175     const void *p;
176     size_t readable = 0;
177     pa_memchunk memchunk;
178 
179     pa_assert(u);
180     u->new_data = false;
181 
182     pa_memchunk_reset(&memchunk);
183 
184     if (PA_UNLIKELY(!u->connected || pa_stream_get_state(u->stream) != PA_STREAM_READY))
185         return;
186 
187     readable = pa_stream_readable_size(u->stream);
188     while (readable > 0) {
189         size_t nbytes = 0;
190         if (PA_UNLIKELY(pa_stream_peek(u->stream, &p, &nbytes) != 0)) {
191             pa_log("pa_stream_peek() failed: %s", pa_strerror(pa_context_errno(u->context)));
192             u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
193             return;
194         }
195 
196         if (PA_LIKELY(p)) {
197             /* we have valid data */
198             memchunk.memblock = pa_memblock_new_fixed(u->module->core->mempool, (void *) p, nbytes, true);
199             memchunk.length = nbytes;
200             memchunk.index = 0;
201 
202             pa_source_post(u->source, &memchunk);
203             pa_memblock_unref_fixed(memchunk.memblock);
204         } else {
205             size_t bytes_to_generate = nbytes;
206 
207             /* we have a hole. generate silence */
208             memchunk = u->source->silence;
209             pa_memblock_ref(memchunk.memblock);
210 
211             while (bytes_to_generate > 0) {
212                 if (bytes_to_generate < memchunk.length)
213                     memchunk.length = bytes_to_generate;
214 
215                 pa_source_post(u->source, &memchunk);
216                 bytes_to_generate -= memchunk.length;
217             }
218 
219             pa_memblock_unref(memchunk.memblock);
220         }
221 
222         pa_stream_drop(u->stream);
223         readable -= nbytes;
224     }
225 }
226 
thread_func(void * userdata)227 static void thread_func(void *userdata) {
228     struct userdata *u = userdata;
229     pa_proplist *proplist;
230 
231     pa_assert(u);
232 
233     pa_log_debug("Thread starting up");
234     pa_thread_mq_install(u->thread_mq);
235 
236     proplist = tunnel_new_proplist(u);
237     u->context = pa_context_new_with_proplist(u->thread_mainloop_api,
238                                               "PulseAudio",
239                                               proplist);
240     pa_proplist_free(proplist);
241 
242     if (!u->context) {
243         pa_log("Failed to create libpulse context");
244         goto fail;
245     }
246 
247     if (u->cookie_file && pa_context_load_cookie_from_file(u->context, u->cookie_file) != 0) {
248         pa_log_error("Can not load cookie file!");
249         goto fail;
250     }
251 
252     pa_context_set_state_callback(u->context, context_state_cb, u);
253     if (pa_context_connect(u->context,
254                            u->remote_server,
255                            PA_CONTEXT_NOAUTOSPAWN,
256                            NULL) < 0) {
257         pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context)));
258         goto fail;
259     }
260 
261     for (;;) {
262         int ret;
263 
264         if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) {
265             if (ret == 0)
266                 goto finish;
267             else
268                 goto fail;
269         }
270 
271         if (u->new_data)
272             read_new_samples(u);
273 
274         /* Run the rtpoll to process messages that other modules may have placed in the queue. */
275         pa_rtpoll_set_timer_relative(u->rtpoll, 0);
276         if (pa_rtpoll_run(u->rtpoll) < 0)
277             goto fail;
278     }
279 fail:
280     /* send a message to the ctl thread to ask it to either terminate us, or
281      * restart us, but either way this thread will exit, so then wait for the
282      * shutdown message */
283     pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_MAYBE_RESTART, u, 0, NULL, NULL);
284     pa_asyncmsgq_wait_for(u->thread_mq->inq, PA_MESSAGE_SHUTDOWN);
285 
286 finish:
287     if (u->stream) {
288         pa_stream_disconnect(u->stream);
289         pa_stream_unref(u->stream);
290         u->stream = NULL;
291     }
292 
293     if (u->context) {
294         pa_context_disconnect(u->context);
295         pa_context_unref(u->context);
296         u->context = NULL;
297     }
298 
299     pa_log_debug("Thread shutting down");
300 }
301 
/* State callback of the record stream.
 *
 * - READY: uncork the stream if the source is opened, and apply a latency
 *   request that arrived while the stream was still being created.
 * - FAILED: mark the tunnel as disconnected and make the IO thread's mainloop
 *   quit with the failure code.
 * - TERMINATED: only logged.
 * - all other states: ignored.
 */
static void stream_state_cb(pa_stream *stream, void *userdata) {
    struct userdata *u = userdata;
    pa_stream_state_t state;

    pa_assert(u);

    state = pa_stream_get_state(stream);

    switch (state) {
        case PA_STREAM_UNCONNECTED:
        case PA_STREAM_CREATING:
            break;

        case PA_STREAM_READY:
            if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
                cork_stream(u, false);

            /* Only call our requested_latency_cb when requested_latency
             * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because
             * we don't want to override the initial fragsize set by the server
             * without a good reason. */
            if (u->update_stream_bufferattr_after_connect)
                source_update_requested_latency_cb(u->source);
            break;

        case PA_STREAM_TERMINATED:
            pa_log_debug("Stream terminated.");
            break;

        case PA_STREAM_FAILED:
            pa_log_error("Stream failed: %s", pa_strerror(pa_context_errno(u->context)));
            u->connected = false;
            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
            break;
    }
}
331 
332 /* Do a reinit of the module.  Note that u will be freed as a result of this
333  * call. */
maybe_restart(struct module_restart_data * rd)334 static void maybe_restart(struct module_restart_data *rd) {
335     struct userdata *u = rd->userdata;
336 
337     if (rd->restart_data) {
338         pa_log_debug("Restart already pending");
339         return;
340     }
341 
342     if (u->reconnect_interval_us > 0) {
343         /* The handle returned here must be freed when do_init() finishes successfully
344          * and when the module exits. */
345         rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us);
346     } else {
347         /* exit the module */
348         pa_module_unload_request(u->module, true);
349     }
350 }
351 
on_source_created(struct userdata * u)352 static void on_source_created(struct userdata *u) {
353     pa_proplist *proplist;
354     pa_buffer_attr bufferattr;
355     pa_usec_t requested_latency;
356     char *username = pa_get_user_name_malloc();
357     char *hostname = pa_get_host_name_malloc();
358     /* TODO: old tunnel put here the remote source_name into stream name e.g. 'Null Output for lynxis@lazus' */
359     char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
360     pa_xfree(username);
361     pa_xfree(hostname);
362 
363     pa_assert_io_context();
364 
365     /* if we still don't have a source, then source creation failed, and we
366      * should kill this io thread */
367     if (!u->source) {
368         pa_log_error("Could not create a source.");
369         u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
370         return;
371     }
372 
373     proplist = tunnel_new_proplist(u);
374     u->stream = pa_stream_new_with_proplist(u->context,
375                                             stream_name,
376                                             &u->source->sample_spec,
377                                             &u->source->channel_map,
378                                             proplist);
379     pa_proplist_free(proplist);
380     pa_xfree(stream_name);
381 
382     if (!u->stream) {
383         pa_log_error("Could not create a stream: %s", pa_strerror(pa_context_errno(u->context)));
384         u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
385         return;
386     }
387 
388     requested_latency = pa_source_get_requested_latency_within_thread(u->source);
389     if (requested_latency == (uint32_t) -1)
390         requested_latency = u->source->thread_info.max_latency;
391 
392     reset_bufferattr(&bufferattr);
393     bufferattr.fragsize = pa_usec_to_bytes(requested_latency, &u->source->sample_spec);
394 
395     pa_stream_set_state_callback(u->stream, stream_state_cb, u);
396     pa_stream_set_read_callback(u->stream, stream_read_cb, u);
397     if (pa_stream_connect_record(u->stream,
398                                  u->remote_source_name,
399                                  &bufferattr,
400                                  PA_STREAM_INTERPOLATE_TIMING|PA_STREAM_DONT_MOVE|PA_STREAM_AUTO_TIMING_UPDATE|PA_STREAM_START_CORKED|PA_STREAM_ADJUST_LATENCY) < 0) {
401         pa_log_debug("Could not create stream: %s", pa_strerror(pa_context_errno(u->context)));
402         u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
403     }
404     u->connected = true;
405 }
406 
/* State callback of the libpulse context.
 *
 * When the context becomes ready, the main thread is asked to create the
 * local source. When it fails or terminates, the tunnel is marked as
 * disconnected and the IO thread's mainloop is told to quit with the failure
 * code. Intermediate states are ignored. */
static void context_state_cb(pa_context *c, void *userdata) {
    struct userdata *u = userdata;
    pa_context_state_t state;

    pa_assert(u);

    state = pa_context_get_state(c);

    switch (state) {
        case PA_CONTEXT_READY:
            pa_log_debug("Connection successful. Creating stream.");
            pa_assert(!u->stream);
            pa_assert(!u->source);

            pa_log_debug("Asking ctl thread to create source.");
            pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST, u, 0, NULL, NULL);
            break;

        case PA_CONTEXT_FAILED:
        case PA_CONTEXT_TERMINATED:
            /* both end the connection; only the log line differs */
            if (state == PA_CONTEXT_FAILED)
                pa_log_debug("Context failed with err %s.", pa_strerror(pa_context_errno(u->context)));
            else
                pa_log_debug("Context terminated.");

            u->connected = false;
            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
            break;

        case PA_CONTEXT_UNCONNECTED:
        case PA_CONTEXT_CONNECTING:
        case PA_CONTEXT_AUTHORIZING:
        case PA_CONTEXT_SETTING_NAME:
            break;
    }
}
437 
/* update_requested_latency callback of the source.
 *
 * Converts the requested latency (or the source's maximum latency when none
 * is requested) into a fragment size and applies it to the record stream.
 * If the stream is still being created the request is remembered and applied
 * from stream_state_cb() once the stream is ready. */
static void source_update_requested_latency_cb(pa_source *s) {
    struct userdata *u;
    pa_usec_t latency;
    size_t fragsize;
    pa_stream_state_t stream_state;
    pa_buffer_attr attr;
    pa_operation *op;

    pa_source_assert_ref(s);
    pa_assert_se(u = s->userdata);

    latency = pa_source_get_requested_latency_within_thread(s);
    if (latency == (pa_usec_t) -1)
        latency = s->thread_info.max_latency;

    fragsize = pa_usec_to_bytes(latency, &s->sample_spec);

    if (!u->stream)
        return;

    stream_state = pa_stream_get_state(u->stream);

    if (stream_state == PA_STREAM_CREATING) {
        /* we have to delay our request until stream is ready */
        u->update_stream_bufferattr_after_connect = true;
        return;
    }

    if (stream_state != PA_STREAM_READY)
        return;

    /* nothing to do when the stream already uses this fragment size */
    if (pa_stream_get_buffer_attr(u->stream)->fragsize == fragsize)
        return;

    reset_bufferattr(&attr);
    attr.fragsize = fragsize;

    op = pa_stream_set_buffer_attr(u->stream, &attr, NULL, NULL);
    if (op)
        pa_operation_unref(op);
}
474 
/* Message handler installed on the source.
 *
 * Handles two messages itself and forwards everything else to
 * pa_source_process_msg():
 *  - PA_SOURCE_MESSAGE_GET_LATENCY: stores the stream latency (negated when
 *    libpulse flags it as negative) in the int64_t that data points to, or 0
 *    when the source is not linked, there is no ready stream, or the latency
 *    cannot be queried;
 *  - TUNNEL_MESSAGE_SOURCE_CREATED: continues the setup in
 *    on_source_created().
 */
static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SOURCE(o)->userdata;
    int64_t *latency_out;
    pa_usec_t remote_latency;
    int negative;

    if (code == TUNNEL_MESSAGE_SOURCE_CREATED) {
        on_source_created(u);
        return 0;
    }

    if (code != PA_SOURCE_MESSAGE_GET_LATENCY)
        return pa_source_process_msg(o, code, data, offset, chunk);

    latency_out = data;

    /* The checks are evaluated left to right, so the stream is only touched
     * once it is known to exist and to be ready. */
    if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
        !u->stream ||
        pa_stream_get_state(u->stream) != PA_STREAM_READY ||
        pa_stream_get_latency(u->stream, &remote_latency, &negative) < 0) {
        *latency_out = 0;
        return 0;
    }

    *latency_out = negative ? - (int64_t) remote_latency : (int64_t) remote_latency;

    return 0;
}
516 
517 /* Called from the IO thread. */
source_set_state_in_io_thread_cb(pa_source * s,pa_source_state_t new_state,pa_suspend_cause_t new_suspend_cause)518 static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
519     struct userdata *u;
520 
521     pa_assert(s);
522     pa_assert_se(u = s->userdata);
523 
524     /* It may be that only the suspend cause is changing, in which case there's
525      * nothing to do. */
526     if (new_state == s->thread_info.state)
527         return 0;
528 
529     if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
530         return 0;
531 
532     switch (new_state) {
533         case PA_SOURCE_SUSPENDED: {
534             cork_stream(u, true);
535             break;
536         }
537         case PA_SOURCE_IDLE:
538         case PA_SOURCE_RUNNING: {
539             cork_stream(u, false);
540             break;
541         }
542         case PA_SOURCE_INVALID_STATE:
543         case PA_SOURCE_INIT:
544         case PA_SOURCE_UNLINKED:
545             break;
546     }
547 
548     return 0;
549 }
550 
551 /* Creates a source in the main thread.
552  *
553  * This method is called when we receive a message from the io thread that a
554  * connection has been established with the server.  We defer creation of the
555  * source until the connection is established, because we don't have a source
556  * if the remote server isn't there.
557  */
create_source(struct userdata * u)558 static void create_source(struct userdata *u) {
559     pa_source_new_data source_data;
560 
561     pa_assert_ctl_context();
562 
563     /* Create source */
564     pa_source_new_data_init(&source_data);
565     source_data.driver = __FILE__;
566     source_data.module = u->module;
567 
568     pa_source_new_data_set_name(&source_data, u->source_name);
569     pa_source_new_data_set_sample_spec(&source_data, &u->sample_spec);
570     pa_source_new_data_set_channel_map(&source_data, &u->channel_map);
571 
572     pa_proplist_update(source_data.proplist, PA_UPDATE_REPLACE, u->source_proplist);
573 
574     if (!(u->source = pa_source_new(u->module->core, &source_data, PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY | PA_SOURCE_NETWORK))) {
575         pa_log("Failed to create source.");
576         goto finish;
577     }
578 
579     u->source->userdata = u;
580     u->source->parent.process_msg = source_process_msg_cb;
581     u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
582     u->source->update_requested_latency = source_update_requested_latency_cb;
583 
584     pa_source_set_asyncmsgq(u->source, u->thread_mq->inq);
585     pa_source_set_rtpoll(u->source, u->rtpoll);
586 
587     pa_source_put(u->source);
588 
589 finish:
590     pa_source_new_data_done(&source_data);
591 
592     /* tell any interested io threads that the sink they asked for has now been
593      * created (even if we failed, we still notify the thread, so they can
594      * either handle or kill the thread, rather than deadlock waiting for a
595      * message that will never come */
596     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), TUNNEL_MESSAGE_SOURCE_CREATED, u, 0, NULL);
597 }
598 
599 /* Runs in PA mainloop context */
tunnel_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)600 static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
601     struct userdata *u = (struct userdata *) data;
602 
603     pa_assert(u);
604     pa_assert_ctl_context();
605 
606     if (u->shutting_down)
607         return 0;
608 
609     switch (code) {
610         case TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST:
611             create_source(u);
612             break;
613         case TUNNEL_MESSAGE_MAYBE_RESTART:
614             maybe_restart(u->module->userdata);
615             break;
616     }
617 
618     return 0;
619 }
620 
/* (Re)initialise the tunnel for module m.
 *
 * Expects m->userdata to already point to a struct module_restart_data (set up
 * by pa__init()). Parses the module arguments, allocates a fresh struct
 * userdata, creates the thread mainloop, message queues, rtpoll and source
 * properties, and finally starts the IO thread. A pending restart handle is
 * released once initialisation succeeded.
 *
 * Returns 0 on success and -1 on failure. On failure the partially filled
 * struct userdata stays reachable through the restart data and is not freed
 * here; do_done() releases it. */
static int do_init(pa_module *m) {
    struct userdata *u = NULL;
    struct module_restart_data *rd;
    pa_modargs *ma = NULL;
    const char *remote_server = NULL;
    char *default_source_name = NULL;
    uint32_t reconnect_interval_ms = 0;
    int ret = -1;

    pa_assert(m);
    pa_assert(m->userdata);

    rd = m->userdata;

    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
        pa_log("Failed to parse module arguments.");
        goto finish;
    }

    u = pa_xnew0(struct userdata, 1);
    u->module = m;
    rd->userdata = u;

    u->sample_spec = m->core->default_sample_spec;
    u->channel_map = m->core->default_channel_map;
    if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->sample_spec, &u->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
        pa_log("Invalid sample format specification or channel map");
        goto finish;
    }

    remote_server = pa_modargs_get_value(ma, "server", NULL);
    if (!remote_server) {
        pa_log("No server given!");
        goto finish;
    }

    u->remote_server = pa_xstrdup(remote_server);
    u->thread_mainloop = pa_mainloop_new();
    if (u->thread_mainloop == NULL) {
        pa_log("Failed to create mainloop");
        goto finish;
    }
    u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop);
    u->cookie_file = pa_xstrdup(pa_modargs_get_value(ma, "cookie", NULL));
    u->remote_source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));

    u->thread_mq = pa_xnew0(pa_thread_mq, 1);

    if (pa_thread_mq_init_thread_mainloop(u->thread_mq, m->core->mainloop, u->thread_mainloop_api) < 0) {
        pa_log("pa_thread_mq_init_thread_mainloop() failed.");
        goto finish;
    }

    u->msg = pa_msgobject_new(tunnel_msg);
    u->msg->parent.process_msg = tunnel_process_msg;

    /* The rtpoll created here must currently only exist to avoid crashes when
     * the module is used together with module-loopback. Because module-loopback
     * runs pa_asyncmsgq_process_one() from the pop callback, the rtpoll need not
     * be run. We will do so anyway for potential modules similar to
     * module-combine-sink that use the rtpoll of the underlying source for
     * message exchange. */
    u->rtpoll = pa_rtpoll_new();

    default_source_name = pa_sprintf_malloc("tunnel-source-new.%s", remote_server);
    u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", default_source_name));

    u->source_proplist = pa_proplist_new();
    pa_proplist_sets(u->source_proplist, PA_PROP_DEVICE_CLASS, "sound");
    pa_proplist_setf(u->source_proplist,
                     PA_PROP_DEVICE_DESCRIPTION,
                     _("Tunnel to %s/%s"),
                     remote_server,
                     pa_strempty(u->remote_source_name));

    if (pa_modargs_get_proplist(ma, "source_properties", u->source_proplist, PA_UPDATE_REPLACE) < 0) {
        pa_log("Invalid properties");
        goto finish;
    }

    /* Reject a malformed value instead of silently running with reconnects
     * disabled. An omitted argument leaves the default of 0 in place. */
    if (pa_modargs_get_value_u32(ma, "reconnect_interval_ms", &reconnect_interval_ms) < 0) {
        pa_log("Invalid reconnect_interval_ms specification");
        goto finish;
    }
    u->reconnect_interval_us = reconnect_interval_ms * PA_USEC_PER_MSEC;

    if (!(u->thread = pa_thread_new("tunnel-source", thread_func, u))) {
        pa_log("Failed to create thread.");
        goto finish;
    }

    /* If the module is restarting and do_init() finishes successfully, the
     * restart data is no longer needed. If do_init() fails, don't touch the
     * restart data, because following restart attempts will continue to use
     * the same data. If restart_data is NULL, that means no restart is
     * currently pending. */
    if (rd->restart_data) {
        pa_restart_free(rd->restart_data);
        rd->restart_data = NULL;
    }

    ret = 0;

finish:
    /* shared by the success and the failure path */
    if (ma)
        pa_modargs_free(ma);

    /* pa_xfree() accepts NULL */
    pa_xfree(default_source_name);

    return ret;
}
732 
/* Tear down the current tunnel instance of module m, if there is one.
 *
 * Unlinks the source, sends the shutdown message to the IO thread and frees
 * the thread, then releases everything do_init() allocated. The restart data
 * itself is kept; only its userdata pointer is reset to NULL. Works on a
 * partially initialised instance as well, since every member is checked (or
 * freed with the NULL-tolerant pa_xfree()). */
static void do_done(pa_module *m) {
    struct module_restart_data *rd;
    struct userdata *u;

    pa_assert(m);

    rd = m->userdata;
    if (!rd || !rd->userdata)
        return;

    u = rd->userdata;

    /* from now on tunnel_process_msg() ignores messages from the IO thread */
    u->shutting_down = true;

    if (u->source)
        pa_source_unlink(u->source);

    if (u->thread) {
        pa_asyncmsgq_send(u->thread_mq->inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
        pa_thread_free(u->thread);
    }

    if (u->thread_mq) {
        pa_thread_mq_done(u->thread_mq);
        pa_xfree(u->thread_mq);
    }

    if (u->thread_mainloop)
        pa_mainloop_free(u->thread_mainloop);

    pa_xfree(u->cookie_file);
    pa_xfree(u->remote_source_name);
    pa_xfree(u->remote_server);

    if (u->source)
        pa_source_unref(u->source);

    if (u->rtpoll)
        pa_rtpoll_free(u->rtpoll);

    if (u->source_proplist)
        pa_proplist_free(u->source_proplist);

    pa_xfree(u->source_name);
    pa_xfree(u->msg);
    pa_xfree(u);

    rd->userdata = NULL;
}
789 
/* Module entry point: allocates the restart bookkeeping, stores it in
 * m->userdata and runs the first initialisation. If that fails, everything is
 * torn down again through pa__done(). Returns the result of do_init(). */
int pa__init(pa_module *m) {
    int rc;

    pa_assert(m);

    m->userdata = pa_xnew0(struct module_restart_data, 1);

    rc = do_init(m);
    if (rc >= 0)
        return rc;

    pa__done(m);
    return rc;
}
804 
/* Module exit point: tears down the tunnel instance via do_done(), then frees
 * a pending restart handle and the restart bookkeeping stored in
 * m->userdata. */
void pa__done(pa_module *m) {
    struct module_restart_data *rd;

    pa_assert(m);

    do_done(m);

    rd = m->userdata;
    if (!rd)
        return;

    if (rd->restart_data)
        pa_restart_free(rd->restart_data);

    pa_xfree(m->userdata);
}
819