1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2013 Alexander Couzens
5
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2.1 of the License,
9 or (at your option) any later version.
10
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23
24 #include <pulse/context.h>
25 #include <pulse/timeval.h>
26 #include <pulse/xmalloc.h>
27 #include <pulse/stream.h>
28 #include <pulse/mainloop.h>
29 #include <pulse/introspect.h>
30 #include <pulse/error.h>
31
32 #include <pulsecore/core.h>
33 #include <pulsecore/core-util.h>
34 #include <pulsecore/i18n.h>
35 #include <pulsecore/sink.h>
36 #include <pulsecore/modargs.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/thread.h>
39 #include <pulsecore/thread-mq.h>
40 #include <pulsecore/poll.h>
41 #include <pulsecore/rtpoll.h>
42 #include <pulsecore/proplist-util.h>
43
44 PA_MODULE_AUTHOR("Alexander Couzens");
45 PA_MODULE_DESCRIPTION("Create a network sink which connects via a stream to a remote PulseAudio server");
46 PA_MODULE_VERSION(PACKAGE_VERSION);
47 PA_MODULE_LOAD_ONCE(false);
48 PA_MODULE_USAGE(
49 "server=<address> "
50 "sink=<name of the remote sink> "
51 "sink_name=<name for the local sink> "
52 "sink_properties=<properties for the local sink> "
53 "format=<sample format> "
54 "channels=<number of channels> "
55 "rate=<sample rate> "
56 "channel_map=<channel map> "
57 "cookie=<cookie file path>"
58 );
59
60 #define MAX_LATENCY_USEC (200 * PA_USEC_PER_MSEC)
61 #define TUNNEL_THREAD_FAILED_MAINLOOP 1
62
63 static void stream_state_cb(pa_stream *stream, void *userdata);
64 static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata);
65 static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata);
66 static void context_state_cb(pa_context *c, void *userdata);
67 static void sink_update_requested_latency_cb(pa_sink *s);
68
69 struct userdata {
70 pa_module *module;
71 pa_sink *sink;
72 pa_thread *thread;
73 pa_thread_mq *thread_mq;
74 pa_mainloop *thread_mainloop;
75 pa_mainloop_api *thread_mainloop_api;
76
77 pa_context *context;
78 pa_stream *stream;
79 pa_rtpoll *rtpoll;
80
81 bool update_stream_bufferattr_after_connect;
82
83 bool connected;
84
85 char *cookie_file;
86 char *remote_server;
87 char *remote_sink_name;
88 };
89
90 static const char* const valid_modargs[] = {
91 "sink_name",
92 "sink_properties",
93 "server",
94 "sink",
95 "format",
96 "channels",
97 "rate",
98 "channel_map",
99 "cookie",
100 /* "reconnect", reconnect if server comes back again - unimplemented */
101 NULL,
102 };
103
cork_stream(struct userdata * u,bool cork)104 static void cork_stream(struct userdata *u, bool cork) {
105 pa_operation *operation;
106
107 pa_assert(u);
108 pa_assert(u->stream);
109
110 if (cork) {
111 /* When the sink becomes suspended (which is the only case where we
112 * cork the stream), we don't want to keep any old data around, because
113 * the old data is most likely unrelated to the audio that will be
114 * played at the time when the sink starts running again. */
115 if ((operation = pa_stream_flush(u->stream, NULL, NULL)))
116 pa_operation_unref(operation);
117 }
118
119 if ((operation = pa_stream_cork(u->stream, cork, NULL, NULL)))
120 pa_operation_unref(operation);
121 }
122
reset_bufferattr(pa_buffer_attr * bufferattr)123 static void reset_bufferattr(pa_buffer_attr *bufferattr) {
124 pa_assert(bufferattr);
125 bufferattr->fragsize = (uint32_t) -1;
126 bufferattr->minreq = (uint32_t) -1;
127 bufferattr->maxlength = (uint32_t) -1;
128 bufferattr->prebuf = (uint32_t) -1;
129 bufferattr->tlength = (uint32_t) -1;
130 }
131
tunnel_new_proplist(struct userdata * u)132 static pa_proplist* tunnel_new_proplist(struct userdata *u) {
133 pa_proplist *proplist = pa_proplist_new();
134 pa_assert(proplist);
135 pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio");
136 pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
137 pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
138 pa_init_proplist(proplist);
139
140 return proplist;
141 }
142
thread_func(void * userdata)143 static void thread_func(void *userdata) {
144 struct userdata *u = userdata;
145 pa_proplist *proplist;
146 pa_assert(u);
147
148 pa_log_debug("Thread starting up");
149 pa_thread_mq_install(u->thread_mq);
150
151 proplist = tunnel_new_proplist(u);
152 u->context = pa_context_new_with_proplist(u->thread_mainloop_api,
153 "PulseAudio",
154 proplist);
155 pa_proplist_free(proplist);
156
157 if (!u->context) {
158 pa_log("Failed to create libpulse context");
159 goto fail;
160 }
161
162 if (u->cookie_file && pa_context_load_cookie_from_file(u->context, u->cookie_file) != 0) {
163 pa_log_error("Can not load cookie file!");
164 goto fail;
165 }
166
167 pa_context_set_state_callback(u->context, context_state_cb, u);
168 if (pa_context_connect(u->context,
169 u->remote_server,
170 PA_CONTEXT_NOAUTOSPAWN,
171 NULL) < 0) {
172 pa_log("Failed to connect libpulse context");
173 goto fail;
174 }
175
176 for (;;) {
177 int ret;
178
179 if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) {
180 if (ret == 0)
181 goto finish;
182 else
183 goto fail;
184 }
185
186 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
187 pa_sink_process_rewind(u->sink, 0);
188
189 if (u->connected &&
190 pa_stream_get_state(u->stream) == PA_STREAM_READY &&
191 PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
192 size_t writable;
193
194 writable = pa_stream_writable_size(u->stream);
195 if (writable > 0) {
196 pa_memchunk memchunk;
197 const void *p;
198
199 pa_sink_render_full(u->sink, writable, &memchunk);
200
201 pa_assert(memchunk.length > 0);
202
203 /* we have new data to write */
204 p = pa_memblock_acquire(memchunk.memblock);
205 /* TODO: Use pa_stream_begin_write() to reduce copying. */
206 ret = pa_stream_write(u->stream,
207 (uint8_t*) p + memchunk.index,
208 memchunk.length,
209 NULL, /**< A cleanup routine for the data or NULL to request an internal copy */
210 0, /** offset */
211 PA_SEEK_RELATIVE);
212 pa_memblock_release(memchunk.memblock);
213 pa_memblock_unref(memchunk.memblock);
214
215 if (ret != 0) {
216 pa_log_error("Could not write data into the stream ... ret = %i", ret);
217 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
218 }
219
220 }
221 }
222 }
223 fail:
224 pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->module->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
225 pa_asyncmsgq_wait_for(u->thread_mq->inq, PA_MESSAGE_SHUTDOWN);
226
227 finish:
228 if (u->stream) {
229 pa_stream_disconnect(u->stream);
230 pa_stream_unref(u->stream);
231 u->stream = NULL;
232 }
233
234 if (u->context) {
235 pa_context_disconnect(u->context);
236 pa_context_unref(u->context);
237 u->context = NULL;
238 }
239
240 pa_log_debug("Thread shutting down");
241 }
242
stream_state_cb(pa_stream * stream,void * userdata)243 static void stream_state_cb(pa_stream *stream, void *userdata) {
244 struct userdata *u = userdata;
245
246 pa_assert(u);
247
248 switch (pa_stream_get_state(stream)) {
249 case PA_STREAM_FAILED:
250 pa_log_error("Stream failed.");
251 u->connected = false;
252 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
253 break;
254 case PA_STREAM_TERMINATED:
255 pa_log_debug("Stream terminated.");
256 break;
257 case PA_STREAM_READY:
258 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
259 cork_stream(u, false);
260
261 /* Only call our requested_latency_cb when requested_latency
262 * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because
263 * we don't want to override the initial tlength set by the server
264 * without a good reason. */
265 if (u->update_stream_bufferattr_after_connect)
266 sink_update_requested_latency_cb(u->sink);
267 else
268 stream_changed_buffer_attr_cb(stream, userdata);
269 case PA_STREAM_CREATING:
270 case PA_STREAM_UNCONNECTED:
271 break;
272 }
273 }
274
275 /* called when remote server changes the stream buffer_attr */
stream_changed_buffer_attr_cb(pa_stream * stream,void * userdata)276 static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata) {
277 struct userdata *u = userdata;
278 const pa_buffer_attr *bufferattr;
279 pa_assert(u);
280
281 bufferattr = pa_stream_get_buffer_attr(u->stream);
282 pa_sink_set_max_request_within_thread(u->sink, bufferattr->tlength);
283 }
284
285 /* called after we requested a change of the stream buffer_attr */
stream_set_buffer_attr_cb(pa_stream * stream,int success,void * userdata)286 static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata) {
287 stream_changed_buffer_attr_cb(stream, userdata);
288 }
289
context_state_cb(pa_context * c,void * userdata)290 static void context_state_cb(pa_context *c, void *userdata) {
291 struct userdata *u = userdata;
292 pa_assert(u);
293
294 switch (pa_context_get_state(c)) {
295 case PA_CONTEXT_UNCONNECTED:
296 case PA_CONTEXT_CONNECTING:
297 case PA_CONTEXT_AUTHORIZING:
298 case PA_CONTEXT_SETTING_NAME:
299 break;
300 case PA_CONTEXT_READY: {
301 pa_proplist *proplist;
302 pa_buffer_attr bufferattr;
303 pa_usec_t requested_latency;
304 char *username = pa_get_user_name_malloc();
305 char *hostname = pa_get_host_name_malloc();
306 /* TODO: old tunnel put here the remote sink_name into stream name e.g. 'Null Output for lynxis@lazus' */
307 char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
308 pa_xfree(hostname);
309 pa_xfree(username);
310
311 pa_log_debug("Connection successful. Creating stream.");
312 pa_assert(!u->stream);
313
314 proplist = tunnel_new_proplist(u);
315 u->stream = pa_stream_new_with_proplist(u->context,
316 stream_name,
317 &u->sink->sample_spec,
318 &u->sink->channel_map,
319 proplist);
320 pa_proplist_free(proplist);
321 pa_xfree(stream_name);
322
323 if (!u->stream) {
324 pa_log_error("Could not create a stream.");
325 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
326 return;
327 }
328
329 requested_latency = pa_sink_get_requested_latency_within_thread(u->sink);
330 if (requested_latency == (pa_usec_t) -1)
331 requested_latency = u->sink->thread_info.max_latency;
332
333 reset_bufferattr(&bufferattr);
334 bufferattr.tlength = pa_usec_to_bytes(requested_latency, &u->sink->sample_spec);
335
336 pa_stream_set_state_callback(u->stream, stream_state_cb, userdata);
337 pa_stream_set_buffer_attr_callback(u->stream, stream_changed_buffer_attr_cb, userdata);
338 if (pa_stream_connect_playback(u->stream,
339 u->remote_sink_name,
340 &bufferattr,
341 PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_DONT_MOVE | PA_STREAM_START_CORKED | PA_STREAM_AUTO_TIMING_UPDATE,
342 NULL,
343 NULL) < 0) {
344 pa_log_error("Could not connect stream.");
345 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
346 }
347 u->connected = true;
348 break;
349 }
350 case PA_CONTEXT_FAILED:
351 pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context)));
352 u->connected = false;
353 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
354 break;
355 case PA_CONTEXT_TERMINATED:
356 pa_log_debug("Context terminated.");
357 u->connected = false;
358 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
359 break;
360 }
361 }
362
sink_update_requested_latency_cb(pa_sink * s)363 static void sink_update_requested_latency_cb(pa_sink *s) {
364 struct userdata *u;
365 pa_operation *operation;
366 size_t nbytes;
367 pa_usec_t block_usec;
368 pa_buffer_attr bufferattr;
369
370 pa_sink_assert_ref(s);
371 pa_assert_se(u = s->userdata);
372
373 block_usec = pa_sink_get_requested_latency_within_thread(s);
374 if (block_usec == (pa_usec_t) -1)
375 block_usec = s->thread_info.max_latency;
376
377 nbytes = pa_usec_to_bytes(block_usec, &s->sample_spec);
378 pa_sink_set_max_request_within_thread(s, nbytes);
379
380 if (u->stream) {
381 switch (pa_stream_get_state(u->stream)) {
382 case PA_STREAM_READY:
383 if (pa_stream_get_buffer_attr(u->stream)->tlength == nbytes)
384 break;
385
386 reset_bufferattr(&bufferattr);
387 bufferattr.tlength = nbytes;
388 if ((operation = pa_stream_set_buffer_attr(u->stream, &bufferattr, stream_set_buffer_attr_cb, u)))
389 pa_operation_unref(operation);
390 break;
391 case PA_STREAM_CREATING:
392 /* we have to delay our request until stream is ready */
393 u->update_stream_bufferattr_after_connect = true;
394 break;
395 default:
396 break;
397 }
398 }
399 }
400
sink_process_msg_cb(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)401 static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
402 struct userdata *u = PA_SINK(o)->userdata;
403
404 switch (code) {
405 case PA_SINK_MESSAGE_GET_LATENCY: {
406 int negative;
407 pa_usec_t remote_latency;
408
409 if (!PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
410 *((int64_t*) data) = 0;
411 return 0;
412 }
413
414 if (!u->stream) {
415 *((int64_t*) data) = 0;
416 return 0;
417 }
418
419 if (pa_stream_get_state(u->stream) != PA_STREAM_READY) {
420 *((int64_t*) data) = 0;
421 return 0;
422 }
423
424 if (pa_stream_get_latency(u->stream, &remote_latency, &negative) < 0) {
425 *((int64_t*) data) = 0;
426 return 0;
427 }
428
429 *((int64_t*) data) = remote_latency;
430 return 0;
431 }
432 }
433 return pa_sink_process_msg(o, code, data, offset, chunk);
434 }
435
436 /* Called from the IO thread. */
sink_set_state_in_io_thread_cb(pa_sink * s,pa_sink_state_t new_state,pa_suspend_cause_t new_suspend_cause)437 static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
438 struct userdata *u;
439
440 pa_assert(s);
441 pa_assert_se(u = s->userdata);
442
443 /* It may be that only the suspend cause is changing, in which case there's
444 * nothing to do. */
445 if (new_state == s->thread_info.state)
446 return 0;
447
448 if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
449 return 0;
450
451 switch (new_state) {
452 case PA_SINK_SUSPENDED: {
453 cork_stream(u, true);
454 break;
455 }
456 case PA_SINK_IDLE:
457 case PA_SINK_RUNNING: {
458 cork_stream(u, false);
459 break;
460 }
461 case PA_SINK_INVALID_STATE:
462 case PA_SINK_INIT:
463 case PA_SINK_UNLINKED:
464 break;
465 }
466
467 return 0;
468 }
469
pa__init(pa_module * m)470 int pa__init(pa_module *m) {
471 struct userdata *u = NULL;
472 pa_modargs *ma = NULL;
473 pa_sink_new_data sink_data;
474 pa_sample_spec ss;
475 pa_channel_map map;
476 const char *remote_server = NULL;
477 const char *sink_name = NULL;
478 char *default_sink_name = NULL;
479
480 pa_assert(m);
481
482 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
483 pa_log("Failed to parse module arguments.");
484 goto fail;
485 }
486
487 ss = m->core->default_sample_spec;
488 map = m->core->default_channel_map;
489 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
490 pa_log("Invalid sample format specification or channel map");
491 goto fail;
492 }
493
494 remote_server = pa_modargs_get_value(ma, "server", NULL);
495 if (!remote_server) {
496 pa_log("No server given!");
497 goto fail;
498 }
499
500 u = pa_xnew0(struct userdata, 1);
501 u->module = m;
502 m->userdata = u;
503 u->remote_server = pa_xstrdup(remote_server);
504 u->thread_mainloop = pa_mainloop_new();
505 if (u->thread_mainloop == NULL) {
506 pa_log("Failed to create mainloop");
507 goto fail;
508 }
509 u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop);
510 u->cookie_file = pa_xstrdup(pa_modargs_get_value(ma, "cookie", NULL));
511 u->remote_sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
512
513 u->thread_mq = pa_xnew0(pa_thread_mq, 1);
514
515 if (pa_thread_mq_init_thread_mainloop(u->thread_mq, m->core->mainloop, u->thread_mainloop_api) < 0) {
516 pa_log("pa_thread_mq_init_thread_mainloop() failed.");
517 goto fail;
518 }
519
520 /* The rtpoll created here is never run. It is only necessary to avoid crashes
521 * when module-tunnel-sink-new is used together with module-loopback or
522 * module-combine-sink. Both modules base their asyncmsq on the rtpoll provided
523 * by the sink. module-loopback and combine-sink only work because they call
524 * pa_asyncmsq_process_one() themselves. module_rtp_recv also uses the rtpoll,
525 * but never calls pa_asyncmsq_process_one(), so it will not work in combination
526 * with module-tunnel-sink-new. */
527 u->rtpoll = pa_rtpoll_new();
528
529 /* Create sink */
530 pa_sink_new_data_init(&sink_data);
531 sink_data.driver = __FILE__;
532 sink_data.module = m;
533
534 default_sink_name = pa_sprintf_malloc("tunnel-sink-new.%s", remote_server);
535 sink_name = pa_modargs_get_value(ma, "sink_name", default_sink_name);
536
537 pa_sink_new_data_set_name(&sink_data, sink_name);
538 pa_sink_new_data_set_sample_spec(&sink_data, &ss);
539 pa_sink_new_data_set_channel_map(&sink_data, &map);
540
541 pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "sound");
542 pa_proplist_setf(sink_data.proplist,
543 PA_PROP_DEVICE_DESCRIPTION,
544 _("Tunnel to %s/%s"),
545 remote_server,
546 pa_strempty(u->remote_sink_name));
547
548 if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) {
549 pa_log("Invalid properties");
550 pa_sink_new_data_done(&sink_data);
551 goto fail;
552 }
553 if (!(u->sink = pa_sink_new(m->core, &sink_data, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY | PA_SINK_NETWORK))) {
554 pa_log("Failed to create sink.");
555 pa_sink_new_data_done(&sink_data);
556 goto fail;
557 }
558
559 pa_sink_new_data_done(&sink_data);
560 u->sink->userdata = u;
561 u->sink->parent.process_msg = sink_process_msg_cb;
562 u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
563 u->sink->update_requested_latency = sink_update_requested_latency_cb;
564 pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC);
565
566 /* set thread message queue */
567 pa_sink_set_asyncmsgq(u->sink, u->thread_mq->inq);
568 pa_sink_set_rtpoll(u->sink, u->rtpoll);
569
570 if (!(u->thread = pa_thread_new("tunnel-sink", thread_func, u))) {
571 pa_log("Failed to create thread.");
572 goto fail;
573 }
574
575 pa_sink_put(u->sink);
576 pa_modargs_free(ma);
577 pa_xfree(default_sink_name);
578
579 return 0;
580
581 fail:
582 if (ma)
583 pa_modargs_free(ma);
584
585 if (default_sink_name)
586 pa_xfree(default_sink_name);
587
588 pa__done(m);
589
590 return -1;
591 }
592
pa__done(pa_module * m)593 void pa__done(pa_module *m) {
594 struct userdata *u;
595
596 pa_assert(m);
597
598 if (!(u = m->userdata))
599 return;
600
601 if (u->sink)
602 pa_sink_unlink(u->sink);
603
604 if (u->thread) {
605 pa_asyncmsgq_send(u->thread_mq->inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
606 pa_thread_free(u->thread);
607 }
608
609 if (u->thread_mq) {
610 pa_thread_mq_done(u->thread_mq);
611 pa_xfree(u->thread_mq);
612 }
613
614 if (u->thread_mainloop)
615 pa_mainloop_free(u->thread_mainloop);
616
617 if (u->cookie_file)
618 pa_xfree(u->cookie_file);
619
620 if (u->remote_sink_name)
621 pa_xfree(u->remote_sink_name);
622
623 if (u->remote_server)
624 pa_xfree(u->remote_server);
625
626 if (u->sink)
627 pa_sink_unref(u->sink);
628
629 if (u->rtpoll)
630 pa_rtpoll_free(u->rtpoll);
631
632 pa_xfree(u);
633 }
634