• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2     This file is part of PulseAudio.
3 
4     Copyright 2013 Alexander Couzens
5 
6     PulseAudio is free software; you can redistribute it and/or modify
7     it under the terms of the GNU Lesser General Public License as published
8     by the Free Software Foundation; either version 2.1 of the License,
9     or (at your option) any later version.
10 
11     PulseAudio is distributed in the hope that it will be useful, but
12     WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14     General Public License for more details.
15 
16     You should have received a copy of the GNU Lesser General Public License
17     along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19 
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 
24 #include "restart-module.h"
25 
26 #include <pulse/context.h>
27 #include <pulse/timeval.h>
28 #include <pulse/xmalloc.h>
29 #include <pulse/stream.h>
30 #include <pulse/mainloop.h>
31 #include <pulse/introspect.h>
32 #include <pulse/error.h>
33 
34 #include <pulsecore/core.h>
35 #include <pulsecore/core-util.h>
36 #include <pulsecore/i18n.h>
37 #include <pulsecore/sink.h>
38 #include <pulsecore/modargs.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/thread.h>
41 #include <pulsecore/thread-mq.h>
42 #include <pulsecore/poll.h>
43 #include <pulsecore/rtpoll.h>
44 #include <pulsecore/proplist-util.h>
45 
46 PA_MODULE_AUTHOR("Alexander Couzens");
47 PA_MODULE_DESCRIPTION("Create a network sink which connects via a stream to a remote PulseAudio server");
48 PA_MODULE_VERSION(PACKAGE_VERSION);
49 PA_MODULE_LOAD_ONCE(false);
50 PA_MODULE_USAGE(
51         "server=<address> "
52         "sink=<name of the remote sink> "
53         "sink_name=<name for the local sink> "
54         "sink_properties=<properties for the local sink> "
55         "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
56         "format=<sample format> "
57         "channels=<number of channels> "
58         "rate=<sample rate> "
59         "channel_map=<channel map> "
60         "cookie=<cookie file path>"
61         );
62 
/* Upper bound of the latency range configured on the local sink
 * (see create_sink()). */
#define MAX_LATENCY_USEC (200 * PA_USEC_PER_MSEC)

/* Quit code handed to the IO thread's mainloop on errors. thread_func()
 * treats a quit code of 0 as a clean shutdown and anything else as a
 * failure. */
#define TUNNEL_THREAD_FAILED_MAINLOOP 1
65 
66 static int do_init(pa_module *m);
67 static void do_done(pa_module *m);
68 static void stream_state_cb(pa_stream *stream, void *userdata);
69 static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata);
70 static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata);
71 static void context_state_cb(pa_context *c, void *userdata);
72 static void sink_update_requested_latency_cb(pa_sink *s);
73 
74 struct tunnel_msg {
75     pa_msgobject parent;
76 };
77 
78 typedef struct tunnel_msg tunnel_msg;
79 PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);
80 
81 enum {
82     TUNNEL_MESSAGE_CREATE_SINK_REQUEST,
83     TUNNEL_MESSAGE_MAYBE_RESTART,
84 };
85 
86 enum {
87     TUNNEL_MESSAGE_SINK_CREATED = PA_SINK_MESSAGE_MAX,
88 };
89 
90 struct userdata {
91     pa_module *module;
92     pa_sink *sink;
93     pa_thread *thread;
94     pa_thread_mq *thread_mq;
95     pa_mainloop *thread_mainloop;
96     pa_mainloop_api *thread_mainloop_api;
97 
98     pa_context *context;
99     pa_stream *stream;
100     pa_rtpoll *rtpoll;
101 
102     bool update_stream_bufferattr_after_connect;
103 
104     bool connected;
105     bool shutting_down;
106 
107     char *cookie_file;
108     char *remote_server;
109     char *remote_sink_name;
110     char *sink_name;
111 
112     pa_proplist *sink_proplist;
113     pa_sample_spec sample_spec;
114     pa_channel_map channel_map;
115 
116     tunnel_msg *msg;
117 
118     pa_usec_t reconnect_interval_us;
119 };
120 
121 struct module_restart_data {
122     struct userdata *userdata;
123     pa_restart_data *restart_data;
124 };
125 
/* NULL-terminated list of argument keys accepted by pa_modargs_new() in
 * do_init(). */
static const char* const valid_modargs[] = {
    "sink_name",
    "sink_properties",
    "server",
    "sink",
    "format",
    "channels",
    "rate",
    "channel_map",
    "cookie",
    "reconnect_interval_ms",
    NULL,
};
139 
cork_stream(struct userdata * u,bool cork)140 static void cork_stream(struct userdata *u, bool cork) {
141     pa_operation *operation;
142 
143     pa_assert(u);
144     pa_assert(u->stream);
145 
146     if (cork) {
147         /* When the sink becomes suspended (which is the only case where we
148          * cork the stream), we don't want to keep any old data around, because
149          * the old data is most likely unrelated to the audio that will be
150          * played at the time when the sink starts running again. */
151         if ((operation = pa_stream_flush(u->stream, NULL, NULL)))
152             pa_operation_unref(operation);
153     }
154 
155     if ((operation = pa_stream_cork(u->stream, cork, NULL, NULL)))
156         pa_operation_unref(operation);
157 }
158 
/* Set every field of *bufferattr to (uint32_t) -1.
 *
 * bufferattr: attribute set to reset; must be non-NULL. */
static void reset_bufferattr(pa_buffer_attr *bufferattr) {
    const uint32_t unset = (uint32_t) -1;

    pa_assert(bufferattr);

    bufferattr->maxlength = unset;
    bufferattr->tlength = unset;
    bufferattr->prebuf = unset;
    bufferattr->minreq = unset;
    bufferattr->fragsize = unset;
}
167 
tunnel_new_proplist(struct userdata * u)168 static pa_proplist* tunnel_new_proplist(struct userdata *u) {
169     pa_proplist *proplist = pa_proplist_new();
170     pa_assert(proplist);
171     pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio");
172     pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
173     pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
174     pa_init_proplist(proplist);
175 
176     return proplist;
177 }
178 
thread_func(void * userdata)179 static void thread_func(void *userdata) {
180     struct userdata *u = userdata;
181     pa_proplist *proplist;
182 
183     pa_assert(u);
184 
185     pa_log_debug("Thread starting up");
186     pa_thread_mq_install(u->thread_mq);
187 
188     proplist = tunnel_new_proplist(u);
189     u->context = pa_context_new_with_proplist(u->thread_mainloop_api,
190                                               "PulseAudio",
191                                               proplist);
192     pa_proplist_free(proplist);
193 
194     if (!u->context) {
195         pa_log("Failed to create libpulse context");
196         goto fail;
197     }
198 
199     if (u->cookie_file && pa_context_load_cookie_from_file(u->context, u->cookie_file) != 0) {
200         pa_log_error("Can not load cookie file!");
201         goto fail;
202     }
203 
204     pa_context_set_state_callback(u->context, context_state_cb, u);
205     if (pa_context_connect(u->context,
206                            u->remote_server,
207                            PA_CONTEXT_NOAUTOSPAWN,
208                            NULL) < 0) {
209         pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context)));
210         goto fail;
211     }
212 
213     for (;;) {
214         int ret;
215 
216         if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) {
217             if (ret == 0)
218                 goto finish;
219             else
220                 goto fail;
221         }
222 
223         if (u->sink && PA_UNLIKELY(u->sink->thread_info.rewind_requested))
224             pa_sink_process_rewind(u->sink, 0);
225 
226         if (u->connected &&
227                 pa_stream_get_state(u->stream) == PA_STREAM_READY &&
228                 PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
229             size_t writable;
230 
231             writable = pa_stream_writable_size(u->stream);
232             if (writable > 0) {
233                 pa_memchunk memchunk;
234                 const void *p;
235 
236                 pa_sink_render_full(u->sink, writable, &memchunk);
237 
238                 pa_assert(memchunk.length > 0);
239 
240                 /* we have new data to write */
241                 p = pa_memblock_acquire(memchunk.memblock);
242                 /* TODO: Use pa_stream_begin_write() to reduce copying. */
243                 ret = pa_stream_write(u->stream,
244                                       (uint8_t*) p + memchunk.index,
245                                       memchunk.length,
246                                       NULL,     /**< A cleanup routine for the data or NULL to request an internal copy */
247                                       0,        /** offset */
248                                       PA_SEEK_RELATIVE);
249                 pa_memblock_release(memchunk.memblock);
250                 pa_memblock_unref(memchunk.memblock);
251 
252                 if (ret != 0) {
253                     pa_log_error("Could not write data into the stream ... ret = %i", ret);
254                     u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
255                 }
256 
257             }
258         }
259     }
260 fail:
261     /* send a message to the ctl thread to ask it to either terminate us, or
262      * restart us, but either way this thread will exit, so then wait for the
263      * shutdown message */
264     pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_MAYBE_RESTART, u, 0, NULL, NULL);
265     pa_asyncmsgq_wait_for(u->thread_mq->inq, PA_MESSAGE_SHUTDOWN);
266 
267 finish:
268     if (u->stream) {
269         pa_stream_disconnect(u->stream);
270         pa_stream_unref(u->stream);
271         u->stream = NULL;
272     }
273 
274     if (u->context) {
275         pa_context_disconnect(u->context);
276         pa_context_unref(u->context);
277         u->context = NULL;
278     }
279 
280     pa_log_debug("Thread shutting down");
281 }
282 
/* Stream state callback.
 *
 * On failure it clears u->connected and asks the IO mainloop to quit with a
 * failure code. When the stream becomes ready it uncorks the stream if the
 * sink is opened and synchronises the buffer attributes.
 *
 * stream:   stream whose state changed.
 * userdata: the module's struct userdata. */
static void stream_state_cb(pa_stream *stream, void *userdata) {
    struct userdata *u = userdata;

    pa_assert(u);

    switch (pa_stream_get_state(stream)) {
        case PA_STREAM_UNCONNECTED:
        case PA_STREAM_CREATING:
            /* nothing to do yet */
            break;

        case PA_STREAM_READY:
            if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
                cork_stream(u, false);

            /* Only call our requested_latency_cb when requested_latency
             * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because
             * we don't want to override the initial tlength set by the server
             * without a good reason. */
            if (u->update_stream_bufferattr_after_connect)
                sink_update_requested_latency_cb(u->sink);
            else
                stream_changed_buffer_attr_cb(stream, userdata);
            break;

        case PA_STREAM_FAILED:
            pa_log_error("Stream failed.");
            u->connected = false;
            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
            break;

        case PA_STREAM_TERMINATED:
            pa_log_debug("Stream terminated.");
            break;
    }
}
314 
315 /* called when remote server changes the stream buffer_attr */
stream_changed_buffer_attr_cb(pa_stream * stream,void * userdata)316 static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata) {
317     struct userdata *u = userdata;
318     const pa_buffer_attr *bufferattr;
319     pa_assert(u);
320 
321     bufferattr = pa_stream_get_buffer_attr(u->stream);
322     pa_sink_set_max_request_within_thread(u->sink, bufferattr->tlength);
323 
324     pa_log_debug("Server reports buffer attrs changed. tlength now at %lu.",
325                  (unsigned long) bufferattr->tlength);
326 }
327 
328 /* called after we requested a change of the stream buffer_attr */
stream_set_buffer_attr_cb(pa_stream * stream,int success,void * userdata)329 static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata) {
330     stream_changed_buffer_attr_cb(stream, userdata);
331 }
332 
333 /* called when the server experiences an underrun of our buffer */
stream_underflow_callback(pa_stream * stream,void * userdata)334 static void stream_underflow_callback(pa_stream *stream, void *userdata) {
335     pa_log_info("Server signalled buffer underrun.");
336 }
337 
338 /* called when the server experiences an overrun of our buffer */
stream_overflow_callback(pa_stream * stream,void * userdata)339 static void stream_overflow_callback(pa_stream *stream, void *userdata) {
340     pa_log_info("Server signalled buffer overrun.");
341 }
342 
343 /* Do a reinit of the module.  Note that u will be freed as a result of this
344  * call. */
maybe_restart(struct module_restart_data * rd)345 static void maybe_restart(struct module_restart_data *rd) {
346     struct userdata *u = rd->userdata;
347 
348     if (rd->restart_data) {
349         pa_log_debug("Restart already pending");
350         return;
351     }
352 
353     if (u->reconnect_interval_us > 0) {
354         /* The handle returned here must be freed when do_init() finishes successfully
355          * and when the module exits. */
356         rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us);
357     } else {
358         /* exit the module */
359         pa_module_unload_request(u->module, true);
360     }
361 }
362 
on_sink_created(struct userdata * u)363 static void on_sink_created(struct userdata *u) {
364     pa_proplist *proplist;
365     pa_buffer_attr bufferattr;
366     pa_usec_t requested_latency;
367     char *username = pa_get_user_name_malloc();
368     char *hostname = pa_get_host_name_malloc();
369     /* TODO: old tunnel put here the remote sink_name into stream name e.g. 'Null Output for lynxis@lazus' */
370     char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
371     pa_xfree(hostname);
372     pa_xfree(username);
373 
374     pa_assert_io_context();
375 
376     /* if we still don't have a sink, then sink creation failed, and we should
377      * kill this io thread */
378     if (!u->sink) {
379         pa_log_error("Could not create a sink.");
380         u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
381         return;
382     }
383 
384     proplist = tunnel_new_proplist(u);
385     u->stream = pa_stream_new_with_proplist(u->context,
386                                             stream_name,
387                                             &u->sink->sample_spec,
388                                             &u->sink->channel_map,
389                                             proplist);
390     pa_proplist_free(proplist);
391     pa_xfree(stream_name);
392 
393     if (!u->stream) {
394         pa_log_error("Could not create a stream.");
395         u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
396         return;
397     }
398 
399     requested_latency = pa_sink_get_requested_latency_within_thread(u->sink);
400     if (requested_latency == (pa_usec_t) -1)
401         requested_latency = u->sink->thread_info.max_latency;
402 
403     reset_bufferattr(&bufferattr);
404     bufferattr.tlength = pa_usec_to_bytes(requested_latency, &u->sink->sample_spec);
405 
406     pa_log_debug("tlength requested at %lu.", (unsigned long) bufferattr.tlength);
407 
408     pa_stream_set_state_callback(u->stream, stream_state_cb, u);
409     pa_stream_set_buffer_attr_callback(u->stream, stream_changed_buffer_attr_cb, u);
410     pa_stream_set_underflow_callback(u->stream, stream_underflow_callback, u);
411     pa_stream_set_overflow_callback(u->stream, stream_overflow_callback, u);
412     if (pa_stream_connect_playback(u->stream,
413                                    u->remote_sink_name,
414                                    &bufferattr,
415                                    PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_DONT_MOVE | PA_STREAM_START_CORKED | PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_ADJUST_LATENCY,
416                                    NULL,
417                                    NULL) < 0) {
418         pa_log_error("Could not connect stream.");
419         u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
420     }
421     u->connected = true;
422 }
423 
/* Context state callback, invoked in the IO thread.
 *
 * Once the context is ready it asks the control thread to create the local
 * sink. If the context fails or terminates it clears u->connected and asks
 * the IO mainloop to quit with a failure code.
 *
 * c:        context whose state changed.
 * userdata: the module's struct userdata. */
static void context_state_cb(pa_context *c, void *userdata) {
    struct userdata *u = userdata;

    pa_assert(u);

    switch (pa_context_get_state(c)) {
        case PA_CONTEXT_READY:
            /* now that we're connected, ask the control thread to create a sink for
             * us, and wait for that to complete before proceeding, we'll
             * receive TUNNEL_MESSAGE_SINK_CREATED in response when the sink is
             * created (see sink_process_msg_cb()) */
            pa_log_debug("Connection successful. Creating stream.");
            pa_assert(!u->stream);
            pa_assert(!u->sink);

            pa_log_debug("Asking ctl thread to create sink.");
            pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SINK_REQUEST, u, 0, NULL, NULL);
            break;

        case PA_CONTEXT_FAILED:
            pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context)));
            u->connected = false;
            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
            break;

        case PA_CONTEXT_TERMINATED:
            pa_log_debug("Context terminated.");
            u->connected = false;
            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
            break;

        case PA_CONTEXT_UNCONNECTED:
        case PA_CONTEXT_CONNECTING:
        case PA_CONTEXT_AUTHORIZING:
        case PA_CONTEXT_SETTING_NAME:
            /* intermediate states, nothing to do */
            break;
    }
}
458 
/* Sink callback invoked when the requested latency changes.
 *
 * Converts the requested latency (or the sink's maximum latency if none is
 * requested) to bytes, applies it as the sink's max_request and, if the
 * stream is ready and its tlength differs, asks the server for a matching
 * tlength. While the stream is still being created the request is deferred
 * via update_stream_bufferattr_after_connect.
 *
 * s: the local sink; s->userdata is the module's struct userdata. */
static void sink_update_requested_latency_cb(pa_sink *s) {
    struct userdata *u;
    pa_usec_t latency;
    size_t target_bytes;

    pa_sink_assert_ref(s);
    pa_assert_se(u = s->userdata);

    latency = pa_sink_get_requested_latency_within_thread(s);
    if (latency == (pa_usec_t) -1)
        latency = s->thread_info.max_latency;

    target_bytes = pa_usec_to_bytes(latency, &s->sample_spec);
    pa_sink_set_max_request_within_thread(s, target_bytes);

    if (!u->stream)
        return;

    switch (pa_stream_get_state(u->stream)) {
        case PA_STREAM_READY: {
            pa_buffer_attr attr;
            pa_operation *op;

            /* Nothing to request if the server already uses this tlength. */
            if (pa_stream_get_buffer_attr(u->stream)->tlength == target_bytes)
                break;

            pa_log_debug("Requesting new buffer attrs. tlength requested at %lu.",
                         (unsigned long) target_bytes);

            reset_bufferattr(&attr);
            attr.tlength = target_bytes;

            op = pa_stream_set_buffer_attr(u->stream, &attr, stream_set_buffer_attr_cb, u);
            if (op)
                pa_operation_unref(op);
            break;
        }

        case PA_STREAM_CREATING:
            /* we have to delay our request until stream is ready */
            u->update_stream_bufferattr_after_connect = true;
            break;

        default:
            break;
    }
}
499 
/* Message handler installed on the local sink.
 *
 * PA_SINK_MESSAGE_GET_LATENCY: stores the remote stream latency into the
 *   int64_t that data points to, or 0 when the sink is not linked, there is
 *   no ready stream, or the latency query fails. The sign flag reported by
 *   pa_stream_get_latency() is not used.
 * TUNNEL_MESSAGE_SINK_CREATED: continues the setup in on_sink_created().
 *
 * Every other message is forwarded to pa_sink_process_msg(). */
static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SINK(o)->userdata;

    switch (code) {
        case PA_SINK_MESSAGE_GET_LATENCY: {
            int64_t *latency_out = (int64_t*) data;
            pa_usec_t remote_latency;
            int negative;

            /* The checks are evaluated in order; the first one that fails
             * short-circuits the rest. */
            if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
                !u->stream ||
                pa_stream_get_state(u->stream) != PA_STREAM_READY ||
                pa_stream_get_latency(u->stream, &remote_latency, &negative) < 0) {
                *latency_out = 0;
                return 0;
            }

            *latency_out = remote_latency;
            return 0;
        }

        case TUNNEL_MESSAGE_SINK_CREATED:
            on_sink_created(u);
            return 0;
    }

    return pa_sink_process_msg(o, code, data, offset, chunk);
}
537 
538 /* Called from the IO thread. */
sink_set_state_in_io_thread_cb(pa_sink * s,pa_sink_state_t new_state,pa_suspend_cause_t new_suspend_cause)539 static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
540     struct userdata *u;
541 
542     pa_assert(s);
543     pa_assert_se(u = s->userdata);
544 
545     /* It may be that only the suspend cause is changing, in which case there's
546      * nothing to do. */
547     if (new_state == s->thread_info.state)
548         return 0;
549 
550     if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
551         return 0;
552 
553     switch (new_state) {
554         case PA_SINK_SUSPENDED: {
555             cork_stream(u, true);
556             break;
557         }
558         case PA_SINK_IDLE:
559         case PA_SINK_RUNNING: {
560             cork_stream(u, false);
561             break;
562         }
563         case PA_SINK_INVALID_STATE:
564         case PA_SINK_INIT:
565         case PA_SINK_UNLINKED:
566             break;
567     }
568 
569     return 0;
570 }
571 
572 /* Creates a sink in the main thread.
573  *
574  * This method is called when we receive a message from the io thread that a
575  * connection has been established with the server.  We defer creation of the
576  * sink until the connection is established, because we don't have a sink if
577  * the remote server isn't there.
578  */
create_sink(struct userdata * u)579 static void create_sink(struct userdata *u) {
580     pa_sink_new_data sink_data;
581 
582     pa_assert_ctl_context();
583 
584     /* Create sink */
585     pa_sink_new_data_init(&sink_data);
586     sink_data.driver = __FILE__;
587     sink_data.module = u->module;
588 
589     pa_sink_new_data_set_name(&sink_data, u->sink_name);
590     pa_sink_new_data_set_sample_spec(&sink_data, &u->sample_spec);
591     pa_sink_new_data_set_channel_map(&sink_data, &u->channel_map);
592 
593     pa_proplist_update(sink_data.proplist, PA_UPDATE_REPLACE, u->sink_proplist);
594 
595     if (!(u->sink = pa_sink_new(u->module->core, &sink_data, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY | PA_SINK_NETWORK))) {
596         pa_log("Failed to create sink.");
597         goto finish;
598     }
599 
600     u->sink->userdata = u;
601     u->sink->parent.process_msg = sink_process_msg_cb;
602     u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
603     u->sink->update_requested_latency = sink_update_requested_latency_cb;
604     pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC);
605 
606     /* set thread message queue */
607     pa_sink_set_asyncmsgq(u->sink, u->thread_mq->inq);
608     pa_sink_set_rtpoll(u->sink, u->rtpoll);
609 
610     pa_sink_put(u->sink);
611 
612 finish:
613     pa_sink_new_data_done(&sink_data);
614 
615     /* tell any interested io threads that the sink they asked for has now been
616      * created (even if we failed, we still notify the thread, so they can
617      * either handle or kill the thread, rather than deadlock waiting for a
618      * message that will never come */
619     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), TUNNEL_MESSAGE_SINK_CREATED, u, 0, NULL);
620 }
621 
622 /* Runs in PA mainloop context */
tunnel_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)623 static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
624     struct userdata *u = (struct userdata *) data;
625 
626     pa_assert(u);
627     pa_assert_ctl_context();
628 
629     if (u->shutting_down)
630         return 0;
631 
632     switch (code) {
633         case TUNNEL_MESSAGE_CREATE_SINK_REQUEST:
634             create_sink(u);
635             break;
636         case TUNNEL_MESSAGE_MAYBE_RESTART:
637             maybe_restart(u->module->userdata);
638             break;
639     }
640 
641     return 0;
642 }
643 
/* Initialise (or re-initialise) the module.
 *
 * Parses the module arguments, allocates a fresh struct userdata, stores it
 * in the module's restart bookkeeping and starts the IO thread. The sink
 * itself is created later, once the IO thread has connected to the remote
 * server.
 *
 * m: module being initialised; m->userdata must already point to a
 *    struct module_restart_data.
 *
 * Returns 0 on success and -1 on failure. On failure the partially
 * initialised state stays reachable through rd->userdata so that do_done()
 * can release it. */
static int do_init(pa_module *m) {
    struct userdata *u = NULL;
    struct module_restart_data *rd;
    pa_modargs *ma = NULL;
    const char *remote_server = NULL;
    char *default_sink_name = NULL;
    uint32_t reconnect_interval_ms = 0;

    pa_assert(m);
    pa_assert(m->userdata);

    rd = m->userdata;

    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
        pa_log("Failed to parse module arguments.");
        goto fail;
    }

    u = pa_xnew0(struct userdata, 1);
    u->module = m;
    rd->userdata = u;

    u->sample_spec = m->core->default_sample_spec;
    u->channel_map = m->core->default_channel_map;
    if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->sample_spec, &u->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
        pa_log("Invalid sample format specification or channel map");
        goto fail;
    }

    remote_server = pa_modargs_get_value(ma, "server", NULL);
    if (!remote_server) {
        pa_log("No server given!");
        goto fail;
    }

    u->remote_server = pa_xstrdup(remote_server);
    u->thread_mainloop = pa_mainloop_new();
    if (u->thread_mainloop == NULL) {
        pa_log("Failed to create mainloop");
        goto fail;
    }
    u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop);
    u->cookie_file = pa_xstrdup(pa_modargs_get_value(ma, "cookie", NULL));
    u->remote_sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));

    u->thread_mq = pa_xnew0(pa_thread_mq, 1);

    if (pa_thread_mq_init_thread_mainloop(u->thread_mq, m->core->mainloop, u->thread_mainloop_api) < 0) {
        pa_log("pa_thread_mq_init_thread_mainloop() failed.");
        goto fail;
    }

    u->msg = pa_msgobject_new(tunnel_msg);
    u->msg->parent.process_msg = tunnel_process_msg;

    /* The rtpoll created here is never run. It is only necessary to avoid crashes
     * when module-tunnel-sink-new is used together with module-loopback or
     * module-combine-sink. Both modules base their asyncmsq on the rtpoll provided
     * by the sink. module-loopback and combine-sink only work because they call
     * pa_asyncmsq_process_one() themselves. module_rtp_recv also uses the rtpoll,
     * but never calls pa_asyncmsq_process_one(), so it will not work in combination
     * with module-tunnel-sink-new. */
    u->rtpoll = pa_rtpoll_new();

    default_sink_name = pa_sprintf_malloc("tunnel-sink-new.%s", remote_server);
    u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", default_sink_name));

    u->sink_proplist = pa_proplist_new();
    pa_proplist_sets(u->sink_proplist, PA_PROP_DEVICE_CLASS, "sound");
    pa_proplist_setf(u->sink_proplist,
                     PA_PROP_DEVICE_DESCRIPTION,
                     _("Tunnel to %s/%s"),
                     remote_server,
                     pa_strempty(u->remote_sink_name));

    if (pa_modargs_get_proplist(ma, "sink_properties", u->sink_proplist, PA_UPDATE_REPLACE) < 0) {
        pa_log("Invalid properties");
        goto fail;
    }

    /* Reject a malformed interval instead of silently falling back to 0,
     * which would disable reconnecting without telling the user. */
    if (pa_modargs_get_value_u32(ma, "reconnect_interval_ms", &reconnect_interval_ms) < 0) {
        pa_log("Invalid reconnect_interval_ms");
        goto fail;
    }
    u->reconnect_interval_us = reconnect_interval_ms * PA_USEC_PER_MSEC;

    if (!(u->thread = pa_thread_new("tunnel-sink", thread_func, u))) {
        pa_log("Failed to create thread.");
        goto fail;
    }

    /* If the module is restarting and do_init() finishes successfully, the
     * restart data is no longer needed. If do_init() fails, don't touch the
     * restart data, because following restart attempts will continue to use
     * the same data. If restart_data is NULL, that means no restart is
     * currently pending. */
    if (rd->restart_data) {
        pa_restart_free(rd->restart_data);
        rd->restart_data = NULL;
    }

    pa_modargs_free(ma);
    pa_xfree(default_sink_name);

    return 0;

fail:
    if (ma)
        pa_modargs_free(ma);

    /* pa_xfree() is already called on possibly-NULL pointers elsewhere in
     * this file, so no guard is needed here. */
    pa_xfree(default_sink_name);

    return -1;
}
756 
/* Tear down the current instance state.
 *
 * Unlinks the sink, asks the IO thread to shut down and joins it, then
 * releases everything do_init() allocated. The restart bookkeeping in
 * m->userdata is kept; only its userdata pointer is reset to NULL.
 * Safe to call when nothing has been initialised yet.
 *
 * m: module being torn down. */
static void do_done(pa_module *m) {
    struct module_restart_data *rd;
    struct userdata *u;

    pa_assert(m);

    rd = m->userdata;
    if (!rd)
        return;

    u = rd->userdata;
    if (!u)
        return;

    /* From here on tunnel_process_msg() ignores messages from the IO thread. */
    u->shutting_down = true;

    if (u->sink)
        pa_sink_unlink(u->sink);

    if (u->thread) {
        pa_asyncmsgq_send(u->thread_mq->inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
        pa_thread_free(u->thread);
    }

    if (u->thread_mq) {
        pa_thread_mq_done(u->thread_mq);
        pa_xfree(u->thread_mq);
    }

    if (u->thread_mainloop)
        pa_mainloop_free(u->thread_mainloop);

    /* pa_xfree() is already called on possibly-NULL pointers in this file
     * (u->msg below), so the string copies need no guards. */
    pa_xfree(u->cookie_file);
    pa_xfree(u->remote_sink_name);
    pa_xfree(u->remote_server);

    if (u->sink)
        pa_sink_unref(u->sink);

    if (u->rtpoll)
        pa_rtpoll_free(u->rtpoll);

    if (u->sink_proplist)
        pa_proplist_free(u->sink_proplist);

    pa_xfree(u->sink_name);
    pa_xfree(u->msg);
    pa_xfree(u);

    rd->userdata = NULL;
}
813 
/* Module entry point.
 *
 * Allocates the restart bookkeeping, stores it in m->userdata and runs
 * do_init(). If initialisation fails, everything is released again through
 * pa__done().
 *
 * Returns the result of do_init(). */
int pa__init(pa_module *m) {
    int rc;

    pa_assert(m);

    m->userdata = pa_xnew0(struct module_restart_data, 1);

    rc = do_init(m);
    if (rc < 0)
        pa__done(m);

    return rc;
}
828 
/* Module exit point.
 *
 * Releases the instance state through do_done(), then frees a pending
 * restart handle, if any, and the restart bookkeeping itself. */
void pa__done(pa_module *m) {
    struct module_restart_data *rd;

    pa_assert(m);

    do_done(m);

    rd = m->userdata;
    if (!rd)
        return;

    if (rd->restart_data)
        pa_restart_free(rd->restart_data);

    pa_xfree(rd);
}
843