• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2008 Colin Guthrie
6   Copyright 2013 Hajime Fujita
7   Copyright 2013 Martin Blanchard
8 
9   PulseAudio is free software; you can redistribute it and/or modify
10   it under the terms of the GNU Lesser General Public License as published
11   by the Free Software Foundation; either version 2.1 of the License,
12   or (at your option) any later version.
13 
14   PulseAudio is distributed in the hope that it will be useful, but
15   WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17   General Public License for more details.
18 
19   You should have received a copy of the GNU Lesser General Public License
20   along with PulseAudio; if not, write to the Free Software
21   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22   USA.
23 ***/
24 
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28 
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <sys/socket.h>
35 #include <netinet/in.h>
36 #include <netinet/tcp.h>
37 #include <sys/ioctl.h>
38 
39 #ifdef HAVE_LINUX_SOCKIOS_H
40 #include <linux/sockios.h>
41 #endif
42 
43 #include <pulse/rtclock.h>
44 #include <pulse/timeval.h>
45 #include <pulse/volume.h>
46 #include <pulse/xmalloc.h>
47 
48 #include <pulsecore/core.h>
49 #include <pulsecore/i18n.h>
50 #include <pulsecore/module.h>
51 #include <pulsecore/memchunk.h>
52 #include <pulsecore/sink.h>
53 #include <pulsecore/modargs.h>
54 #include <pulsecore/core-error.h>
55 #include <pulsecore/core-util.h>
56 #include <pulsecore/log.h>
57 #include <pulsecore/macro.h>
58 #include <pulsecore/thread.h>
59 #include <pulsecore/thread-mq.h>
60 #include <pulsecore/poll.h>
61 #include <pulsecore/rtpoll.h>
62 #include <pulsecore/core-rtclock.h>
63 
64 #ifdef USE_SMOOTHER_2
65 #include <pulsecore/time-smoother_2.h>
66 #else
67 #include <pulsecore/time-smoother.h>
68 #endif
69 
70 #include "raop-sink.h"
71 #include "raop-client.h"
72 #include "raop-util.h"
73 
74 #define UDP_TIMING_PACKET_LOSS_MAX (30 * PA_USEC_PER_SEC) /* longest gap (in usec) without an incoming timing packet before the I/O thread requests a reconnect */
75 #define UDP_TIMING_PACKET_DISCONNECT_CYCLE 3 /* the gap above is split into this many equal steps; a warning is logged at each step before the last one */
76 
77 struct userdata { /* per-sink state, allocated in pa_raop_sink_new() and used by the sink callbacks and thread_func() */
78     pa_core *core; /* copied from m->core */
79     pa_module *module; /* module that owns this sink; target of unload requests */
80     pa_sink *sink; /* the sink object created by pa_sink_new() */
81     pa_card *card; /* card object created by raop_create_card() */
82 
83     pa_thread *thread; /* presumably the handle of the thread running thread_func() - creation is not visible here */
84     pa_thread_mq thread_mq; /* message queues installed by thread_func(); inq receives state changes, outq carries disconnect/unload requests */
85     pa_rtpoll *rtpoll; /* poll loop run by thread_func(); also carries the streaming timer */
86     pa_rtpoll_item *rtpoll_item; /* set by pa_raop_client_register_pollfd() on CONNECTED, freed and NULLed on DISCONNECTED */
87     bool oob; /* result of pa_raop_client_register_pollfd(); true: streaming is timer driven and the fds carry out-of-band packets, false: streaming is POLLOUT driven */
88 
89     pa_raop_client *raop; /* RAOP client used for all protocol operations */
90     char *server; /* private copy of the "server" module argument */
91     pa_raop_protocol_t protocol; /* from the "protocol" module argument ("TCP" or "UDP") */
92     pa_raop_encryption_t encryption; /* from the "encryption" module argument; defaults to none */
93     pa_raop_codec_t codec; /* from the "codec" module argument; defaults to PCM */
94     bool autoreconnect; /* from the "autoreconnect" module argument; re-authenticate instead of unloading on disconnect */
95     /* if true, behaves like a null-sink when disconnected */
96     bool autonull;
97 
98     size_t block_size; /* number of bytes requested from pa_sink_render_full() per render */
99     pa_usec_t block_usec; /* compared against the next send time before rendering; presumably the duration of one block - TODO confirm */
100     pa_memchunk memchunk; /* rendered audio that has not been completely sent yet */
101 
102     pa_usec_t delay; /* converted to bytes and subtracted from write_count before feeding the smoother */
103     pa_usec_t start; /* rtclock timestamp at which the current stream timeline began */
104 #ifdef USE_SMOOTHER_2
105     pa_smoother_2 *smoother; /* time smoother used by sink_get_latency() */
106 #else
107     pa_smoother *smoother; /* time smoother used by sink_get_latency() */
108 #endif
109     uint64_t write_count; /* bytes consumed since 'start' (sent, or discarded while emulating a null sink) */
110 
111     uint32_t latency; /* extra latency in milliseconds ("latency_msec" argument), added in sink_get_latency() */
112     /* Consider as first I/O thread iteration, can be switched to true in autoreconnect mode */
113     bool first;
114 };
115 
116 enum { /* sink messages private to this file, numbered after the generic PA_SINK_MESSAGE_* codes */
117     PA_SINK_MESSAGE_SET_RAOP_STATE = PA_SINK_MESSAGE_MAX, /* posted by raop_state_cb(); data carries the new pa_raop_state_t */
118     PA_SINK_MESSAGE_DISCONNECT_REQUEST /* posted by thread_func() on thread_mq.outq to ask for a disconnect + re-authentication */
119 };
120 
121 static void userdata_free(struct userdata *u);
122 
123 static void sink_set_volume_cb(pa_sink *s);
124 
/* State callback handed to the RAOP client.
 *
 * Does no work itself: the new state is packed into the message payload and
 * posted on the I/O thread's input queue, where sink_process_msg() picks it up
 * as PA_SINK_MESSAGE_SET_RAOP_STATE. */
static void raop_state_cb(pa_raop_state_t state, void *userdata) {
    struct userdata *u = userdata;
    void *payload;

    pa_assert(u);

    pa_log_debug("State change received, informing IO thread...");

    payload = PA_INT_TO_PTR(state);
    pa_asyncmsgq_post(u->thread_mq.inq,
                      PA_MSGOBJECT(u->sink),
                      PA_SINK_MESSAGE_SET_RAOP_STATE,
                      payload,
                      0,
                      NULL,
                      NULL);
}
134 
sink_get_latency(const struct userdata * u)135 static int64_t sink_get_latency(const struct userdata *u) {
136 #ifndef USE_SMOOTHER_2
137     pa_usec_t now;
138 #endif
139     int64_t latency;
140 
141     pa_assert(u);
142     pa_assert(u->smoother);
143 
144 #ifdef USE_SMOOTHER_2
145     latency = pa_smoother_2_get_delay(u->smoother, pa_rtclock_now(), u->write_count);
146 #else
147     now = pa_rtclock_now();
148     now = pa_smoother_get(u->smoother, now);
149 
150     latency = pa_bytes_to_usec(u->write_count, &u->sink->sample_spec) - (int64_t) now;
151 #endif
152 
153     /* RAOP default latency */
154     latency += u->latency * PA_USEC_PER_MSEC;
155 
156     return latency;
157 }
158 
sink_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)159 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
160     struct userdata *u = PA_SINK(o)->userdata;
161 
162     pa_assert(u);
163     pa_assert(u->raop);
164 
165     switch (code) {
166         /* Exception : for this message, we are in main thread, msg sent from the IO/thread
167            Done here, as alloc/free of rtsp_client is also done in this thread for other cases */
168         case PA_SINK_MESSAGE_DISCONNECT_REQUEST: {
169             if (u->sink->state == PA_SINK_RUNNING) {
170                 /* Disconnect raop client, and restart the whole chain since
171                  * the authentication token might be outdated */
172                 pa_raop_client_disconnect(u->raop);
173                 pa_raop_client_authenticate(u->raop, NULL);
174             }
175 
176             return 0;
177         }
178 
179         case PA_SINK_MESSAGE_GET_LATENCY: {
180             int64_t r = 0;
181 
182             if (u->autonull || pa_raop_client_can_stream(u->raop))
183                 r = sink_get_latency(u);
184 
185             *((int64_t*) data) = r;
186 
187             return 0;
188         }
189 
190         case PA_SINK_MESSAGE_SET_RAOP_STATE: {
191             switch ((pa_raop_state_t) PA_PTR_TO_UINT(data)) {
192                 case PA_RAOP_AUTHENTICATED: {
193                     if (!pa_raop_client_is_authenticated(u->raop)) {
194                         pa_module_unload_request(u->module, true);
195                     }
196 
197                     if (u->autoreconnect && u->sink->state == PA_SINK_RUNNING) {
198                         pa_usec_t now;
199                         now = pa_rtclock_now();
200 #ifdef USE_SMOOTHER_2
201                         pa_smoother_2_reset(u->smoother, now);
202 #else
203                         pa_smoother_reset(u->smoother, now, false);
204 #endif
205 
206                         if (!pa_raop_client_is_alive(u->raop)) {
207                             /* Connecting will trigger a RECORD and start steaming */
208                             pa_raop_client_announce(u->raop);
209                         }
210                     }
211 
212                     return 0;
213                 }
214 
215                 case PA_RAOP_CONNECTED: {
216                     pa_assert(!u->rtpoll_item);
217 
218                     u->oob = pa_raop_client_register_pollfd(u->raop, u->rtpoll, &u->rtpoll_item);
219 
220                     return 0;
221                 }
222 
223                 case PA_RAOP_RECORDING: {
224                     pa_usec_t now;
225 
226                     now = pa_rtclock_now();
227                     u->write_count = 0;
228                     u->start = now;
229                     u->first = true;
230                     pa_rtpoll_set_timer_absolute(u->rtpoll, now);
231 
232                     if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
233                         /* Our stream has been suspended so we just flush it... */
234                         pa_rtpoll_set_timer_disabled(u->rtpoll);
235                         pa_raop_client_flush(u->raop);
236                     } else {
237                         /* Set the initial volume */
238                         sink_set_volume_cb(u->sink);
239                         pa_sink_process_msg(o, PA_SINK_MESSAGE_SET_VOLUME, data, offset, chunk);
240                     }
241 
242                     return 0;
243                 }
244 
245                 case PA_RAOP_INVALID_STATE:
246                 case PA_RAOP_DISCONNECTED: {
247                     unsigned int nbfds = 0;
248                     struct pollfd *pollfd;
249                     unsigned int i;
250 
251                     if (u->rtpoll_item) {
252                         pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, &nbfds);
253                         if (pollfd) {
254                             for (i = 0; i < nbfds; i++) {
255                                 if (pollfd->fd >= 0)
256                                    pa_close(pollfd->fd);
257                                 pollfd++;
258                             }
259                         }
260                         pa_rtpoll_item_free(u->rtpoll_item);
261                         u->rtpoll_item = NULL;
262                     }
263 
264                     if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
265                         pa_rtpoll_set_timer_disabled(u->rtpoll);
266 
267                         return 0;
268                     }
269 
270                     if (u->autoreconnect) {
271                         if (u->sink->thread_info.state != PA_SINK_IDLE) {
272                             if (!u->autonull)
273                                 pa_rtpoll_set_timer_disabled(u->rtpoll);
274                             pa_raop_client_authenticate(u->raop, NULL);
275                         }
276                     } else {
277                         if (u->sink->thread_info.state != PA_SINK_IDLE)
278                             pa_module_unload_request(u->module, true);
279                     }
280 
281                     return 0;
282                 }
283             }
284 
285             return 0;
286         }
287     }
288 
289     return pa_sink_process_msg(o, code, data, offset, chunk);
290 }
291 
292 /* Called from the IO thread. */
sink_set_state_in_io_thread_cb(pa_sink * s,pa_sink_state_t new_state,pa_suspend_cause_t new_suspend_cause)293 static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
294     struct userdata *u;
295 
296     pa_assert(s);
297     pa_assert_se(u = s->userdata);
298 
299     /* It may be that only the suspend cause is changing, in which case there's
300      * nothing to do. */
301     if (new_state == s->thread_info.state)
302         return 0;
303 
304     switch (new_state) {
305         case PA_SINK_SUSPENDED:
306             pa_log_debug("RAOP: SUSPENDED");
307 
308             pa_assert(PA_SINK_IS_OPENED(s->thread_info.state));
309 
310             /* Issue a TEARDOWN if we are still connected */
311             if (pa_raop_client_is_alive(u->raop)) {
312                 pa_raop_client_teardown(u->raop);
313             }
314 
315             break;
316 
317         case PA_SINK_IDLE:
318             pa_log_debug("RAOP: IDLE");
319 
320             /* Issue a FLUSH if we're coming from running state */
321             if (s->thread_info.state == PA_SINK_RUNNING) {
322                 pa_rtpoll_set_timer_disabled(u->rtpoll);
323                 pa_raop_client_flush(u->raop);
324             }
325 
326             break;
327 
328         case PA_SINK_RUNNING: {
329             pa_usec_t now;
330 
331             pa_log_debug("RAOP: RUNNING");
332 
333             now = pa_rtclock_now();
334 #ifdef USE_SMOOTHER_2
335             pa_smoother_2_reset(u->smoother, now);
336 #else
337             pa_smoother_reset(u->smoother, now, false);
338 #endif
339 
340             /* If autonull is enabled, I/O thread is always eating chunks since
341              * it is emulating a null sink */
342             if (u->autonull) {
343                 u->start = now;
344                 u->write_count = 0;
345                 u->first = true;
346                 pa_rtpoll_set_timer_absolute(u->rtpoll, now);
347             }
348 
349             if (!pa_raop_client_is_alive(u->raop)) {
350                 /* Connecting will trigger a RECORD and start streaming */
351                 pa_raop_client_announce(u->raop);
352             } else if (!pa_raop_client_is_recording(u->raop)) {
353                 /* RECORD alredy sent, simply start streaming */
354                 pa_raop_client_stream(u->raop);
355                 pa_rtpoll_set_timer_absolute(u->rtpoll, now);
356                 u->write_count = 0;
357                 u->start = now;
358             }
359 
360             break;
361         }
362 
363         case PA_SINK_UNLINKED:
364         case PA_SINK_INIT:
365         case PA_SINK_INVALID_STATE:
366             break;
367     }
368 
369     return 0;
370 }
371 
/* Volume callback of the sink.
 *
 * The device only takes a single volume value, so the loudest channel of the
 * requested volume is sent to it (after the client has adjusted it) and the
 * per-channel differences are reproduced through the sink's software volume.
 * Does nothing while the sink is muted. */
static void sink_set_volume_cb(pa_sink *s) {
    struct userdata *u = s->userdata;
    char buf[PA_CVOLUME_SNPRINT_VERBOSE_MAX];
    pa_cvolume device_volume;
    pa_volume_t requested, applied;

    pa_assert(u);

    /* If we're muted we don't need to do anything. */
    if (s->muted)
        return;

    /* The loudest channel becomes the (single) device volume. */
    requested = pa_cvolume_max(&s->real_volume);
    applied = pa_raop_client_adjust_volume(u->raop, requested);

    pa_log_debug("Volume adjusted: orig=%u adjusted=%u", requested, applied);

    /* Expand the single value to all channels ... */
    pa_cvolume_set(&device_volume, s->sample_spec.channels, applied);

    /* ... and let software volume cover the remaining difference. */
    pa_sw_cvolume_divide(&s->soft_volume, &s->real_volume, &device_volume);

    pa_log_debug("Requested volume: %s",
                 pa_cvolume_snprint_verbose(buf, sizeof(buf), &s->real_volume, &s->channel_map, false));
    pa_log_debug("Got hardware volume: %s",
                 pa_cvolume_snprint_verbose(buf, sizeof(buf), &device_volume, &s->channel_map, false));
    pa_log_debug("Calculated software volume: %s",
                 pa_cvolume_snprint_verbose(buf, sizeof(buf), &s->soft_volume, &s->channel_map, true));

    /* Software side is settled, now push the single value to the device. */
    pa_raop_client_set_volume(u->raop, applied);
}
409 
/* Mute callback of the sink: sends PA_VOLUME_MUTED to the device when the
 * sink is muted, otherwise re-applies the current volume. */
static void sink_set_mute_cb(pa_sink *s) {
    struct userdata *u = s->userdata;

    pa_assert(u);
    pa_assert(u->raop);

    if (!s->muted) {
        /* Unmuted: restore the regular volume */
        sink_set_volume_cb(s);
        return;
    }

    pa_raop_client_set_volume(u->raop, PA_VOLUME_MUTED);
}
422 
thread_func(void * userdata)423 static void thread_func(void *userdata) {
424     struct userdata *u = userdata;
425     size_t offset = 0;
426     pa_usec_t last_timing = 0;
427     uint32_t check_timing_count = 1;
428     pa_usec_t intvl = 0;
429 
430     pa_assert(u);
431 
432     pa_log_debug("Thread starting up");
433 
434     pa_thread_mq_install(&u->thread_mq);
435 #ifdef USE_SMOOTHER_2
436     pa_smoother_2_reset(u->smoother, pa_rtclock_now());
437 #else
438     pa_smoother_set_time_offset(u->smoother, pa_rtclock_now());
439 #endif
440 
441     for (;;) {
442         struct pollfd *pollfd = NULL;
443         unsigned int i, nbfds = 0;
444         pa_usec_t now;
445         uint64_t position;
446         size_t index;
447         int ret;
448         bool canstream, sendstream, on_timeout;
449 #ifndef USE_SMOOTHER_2
450         pa_usec_t estimated;
451 #endif
452 
453         /* Polling (audio data + control socket + timing socket). */
454         if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
455             goto fail;
456         else if (ret == 0)
457             goto finish;
458 
459         if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
460             if (u->sink->thread_info.rewind_requested)
461                 pa_sink_process_rewind(u->sink, 0);
462         }
463 
464         on_timeout = pa_rtpoll_timer_elapsed(u->rtpoll);
465         if (u->rtpoll_item) {
466             pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, &nbfds);
467             /* If !oob: streaming driven by pollds (POLLOUT) */
468             if (pollfd && !u->oob && !pollfd->revents) {
469                 for (i = 0; i < nbfds; i++) {
470                     pollfd->events = POLLOUT;
471                     pollfd->revents = 0;
472 
473                     pollfd++;
474                 }
475 
476                 continue;
477             }
478 
479             /* if oob: streaming managed by timing, pollfd for oob sockets */
480             if (pollfd && u->oob && !on_timeout) {
481                 uint8_t packet[32];
482                 ssize_t read;
483 
484                 for (i = 0; i < nbfds; i++) {
485                     if (pollfd->revents & POLLERR) {
486                         if (u->autoreconnect && pa_raop_client_is_alive(u->raop)) {
487                             pollfd->revents = 0;
488                             pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink),
489                                               PA_SINK_MESSAGE_DISCONNECT_REQUEST, 0, 0, NULL, NULL);
490                             continue;
491                         }
492 
493                         /* one of UDP fds is in faulty state, may have been disconnected, this is fatal  */
494                         goto fail;
495                     }
496                     if (pollfd->revents & pollfd->events) {
497                         pollfd->revents = 0;
498                         read = pa_read(pollfd->fd, packet, sizeof(packet), NULL);
499                         pa_raop_client_handle_oob_packet(u->raop, pollfd->fd, packet, read);
500                         if (pa_raop_client_is_timing_fd(u->raop, pollfd->fd)) {
501                             last_timing = pa_rtclock_now();
502                             check_timing_count = 1;
503                         }
504                     }
505 
506                     pollfd++;
507                 }
508 
509                 continue;
510             }
511         }
512 
513         if (u->sink->thread_info.state != PA_SINK_RUNNING) {
514             continue;
515         }
516 
517         if (u->first) {
518             last_timing = 0;
519             check_timing_count = 1;
520             intvl = 0;
521             u->first = false;
522         }
523 
524         canstream = pa_raop_client_can_stream(u->raop);
525         now = pa_rtclock_now();
526 
527         if (u->oob && u->autoreconnect && on_timeout) {
528             if (!canstream) {
529                 last_timing = 0;
530             } else if (last_timing != 0) {
531                 pa_usec_t since = now - last_timing;
532                 /* Incoming Timing packets should be received every 3 seconds in UDP mode
533                    according to raop specifications.
534                    Here we disconnect if no packet received since UDP_TIMING_PACKET_LOSS_MAX seconds
535                    We only detect timing packet requests interruptions (we do nothing if no packet received at all), since some clients do not implement RTCP Timing requests at all */
536 
537                 if (since > (UDP_TIMING_PACKET_LOSS_MAX/UDP_TIMING_PACKET_DISCONNECT_CYCLE)*check_timing_count) {
538                     if (check_timing_count < UDP_TIMING_PACKET_DISCONNECT_CYCLE) {
539                         uint32_t since_in_sec = since / PA_USEC_PER_SEC;
540                         pa_log_warn(
541                                 "UDP Timing Packets Warn #%d/%d- Nothing received since %d seconds from %s",
542                                 check_timing_count,
543                                 UDP_TIMING_PACKET_DISCONNECT_CYCLE-1, since_in_sec, u->server);
544                         check_timing_count++;
545                     } else {
546                         /* Limit reached, then request disconnect */
547                         check_timing_count = 1;
548                         last_timing = 0;
549                         if (pa_raop_client_is_alive(u->raop)) {
550                             pa_log_warn("UDP Timing Packets Warn limit reached - Requesting reconnect");
551                             pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink),
552                                               PA_SINK_MESSAGE_DISCONNECT_REQUEST, 0, 0, NULL, NULL);
553                             continue;
554                         }
555                     }
556                 }
557             }
558         }
559 
560         if (!u->autonull) {
561             if (!canstream) {
562                 pa_log_debug("Can't stream, connection not established yet...");
563                 continue;
564             }
565             /* This assertion is meant to silence a complaint from Coverity about
566              * pollfd being possibly NULL when we access it later. That's a false
567              * positive, because we check pa_raop_client_can_stream() above, and if
568              * that returns true, it means that the connection is up, and when the
569              * connection is up, pollfd will be non-NULL. */
570             pa_assert(pollfd);
571         }
572 
573         if (u->memchunk.length <= 0) {
574             if (intvl < now + u->block_usec) {
575                 if (u->memchunk.memblock)
576                     pa_memblock_unref(u->memchunk.memblock);
577                 pa_memchunk_reset(&u->memchunk);
578 
579                 /* Grab unencoded audio data from PulseAudio */
580                 pa_sink_render_full(u->sink, u->block_size, &u->memchunk);
581                 offset = u->memchunk.index;
582             }
583         }
584 
585         if (u->memchunk.length > 0) {
586             index = u->memchunk.index;
587             sendstream = !u->autonull || (u->autonull && canstream);
588             if (sendstream && pa_raop_client_send_audio_packet(u->raop, &u->memchunk, offset) < 0) {
589                 if (errno == EINTR) {
590                     /* Just try again. */
591                     pa_log_debug("Failed to write data to FIFO (EINTR), retrying");
592                     if (u->autoreconnect) {
593                         pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_DISCONNECT_REQUEST,
594                                           0, 0, NULL, NULL);
595                         continue;
596                     } else
597                         goto fail;
598                 } else if (errno != EAGAIN && !u->oob) {
599                     /* Buffer is full, wait for POLLOUT. */
600                     if (!u->oob) {
601                         pollfd->events = POLLOUT;
602                         pollfd->revents = 0;
603                     }
604                 } else {
605                     pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
606                     if (u->autoreconnect) {
607                         pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_DISCONNECT_REQUEST,
608                                           0, 0, NULL, NULL);
609                         continue;
610                     } else
611                         goto fail;
612                 }
613             } else {
614                 if (sendstream) {
615                     u->write_count += (uint64_t) u->memchunk.index - (uint64_t) index;
616                 } else {
617                     u->write_count += u->memchunk.length;
618                     u->memchunk.length = 0;
619                 }
620                 position = u->write_count - pa_usec_to_bytes(u->delay, &u->sink->sample_spec);
621 
622                 now = pa_rtclock_now();
623 #ifdef USE_SMOOTHER_2
624                 pa_smoother_2_put(u->smoother, now, position);
625 #else
626                 estimated = pa_bytes_to_usec(position, &u->sink->sample_spec);
627                 pa_smoother_put(u->smoother, now, estimated);
628 #endif
629 
630                 if ((u->autonull && !canstream) || (u->oob && canstream && on_timeout)) {
631                     /* Sleep until next packet transmission */
632                     intvl = u->start + pa_bytes_to_usec(u->write_count, &u->sink->sample_spec);
633                     pa_rtpoll_set_timer_absolute(u->rtpoll, intvl);
634                 } else if (!u->oob) {
635                     if (u->memchunk.length > 0) {
636                         pollfd->events = POLLOUT;
637                         pollfd->revents = 0;
638                     } else {
639                         intvl = u->start + pa_bytes_to_usec(u->write_count, &u->sink->sample_spec);
640                         pa_rtpoll_set_timer_absolute(u->rtpoll, intvl);
641                         pollfd->revents = 0;
642                         pollfd->events = 0;
643                     }
644                 }
645             }
646         }
647     }
648 
649 fail:
650     /* If this was no regular exit from the loop we have to continue
651      * processing messages until we received PA_MESSAGE_SHUTDOWN */
652     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
653     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
654 
655 finish:
656     pa_log_debug("Thread shutting down");
657 }
658 
/* Port switch callback of the sink: accepts any request without doing
 * anything and reports success (0). */
static int sink_set_port_cb(pa_sink *s, pa_device_port *p) {
    return 0;
}
662 
raop_create_port(struct userdata * u,const char * server)663 static pa_device_port *raop_create_port(struct userdata *u, const char *server) {
664     pa_device_port_new_data data;
665     pa_device_port *port;
666 
667     pa_device_port_new_data_init(&data);
668 
669     pa_device_port_new_data_set_name(&data, "network-output");
670     pa_device_port_new_data_set_description(&data, server);
671     pa_device_port_new_data_set_direction(&data, PA_DIRECTION_OUTPUT);
672     pa_device_port_new_data_set_type(&data, PA_DEVICE_PORT_TYPE_NETWORK);
673 
674     port = pa_device_port_new(u->core, &data, 0);
675 
676     pa_device_port_new_data_done(&data);
677 
678     if (port == NULL)
679         return NULL;
680 
681     pa_device_port_ref(port);
682 
683     return port;
684 }
685 
raop_create_profile()686 static pa_card_profile *raop_create_profile() {
687     pa_card_profile *profile;
688 
689     profile = pa_card_profile_new("RAOP", _("RAOP standard profile"), 0);
690     profile->priority = 10;
691     profile->n_sinks = 1;
692     profile->n_sources = 0;
693     profile->max_sink_channels = 2;
694     profile->max_source_channels = 0;
695 
696     return profile;
697 }
698 
raop_create_card(pa_module * m,pa_device_port * port,pa_card_profile * profile,const char * server,const char * nicename)699 static pa_card *raop_create_card(pa_module *m, pa_device_port *port, pa_card_profile *profile, const char *server, const char *nicename) {
700     pa_card_new_data data;
701     pa_card *card;
702     char *card_name;
703 
704     pa_card_new_data_init(&data);
705 
706     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, server);
707     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, nicename);
708     data.driver = __FILE__;
709 
710     card_name = pa_sprintf_malloc("raop_client.%s", server);
711     pa_card_new_data_set_name(&data, card_name);
712     pa_xfree(card_name);
713 
714     pa_hashmap_put(data.ports, port->name, port);
715     pa_hashmap_put(data.profiles, profile->name, profile);
716 
717     card = pa_card_new(m->core, &data);
718 
719     pa_card_new_data_done(&data);
720 
721     if (card == NULL)
722         return NULL;
723 
724     pa_card_choose_initial_profile(card);
725 
726     pa_card_put(card);
727 
728     return card;
729 }
730 
pa_raop_sink_new(pa_module * m,pa_modargs * ma,const char * driver)731 pa_sink* pa_raop_sink_new(pa_module *m, pa_modargs *ma, const char *driver) {
732     struct userdata *u = NULL;
733     pa_sample_spec ss;
734     pa_channel_map map;
735     char *thread_name = NULL;
736     const char *server, *protocol, *encryption, *codec;
737     const char /* *username, */ *password;
738     pa_sink_new_data data;
739     const char *name = NULL;
740     const char *description = NULL;
741     pa_device_port *port;
742     pa_card_profile *profile;
743 
744     pa_assert(m);
745     pa_assert(ma);
746 
747     ss = m->core->default_sample_spec;
748     map = m->core->default_channel_map;
749 
750     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
751         pa_log("Invalid sample format specification or channel map");
752         goto fail;
753     }
754 
755     if (!(server = pa_modargs_get_value(ma, "server", NULL))) {
756         pa_log("Failed to parse server argument");
757         goto fail;
758     }
759 
760     if (!(protocol = pa_modargs_get_value(ma, "protocol", NULL))) {
761         pa_log("Failed to parse protocol argument");
762         goto fail;
763     }
764 
765     u = pa_xnew0(struct userdata, 1);
766     u->core = m->core;
767     u->module = m;
768     u->thread = NULL;
769     u->rtpoll = pa_rtpoll_new();
770     u->rtpoll_item = NULL;
771     u->latency = RAOP_DEFAULT_LATENCY;
772     u->autoreconnect = false;
773     u->server = pa_xstrdup(server);
774 
775     if (pa_modargs_get_value_boolean(ma, "autoreconnect", &u->autoreconnect) < 0) {
776         pa_log("Failed to parse autoreconnect argument");
777         goto fail;
778     }
779     /* Linked for now, potentially ready for additional parameter */
780     u->autonull = u->autoreconnect;
781 
782     if (pa_modargs_get_value_u32(ma, "latency_msec", &u->latency) < 0) {
783         pa_log("Failed to parse latency_msec argument");
784         goto fail;
785     }
786 
787     if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
788         pa_log("pa_thread_mq_init() failed.");
789         goto fail;
790     }
791 
792     u->oob = true;
793 
794     u->block_size = 0;
795     pa_memchunk_reset(&u->memchunk);
796 
797     u->delay = 0;
798 #ifdef USE_SMOOTHER_2
799     u->smoother = pa_smoother_2_new(5*PA_USEC_PER_SEC, pa_rtclock_now(), pa_frame_size(&ss), ss.rate);
800 #else
801     u->smoother = pa_smoother_new(
802             PA_USEC_PER_SEC,
803             PA_USEC_PER_SEC*2,
804             true,
805             true,
806             10,
807             0,
808             false);
809 #endif
810     u->write_count = 0;
811 
812     if (pa_streq(protocol, "TCP")) {
813         u->protocol = PA_RAOP_PROTOCOL_TCP;
814     } else if (pa_streq(protocol, "UDP")) {
815         u->protocol = PA_RAOP_PROTOCOL_UDP;
816     } else {
817         pa_log("Unsupported transport protocol argument: %s", protocol);
818         goto fail;
819     }
820 
821     encryption = pa_modargs_get_value(ma, "encryption", NULL);
822     codec = pa_modargs_get_value(ma, "codec", NULL);
823 
824     if (!encryption) {
825         u->encryption = PA_RAOP_ENCRYPTION_NONE;
826     } else if (pa_streq(encryption, "none")) {
827         u->encryption = PA_RAOP_ENCRYPTION_NONE;
828     } else if (pa_streq(encryption, "RSA")) {
829         u->encryption = PA_RAOP_ENCRYPTION_RSA;
830     } else {
831         pa_log("Unsupported encryption type argument: %s", encryption);
832         goto fail;
833     }
834 
835     if (!codec) {
836         u->codec = PA_RAOP_CODEC_PCM;
837     } else if (pa_streq(codec, "PCM")) {
838         u->codec = PA_RAOP_CODEC_PCM;
839     } else if (pa_streq(codec, "ALAC")) {
840         u->codec = PA_RAOP_CODEC_ALAC;
841     } else {
842         pa_log("Unsupported audio codec argument: %s", codec);
843         goto fail;
844     }
845 
846     pa_sink_new_data_init(&data);
847     data.driver = driver;
848     data.module = m;
849 
850     if ((name = pa_modargs_get_value(ma, "sink_name", NULL))) {
851         pa_sink_new_data_set_name(&data, name);
852     } else {
853         char *nick;
854 
855         if ((name = pa_modargs_get_value(ma, "name", NULL)))
856             nick = pa_sprintf_malloc("raop_client.%s", name);
857         else
858             nick = pa_sprintf_malloc("raop_client.%s", server);
859         pa_sink_new_data_set_name(&data, nick);
860         pa_xfree(nick);
861     }
862 
863     pa_sink_new_data_set_sample_spec(&data, &ss);
864     pa_sink_new_data_set_channel_map(&data, &map);
865 
866     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, server);
867     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "RAOP sink '%s'", server);
868 
869     if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
870         pa_log("Invalid properties");
871         pa_sink_new_data_done(&data);
872         goto fail;
873     }
874 
875     port = raop_create_port(u, server);
876     if (port == NULL) {
877         pa_log("Failed to create port object");
878         goto fail;
879     }
880 
881     profile = raop_create_profile();
882     pa_hashmap_put(port->profiles, profile->name, profile);
883 
884     description = pa_proplist_gets(data.proplist, PA_PROP_DEVICE_DESCRIPTION);
885     if (description == NULL)
886         description = server;
887 
888     u->card = raop_create_card(m, port, profile, server, description);
889     if (u->card == NULL) {
890         pa_log("Failed to create card object");
891         goto fail;
892     }
893 
894     data.card = u->card;
895     pa_hashmap_put(data.ports, port->name, port);
896 
897     u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY | PA_SINK_NETWORK);
898     pa_sink_new_data_done(&data);
899 
900     if (!(u->sink)) {
901         pa_log("Failed to create sink object");
902         goto fail;
903     }
904 
905     u->sink->parent.process_msg = sink_process_msg;
906     u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
907     pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
908     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
909     u->sink->userdata = u;
910     u->sink->set_port = sink_set_port_cb;
911 
912     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
913     pa_sink_set_rtpoll(u->sink, u->rtpoll);
914 
915     u->raop = pa_raop_client_new(u->core, server, u->protocol, u->encryption, u->codec, u->autoreconnect);
916 
917     if (!(u->raop)) {
918         pa_log("Failed to create RAOP client object");
919         goto fail;
920     }
921 
922     /* The number of frames per blocks is not negotiable... */
923     pa_raop_client_get_frames_per_block(u->raop, &u->block_size);
924     u->block_size *= pa_frame_size(&ss);
925     pa_sink_set_max_request(u->sink, u->block_size);
926     u->block_usec = pa_bytes_to_usec(u->block_size, &u->sink->sample_spec);
927 
928     pa_raop_client_set_state_callback(u->raop, raop_state_cb, u);
929 
930     thread_name = pa_sprintf_malloc("raop-sink-%s", server);
931     if (!(u->thread = pa_thread_new(thread_name, thread_func, u))) {
932         pa_log("Failed to create sink thread");
933         goto fail;
934     }
935     pa_xfree(thread_name);
936     thread_name = NULL;
937 
938     pa_sink_put(u->sink);
939 
940     /* username = pa_modargs_get_value(ma, "username", NULL); */
941     password = pa_modargs_get_value(ma, "password", NULL);
942     pa_raop_client_authenticate(u->raop, password );
943 
944     return u->sink;
945 
946 fail:
947     pa_xfree(thread_name);
948 
949     if (u)
950         userdata_free(u);
951 
952     return NULL;
953 }
954 
userdata_free(struct userdata * u)955 static void userdata_free(struct userdata *u) {
956     pa_assert(u);
957 
958     if (u->sink)
959         pa_sink_unlink(u->sink);
960 
961     if (u->thread) {
962         pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
963         pa_thread_free(u->thread);
964     }
965 
966     pa_thread_mq_done(&u->thread_mq);
967 
968     if (u->sink)
969         pa_sink_unref(u->sink);
970     u->sink = NULL;
971 
972     if (u->rtpoll_item)
973         pa_rtpoll_item_free(u->rtpoll_item);
974     if (u->rtpoll)
975         pa_rtpoll_free(u->rtpoll);
976     u->rtpoll_item = NULL;
977     u->rtpoll = NULL;
978 
979     if (u->memchunk.memblock)
980         pa_memblock_unref(u->memchunk.memblock);
981 
982     if (u->raop)
983         pa_raop_client_free(u->raop);
984     u->raop = NULL;
985 
986     if (u->smoother)
987 #ifdef USE_SMOOTHER_2
988         pa_smoother_2_free(u->smoother);
989 #else
990         pa_smoother_free(u->smoother);
991 #endif
992     u->smoother = NULL;
993 
994     if (u->card)
995         pa_card_free(u->card);
996     if (u->server)
997         pa_xfree(u->server);
998 
999     pa_xfree(u);
1000 }
1001 
/* Public destructor for a RAOP sink: looks up the userdata attached to the
 * sink and releases it together with everything it owns.
 *
 * s must be a valid, referenced sink whose userdata pointer is set; both
 * conditions are asserted. */
void pa_raop_sink_free(pa_sink *s) {
    struct userdata *u = NULL;

    pa_sink_assert_ref(s);

    u = s->userdata;
    pa_assert_se(u);

    userdata_free(u);
}
1010