• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2008 Colin Guthrie
6   Copyright 2013 Hajime Fujita
7   Copyright 2013 Martin Blanchard
8 
9   PulseAudio is free software; you can redistribute it and/or modify
10   it under the terms of the GNU Lesser General Public License as published
11   by the Free Software Foundation; either version 2.1 of the License,
12   or (at your option) any later version.
13 
14   PulseAudio is distributed in the hope that it will be useful, but
15   WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17   General Public License for more details.
18 
19   You should have received a copy of the GNU Lesser General Public License
20   along with PulseAudio; if not, write to the Free Software
21   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22   USA.
23 ***/
24 
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28 
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <sys/socket.h>
35 #include <netinet/in.h>
36 #include <netinet/tcp.h>
37 #include <sys/ioctl.h>
38 
39 #ifdef HAVE_LINUX_SOCKIOS_H
40 #include <linux/sockios.h>
41 #endif
42 
43 #include <pulse/rtclock.h>
44 #include <pulse/timeval.h>
45 #include <pulse/volume.h>
46 #include <pulse/xmalloc.h>
47 
48 #include <pulsecore/core.h>
49 #include <pulsecore/i18n.h>
50 #include <pulsecore/module.h>
51 #include <pulsecore/memchunk.h>
52 #include <pulsecore/sink.h>
53 #include <pulsecore/modargs.h>
54 #include <pulsecore/core-error.h>
55 #include <pulsecore/core-util.h>
56 #include <pulsecore/log.h>
57 #include <pulsecore/macro.h>
58 #include <pulsecore/thread.h>
59 #include <pulsecore/thread-mq.h>
60 #include <pulsecore/poll.h>
61 #include <pulsecore/rtpoll.h>
62 #include <pulsecore/core-rtclock.h>
63 #include <pulsecore/time-smoother.h>
64 
65 #include "raop-sink.h"
66 #include "raop-client.h"
67 #include "raop-util.h"
68 
/* Longest tolerated gap (in microseconds) since the last UDP timing packet
 * before the I/O thread requests a disconnect/reconnect cycle. */
#define UDP_TIMING_PACKET_LOSS_MAX (30 * PA_USEC_PER_SEC)
/* The window above is split into this many equal slices: a warning is logged
 * when each slice but the last expires, the last one triggers the request. */
#define UDP_TIMING_PACKET_DISCONNECT_CYCLE 3
71 
72 struct userdata {
73     pa_core *core;
74     pa_module *module;
75     pa_sink *sink;
76     pa_card *card;
77 
78     pa_thread *thread;
79     pa_thread_mq thread_mq;
80     pa_rtpoll *rtpoll;
81     pa_rtpoll_item *rtpoll_item;
82     bool oob;
83 
84     pa_raop_client *raop;
85     char *server;
86     pa_raop_protocol_t protocol;
87     pa_raop_encryption_t encryption;
88     pa_raop_codec_t codec;
89     bool autoreconnect;
90     /* if true, behaves like a null-sink when disconnected */
91     bool autonull;
92 
93     size_t block_size;
94     pa_usec_t block_usec;
95     pa_memchunk memchunk;
96 
97     pa_usec_t delay;
98     pa_usec_t start;
99     pa_smoother *smoother;
100     uint64_t write_count;
101 
102     uint32_t latency;
103     /* Consider as first I/O thread iteration, can be switched to true in autoreconnect mode */
104     bool first;
105 };
106 
107 enum {
108     PA_SINK_MESSAGE_SET_RAOP_STATE = PA_SINK_MESSAGE_MAX,
109     PA_SINK_MESSAGE_DISCONNECT_REQUEST
110 };
111 
112 static void userdata_free(struct userdata *u);
113 
114 static void sink_set_volume_cb(pa_sink *s);
115 
/* RAOP client state callback.
 *
 * Does no work itself: the new state is packed into the message data pointer
 * and posted as PA_SINK_MESSAGE_SET_RAOP_STATE on the sink's input queue, so
 * that sink_process_msg() handles it. */
static void raop_state_cb(pa_raop_state_t state, void *userdata) {
    struct userdata *u = userdata;
    void *payload;

    pa_assert(u);

    pa_log_debug("State change received, informing IO thread...");

    payload = PA_INT_TO_PTR(state);
    pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_SET_RAOP_STATE, payload, 0, NULL, NULL);
}
125 
sink_get_latency(const struct userdata * u)126 static int64_t sink_get_latency(const struct userdata *u) {
127     pa_usec_t now;
128     int64_t latency;
129 
130     pa_assert(u);
131     pa_assert(u->smoother);
132 
133     now = pa_rtclock_now();
134     now = pa_smoother_get(u->smoother, now);
135 
136     latency = pa_bytes_to_usec(u->write_count, &u->sink->sample_spec) - (int64_t) now;
137 
138     /* RAOP default latency */
139     latency += u->latency * PA_USEC_PER_MSEC;
140 
141     return latency;
142 }
143 
/* Message handler installed as the sink's process_msg.
 *
 * Handles the two private messages (PA_SINK_MESSAGE_DISCONNECT_REQUEST and
 * PA_SINK_MESSAGE_SET_RAOP_STATE) as well as PA_SINK_MESSAGE_GET_LATENCY;
 * every other code is forwarded to pa_sink_process_msg().
 *
 * Returns 0 for the messages handled here, otherwise whatever
 * pa_sink_process_msg() returns. */
static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SINK(o)->userdata;

    pa_assert(u);
    pa_assert(u->raop);

    switch (code) {
        /* Exception: this message is posted by the I/O thread on its output
         * queue and therefore handled in the main thread. It is done here
         * because the rtsp_client is also allocated/freed in that thread. */
        case PA_SINK_MESSAGE_DISCONNECT_REQUEST: {
            if (u->sink->state == PA_SINK_RUNNING) {
                /* Drop the connection and redo the whole chain, since the
                 * authentication token might be outdated. */
                pa_raop_client_disconnect(u->raop);
                pa_raop_client_authenticate(u->raop, NULL);
            }

            return 0;
        }

        case PA_SINK_MESSAGE_GET_LATENCY: {
            int64_t *result = data;

            /* Report zero unless we are (or emulate being) able to stream. */
            if (u->autonull || pa_raop_client_can_stream(u->raop))
                *result = sink_get_latency(u);
            else
                *result = 0;

            return 0;
        }

        case PA_SINK_MESSAGE_SET_RAOP_STATE: {
            switch ((pa_raop_state_t) PA_PTR_TO_UINT(data)) {
                case PA_RAOP_AUTHENTICATED: {
                    /* The callback fired but the client is not authenticated:
                     * ask for the module to be unloaded. */
                    if (!pa_raop_client_is_authenticated(u->raop)) {
                        pa_module_unload_request(u->module, true);
                    }

                    if (u->autoreconnect && u->sink->state == PA_SINK_RUNNING) {
                        pa_usec_t reset_time = pa_rtclock_now();

                        pa_smoother_reset(u->smoother, reset_time, false);

                        /* Connecting will trigger a RECORD and start streaming */
                        if (!pa_raop_client_is_alive(u->raop)) {
                            pa_raop_client_announce(u->raop);
                        }
                    }

                    return 0;
                }

                case PA_RAOP_CONNECTED: {
                    pa_assert(!u->rtpoll_item);

                    /* Hook the client's sockets into our rtpoll and remember
                     * which streaming mode they imply. */
                    u->oob = pa_raop_client_register_pollfd(u->raop, u->rtpoll, &u->rtpoll_item);

                    return 0;
                }

                case PA_RAOP_RECORDING: {
                    pa_usec_t start_time = pa_rtclock_now();

                    /* Restart the position bookkeeping from scratch. */
                    u->write_count = 0;
                    u->start = start_time;
                    u->first = true;
                    pa_rtpoll_set_timer_absolute(u->rtpoll, start_time);

                    if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
                        /* Our stream has been suspended so we just flush it... */
                        pa_rtpoll_set_timer_disabled(u->rtpoll);
                        pa_raop_client_flush(u->raop);
                    } else {
                        /* Set the initial volume */
                        sink_set_volume_cb(u->sink);
                        pa_sink_process_msg(o, PA_SINK_MESSAGE_SET_VOLUME, data, offset, chunk);
                    }

                    return 0;
                }

                case PA_RAOP_INVALID_STATE:
                case PA_RAOP_DISCONNECTED: {
                    pa_sink_state_t io_state;

                    /* Close every registered descriptor and drop the rtpoll item. */
                    if (u->rtpoll_item) {
                        unsigned int nbfds = 0;
                        unsigned int i;
                        struct pollfd *fds = pa_rtpoll_item_get_pollfd(u->rtpoll_item, &nbfds);

                        if (fds) {
                            for (i = 0; i < nbfds; i++) {
                                if (fds[i].fd >= 0)
                                    pa_close(fds[i].fd);
                            }
                        }

                        pa_rtpoll_item_free(u->rtpoll_item);
                        u->rtpoll_item = NULL;
                    }

                    io_state = u->sink->thread_info.state;

                    if (io_state == PA_SINK_SUSPENDED) {
                        pa_rtpoll_set_timer_disabled(u->rtpoll);

                        return 0;
                    }

                    /* Nothing more to do while idle, whatever the mode. */
                    if (io_state == PA_SINK_IDLE)
                        return 0;

                    /* Without autoreconnect a lost connection is fatal. */
                    if (!u->autoreconnect) {
                        pa_module_unload_request(u->module, true);

                        return 0;
                    }

                    /* With autonull the timer keeps running so that the I/O
                     * thread goes on consuming audio while we reconnect. */
                    if (!u->autonull)
                        pa_rtpoll_set_timer_disabled(u->rtpoll);
                    pa_raop_client_authenticate(u->raop, NULL);

                    return 0;
                }
            }

            return 0;
        }
    }

    return pa_sink_process_msg(o, code, data, offset, chunk);
}
272 
273 /* Called from the IO thread. */
sink_set_state_in_io_thread_cb(pa_sink * s,pa_sink_state_t new_state,pa_suspend_cause_t new_suspend_cause)274 static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
275     struct userdata *u;
276 
277     pa_assert(s);
278     pa_assert_se(u = s->userdata);
279 
280     /* It may be that only the suspend cause is changing, in which case there's
281      * nothing to do. */
282     if (new_state == s->thread_info.state)
283         return 0;
284 
285     switch (new_state) {
286         case PA_SINK_SUSPENDED:
287             pa_log_debug("RAOP: SUSPENDED");
288 
289             pa_assert(PA_SINK_IS_OPENED(s->thread_info.state));
290 
291             /* Issue a TEARDOWN if we are still connected */
292             if (pa_raop_client_is_alive(u->raop)) {
293                 pa_raop_client_teardown(u->raop);
294             }
295 
296             break;
297 
298         case PA_SINK_IDLE:
299             pa_log_debug("RAOP: IDLE");
300 
301             /* Issue a FLUSH if we're coming from running state */
302             if (s->thread_info.state == PA_SINK_RUNNING) {
303                 pa_rtpoll_set_timer_disabled(u->rtpoll);
304                 pa_raop_client_flush(u->raop);
305             }
306 
307             break;
308 
309         case PA_SINK_RUNNING: {
310             pa_usec_t now;
311 
312             pa_log_debug("RAOP: RUNNING");
313 
314             now = pa_rtclock_now();
315             pa_smoother_reset(u->smoother, now, false);
316 
317             /* If autonull is enabled, I/O thread is always eating chunks since
318              * it is emulating a null sink */
319             if (u->autonull) {
320                 u->start = now;
321                 u->write_count = 0;
322                 u->first = true;
323                 pa_rtpoll_set_timer_absolute(u->rtpoll, now);
324             }
325 
326             if (!pa_raop_client_is_alive(u->raop)) {
327                 /* Connecting will trigger a RECORD and start streaming */
328                 pa_raop_client_announce(u->raop);
329             } else if (!pa_raop_client_is_recording(u->raop)) {
330                 /* RECORD alredy sent, simply start streaming */
331                 pa_raop_client_stream(u->raop);
332                 pa_rtpoll_set_timer_absolute(u->rtpoll, now);
333                 u->write_count = 0;
334                 u->start = now;
335             }
336 
337             break;
338         }
339 
340         case PA_SINK_UNLINKED:
341         case PA_SINK_INIT:
342         case PA_SINK_INVALID_STATE:
343             break;
344     }
345 
346     return 0;
347 }
348 
/* Volume callback for the sink.
 *
 * The device only takes a single volume value, so the loudest channel of the
 * requested volume (after adjustment by the RAOP client) is sent to the
 * device and the per-channel differences are stored in s->soft_volume.
 * Does nothing while the sink is muted. */
static void sink_set_volume_cb(pa_sink *s) {
    struct userdata *u = s->userdata;
    char buf[PA_CVOLUME_SNPRINT_VERBOSE_MAX];
    pa_cvolume device_volume;
    pa_volume_t requested, adjusted;

    pa_assert(u);

    /* If we're muted we don't need to do anything. */
    if (s->muted)
        return;

    /* The maximum over all channels becomes the (single) device volume. */
    requested = pa_cvolume_max(&s->real_volume);
    adjusted = pa_raop_client_adjust_volume(u->raop, requested);

    pa_log_debug("Volume adjusted: orig=%u adjusted=%u", requested, adjusted);

    /* Expand the single value to a pa_cvolume covering every channel. */
    pa_cvolume_set(&device_volume, s->sample_spec.channels, adjusted);

    /* Whatever the device cannot express is left to software. */
    pa_sw_cvolume_divide(&s->soft_volume, &s->real_volume, &device_volume);

    pa_log_debug("Requested volume: %s", pa_cvolume_snprint_verbose(buf, sizeof(buf), &s->real_volume, &s->channel_map, false));
    pa_log_debug("Got hardware volume: %s", pa_cvolume_snprint_verbose(buf, sizeof(buf), &device_volume, &s->channel_map, false));
    pa_log_debug("Calculated software volume: %s",
                 pa_cvolume_snprint_verbose(buf, sizeof(buf), &s->soft_volume, &s->channel_map, true));

    /* The software part is settled, now push the single value to the device. */
    pa_raop_client_set_volume(u->raop, adjusted);
}
386 
/* Mute callback for the sink: muting sends PA_VOLUME_MUTED to the device,
 * unmuting re-applies the current volume through sink_set_volume_cb(). */
static void sink_set_mute_cb(pa_sink *s) {
    struct userdata *u = s->userdata;

    pa_assert(u);
    pa_assert(u->raop);

    if (!s->muted) {
        sink_set_volume_cb(s);
        return;
    }

    pa_raop_client_set_volume(u->raop, PA_VOLUME_MUTED);
}
399 
thread_func(void * userdata)400 static void thread_func(void *userdata) {
401     struct userdata *u = userdata;
402     size_t offset = 0;
403     pa_usec_t last_timing = 0;
404     uint32_t check_timing_count = 1;
405     pa_usec_t intvl = 0;
406 
407     pa_assert(u);
408 
409     pa_log_debug("Thread starting up");
410 
411     pa_thread_mq_install(&u->thread_mq);
412     pa_smoother_set_time_offset(u->smoother, pa_rtclock_now());
413 
414     for (;;) {
415         struct pollfd *pollfd = NULL;
416         unsigned int i, nbfds = 0;
417         pa_usec_t now, estimated;
418         uint64_t position;
419         size_t index;
420         int ret;
421         bool canstream, sendstream, on_timeout;
422 
423         /* Polling (audio data + control socket + timing socket). */
424         if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
425             goto fail;
426         else if (ret == 0)
427             goto finish;
428 
429         if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
430             if (u->sink->thread_info.rewind_requested)
431                 pa_sink_process_rewind(u->sink, 0);
432         }
433 
434         on_timeout = pa_rtpoll_timer_elapsed(u->rtpoll);
435         if (u->rtpoll_item) {
436             pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, &nbfds);
437             /* If !oob: streaming driven by pollds (POLLOUT) */
438             if (pollfd && !u->oob && !pollfd->revents) {
439                 for (i = 0; i < nbfds; i++) {
440                     pollfd->events = POLLOUT;
441                     pollfd->revents = 0;
442 
443                     pollfd++;
444                 }
445 
446                 continue;
447             }
448 
449             /* if oob: streaming managed by timing, pollfd for oob sockets */
450             if (pollfd && u->oob && !on_timeout) {
451                 uint8_t packet[32];
452                 ssize_t read;
453 
454                 for (i = 0; i < nbfds; i++) {
455                     if (pollfd->revents & POLLERR) {
456                         if (u->autoreconnect && pa_raop_client_is_alive(u->raop)) {
457                             pollfd->revents = 0;
458                             pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink),
459                                               PA_SINK_MESSAGE_DISCONNECT_REQUEST, 0, 0, NULL, NULL);
460                             continue;
461                         }
462 
463                         /* one of UDP fds is in faulty state, may have been disconnected, this is fatal  */
464                         goto fail;
465                     }
466                     if (pollfd->revents & pollfd->events) {
467                         pollfd->revents = 0;
468                         read = pa_read(pollfd->fd, packet, sizeof(packet), NULL);
469                         pa_raop_client_handle_oob_packet(u->raop, pollfd->fd, packet, read);
470                         if (pa_raop_client_is_timing_fd(u->raop, pollfd->fd)) {
471                             last_timing = pa_rtclock_now();
472                             check_timing_count = 1;
473                         }
474                     }
475 
476                     pollfd++;
477                 }
478 
479                 continue;
480             }
481         }
482 
483         if (u->sink->thread_info.state != PA_SINK_RUNNING) {
484             continue;
485         }
486 
487         if (u->first) {
488             last_timing = 0;
489             check_timing_count = 1;
490             intvl = 0;
491             u->first = false;
492         }
493 
494         canstream = pa_raop_client_can_stream(u->raop);
495         now = pa_rtclock_now();
496 
497         if (u->oob && u->autoreconnect && on_timeout) {
498             if (!canstream) {
499                 last_timing = 0;
500             } else if (last_timing != 0) {
501                 pa_usec_t since = now - last_timing;
502                 /* Incoming Timing packets should be received every 3 seconds in UDP mode
503                    according to raop specifications.
504                    Here we disconnect if no packet received since UDP_TIMING_PACKET_LOSS_MAX seconds
505                    We only detect timing packet requests interruptions (we do nothing if no packet received at all), since some clients do not implement RTCP Timing requests at all */
506 
507                 if (since > (UDP_TIMING_PACKET_LOSS_MAX/UDP_TIMING_PACKET_DISCONNECT_CYCLE)*check_timing_count) {
508                     if (check_timing_count < UDP_TIMING_PACKET_DISCONNECT_CYCLE) {
509                         uint32_t since_in_sec = since / PA_USEC_PER_SEC;
510                         pa_log_warn(
511                                 "UDP Timing Packets Warn #%d/%d- Nothing received since %d seconds from %s",
512                                 check_timing_count,
513                                 UDP_TIMING_PACKET_DISCONNECT_CYCLE-1, since_in_sec, u->server);
514                         check_timing_count++;
515                     } else {
516                         /* Limit reached, then request disconnect */
517                         check_timing_count = 1;
518                         last_timing = 0;
519                         if (pa_raop_client_is_alive(u->raop)) {
520                             pa_log_warn("UDP Timing Packets Warn limit reached - Requesting reconnect");
521                             pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink),
522                                               PA_SINK_MESSAGE_DISCONNECT_REQUEST, 0, 0, NULL, NULL);
523                             continue;
524                         }
525                     }
526                 }
527             }
528         }
529 
530         if (!u->autonull) {
531             if (!canstream) {
532                 pa_log_debug("Can't stream, connection not established yet...");
533                 continue;
534             }
535             /* This assertion is meant to silence a complaint from Coverity about
536              * pollfd being possibly NULL when we access it later. That's a false
537              * positive, because we check pa_raop_client_can_stream() above, and if
538              * that returns true, it means that the connection is up, and when the
539              * connection is up, pollfd will be non-NULL. */
540             pa_assert(pollfd);
541         }
542 
543         if (u->memchunk.length <= 0) {
544             if (intvl < now + u->block_usec) {
545                 if (u->memchunk.memblock)
546                     pa_memblock_unref(u->memchunk.memblock);
547                 pa_memchunk_reset(&u->memchunk);
548 
549                 /* Grab unencoded audio data from PulseAudio */
550                 pa_sink_render_full(u->sink, u->block_size, &u->memchunk);
551                 offset = u->memchunk.index;
552             }
553         }
554 
555         if (u->memchunk.length > 0) {
556             index = u->memchunk.index;
557             sendstream = !u->autonull || (u->autonull && canstream);
558             if (sendstream && pa_raop_client_send_audio_packet(u->raop, &u->memchunk, offset) < 0) {
559                 if (errno == EINTR) {
560                     /* Just try again. */
561                     pa_log_debug("Failed to write data to FIFO (EINTR), retrying");
562                     if (u->autoreconnect) {
563                         pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_DISCONNECT_REQUEST,
564                                           0, 0, NULL, NULL);
565                         continue;
566                     } else
567                         goto fail;
568                 } else if (errno != EAGAIN && !u->oob) {
569                     /* Buffer is full, wait for POLLOUT. */
570                     if (!u->oob) {
571                         pollfd->events = POLLOUT;
572                         pollfd->revents = 0;
573                     }
574                 } else {
575                     pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
576                     if (u->autoreconnect) {
577                         pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_DISCONNECT_REQUEST,
578                                           0, 0, NULL, NULL);
579                         continue;
580                     } else
581                         goto fail;
582                 }
583             } else {
584                 if (sendstream) {
585                     u->write_count += (uint64_t) u->memchunk.index - (uint64_t) index;
586                 } else {
587                     u->write_count += u->memchunk.length;
588                     u->memchunk.length = 0;
589                 }
590                 position = u->write_count - pa_usec_to_bytes(u->delay, &u->sink->sample_spec);
591 
592                 now = pa_rtclock_now();
593                 estimated = pa_bytes_to_usec(position, &u->sink->sample_spec);
594                 pa_smoother_put(u->smoother, now, estimated);
595 
596                 if ((u->autonull && !canstream) || (u->oob && canstream && on_timeout)) {
597                     /* Sleep until next packet transmission */
598                     intvl = u->start + pa_bytes_to_usec(u->write_count, &u->sink->sample_spec);
599                     pa_rtpoll_set_timer_absolute(u->rtpoll, intvl);
600                 } else if (!u->oob) {
601                     if (u->memchunk.length > 0) {
602                         pollfd->events = POLLOUT;
603                         pollfd->revents = 0;
604                     } else {
605                         intvl = u->start + pa_bytes_to_usec(u->write_count, &u->sink->sample_spec);
606                         pa_rtpoll_set_timer_absolute(u->rtpoll, intvl);
607                         pollfd->revents = 0;
608                         pollfd->events = 0;
609                     }
610                 }
611             }
612         }
613     }
614 
615 fail:
616     /* If this was no regular exit from the loop we have to continue
617      * processing messages until we received PA_MESSAGE_SHUTDOWN */
618     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
619     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
620 
621 finish:
622     pa_log_debug("Thread shutting down");
623 }
624 
/* set_port hook of the sink: there is nothing to switch here, so the request
 * is accepted unconditionally (always returns 0). */
static int sink_set_port_cb(pa_sink *s, pa_device_port *p) {
    return 0;
}
628 
raop_create_port(struct userdata * u,const char * server)629 static pa_device_port *raop_create_port(struct userdata *u, const char *server) {
630     pa_device_port_new_data data;
631     pa_device_port *port;
632 
633     pa_device_port_new_data_init(&data);
634 
635     pa_device_port_new_data_set_name(&data, "network-output");
636     pa_device_port_new_data_set_description(&data, server);
637     pa_device_port_new_data_set_direction(&data, PA_DIRECTION_OUTPUT);
638     pa_device_port_new_data_set_type(&data, PA_DEVICE_PORT_TYPE_NETWORK);
639 
640     port = pa_device_port_new(u->core, &data, 0);
641 
642     pa_device_port_new_data_done(&data);
643 
644     if (port == NULL)
645         return NULL;
646 
647     pa_device_port_ref(port);
648 
649     return port;
650 }
651 
raop_create_profile()652 static pa_card_profile *raop_create_profile() {
653     pa_card_profile *profile;
654 
655     profile = pa_card_profile_new("RAOP", _("RAOP standard profile"), 0);
656     profile->priority = 10;
657     profile->n_sinks = 1;
658     profile->n_sources = 0;
659     profile->max_sink_channels = 2;
660     profile->max_source_channels = 0;
661 
662     return profile;
663 }
664 
raop_create_card(pa_module * m,pa_device_port * port,pa_card_profile * profile,const char * server,const char * nicename)665 static pa_card *raop_create_card(pa_module *m, pa_device_port *port, pa_card_profile *profile, const char *server, const char *nicename) {
666     pa_card_new_data data;
667     pa_card *card;
668     char *card_name;
669 
670     pa_card_new_data_init(&data);
671 
672     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, server);
673     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, nicename);
674     data.driver = __FILE__;
675 
676     card_name = pa_sprintf_malloc("raop_client.%s", server);
677     pa_card_new_data_set_name(&data, card_name);
678     pa_xfree(card_name);
679 
680     pa_hashmap_put(data.ports, port->name, port);
681     pa_hashmap_put(data.profiles, profile->name, profile);
682 
683     card = pa_card_new(m->core, &data);
684 
685     pa_card_new_data_done(&data);
686 
687     if (card == NULL)
688         return NULL;
689 
690     pa_card_choose_initial_profile(card);
691 
692     pa_card_put(card);
693 
694     return card;
695 }
696 
pa_raop_sink_new(pa_module * m,pa_modargs * ma,const char * driver)697 pa_sink* pa_raop_sink_new(pa_module *m, pa_modargs *ma, const char *driver) {
698     struct userdata *u = NULL;
699     pa_sample_spec ss;
700     pa_channel_map map;
701     char *thread_name = NULL;
702     const char *server, *protocol, *encryption, *codec;
703     const char /* *username, */ *password;
704     pa_sink_new_data data;
705     const char *name = NULL;
706     const char *description = NULL;
707     pa_device_port *port;
708     pa_card_profile *profile;
709 
710     pa_assert(m);
711     pa_assert(ma);
712 
713     ss = m->core->default_sample_spec;
714     map = m->core->default_channel_map;
715 
716     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
717         pa_log("Invalid sample format specification or channel map");
718         goto fail;
719     }
720 
721     if (!(server = pa_modargs_get_value(ma, "server", NULL))) {
722         pa_log("Failed to parse server argument");
723         goto fail;
724     }
725 
726     if (!(protocol = pa_modargs_get_value(ma, "protocol", NULL))) {
727         pa_log("Failed to parse protocol argument");
728         goto fail;
729     }
730 
731     u = pa_xnew0(struct userdata, 1);
732     u->core = m->core;
733     u->module = m;
734     u->thread = NULL;
735     u->rtpoll = pa_rtpoll_new();
736     u->rtpoll_item = NULL;
737     u->latency = RAOP_DEFAULT_LATENCY;
738     u->autoreconnect = false;
739     u->server = pa_xstrdup(server);
740 
741     if (pa_modargs_get_value_boolean(ma, "autoreconnect", &u->autoreconnect) < 0) {
742         pa_log("Failed to parse autoreconnect argument");
743         goto fail;
744     }
745     /* Linked for now, potentially ready for additional parameter */
746     u->autonull = u->autoreconnect;
747 
748     if (pa_modargs_get_value_u32(ma, "latency_msec", &u->latency) < 0) {
749         pa_log("Failed to parse latency_msec argument");
750         goto fail;
751     }
752 
753     if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
754         pa_log("pa_thread_mq_init() failed.");
755         goto fail;
756     }
757 
758     u->oob = true;
759 
760     u->block_size = 0;
761     pa_memchunk_reset(&u->memchunk);
762 
763     u->delay = 0;
764     u->smoother = pa_smoother_new(
765             PA_USEC_PER_SEC,
766             PA_USEC_PER_SEC*2,
767             true,
768             true,
769             10,
770             0,
771             false);
772     u->write_count = 0;
773 
774     if (pa_streq(protocol, "TCP")) {
775         u->protocol = PA_RAOP_PROTOCOL_TCP;
776     } else if (pa_streq(protocol, "UDP")) {
777         u->protocol = PA_RAOP_PROTOCOL_UDP;
778     } else {
779         pa_log("Unsupported transport protocol argument: %s", protocol);
780         goto fail;
781     }
782 
783     encryption = pa_modargs_get_value(ma, "encryption", NULL);
784     codec = pa_modargs_get_value(ma, "codec", NULL);
785 
786     if (!encryption) {
787         u->encryption = PA_RAOP_ENCRYPTION_NONE;
788     } else if (pa_streq(encryption, "none")) {
789         u->encryption = PA_RAOP_ENCRYPTION_NONE;
790     } else if (pa_streq(encryption, "RSA")) {
791         u->encryption = PA_RAOP_ENCRYPTION_RSA;
792     } else {
793         pa_log("Unsupported encryption type argument: %s", encryption);
794         goto fail;
795     }
796 
797     if (!codec) {
798         u->codec = PA_RAOP_CODEC_PCM;
799     } else if (pa_streq(codec, "PCM")) {
800         u->codec = PA_RAOP_CODEC_PCM;
801     } else if (pa_streq(codec, "ALAC")) {
802         u->codec = PA_RAOP_CODEC_ALAC;
803     } else {
804         pa_log("Unsupported audio codec argument: %s", codec);
805         goto fail;
806     }
807 
808     pa_sink_new_data_init(&data);
809     data.driver = driver;
810     data.module = m;
811 
812     if ((name = pa_modargs_get_value(ma, "sink_name", NULL))) {
813         pa_sink_new_data_set_name(&data, name);
814     } else {
815         char *nick;
816 
817         if ((name = pa_modargs_get_value(ma, "name", NULL)))
818             nick = pa_sprintf_malloc("raop_client.%s", name);
819         else
820             nick = pa_sprintf_malloc("raop_client.%s", server);
821         pa_sink_new_data_set_name(&data, nick);
822         pa_xfree(nick);
823     }
824 
825     pa_sink_new_data_set_sample_spec(&data, &ss);
826     pa_sink_new_data_set_channel_map(&data, &map);
827 
828     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, server);
829     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "music");
830     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "RAOP sink '%s'", server);
831 
832     if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
833         pa_log("Invalid properties");
834         pa_sink_new_data_done(&data);
835         goto fail;
836     }
837 
838     port = raop_create_port(u, server);
839     if (port == NULL) {
840         pa_log("Failed to create port object");
841         goto fail;
842     }
843 
844     profile = raop_create_profile();
845     pa_hashmap_put(port->profiles, profile->name, profile);
846 
847     description = pa_proplist_gets(data.proplist, PA_PROP_DEVICE_DESCRIPTION);
848     if (description == NULL)
849         description = server;
850 
851     u->card = raop_create_card(m, port, profile, server, description);
852     if (u->card == NULL) {
853         pa_log("Failed to create card object");
854         goto fail;
855     }
856 
857     data.card = u->card;
858     pa_hashmap_put(data.ports, port->name, port);
859 
860     u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY | PA_SINK_NETWORK);
861     pa_sink_new_data_done(&data);
862 
863     if (!(u->sink)) {
864         pa_log("Failed to create sink object");
865         goto fail;
866     }
867 
868     u->sink->parent.process_msg = sink_process_msg;
869     u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
870     pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
871     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
872     u->sink->userdata = u;
873     u->sink->set_port = sink_set_port_cb;
874 
875     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
876     pa_sink_set_rtpoll(u->sink, u->rtpoll);
877 
878     u->raop = pa_raop_client_new(u->core, server, u->protocol, u->encryption, u->codec, u->autoreconnect);
879 
880     if (!(u->raop)) {
881         pa_log("Failed to create RAOP client object");
882         goto fail;
883     }
884 
885     /* The number of frames per blocks is not negotiable... */
886     pa_raop_client_get_frames_per_block(u->raop, &u->block_size);
887     u->block_size *= pa_frame_size(&ss);
888     pa_sink_set_max_request(u->sink, u->block_size);
889     u->block_usec = pa_bytes_to_usec(u->block_size, &u->sink->sample_spec);
890 
891     pa_raop_client_set_state_callback(u->raop, raop_state_cb, u);
892 
893     thread_name = pa_sprintf_malloc("raop-sink-%s", server);
894     if (!(u->thread = pa_thread_new(thread_name, thread_func, u))) {
895         pa_log("Failed to create sink thread");
896         goto fail;
897     }
898     pa_xfree(thread_name);
899     thread_name = NULL;
900 
901     pa_sink_put(u->sink);
902 
903     /* username = pa_modargs_get_value(ma, "username", NULL); */
904     password = pa_modargs_get_value(ma, "password", NULL);
905     pa_raop_client_authenticate(u->raop, password );
906 
907     return u->sink;
908 
909 fail:
910     pa_xfree(thread_name);
911 
912     if (u)
913         userdata_free(u);
914 
915     return NULL;
916 }
917 
userdata_free(struct userdata * u)918 static void userdata_free(struct userdata *u) {
919     pa_assert(u);
920 
921     if (u->sink)
922         pa_sink_unlink(u->sink);
923 
924     if (u->thread) {
925         pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
926         pa_thread_free(u->thread);
927     }
928 
929     pa_thread_mq_done(&u->thread_mq);
930 
931     if (u->sink)
932         pa_sink_unref(u->sink);
933     u->sink = NULL;
934 
935     if (u->rtpoll_item)
936         pa_rtpoll_item_free(u->rtpoll_item);
937     if (u->rtpoll)
938         pa_rtpoll_free(u->rtpoll);
939     u->rtpoll_item = NULL;
940     u->rtpoll = NULL;
941 
942     if (u->memchunk.memblock)
943         pa_memblock_unref(u->memchunk.memblock);
944 
945     if (u->raop)
946         pa_raop_client_free(u->raop);
947     u->raop = NULL;
948 
949     if (u->smoother)
950         pa_smoother_free(u->smoother);
951     u->smoother = NULL;
952 
953     if (u->card)
954         pa_card_free(u->card);
955     if (u->server)
956         pa_xfree(u->server);
957 
958     pa_xfree(u);
959 }
960 
/* Public teardown entry point: look up the userdata attached to the given
 * sink and hand it to userdata_free(). Aborts via assertion if the sink is
 * not a valid referenced object or carries no userdata. */
void pa_raop_sink_free(pa_sink *s) {
    struct userdata *ud;

    pa_sink_assert_ref(s);

    /* pa_assert_se() always evaluates its argument, so fetching the pointer
     * first and asserting on it afterwards is equivalent to doing both in
     * one expression. */
    ud = s->userdata;
    pa_assert_se(ud);

    userdata_free(ud);
}
969