• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2     This file is part of PulseAudio.
3 
4     Copyright 2013 Alexander Couzens
5 
6     PulseAudio is free software; you can redistribute it and/or modify
7     it under the terms of the GNU Lesser General Public License as published
8     by the Free Software Foundation; either version 2.1 of the License,
9     or (at your option) any later version.
10 
11     PulseAudio is distributed in the hope that it will be useful, but
12     WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14     General Public License for more details.
15 
16     You should have received a copy of the GNU Lesser General Public License
17     along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19 
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 
24 #include "restart-module.h"
25 
26 #include <pulse/context.h>
27 #include <pulse/timeval.h>
28 #include <pulse/xmalloc.h>
29 #include <pulse/stream.h>
30 #include <pulse/mainloop.h>
31 #include <pulse/introspect.h>
32 #include <pulse/error.h>
33 
34 #include <pulsecore/core.h>
35 #include <pulsecore/core-util.h>
36 #include <pulsecore/i18n.h>
37 #include <pulsecore/sink.h>
38 #include <pulsecore/modargs.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/thread.h>
41 #include <pulsecore/thread-mq.h>
42 #include <pulsecore/poll.h>
43 #include <pulsecore/rtpoll.h>
44 #include <pulsecore/proplist-util.h>
45 
/* Module metadata. The usage string lists the same argument names that are
 * accepted through valid_modargs below. */
PA_MODULE_AUTHOR("Alexander Couzens");
PA_MODULE_DESCRIPTION("Create a network sink which connects via a stream to a remote PulseAudio server");
PA_MODULE_VERSION(PACKAGE_VERSION);
PA_MODULE_LOAD_ONCE(false);
PA_MODULE_USAGE(
        "server=<address> "
        "sink=<name of the remote sink> "
        "sink_name=<name for the local sink> "
        "sink_properties=<properties for the local sink> "
        "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
        "format=<sample format> "
        "channels=<number of channels> "
        "rate=<sample rate> "
        "channel_map=<channel map> "
        "cookie=<cookie file path>"
        );
62 
/* Upper bound of the latency range configured on the local sink. */
#define MAX_LATENCY_USEC (200 * PA_USEC_PER_MSEC)
/* Value handed to the IO thread mainloop's quit() whenever the thread has to
 * stop because of a failure. */
#define TUNNEL_THREAD_FAILED_MAINLOOP 1
65 
/* Forward declarations of functions that are defined later in this file. */
static int do_init(pa_module *m);
static void do_done(pa_module *m);
static void stream_state_cb(pa_stream *stream, void *userdata);
static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata);
static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata);
static void context_state_cb(pa_context *c, void *userdata);
static void sink_update_requested_latency_cb(pa_sink *s);
73 
/* Message object used as the target of the TUNNEL_MESSAGE_CREATE_SINK_REQUEST
 * and TUNNEL_MESSAGE_MAYBE_RESTART messages that the IO thread posts; its
 * process_msg handler is tunnel_process_msg(). */
struct tunnel_msg {
    pa_msgobject parent;
};

typedef struct tunnel_msg tunnel_msg;
PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);
80 
/* Message codes posted by the IO thread and handled in tunnel_process_msg(). */
enum {
    /* The remote context is ready; a local sink should be created. */
    TUNNEL_MESSAGE_CREATE_SINK_REQUEST,
    /* The IO thread failed; either reinit or unload the module. */
    TUNNEL_MESSAGE_MAYBE_RESTART,
};

/* Message code sent to the sink object and handled in sink_process_msg_cb().
 * It starts at PA_SINK_MESSAGE_MAX. */
enum {
    TUNNEL_MESSAGE_SINK_CREATED = PA_SINK_MESSAGE_MAX,
};
89 
/* Per-instance state of the tunnel. It is allocated in do_init() and released
 * in do_done(). */
struct userdata {
    pa_module *module;                      /* module this instance belongs to */
    pa_sink *sink;                          /* local sink, created in create_sink(); NULL until then */
    pa_thread *thread;                      /* thread running thread_func() */
    pa_thread_mq *thread_mq;                /* message queues shared with that thread */
    pa_mainloop *thread_mainloop;           /* mainloop iterated inside thread_func() */
    pa_mainloop_api *thread_mainloop_api;   /* API of thread_mainloop */

    pa_context *context;                    /* libpulse context, created and released in thread_func() */
    pa_stream *stream;                      /* playback stream, created in on_sink_created() */
    pa_rtpoll *rtpoll;                      /* handed to the sink and run from thread_func() */

    /* Set when a latency change arrives while the stream is still being
     * created; evaluated once the stream becomes ready. */
    bool update_stream_bufferattr_after_connect;

    bool connected;                         /* gates the audio write path in thread_func() */
    bool shutting_down;                     /* set by do_done(); tunnel_process_msg() then ignores messages */

    /* Copies of module argument values (or of their defaults). */
    char *cookie_file;
    char *remote_server;
    char *remote_sink_name;
    char *sink_name;

    pa_proplist *sink_proplist;             /* properties applied to the local sink */
    pa_sample_spec sample_spec;             /* sample spec requested for the local sink */
    pa_channel_map channel_map;             /* channel map requested for the local sink */

    tunnel_msg *msg;                        /* target object for messages posted by the IO thread */

    pa_usec_t reconnect_interval_us;        /* 0 means: unload instead of reconnecting */
};
120 
/* Stored in pa_module.userdata. It is allocated in pa__init() and freed in
 * pa__done(), while the userdata member is replaced on every do_init() /
 * do_done() cycle. */
struct module_restart_data {
    struct userdata *userdata;       /* current instance state, NULL after do_done() */
    pa_restart_data *restart_data;   /* handle of a pending restart, NULL if none is pending */
};
125 
/* NULL-terminated list of the argument names passed to pa_modargs_new(). */
static const char* const valid_modargs[] = {
    "sink_name",
    "sink_properties",
    "server",
    "sink",
    "format",
    "channels",
    "rate",
    "channel_map",
    "cookie",
    "reconnect_interval_ms",
    NULL,
};
139 
cork_stream(struct userdata * u,bool cork)140 static void cork_stream(struct userdata *u, bool cork) {
141     pa_operation *operation;
142 
143     pa_assert(u);
144     pa_assert(u->stream);
145 
146     if (cork) {
147         /* When the sink becomes suspended (which is the only case where we
148          * cork the stream), we don't want to keep any old data around, because
149          * the old data is most likely unrelated to the audio that will be
150          * played at the time when the sink starts running again. */
151         if ((operation = pa_stream_flush(u->stream, NULL, NULL)))
152             pa_operation_unref(operation);
153     }
154 
155     if ((operation = pa_stream_cork(u->stream, cork, NULL, NULL)))
156         pa_operation_unref(operation);
157 }
158 
/* Set every field of *bufferattr to (uint32_t) -1. */
static void reset_bufferattr(pa_buffer_attr *bufferattr) {
    const uint32_t unset = (uint32_t) -1;

    pa_assert(bufferattr);

    bufferattr->maxlength = unset;
    bufferattr->tlength = unset;
    bufferattr->prebuf = unset;
    bufferattr->minreq = unset;
    bufferattr->fragsize = unset;
}
167 
tunnel_new_proplist(struct userdata * u)168 static pa_proplist* tunnel_new_proplist(struct userdata *u) {
169     pa_proplist *proplist = pa_proplist_new();
170     pa_assert(proplist);
171     pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio");
172     pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
173     pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
174     pa_init_proplist(proplist);
175 
176     return proplist;
177 }
178 
thread_func(void * userdata)179 static void thread_func(void *userdata) {
180     struct userdata *u = userdata;
181     pa_proplist *proplist;
182 
183     pa_assert(u);
184 
185     pa_log_debug("Thread starting up");
186     pa_thread_mq_install(u->thread_mq);
187 
188     proplist = tunnel_new_proplist(u);
189     u->context = pa_context_new_with_proplist(u->thread_mainloop_api,
190                                               "PulseAudio",
191                                               proplist);
192     pa_proplist_free(proplist);
193 
194     if (!u->context) {
195         pa_log("Failed to create libpulse context");
196         goto fail;
197     }
198 
199     if (u->cookie_file && pa_context_load_cookie_from_file(u->context, u->cookie_file) != 0) {
200         pa_log_error("Can not load cookie file!");
201         goto fail;
202     }
203 
204     pa_context_set_state_callback(u->context, context_state_cb, u);
205     if (pa_context_connect(u->context,
206                            u->remote_server,
207                            PA_CONTEXT_NOAUTOSPAWN,
208                            NULL) < 0) {
209         pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context)));
210         goto fail;
211     }
212 
213     for (;;) {
214         int ret;
215 
216         if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) {
217             if (ret == 0)
218                 goto finish;
219             else
220                 goto fail;
221         }
222 
223         if (u->sink && PA_UNLIKELY(u->sink->thread_info.rewind_requested))
224             pa_sink_process_rewind(u->sink, 0);
225 
226         if (u->connected &&
227                 pa_stream_get_state(u->stream) == PA_STREAM_READY &&
228                 PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
229             size_t writable;
230 
231             writable = pa_stream_writable_size(u->stream);
232             if (writable > 0) {
233                 pa_memchunk memchunk;
234                 const void *p;
235 
236                 pa_sink_render_full(u->sink, writable, &memchunk);
237 
238                 pa_assert(memchunk.length > 0);
239 
240                 /* we have new data to write */
241                 p = pa_memblock_acquire(memchunk.memblock);
242                 /* TODO: Use pa_stream_begin_write() to reduce copying. */
243                 ret = pa_stream_write(u->stream,
244                                       (uint8_t*) p + memchunk.index,
245                                       memchunk.length,
246                                       NULL,     /**< A cleanup routine for the data or NULL to request an internal copy */
247                                       0,        /** offset */
248                                       PA_SEEK_RELATIVE);
249                 pa_memblock_release(memchunk.memblock);
250                 pa_memblock_unref(memchunk.memblock);
251 
252                 if (ret != 0) {
253                     pa_log_error("Could not write data into the stream ... ret = %i", ret);
254                     u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
255                 }
256             }
257         }
258 
259         /* Run the rtpoll to process messages that other modules (module-combine-sink,
260          * module-loopback and module-rtp-recv) may have placed in the queue. */
261         pa_rtpoll_set_timer_relative(u->rtpoll, 0);
262         if (pa_rtpoll_run(u->rtpoll) < 0)
263             goto fail;
264     }
265 fail:
266     /* send a message to the ctl thread to ask it to either terminate us, or
267      * restart us, but either way this thread will exit, so then wait for the
268      * shutdown message */
269     pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_MAYBE_RESTART, u, 0, NULL, NULL);
270     pa_asyncmsgq_wait_for(u->thread_mq->inq, PA_MESSAGE_SHUTDOWN);
271 
272 finish:
273     if (u->stream) {
274         pa_stream_disconnect(u->stream);
275         pa_stream_unref(u->stream);
276         u->stream = NULL;
277     }
278 
279     if (u->context) {
280         pa_context_disconnect(u->context);
281         pa_context_unref(u->context);
282         u->context = NULL;
283     }
284 
285     pa_log_debug("Thread shutting down");
286 }
287 
/* State callback of the remote playback stream; userdata is the module's
 * struct userdata. */
static void stream_state_cb(pa_stream *stream, void *userdata) {
    struct userdata *u = userdata;

    pa_assert(u);

    switch (pa_stream_get_state(stream)) {
        case PA_STREAM_UNCONNECTED:
        case PA_STREAM_CREATING:
            /* nothing to do yet */
            break;

        case PA_STREAM_READY:
            /* The stream is connected corked; start it right away if the
             * sink is already opened. */
            if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
                cork_stream(u, false);

            /* Our own latency request is only pushed to the server when the
             * requested latency changed while the stream was being created;
             * otherwise the tlength chosen by the server is adopted, so that
             * it is not overridden without a good reason. */
            if (u->update_stream_bufferattr_after_connect)
                sink_update_requested_latency_cb(u->sink);
            else
                stream_changed_buffer_attr_cb(stream, userdata);
            break;

        case PA_STREAM_FAILED:
            pa_log_error("Stream failed.");
            u->connected = false;
            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
            break;

        case PA_STREAM_TERMINATED:
            pa_log_debug("Stream terminated.");
            break;
    }
}
319 
320 /* called when remote server changes the stream buffer_attr */
stream_changed_buffer_attr_cb(pa_stream * stream,void * userdata)321 static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata) {
322     struct userdata *u = userdata;
323     const pa_buffer_attr *bufferattr;
324     pa_assert(u);
325 
326     bufferattr = pa_stream_get_buffer_attr(u->stream);
327     pa_sink_set_max_request_within_thread(u->sink, bufferattr->tlength);
328 
329     pa_log_debug("Server reports buffer attrs changed. tlength now at %lu.",
330                  (unsigned long) bufferattr->tlength);
331 }
332 
333 /* called after we requested a change of the stream buffer_attr */
stream_set_buffer_attr_cb(pa_stream * stream,int success,void * userdata)334 static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata) {
335     stream_changed_buffer_attr_cb(stream, userdata);
336 }
337 
338 /* called when the server experiences an underrun of our buffer */
stream_underflow_callback(pa_stream * stream,void * userdata)339 static void stream_underflow_callback(pa_stream *stream, void *userdata) {
340     pa_log_info("Server signalled buffer underrun.");
341 }
342 
343 /* called when the server experiences an overrun of our buffer */
stream_overflow_callback(pa_stream * stream,void * userdata)344 static void stream_overflow_callback(pa_stream *stream, void *userdata) {
345     pa_log_info("Server signalled buffer overrun.");
346 }
347 
348 /* Do a reinit of the module.  Note that u will be freed as a result of this
349  * call. */
maybe_restart(struct module_restart_data * rd)350 static void maybe_restart(struct module_restart_data *rd) {
351     struct userdata *u = rd->userdata;
352 
353     if (rd->restart_data) {
354         pa_log_debug("Restart already pending");
355         return;
356     }
357 
358     if (u->reconnect_interval_us > 0) {
359         /* The handle returned here must be freed when do_init() finishes successfully
360          * and when the module exits. */
361         rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us);
362     } else {
363         /* exit the module */
364         pa_module_unload_request(u->module, true);
365     }
366 }
367 
on_sink_created(struct userdata * u)368 static void on_sink_created(struct userdata *u) {
369     pa_proplist *proplist;
370     pa_buffer_attr bufferattr;
371     pa_usec_t requested_latency;
372     char *username = pa_get_user_name_malloc();
373     char *hostname = pa_get_host_name_malloc();
374     /* TODO: old tunnel put here the remote sink_name into stream name e.g. 'Null Output for lynxis@lazus' */
375     char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
376     pa_xfree(hostname);
377     pa_xfree(username);
378 
379     pa_assert_io_context();
380 
381     /* if we still don't have a sink, then sink creation failed, and we should
382      * kill this io thread */
383     if (!u->sink) {
384         pa_log_error("Could not create a sink.");
385         u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
386         return;
387     }
388 
389     proplist = tunnel_new_proplist(u);
390     u->stream = pa_stream_new_with_proplist(u->context,
391                                             stream_name,
392                                             &u->sink->sample_spec,
393                                             &u->sink->channel_map,
394                                             proplist);
395     pa_proplist_free(proplist);
396     pa_xfree(stream_name);
397 
398     if (!u->stream) {
399         pa_log_error("Could not create a stream.");
400         u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
401         return;
402     }
403 
404     requested_latency = pa_sink_get_requested_latency_within_thread(u->sink);
405     if (requested_latency == (pa_usec_t) -1)
406         requested_latency = u->sink->thread_info.max_latency;
407 
408     reset_bufferattr(&bufferattr);
409     bufferattr.tlength = pa_usec_to_bytes(requested_latency, &u->sink->sample_spec);
410 
411     pa_log_debug("tlength requested at %lu.", (unsigned long) bufferattr.tlength);
412 
413     pa_stream_set_state_callback(u->stream, stream_state_cb, u);
414     pa_stream_set_buffer_attr_callback(u->stream, stream_changed_buffer_attr_cb, u);
415     pa_stream_set_underflow_callback(u->stream, stream_underflow_callback, u);
416     pa_stream_set_overflow_callback(u->stream, stream_overflow_callback, u);
417     if (pa_stream_connect_playback(u->stream,
418                                    u->remote_sink_name,
419                                    &bufferattr,
420                                    PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_DONT_MOVE | PA_STREAM_START_CORKED | PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_ADJUST_LATENCY,
421                                    NULL,
422                                    NULL) < 0) {
423         pa_log_error("Could not connect stream.");
424         u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
425     }
426     u->connected = true;
427 }
428 
/* State callback of the libpulse context; userdata is the module's struct
 * userdata. */
static void context_state_cb(pa_context *c, void *userdata) {
    struct userdata *u = userdata;

    pa_assert(u);

    switch (pa_context_get_state(c)) {
        case PA_CONTEXT_READY:
            /* We are connected now. Ask the control thread to create a sink
             * for us; the answer arrives as TUNNEL_MESSAGE_SINK_CREATED (see
             * sink_process_msg_cb()), and only then is the stream set up. */
            pa_log_debug("Connection successful. Creating stream.");
            pa_assert(!u->stream);
            pa_assert(!u->sink);

            pa_log_debug("Asking ctl thread to create sink.");
            pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SINK_REQUEST, u, 0, NULL, NULL);
            break;

        case PA_CONTEXT_FAILED:
            pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context)));
            u->connected = false;
            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
            break;

        case PA_CONTEXT_TERMINATED:
            pa_log_debug("Context terminated.");
            u->connected = false;
            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
            break;

        case PA_CONTEXT_UNCONNECTED:
        case PA_CONTEXT_CONNECTING:
        case PA_CONTEXT_AUTHORIZING:
        case PA_CONTEXT_SETTING_NAME:
            /* intermediate states, nothing to do */
            break;
    }
}
463 
/* update_requested_latency callback of the local sink.
 *
 * Converts the requested latency (or the sink's maximum latency if none is
 * requested) into a byte count, applies it as the sink's max_request and, if
 * the stream is ready, asks the server for a matching tlength. */
static void sink_update_requested_latency_cb(pa_sink *s) {
    struct userdata *u;
    pa_usec_t latency;
    size_t bytes;

    pa_sink_assert_ref(s);
    pa_assert_se(u = s->userdata);

    latency = pa_sink_get_requested_latency_within_thread(s);
    if (latency == (pa_usec_t) -1)
        latency = s->thread_info.max_latency;

    bytes = pa_usec_to_bytes(latency, &s->sample_spec);
    pa_sink_set_max_request_within_thread(s, bytes);

    if (!u->stream)
        return;

    switch (pa_stream_get_state(u->stream)) {
        case PA_STREAM_CREATING:
            /* we have to delay our request until stream is ready */
            u->update_stream_bufferattr_after_connect = true;
            break;

        case PA_STREAM_READY: {
            pa_buffer_attr attr;
            pa_operation *op;

            /* nothing to request if the server already uses this tlength */
            if (pa_stream_get_buffer_attr(u->stream)->tlength == bytes)
                break;

            pa_log_debug("Requesting new buffer attrs. tlength requested at %lu.",
                         (unsigned long) bytes);

            reset_bufferattr(&attr);
            attr.tlength = bytes;

            op = pa_stream_set_buffer_attr(u->stream, &attr, stream_set_buffer_attr_cb, u);
            if (op)
                pa_operation_unref(op);
            break;
        }

        default:
            break;
    }
}
504 
/* process_msg handler of the local sink.
 *
 * Handles PA_SINK_MESSAGE_GET_LATENCY (data points to an int64_t receiving
 * the latency) and TUNNEL_MESSAGE_SINK_CREATED itself; every other code is
 * forwarded to pa_sink_process_msg(). */
static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SINK(o)->userdata;

    if (code == TUNNEL_MESSAGE_SINK_CREATED) {
        on_sink_created(u);
        return 0;
    }

    if (code == PA_SINK_MESSAGE_GET_LATENCY) {
        int64_t *latency = (int64_t*) data;
        pa_usec_t remote_latency;
        int negative;

        /* Report 0 unless the sink is linked, the stream exists and is
         * ready, and the stream latency could be obtained. The conditions
         * are evaluated strictly in this order. */
        if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
                !u->stream ||
                pa_stream_get_state(u->stream) != PA_STREAM_READY ||
                pa_stream_get_latency(u->stream, &remote_latency, &negative) < 0)
            *latency = 0;
        else
            *latency = remote_latency;

        return 0;
    }

    return pa_sink_process_msg(o, code, data, offset, chunk);
}
542 
543 /* Called from the IO thread. */
sink_set_state_in_io_thread_cb(pa_sink * s,pa_sink_state_t new_state,pa_suspend_cause_t new_suspend_cause)544 static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
545     struct userdata *u;
546 
547     pa_assert(s);
548     pa_assert_se(u = s->userdata);
549 
550     /* It may be that only the suspend cause is changing, in which case there's
551      * nothing to do. */
552     if (new_state == s->thread_info.state)
553         return 0;
554 
555     if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
556         return 0;
557 
558     switch (new_state) {
559         case PA_SINK_SUSPENDED: {
560             cork_stream(u, true);
561             break;
562         }
563         case PA_SINK_IDLE:
564         case PA_SINK_RUNNING: {
565             cork_stream(u, false);
566             break;
567         }
568         case PA_SINK_INVALID_STATE:
569         case PA_SINK_INIT:
570         case PA_SINK_UNLINKED:
571             break;
572     }
573 
574     return 0;
575 }
576 
577 /* Creates a sink in the main thread.
578  *
579  * This method is called when we receive a message from the io thread that a
580  * connection has been established with the server.  We defer creation of the
581  * sink until the connection is established, because we don't have a sink if
582  * the remote server isn't there.
583  */
create_sink(struct userdata * u)584 static void create_sink(struct userdata *u) {
585     pa_sink_new_data sink_data;
586 
587     pa_assert_ctl_context();
588 
589     /* Create sink */
590     pa_sink_new_data_init(&sink_data);
591     sink_data.driver = __FILE__;
592     sink_data.module = u->module;
593 
594     pa_sink_new_data_set_name(&sink_data, u->sink_name);
595     pa_sink_new_data_set_sample_spec(&sink_data, &u->sample_spec);
596     pa_sink_new_data_set_channel_map(&sink_data, &u->channel_map);
597 
598     pa_proplist_update(sink_data.proplist, PA_UPDATE_REPLACE, u->sink_proplist);
599 
600     if (!(u->sink = pa_sink_new(u->module->core, &sink_data, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY | PA_SINK_NETWORK))) {
601         pa_log("Failed to create sink.");
602         goto finish;
603     }
604 
605     u->sink->userdata = u;
606     u->sink->parent.process_msg = sink_process_msg_cb;
607     u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
608     u->sink->update_requested_latency = sink_update_requested_latency_cb;
609     pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC);
610 
611     /* set thread message queue */
612     pa_sink_set_asyncmsgq(u->sink, u->thread_mq->inq);
613     pa_sink_set_rtpoll(u->sink, u->rtpoll);
614 
615     pa_sink_put(u->sink);
616 
617 finish:
618     pa_sink_new_data_done(&sink_data);
619 
620     /* tell any interested io threads that the sink they asked for has now been
621      * created (even if we failed, we still notify the thread, so they can
622      * either handle or kill the thread, rather than deadlock waiting for a
623      * message that will never come */
624     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), TUNNEL_MESSAGE_SINK_CREATED, u, 0, NULL);
625 }
626 
627 /* Runs in PA mainloop context */
tunnel_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)628 static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
629     struct userdata *u = (struct userdata *) data;
630 
631     pa_assert(u);
632     pa_assert_ctl_context();
633 
634     if (u->shutting_down)
635         return 0;
636 
637     switch (code) {
638         case TUNNEL_MESSAGE_CREATE_SINK_REQUEST:
639             create_sink(u);
640             break;
641         case TUNNEL_MESSAGE_MAYBE_RESTART:
642             maybe_restart(u->module->userdata);
643             break;
644     }
645 
646     return 0;
647 }
648 
/* (Re-)initialise one tunnel instance for module m.
 *
 * m->userdata must already point to a struct module_restart_data. Parses the
 * module arguments, allocates and fills a fresh struct userdata (stored in
 * rd->userdata) and starts the IO thread.
 *
 * Returns 0 on success and -1 on failure. On failure a partially initialised
 * userdata may remain in rd->userdata; it is not released here, so the
 * caller is expected to clean it up through do_done(). */
static int do_init(pa_module *m) {
    struct userdata *u = NULL;
    struct module_restart_data *rd;
    pa_modargs *ma = NULL;
    const char *remote_server = NULL;
    char *default_sink_name = NULL;
    uint32_t reconnect_interval_ms = 0;

    pa_assert(m);
    pa_assert(m->userdata);

    rd = m->userdata;

    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
        pa_log("Failed to parse module arguments.");
        goto fail;
    }

    u = pa_xnew0(struct userdata, 1);
    u->module = m;
    rd->userdata = u;

    /* Start from the core defaults; module arguments may override them. */
    u->sample_spec = m->core->default_sample_spec;
    u->channel_map = m->core->default_channel_map;
    if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->sample_spec, &u->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
        pa_log("Invalid sample format specification or channel map");
        goto fail;
    }

    remote_server = pa_modargs_get_value(ma, "server", NULL);
    if (!remote_server) {
        pa_log("No server given!");
        goto fail;
    }

    u->remote_server = pa_xstrdup(remote_server);
    u->thread_mainloop = pa_mainloop_new();
    if (u->thread_mainloop == NULL) {
        pa_log("Failed to create mainloop");
        goto fail;
    }
    u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop);
    u->cookie_file = pa_xstrdup(pa_modargs_get_value(ma, "cookie", NULL));
    u->remote_sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));

    u->thread_mq = pa_xnew0(pa_thread_mq, 1);

    if (pa_thread_mq_init_thread_mainloop(u->thread_mq, m->core->mainloop, u->thread_mainloop_api) < 0) {
        pa_log("pa_thread_mq_init_thread_mainloop() failed.");
        goto fail;
    }

    u->msg = pa_msgobject_new(tunnel_msg);
    u->msg->parent.process_msg = tunnel_process_msg;

    /* The rtpoll created here is only run for the sake of module-combine-sink. It must
     * exist to avoid crashes when module-tunnel-sink-new is used together with
     * module-loopback or module-combine-sink. Both modules base their asyncmsgq on the
     * rtpoll provided by the sink. module-loopback and combine-sink only work because
     * they call pa_asyncmsq_process_one() themselves. module-combine-sink does this
     * however only for the audio_inq, so without running the rtpoll, messages placed
     * in control_inq would never be executed. */
    u->rtpoll = pa_rtpoll_new();

    default_sink_name = pa_sprintf_malloc("tunnel-sink-new.%s", remote_server);
    u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", default_sink_name));

    u->sink_proplist = pa_proplist_new();
    pa_proplist_sets(u->sink_proplist, PA_PROP_DEVICE_CLASS, "sound");
    pa_proplist_setf(u->sink_proplist,
                     PA_PROP_DEVICE_DESCRIPTION,
                     _("Tunnel to %s/%s"),
                     remote_server,
                     pa_strempty(u->remote_sink_name));

    if (pa_modargs_get_proplist(ma, "sink_properties", u->sink_proplist, PA_UPDATE_REPLACE) < 0) {
        pa_log("Invalid properties");
        goto fail;
    }

    /* Reject an unparsable interval instead of silently running with
     * reconnecting disabled. */
    if (pa_modargs_get_value_u32(ma, "reconnect_interval_ms", &reconnect_interval_ms) < 0) {
        pa_log("Invalid reconnect_interval_ms specification");
        goto fail;
    }
    u->reconnect_interval_us = reconnect_interval_ms * PA_USEC_PER_MSEC;

    /* Start the IO thread last, once everything it uses has been set up. */
    if (!(u->thread = pa_thread_new("tunnel-sink", thread_func, u))) {
        pa_log("Failed to create thread.");
        goto fail;
    }

    /* If the module is restarting and do_init() finishes successfully, the
     * restart data is no longer needed. If do_init() fails, don't touch the
     * restart data, because following restart attempts will continue to use
     * the same data. If restart_data is NULL, that means no restart is
     * currently pending. */
    if (rd->restart_data) {
        pa_restart_free(rd->restart_data);
        rd->restart_data = NULL;
    }

    pa_modargs_free(ma);
    pa_xfree(default_sink_name);

    return 0;

fail:
    if (ma)
        pa_modargs_free(ma);

    pa_xfree(default_sink_name);

    return -1;
}
761 
/* Tear down the current tunnel instance of module m, if there is one.
 *
 * Unlinks the sink, shuts down and joins the IO thread, releases everything
 * do_init() allocated and resets rd->userdata to NULL. The struct
 * module_restart_data itself is left alone. Safe to call when either
 * m->userdata or rd->userdata is NULL. */
static void do_done(pa_module *m) {
    struct module_restart_data *rd;
    struct userdata *u;

    pa_assert(m);

    rd = m->userdata;
    if (!rd)
        return;

    u = rd->userdata;
    if (!u)
        return;

    /* From here on tunnel_process_msg() ignores messages for this instance. */
    u->shutting_down = true;

    if (u->sink)
        pa_sink_unlink(u->sink);

    if (u->thread) {
        pa_asyncmsgq_send(u->thread_mq->inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
        pa_thread_free(u->thread);
    }

    if (u->thread_mq) {
        pa_thread_mq_done(u->thread_mq);
        pa_xfree(u->thread_mq);
    }

    if (u->thread_mainloop)
        pa_mainloop_free(u->thread_mainloop);

    /* pa_xfree() accepts NULL, so the string members need no guards. */
    pa_xfree(u->cookie_file);
    pa_xfree(u->remote_sink_name);
    pa_xfree(u->remote_server);

    if (u->sink)
        pa_sink_unref(u->sink);

    if (u->rtpoll)
        pa_rtpoll_free(u->rtpoll);

    if (u->sink_proplist)
        pa_proplist_free(u->sink_proplist);

    pa_xfree(u->sink_name);
    pa_xfree(u->msg);
    pa_xfree(u);

    rd->userdata = NULL;
}
818 
/* Module entry point: allocates the struct module_restart_data kept in
 * m->userdata and performs the first initialisation. If that fails,
 * everything is torn down again via pa__done(). Returns the result of
 * do_init(). */
int pa__init(pa_module *m) {
    int rc;

    pa_assert(m);

    m->userdata = pa_xnew0(struct module_restart_data, 1);

    rc = do_init(m);
    if (rc >= 0)
        return rc;

    pa__done(m);
    return rc;
}
833 
/* Module exit point: tears down the current tunnel instance, frees a pending
 * restart handle and finally the struct module_restart_data itself.
 *
 * m->userdata is reset to NULL afterwards so that no dangling pointer is
 * left behind; a repeated call then has nothing left to free. */
void pa__done(pa_module *m) {
    struct module_restart_data *rd;

    pa_assert(m);

    do_done(m);

    rd = m->userdata;
    if (!rd)
        return;

    if (rd->restart_data) {
        pa_restart_free(rd->restart_data);
        rd->restart_data = NULL;
    }

    pa_xfree(rd);
    m->userdata = NULL;
}
848