• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 
2 /***
3   This file is part of PulseAudio.
4 
5   Copyright 2006 Lennart Poettering
6 
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11 
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16 
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20 
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24 
25 #include <stdio.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
28 #include <errno.h>
29 #include <string.h>
30 #include <unistd.h>
31 #include <math.h>
32 
33 #include <pulse/rtclock.h>
34 #include <pulse/timeval.h>
35 #include <pulse/xmalloc.h>
36 
37 #include <pulsecore/core-error.h>
38 #include <pulsecore/module.h>
39 #include <pulsecore/llist.h>
40 #include <pulsecore/sink.h>
41 #include <pulsecore/sink-input.h>
42 #include <pulsecore/memblockq.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-rtclock.h>
45 #include <pulsecore/core-util.h>
46 #include <pulsecore/modargs.h>
47 #include <pulsecore/namereg.h>
48 #include <pulsecore/sample-util.h>
49 #include <pulsecore/macro.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/atomic.h>
52 #include <pulsecore/once.h>
53 #include <pulsecore/poll.h>
54 #include <pulsecore/arpa-inet.h>
55 
56 #include "rtp.h"
57 #include "sdp.h"
58 #include "sap.h"
59 
/* Module metadata: author, description, version and the usage string listing
 * the arguments accepted by this module (see valid_modargs below).
 * NOTE(review): PA_MODULE_LOAD_ONCE(false) presumably permits several
 * instances of the module to be loaded at once -- confirm against module.h. */
PA_MODULE_AUTHOR("Lennart Poettering");
PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
PA_MODULE_VERSION(PACKAGE_VERSION);
PA_MODULE_LOAD_ONCE(false);
PA_MODULE_USAGE(
        "sink=<name of the sink> "
        "sap_address=<multicast address to listen on> "
        "latency_msec=<latency in ms> "
);
69 
/* UDP port the SAP listening socket is bound to in pa__init(). */
#define SAP_PORT 9875
/* Address used when the "sap_address" module argument is not given. */
#define DEFAULT_SAP_ADDRESS "224.0.0.56"
/* Latency in milliseconds used when "latency_msec" is not given. */
#define DEFAULT_LATENCY_MSEC 500
/* Passed twice to pa_memblockq_new() in session_new() as size limits of the
 * per-session queue (presumably maxlength and tlength in bytes -- confirm
 * against the memblockq API). */
#define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
/* Maximum number of simultaneously existing sessions, enforced in session_new(). */
#define MAX_SESSIONS 16
/* Seconds without activity after which check_death_event_cb() removes a
 * session; also the period with which that check is rescheduled. */
#define DEATH_TIMEOUT 20
/* Minimum time in microseconds between two sample rate adjustments in
 * rtpoll_work_cb(). */
#define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
77 
/* NULL-terminated list of module argument names handed to pa_modargs_new()
 * in pa__init(). */
static const char* const valid_modargs[] = {
    "sink",
    "sap_address",
    "latency_msec",
    NULL
};
84 
/* State of one received RTP stream. Sessions are created by session_new()
 * when a SAP announcement with an unknown origin arrives and destroyed by
 * session_free(). */
struct session {
    struct userdata *userdata;          /* module state this session belongs to */
    PA_LLIST_FIELDS(struct session);    /* linkage for userdata->sessions */

    pa_sink_input *sink_input;          /* sink input that plays the stream */
    pa_memblockq *memblockq;            /* filled by rtpoll_work_cb(), drained by sink_input_pop_cb() */

    bool first_packet;                  /* true once 'offset' has been initialised from a packet;
                                         * cleared again when the sink input is resumed */
    uint32_t offset;                    /* RTP timestamp expected for the next packet */

    struct pa_sdp_info sdp_info;        /* shallow copy of the announcement; destroyed in session_free() */

    pa_rtp_context *rtp_context;        /* receive context wrapping the stream's socket */

    pa_rtpoll_item *rtpoll_item;        /* set in sink_input_attach(), NULL while detached */

    pa_atomic_t timestamp;              /* tv_sec of the last packet or announcement; written from the
                                         * I/O callback and from main-context callbacks, read by
                                         * check_death_event_cb() */

    pa_usec_t intended_latency;         /* latency the rate controller steers towards */
    pa_usec_t sink_latency;             /* value returned by pa_sink_input_set_requested_latency() */

    unsigned int base_rate;             /* sample rate of the sink input at creation time */
    pa_usec_t last_rate_update;         /* time of the last rate adjustment */
    pa_usec_t last_latency;             /* latency measured at the last rate adjustment */
    double estimated_rate;              /* most recent raw estimate of the sender's rate */
    double avg_estimated_rate;          /* exponentially weighted average of the estimates */
};
112 
/* Per-module-instance state, allocated in pa__init() and released in
 * pa__done(). */
struct userdata {
    pa_module *module;
    pa_core *core;

    pa_sap_context sap_context;         /* receive context for SAP announcements */
    pa_io_event* sap_event;             /* main loop I/O event on the SAP socket -> sap_event_cb() */

    pa_time_event *check_death_event;   /* periodic timer -> check_death_event_cb() */

    char *sink_name;                    /* copy of the "sink" module argument, may be NULL */

    PA_LLIST_HEAD(struct session, sessions);    /* all live sessions */
    pa_hashmap *by_origin;              /* sdp_info.origin -> session; values are freed with session_free() */
    int n_sessions;                     /* number of entries in 'sessions', limited to MAX_SESSIONS */

    pa_usec_t latency;                  /* "latency_msec" converted to microseconds */
};
130 
/* Forward declaration; the definition follows further down in this file. */
static void session_free(struct session *s);
132 
133 /* Called from I/O thread context */
sink_input_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)134 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
135     struct session *s = PA_SINK_INPUT(o)->userdata;
136 
137     switch (code) {
138         case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
139             *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
140 
141             /* Fall through, the default handler will add in the extra
142              * latency added by the resampler */
143             break;
144     }
145 
146     return pa_sink_input_process_msg(o, code, data, offset, chunk);
147 }
148 
149 /* Called from I/O thread context */
sink_input_pop_cb(pa_sink_input * i,size_t length,pa_memchunk * chunk)150 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
151     struct session *s;
152     pa_sink_input_assert_ref(i);
153     pa_assert_se(s = i->userdata);
154 
155     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
156         return -1;
157 
158     pa_memblockq_drop(s->memblockq, chunk->length);
159 
160     return 0;
161 }
162 
163 /* Called from I/O thread context */
sink_input_process_rewind_cb(pa_sink_input * i,size_t nbytes)164 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
165     struct session *s;
166 
167     pa_sink_input_assert_ref(i);
168     pa_assert_se(s = i->userdata);
169 
170     pa_memblockq_rewind(s->memblockq, nbytes);
171 }
172 
173 /* Called from I/O thread context */
sink_input_update_max_rewind_cb(pa_sink_input * i,size_t nbytes)174 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
175     struct session *s;
176 
177     pa_sink_input_assert_ref(i);
178     pa_assert_se(s = i->userdata);
179 
180     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
181 }
182 
183 /* Called from main context */
sink_input_kill(pa_sink_input * i)184 static void sink_input_kill(pa_sink_input* i) {
185     struct session *s;
186     pa_sink_input_assert_ref(i);
187     pa_assert_se(s = i->userdata);
188 
189     pa_hashmap_remove_and_free(s->userdata->by_origin, s->sdp_info.origin);
190 }
191 
192 /* Called from IO context */
sink_input_suspend_within_thread(pa_sink_input * i,bool b)193 static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) {
194     struct session *s;
195     pa_sink_input_assert_ref(i);
196     pa_assert_se(s = i->userdata);
197 
198     if (b)
199         pa_memblockq_flush_read(s->memblockq);
200     else
201         s->first_packet = false;
202 }
203 
204 /* Called from I/O thread context */
rtpoll_work_cb(pa_rtpoll_item * i)205 static int rtpoll_work_cb(pa_rtpoll_item *i) {
206     pa_memchunk chunk;
207     uint32_t timestamp;
208     int64_t k, j, delta;
209     struct timeval now = { 0, 0 };
210     struct session *s;
211     struct pollfd *p;
212 
213     pa_assert_se(s = pa_rtpoll_item_get_work_userdata(i));
214 
215     p = pa_rtpoll_item_get_pollfd(i, NULL);
216 
217     if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
218         pa_log("poll() signalled bad revents.");
219         return -1;
220     }
221 
222     if ((p->revents & POLLIN) == 0)
223         return 0;
224 
225     p->revents = 0;
226 
227     if (pa_rtp_recv(s->rtp_context, &chunk, s->userdata->module->core->mempool, &timestamp, &now) < 0)
228         return 0;
229 
230     if (!PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
231         pa_memblock_unref(chunk.memblock);
232         return 0;
233     }
234 
235     if (!s->first_packet) {
236         s->first_packet = true;
237         s->offset = timestamp;
238     }
239 
240     /* Check whether there was a timestamp overflow */
241     k = (int64_t) timestamp - (int64_t) s->offset;
242     j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) timestamp;
243 
244     if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
245         delta = k;
246     else
247         delta = j;
248 
249     pa_memblockq_seek(s->memblockq, delta * (int64_t) pa_rtp_context_get_frame_size(s->rtp_context), PA_SEEK_RELATIVE,
250             true);
251 
252     if (now.tv_sec == 0) {
253         PA_ONCE_BEGIN {
254             pa_log_warn("Using artificial time instead of timestamp");
255         } PA_ONCE_END;
256         pa_rtclock_get(&now);
257     } else
258         pa_rtclock_from_wallclock(&now);
259 
260     if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
261         pa_log_warn("Queue overrun");
262         pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true);
263     }
264 
265 /*     pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
266 
267     pa_memblock_unref(chunk.memblock);
268 
269     /* The next timestamp we expect */
270     s->offset = timestamp + (uint32_t) (chunk.length / pa_rtp_context_get_frame_size(s->rtp_context));
271 
272     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
273 
274     if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
275         pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
276         uint32_t current_rate = s->sink_input->sample_spec.rate;
277         uint32_t new_rate;
278         double estimated_rate, alpha = 0.02;
279 
280         pa_log_debug("Updating sample rate");
281 
282         wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec);
283         ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
284 
285         pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
286 
287         sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink, false);
288         render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
289 
290         if (ri > render_delay+sink_delay)
291             ri -= render_delay+sink_delay;
292         else
293             ri = 0;
294 
295         if (wi < ri)
296             latency = 0;
297         else
298             latency = wi - ri;
299 
300         pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
301 
302         /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
303          * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
304          * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
305          *                                           T
306          *                                 R̂ = ─────────────── Rⁿ .                             (1)
307          *                                     T - (Lⁿ - Lⁿ⁻ⁱ)
308          *
309          * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
310          * is correct).  But there is also the requirement to keep the buffer at a predefined target
311          * latency L̂.  So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
312          * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
313          * aT the latency is reduced from Lⁿ to L̂.  This strategy translates to the requirements
314          *            ₐ      R̂ - Rⁿ⁺ʲ                            a-j+1         j-1
315          *            Σ  T ────────── = L̂ - Lⁿ    with    Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
316          *           ʲ⁼ⁱ        R̂                                  a            a
317          * Solving for Rⁿ⁺ⁱ gives
318          *                                     T - ²∕ₐ₊₁(L̂ - Lⁿ)
319          *                              Rⁿ⁺ⁱ = ───────────────── R̂ .                            (2)
320          *                                            T
321          * In the code below a = 7 is used.
322          *
323          * Equation (1) is not directly used in (2), but instead an exponentially weighted average
324          * of the estimated rate R̂ is used.  This average R̅ is defined as
325          *                                R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
326          * Because it is difficult to find a fixed value for the coefficient α such that the
327          * averaging is without significant lag but oscillations are filtered out, a heuristic is
328          * used.  When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
329          * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
330          */
331         estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
332         if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
333           double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
334           alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
335         }
336         s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
337         s->estimated_rate = estimated_rate;
338         pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz  (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
339         new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
340         s->last_latency = latency;
341 
342         if (new_rate < (uint32_t) (s->base_rate*0.8) || new_rate > (uint32_t) (s->base_rate*1.25)) {
343             pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", s->base_rate, new_rate);
344             new_rate = s->base_rate;
345         } else {
346             if (s->base_rate < new_rate + 20 && new_rate < s->base_rate + 20)
347                 new_rate = s->base_rate;
348             /* Do the adjustment in small steps; 2‰ can be considered inaudible */
349             if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
350                 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
351                 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
352             }
353         }
354         s->sink_input->sample_spec.rate = new_rate;
355 
356         pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
357 
358         pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
359 
360         pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
361 
362         s->last_rate_update = pa_timeval_load(&now);
363     }
364 
365     if (pa_memblockq_is_readable(s->memblockq) &&
366         s->sink_input->thread_info.underrun_for > 0) {
367         pa_log_debug("Requesting rewind due to end of underrun");
368         pa_sink_input_request_rewind(s->sink_input,
369                                      (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
370                                      false, true, false);
371     }
372 
373     return 1;
374 }
375 
376 /* Called from I/O thread context */
sink_input_attach(pa_sink_input * i)377 static void sink_input_attach(pa_sink_input *i) {
378     struct session *s;
379 
380     pa_sink_input_assert_ref(i);
381     pa_assert_se(s = i->userdata);
382 
383     pa_assert(!s->rtpoll_item);
384     s->rtpoll_item = pa_rtp_context_get_rtpoll_item(s->rtp_context, i->sink->thread_info.rtpoll);
385 
386     pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb, s);
387 }
388 
389 /* Called from I/O thread context */
sink_input_detach(pa_sink_input * i)390 static void sink_input_detach(pa_sink_input *i) {
391     struct session *s;
392     pa_sink_input_assert_ref(i);
393     pa_assert_se(s = i->userdata);
394 
395     pa_assert(s->rtpoll_item);
396     pa_rtpoll_item_free(s->rtpoll_item);
397     s->rtpoll_item = NULL;
398 }
399 
mcast_socket(const struct sockaddr * sa,socklen_t salen)400 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
401     int af, fd = -1, r, one;
402 
403     pa_assert(sa);
404     pa_assert(salen > 0);
405 
406     af = sa->sa_family;
407     if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
408         pa_log("Failed to create socket: %s", pa_cstrerror(errno));
409         goto fail;
410     }
411 
412     pa_make_udp_socket_low_delay(fd);
413 
414 #ifdef SO_TIMESTAMP
415     one = 1;
416     if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
417         pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
418         goto fail;
419     }
420 #else
421     pa_log("SO_TIMESTAMP unsupported on this platform");
422     goto fail;
423 #endif
424 
425     one = 1;
426     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
427         pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
428         goto fail;
429     }
430 
431     r = 0;
432     if (af == AF_INET) {
433         /* IPv4 multicast addresses are in the 224.0.0.0-239.255.255.255 range */
434         static const uint32_t ipv4_mcast_mask = 0xe0000000;
435 
436         if ((ntohl(((const struct sockaddr_in*) sa)->sin_addr.s_addr) & ipv4_mcast_mask) == ipv4_mcast_mask) {
437             struct ip_mreq mr4;
438             memset(&mr4, 0, sizeof(mr4));
439             mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
440             r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
441         }
442 #ifdef HAVE_IPV6
443     } else if (af == AF_INET6) {
444         /* IPv6 multicast addresses have 255 as the most significant byte */
445         if (((const struct sockaddr_in6*) sa)->sin6_addr.s6_addr[0] == 0xff) {
446             struct ipv6_mreq mr6;
447             memset(&mr6, 0, sizeof(mr6));
448             mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
449             r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
450         }
451 #endif
452     } else
453         pa_assert_not_reached();
454 
455     if (r < 0) {
456         pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
457         goto fail;
458     }
459 
460     if (bind(fd, sa, salen) < 0) {
461         pa_log("bind() failed: %s", pa_cstrerror(errno));
462         goto fail;
463     }
464 
465     return fd;
466 
467 fail:
468     if (fd >= 0)
469         close(fd);
470 
471     return -1;
472 }
473 
session_new(struct userdata * u,const pa_sdp_info * sdp_info)474 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
475     struct session *s = NULL;
476     pa_sink *sink;
477     int fd = -1;
478     pa_memchunk silence;
479     pa_sink_input_new_data data;
480     struct timeval now;
481 
482     pa_assert(u);
483     pa_assert(sdp_info);
484 
485     if (u->n_sessions >= MAX_SESSIONS) {
486         pa_log("Session limit reached.");
487         goto fail;
488     }
489 
490     if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
491         pa_log("Sink does not exist.");
492         goto fail;
493     }
494 
495     pa_rtclock_get(&now);
496 
497     s = pa_xnew0(struct session, 1);
498     s->userdata = u;
499     s->first_packet = false;
500     s->sdp_info = *sdp_info;
501     s->rtpoll_item = NULL;
502     s->intended_latency = u->latency;
503     s->last_rate_update = pa_timeval_load(&now);
504     s->last_latency = u->latency;
505     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
506 
507     if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
508         goto fail;
509 
510     pa_sink_input_new_data_init(&data);
511     pa_sink_input_new_data_set_sink(&data, sink, false, true);
512     data.driver = __FILE__;
513     pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
514     pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
515                      "RTP Stream%s%s%s",
516                      sdp_info->session_name ? " (" : "",
517                      sdp_info->session_name ? sdp_info->session_name : "",
518                      sdp_info->session_name ? ")" : "");
519 
520     if (sdp_info->session_name)
521         pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
522     pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
523     pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
524     data.module = u->module;
525     pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
526     data.flags = PA_SINK_INPUT_VARIABLE_RATE;
527 
528     pa_sink_input_new(&s->sink_input, u->module->core, &data);
529     pa_sink_input_new_data_done(&data);
530 
531     if (!s->sink_input) {
532         pa_log("Failed to create sink input.");
533         goto fail;
534     }
535 
536     s->base_rate = (double) s->sink_input->sample_spec.rate;
537     s->estimated_rate = (double) s->sink_input->sample_spec.rate;
538     s->avg_estimated_rate = (double) s->sink_input->sample_spec.rate;
539 
540     s->sink_input->userdata = s;
541 
542     s->sink_input->parent.process_msg = sink_input_process_msg;
543     s->sink_input->pop = sink_input_pop_cb;
544     s->sink_input->process_rewind = sink_input_process_rewind_cb;
545     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
546     s->sink_input->kill = sink_input_kill;
547     s->sink_input->attach = sink_input_attach;
548     s->sink_input->detach = sink_input_detach;
549     s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
550 
551     pa_sink_input_get_silence(s->sink_input, &silence);
552 
553     s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
554 
555     if (s->intended_latency < s->sink_latency*2)
556         s->intended_latency = s->sink_latency*2;
557 
558     s->memblockq = pa_memblockq_new(
559             "module-rtp-recv memblockq",
560             0,
561             MEMBLOCKQ_MAXLENGTH,
562             MEMBLOCKQ_MAXLENGTH,
563             &s->sink_input->sample_spec,
564             pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
565             0,
566             0,
567             &silence);
568 
569     pa_memblock_unref(silence.memblock);
570 
571     if (!(s->rtp_context = pa_rtp_context_new_recv(fd, sdp_info->payload, &s->sdp_info.sample_spec)))
572         goto fail;
573 
574     pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
575     u->n_sessions++;
576     PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
577 
578     pa_sink_input_put(s->sink_input);
579 
580     pa_log_info("New session '%s'", s->sdp_info.session_name);
581 
582     return s;
583 
584 fail:
585     pa_xfree(s);
586 
587     if (fd >= 0)
588         pa_close(fd);
589 
590     return NULL;
591 }
592 
session_free(struct session * s)593 static void session_free(struct session *s) {
594     pa_assert(s);
595 
596     pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
597 
598     pa_sink_input_unlink(s->sink_input);
599     pa_sink_input_unref(s->sink_input);
600 
601     PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
602     pa_assert(s->userdata->n_sessions >= 1);
603     s->userdata->n_sessions--;
604 
605     pa_memblockq_free(s->memblockq);
606     pa_sdp_info_destroy(&s->sdp_info);
607     pa_rtp_context_free(s->rtp_context);
608 
609     pa_xfree(s);
610 }
611 
/* Main loop I/O callback for the SAP socket. Reads one SAP packet and parses
 * its SDP payload, then
 *   - for a goodbye packet removes (and thereby frees) the session with the
 *     announced origin,
 *   - for an announcement of a known origin refreshes that session's
 *     activity timestamp,
 *   - for an announcement of an unknown origin creates a new session.
 * The parsed SDP info is destroyed here in every case except when
 * session_new() succeeded and took it over. Packets that cannot be received
 * or parsed are ignored. */
static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
    struct userdata *u = userdata;
    bool goodbye = false;
    pa_sdp_info info;
    struct session *s;

    pa_assert(m);
    pa_assert(e);
    pa_assert(u);
    pa_assert(fd == u->sap_context.fd);
    pa_assert(flags == PA_IO_EVENT_INPUT);

    if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
        return;

    if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
        return;

    if (goodbye) {
        pa_hashmap_remove_and_free(u->by_origin, info.origin);
        pa_sdp_info_destroy(&info);
        return;
    }

    s = pa_hashmap_get(u->by_origin, info.origin);

    if (s) {
        /* Known stream: just note that it is still being announced */
        struct timeval now;

        pa_rtclock_get(&now);
        pa_atomic_store(&s->timestamp, (int) now.tv_sec);

        pa_sdp_info_destroy(&info);
        return;
    }

    /* Unknown stream: on success the session owns the contents of info */
    if (!session_new(u, &info))
        pa_sdp_info_destroy(&info);
}
648 
/* Timer callback: removes (and thereby frees) every session whose activity
 * timestamp is more than DEATH_TIMEOUT seconds in the past, then re-arms the
 * timer to fire again DEATH_TIMEOUT seconds from now. */
static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
    struct session *s, *n;
    struct userdata *u = userdata;
    struct timeval now;

    pa_assert(m);
    pa_assert(t);
    pa_assert(u);

    pa_rtclock_get(&now);

    pa_log_debug("Checking for dead streams ...");

    s = u->sessions;
    while (s) {
        int last_seen;

        /* Remember the successor first, the removal below frees s */
        n = s->next;

        last_seen = pa_atomic_load(&s->timestamp);

        if (last_seen + DEATH_TIMEOUT < now.tv_sec)
            pa_hashmap_remove_and_free(u->by_origin, s->sdp_info.origin);

        s = n;
    }

    /* Restart timer */
    pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
}
675 
/* Module entry point. Parses the module arguments ("sink", "sap_address",
 * "latency_msec"), opens the socket on which SAP announcements are received,
 * allocates the module state and installs the SAP I/O event and the periodic
 * dead-stream check.
 *
 * Returns 0 on success and -1 on failure (invalid arguments, unparsable SAP
 * address, latency outside 1..300000 ms, or socket setup failure). Nothing
 * is left allocated on failure. */
int pa__init(pa_module*m) {
    struct userdata *u;
    pa_modargs *ma = NULL;
    struct sockaddr_in sa4;
#ifdef HAVE_IPV6
    struct sockaddr_in6 sa6;
#endif
    struct sockaddr *sa;
    socklen_t salen;
    const char *sap_address;
    uint32_t latency_msec;
    int fd = -1;

    pa_assert(m);

    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
        pa_log("failed to parse module arguments");
        goto fail;
    }

    sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);

    /* Only family, port and address are filled in below. Zero the
     * structures first so that the remaining members (padding, IPv6 flow
     * info and scope id) are not passed to bind() as stack garbage. */
    memset(&sa4, 0, sizeof(sa4));
#ifdef HAVE_IPV6
    memset(&sa6, 0, sizeof(sa6));
#endif

    if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
        sa4.sin_family = AF_INET;
        sa4.sin_port = htons(SAP_PORT);
        sa = (struct sockaddr*) &sa4;
        salen = sizeof(sa4);
#ifdef HAVE_IPV6
    } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
        sa6.sin6_family = AF_INET6;
        sa6.sin6_port = htons(SAP_PORT);
        sa = (struct sockaddr*) &sa6;
        salen = sizeof(sa6);
#endif
    } else {
        pa_log("Invalid SAP address '%s'", sap_address);
        goto fail;
    }

    latency_msec = DEFAULT_LATENCY_MSEC;
    if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 300000) {
        pa_log("Invalid latency specification");
        goto fail;
    }

    if ((fd = mcast_socket(sa, salen)) < 0)
        goto fail;

    /* pa_xnew() is used here, so every member of *u is assigned explicitly
     * below */
    m->userdata = u = pa_xnew(struct userdata, 1);
    u->module = m;
    u->core = m->core;
    u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
    u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;

    u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
    pa_sap_context_init_recv(&u->sap_context, fd);

    PA_LLIST_HEAD_INIT(struct session, u->sessions);
    u->n_sessions = 0;
    /* Removing an entry from this map destroys the session */
    u->by_origin = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) session_free);

    u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);

    pa_modargs_free(ma);

    return 0;

fail:
    if (ma)
        pa_modargs_free(ma);

    if (fd >= 0)
        pa_close(fd);

    return -1;
}
752 
/* Module teardown. Does nothing if pa__init() never stored a userdata
 * pointer. Otherwise frees the SAP I/O event and the death-check timer,
 * destroys the SAP context, frees the by_origin hashmap (which destroys all
 * remaining sessions through its free callback) and finally releases the
 * module state itself. */
void pa__done(pa_module*m) {
    struct userdata *u;
    pa_mainloop_api *api;

    pa_assert(m);

    u = m->userdata;
    if (!u)
        return;

    api = m->core->mainloop;

    if (u->sap_event)
        api->io_free(u->sap_event);

    if (u->check_death_event)
        api->time_free(u->check_death_event);

    pa_sap_context_destroy(&u->sap_context);

    if (u->by_origin)
        pa_hashmap_free(u->by_origin);

    pa_xfree(u->sink_name);
    pa_xfree(u);
}
775