1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2013 Alexander Couzens
5
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2.1 of the License,
9 or (at your option) any later version.
10
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23
24 #include "restart-module.h"
25
26 #include <pulse/context.h>
27 #include <pulse/timeval.h>
28 #include <pulse/xmalloc.h>
29 #include <pulse/stream.h>
30 #include <pulse/mainloop.h>
31 #include <pulse/introspect.h>
32 #include <pulse/error.h>
33
34 #include <pulsecore/core.h>
35 #include <pulsecore/core-util.h>
36 #include <pulsecore/i18n.h>
37 #include <pulsecore/sink.h>
38 #include <pulsecore/modargs.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/thread.h>
41 #include <pulsecore/thread-mq.h>
42 #include <pulsecore/poll.h>
43 #include <pulsecore/rtpoll.h>
44 #include <pulsecore/proplist-util.h>
45
46 PA_MODULE_AUTHOR("Alexander Couzens");
47 PA_MODULE_DESCRIPTION("Create a network sink which connects via a stream to a remote PulseAudio server");
48 PA_MODULE_VERSION(PACKAGE_VERSION);
49 PA_MODULE_LOAD_ONCE(false);
50 PA_MODULE_USAGE(
51 "server=<address> "
52 "sink=<name of the remote sink> "
53 "sink_name=<name for the local sink> "
54 "sink_properties=<properties for the local sink> "
55 "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
56 "format=<sample format> "
57 "channels=<number of channels> "
58 "rate=<sample rate> "
59 "channel_map=<channel map> "
60 "cookie=<cookie file path>"
61 );
62
63 #define MAX_LATENCY_USEC (200 * PA_USEC_PER_MSEC)
64 #define TUNNEL_THREAD_FAILED_MAINLOOP 1
65
66 static int do_init(pa_module *m);
67 static void do_done(pa_module *m);
68 static void stream_state_cb(pa_stream *stream, void *userdata);
69 static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata);
70 static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata);
71 static void context_state_cb(pa_context *c, void *userdata);
72 static void sink_update_requested_latency_cb(pa_sink *s);
73
74 struct tunnel_msg {
75 pa_msgobject parent;
76 };
77
78 typedef struct tunnel_msg tunnel_msg;
79 PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);
80
81 enum {
82 TUNNEL_MESSAGE_CREATE_SINK_REQUEST,
83 TUNNEL_MESSAGE_MAYBE_RESTART,
84 };
85
86 enum {
87 TUNNEL_MESSAGE_SINK_CREATED = PA_SINK_MESSAGE_MAX,
88 };
89
90 struct userdata {
91 pa_module *module;
92 pa_sink *sink;
93 pa_thread *thread;
94 pa_thread_mq *thread_mq;
95 pa_mainloop *thread_mainloop;
96 pa_mainloop_api *thread_mainloop_api;
97
98 pa_context *context;
99 pa_stream *stream;
100 pa_rtpoll *rtpoll;
101
102 bool update_stream_bufferattr_after_connect;
103
104 bool connected;
105 bool shutting_down;
106
107 char *cookie_file;
108 char *remote_server;
109 char *remote_sink_name;
110 char *sink_name;
111
112 pa_proplist *sink_proplist;
113 pa_sample_spec sample_spec;
114 pa_channel_map channel_map;
115
116 tunnel_msg *msg;
117
118 pa_usec_t reconnect_interval_us;
119 };
120
121 struct module_restart_data {
122 struct userdata *userdata;
123 pa_restart_data *restart_data;
124 };
125
126 static const char* const valid_modargs[] = {
127 "sink_name",
128 "sink_properties",
129 "server",
130 "sink",
131 "format",
132 "channels",
133 "rate",
134 "channel_map",
135 "cookie",
136 "reconnect_interval_ms",
137 NULL,
138 };
139
cork_stream(struct userdata * u,bool cork)140 static void cork_stream(struct userdata *u, bool cork) {
141 pa_operation *operation;
142
143 pa_assert(u);
144 pa_assert(u->stream);
145
146 if (cork) {
147 /* When the sink becomes suspended (which is the only case where we
148 * cork the stream), we don't want to keep any old data around, because
149 * the old data is most likely unrelated to the audio that will be
150 * played at the time when the sink starts running again. */
151 if ((operation = pa_stream_flush(u->stream, NULL, NULL)))
152 pa_operation_unref(operation);
153 }
154
155 if ((operation = pa_stream_cork(u->stream, cork, NULL, NULL)))
156 pa_operation_unref(operation);
157 }
158
reset_bufferattr(pa_buffer_attr * bufferattr)159 static void reset_bufferattr(pa_buffer_attr *bufferattr) {
160 pa_assert(bufferattr);
161 bufferattr->fragsize = (uint32_t) -1;
162 bufferattr->minreq = (uint32_t) -1;
163 bufferattr->maxlength = (uint32_t) -1;
164 bufferattr->prebuf = (uint32_t) -1;
165 bufferattr->tlength = (uint32_t) -1;
166 }
167
tunnel_new_proplist(struct userdata * u)168 static pa_proplist* tunnel_new_proplist(struct userdata *u) {
169 pa_proplist *proplist = pa_proplist_new();
170 pa_assert(proplist);
171 pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio");
172 pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
173 pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
174 pa_init_proplist(proplist);
175
176 return proplist;
177 }
178
thread_func(void * userdata)179 static void thread_func(void *userdata) {
180 struct userdata *u = userdata;
181 pa_proplist *proplist;
182
183 pa_assert(u);
184
185 pa_log_debug("Thread starting up");
186 pa_thread_mq_install(u->thread_mq);
187
188 proplist = tunnel_new_proplist(u);
189 u->context = pa_context_new_with_proplist(u->thread_mainloop_api,
190 "PulseAudio",
191 proplist);
192 pa_proplist_free(proplist);
193
194 if (!u->context) {
195 pa_log("Failed to create libpulse context");
196 goto fail;
197 }
198
199 if (u->cookie_file && pa_context_load_cookie_from_file(u->context, u->cookie_file) != 0) {
200 pa_log_error("Can not load cookie file!");
201 goto fail;
202 }
203
204 pa_context_set_state_callback(u->context, context_state_cb, u);
205 if (pa_context_connect(u->context,
206 u->remote_server,
207 PA_CONTEXT_NOAUTOSPAWN,
208 NULL) < 0) {
209 pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context)));
210 goto fail;
211 }
212
213 for (;;) {
214 int ret;
215
216 if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) {
217 if (ret == 0)
218 goto finish;
219 else
220 goto fail;
221 }
222
223 if (u->sink && PA_UNLIKELY(u->sink->thread_info.rewind_requested))
224 pa_sink_process_rewind(u->sink, 0);
225
226 if (u->connected &&
227 pa_stream_get_state(u->stream) == PA_STREAM_READY &&
228 PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
229 size_t writable;
230
231 writable = pa_stream_writable_size(u->stream);
232 if (writable > 0) {
233 pa_memchunk memchunk;
234 const void *p;
235
236 pa_sink_render_full(u->sink, writable, &memchunk);
237
238 pa_assert(memchunk.length > 0);
239
240 /* we have new data to write */
241 p = pa_memblock_acquire(memchunk.memblock);
242 /* TODO: Use pa_stream_begin_write() to reduce copying. */
243 ret = pa_stream_write(u->stream,
244 (uint8_t*) p + memchunk.index,
245 memchunk.length,
246 NULL, /**< A cleanup routine for the data or NULL to request an internal copy */
247 0, /** offset */
248 PA_SEEK_RELATIVE);
249 pa_memblock_release(memchunk.memblock);
250 pa_memblock_unref(memchunk.memblock);
251
252 if (ret != 0) {
253 pa_log_error("Could not write data into the stream ... ret = %i", ret);
254 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
255 }
256 }
257 }
258
259 /* Run the rtpoll to process messages that other modules (module-combine-sink,
260 * module-loopback and module-rtp-recv) may have placed in the queue. */
261 pa_rtpoll_set_timer_relative(u->rtpoll, 0);
262 if (pa_rtpoll_run(u->rtpoll) < 0)
263 goto fail;
264 }
265 fail:
266 /* send a message to the ctl thread to ask it to either terminate us, or
267 * restart us, but either way this thread will exit, so then wait for the
268 * shutdown message */
269 pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_MAYBE_RESTART, u, 0, NULL, NULL);
270 pa_asyncmsgq_wait_for(u->thread_mq->inq, PA_MESSAGE_SHUTDOWN);
271
272 finish:
273 if (u->stream) {
274 pa_stream_disconnect(u->stream);
275 pa_stream_unref(u->stream);
276 u->stream = NULL;
277 }
278
279 if (u->context) {
280 pa_context_disconnect(u->context);
281 pa_context_unref(u->context);
282 u->context = NULL;
283 }
284
285 pa_log_debug("Thread shutting down");
286 }
287
stream_state_cb(pa_stream * stream,void * userdata)288 static void stream_state_cb(pa_stream *stream, void *userdata) {
289 struct userdata *u = userdata;
290
291 pa_assert(u);
292
293 switch (pa_stream_get_state(stream)) {
294 case PA_STREAM_FAILED:
295 pa_log_error("Stream failed.");
296 u->connected = false;
297 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
298 break;
299 case PA_STREAM_TERMINATED:
300 pa_log_debug("Stream terminated.");
301 break;
302 case PA_STREAM_READY:
303 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
304 cork_stream(u, false);
305
306 /* Only call our requested_latency_cb when requested_latency
307 * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because
308 * we don't want to override the initial tlength set by the server
309 * without a good reason. */
310 if (u->update_stream_bufferattr_after_connect)
311 sink_update_requested_latency_cb(u->sink);
312 else
313 stream_changed_buffer_attr_cb(stream, userdata);
314 case PA_STREAM_CREATING:
315 case PA_STREAM_UNCONNECTED:
316 break;
317 }
318 }
319
320 /* called when remote server changes the stream buffer_attr */
stream_changed_buffer_attr_cb(pa_stream * stream,void * userdata)321 static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata) {
322 struct userdata *u = userdata;
323 const pa_buffer_attr *bufferattr;
324 pa_assert(u);
325
326 bufferattr = pa_stream_get_buffer_attr(u->stream);
327 pa_sink_set_max_request_within_thread(u->sink, bufferattr->tlength);
328
329 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu.",
330 (unsigned long) bufferattr->tlength);
331 }
332
333 /* called after we requested a change of the stream buffer_attr */
stream_set_buffer_attr_cb(pa_stream * stream,int success,void * userdata)334 static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata) {
335 stream_changed_buffer_attr_cb(stream, userdata);
336 }
337
338 /* called when the server experiences an underrun of our buffer */
stream_underflow_callback(pa_stream * stream,void * userdata)339 static void stream_underflow_callback(pa_stream *stream, void *userdata) {
340 pa_log_info("Server signalled buffer underrun.");
341 }
342
343 /* called when the server experiences an overrun of our buffer */
stream_overflow_callback(pa_stream * stream,void * userdata)344 static void stream_overflow_callback(pa_stream *stream, void *userdata) {
345 pa_log_info("Server signalled buffer overrun.");
346 }
347
348 /* Do a reinit of the module. Note that u will be freed as a result of this
349 * call. */
maybe_restart(struct module_restart_data * rd)350 static void maybe_restart(struct module_restart_data *rd) {
351 struct userdata *u = rd->userdata;
352
353 if (rd->restart_data) {
354 pa_log_debug("Restart already pending");
355 return;
356 }
357
358 if (u->reconnect_interval_us > 0) {
359 /* The handle returned here must be freed when do_init() finishes successfully
360 * and when the module exits. */
361 rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us);
362 } else {
363 /* exit the module */
364 pa_module_unload_request(u->module, true);
365 }
366 }
367
on_sink_created(struct userdata * u)368 static void on_sink_created(struct userdata *u) {
369 pa_proplist *proplist;
370 pa_buffer_attr bufferattr;
371 pa_usec_t requested_latency;
372 char *username = pa_get_user_name_malloc();
373 char *hostname = pa_get_host_name_malloc();
374 /* TODO: old tunnel put here the remote sink_name into stream name e.g. 'Null Output for lynxis@lazus' */
375 char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
376 pa_xfree(hostname);
377 pa_xfree(username);
378
379 pa_assert_io_context();
380
381 /* if we still don't have a sink, then sink creation failed, and we should
382 * kill this io thread */
383 if (!u->sink) {
384 pa_log_error("Could not create a sink.");
385 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
386 return;
387 }
388
389 proplist = tunnel_new_proplist(u);
390 u->stream = pa_stream_new_with_proplist(u->context,
391 stream_name,
392 &u->sink->sample_spec,
393 &u->sink->channel_map,
394 proplist);
395 pa_proplist_free(proplist);
396 pa_xfree(stream_name);
397
398 if (!u->stream) {
399 pa_log_error("Could not create a stream.");
400 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
401 return;
402 }
403
404 requested_latency = pa_sink_get_requested_latency_within_thread(u->sink);
405 if (requested_latency == (pa_usec_t) -1)
406 requested_latency = u->sink->thread_info.max_latency;
407
408 reset_bufferattr(&bufferattr);
409 bufferattr.tlength = pa_usec_to_bytes(requested_latency, &u->sink->sample_spec);
410
411 pa_log_debug("tlength requested at %lu.", (unsigned long) bufferattr.tlength);
412
413 pa_stream_set_state_callback(u->stream, stream_state_cb, u);
414 pa_stream_set_buffer_attr_callback(u->stream, stream_changed_buffer_attr_cb, u);
415 pa_stream_set_underflow_callback(u->stream, stream_underflow_callback, u);
416 pa_stream_set_overflow_callback(u->stream, stream_overflow_callback, u);
417 if (pa_stream_connect_playback(u->stream,
418 u->remote_sink_name,
419 &bufferattr,
420 PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_DONT_MOVE | PA_STREAM_START_CORKED | PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_ADJUST_LATENCY,
421 NULL,
422 NULL) < 0) {
423 pa_log_error("Could not connect stream.");
424 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
425 }
426 u->connected = true;
427 }
428
context_state_cb(pa_context * c,void * userdata)429 static void context_state_cb(pa_context *c, void *userdata) {
430 struct userdata *u = userdata;
431 pa_assert(u);
432
433 switch (pa_context_get_state(c)) {
434 case PA_CONTEXT_UNCONNECTED:
435 case PA_CONTEXT_CONNECTING:
436 case PA_CONTEXT_AUTHORIZING:
437 case PA_CONTEXT_SETTING_NAME:
438 break;
439 case PA_CONTEXT_READY:
440 /* now that we're connected, ask the control thread to create a sink for
441 * us, and wait for that to complete before proceeding, we'll
442 * receive TUNNEL_MESSAGE_SINK_CREATED in response when the sink is
443 * created (see sink_process_msg_cb()) */
444 pa_log_debug("Connection successful. Creating stream.");
445 pa_assert(!u->stream);
446 pa_assert(!u->sink);
447
448 pa_log_debug("Asking ctl thread to create sink.");
449 pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SINK_REQUEST, u, 0, NULL, NULL);
450 break;
451 case PA_CONTEXT_FAILED:
452 pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context)));
453 u->connected = false;
454 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
455 break;
456 case PA_CONTEXT_TERMINATED:
457 pa_log_debug("Context terminated.");
458 u->connected = false;
459 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
460 break;
461 }
462 }
463
sink_update_requested_latency_cb(pa_sink * s)464 static void sink_update_requested_latency_cb(pa_sink *s) {
465 struct userdata *u;
466 pa_operation *operation;
467 size_t nbytes;
468 pa_usec_t block_usec;
469 pa_buffer_attr bufferattr;
470
471 pa_sink_assert_ref(s);
472 pa_assert_se(u = s->userdata);
473
474 block_usec = pa_sink_get_requested_latency_within_thread(s);
475 if (block_usec == (pa_usec_t) -1)
476 block_usec = s->thread_info.max_latency;
477
478 nbytes = pa_usec_to_bytes(block_usec, &s->sample_spec);
479 pa_sink_set_max_request_within_thread(s, nbytes);
480
481 if (u->stream) {
482 switch (pa_stream_get_state(u->stream)) {
483 case PA_STREAM_READY:
484 if (pa_stream_get_buffer_attr(u->stream)->tlength == nbytes)
485 break;
486
487 pa_log_debug("Requesting new buffer attrs. tlength requested at %lu.",
488 (unsigned long) nbytes);
489
490 reset_bufferattr(&bufferattr);
491 bufferattr.tlength = nbytes;
492 if ((operation = pa_stream_set_buffer_attr(u->stream, &bufferattr, stream_set_buffer_attr_cb, u)))
493 pa_operation_unref(operation);
494 break;
495 case PA_STREAM_CREATING:
496 /* we have to delay our request until stream is ready */
497 u->update_stream_bufferattr_after_connect = true;
498 break;
499 default:
500 break;
501 }
502 }
503 }
504
sink_process_msg_cb(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)505 static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
506 struct userdata *u = PA_SINK(o)->userdata;
507
508 switch (code) {
509 case PA_SINK_MESSAGE_GET_LATENCY: {
510 int negative;
511 pa_usec_t remote_latency;
512
513 if (!PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
514 *((int64_t*) data) = 0;
515 return 0;
516 }
517
518 if (!u->stream) {
519 *((int64_t*) data) = 0;
520 return 0;
521 }
522
523 if (pa_stream_get_state(u->stream) != PA_STREAM_READY) {
524 *((int64_t*) data) = 0;
525 return 0;
526 }
527
528 if (pa_stream_get_latency(u->stream, &remote_latency, &negative) < 0) {
529 *((int64_t*) data) = 0;
530 return 0;
531 }
532
533 *((int64_t*) data) = remote_latency;
534 return 0;
535 }
536 case TUNNEL_MESSAGE_SINK_CREATED:
537 on_sink_created(u);
538 return 0;
539 }
540 return pa_sink_process_msg(o, code, data, offset, chunk);
541 }
542
543 /* Called from the IO thread. */
sink_set_state_in_io_thread_cb(pa_sink * s,pa_sink_state_t new_state,pa_suspend_cause_t new_suspend_cause)544 static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
545 struct userdata *u;
546
547 pa_assert(s);
548 pa_assert_se(u = s->userdata);
549
550 /* It may be that only the suspend cause is changing, in which case there's
551 * nothing to do. */
552 if (new_state == s->thread_info.state)
553 return 0;
554
555 if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
556 return 0;
557
558 switch (new_state) {
559 case PA_SINK_SUSPENDED: {
560 cork_stream(u, true);
561 break;
562 }
563 case PA_SINK_IDLE:
564 case PA_SINK_RUNNING: {
565 cork_stream(u, false);
566 break;
567 }
568 case PA_SINK_INVALID_STATE:
569 case PA_SINK_INIT:
570 case PA_SINK_UNLINKED:
571 break;
572 }
573
574 return 0;
575 }
576
577 /* Creates a sink in the main thread.
578 *
579 * This method is called when we receive a message from the io thread that a
580 * connection has been established with the server. We defer creation of the
581 * sink until the connection is established, because we don't have a sink if
582 * the remote server isn't there.
583 */
create_sink(struct userdata * u)584 static void create_sink(struct userdata *u) {
585 pa_sink_new_data sink_data;
586
587 pa_assert_ctl_context();
588
589 /* Create sink */
590 pa_sink_new_data_init(&sink_data);
591 sink_data.driver = __FILE__;
592 sink_data.module = u->module;
593
594 pa_sink_new_data_set_name(&sink_data, u->sink_name);
595 pa_sink_new_data_set_sample_spec(&sink_data, &u->sample_spec);
596 pa_sink_new_data_set_channel_map(&sink_data, &u->channel_map);
597
598 pa_proplist_update(sink_data.proplist, PA_UPDATE_REPLACE, u->sink_proplist);
599
600 if (!(u->sink = pa_sink_new(u->module->core, &sink_data, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY | PA_SINK_NETWORK))) {
601 pa_log("Failed to create sink.");
602 goto finish;
603 }
604
605 u->sink->userdata = u;
606 u->sink->parent.process_msg = sink_process_msg_cb;
607 u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
608 u->sink->update_requested_latency = sink_update_requested_latency_cb;
609 pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC);
610
611 /* set thread message queue */
612 pa_sink_set_asyncmsgq(u->sink, u->thread_mq->inq);
613 pa_sink_set_rtpoll(u->sink, u->rtpoll);
614
615 pa_sink_put(u->sink);
616
617 finish:
618 pa_sink_new_data_done(&sink_data);
619
620 /* tell any interested io threads that the sink they asked for has now been
621 * created (even if we failed, we still notify the thread, so they can
622 * either handle or kill the thread, rather than deadlock waiting for a
623 * message that will never come */
624 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), TUNNEL_MESSAGE_SINK_CREATED, u, 0, NULL);
625 }
626
627 /* Runs in PA mainloop context */
tunnel_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)628 static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
629 struct userdata *u = (struct userdata *) data;
630
631 pa_assert(u);
632 pa_assert_ctl_context();
633
634 if (u->shutting_down)
635 return 0;
636
637 switch (code) {
638 case TUNNEL_MESSAGE_CREATE_SINK_REQUEST:
639 create_sink(u);
640 break;
641 case TUNNEL_MESSAGE_MAYBE_RESTART:
642 maybe_restart(u->module->userdata);
643 break;
644 }
645
646 return 0;
647 }
648
do_init(pa_module * m)649 static int do_init(pa_module *m) {
650 struct userdata *u = NULL;
651 struct module_restart_data *rd;
652 pa_modargs *ma = NULL;
653 const char *remote_server = NULL;
654 char *default_sink_name = NULL;
655 uint32_t reconnect_interval_ms = 0;
656
657 pa_assert(m);
658 pa_assert(m->userdata);
659
660 rd = m->userdata;
661
662 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
663 pa_log("Failed to parse module arguments.");
664 goto fail;
665 }
666
667 u = pa_xnew0(struct userdata, 1);
668 u->module = m;
669 rd->userdata = u;
670
671 u->sample_spec = m->core->default_sample_spec;
672 u->channel_map = m->core->default_channel_map;
673 if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->sample_spec, &u->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
674 pa_log("Invalid sample format specification or channel map");
675 goto fail;
676 }
677
678 remote_server = pa_modargs_get_value(ma, "server", NULL);
679 if (!remote_server) {
680 pa_log("No server given!");
681 goto fail;
682 }
683
684 u->remote_server = pa_xstrdup(remote_server);
685 u->thread_mainloop = pa_mainloop_new();
686 if (u->thread_mainloop == NULL) {
687 pa_log("Failed to create mainloop");
688 goto fail;
689 }
690 u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop);
691 u->cookie_file = pa_xstrdup(pa_modargs_get_value(ma, "cookie", NULL));
692 u->remote_sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
693
694 u->thread_mq = pa_xnew0(pa_thread_mq, 1);
695
696 if (pa_thread_mq_init_thread_mainloop(u->thread_mq, m->core->mainloop, u->thread_mainloop_api) < 0) {
697 pa_log("pa_thread_mq_init_thread_mainloop() failed.");
698 goto fail;
699 }
700
701 u->msg = pa_msgobject_new(tunnel_msg);
702 u->msg->parent.process_msg = tunnel_process_msg;
703
704 /* The rtpoll created here is only run for the sake of module-combine-sink. It must
705 * exist to avoid crashes when module-tunnel-sink-new is used together with
706 * module-loopback or module-combine-sink. Both modules base their asyncmsgq on the
707 * rtpoll provided by the sink. module-loopback and combine-sink only work because
708 * they call pa_asyncmsq_process_one() themselves. module-combine-sink does this
709 * however only for the audio_inq, so without running the rtpoll, messages placed
710 * in control_inq would never be executed. */
711 u->rtpoll = pa_rtpoll_new();
712
713 default_sink_name = pa_sprintf_malloc("tunnel-sink-new.%s", remote_server);
714 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", default_sink_name));
715
716 u->sink_proplist = pa_proplist_new();
717 pa_proplist_sets(u->sink_proplist, PA_PROP_DEVICE_CLASS, "sound");
718 pa_proplist_setf(u->sink_proplist,
719 PA_PROP_DEVICE_DESCRIPTION,
720 _("Tunnel to %s/%s"),
721 remote_server,
722 pa_strempty(u->remote_sink_name));
723
724 if (pa_modargs_get_proplist(ma, "sink_properties", u->sink_proplist, PA_UPDATE_REPLACE) < 0) {
725 pa_log("Invalid properties");
726 goto fail;
727 }
728
729 pa_modargs_get_value_u32(ma, "reconnect_interval_ms", &reconnect_interval_ms);
730 u->reconnect_interval_us = reconnect_interval_ms * PA_USEC_PER_MSEC;
731
732 if (!(u->thread = pa_thread_new("tunnel-sink", thread_func, u))) {
733 pa_log("Failed to create thread.");
734 goto fail;
735 }
736
737 /* If the module is restarting and do_init() finishes successfully, the
738 * restart data is no longer needed. If do_init() fails, don't touch the
739 * restart data, because following restart attempts will continue to use
740 * the same data. If restart_data is NULL, that means no restart is
741 * currently pending. */
742 if (rd->restart_data) {
743 pa_restart_free(rd->restart_data);
744 rd->restart_data = NULL;
745 }
746
747 pa_modargs_free(ma);
748 pa_xfree(default_sink_name);
749
750 return 0;
751
752 fail:
753 if (ma)
754 pa_modargs_free(ma);
755
756 if (default_sink_name)
757 pa_xfree(default_sink_name);
758
759 return -1;
760 }
761
do_done(pa_module * m)762 static void do_done(pa_module *m) {
763 struct userdata *u = NULL;
764 struct module_restart_data *rd;
765
766 pa_assert(m);
767
768 if (!(rd = m->userdata))
769 return;
770 if (!(u = rd->userdata))
771 return;
772
773 u->shutting_down = true;
774
775 if (u->sink)
776 pa_sink_unlink(u->sink);
777
778 if (u->thread) {
779 pa_asyncmsgq_send(u->thread_mq->inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
780 pa_thread_free(u->thread);
781 }
782
783 if (u->thread_mq) {
784 pa_thread_mq_done(u->thread_mq);
785 pa_xfree(u->thread_mq);
786 }
787
788 if (u->thread_mainloop)
789 pa_mainloop_free(u->thread_mainloop);
790
791 if (u->cookie_file)
792 pa_xfree(u->cookie_file);
793
794 if (u->remote_sink_name)
795 pa_xfree(u->remote_sink_name);
796
797 if (u->remote_server)
798 pa_xfree(u->remote_server);
799
800 if (u->sink)
801 pa_sink_unref(u->sink);
802
803 if (u->rtpoll)
804 pa_rtpoll_free(u->rtpoll);
805
806 if (u->sink_proplist)
807 pa_proplist_free(u->sink_proplist);
808
809 if (u->sink_name)
810 pa_xfree(u->sink_name);
811
812 pa_xfree(u->msg);
813
814 pa_xfree(u);
815
816 rd->userdata = NULL;
817 }
818
pa__init(pa_module * m)819 int pa__init(pa_module *m) {
820 int ret;
821
822 pa_assert(m);
823
824 m->userdata = pa_xnew0(struct module_restart_data, 1);
825
826 ret = do_init(m);
827
828 if (ret < 0)
829 pa__done(m);
830
831 return ret;
832 }
833
pa__done(pa_module * m)834 void pa__done(pa_module *m) {
835 pa_assert(m);
836
837 do_done(m);
838
839 if (m->userdata) {
840 struct module_restart_data *rd = m->userdata;
841
842 if (rd->restart_data)
843 pa_restart_free(rd->restart_data);
844
845 pa_xfree(m->userdata);
846 }
847 }
848