1
2 /***
3 This file is part of PulseAudio.
4
5 Copyright 2006 Lennart Poettering
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <stdio.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
28 #include <errno.h>
29 #include <string.h>
30 #include <unistd.h>
31 #include <math.h>
32 #ifdef HAVE_NETDB_H
33 #include <netdb.h>
34 #endif
35
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/core-error.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/llist.h>
43 #include <pulsecore/sink.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/memblockq.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-rtclock.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/namereg.h>
51 #include <pulsecore/sample-util.h>
52 #include <pulsecore/macro.h>
53 #include <pulsecore/socket-util.h>
54 #include <pulsecore/atomic.h>
55 #include <pulsecore/once.h>
56 #include <pulsecore/poll.h>
57 #include <pulsecore/arpa-inet.h>
58
59 #include "rtp.h"
60 #include "sdp.h"
61 #include "sap.h"
62
63 PA_MODULE_AUTHOR("Lennart Poettering");
64 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
65 PA_MODULE_VERSION(PACKAGE_VERSION);
66 PA_MODULE_LOAD_ONCE(false);
67 PA_MODULE_USAGE(
68 "sink=<name of the sink> "
69 "sap_address=<multicast address to listen on> "
70 "latency_msec=<latency in ms> "
71 );
72
73 #define SAP_PORT 9875
74 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
75 #define DEFAULT_LATENCY_MSEC 500
76 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
77 #define MAX_SESSIONS 16
78 #define DEATH_TIMEOUT 20
79 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
80
81 static const char* const valid_modargs[] = {
82 "sink",
83 "sap_address",
84 "latency_msec",
85 NULL
86 };
87
88 struct session {
89 struct userdata *userdata;
90 PA_LLIST_FIELDS(struct session);
91
92 pa_sink_input *sink_input;
93 pa_memblockq *memblockq;
94
95 bool first_packet;
96 uint32_t offset;
97
98 struct pa_sdp_info sdp_info;
99
100 pa_rtp_context *rtp_context;
101
102 pa_rtpoll_item *rtpoll_item;
103
104 pa_atomic_t timestamp;
105
106 pa_usec_t intended_latency;
107 pa_usec_t sink_latency;
108
109 unsigned int base_rate;
110 pa_usec_t last_rate_update;
111 pa_usec_t last_latency;
112 double estimated_rate;
113 double avg_estimated_rate;
114 };
115
116 struct userdata {
117 pa_module *module;
118 pa_core *core;
119
120 pa_sap_context sap_context;
121 pa_io_event* sap_event;
122
123 pa_time_event *check_death_event;
124
125 char *sink_name;
126
127 PA_LLIST_HEAD(struct session, sessions);
128 pa_hashmap *by_origin;
129 int n_sessions;
130
131 pa_usec_t latency;
132 };
133
134 static void session_free(struct session *s);
135
136 /* Called from I/O thread context */
sink_input_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)137 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
138 struct session *s = PA_SINK_INPUT(o)->userdata;
139
140 switch (code) {
141 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
142 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
143
144 /* Fall through, the default handler will add in the extra
145 * latency added by the resampler */
146 break;
147 }
148
149 return pa_sink_input_process_msg(o, code, data, offset, chunk);
150 }
151
152 /* Called from I/O thread context */
sink_input_pop_cb(pa_sink_input * i,size_t length,pa_memchunk * chunk)153 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
154 struct session *s;
155 pa_sink_input_assert_ref(i);
156 pa_assert_se(s = i->userdata);
157
158 if (pa_memblockq_peek(s->memblockq, chunk) < 0)
159 return -1;
160
161 pa_memblockq_drop(s->memblockq, chunk->length);
162
163 return 0;
164 }
165
166 /* Called from I/O thread context */
sink_input_process_rewind_cb(pa_sink_input * i,size_t nbytes)167 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
168 struct session *s;
169
170 pa_sink_input_assert_ref(i);
171 pa_assert_se(s = i->userdata);
172
173 pa_memblockq_rewind(s->memblockq, nbytes);
174 }
175
176 /* Called from I/O thread context */
sink_input_update_max_rewind_cb(pa_sink_input * i,size_t nbytes)177 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
178 struct session *s;
179
180 pa_sink_input_assert_ref(i);
181 pa_assert_se(s = i->userdata);
182
183 pa_memblockq_set_maxrewind(s->memblockq, nbytes);
184 }
185
186 /* Called from main context */
sink_input_kill(pa_sink_input * i)187 static void sink_input_kill(pa_sink_input* i) {
188 struct session *s;
189 pa_sink_input_assert_ref(i);
190 pa_assert_se(s = i->userdata);
191
192 pa_hashmap_remove_and_free(s->userdata->by_origin, s->sdp_info.origin);
193 }
194
195 /* Called from IO context */
sink_input_suspend_within_thread(pa_sink_input * i,bool b)196 static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) {
197 struct session *s;
198 pa_sink_input_assert_ref(i);
199 pa_assert_se(s = i->userdata);
200
201 if (b)
202 pa_memblockq_flush_read(s->memblockq);
203 else
204 s->first_packet = false;
205 }
206
207 /* Called from I/O thread context */
rtpoll_work_cb(pa_rtpoll_item * i)208 static int rtpoll_work_cb(pa_rtpoll_item *i) {
209 pa_memchunk chunk;
210 uint32_t timestamp;
211 int64_t k, j, delta;
212 struct timeval now = { 0, 0 };
213 struct session *s;
214 struct pollfd *p;
215
216 pa_assert_se(s = pa_rtpoll_item_get_work_userdata(i));
217
218 p = pa_rtpoll_item_get_pollfd(i, NULL);
219
220 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
221 pa_log("poll() signalled bad revents.");
222 return -1;
223 }
224
225 if ((p->revents & POLLIN) == 0)
226 return 0;
227
228 p->revents = 0;
229
230 if (pa_rtp_recv(s->rtp_context, &chunk, s->userdata->module->core->mempool, ×tamp, &now) < 0)
231 return 0;
232
233 if (!PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
234 pa_memblock_unref(chunk.memblock);
235 return 0;
236 }
237
238 if (!s->first_packet) {
239 s->first_packet = true;
240 s->offset = timestamp;
241 }
242
243 /* Check whether there was a timestamp overflow */
244 k = (int64_t) timestamp - (int64_t) s->offset;
245 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) timestamp;
246
247 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
248 delta = k;
249 else
250 delta = j;
251
252 pa_memblockq_seek(s->memblockq, delta * (int64_t) pa_rtp_context_get_frame_size(s->rtp_context), PA_SEEK_RELATIVE,
253 true);
254
255 if (now.tv_sec == 0) {
256 PA_ONCE_BEGIN {
257 pa_log_warn("Using artificial time instead of timestamp");
258 } PA_ONCE_END;
259 pa_rtclock_get(&now);
260 } else
261 pa_rtclock_from_wallclock(&now);
262
263 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
264 pa_log_warn("Queue overrun");
265 pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true);
266 }
267
268 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
269
270 pa_memblock_unref(chunk.memblock);
271
272 /* The next timestamp we expect */
273 s->offset = timestamp + (uint32_t) (chunk.length / pa_rtp_context_get_frame_size(s->rtp_context));
274
275 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
276
277 if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
278 pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
279 uint32_t current_rate = s->sink_input->sample_spec.rate;
280 uint32_t new_rate;
281 double estimated_rate, alpha = 0.02;
282
283 pa_log_debug("Updating sample rate");
284
285 wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec);
286 ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
287
288 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
289
290 sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink, false);
291 sink_delay += pa_resampler_get_delay_usec(s->sink_input->thread_info.resampler);
292 render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
293
294 if (ri > render_delay+sink_delay)
295 ri -= render_delay+sink_delay;
296 else
297 ri = 0;
298
299 if (wi < ri)
300 latency = 0;
301 else
302 latency = wi - ri;
303
304 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
305
306 /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
307 * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
308 * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
309 * T
310 * R̂ = ─────────────── Rⁿ . (1)
311 * T - (Lⁿ - Lⁿ⁻ⁱ)
312 *
313 * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
314 * is correct). But there is also the requirement to keep the buffer at a predefined target
315 * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
316 * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
317 * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements
318 * ₐ R̂ - Rⁿ⁺ʲ a-j+1 j-1
319 * Σ T ────────── = L̂ - Lⁿ with Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
320 * ʲ⁼ⁱ R̂ a a
321 * Solving for Rⁿ⁺ⁱ gives
322 * T - ²∕ₐ₊₁(L̂ - Lⁿ)
323 * Rⁿ⁺ⁱ = ───────────────── R̂ . (2)
324 * T
325 * In the code below a = 7 is used.
326 *
327 * Equation (1) is not directly used in (2), but instead an exponentially weighted average
328 * of the estimated rate R̂ is used. This average R̅ is defined as
329 * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
330 * Because it is difficult to find a fixed value for the coefficient α such that the
331 * averaging is without significant lag but oscillations are filtered out, a heuristic is
332 * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
333 * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
334 */
335 estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
336 if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
337 double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
338 alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
339 }
340 s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
341 s->estimated_rate = estimated_rate;
342 pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
343 new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
344 s->last_latency = latency;
345
346 if (new_rate < (uint32_t) (s->base_rate*0.8) || new_rate > (uint32_t) (s->base_rate*1.25)) {
347 pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", s->base_rate, new_rate);
348 new_rate = s->base_rate;
349 } else {
350 if (s->base_rate < new_rate + 20 && new_rate < s->base_rate + 20)
351 new_rate = s->base_rate;
352 /* Do the adjustment in small steps; 2‰ can be considered inaudible */
353 if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
354 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
355 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
356 }
357 }
358 s->sink_input->sample_spec.rate = new_rate;
359
360 pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
361
362 pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
363
364 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
365
366 s->last_rate_update = pa_timeval_load(&now);
367 }
368
369 if (pa_memblockq_is_readable(s->memblockq) &&
370 s->sink_input->thread_info.underrun_for > 0) {
371 pa_log_debug("Requesting rewind due to end of underrun");
372 pa_sink_input_request_rewind(s->sink_input,
373 (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
374 false, true, false);
375 }
376
377 return 1;
378 }
379
380 /* Called from I/O thread context */
sink_input_attach(pa_sink_input * i)381 static void sink_input_attach(pa_sink_input *i) {
382 struct session *s;
383
384 pa_sink_input_assert_ref(i);
385 pa_assert_se(s = i->userdata);
386
387 pa_assert(!s->rtpoll_item);
388 s->rtpoll_item = pa_rtp_context_get_rtpoll_item(s->rtp_context, i->sink->thread_info.rtpoll);
389
390 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb, s);
391 }
392
393 /* Called from I/O thread context */
sink_input_detach(pa_sink_input * i)394 static void sink_input_detach(pa_sink_input *i) {
395 struct session *s;
396 pa_sink_input_assert_ref(i);
397 pa_assert_se(s = i->userdata);
398
399 pa_assert(s->rtpoll_item);
400 pa_rtpoll_item_free(s->rtpoll_item);
401 s->rtpoll_item = NULL;
402 }
403
mcast_socket(const struct sockaddr * sa,socklen_t salen)404 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
405 int af, fd = -1, r, one;
406
407 pa_assert(sa);
408 pa_assert(salen > 0);
409
410 af = sa->sa_family;
411 if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
412 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
413 goto fail;
414 }
415
416 pa_make_udp_socket_low_delay(fd);
417
418 #ifdef SO_TIMESTAMP
419 one = 1;
420 if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
421 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
422 goto fail;
423 }
424 #else
425 pa_log("SO_TIMESTAMP unsupported on this platform");
426 goto fail;
427 #endif
428
429 one = 1;
430 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
431 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
432 goto fail;
433 }
434
435 r = 0;
436 if (af == AF_INET) {
437 /* IPv4 multicast addresses are in the 224.0.0.0-239.255.255.255 range */
438 static const uint32_t ipv4_mcast_mask = 0xe0000000;
439
440 if ((ntohl(((const struct sockaddr_in*) sa)->sin_addr.s_addr) & ipv4_mcast_mask) == ipv4_mcast_mask) {
441 struct ip_mreq mr4;
442 memset(&mr4, 0, sizeof(mr4));
443 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
444 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
445 }
446 #ifdef HAVE_IPV6
447 } else if (af == AF_INET6) {
448 /* IPv6 multicast addresses have 255 as the most significant byte */
449 if (((const struct sockaddr_in6*) sa)->sin6_addr.s6_addr[0] == 0xff) {
450 struct ipv6_mreq mr6;
451 memset(&mr6, 0, sizeof(mr6));
452 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
453 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
454 }
455 #endif
456 } else
457 pa_assert_not_reached();
458
459 if (r < 0) {
460 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
461 goto fail;
462 }
463
464 if (bind(fd, sa, salen) < 0) {
465 pa_log("bind() failed: %s", pa_cstrerror(errno));
466 goto fail;
467 }
468
469 return fd;
470
471 fail:
472 if (fd >= 0)
473 close(fd);
474
475 return -1;
476 }
477
session_new(struct userdata * u,const pa_sdp_info * sdp_info)478 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
479 struct session *s = NULL;
480 pa_sink *sink;
481 int fd = -1;
482 pa_memchunk silence;
483 pa_sink_input_new_data data;
484 struct timeval now;
485
486 pa_assert(u);
487 pa_assert(sdp_info);
488
489 if (u->n_sessions >= MAX_SESSIONS) {
490 pa_log("Session limit reached.");
491 goto fail;
492 }
493
494 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
495 pa_log("Sink does not exist.");
496 goto fail;
497 }
498
499 pa_rtclock_get(&now);
500
501 s = pa_xnew0(struct session, 1);
502 s->userdata = u;
503 s->first_packet = false;
504 s->sdp_info = *sdp_info;
505 s->rtpoll_item = NULL;
506 s->intended_latency = u->latency;
507 s->last_rate_update = pa_timeval_load(&now);
508 s->last_latency = u->latency;
509 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
510
511 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
512 goto fail;
513
514 pa_sink_input_new_data_init(&data);
515 pa_sink_input_new_data_set_sink(&data, sink, false, true);
516 data.driver = __FILE__;
517 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
518 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
519 "RTP Stream%s%s%s",
520 sdp_info->session_name ? " (" : "",
521 sdp_info->session_name ? sdp_info->session_name : "",
522 sdp_info->session_name ? ")" : "");
523
524 if (sdp_info->session_name)
525 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
526 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
527 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
528 data.module = u->module;
529 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
530 data.flags = PA_SINK_INPUT_VARIABLE_RATE;
531
532 pa_sink_input_new(&s->sink_input, u->module->core, &data);
533 pa_sink_input_new_data_done(&data);
534
535 if (!s->sink_input) {
536 pa_log("Failed to create sink input.");
537 goto fail;
538 }
539
540 s->base_rate = (double) s->sink_input->sample_spec.rate;
541 s->estimated_rate = (double) s->sink_input->sample_spec.rate;
542 s->avg_estimated_rate = (double) s->sink_input->sample_spec.rate;
543
544 s->sink_input->userdata = s;
545
546 s->sink_input->parent.process_msg = sink_input_process_msg;
547 s->sink_input->pop = sink_input_pop_cb;
548 s->sink_input->process_rewind = sink_input_process_rewind_cb;
549 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
550 s->sink_input->kill = sink_input_kill;
551 s->sink_input->attach = sink_input_attach;
552 s->sink_input->detach = sink_input_detach;
553 s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
554
555 pa_sink_input_get_silence(s->sink_input, &silence);
556
557 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
558
559 if (s->intended_latency < s->sink_latency*2)
560 s->intended_latency = s->sink_latency*2;
561
562 s->memblockq = pa_memblockq_new(
563 "module-rtp-recv memblockq",
564 0,
565 MEMBLOCKQ_MAXLENGTH,
566 MEMBLOCKQ_MAXLENGTH,
567 &s->sink_input->sample_spec,
568 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
569 0,
570 0,
571 &silence);
572
573 pa_memblock_unref(silence.memblock);
574
575 if (!(s->rtp_context = pa_rtp_context_new_recv(fd, sdp_info->payload, &s->sdp_info.sample_spec, sdp_info->enable_opus)))
576 goto fail;
577
578 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
579 u->n_sessions++;
580 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
581
582 pa_sink_input_put(s->sink_input);
583
584 pa_log_info("New session '%s'", s->sdp_info.session_name);
585
586 return s;
587
588 fail:
589 pa_xfree(s);
590
591 if (fd >= 0)
592 pa_close(fd);
593
594 return NULL;
595 }
596
session_free(struct session * s)597 static void session_free(struct session *s) {
598 pa_assert(s);
599
600 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
601
602 pa_sink_input_unlink(s->sink_input);
603 pa_sink_input_unref(s->sink_input);
604
605 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
606 pa_assert(s->userdata->n_sessions >= 1);
607 s->userdata->n_sessions--;
608
609 pa_memblockq_free(s->memblockq);
610 pa_sdp_info_destroy(&s->sdp_info);
611 pa_rtp_context_free(s->rtp_context);
612
613 pa_xfree(s);
614 }
615
sap_event_cb(pa_mainloop_api * m,pa_io_event * e,int fd,pa_io_event_flags_t flags,void * userdata)616 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
617 struct userdata *u = userdata;
618 bool goodbye = false;
619 pa_sdp_info info;
620 struct session *s;
621
622 pa_assert(m);
623 pa_assert(e);
624 pa_assert(u);
625 pa_assert(fd == u->sap_context.fd);
626 pa_assert(flags == PA_IO_EVENT_INPUT);
627
628 if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
629 return;
630
631 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
632 return;
633
634 if (goodbye) {
635 pa_hashmap_remove_and_free(u->by_origin, info.origin);
636 pa_sdp_info_destroy(&info);
637 } else {
638
639 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
640 if (!session_new(u, &info))
641 pa_sdp_info_destroy(&info);
642
643 } else {
644 struct timeval now;
645 pa_rtclock_get(&now);
646 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
647
648 pa_sdp_info_destroy(&info);
649 }
650 }
651 }
652
check_death_event_cb(pa_mainloop_api * m,pa_time_event * t,const struct timeval * tv,void * userdata)653 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
654 struct session *s, *n;
655 struct userdata *u = userdata;
656 struct timeval now;
657
658 pa_assert(m);
659 pa_assert(t);
660 pa_assert(u);
661
662 pa_rtclock_get(&now);
663
664 pa_log_debug("Checking for dead streams ...");
665
666 for (s = u->sessions; s; s = n) {
667 int k;
668 n = s->next;
669
670 k = pa_atomic_load(&s->timestamp);
671
672 if (k + DEATH_TIMEOUT < now.tv_sec)
673 pa_hashmap_remove_and_free(u->by_origin, s->sdp_info.origin);
674 }
675
676 /* Restart timer */
677 pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
678 }
679
pa__init(pa_module * m)680 int pa__init(pa_module*m) {
681 struct userdata *u;
682 pa_modargs *ma = NULL;
683 #if defined(HAVE_GETADDRINFO)
684 struct addrinfo *sap_addrinfo = NULL;
685 #else
686 struct sockaddr_in sa4;
687 #ifdef HAVE_IPV6
688 struct sockaddr_in6 sa6;
689 #endif
690 #endif
691 struct sockaddr *sa;
692 socklen_t salen;
693 const char *sap_address;
694 uint32_t latency_msec;
695 int fd = -1;
696
697 pa_assert(m);
698
699 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
700 pa_log("failed to parse module arguments");
701 goto fail;
702 }
703
704 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
705
706 #if defined(HAVE_GETADDRINFO)
707 {
708 struct addrinfo hints;
709 char *service;
710
711 pa_zero(hints);
712
713 service = pa_sprintf_malloc("%d", htons(SAP_PORT));
714
715 hints.ai_flags = AI_NUMERICHOST;
716 if (getaddrinfo(sap_address, service, &hints, &sap_addrinfo) != 0) {
717 pa_xfree(service);
718 pa_log("Invalid SAP address '%s'", sap_address);
719 goto fail;
720 }
721 pa_xfree(service);
722
723 sa = sap_addrinfo->ai_addr;
724 salen = sap_addrinfo->ai_addrlen;
725 }
726 #else
727 if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
728 sa4.sin_family = AF_INET;
729 sa4.sin_port = htons(SAP_PORT);
730 sa = (struct sockaddr*) &sa4;
731 salen = sizeof(sa4);
732 #ifdef HAVE_IPV6
733 } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
734 sa6.sin6_family = AF_INET6;
735 sa6.sin6_port = htons(SAP_PORT);
736 sa = (struct sockaddr*) &sa6;
737 salen = sizeof(sa6);
738 #endif
739 } else {
740 pa_log("Invalid SAP address '%s'", sap_address);
741 goto fail;
742 }
743 #endif
744
745 latency_msec = DEFAULT_LATENCY_MSEC;
746 if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 300000) {
747 pa_log("Invalid latency specification");
748 goto fail;
749 }
750
751 if ((fd = mcast_socket(sa, salen)) < 0)
752 goto fail;
753
754 m->userdata = u = pa_xnew(struct userdata, 1);
755 u->module = m;
756 u->core = m->core;
757 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
758 u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;
759
760 u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
761 pa_sap_context_init_recv(&u->sap_context, fd);
762
763 PA_LLIST_HEAD_INIT(struct session, u->sessions);
764 u->n_sessions = 0;
765 u->by_origin = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) session_free);
766
767 u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
768
769 pa_modargs_free(ma);
770
771 #if defined(HAVE_GETADDRINFO)
772 freeaddrinfo(sap_addrinfo);
773 #endif
774
775 return 0;
776
777 fail:
778 #if defined(HAVE_GETADDRINFO)
779 if (sap_addrinfo)
780 freeaddrinfo(sap_addrinfo);
781 #endif
782
783 if (ma)
784 pa_modargs_free(ma);
785
786 if (fd >= 0)
787 pa_close(fd);
788
789 return -1;
790 }
791
pa__done(pa_module * m)792 void pa__done(pa_module*m) {
793 struct userdata *u;
794
795 pa_assert(m);
796
797 if (!(u = m->userdata))
798 return;
799
800 if (u->sap_event)
801 m->core->mainloop->io_free(u->sap_event);
802
803 if (u->check_death_event)
804 m->core->mainloop->time_free(u->check_death_event);
805
806 pa_sap_context_destroy(&u->sap_context);
807
808 if (u->by_origin)
809 pa_hashmap_free(u->by_origin);
810
811 pa_xfree(u->sink_name);
812 pa_xfree(u);
813 }
814