• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2     This file is part of PulseAudio.
3 
4     Copyright 2013 Alexander Couzens
5 
6     PulseAudio is free software; you can redistribute it and/or modify
7     it under the terms of the GNU Lesser General Public License as published
8     by the Free Software Foundation; either version 2.1 of the License,
9     or (at your option) any later version.
10 
11     PulseAudio is distributed in the hope that it will be useful, but
12     WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14     General Public License for more details.
15 
16     You should have received a copy of the GNU Lesser General Public License
17     along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19 
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 
24 #include <pulse/context.h>
25 #include <pulse/timeval.h>
26 #include <pulse/xmalloc.h>
27 #include <pulse/stream.h>
28 #include <pulse/mainloop.h>
29 #include <pulse/introspect.h>
30 #include <pulse/error.h>
31 
32 #include <pulsecore/core.h>
33 #include <pulsecore/core-util.h>
34 #include <pulsecore/i18n.h>
35 #include <pulsecore/source.h>
36 #include <pulsecore/modargs.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/thread.h>
39 #include <pulsecore/thread-mq.h>
40 #include <pulsecore/poll.h>
41 #include <pulsecore/rtpoll.h>
42 #include <pulsecore/proplist-util.h>
43 
44 PA_MODULE_AUTHOR("Alexander Couzens");
45 PA_MODULE_DESCRIPTION("Create a network source which connects via a stream to a remote PulseAudio server");
46 PA_MODULE_VERSION(PACKAGE_VERSION);
47 PA_MODULE_LOAD_ONCE(false);
48 PA_MODULE_USAGE(
49         "server=<address> "
50         "source=<name of the remote source> "
51         "source_name=<name for the local source> "
52         "source_properties=<properties for the local source> "
53         "format=<sample format> "
54         "channels=<number of channels> "
55         "rate=<sample rate> "
56         "channel_map=<channel map> "
57         "cookie=<cookie file path>"
58         );
59 
60 #define TUNNEL_THREAD_FAILED_MAINLOOP 1
61 
62 static void stream_state_cb(pa_stream *stream, void *userdata);
63 static void stream_read_cb(pa_stream *s, size_t length, void *userdata);
64 static void context_state_cb(pa_context *c, void *userdata);
65 static void source_update_requested_latency_cb(pa_source *s);
66 
67 struct userdata {
68     pa_module *module;
69     pa_source *source;
70     pa_thread *thread;
71     pa_thread_mq *thread_mq;
72     pa_mainloop *thread_mainloop;
73     pa_mainloop_api *thread_mainloop_api;
74 
75     pa_context *context;
76     pa_stream *stream;
77     pa_rtpoll *rtpoll;
78 
79     bool update_stream_bufferattr_after_connect;
80     bool connected;
81     bool new_data;
82 
83     char *cookie_file;
84     char *remote_server;
85     char *remote_source_name;
86 };
87 
88 static const char* const valid_modargs[] = {
89     "source_name",
90     "source_properties",
91     "server",
92     "source",
93     "format",
94     "channels",
95     "rate",
96     "channel_map",
97     "cookie",
98    /* "reconnect", reconnect if server comes back again - unimplemented */
99     NULL,
100 };
101 
cork_stream(struct userdata * u,bool cork)102 static void cork_stream(struct userdata *u, bool cork) {
103     pa_operation *operation;
104 
105     pa_assert(u);
106     pa_assert(u->stream);
107 
108     if ((operation = pa_stream_cork(u->stream, cork, NULL, NULL)))
109         pa_operation_unref(operation);
110 }
111 
reset_bufferattr(pa_buffer_attr * bufferattr)112 static void reset_bufferattr(pa_buffer_attr *bufferattr) {
113     pa_assert(bufferattr);
114     bufferattr->fragsize = (uint32_t) -1;
115     bufferattr->minreq = (uint32_t) -1;
116     bufferattr->maxlength = (uint32_t) -1;
117     bufferattr->prebuf = (uint32_t) -1;
118     bufferattr->tlength = (uint32_t) -1;
119 }
120 
tunnel_new_proplist(struct userdata * u)121 static pa_proplist* tunnel_new_proplist(struct userdata *u) {
122     pa_proplist *proplist = pa_proplist_new();
123     pa_assert(proplist);
124     pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio");
125     pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
126     pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
127     pa_init_proplist(proplist);
128 
129     return proplist;
130 }
131 
stream_read_cb(pa_stream * s,size_t length,void * userdata)132 static void stream_read_cb(pa_stream *s, size_t length, void *userdata) {
133     struct userdata *u = userdata;
134     u->new_data = true;
135 }
136 
137 /* called from io context to read samples from the stream into our source */
read_new_samples(struct userdata * u)138 static void read_new_samples(struct userdata *u) {
139     const void *p;
140     size_t readable = 0;
141     pa_memchunk memchunk;
142 
143     pa_assert(u);
144     u->new_data = false;
145 
146     pa_memchunk_reset(&memchunk);
147 
148     if (PA_UNLIKELY(!u->connected || pa_stream_get_state(u->stream) != PA_STREAM_READY))
149         return;
150 
151     readable = pa_stream_readable_size(u->stream);
152     while (readable > 0) {
153         size_t nbytes = 0;
154         if (PA_UNLIKELY(pa_stream_peek(u->stream, &p, &nbytes) != 0)) {
155             pa_log("pa_stream_peek() failed: %s", pa_strerror(pa_context_errno(u->context)));
156             u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
157             return;
158         }
159 
160         if (PA_LIKELY(p)) {
161             /* we have valid data */
162             memchunk.memblock = pa_memblock_new_fixed(u->module->core->mempool, (void *) p, nbytes, true);
163             memchunk.length = nbytes;
164             memchunk.index = 0;
165 
166             pa_source_post(u->source, &memchunk);
167             pa_memblock_unref_fixed(memchunk.memblock);
168         } else {
169             size_t bytes_to_generate = nbytes;
170 
171             /* we have a hole. generate silence */
172             memchunk = u->source->silence;
173             pa_memblock_ref(memchunk.memblock);
174 
175             while (bytes_to_generate > 0) {
176                 if (bytes_to_generate < memchunk.length)
177                     memchunk.length = bytes_to_generate;
178 
179                 pa_source_post(u->source, &memchunk);
180                 bytes_to_generate -= memchunk.length;
181             }
182 
183             pa_memblock_unref(memchunk.memblock);
184         }
185 
186         pa_stream_drop(u->stream);
187         readable -= nbytes;
188     }
189 }
190 
thread_func(void * userdata)191 static void thread_func(void *userdata) {
192     struct userdata *u = userdata;
193     pa_proplist *proplist;
194 
195     pa_assert(u);
196 
197     pa_log_debug("Thread starting up");
198     pa_thread_mq_install(u->thread_mq);
199 
200     proplist = tunnel_new_proplist(u);
201     u->context = pa_context_new_with_proplist(u->thread_mainloop_api,
202                                               "PulseAudio",
203                                               proplist);
204     pa_proplist_free(proplist);
205 
206     if (!u->context) {
207         pa_log("Failed to create libpulse context");
208         goto fail;
209     }
210 
211     if (u->cookie_file && pa_context_load_cookie_from_file(u->context, u->cookie_file) != 0) {
212         pa_log_error("Can not load cookie file!");
213         goto fail;
214     }
215 
216     pa_context_set_state_callback(u->context, context_state_cb, u);
217     if (pa_context_connect(u->context,
218                            u->remote_server,
219                            PA_CONTEXT_NOAUTOSPAWN,
220                            NULL) < 0) {
221         pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context)));
222         goto fail;
223     }
224 
225     for (;;) {
226         int ret;
227 
228         if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) {
229             if (ret == 0)
230                 goto finish;
231             else
232                 goto fail;
233         }
234 
235         if (u->new_data)
236             read_new_samples(u);
237     }
238 fail:
239     pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->module->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
240     pa_asyncmsgq_wait_for(u->thread_mq->inq, PA_MESSAGE_SHUTDOWN);
241 
242 finish:
243     if (u->stream) {
244         pa_stream_disconnect(u->stream);
245         pa_stream_unref(u->stream);
246         u->stream = NULL;
247     }
248 
249     if (u->context) {
250         pa_context_disconnect(u->context);
251         pa_context_unref(u->context);
252         u->context = NULL;
253     }
254 
255     pa_log_debug("Thread shutting down");
256 }
257 
stream_state_cb(pa_stream * stream,void * userdata)258 static void stream_state_cb(pa_stream *stream, void *userdata) {
259     struct userdata *u = userdata;
260 
261     pa_assert(u);
262 
263     switch (pa_stream_get_state(stream)) {
264         case PA_STREAM_FAILED:
265             pa_log_error("Stream failed: %s", pa_strerror(pa_context_errno(u->context)));
266             u->connected = false;
267             u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
268             break;
269         case PA_STREAM_TERMINATED:
270             pa_log_debug("Stream terminated.");
271             break;
272         case PA_STREAM_READY:
273             if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
274                 cork_stream(u, false);
275 
276             /* Only call our requested_latency_cb when requested_latency
277              * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because
278              * we don't want to override the initial fragsize set by the server
279              * without a good reason. */
280             if (u->update_stream_bufferattr_after_connect)
281                 source_update_requested_latency_cb(u->source);
282         case PA_STREAM_UNCONNECTED:
283         case PA_STREAM_CREATING:
284             break;
285     }
286 }
287 
context_state_cb(pa_context * c,void * userdata)288 static void context_state_cb(pa_context *c, void *userdata) {
289     struct userdata *u = userdata;
290     pa_assert(u);
291 
292     switch (pa_context_get_state(c)) {
293         case PA_CONTEXT_UNCONNECTED:
294         case PA_CONTEXT_CONNECTING:
295         case PA_CONTEXT_AUTHORIZING:
296         case PA_CONTEXT_SETTING_NAME:
297             break;
298         case PA_CONTEXT_READY: {
299             pa_proplist *proplist;
300             pa_buffer_attr bufferattr;
301             pa_usec_t requested_latency;
302             char *username = pa_get_user_name_malloc();
303             char *hostname = pa_get_host_name_malloc();
304             /* TODO: old tunnel put here the remote source_name into stream name e.g. 'Null Output for lynxis@lazus' */
305             char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
306             pa_xfree(username);
307             pa_xfree(hostname);
308 
309             pa_log_debug("Connection successful. Creating stream.");
310             pa_assert(!u->stream);
311 
312             proplist = tunnel_new_proplist(u);
313             u->stream = pa_stream_new_with_proplist(u->context,
314                                                     stream_name,
315                                                     &u->source->sample_spec,
316                                                     &u->source->channel_map,
317                                                     proplist);
318             pa_proplist_free(proplist);
319             pa_xfree(stream_name);
320 
321             if (!u->stream) {
322                 pa_log_error("Could not create a stream: %s", pa_strerror(pa_context_errno(u->context)));
323                 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
324                 return;
325             }
326 
327             requested_latency = pa_source_get_requested_latency_within_thread(u->source);
328             if (requested_latency == (uint32_t) -1)
329                 requested_latency = u->source->thread_info.max_latency;
330 
331             reset_bufferattr(&bufferattr);
332             bufferattr.fragsize = pa_usec_to_bytes(requested_latency, &u->source->sample_spec);
333 
334             pa_stream_set_state_callback(u->stream, stream_state_cb, userdata);
335             pa_stream_set_read_callback(u->stream, stream_read_cb, userdata);
336             if (pa_stream_connect_record(u->stream,
337                                          u->remote_source_name,
338                                          &bufferattr,
339                                          PA_STREAM_INTERPOLATE_TIMING|PA_STREAM_DONT_MOVE|PA_STREAM_AUTO_TIMING_UPDATE|PA_STREAM_START_CORKED) < 0) {
340                 pa_log_debug("Could not create stream: %s", pa_strerror(pa_context_errno(u->context)));
341                 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
342             }
343             u->connected = true;
344             break;
345         }
346         case PA_CONTEXT_FAILED:
347             pa_log_debug("Context failed with err %s.", pa_strerror(pa_context_errno(u->context)));
348             u->connected = false;
349             u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
350             break;
351         case PA_CONTEXT_TERMINATED:
352             pa_log_debug("Context terminated.");
353             u->connected = false;
354             u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
355             break;
356     }
357 }
358 
source_update_requested_latency_cb(pa_source * s)359 static void source_update_requested_latency_cb(pa_source *s) {
360     struct userdata *u;
361     pa_operation *operation;
362     size_t nbytes;
363     pa_usec_t block_usec;
364     pa_buffer_attr bufferattr;
365 
366     pa_source_assert_ref(s);
367     pa_assert_se(u = s->userdata);
368 
369     block_usec = pa_source_get_requested_latency_within_thread(s);
370     if (block_usec == (pa_usec_t) -1)
371         block_usec = s->thread_info.max_latency;
372 
373     nbytes = pa_usec_to_bytes(block_usec, &s->sample_spec);
374 
375     if (u->stream) {
376         switch (pa_stream_get_state(u->stream)) {
377             case PA_STREAM_READY:
378                 if (pa_stream_get_buffer_attr(u->stream)->fragsize == nbytes)
379                     break;
380 
381                 reset_bufferattr(&bufferattr);
382                 bufferattr.fragsize = nbytes;
383                 if ((operation = pa_stream_set_buffer_attr(u->stream, &bufferattr, NULL, NULL)))
384                     pa_operation_unref(operation);
385                 break;
386             case PA_STREAM_CREATING:
387                 /* we have to delay our request until stream is ready */
388                 u->update_stream_bufferattr_after_connect = true;
389                 break;
390             default:
391                 break;
392         }
393     }
394 }
395 
source_process_msg_cb(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)396 static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
397     struct userdata *u = PA_SOURCE(o)->userdata;
398 
399     switch (code) {
400         case PA_SOURCE_MESSAGE_GET_LATENCY: {
401             int negative;
402             pa_usec_t remote_latency;
403 
404             if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state)) {
405                 *((int64_t*) data) = 0;
406                 return 0;
407             }
408 
409             if (!u->stream) {
410                 *((int64_t*) data) = 0;
411                 return 0;
412             }
413 
414             if (pa_stream_get_state(u->stream) != PA_STREAM_READY) {
415                 *((int64_t*) data) = 0;
416                 return 0;
417             }
418 
419             if (pa_stream_get_latency(u->stream, &remote_latency, &negative) < 0) {
420                 *((int64_t*) data) = 0;
421                 return 0;
422             }
423 
424             if (negative)
425                 *((int64_t*) data) = - (int64_t)remote_latency;
426             else
427                 *((int64_t*) data) = remote_latency;
428 
429             return 0;
430         }
431     }
432     return pa_source_process_msg(o, code, data, offset, chunk);
433 }
434 
435 /* Called from the IO thread. */
source_set_state_in_io_thread_cb(pa_source * s,pa_source_state_t new_state,pa_suspend_cause_t new_suspend_cause)436 static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
437     struct userdata *u;
438 
439     pa_assert(s);
440     pa_assert_se(u = s->userdata);
441 
442     /* It may be that only the suspend cause is changing, in which case there's
443      * nothing to do. */
444     if (new_state == s->thread_info.state)
445         return 0;
446 
447     if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
448         return 0;
449 
450     switch (new_state) {
451         case PA_SOURCE_SUSPENDED: {
452             cork_stream(u, true);
453             break;
454         }
455         case PA_SOURCE_IDLE:
456         case PA_SOURCE_RUNNING: {
457             cork_stream(u, false);
458             break;
459         }
460         case PA_SOURCE_INVALID_STATE:
461         case PA_SOURCE_INIT:
462         case PA_SOURCE_UNLINKED:
463             break;
464     }
465 
466     return 0;
467 }
468 
pa__init(pa_module * m)469 int pa__init(pa_module *m) {
470     struct userdata *u = NULL;
471     pa_modargs *ma = NULL;
472     pa_source_new_data source_data;
473     pa_sample_spec ss;
474     pa_channel_map map;
475     const char *remote_server = NULL;
476     const char *source_name = NULL;
477     char *default_source_name = NULL;
478 
479     pa_assert(m);
480 
481     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
482         pa_log("Failed to parse module arguments.");
483         goto fail;
484     }
485 
486     ss = m->core->default_sample_spec;
487     map = m->core->default_channel_map;
488     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
489         pa_log("Invalid sample format specification or channel map");
490         goto fail;
491     }
492 
493     remote_server = pa_modargs_get_value(ma, "server", NULL);
494     if (!remote_server) {
495         pa_log("No server given!");
496         goto fail;
497     }
498 
499     u = pa_xnew0(struct userdata, 1);
500     u->module = m;
501     m->userdata = u;
502     u->remote_server = pa_xstrdup(remote_server);
503     u->thread_mainloop = pa_mainloop_new();
504     if (u->thread_mainloop == NULL) {
505         pa_log("Failed to create mainloop");
506         goto fail;
507     }
508     u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop);
509     u->cookie_file = pa_xstrdup(pa_modargs_get_value(ma, "cookie", NULL));
510     u->remote_source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
511 
512     u->thread_mq = pa_xnew0(pa_thread_mq, 1);
513 
514     if (pa_thread_mq_init_thread_mainloop(u->thread_mq, m->core->mainloop, u->thread_mainloop_api) < 0) {
515         pa_log("pa_thread_mq_init_thread_mainloop() failed.");
516         goto fail;
517     }
518 
519     /* The rtpoll created here is never run. It is only necessary to avoid crashes
520      * when module-tunnel-source-new is used together with module-loopback.
521      * module-loopback bases the asyncmsq on the rtpoll provided by the source and
522      * only works because it calls pa_asyncmsq_process_one(). */
523     u->rtpoll = pa_rtpoll_new();
524 
525     /* Create source */
526     pa_source_new_data_init(&source_data);
527     source_data.driver = __FILE__;
528     source_data.module = m;
529 
530     default_source_name = pa_sprintf_malloc("tunnel-source-new.%s", remote_server);
531     source_name = pa_modargs_get_value(ma, "source_name", default_source_name);
532 
533     pa_source_new_data_set_name(&source_data, source_name);
534     pa_source_new_data_set_sample_spec(&source_data, &ss);
535     pa_source_new_data_set_channel_map(&source_data, &map);
536 
537     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "sound");
538     pa_proplist_setf(source_data.proplist,
539                      PA_PROP_DEVICE_DESCRIPTION,
540                      _("Tunnel to %s/%s"),
541                      remote_server,
542                      pa_strempty(u->remote_source_name));
543 
544     if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
545         pa_log("Invalid properties");
546         pa_source_new_data_done(&source_data);
547         goto fail;
548     }
549     if (!(u->source = pa_source_new(m->core, &source_data, PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY | PA_SOURCE_NETWORK))) {
550         pa_log("Failed to create source.");
551         pa_source_new_data_done(&source_data);
552         goto fail;
553     }
554 
555     pa_source_new_data_done(&source_data);
556     u->source->userdata = u;
557     u->source->parent.process_msg = source_process_msg_cb;
558     u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
559     u->source->update_requested_latency = source_update_requested_latency_cb;
560 
561     pa_source_set_asyncmsgq(u->source, u->thread_mq->inq);
562     pa_source_set_rtpoll(u->source, u->rtpoll);
563 
564     if (!(u->thread = pa_thread_new("tunnel-source", thread_func, u))) {
565         pa_log("Failed to create thread.");
566         goto fail;
567     }
568 
569     pa_source_put(u->source);
570     pa_modargs_free(ma);
571     pa_xfree(default_source_name);
572 
573     return 0;
574 
575 fail:
576     if (ma)
577         pa_modargs_free(ma);
578 
579     if (default_source_name)
580         pa_xfree(default_source_name);
581 
582     pa__done(m);
583 
584     return -1;
585 }
586 
pa__done(pa_module * m)587 void pa__done(pa_module *m) {
588     struct userdata *u;
589 
590     pa_assert(m);
591 
592     if (!(u = m->userdata))
593         return;
594 
595     if (u->source)
596         pa_source_unlink(u->source);
597 
598     if (u->thread) {
599         pa_asyncmsgq_send(u->thread_mq->inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
600         pa_thread_free(u->thread);
601     }
602 
603     if (u->thread_mq) {
604         pa_thread_mq_done(u->thread_mq);
605         pa_xfree(u->thread_mq);
606     }
607 
608     if (u->thread_mainloop)
609         pa_mainloop_free(u->thread_mainloop);
610 
611     if (u->cookie_file)
612         pa_xfree(u->cookie_file);
613 
614     if (u->remote_source_name)
615         pa_xfree(u->remote_source_name);
616 
617     if (u->remote_server)
618         pa_xfree(u->remote_server);
619 
620     if (u->source)
621         pa_source_unref(u->source);
622 
623     if (u->rtpoll)
624         pa_rtpoll_free(u->rtpoll);
625 
626     pa_xfree(u);
627 }
628