1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2013 Alexander Couzens
5
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2.1 of the License,
9 or (at your option) any later version.
10
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23
24 #include <pulse/context.h>
25 #include <pulse/timeval.h>
26 #include <pulse/xmalloc.h>
27 #include <pulse/stream.h>
28 #include <pulse/mainloop.h>
29 #include <pulse/introspect.h>
30 #include <pulse/error.h>
31
32 #include <pulsecore/core.h>
33 #include <pulsecore/core-util.h>
34 #include <pulsecore/i18n.h>
35 #include <pulsecore/source.h>
36 #include <pulsecore/modargs.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/thread.h>
39 #include <pulsecore/thread-mq.h>
40 #include <pulsecore/poll.h>
41 #include <pulsecore/rtpoll.h>
42 #include <pulsecore/proplist-util.h>
43
/* Module metadata: author, description, version, load policy and the usage
 * string describing the accepted module arguments. */
PA_MODULE_AUTHOR("Alexander Couzens");
PA_MODULE_DESCRIPTION("Create a network source which connects via a stream to a remote PulseAudio server");
PA_MODULE_VERSION(PACKAGE_VERSION);
/* NOTE(review): "false" presumably permits several instances of this module
 * (e.g. one per remote server) -- confirm against the PA_MODULE_LOAD_ONCE docs. */
PA_MODULE_LOAD_ONCE(false);
/* Every key named here is also listed in valid_modargs below. */
PA_MODULE_USAGE(
        "server=<address> "
        "source=<name of the remote source> "
        "source_name=<name for the local source> "
        "source_properties=<properties for the local source> "
        "format=<sample format> "
        "channels=<number of channels> "
        "rate=<sample rate> "
        "channel_map=<channel map> "
        "cookie=<cookie file path>"
        );
59
/* Value handed to the thread mainloop's quit() whenever the tunnel hits an
 * error. thread_func() treats a quit value of 0 as a regular shutdown and
 * any other value as a failure. */
#define TUNNEL_THREAD_FAILED_MAINLOOP 1

/* Forward declarations of callbacks defined further down in this file. */
static void stream_state_cb(pa_stream *stream, void *userdata);
static void stream_read_cb(pa_stream *s, size_t length, void *userdata);
static void context_state_cb(pa_context *c, void *userdata);
static void source_update_requested_latency_cb(pa_source *s);
66
/* Per-instance state of the tunnel source module. Allocated in pa__init(),
 * stored in pa_module.userdata and released in pa__done(). */
struct userdata {
    pa_module *module;                      /* module this instance belongs to */
    pa_source *source;                      /* local source created in pa__init() */
    pa_thread *thread;                      /* thread running thread_func() */
    pa_thread_mq *thread_mq;                /* message queues, installed by thread_func() */
    pa_mainloop *thread_mainloop;           /* mainloop iterated by thread_func() */
    pa_mainloop_api *thread_mainloop_api;   /* API vtable of thread_mainloop */

    pa_context *context;                    /* libpulse connection to the remote server */
    pa_stream *stream;                      /* record stream; created once the context is ready */
    pa_rtpoll *rtpoll;                      /* never run; handed to the source in pa__init() */

    /* Set when a latency change is requested while the stream is still being
     * created; stream_state_cb() applies it once the stream is ready. */
    bool update_stream_bufferattr_after_connect;
    /* Set by context_state_cb() after the stream connect was issued; cleared
     * when the context or the stream fails or terminates. */
    bool connected;
    /* Set by stream_read_cb(), cleared by read_new_samples(). */
    bool new_data;

    char *cookie_file;                      /* copy of the "cookie" module argument (may be NULL) */
    char *remote_server;                    /* copy of the "server" module argument */
    char *remote_source_name;               /* copy of the "source" module argument (may be NULL) */
};
87
/* NULL-terminated list of module argument keys; passed to pa_modargs_new()
 * in pa__init(). */
static const char* const valid_modargs[] = {
    "source_name",
    "source_properties",
    "server",
    "source",
    "format",
    "channels",
    "rate",
    "channel_map",
    "cookie",
    /* "reconnect", reconnect if server comes back again - unimplemented */
    NULL,
};
101
cork_stream(struct userdata * u,bool cork)102 static void cork_stream(struct userdata *u, bool cork) {
103 pa_operation *operation;
104
105 pa_assert(u);
106 pa_assert(u->stream);
107
108 if ((operation = pa_stream_cork(u->stream, cork, NULL, NULL)))
109 pa_operation_unref(operation);
110 }
111
/* Set every field of *bufferattr to (uint32_t) -1, the value libpulse
 * documents as "let the server pick a default" for pa_buffer_attr fields. */
static void reset_bufferattr(pa_buffer_attr *bufferattr) {
    const uint32_t unset = (uint32_t) -1;

    pa_assert(bufferattr);

    bufferattr->maxlength = unset;
    bufferattr->tlength = unset;
    bufferattr->prebuf = unset;
    bufferattr->minreq = unset;
    bufferattr->fragsize = unset;
}
120
tunnel_new_proplist(struct userdata * u)121 static pa_proplist* tunnel_new_proplist(struct userdata *u) {
122 pa_proplist *proplist = pa_proplist_new();
123 pa_assert(proplist);
124 pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio");
125 pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
126 pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
127 pa_init_proplist(proplist);
128
129 return proplist;
130 }
131
/* libpulse read callback: only flag that data is pending. The samples are
 * fetched later by read_new_samples() from the loop in thread_func(). */
static void stream_read_cb(pa_stream *s, size_t length, void *userdata) {
    struct userdata *tunnel = userdata;

    tunnel->new_data = true;
}
136
137 /* called from io context to read samples from the stream into our source */
read_new_samples(struct userdata * u)138 static void read_new_samples(struct userdata *u) {
139 const void *p;
140 size_t readable = 0;
141 pa_memchunk memchunk;
142
143 pa_assert(u);
144 u->new_data = false;
145
146 pa_memchunk_reset(&memchunk);
147
148 if (PA_UNLIKELY(!u->connected || pa_stream_get_state(u->stream) != PA_STREAM_READY))
149 return;
150
151 readable = pa_stream_readable_size(u->stream);
152 while (readable > 0) {
153 size_t nbytes = 0;
154 if (PA_UNLIKELY(pa_stream_peek(u->stream, &p, &nbytes) != 0)) {
155 pa_log("pa_stream_peek() failed: %s", pa_strerror(pa_context_errno(u->context)));
156 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
157 return;
158 }
159
160 if (PA_LIKELY(p)) {
161 /* we have valid data */
162 memchunk.memblock = pa_memblock_new_fixed(u->module->core->mempool, (void *) p, nbytes, true);
163 memchunk.length = nbytes;
164 memchunk.index = 0;
165
166 pa_source_post(u->source, &memchunk);
167 pa_memblock_unref_fixed(memchunk.memblock);
168 } else {
169 size_t bytes_to_generate = nbytes;
170
171 /* we have a hole. generate silence */
172 memchunk = u->source->silence;
173 pa_memblock_ref(memchunk.memblock);
174
175 while (bytes_to_generate > 0) {
176 if (bytes_to_generate < memchunk.length)
177 memchunk.length = bytes_to_generate;
178
179 pa_source_post(u->source, &memchunk);
180 bytes_to_generate -= memchunk.length;
181 }
182
183 pa_memblock_unref(memchunk.memblock);
184 }
185
186 pa_stream_drop(u->stream);
187 readable -= nbytes;
188 }
189 }
190
thread_func(void * userdata)191 static void thread_func(void *userdata) {
192 struct userdata *u = userdata;
193 pa_proplist *proplist;
194
195 pa_assert(u);
196
197 pa_log_debug("Thread starting up");
198 pa_thread_mq_install(u->thread_mq);
199
200 proplist = tunnel_new_proplist(u);
201 u->context = pa_context_new_with_proplist(u->thread_mainloop_api,
202 "PulseAudio",
203 proplist);
204 pa_proplist_free(proplist);
205
206 if (!u->context) {
207 pa_log("Failed to create libpulse context");
208 goto fail;
209 }
210
211 if (u->cookie_file && pa_context_load_cookie_from_file(u->context, u->cookie_file) != 0) {
212 pa_log_error("Can not load cookie file!");
213 goto fail;
214 }
215
216 pa_context_set_state_callback(u->context, context_state_cb, u);
217 if (pa_context_connect(u->context,
218 u->remote_server,
219 PA_CONTEXT_NOAUTOSPAWN,
220 NULL) < 0) {
221 pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context)));
222 goto fail;
223 }
224
225 for (;;) {
226 int ret;
227
228 if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) {
229 if (ret == 0)
230 goto finish;
231 else
232 goto fail;
233 }
234
235 if (u->new_data)
236 read_new_samples(u);
237 }
238 fail:
239 pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->module->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
240 pa_asyncmsgq_wait_for(u->thread_mq->inq, PA_MESSAGE_SHUTDOWN);
241
242 finish:
243 if (u->stream) {
244 pa_stream_disconnect(u->stream);
245 pa_stream_unref(u->stream);
246 u->stream = NULL;
247 }
248
249 if (u->context) {
250 pa_context_disconnect(u->context);
251 pa_context_unref(u->context);
252 u->context = NULL;
253 }
254
255 pa_log_debug("Thread shutting down");
256 }
257
/* libpulse stream state callback.
 *
 * - READY: uncork the stream if the local source is opened and apply a
 *   latency change that was requested while the stream was being created.
 * - FAILED: log, mark the tunnel disconnected and stop the thread mainloop.
 * - TERMINATED: log only.
 * - UNCONNECTED / CREATING: nothing to do. */
static void stream_state_cb(pa_stream *stream, void *userdata) {
    struct userdata *u = userdata;

    pa_assert(u);

    switch (pa_stream_get_state(stream)) {
        case PA_STREAM_UNCONNECTED:
        case PA_STREAM_CREATING:
            break;

        case PA_STREAM_READY:
            if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
                cork_stream(u, false);

            /* Only call our requested_latency_cb when requested_latency
             * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because
             * we don't want to override the initial fragsize set by the server
             * without a good reason. */
            if (u->update_stream_bufferattr_after_connect)
                source_update_requested_latency_cb(u->source);
            break;

        case PA_STREAM_TERMINATED:
            pa_log_debug("Stream terminated.");
            break;

        case PA_STREAM_FAILED:
            pa_log_error("Stream failed: %s", pa_strerror(pa_context_errno(u->context)));
            u->connected = false;
            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
            break;
    }
}
287
/* libpulse context state callback.
 *
 * Once the context is READY, a record stream is created and connected
 * (corked) to the remote source, with its fragment size derived from the
 * latency currently requested on the local source. When the context fails
 * or terminates, the tunnel is marked disconnected and the thread mainloop
 * is told to quit with TUNNEL_THREAD_FAILED_MAINLOOP. */
static void context_state_cb(pa_context *c, void *userdata) {
    struct userdata *u = userdata;
    pa_assert(u);

    switch (pa_context_get_state(c)) {
        case PA_CONTEXT_UNCONNECTED:
        case PA_CONTEXT_CONNECTING:
        case PA_CONTEXT_AUTHORIZING:
        case PA_CONTEXT_SETTING_NAME:
            break;
        case PA_CONTEXT_READY: {
            pa_proplist *proplist;
            pa_buffer_attr bufferattr;
            pa_usec_t requested_latency;
            char *username = pa_get_user_name_malloc();
            char *hostname = pa_get_host_name_malloc();
            /* TODO: old tunnel put here the remote source_name into stream name e.g. 'Null Output for lynxis@lazus' */
            char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
            pa_xfree(username);
            pa_xfree(hostname);

            pa_log_debug("Connection successful. Creating stream.");
            pa_assert(!u->stream);

            proplist = tunnel_new_proplist(u);
            u->stream = pa_stream_new_with_proplist(u->context,
                                                    stream_name,
                                                    &u->source->sample_spec,
                                                    &u->source->channel_map,
                                                    proplist);
            pa_proplist_free(proplist);
            pa_xfree(stream_name);

            if (!u->stream) {
                pa_log_error("Could not create a stream: %s", pa_strerror(pa_context_errno(u->context)));
                u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
                return;
            }

            /* "No latency requested" is reported as (pa_usec_t) -1. The
             * sentinel must be compared at full width, otherwise the
             * fallback to max_latency never triggers. */
            requested_latency = pa_source_get_requested_latency_within_thread(u->source);
            if (requested_latency == (pa_usec_t) -1)
                requested_latency = u->source->thread_info.max_latency;

            reset_bufferattr(&bufferattr);
            bufferattr.fragsize = pa_usec_to_bytes(requested_latency, &u->source->sample_spec);

            pa_stream_set_state_callback(u->stream, stream_state_cb, userdata);
            pa_stream_set_read_callback(u->stream, stream_read_cb, userdata);
            if (pa_stream_connect_record(u->stream,
                                         u->remote_source_name,
                                         &bufferattr,
                                         PA_STREAM_INTERPOLATE_TIMING|PA_STREAM_DONT_MOVE|PA_STREAM_AUTO_TIMING_UPDATE|PA_STREAM_START_CORKED) < 0) {
                pa_log_error("Could not create stream: %s", pa_strerror(pa_context_errno(u->context)));
                u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
                /* Do not report the tunnel as connected after a failure. */
                return;
            }
            u->connected = true;
            break;
        }
        case PA_CONTEXT_FAILED:
            pa_log_debug("Context failed with err %s.", pa_strerror(pa_context_errno(u->context)));
            u->connected = false;
            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
            break;
        case PA_CONTEXT_TERMINATED:
            pa_log_debug("Context terminated.");
            u->connected = false;
            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
            break;
    }
}
358
/* Source callback: the latency requested on the local source changed.
 *
 * Converts the requested latency (or max_latency when none is requested)
 * into a fragment size. If the stream is ready and its fragment size
 * differs, a buffer attribute update is sent to the server. If the stream
 * is still being created, the update is postponed until it becomes ready. */
static void source_update_requested_latency_cb(pa_source *s) {
    struct userdata *u;
    pa_usec_t latency;
    size_t fragsize;

    pa_source_assert_ref(s);
    pa_assert_se(u = s->userdata);

    latency = pa_source_get_requested_latency_within_thread(s);
    if (latency == (pa_usec_t) -1)
        latency = s->thread_info.max_latency;

    fragsize = pa_usec_to_bytes(latency, &s->sample_spec);

    if (!u->stream)
        return;

    switch (pa_stream_get_state(u->stream)) {
        case PA_STREAM_CREATING:
            /* we have to delay our request until stream is ready */
            u->update_stream_bufferattr_after_connect = true;
            break;

        case PA_STREAM_READY: {
            pa_buffer_attr attr;
            pa_operation *op;

            /* Nothing to do when the server already uses this size. */
            if (pa_stream_get_buffer_attr(u->stream)->fragsize == fragsize)
                break;

            reset_bufferattr(&attr);
            attr.fragsize = fragsize;

            op = pa_stream_set_buffer_attr(u->stream, &attr, NULL, NULL);
            if (op)
                pa_operation_unref(op);
            break;
        }

        default:
            break;
    }
}
395
/* Message handler of the local source.
 *
 * PA_SOURCE_MESSAGE_GET_LATENCY is answered here: the latency reported by
 * the remote stream is stored as int64_t in *data (negated when libpulse
 * flags it as negative). 0 is stored when the source is not linked, no
 * stream exists, the stream is not ready or the latency query fails.
 * All other messages are forwarded to pa_source_process_msg(). */
static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SOURCE(o)->userdata;
    int64_t latency = 0;
    pa_usec_t remote_latency;
    int negative;

    if (code != PA_SOURCE_MESSAGE_GET_LATENCY)
        return pa_source_process_msg(o, code, data, offset, chunk);

    if (PA_SOURCE_IS_LINKED(u->source->thread_info.state) &&
        u->stream &&
        pa_stream_get_state(u->stream) == PA_STREAM_READY &&
        pa_stream_get_latency(u->stream, &remote_latency, &negative) >= 0) {

        if (negative)
            latency = - (int64_t) remote_latency;
        else
            latency = (int64_t) remote_latency;
    }

    *((int64_t*) data) = latency;
    return 0;
}
434
435 /* Called from the IO thread. */
source_set_state_in_io_thread_cb(pa_source * s,pa_source_state_t new_state,pa_suspend_cause_t new_suspend_cause)436 static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
437 struct userdata *u;
438
439 pa_assert(s);
440 pa_assert_se(u = s->userdata);
441
442 /* It may be that only the suspend cause is changing, in which case there's
443 * nothing to do. */
444 if (new_state == s->thread_info.state)
445 return 0;
446
447 if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
448 return 0;
449
450 switch (new_state) {
451 case PA_SOURCE_SUSPENDED: {
452 cork_stream(u, true);
453 break;
454 }
455 case PA_SOURCE_IDLE:
456 case PA_SOURCE_RUNNING: {
457 cork_stream(u, false);
458 break;
459 }
460 case PA_SOURCE_INVALID_STATE:
461 case PA_SOURCE_INIT:
462 case PA_SOURCE_UNLINKED:
463 break;
464 }
465
466 return 0;
467 }
468
/* Module entry point.
 *
 * Parses the module arguments, allocates the per-instance state, creates
 * the thread mainloop and message queue, creates the local source, starts
 * the tunnel thread and finally publishes the source.
 *
 * Returns 0 on success. On any failure everything allocated so far is
 * released through pa__done() and -1 is returned. */
int pa__init(pa_module *m) {
    struct userdata *u = NULL;
    pa_modargs *ma = NULL;
    pa_source_new_data source_data;
    pa_sample_spec ss;
    pa_channel_map map;
    const char *remote_server = NULL;
    const char *source_name = NULL;
    char *default_source_name = NULL;

    pa_assert(m);

    ma = pa_modargs_new(m->argument, valid_modargs);
    if (!ma) {
        pa_log("Failed to parse module arguments.");
        goto fail;
    }

    /* Start from the core defaults and let the module arguments override them. */
    ss = m->core->default_sample_spec;
    map = m->core->default_channel_map;
    if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
        pa_log("Invalid sample format specification or channel map");
        goto fail;
    }

    remote_server = pa_modargs_get_value(ma, "server", NULL);
    if (!remote_server) {
        pa_log("No server given!");
        goto fail;
    }

    /* From here on m->userdata is set, so pa__done() can clean up. */
    u = pa_xnew0(struct userdata, 1);
    u->module = m;
    m->userdata = u;
    u->remote_server = pa_xstrdup(remote_server);

    u->thread_mainloop = pa_mainloop_new();
    if (!u->thread_mainloop) {
        pa_log("Failed to create mainloop");
        goto fail;
    }
    u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop);

    u->cookie_file = pa_xstrdup(pa_modargs_get_value(ma, "cookie", NULL));
    u->remote_source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));

    u->thread_mq = pa_xnew0(pa_thread_mq, 1);
    if (pa_thread_mq_init_thread_mainloop(u->thread_mq, m->core->mainloop, u->thread_mainloop_api) < 0) {
        pa_log("pa_thread_mq_init_thread_mainloop() failed.");
        goto fail;
    }

    /* The rtpoll created here is never run. It is only necessary to avoid
     * crashes when module-tunnel-source-new is used together with
     * module-loopback. module-loopback bases the asyncmsgq on the rtpoll
     * provided by the source and only works because it calls
     * pa_asyncmsgq_process_one(). */
    u->rtpoll = pa_rtpoll_new();

    /* Create source */
    pa_source_new_data_init(&source_data);
    source_data.driver = __FILE__;
    source_data.module = m;

    default_source_name = pa_sprintf_malloc("tunnel-source-new.%s", remote_server);
    source_name = pa_modargs_get_value(ma, "source_name", default_source_name);

    pa_source_new_data_set_name(&source_data, source_name);
    pa_source_new_data_set_sample_spec(&source_data, &ss);
    pa_source_new_data_set_channel_map(&source_data, &map);

    pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "sound");
    pa_proplist_setf(source_data.proplist,
                     PA_PROP_DEVICE_DESCRIPTION,
                     _("Tunnel to %s/%s"),
                     remote_server,
                     pa_strempty(u->remote_source_name));

    if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
        pa_log("Invalid properties");
        pa_source_new_data_done(&source_data);
        goto fail;
    }

    u->source = pa_source_new(m->core, &source_data, PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY | PA_SOURCE_NETWORK);
    if (!u->source) {
        pa_log("Failed to create source.");
        pa_source_new_data_done(&source_data);
        goto fail;
    }
    pa_source_new_data_done(&source_data);

    /* Hook up the callbacks implemented in this file. */
    u->source->userdata = u;
    u->source->parent.process_msg = source_process_msg_cb;
    u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
    u->source->update_requested_latency = source_update_requested_latency_cb;

    pa_source_set_asyncmsgq(u->source, u->thread_mq->inq);
    pa_source_set_rtpoll(u->source, u->rtpoll);

    u->thread = pa_thread_new("tunnel-source", thread_func, u);
    if (!u->thread) {
        pa_log("Failed to create thread.");
        goto fail;
    }

    pa_source_put(u->source);

    pa_modargs_free(ma);
    pa_xfree(default_source_name);

    return 0;

fail:
    if (ma)
        pa_modargs_free(ma);

    /* pa_xfree() accepts NULL, no guard needed. */
    pa_xfree(default_source_name);

    pa__done(m);

    return -1;
}
586
/* Module teardown; also used by pa__init() to clean up after a failure, so
 * every member may still be unset.
 *
 * Order: unlink the source, tell the tunnel thread to shut down and join
 * it, release the message queue and mainloop, free the copied module
 * arguments, then drop the source reference and the rtpoll. */
void pa__done(pa_module *m) {
    struct userdata *u;

    pa_assert(m);

    u = m->userdata;
    if (!u)
        return;

    if (u->source)
        pa_source_unlink(u->source);

    if (u->thread) {
        pa_asyncmsgq_send(u->thread_mq->inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
        pa_thread_free(u->thread);
    }

    if (u->thread_mq) {
        pa_thread_mq_done(u->thread_mq);
        pa_xfree(u->thread_mq);
    }

    if (u->thread_mainloop)
        pa_mainloop_free(u->thread_mainloop);

    /* pa_xfree() is a no-op for NULL pointers. */
    pa_xfree(u->cookie_file);
    pa_xfree(u->remote_source_name);
    pa_xfree(u->remote_server);

    if (u->source)
        pa_source_unref(u->source);

    if (u->rtpoll)
        pa_rtpoll_free(u->rtpoll);

    pa_xfree(u);
}
628