• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 
2 /***
3   This file is part of PulseAudio.
4 
5   Copyright 2006 Lennart Poettering
6 
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11 
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16 
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20 
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24 
25 #include <stdio.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
28 #include <errno.h>
29 #include <string.h>
30 #include <unistd.h>
31 #include <math.h>
32 
33 #include <pulse/rtclock.h>
34 #include <pulse/timeval.h>
35 #include <pulse/xmalloc.h>
36 
37 #include <pulsecore/core-error.h>
38 #include <pulsecore/module.h>
39 #include <pulsecore/llist.h>
40 #include <pulsecore/sink.h>
41 #include <pulsecore/sink-input.h>
42 #include <pulsecore/memblockq.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-rtclock.h>
45 #include <pulsecore/core-util.h>
46 #include <pulsecore/modargs.h>
47 #include <pulsecore/namereg.h>
48 #include <pulsecore/sample-util.h>
49 #include <pulsecore/macro.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/atomic.h>
52 #include <pulsecore/once.h>
53 #include <pulsecore/poll.h>
54 #include <pulsecore/arpa-inet.h>
55 
56 #include "rtp.h"
57 #include "sdp.h"
58 #include "sap.h"
59 
60 PA_MODULE_AUTHOR("Lennart Poettering");
61 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
62 PA_MODULE_VERSION(PACKAGE_VERSION);
63 PA_MODULE_LOAD_ONCE(false);
64 PA_MODULE_USAGE(
65         "sink=<name of the sink> "
66         "sap_address=<multicast address to listen on> "
67         "latency_msec=<latency in ms> "
68 );
69 
70 #define SAP_PORT 9875
71 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
72 #define DEFAULT_LATENCY_MSEC 500
73 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
74 #define MAX_SESSIONS 16
75 #define DEATH_TIMEOUT 20
76 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
77 
78 static const char* const valid_modargs[] = {
79     "sink",
80     "sap_address",
81     "latency_msec",
82     NULL
83 };
84 
85 struct session {
86     struct userdata *userdata;
87     PA_LLIST_FIELDS(struct session);
88 
89     pa_sink_input *sink_input;
90     pa_memblockq *memblockq;
91 
92     bool first_packet;
93     uint32_t offset;
94 
95     struct pa_sdp_info sdp_info;
96 
97     pa_rtp_context *rtp_context;
98 
99     pa_rtpoll_item *rtpoll_item;
100 
101     pa_atomic_t timestamp;
102 
103     pa_usec_t intended_latency;
104     pa_usec_t sink_latency;
105 
106     unsigned int base_rate;
107     pa_usec_t last_rate_update;
108     pa_usec_t last_latency;
109     double estimated_rate;
110     double avg_estimated_rate;
111 };
112 
113 struct userdata {
114     pa_module *module;
115     pa_core *core;
116 
117     pa_sap_context sap_context;
118     pa_io_event* sap_event;
119 
120     pa_time_event *check_death_event;
121 
122     char *sink_name;
123 
124     PA_LLIST_HEAD(struct session, sessions);
125     pa_hashmap *by_origin;
126     int n_sessions;
127 
128     pa_usec_t latency;
129 };
130 
131 static void session_free(struct session *s);
132 
133 /* Called from I/O thread context */
sink_input_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)134 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
135     struct session *s = PA_SINK_INPUT(o)->userdata;
136 
137     switch (code) {
138         case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
139             *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
140 
141             /* Fall through, the default handler will add in the extra
142              * latency added by the resampler */
143             break;
144     }
145 
146     return pa_sink_input_process_msg(o, code, data, offset, chunk);
147 }
148 
149 /* Called from I/O thread context */
sink_input_pop_cb(pa_sink_input * i,size_t length,pa_memchunk * chunk)150 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
151     struct session *s;
152     pa_sink_input_assert_ref(i);
153     pa_assert_se(s = i->userdata);
154 
155     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
156         return -1;
157 
158     pa_memblockq_drop(s->memblockq, chunk->length);
159 
160     return 0;
161 }
162 
163 /* Called from I/O thread context */
sink_input_process_rewind_cb(pa_sink_input * i,size_t nbytes)164 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
165     struct session *s;
166 
167     pa_sink_input_assert_ref(i);
168     pa_assert_se(s = i->userdata);
169 
170     pa_memblockq_rewind(s->memblockq, nbytes);
171 }
172 
173 /* Called from I/O thread context */
sink_input_update_max_rewind_cb(pa_sink_input * i,size_t nbytes)174 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
175     struct session *s;
176 
177     pa_sink_input_assert_ref(i);
178     pa_assert_se(s = i->userdata);
179 
180     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
181 }
182 
183 /* Called from main context */
sink_input_kill(pa_sink_input * i)184 static void sink_input_kill(pa_sink_input* i) {
185     struct session *s;
186     pa_sink_input_assert_ref(i);
187     pa_assert_se(s = i->userdata);
188 
189     pa_hashmap_remove_and_free(s->userdata->by_origin, s->sdp_info.origin);
190 }
191 
192 /* Called from IO context */
sink_input_suspend_within_thread(pa_sink_input * i,bool b)193 static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) {
194     struct session *s;
195     pa_sink_input_assert_ref(i);
196     pa_assert_se(s = i->userdata);
197 
198     if (b)
199         pa_memblockq_flush_read(s->memblockq);
200     else
201         s->first_packet = false;
202 }
203 
204 /* Called from I/O thread context */
rtpoll_work_cb(pa_rtpoll_item * i)205 static int rtpoll_work_cb(pa_rtpoll_item *i) {
206     pa_memchunk chunk;
207     uint32_t timestamp;
208     int64_t k, j, delta;
209     struct timeval now = { 0, 0 };
210     struct session *s;
211     struct pollfd *p;
212 
213     pa_assert_se(s = pa_rtpoll_item_get_work_userdata(i));
214 
215     p = pa_rtpoll_item_get_pollfd(i, NULL);
216 
217     if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
218         pa_log("poll() signalled bad revents.");
219         return -1;
220     }
221 
222     if ((p->revents & POLLIN) == 0)
223         return 0;
224 
225     p->revents = 0;
226 
227     if (pa_rtp_recv(s->rtp_context, &chunk, s->userdata->module->core->mempool, &timestamp, &now) < 0)
228         return 0;
229 
230     if (!PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
231         pa_memblock_unref(chunk.memblock);
232         return 0;
233     }
234 
235     if (!s->first_packet) {
236         s->first_packet = true;
237         s->offset = timestamp;
238     }
239 
240     /* Check whether there was a timestamp overflow */
241     k = (int64_t) timestamp - (int64_t) s->offset;
242     j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) timestamp;
243 
244     if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
245         delta = k;
246     else
247         delta = j;
248 
249     pa_memblockq_seek(s->memblockq, delta * (int64_t) pa_rtp_context_get_frame_size(s->rtp_context), PA_SEEK_RELATIVE,
250             true);
251 
252     if (now.tv_sec == 0) {
253         PA_ONCE_BEGIN {
254             pa_log_warn("Using artificial time instead of timestamp");
255         } PA_ONCE_END;
256         pa_rtclock_get(&now);
257     } else
258         pa_rtclock_from_wallclock(&now);
259 
260     if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
261         pa_log_warn("Queue overrun");
262         pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true);
263     }
264 
265 /*     pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
266 
267     pa_memblock_unref(chunk.memblock);
268 
269     /* The next timestamp we expect */
270     s->offset = timestamp + (uint32_t) (chunk.length / pa_rtp_context_get_frame_size(s->rtp_context));
271 
272     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
273 
274     if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
275         pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
276         uint32_t current_rate = s->sink_input->sample_spec.rate;
277         uint32_t new_rate;
278         double estimated_rate, alpha = 0.02;
279 
280         pa_log_debug("Updating sample rate");
281 
282         wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec);
283         ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
284 
285         pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
286 
287         sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink, false);
288         sink_delay += pa_resampler_get_delay_usec(s->sink_input->thread_info.resampler);
289         render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
290 
291         if (ri > render_delay+sink_delay)
292             ri -= render_delay+sink_delay;
293         else
294             ri = 0;
295 
296         if (wi < ri)
297             latency = 0;
298         else
299             latency = wi - ri;
300 
301         pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
302 
303         /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
304          * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
305          * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
306          *                                           T
307          *                                 R̂ = ─────────────── Rⁿ .                             (1)
308          *                                     T - (Lⁿ - Lⁿ⁻ⁱ)
309          *
310          * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
311          * is correct).  But there is also the requirement to keep the buffer at a predefined target
312          * latency L̂.  So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
313          * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
314          * aT the latency is reduced from Lⁿ to L̂.  This strategy translates to the requirements
315          *            ₐ      R̂ - Rⁿ⁺ʲ                            a-j+1         j-1
316          *            Σ  T ────────── = L̂ - Lⁿ    with    Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
317          *           ʲ⁼ⁱ        R̂                                  a            a
318          * Solving for Rⁿ⁺ⁱ gives
319          *                                     T - ²∕ₐ₊₁(L̂ - Lⁿ)
320          *                              Rⁿ⁺ⁱ = ───────────────── R̂ .                            (2)
321          *                                            T
322          * In the code below a = 7 is used.
323          *
324          * Equation (1) is not directly used in (2), but instead an exponentially weighted average
325          * of the estimated rate R̂ is used.  This average R̅ is defined as
326          *                                R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
327          * Because it is difficult to find a fixed value for the coefficient α such that the
328          * averaging is without significant lag but oscillations are filtered out, a heuristic is
329          * used.  When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
330          * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
331          */
332         estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
333         if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
334           double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
335           alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
336         }
337         s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
338         s->estimated_rate = estimated_rate;
339         pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz  (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
340         new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
341         s->last_latency = latency;
342 
343         if (new_rate < (uint32_t) (s->base_rate*0.8) || new_rate > (uint32_t) (s->base_rate*1.25)) {
344             pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", s->base_rate, new_rate);
345             new_rate = s->base_rate;
346         } else {
347             if (s->base_rate < new_rate + 20 && new_rate < s->base_rate + 20)
348                 new_rate = s->base_rate;
349             /* Do the adjustment in small steps; 2‰ can be considered inaudible */
350             if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
351                 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
352                 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
353             }
354         }
355         s->sink_input->sample_spec.rate = new_rate;
356 
357         pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
358 
359         pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
360 
361         pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
362 
363         s->last_rate_update = pa_timeval_load(&now);
364     }
365 
366     if (pa_memblockq_is_readable(s->memblockq) &&
367         s->sink_input->thread_info.underrun_for > 0) {
368         pa_log_debug("Requesting rewind due to end of underrun");
369         pa_sink_input_request_rewind(s->sink_input,
370                                      (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
371                                      false, true, false);
372     }
373 
374     return 1;
375 }
376 
377 /* Called from I/O thread context */
sink_input_attach(pa_sink_input * i)378 static void sink_input_attach(pa_sink_input *i) {
379     struct session *s;
380 
381     pa_sink_input_assert_ref(i);
382     pa_assert_se(s = i->userdata);
383 
384     pa_assert(!s->rtpoll_item);
385     s->rtpoll_item = pa_rtp_context_get_rtpoll_item(s->rtp_context, i->sink->thread_info.rtpoll);
386 
387     pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb, s);
388 }
389 
390 /* Called from I/O thread context */
sink_input_detach(pa_sink_input * i)391 static void sink_input_detach(pa_sink_input *i) {
392     struct session *s;
393     pa_sink_input_assert_ref(i);
394     pa_assert_se(s = i->userdata);
395 
396     pa_assert(s->rtpoll_item);
397     pa_rtpoll_item_free(s->rtpoll_item);
398     s->rtpoll_item = NULL;
399 }
400 
mcast_socket(const struct sockaddr * sa,socklen_t salen)401 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
402     int af, fd = -1, r, one;
403 
404     pa_assert(sa);
405     pa_assert(salen > 0);
406 
407     af = sa->sa_family;
408     if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
409         pa_log("Failed to create socket: %s", pa_cstrerror(errno));
410         goto fail;
411     }
412 
413     pa_make_udp_socket_low_delay(fd);
414 
415 #ifdef SO_TIMESTAMP
416     one = 1;
417     if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
418         pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
419         goto fail;
420     }
421 #else
422     pa_log("SO_TIMESTAMP unsupported on this platform");
423     goto fail;
424 #endif
425 
426     one = 1;
427     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
428         pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
429         goto fail;
430     }
431 
432     r = 0;
433     if (af == AF_INET) {
434         /* IPv4 multicast addresses are in the 224.0.0.0-239.255.255.255 range */
435         static const uint32_t ipv4_mcast_mask = 0xe0000000;
436 
437         if ((ntohl(((const struct sockaddr_in*) sa)->sin_addr.s_addr) & ipv4_mcast_mask) == ipv4_mcast_mask) {
438             struct ip_mreq mr4;
439             memset(&mr4, 0, sizeof(mr4));
440             mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
441             r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
442         }
443 #ifdef HAVE_IPV6
444     } else if (af == AF_INET6) {
445         /* IPv6 multicast addresses have 255 as the most significant byte */
446         if (((const struct sockaddr_in6*) sa)->sin6_addr.s6_addr[0] == 0xff) {
447             struct ipv6_mreq mr6;
448             memset(&mr6, 0, sizeof(mr6));
449             mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
450             r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
451         }
452 #endif
453     } else
454         pa_assert_not_reached();
455 
456     if (r < 0) {
457         pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
458         goto fail;
459     }
460 
461     if (bind(fd, sa, salen) < 0) {
462         pa_log("bind() failed: %s", pa_cstrerror(errno));
463         goto fail;
464     }
465 
466     return fd;
467 
468 fail:
469     if (fd >= 0)
470         close(fd);
471 
472     return -1;
473 }
474 
session_new(struct userdata * u,const pa_sdp_info * sdp_info)475 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
476     struct session *s = NULL;
477     pa_sink *sink;
478     int fd = -1;
479     pa_memchunk silence;
480     pa_sink_input_new_data data;
481     struct timeval now;
482 
483     pa_assert(u);
484     pa_assert(sdp_info);
485 
486     if (u->n_sessions >= MAX_SESSIONS) {
487         pa_log("Session limit reached.");
488         goto fail;
489     }
490 
491     if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
492         pa_log("Sink does not exist.");
493         goto fail;
494     }
495 
496     pa_rtclock_get(&now);
497 
498     s = pa_xnew0(struct session, 1);
499     s->userdata = u;
500     s->first_packet = false;
501     s->sdp_info = *sdp_info;
502     s->rtpoll_item = NULL;
503     s->intended_latency = u->latency;
504     s->last_rate_update = pa_timeval_load(&now);
505     s->last_latency = u->latency;
506     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
507 
508     if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
509         goto fail;
510 
511     pa_sink_input_new_data_init(&data);
512     pa_sink_input_new_data_set_sink(&data, sink, false, true);
513     data.driver = __FILE__;
514     pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
515     pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
516                      "RTP Stream%s%s%s",
517                      sdp_info->session_name ? " (" : "",
518                      sdp_info->session_name ? sdp_info->session_name : "",
519                      sdp_info->session_name ? ")" : "");
520 
521     if (sdp_info->session_name)
522         pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
523     pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
524     pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
525     data.module = u->module;
526     pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
527     data.flags = PA_SINK_INPUT_VARIABLE_RATE;
528 
529     pa_sink_input_new(&s->sink_input, u->module->core, &data);
530     pa_sink_input_new_data_done(&data);
531 
532     if (!s->sink_input) {
533         pa_log("Failed to create sink input.");
534         goto fail;
535     }
536 
537     s->base_rate = (double) s->sink_input->sample_spec.rate;
538     s->estimated_rate = (double) s->sink_input->sample_spec.rate;
539     s->avg_estimated_rate = (double) s->sink_input->sample_spec.rate;
540 
541     s->sink_input->userdata = s;
542 
543     s->sink_input->parent.process_msg = sink_input_process_msg;
544     s->sink_input->pop = sink_input_pop_cb;
545     s->sink_input->process_rewind = sink_input_process_rewind_cb;
546     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
547     s->sink_input->kill = sink_input_kill;
548     s->sink_input->attach = sink_input_attach;
549     s->sink_input->detach = sink_input_detach;
550     s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
551 
552     pa_sink_input_get_silence(s->sink_input, &silence);
553 
554     s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
555 
556     if (s->intended_latency < s->sink_latency*2)
557         s->intended_latency = s->sink_latency*2;
558 
559     s->memblockq = pa_memblockq_new(
560             "module-rtp-recv memblockq",
561             0,
562             MEMBLOCKQ_MAXLENGTH,
563             MEMBLOCKQ_MAXLENGTH,
564             &s->sink_input->sample_spec,
565             pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
566             0,
567             0,
568             &silence);
569 
570     pa_memblock_unref(silence.memblock);
571 
572     if (!(s->rtp_context = pa_rtp_context_new_recv(fd, sdp_info->payload, &s->sdp_info.sample_spec, sdp_info->enable_opus)))
573         goto fail;
574 
575     pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
576     u->n_sessions++;
577     PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
578 
579     pa_sink_input_put(s->sink_input);
580 
581     pa_log_info("New session '%s'", s->sdp_info.session_name);
582 
583     return s;
584 
585 fail:
586     pa_xfree(s);
587 
588     if (fd >= 0)
589         pa_close(fd);
590 
591     return NULL;
592 }
593 
session_free(struct session * s)594 static void session_free(struct session *s) {
595     pa_assert(s);
596 
597     pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
598 
599     pa_sink_input_unlink(s->sink_input);
600     pa_sink_input_unref(s->sink_input);
601 
602     PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
603     pa_assert(s->userdata->n_sessions >= 1);
604     s->userdata->n_sessions--;
605 
606     pa_memblockq_free(s->memblockq);
607     pa_sdp_info_destroy(&s->sdp_info);
608     pa_rtp_context_free(s->rtp_context);
609 
610     pa_xfree(s);
611 }
612 
/* Main loop callback for input on the SAP socket.
 *
 * Receives and parses one announcement. A goodbye removes the session with
 * the announced origin; an announcement for an unknown origin creates a
 * session; an announcement for a known origin just refreshes its timestamp so
 * that the death check keeps it alive.
 *
 * The parsed description is destroyed here in every case except when
 * session_new() succeeds and takes it over. */
static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
    struct userdata *u = userdata;
    bool goodbye = false;
    pa_sdp_info info;
    struct session *s;
    struct timeval now;

    pa_assert(m);
    pa_assert(e);
    pa_assert(u);
    pa_assert(fd == u->sap_context.fd);
    pa_assert(flags == PA_IO_EVENT_INPUT);

    if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
        return;

    if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
        return;

    if (goodbye) {
        pa_hashmap_remove_and_free(u->by_origin, info.origin);
        pa_sdp_info_destroy(&info);
        return;
    }

    s = pa_hashmap_get(u->by_origin, info.origin);

    if (s) {
        /* Known session: note that it is still being announced */
        pa_rtclock_get(&now);
        pa_atomic_store(&s->timestamp, (int) now.tv_sec);

        pa_sdp_info_destroy(&info);
        return;
    }

    if (!session_new(u, &info))
        pa_sdp_info_destroy(&info);
}
649 
/* Timer callback: drops every session whose timestamp is more than
 * DEATH_TIMEOUT seconds old and then re-arms the timer for another
 * DEATH_TIMEOUT seconds. */
static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
    struct session *s, *n;
    struct userdata *u = userdata;
    struct timeval now;

    pa_assert(m);
    pa_assert(t);
    pa_assert(u);

    pa_rtclock_get(&now);

    pa_log_debug("Checking for dead streams ...");

    s = u->sessions;
    while (s) {
        int k = pa_atomic_load(&s->timestamp);

        /* Removing the entry frees the session, so fetch the successor first */
        n = s->next;

        if (k + DEATH_TIMEOUT < now.tv_sec)
            pa_hashmap_remove_and_free(u->by_origin, s->sdp_info.origin);

        s = n;
    }

    /* Restart timer */
    pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
}
676 
/* Module entry point.
 *
 * Parses the module arguments, opens the SAP listening socket on
 * sap_address (IPv4 or, with HAVE_IPV6, IPv6) port SAP_PORT, and sets up the
 * module state: the SAP I/O event, the session containers and the periodic
 * death-check timer.
 *
 * Returns 0 on success and -1 on failure; on failure nothing is left
 * allocated and m->userdata is not set. */
int pa__init(pa_module*m) {
    struct userdata *u;
    pa_modargs *ma = NULL;
    struct sockaddr_in sa4;
#ifdef HAVE_IPV6
    struct sockaddr_in6 sa6;
#endif
    struct sockaddr *sa;
    socklen_t salen;
    const char *sap_address;
    uint32_t latency_msec;
    int fd = -1;

    pa_assert(m);

    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
        pa_log("failed to parse module arguments");
        goto fail;
    }

    sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);

    /* Only family, port and address are filled in below. Clear the structs
     * first so that the remaining members (padding, sin6_flowinfo,
     * sin6_scope_id) do not reach the kernel as stack garbage. */
    memset(&sa4, 0, sizeof(sa4));
#ifdef HAVE_IPV6
    memset(&sa6, 0, sizeof(sa6));
#endif

    if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
        sa4.sin_family = AF_INET;
        sa4.sin_port = htons(SAP_PORT);
        sa = (struct sockaddr*) &sa4;
        salen = sizeof(sa4);
#ifdef HAVE_IPV6
    } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
        sa6.sin6_family = AF_INET6;
        sa6.sin6_port = htons(SAP_PORT);
        sa = (struct sockaddr*) &sa6;
        salen = sizeof(sa6);
#endif
    } else {
        pa_log("Invalid SAP address '%s'", sap_address);
        goto fail;
    }

    latency_msec = DEFAULT_LATENCY_MSEC;
    if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 300000) {
        pa_log("Invalid latency specification");
        goto fail;
    }

    if ((fd = mcast_socket(sa, salen)) < 0)
        goto fail;

    /* Zero-initialised because pa__done() tests the members before freeing them */
    m->userdata = u = pa_xnew0(struct userdata, 1);
    u->module = m;
    u->core = m->core;
    u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
    u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;

    u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
    pa_sap_context_init_recv(&u->sap_context, fd);

    PA_LLIST_HEAD_INIT(struct session, u->sessions);
    u->n_sessions = 0;
    /* The map owns the sessions: removing an entry runs session_free() on it */
    u->by_origin = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) session_free);

    u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);

    pa_modargs_free(ma);

    return 0;

fail:
    if (ma)
        pa_modargs_free(ma);

    if (fd >= 0)
        pa_close(fd);

    return -1;
}
753 
/* Module teardown. Does nothing if pa__init() never installed its state.
 * Otherwise it releases the main loop events, the SAP context, the session
 * map (which frees every remaining session through session_free()) and
 * finally the state itself. */
void pa__done(pa_module*m) {
    struct userdata *u;
    pa_mainloop_api *api;

    pa_assert(m);

    u = m->userdata;
    if (!u)
        return;

    api = m->core->mainloop;

    if (u->sap_event)
        api->io_free(u->sap_event);

    if (u->check_death_event)
        api->time_free(u->check_death_event);

    pa_sap_context_destroy(&u->sap_context);

    if (u->by_origin)
        pa_hashmap_free(u->by_origin);

    pa_xfree(u->sink_name);
    pa_xfree(u);
}
776