• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 
2 /***
3   This file is part of PulseAudio.
4 
5   Copyright 2006 Lennart Poettering
6 
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11 
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16 
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20 
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24 
25 #include <stdio.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
28 #include <errno.h>
29 #include <string.h>
30 #include <unistd.h>
31 #include <math.h>
32 #ifdef HAVE_NETDB_H
33 #include <netdb.h>
34 #endif
35 
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38 #include <pulse/xmalloc.h>
39 
40 #include <pulsecore/core-error.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/llist.h>
43 #include <pulsecore/sink.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/memblockq.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-rtclock.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/namereg.h>
51 #include <pulsecore/sample-util.h>
52 #include <pulsecore/macro.h>
53 #include <pulsecore/socket-util.h>
54 #include <pulsecore/atomic.h>
55 #include <pulsecore/once.h>
56 #include <pulsecore/poll.h>
57 #include <pulsecore/arpa-inet.h>
58 
59 #include "rtp.h"
60 #include "sdp.h"
61 #include "sap.h"
62 
63 PA_MODULE_AUTHOR("Lennart Poettering");
64 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
65 PA_MODULE_VERSION(PACKAGE_VERSION);
66 PA_MODULE_LOAD_ONCE(false);
67 PA_MODULE_USAGE(
68         "sink=<name of the sink> "
69         "sap_address=<multicast address to listen on> "
70         "latency_msec=<latency in ms> "
71 );
72 
73 #define SAP_PORT 9875
74 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
75 #define DEFAULT_LATENCY_MSEC 500
76 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
77 #define MAX_SESSIONS 16
78 #define DEATH_TIMEOUT 20
79 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
80 
/* NULL-terminated list of argument names accepted by pa_modargs_new() in
 * pa__init(). */
static const char *const valid_modargs[] = { "sink", "sap_address", "latency_msec", NULL };
87 
88 struct session {
89     struct userdata *userdata;
90     PA_LLIST_FIELDS(struct session);
91 
92     pa_sink_input *sink_input;
93     pa_memblockq *memblockq;
94 
95     bool first_packet;
96     uint32_t offset;
97 
98     struct pa_sdp_info sdp_info;
99 
100     pa_rtp_context *rtp_context;
101 
102     pa_rtpoll_item *rtpoll_item;
103 
104     pa_atomic_t timestamp;
105 
106     pa_usec_t intended_latency;
107     pa_usec_t sink_latency;
108 
109     unsigned int base_rate;
110     pa_usec_t last_rate_update;
111     pa_usec_t last_latency;
112     double estimated_rate;
113     double avg_estimated_rate;
114 };
115 
116 struct userdata {
117     pa_module *module;
118     pa_core *core;
119 
120     pa_sap_context sap_context;
121     pa_io_event* sap_event;
122 
123     pa_time_event *check_death_event;
124 
125     char *sink_name;
126 
127     PA_LLIST_HEAD(struct session, sessions);
128     pa_hashmap *by_origin;
129     int n_sessions;
130 
131     pa_usec_t latency;
132 };
133 
134 static void session_free(struct session *s);
135 
136 /* Called from I/O thread context */
sink_input_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)137 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
138     struct session *s = PA_SINK_INPUT(o)->userdata;
139 
140     switch (code) {
141         case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
142             *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
143 
144             /* Fall through, the default handler will add in the extra
145              * latency added by the resampler */
146             break;
147     }
148 
149     return pa_sink_input_process_msg(o, code, data, offset, chunk);
150 }
151 
152 /* Called from I/O thread context */
sink_input_pop_cb(pa_sink_input * i,size_t length,pa_memchunk * chunk)153 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
154     struct session *s;
155     pa_sink_input_assert_ref(i);
156     pa_assert_se(s = i->userdata);
157 
158     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
159         return -1;
160 
161     pa_memblockq_drop(s->memblockq, chunk->length);
162 
163     return 0;
164 }
165 
166 /* Called from I/O thread context */
sink_input_process_rewind_cb(pa_sink_input * i,size_t nbytes)167 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
168     struct session *s;
169 
170     pa_sink_input_assert_ref(i);
171     pa_assert_se(s = i->userdata);
172 
173     pa_memblockq_rewind(s->memblockq, nbytes);
174 }
175 
176 /* Called from I/O thread context */
sink_input_update_max_rewind_cb(pa_sink_input * i,size_t nbytes)177 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
178     struct session *s;
179 
180     pa_sink_input_assert_ref(i);
181     pa_assert_se(s = i->userdata);
182 
183     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
184 }
185 
186 /* Called from main context */
sink_input_kill(pa_sink_input * i)187 static void sink_input_kill(pa_sink_input* i) {
188     struct session *s;
189     pa_sink_input_assert_ref(i);
190     pa_assert_se(s = i->userdata);
191 
192     pa_hashmap_remove_and_free(s->userdata->by_origin, s->sdp_info.origin);
193 }
194 
195 /* Called from IO context */
sink_input_suspend_within_thread(pa_sink_input * i,bool b)196 static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) {
197     struct session *s;
198     pa_sink_input_assert_ref(i);
199     pa_assert_se(s = i->userdata);
200 
201     if (b)
202         pa_memblockq_flush_read(s->memblockq);
203     else
204         s->first_packet = false;
205 }
206 
207 /* Called from I/O thread context */
rtpoll_work_cb(pa_rtpoll_item * i)208 static int rtpoll_work_cb(pa_rtpoll_item *i) {
209     pa_memchunk chunk;
210     uint32_t timestamp;
211     int64_t k, j, delta;
212     struct timeval now = { 0, 0 };
213     struct session *s;
214     struct pollfd *p;
215 
216     pa_assert_se(s = pa_rtpoll_item_get_work_userdata(i));
217 
218     p = pa_rtpoll_item_get_pollfd(i, NULL);
219 
220     if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
221         pa_log("poll() signalled bad revents.");
222         return -1;
223     }
224 
225     if ((p->revents & POLLIN) == 0)
226         return 0;
227 
228     p->revents = 0;
229 
230     if (pa_rtp_recv(s->rtp_context, &chunk, s->userdata->module->core->mempool, &timestamp, &now) < 0)
231         return 0;
232 
233     if (!PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
234         pa_memblock_unref(chunk.memblock);
235         return 0;
236     }
237 
238     if (!s->first_packet) {
239         s->first_packet = true;
240         s->offset = timestamp;
241     }
242 
243     /* Check whether there was a timestamp overflow */
244     k = (int64_t) timestamp - (int64_t) s->offset;
245     j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) timestamp;
246 
247     if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
248         delta = k;
249     else
250         delta = j;
251 
252     pa_memblockq_seek(s->memblockq, delta * (int64_t) pa_rtp_context_get_frame_size(s->rtp_context), PA_SEEK_RELATIVE,
253             true);
254 
255     if (now.tv_sec == 0) {
256         PA_ONCE_BEGIN {
257             pa_log_warn("Using artificial time instead of timestamp");
258         } PA_ONCE_END;
259         pa_rtclock_get(&now);
260     } else
261         pa_rtclock_from_wallclock(&now);
262 
263     if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
264         pa_log_warn("Queue overrun");
265         pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true);
266     }
267 
268 /*     pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
269 
270     pa_memblock_unref(chunk.memblock);
271 
272     /* The next timestamp we expect */
273     s->offset = timestamp + (uint32_t) (chunk.length / pa_rtp_context_get_frame_size(s->rtp_context));
274 
275     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
276 
277     if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
278         pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
279         uint32_t current_rate = s->sink_input->sample_spec.rate;
280         uint32_t new_rate;
281         double estimated_rate, alpha = 0.02;
282 
283         pa_log_debug("Updating sample rate");
284 
285         wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec);
286         ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
287 
288         pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
289 
290         sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink, false);
291         sink_delay += pa_resampler_get_delay_usec(s->sink_input->thread_info.resampler);
292         render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
293 
294         if (ri > render_delay+sink_delay)
295             ri -= render_delay+sink_delay;
296         else
297             ri = 0;
298 
299         if (wi < ri)
300             latency = 0;
301         else
302             latency = wi - ri;
303 
304         pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
305 
306         /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
307          * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
308          * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
309          *                                           T
310          *                                 R̂ = ─────────────── Rⁿ .                             (1)
311          *                                     T - (Lⁿ - Lⁿ⁻ⁱ)
312          *
313          * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
314          * is correct).  But there is also the requirement to keep the buffer at a predefined target
315          * latency L̂.  So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
316          * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
317          * aT the latency is reduced from Lⁿ to L̂.  This strategy translates to the requirements
318          *            ₐ      R̂ - Rⁿ⁺ʲ                            a-j+1         j-1
319          *            Σ  T ────────── = L̂ - Lⁿ    with    Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
320          *           ʲ⁼ⁱ        R̂                                  a            a
321          * Solving for Rⁿ⁺ⁱ gives
322          *                                     T - ²∕ₐ₊₁(L̂ - Lⁿ)
323          *                              Rⁿ⁺ⁱ = ───────────────── R̂ .                            (2)
324          *                                            T
325          * In the code below a = 7 is used.
326          *
327          * Equation (1) is not directly used in (2), but instead an exponentially weighted average
328          * of the estimated rate R̂ is used.  This average R̅ is defined as
329          *                                R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
330          * Because it is difficult to find a fixed value for the coefficient α such that the
331          * averaging is without significant lag but oscillations are filtered out, a heuristic is
332          * used.  When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
333          * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
334          */
335         estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
336         if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
337           double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
338           alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
339         }
340         s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
341         s->estimated_rate = estimated_rate;
342         pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz  (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
343         new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
344         s->last_latency = latency;
345 
346         if (new_rate < (uint32_t) (s->base_rate*0.8) || new_rate > (uint32_t) (s->base_rate*1.25)) {
347             pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", s->base_rate, new_rate);
348             new_rate = s->base_rate;
349         } else {
350             if (s->base_rate < new_rate + 20 && new_rate < s->base_rate + 20)
351                 new_rate = s->base_rate;
352             /* Do the adjustment in small steps; 2‰ can be considered inaudible */
353             if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
354                 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
355                 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
356             }
357         }
358         s->sink_input->sample_spec.rate = new_rate;
359 
360         pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
361 
362         pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
363 
364         pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
365 
366         s->last_rate_update = pa_timeval_load(&now);
367     }
368 
369     if (pa_memblockq_is_readable(s->memblockq) &&
370         s->sink_input->thread_info.underrun_for > 0) {
371         pa_log_debug("Requesting rewind due to end of underrun");
372         pa_sink_input_request_rewind(s->sink_input,
373                                      (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
374                                      false, true, false);
375     }
376 
377     return 1;
378 }
379 
380 /* Called from I/O thread context */
sink_input_attach(pa_sink_input * i)381 static void sink_input_attach(pa_sink_input *i) {
382     struct session *s;
383 
384     pa_sink_input_assert_ref(i);
385     pa_assert_se(s = i->userdata);
386 
387     pa_assert(!s->rtpoll_item);
388     s->rtpoll_item = pa_rtp_context_get_rtpoll_item(s->rtp_context, i->sink->thread_info.rtpoll);
389 
390     pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb, s);
391 }
392 
393 /* Called from I/O thread context */
sink_input_detach(pa_sink_input * i)394 static void sink_input_detach(pa_sink_input *i) {
395     struct session *s;
396     pa_sink_input_assert_ref(i);
397     pa_assert_se(s = i->userdata);
398 
399     pa_assert(s->rtpoll_item);
400     pa_rtpoll_item_free(s->rtpoll_item);
401     s->rtpoll_item = NULL;
402 }
403 
mcast_socket(const struct sockaddr * sa,socklen_t salen)404 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
405     int af, fd = -1, r, one;
406 
407     pa_assert(sa);
408     pa_assert(salen > 0);
409 
410     af = sa->sa_family;
411     if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
412         pa_log("Failed to create socket: %s", pa_cstrerror(errno));
413         goto fail;
414     }
415 
416     pa_make_udp_socket_low_delay(fd);
417 
418 #ifdef SO_TIMESTAMP
419     one = 1;
420     if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
421         pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
422         goto fail;
423     }
424 #else
425     pa_log("SO_TIMESTAMP unsupported on this platform");
426     goto fail;
427 #endif
428 
429     one = 1;
430     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
431         pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
432         goto fail;
433     }
434 
435     r = 0;
436     if (af == AF_INET) {
437         /* IPv4 multicast addresses are in the 224.0.0.0-239.255.255.255 range */
438         static const uint32_t ipv4_mcast_mask = 0xe0000000;
439 
440         if ((ntohl(((const struct sockaddr_in*) sa)->sin_addr.s_addr) & ipv4_mcast_mask) == ipv4_mcast_mask) {
441             struct ip_mreq mr4;
442             memset(&mr4, 0, sizeof(mr4));
443             mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
444             r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
445         }
446 #ifdef HAVE_IPV6
447     } else if (af == AF_INET6) {
448         /* IPv6 multicast addresses have 255 as the most significant byte */
449         if (((const struct sockaddr_in6*) sa)->sin6_addr.s6_addr[0] == 0xff) {
450             struct ipv6_mreq mr6;
451             memset(&mr6, 0, sizeof(mr6));
452             mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
453             r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
454         }
455 #endif
456     } else
457         pa_assert_not_reached();
458 
459     if (r < 0) {
460         pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
461         goto fail;
462     }
463 
464     if (bind(fd, sa, salen) < 0) {
465         pa_log("bind() failed: %s", pa_cstrerror(errno));
466         goto fail;
467     }
468 
469     return fd;
470 
471 fail:
472     if (fd >= 0)
473         close(fd);
474 
475     return -1;
476 }
477 
session_new(struct userdata * u,const pa_sdp_info * sdp_info)478 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
479     struct session *s = NULL;
480     pa_sink *sink;
481     int fd = -1;
482     pa_memchunk silence;
483     pa_sink_input_new_data data;
484     struct timeval now;
485 
486     pa_assert(u);
487     pa_assert(sdp_info);
488 
489     if (u->n_sessions >= MAX_SESSIONS) {
490         pa_log("Session limit reached.");
491         goto fail;
492     }
493 
494     if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
495         pa_log("Sink does not exist.");
496         goto fail;
497     }
498 
499     pa_rtclock_get(&now);
500 
501     s = pa_xnew0(struct session, 1);
502     s->userdata = u;
503     s->first_packet = false;
504     s->sdp_info = *sdp_info;
505     s->rtpoll_item = NULL;
506     s->intended_latency = u->latency;
507     s->last_rate_update = pa_timeval_load(&now);
508     s->last_latency = u->latency;
509     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
510 
511     if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
512         goto fail;
513 
514     pa_sink_input_new_data_init(&data);
515     pa_sink_input_new_data_set_sink(&data, sink, false, true);
516     data.driver = __FILE__;
517     pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
518     pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
519                      "RTP Stream%s%s%s",
520                      sdp_info->session_name ? " (" : "",
521                      sdp_info->session_name ? sdp_info->session_name : "",
522                      sdp_info->session_name ? ")" : "");
523 
524     if (sdp_info->session_name)
525         pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
526     pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
527     pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
528     data.module = u->module;
529     pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
530     data.flags = PA_SINK_INPUT_VARIABLE_RATE;
531 
532     pa_sink_input_new(&s->sink_input, u->module->core, &data);
533     pa_sink_input_new_data_done(&data);
534 
535     if (!s->sink_input) {
536         pa_log("Failed to create sink input.");
537         goto fail;
538     }
539 
540     s->base_rate = (double) s->sink_input->sample_spec.rate;
541     s->estimated_rate = (double) s->sink_input->sample_spec.rate;
542     s->avg_estimated_rate = (double) s->sink_input->sample_spec.rate;
543 
544     s->sink_input->userdata = s;
545 
546     s->sink_input->parent.process_msg = sink_input_process_msg;
547     s->sink_input->pop = sink_input_pop_cb;
548     s->sink_input->process_rewind = sink_input_process_rewind_cb;
549     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
550     s->sink_input->kill = sink_input_kill;
551     s->sink_input->attach = sink_input_attach;
552     s->sink_input->detach = sink_input_detach;
553     s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
554 
555     pa_sink_input_get_silence(s->sink_input, &silence);
556 
557     s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
558 
559     if (s->intended_latency < s->sink_latency*2)
560         s->intended_latency = s->sink_latency*2;
561 
562     s->memblockq = pa_memblockq_new(
563             "module-rtp-recv memblockq",
564             0,
565             MEMBLOCKQ_MAXLENGTH,
566             MEMBLOCKQ_MAXLENGTH,
567             &s->sink_input->sample_spec,
568             pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
569             0,
570             0,
571             &silence);
572 
573     pa_memblock_unref(silence.memblock);
574 
575     if (!(s->rtp_context = pa_rtp_context_new_recv(fd, sdp_info->payload, &s->sdp_info.sample_spec, sdp_info->enable_opus)))
576         goto fail;
577 
578     pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
579     u->n_sessions++;
580     PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
581 
582     pa_sink_input_put(s->sink_input);
583 
584     pa_log_info("New session '%s'", s->sdp_info.session_name);
585 
586     return s;
587 
588 fail:
589     pa_xfree(s);
590 
591     if (fd >= 0)
592         pa_close(fd);
593 
594     return NULL;
595 }
596 
session_free(struct session * s)597 static void session_free(struct session *s) {
598     pa_assert(s);
599 
600     pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
601 
602     pa_sink_input_unlink(s->sink_input);
603     pa_sink_input_unref(s->sink_input);
604 
605     PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
606     pa_assert(s->userdata->n_sessions >= 1);
607     s->userdata->n_sessions--;
608 
609     pa_memblockq_free(s->memblockq);
610     pa_sdp_info_destroy(&s->sdp_info);
611     pa_rtp_context_free(s->rtp_context);
612 
613     pa_xfree(s);
614 }
615 
/* Main loop I/O callback for the SAP socket.
 *
 * Receives one SAP packet and parses the SDP description it carries. A
 * goodbye packet removes the matching session. An announcement either
 * refreshes the activity timestamp of the session already known under that
 * origin, or creates a new session. The parsed description is destroyed
 * here in every case except when session_new() succeeded. */
static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
    struct userdata *u = userdata;
    struct session *known;
    pa_sdp_info info;
    bool goodbye = false;

    pa_assert(m);
    pa_assert(e);
    pa_assert(u);
    pa_assert(fd == u->sap_context.fd);
    pa_assert(flags == PA_IO_EVENT_INPUT);

    if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
        return;

    if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
        return;

    if (goodbye) {
        /* Stream was withdrawn by the sender */
        pa_hashmap_remove_and_free(u->by_origin, info.origin);
        pa_sdp_info_destroy(&info);
        return;
    }

    known = pa_hashmap_get(u->by_origin, info.origin);

    if (known) {
        /* Repeated announcement: just note that the stream is still alive */
        struct timeval now;

        pa_rtclock_get(&now);
        pa_atomic_store(&known->timestamp, (int) now.tv_sec);

        pa_sdp_info_destroy(&info);
        return;
    }

    /* Unknown origin: on success the session keeps the parsed data */
    if (!session_new(u, &info))
        pa_sdp_info_destroy(&info);
}
652 
/* Timer callback that removes every session whose activity timestamp is more
 * than DEATH_TIMEOUT seconds old, then re-arms the timer to fire again in
 * DEATH_TIMEOUT seconds. */
static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
    struct userdata *u = userdata;
    struct session *cur;
    struct timeval now;

    pa_assert(m);
    pa_assert(t);
    pa_assert(u);

    pa_rtclock_get(&now);

    pa_log_debug("Checking for dead streams ...");

    cur = u->sessions;
    while (cur) {
        /* Removing the entry frees the session, so fetch the successor first */
        struct session *following = cur->next;
        int last_seen = pa_atomic_load(&cur->timestamp);

        if (last_seen + DEATH_TIMEOUT < now.tv_sec)
            pa_hashmap_remove_and_free(u->by_origin, cur->sdp_info.origin);

        cur = following;
    }

    /* Restart timer */
    pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
}
679 
/* Module entry point.
 *
 * Parses the module arguments ("sink", "sap_address", "latency_msec"), opens
 * the socket on which SAP announcements are received, and sets up the module
 * state: the I/O event for the SAP socket, the session bookkeeping and the
 * periodic timer that expires silent sessions.
 *
 * Returns 0 on success and -1 on failure; on failure everything acquired up
 * to that point is released. */
int pa__init(pa_module*m) {
    struct userdata *u;
    pa_modargs *ma = NULL;
#if defined(HAVE_GETADDRINFO)
    struct addrinfo *sap_addrinfo = NULL;
#else
    struct sockaddr_in sa4;
#ifdef HAVE_IPV6
    struct sockaddr_in6 sa6;
#endif
#endif
    struct sockaddr *sa;
    socklen_t salen;
    const char *sap_address;
    uint32_t latency_msec;
    int fd = -1;

    pa_assert(m);

    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
        pa_log("failed to parse module arguments");
        goto fail;
    }

    sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);

#if defined(HAVE_GETADDRINFO)
    {
        struct addrinfo hints;
        char *service;

        pa_zero(hints);

        /* getaddrinfo() takes the port as a decimal string in host byte
         * order and converts it to network byte order itself, so it must
         * not be passed through htons() here. */
        service = pa_sprintf_malloc("%d", SAP_PORT);

        hints.ai_flags = AI_NUMERICHOST;
        if (getaddrinfo(sap_address, service, &hints, &sap_addrinfo) != 0) {
            pa_xfree(service);
            pa_log("Invalid SAP address '%s'", sap_address);
            goto fail;
        }
        pa_xfree(service);

        sa = sap_addrinfo->ai_addr;
        salen = sap_addrinfo->ai_addrlen;
    }
#else
    /* Clear the address structures first so that the members not assigned
     * below do not contain stack garbage when handed to bind(). */
    pa_zero(sa4);
#ifdef HAVE_IPV6
    pa_zero(sa6);
#endif

    if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
        sa4.sin_family = AF_INET;
        sa4.sin_port = htons(SAP_PORT);
        sa = (struct sockaddr*) &sa4;
        salen = sizeof(sa4);
#ifdef HAVE_IPV6
    } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
        sa6.sin6_family = AF_INET6;
        sa6.sin6_port = htons(SAP_PORT);
        sa = (struct sockaddr*) &sa6;
        salen = sizeof(sa6);
#endif
    } else {
        pa_log("Invalid SAP address '%s'", sap_address);
        goto fail;
    }
#endif

    latency_msec = DEFAULT_LATENCY_MSEC;
    if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 300000) {
        pa_log("Invalid latency specification");
        goto fail;
    }

    if ((fd = mcast_socket(sa, salen)) < 0)
        goto fail;

    /* Not zero-initialised: every member is assigned below */
    m->userdata = u = pa_xnew(struct userdata, 1);
    u->module = m;
    u->core = m->core;
    u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
    u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;

    u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
    pa_sap_context_init_recv(&u->sap_context, fd);

    PA_LLIST_HEAD_INIT(struct session, u->sessions);
    u->n_sessions = 0;
    /* Removing an entry from this map frees the session via session_free() */
    u->by_origin = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) session_free);

    u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);

    pa_modargs_free(ma);

#if defined(HAVE_GETADDRINFO)
    freeaddrinfo(sap_addrinfo);
#endif

    return 0;

fail:
#if defined(HAVE_GETADDRINFO)
    if (sap_addrinfo)
        freeaddrinfo(sap_addrinfo);
#endif

    if (ma)
        pa_modargs_free(ma);

    if (fd >= 0)
        pa_close(fd);

    return -1;
}
791 
/* Module teardown. Does nothing if pa__init() never stored module state.
 * Otherwise releases the SAP I/O event, the death check timer, the SAP
 * context and the session hashmap (whose value destructor is session_free(),
 * so all remaining sessions go with it), then the state itself. */
void pa__done(pa_module*m) {
    struct userdata *u;
    pa_mainloop_api *api;

    pa_assert(m);

    u = m->userdata;
    if (!u)
        return;

    api = m->core->mainloop;

    if (u->sap_event)
        api->io_free(u->sap_event);

    if (u->check_death_event)
        api->time_free(u->check_death_event);

    pa_sap_context_destroy(&u->sap_context);

    if (u->by_origin)
        pa_hashmap_free(u->by_origin);

    pa_xfree(u->sink_name);
    pa_xfree(u);
}
814