1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2008 Colin Guthrie
6 Copyright 2013 Hajime Fujita
7 Copyright 2013 Martin Blanchard
8
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as published
11 by the Free Software Foundation; either version 2.1 of the License,
12 or (at your option) any later version.
13
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 General Public License for more details.
18
19 You should have received a copy of the GNU Lesser General Public License
20 along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22 USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <sys/socket.h>
35 #include <netinet/in.h>
36 #include <netinet/tcp.h>
37 #include <sys/ioctl.h>
38
39 #ifdef HAVE_LINUX_SOCKIOS_H
40 #include <linux/sockios.h>
41 #endif
42
43 #include <pulse/rtclock.h>
44 #include <pulse/timeval.h>
45 #include <pulse/volume.h>
46 #include <pulse/xmalloc.h>
47
48 #include <pulsecore/core.h>
49 #include <pulsecore/i18n.h>
50 #include <pulsecore/module.h>
51 #include <pulsecore/memchunk.h>
52 #include <pulsecore/sink.h>
53 #include <pulsecore/modargs.h>
54 #include <pulsecore/core-error.h>
55 #include <pulsecore/core-util.h>
56 #include <pulsecore/log.h>
57 #include <pulsecore/macro.h>
58 #include <pulsecore/thread.h>
59 #include <pulsecore/thread-mq.h>
60 #include <pulsecore/poll.h>
61 #include <pulsecore/rtpoll.h>
62 #include <pulsecore/core-rtclock.h>
63 #include <pulsecore/time-smoother.h>
64
65 #include "raop-sink.h"
66 #include "raop-client.h"
67 #include "raop-util.h"
68
69 #define UDP_TIMING_PACKET_LOSS_MAX (30 * PA_USEC_PER_SEC)
70 #define UDP_TIMING_PACKET_DISCONNECT_CYCLE 3
71
72 struct userdata {
73 pa_core *core;
74 pa_module *module;
75 pa_sink *sink;
76 pa_card *card;
77
78 pa_thread *thread;
79 pa_thread_mq thread_mq;
80 pa_rtpoll *rtpoll;
81 pa_rtpoll_item *rtpoll_item;
82 bool oob;
83
84 pa_raop_client *raop;
85 char *server;
86 pa_raop_protocol_t protocol;
87 pa_raop_encryption_t encryption;
88 pa_raop_codec_t codec;
89 bool autoreconnect;
90 /* if true, behaves like a null-sink when disconnected */
91 bool autonull;
92
93 size_t block_size;
94 pa_usec_t block_usec;
95 pa_memchunk memchunk;
96
97 pa_usec_t delay;
98 pa_usec_t start;
99 pa_smoother *smoother;
100 uint64_t write_count;
101
102 uint32_t latency;
103 /* Consider as first I/O thread iteration, can be switched to true in autoreconnect mode */
104 bool first;
105 };
106
107 enum {
108 PA_SINK_MESSAGE_SET_RAOP_STATE = PA_SINK_MESSAGE_MAX,
109 PA_SINK_MESSAGE_DISCONNECT_REQUEST
110 };
111
112 static void userdata_free(struct userdata *u);
113
114 static void sink_set_volume_cb(pa_sink *s);
115
raop_state_cb(pa_raop_state_t state,void * userdata)116 static void raop_state_cb(pa_raop_state_t state, void *userdata) {
117 struct userdata *u = userdata;
118
119 pa_assert(u);
120
121 pa_log_debug("State change received, informing IO thread...");
122
123 pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_SET_RAOP_STATE, PA_INT_TO_PTR(state), 0, NULL, NULL);
124 }
125
sink_get_latency(const struct userdata * u)126 static int64_t sink_get_latency(const struct userdata *u) {
127 pa_usec_t now;
128 int64_t latency;
129
130 pa_assert(u);
131 pa_assert(u->smoother);
132
133 now = pa_rtclock_now();
134 now = pa_smoother_get(u->smoother, now);
135
136 latency = pa_bytes_to_usec(u->write_count, &u->sink->sample_spec) - (int64_t) now;
137
138 /* RAOP default latency */
139 latency += u->latency * PA_USEC_PER_MSEC;
140
141 return latency;
142 }
143
sink_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)144 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
145 struct userdata *u = PA_SINK(o)->userdata;
146
147 pa_assert(u);
148 pa_assert(u->raop);
149
150 switch (code) {
151 /* Exception : for this message, we are in main thread, msg sent from the IO/thread
152 Done here, as alloc/free of rtsp_client is also done in this thread for other cases */
153 case PA_SINK_MESSAGE_DISCONNECT_REQUEST: {
154 if (u->sink->state == PA_SINK_RUNNING) {
155 /* Disconnect raop client, and restart the whole chain since
156 * the authentication token might be outdated */
157 pa_raop_client_disconnect(u->raop);
158 pa_raop_client_authenticate(u->raop, NULL);
159 }
160
161 return 0;
162 }
163
164 case PA_SINK_MESSAGE_GET_LATENCY: {
165 int64_t r = 0;
166
167 if (u->autonull || pa_raop_client_can_stream(u->raop))
168 r = sink_get_latency(u);
169
170 *((int64_t*) data) = r;
171
172 return 0;
173 }
174
175 case PA_SINK_MESSAGE_SET_RAOP_STATE: {
176 switch ((pa_raop_state_t) PA_PTR_TO_UINT(data)) {
177 case PA_RAOP_AUTHENTICATED: {
178 if (!pa_raop_client_is_authenticated(u->raop)) {
179 pa_module_unload_request(u->module, true);
180 }
181
182 if (u->autoreconnect && u->sink->state == PA_SINK_RUNNING) {
183 pa_usec_t now;
184 now = pa_rtclock_now();
185 pa_smoother_reset(u->smoother, now, false);
186
187 if (!pa_raop_client_is_alive(u->raop)) {
188 /* Connecting will trigger a RECORD and start steaming */
189 pa_raop_client_announce(u->raop);
190 }
191 }
192
193 return 0;
194 }
195
196 case PA_RAOP_CONNECTED: {
197 pa_assert(!u->rtpoll_item);
198
199 u->oob = pa_raop_client_register_pollfd(u->raop, u->rtpoll, &u->rtpoll_item);
200
201 return 0;
202 }
203
204 case PA_RAOP_RECORDING: {
205 pa_usec_t now;
206
207 now = pa_rtclock_now();
208 u->write_count = 0;
209 u->start = now;
210 u->first = true;
211 pa_rtpoll_set_timer_absolute(u->rtpoll, now);
212
213 if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
214 /* Our stream has been suspended so we just flush it... */
215 pa_rtpoll_set_timer_disabled(u->rtpoll);
216 pa_raop_client_flush(u->raop);
217 } else {
218 /* Set the initial volume */
219 sink_set_volume_cb(u->sink);
220 pa_sink_process_msg(o, PA_SINK_MESSAGE_SET_VOLUME, data, offset, chunk);
221 }
222
223 return 0;
224 }
225
226 case PA_RAOP_INVALID_STATE:
227 case PA_RAOP_DISCONNECTED: {
228 unsigned int nbfds = 0;
229 struct pollfd *pollfd;
230 unsigned int i;
231
232 if (u->rtpoll_item) {
233 pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, &nbfds);
234 if (pollfd) {
235 for (i = 0; i < nbfds; i++) {
236 if (pollfd->fd >= 0)
237 pa_close(pollfd->fd);
238 pollfd++;
239 }
240 }
241 pa_rtpoll_item_free(u->rtpoll_item);
242 u->rtpoll_item = NULL;
243 }
244
245 if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
246 pa_rtpoll_set_timer_disabled(u->rtpoll);
247
248 return 0;
249 }
250
251 if (u->autoreconnect) {
252 if (u->sink->thread_info.state != PA_SINK_IDLE) {
253 if (!u->autonull)
254 pa_rtpoll_set_timer_disabled(u->rtpoll);
255 pa_raop_client_authenticate(u->raop, NULL);
256 }
257 } else {
258 if (u->sink->thread_info.state != PA_SINK_IDLE)
259 pa_module_unload_request(u->module, true);
260 }
261
262 return 0;
263 }
264 }
265
266 return 0;
267 }
268 }
269
270 return pa_sink_process_msg(o, code, data, offset, chunk);
271 }
272
273 /* Called from the IO thread. */
sink_set_state_in_io_thread_cb(pa_sink * s,pa_sink_state_t new_state,pa_suspend_cause_t new_suspend_cause)274 static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
275 struct userdata *u;
276
277 pa_assert(s);
278 pa_assert_se(u = s->userdata);
279
280 /* It may be that only the suspend cause is changing, in which case there's
281 * nothing to do. */
282 if (new_state == s->thread_info.state)
283 return 0;
284
285 switch (new_state) {
286 case PA_SINK_SUSPENDED:
287 pa_log_debug("RAOP: SUSPENDED");
288
289 pa_assert(PA_SINK_IS_OPENED(s->thread_info.state));
290
291 /* Issue a TEARDOWN if we are still connected */
292 if (pa_raop_client_is_alive(u->raop)) {
293 pa_raop_client_teardown(u->raop);
294 }
295
296 break;
297
298 case PA_SINK_IDLE:
299 pa_log_debug("RAOP: IDLE");
300
301 /* Issue a FLUSH if we're coming from running state */
302 if (s->thread_info.state == PA_SINK_RUNNING) {
303 pa_rtpoll_set_timer_disabled(u->rtpoll);
304 pa_raop_client_flush(u->raop);
305 }
306
307 break;
308
309 case PA_SINK_RUNNING: {
310 pa_usec_t now;
311
312 pa_log_debug("RAOP: RUNNING");
313
314 now = pa_rtclock_now();
315 pa_smoother_reset(u->smoother, now, false);
316
317 /* If autonull is enabled, I/O thread is always eating chunks since
318 * it is emulating a null sink */
319 if (u->autonull) {
320 u->start = now;
321 u->write_count = 0;
322 u->first = true;
323 pa_rtpoll_set_timer_absolute(u->rtpoll, now);
324 }
325
326 if (!pa_raop_client_is_alive(u->raop)) {
327 /* Connecting will trigger a RECORD and start streaming */
328 pa_raop_client_announce(u->raop);
329 } else if (!pa_raop_client_is_recording(u->raop)) {
330 /* RECORD alredy sent, simply start streaming */
331 pa_raop_client_stream(u->raop);
332 pa_rtpoll_set_timer_absolute(u->rtpoll, now);
333 u->write_count = 0;
334 u->start = now;
335 }
336
337 break;
338 }
339
340 case PA_SINK_UNLINKED:
341 case PA_SINK_INIT:
342 case PA_SINK_INVALID_STATE:
343 break;
344 }
345
346 return 0;
347 }
348
sink_set_volume_cb(pa_sink * s)349 static void sink_set_volume_cb(pa_sink *s) {
350 struct userdata *u = s->userdata;
351 pa_cvolume hw;
352 pa_volume_t v, v_orig;
353 char t[PA_CVOLUME_SNPRINT_VERBOSE_MAX];
354
355 pa_assert(u);
356
357 /* If we're muted we don't need to do anything. */
358 if (s->muted)
359 return;
360
361 /* Calculate the max volume of all channels.
362 * We'll use this as our (single) volume on the APEX device and emulate
363 * any variation in channel volumes in software. */
364 v = pa_cvolume_max(&s->real_volume);
365
366 v_orig = v;
367 v = pa_raop_client_adjust_volume(u->raop, v_orig);
368
369 pa_log_debug("Volume adjusted: orig=%u adjusted=%u", v_orig, v);
370
371 /* Create a pa_cvolume version of our single value. */
372 pa_cvolume_set(&hw, s->sample_spec.channels, v);
373
374 /* Perform any software manipulation of the volume needed. */
375 pa_sw_cvolume_divide(&s->soft_volume, &s->real_volume, &hw);
376
377 pa_log_debug("Requested volume: %s", pa_cvolume_snprint_verbose(t, sizeof(t), &s->real_volume, &s->channel_map, false));
378 pa_log_debug("Got hardware volume: %s", pa_cvolume_snprint_verbose(t, sizeof(t), &hw, &s->channel_map, false));
379 pa_log_debug("Calculated software volume: %s",
380 pa_cvolume_snprint_verbose(t, sizeof(t), &s->soft_volume, &s->channel_map, true));
381
382 /* Any necessary software volume manipulation is done so set
383 * our hw volume (or v as a single value) on the device. */
384 pa_raop_client_set_volume(u->raop, v);
385 }
386
sink_set_mute_cb(pa_sink * s)387 static void sink_set_mute_cb(pa_sink *s) {
388 struct userdata *u = s->userdata;
389
390 pa_assert(u);
391 pa_assert(u->raop);
392
393 if (s->muted) {
394 pa_raop_client_set_volume(u->raop, PA_VOLUME_MUTED);
395 } else {
396 sink_set_volume_cb(s);
397 }
398 }
399
thread_func(void * userdata)400 static void thread_func(void *userdata) {
401 struct userdata *u = userdata;
402 size_t offset = 0;
403 pa_usec_t last_timing = 0;
404 uint32_t check_timing_count = 1;
405 pa_usec_t intvl = 0;
406
407 pa_assert(u);
408
409 pa_log_debug("Thread starting up");
410
411 pa_thread_mq_install(&u->thread_mq);
412 pa_smoother_set_time_offset(u->smoother, pa_rtclock_now());
413
414 for (;;) {
415 struct pollfd *pollfd = NULL;
416 unsigned int i, nbfds = 0;
417 pa_usec_t now, estimated;
418 uint64_t position;
419 size_t index;
420 int ret;
421 bool canstream, sendstream, on_timeout;
422
423 /* Polling (audio data + control socket + timing socket). */
424 if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
425 goto fail;
426 else if (ret == 0)
427 goto finish;
428
429 if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
430 if (u->sink->thread_info.rewind_requested)
431 pa_sink_process_rewind(u->sink, 0);
432 }
433
434 on_timeout = pa_rtpoll_timer_elapsed(u->rtpoll);
435 if (u->rtpoll_item) {
436 pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, &nbfds);
437 /* If !oob: streaming driven by pollds (POLLOUT) */
438 if (pollfd && !u->oob && !pollfd->revents) {
439 for (i = 0; i < nbfds; i++) {
440 pollfd->events = POLLOUT;
441 pollfd->revents = 0;
442
443 pollfd++;
444 }
445
446 continue;
447 }
448
449 /* if oob: streaming managed by timing, pollfd for oob sockets */
450 if (pollfd && u->oob && !on_timeout) {
451 uint8_t packet[32];
452 ssize_t read;
453
454 for (i = 0; i < nbfds; i++) {
455 if (pollfd->revents & POLLERR) {
456 if (u->autoreconnect && pa_raop_client_is_alive(u->raop)) {
457 pollfd->revents = 0;
458 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink),
459 PA_SINK_MESSAGE_DISCONNECT_REQUEST, 0, 0, NULL, NULL);
460 continue;
461 }
462
463 /* one of UDP fds is in faulty state, may have been disconnected, this is fatal */
464 goto fail;
465 }
466 if (pollfd->revents & pollfd->events) {
467 pollfd->revents = 0;
468 read = pa_read(pollfd->fd, packet, sizeof(packet), NULL);
469 pa_raop_client_handle_oob_packet(u->raop, pollfd->fd, packet, read);
470 if (pa_raop_client_is_timing_fd(u->raop, pollfd->fd)) {
471 last_timing = pa_rtclock_now();
472 check_timing_count = 1;
473 }
474 }
475
476 pollfd++;
477 }
478
479 continue;
480 }
481 }
482
483 if (u->sink->thread_info.state != PA_SINK_RUNNING) {
484 continue;
485 }
486
487 if (u->first) {
488 last_timing = 0;
489 check_timing_count = 1;
490 intvl = 0;
491 u->first = false;
492 }
493
494 canstream = pa_raop_client_can_stream(u->raop);
495 now = pa_rtclock_now();
496
497 if (u->oob && u->autoreconnect && on_timeout) {
498 if (!canstream) {
499 last_timing = 0;
500 } else if (last_timing != 0) {
501 pa_usec_t since = now - last_timing;
502 /* Incoming Timing packets should be received every 3 seconds in UDP mode
503 according to raop specifications.
504 Here we disconnect if no packet received since UDP_TIMING_PACKET_LOSS_MAX seconds
505 We only detect timing packet requests interruptions (we do nothing if no packet received at all), since some clients do not implement RTCP Timing requests at all */
506
507 if (since > (UDP_TIMING_PACKET_LOSS_MAX/UDP_TIMING_PACKET_DISCONNECT_CYCLE)*check_timing_count) {
508 if (check_timing_count < UDP_TIMING_PACKET_DISCONNECT_CYCLE) {
509 uint32_t since_in_sec = since / PA_USEC_PER_SEC;
510 pa_log_warn(
511 "UDP Timing Packets Warn #%d/%d- Nothing received since %d seconds from %s",
512 check_timing_count,
513 UDP_TIMING_PACKET_DISCONNECT_CYCLE-1, since_in_sec, u->server);
514 check_timing_count++;
515 } else {
516 /* Limit reached, then request disconnect */
517 check_timing_count = 1;
518 last_timing = 0;
519 if (pa_raop_client_is_alive(u->raop)) {
520 pa_log_warn("UDP Timing Packets Warn limit reached - Requesting reconnect");
521 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink),
522 PA_SINK_MESSAGE_DISCONNECT_REQUEST, 0, 0, NULL, NULL);
523 continue;
524 }
525 }
526 }
527 }
528 }
529
530 if (!u->autonull) {
531 if (!canstream) {
532 pa_log_debug("Can't stream, connection not established yet...");
533 continue;
534 }
535 /* This assertion is meant to silence a complaint from Coverity about
536 * pollfd being possibly NULL when we access it later. That's a false
537 * positive, because we check pa_raop_client_can_stream() above, and if
538 * that returns true, it means that the connection is up, and when the
539 * connection is up, pollfd will be non-NULL. */
540 pa_assert(pollfd);
541 }
542
543 if (u->memchunk.length <= 0) {
544 if (intvl < now + u->block_usec) {
545 if (u->memchunk.memblock)
546 pa_memblock_unref(u->memchunk.memblock);
547 pa_memchunk_reset(&u->memchunk);
548
549 /* Grab unencoded audio data from PulseAudio */
550 pa_sink_render_full(u->sink, u->block_size, &u->memchunk);
551 offset = u->memchunk.index;
552 }
553 }
554
555 if (u->memchunk.length > 0) {
556 index = u->memchunk.index;
557 sendstream = !u->autonull || (u->autonull && canstream);
558 if (sendstream && pa_raop_client_send_audio_packet(u->raop, &u->memchunk, offset) < 0) {
559 if (errno == EINTR) {
560 /* Just try again. */
561 pa_log_debug("Failed to write data to FIFO (EINTR), retrying");
562 if (u->autoreconnect) {
563 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_DISCONNECT_REQUEST,
564 0, 0, NULL, NULL);
565 continue;
566 } else
567 goto fail;
568 } else if (errno != EAGAIN && !u->oob) {
569 /* Buffer is full, wait for POLLOUT. */
570 if (!u->oob) {
571 pollfd->events = POLLOUT;
572 pollfd->revents = 0;
573 }
574 } else {
575 pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
576 if (u->autoreconnect) {
577 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_DISCONNECT_REQUEST,
578 0, 0, NULL, NULL);
579 continue;
580 } else
581 goto fail;
582 }
583 } else {
584 if (sendstream) {
585 u->write_count += (uint64_t) u->memchunk.index - (uint64_t) index;
586 } else {
587 u->write_count += u->memchunk.length;
588 u->memchunk.length = 0;
589 }
590 position = u->write_count - pa_usec_to_bytes(u->delay, &u->sink->sample_spec);
591
592 now = pa_rtclock_now();
593 estimated = pa_bytes_to_usec(position, &u->sink->sample_spec);
594 pa_smoother_put(u->smoother, now, estimated);
595
596 if ((u->autonull && !canstream) || (u->oob && canstream && on_timeout)) {
597 /* Sleep until next packet transmission */
598 intvl = u->start + pa_bytes_to_usec(u->write_count, &u->sink->sample_spec);
599 pa_rtpoll_set_timer_absolute(u->rtpoll, intvl);
600 } else if (!u->oob) {
601 if (u->memchunk.length > 0) {
602 pollfd->events = POLLOUT;
603 pollfd->revents = 0;
604 } else {
605 intvl = u->start + pa_bytes_to_usec(u->write_count, &u->sink->sample_spec);
606 pa_rtpoll_set_timer_absolute(u->rtpoll, intvl);
607 pollfd->revents = 0;
608 pollfd->events = 0;
609 }
610 }
611 }
612 }
613 }
614
615 fail:
616 /* If this was no regular exit from the loop we have to continue
617 * processing messages until we received PA_MESSAGE_SHUTDOWN */
618 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
619 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
620
621 finish:
622 pa_log_debug("Thread shutting down");
623 }
624
sink_set_port_cb(pa_sink * s,pa_device_port * p)625 static int sink_set_port_cb(pa_sink *s, pa_device_port *p) {
626 return 0;
627 }
628
raop_create_port(struct userdata * u,const char * server)629 static pa_device_port *raop_create_port(struct userdata *u, const char *server) {
630 pa_device_port_new_data data;
631 pa_device_port *port;
632
633 pa_device_port_new_data_init(&data);
634
635 pa_device_port_new_data_set_name(&data, "network-output");
636 pa_device_port_new_data_set_description(&data, server);
637 pa_device_port_new_data_set_direction(&data, PA_DIRECTION_OUTPUT);
638 pa_device_port_new_data_set_type(&data, PA_DEVICE_PORT_TYPE_NETWORK);
639
640 port = pa_device_port_new(u->core, &data, 0);
641
642 pa_device_port_new_data_done(&data);
643
644 if (port == NULL)
645 return NULL;
646
647 pa_device_port_ref(port);
648
649 return port;
650 }
651
raop_create_profile()652 static pa_card_profile *raop_create_profile() {
653 pa_card_profile *profile;
654
655 profile = pa_card_profile_new("RAOP", _("RAOP standard profile"), 0);
656 profile->priority = 10;
657 profile->n_sinks = 1;
658 profile->n_sources = 0;
659 profile->max_sink_channels = 2;
660 profile->max_source_channels = 0;
661
662 return profile;
663 }
664
raop_create_card(pa_module * m,pa_device_port * port,pa_card_profile * profile,const char * server,const char * nicename)665 static pa_card *raop_create_card(pa_module *m, pa_device_port *port, pa_card_profile *profile, const char *server, const char *nicename) {
666 pa_card_new_data data;
667 pa_card *card;
668 char *card_name;
669
670 pa_card_new_data_init(&data);
671
672 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, server);
673 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, nicename);
674 data.driver = __FILE__;
675
676 card_name = pa_sprintf_malloc("raop_client.%s", server);
677 pa_card_new_data_set_name(&data, card_name);
678 pa_xfree(card_name);
679
680 pa_hashmap_put(data.ports, port->name, port);
681 pa_hashmap_put(data.profiles, profile->name, profile);
682
683 card = pa_card_new(m->core, &data);
684
685 pa_card_new_data_done(&data);
686
687 if (card == NULL)
688 return NULL;
689
690 pa_card_choose_initial_profile(card);
691
692 pa_card_put(card);
693
694 return card;
695 }
696
pa_raop_sink_new(pa_module * m,pa_modargs * ma,const char * driver)697 pa_sink* pa_raop_sink_new(pa_module *m, pa_modargs *ma, const char *driver) {
698 struct userdata *u = NULL;
699 pa_sample_spec ss;
700 pa_channel_map map;
701 char *thread_name = NULL;
702 const char *server, *protocol, *encryption, *codec;
703 const char /* *username, */ *password;
704 pa_sink_new_data data;
705 const char *name = NULL;
706 const char *description = NULL;
707 pa_device_port *port;
708 pa_card_profile *profile;
709
710 pa_assert(m);
711 pa_assert(ma);
712
713 ss = m->core->default_sample_spec;
714 map = m->core->default_channel_map;
715
716 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
717 pa_log("Invalid sample format specification or channel map");
718 goto fail;
719 }
720
721 if (!(server = pa_modargs_get_value(ma, "server", NULL))) {
722 pa_log("Failed to parse server argument");
723 goto fail;
724 }
725
726 if (!(protocol = pa_modargs_get_value(ma, "protocol", NULL))) {
727 pa_log("Failed to parse protocol argument");
728 goto fail;
729 }
730
731 u = pa_xnew0(struct userdata, 1);
732 u->core = m->core;
733 u->module = m;
734 u->thread = NULL;
735 u->rtpoll = pa_rtpoll_new();
736 u->rtpoll_item = NULL;
737 u->latency = RAOP_DEFAULT_LATENCY;
738 u->autoreconnect = false;
739 u->server = pa_xstrdup(server);
740
741 if (pa_modargs_get_value_boolean(ma, "autoreconnect", &u->autoreconnect) < 0) {
742 pa_log("Failed to parse autoreconnect argument");
743 goto fail;
744 }
745 /* Linked for now, potentially ready for additional parameter */
746 u->autonull = u->autoreconnect;
747
748 if (pa_modargs_get_value_u32(ma, "latency_msec", &u->latency) < 0) {
749 pa_log("Failed to parse latency_msec argument");
750 goto fail;
751 }
752
753 if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
754 pa_log("pa_thread_mq_init() failed.");
755 goto fail;
756 }
757
758 u->oob = true;
759
760 u->block_size = 0;
761 pa_memchunk_reset(&u->memchunk);
762
763 u->delay = 0;
764 u->smoother = pa_smoother_new(
765 PA_USEC_PER_SEC,
766 PA_USEC_PER_SEC*2,
767 true,
768 true,
769 10,
770 0,
771 false);
772 u->write_count = 0;
773
774 if (pa_streq(protocol, "TCP")) {
775 u->protocol = PA_RAOP_PROTOCOL_TCP;
776 } else if (pa_streq(protocol, "UDP")) {
777 u->protocol = PA_RAOP_PROTOCOL_UDP;
778 } else {
779 pa_log("Unsupported transport protocol argument: %s", protocol);
780 goto fail;
781 }
782
783 encryption = pa_modargs_get_value(ma, "encryption", NULL);
784 codec = pa_modargs_get_value(ma, "codec", NULL);
785
786 if (!encryption) {
787 u->encryption = PA_RAOP_ENCRYPTION_NONE;
788 } else if (pa_streq(encryption, "none")) {
789 u->encryption = PA_RAOP_ENCRYPTION_NONE;
790 } else if (pa_streq(encryption, "RSA")) {
791 u->encryption = PA_RAOP_ENCRYPTION_RSA;
792 } else {
793 pa_log("Unsupported encryption type argument: %s", encryption);
794 goto fail;
795 }
796
797 if (!codec) {
798 u->codec = PA_RAOP_CODEC_PCM;
799 } else if (pa_streq(codec, "PCM")) {
800 u->codec = PA_RAOP_CODEC_PCM;
801 } else if (pa_streq(codec, "ALAC")) {
802 u->codec = PA_RAOP_CODEC_ALAC;
803 } else {
804 pa_log("Unsupported audio codec argument: %s", codec);
805 goto fail;
806 }
807
808 pa_sink_new_data_init(&data);
809 data.driver = driver;
810 data.module = m;
811
812 if ((name = pa_modargs_get_value(ma, "sink_name", NULL))) {
813 pa_sink_new_data_set_name(&data, name);
814 } else {
815 char *nick;
816
817 if ((name = pa_modargs_get_value(ma, "name", NULL)))
818 nick = pa_sprintf_malloc("raop_client.%s", name);
819 else
820 nick = pa_sprintf_malloc("raop_client.%s", server);
821 pa_sink_new_data_set_name(&data, nick);
822 pa_xfree(nick);
823 }
824
825 pa_sink_new_data_set_sample_spec(&data, &ss);
826 pa_sink_new_data_set_channel_map(&data, &map);
827
828 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, server);
829 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "music");
830 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "RAOP sink '%s'", server);
831
832 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
833 pa_log("Invalid properties");
834 pa_sink_new_data_done(&data);
835 goto fail;
836 }
837
838 port = raop_create_port(u, server);
839 if (port == NULL) {
840 pa_log("Failed to create port object");
841 goto fail;
842 }
843
844 profile = raop_create_profile();
845 pa_hashmap_put(port->profiles, profile->name, profile);
846
847 description = pa_proplist_gets(data.proplist, PA_PROP_DEVICE_DESCRIPTION);
848 if (description == NULL)
849 description = server;
850
851 u->card = raop_create_card(m, port, profile, server, description);
852 if (u->card == NULL) {
853 pa_log("Failed to create card object");
854 goto fail;
855 }
856
857 data.card = u->card;
858 pa_hashmap_put(data.ports, port->name, port);
859
860 u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY | PA_SINK_NETWORK);
861 pa_sink_new_data_done(&data);
862
863 if (!(u->sink)) {
864 pa_log("Failed to create sink object");
865 goto fail;
866 }
867
868 u->sink->parent.process_msg = sink_process_msg;
869 u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
870 pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
871 pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
872 u->sink->userdata = u;
873 u->sink->set_port = sink_set_port_cb;
874
875 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
876 pa_sink_set_rtpoll(u->sink, u->rtpoll);
877
878 u->raop = pa_raop_client_new(u->core, server, u->protocol, u->encryption, u->codec, u->autoreconnect);
879
880 if (!(u->raop)) {
881 pa_log("Failed to create RAOP client object");
882 goto fail;
883 }
884
885 /* The number of frames per blocks is not negotiable... */
886 pa_raop_client_get_frames_per_block(u->raop, &u->block_size);
887 u->block_size *= pa_frame_size(&ss);
888 pa_sink_set_max_request(u->sink, u->block_size);
889 u->block_usec = pa_bytes_to_usec(u->block_size, &u->sink->sample_spec);
890
891 pa_raop_client_set_state_callback(u->raop, raop_state_cb, u);
892
893 thread_name = pa_sprintf_malloc("raop-sink-%s", server);
894 if (!(u->thread = pa_thread_new(thread_name, thread_func, u))) {
895 pa_log("Failed to create sink thread");
896 goto fail;
897 }
898 pa_xfree(thread_name);
899 thread_name = NULL;
900
901 pa_sink_put(u->sink);
902
903 /* username = pa_modargs_get_value(ma, "username", NULL); */
904 password = pa_modargs_get_value(ma, "password", NULL);
905 pa_raop_client_authenticate(u->raop, password );
906
907 return u->sink;
908
909 fail:
910 pa_xfree(thread_name);
911
912 if (u)
913 userdata_free(u);
914
915 return NULL;
916 }
917
userdata_free(struct userdata * u)918 static void userdata_free(struct userdata *u) {
919 pa_assert(u);
920
921 if (u->sink)
922 pa_sink_unlink(u->sink);
923
924 if (u->thread) {
925 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
926 pa_thread_free(u->thread);
927 }
928
929 pa_thread_mq_done(&u->thread_mq);
930
931 if (u->sink)
932 pa_sink_unref(u->sink);
933 u->sink = NULL;
934
935 if (u->rtpoll_item)
936 pa_rtpoll_item_free(u->rtpoll_item);
937 if (u->rtpoll)
938 pa_rtpoll_free(u->rtpoll);
939 u->rtpoll_item = NULL;
940 u->rtpoll = NULL;
941
942 if (u->memchunk.memblock)
943 pa_memblock_unref(u->memchunk.memblock);
944
945 if (u->raop)
946 pa_raop_client_free(u->raop);
947 u->raop = NULL;
948
949 if (u->smoother)
950 pa_smoother_free(u->smoother);
951 u->smoother = NULL;
952
953 if (u->card)
954 pa_card_free(u->card);
955 if (u->server)
956 pa_xfree(u->server);
957
958 pa_xfree(u);
959 }
960
pa_raop_sink_free(pa_sink * s)961 void pa_raop_sink_free(pa_sink *s) {
962 struct userdata *u;
963
964 pa_sink_assert_ref(s);
965 pa_assert_se(u = s->userdata);
966
967 userdata_free(u);
968 }
969