1
2 /***
3 This file is part of PulseAudio.
4
5 Copyright 2006 Lennart Poettering
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <stdio.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
28 #include <errno.h>
29 #include <string.h>
30 #include <unistd.h>
31 #include <math.h>
32
33 #include <pulse/rtclock.h>
34 #include <pulse/timeval.h>
35 #include <pulse/xmalloc.h>
36
37 #include <pulsecore/core-error.h>
38 #include <pulsecore/module.h>
39 #include <pulsecore/llist.h>
40 #include <pulsecore/sink.h>
41 #include <pulsecore/sink-input.h>
42 #include <pulsecore/memblockq.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-rtclock.h>
45 #include <pulsecore/core-util.h>
46 #include <pulsecore/modargs.h>
47 #include <pulsecore/namereg.h>
48 #include <pulsecore/sample-util.h>
49 #include <pulsecore/macro.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/atomic.h>
52 #include <pulsecore/once.h>
53 #include <pulsecore/poll.h>
54 #include <pulsecore/arpa-inet.h>
55
56 #include "rtp.h"
57 #include "sdp.h"
58 #include "sap.h"
59
60 PA_MODULE_AUTHOR("Lennart Poettering");
61 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
62 PA_MODULE_VERSION(PACKAGE_VERSION);
63 PA_MODULE_LOAD_ONCE(false);
64 PA_MODULE_USAGE(
65 "sink=<name of the sink> "
66 "sap_address=<multicast address to listen on> "
67 "latency_msec=<latency in ms> "
68 );
69
/* UDP port the SAP announcement socket is bound to. */
#define SAP_PORT 9875

/* Address used when the "sap_address" module argument is not given. */
#define DEFAULT_SAP_ADDRESS "224.0.0.56"

/* Latency in milliseconds used when "latency_msec" is not given. */
#define DEFAULT_LATENCY_MSEC 500

/* Size limit in bytes (40 MiB) handed to pa_memblockq_new() for every session queue. */
#define MEMBLOCKQ_MAXLENGTH (40*1024*1024)

/* Maximum number of sessions that may exist at the same time. */
#define MAX_SESSIONS 16

/* Seconds of inactivity after which a session is removed; also the period
 * of the timer that performs this check. */
#define DEATH_TIMEOUT 20

/* Minimum time between two sample rate adjustments of a session. */
#define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
77
/* NULL-terminated list of the argument names accepted by this module;
 * handed to pa_modargs_new() in pa__init(). */
static const char *const valid_modargs[] = {
    "sink",
    "sap_address",
    "latency_msec",
    NULL
};
84
85 struct session {
86 struct userdata *userdata;
87 PA_LLIST_FIELDS(struct session);
88
89 pa_sink_input *sink_input;
90 pa_memblockq *memblockq;
91
92 bool first_packet;
93 uint32_t offset;
94
95 struct pa_sdp_info sdp_info;
96
97 pa_rtp_context *rtp_context;
98
99 pa_rtpoll_item *rtpoll_item;
100
101 pa_atomic_t timestamp;
102
103 pa_usec_t intended_latency;
104 pa_usec_t sink_latency;
105
106 unsigned int base_rate;
107 pa_usec_t last_rate_update;
108 pa_usec_t last_latency;
109 double estimated_rate;
110 double avg_estimated_rate;
111 };
112
113 struct userdata {
114 pa_module *module;
115 pa_core *core;
116
117 pa_sap_context sap_context;
118 pa_io_event* sap_event;
119
120 pa_time_event *check_death_event;
121
122 char *sink_name;
123
124 PA_LLIST_HEAD(struct session, sessions);
125 pa_hashmap *by_origin;
126 int n_sessions;
127
128 pa_usec_t latency;
129 };
130
/* Forward declaration; the definition follows session_new(). */
static void session_free(struct session *s);
132
133 /* Called from I/O thread context */
sink_input_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)134 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
135 struct session *s = PA_SINK_INPUT(o)->userdata;
136
137 switch (code) {
138 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
139 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
140
141 /* Fall through, the default handler will add in the extra
142 * latency added by the resampler */
143 break;
144 }
145
146 return pa_sink_input_process_msg(o, code, data, offset, chunk);
147 }
148
149 /* Called from I/O thread context */
sink_input_pop_cb(pa_sink_input * i,size_t length,pa_memchunk * chunk)150 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
151 struct session *s;
152 pa_sink_input_assert_ref(i);
153 pa_assert_se(s = i->userdata);
154
155 if (pa_memblockq_peek(s->memblockq, chunk) < 0)
156 return -1;
157
158 pa_memblockq_drop(s->memblockq, chunk->length);
159
160 return 0;
161 }
162
163 /* Called from I/O thread context */
sink_input_process_rewind_cb(pa_sink_input * i,size_t nbytes)164 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
165 struct session *s;
166
167 pa_sink_input_assert_ref(i);
168 pa_assert_se(s = i->userdata);
169
170 pa_memblockq_rewind(s->memblockq, nbytes);
171 }
172
173 /* Called from I/O thread context */
sink_input_update_max_rewind_cb(pa_sink_input * i,size_t nbytes)174 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
175 struct session *s;
176
177 pa_sink_input_assert_ref(i);
178 pa_assert_se(s = i->userdata);
179
180 pa_memblockq_set_maxrewind(s->memblockq, nbytes);
181 }
182
183 /* Called from main context */
sink_input_kill(pa_sink_input * i)184 static void sink_input_kill(pa_sink_input* i) {
185 struct session *s;
186 pa_sink_input_assert_ref(i);
187 pa_assert_se(s = i->userdata);
188
189 pa_hashmap_remove_and_free(s->userdata->by_origin, s->sdp_info.origin);
190 }
191
192 /* Called from IO context */
sink_input_suspend_within_thread(pa_sink_input * i,bool b)193 static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) {
194 struct session *s;
195 pa_sink_input_assert_ref(i);
196 pa_assert_se(s = i->userdata);
197
198 if (b)
199 pa_memblockq_flush_read(s->memblockq);
200 else
201 s->first_packet = false;
202 }
203
204 /* Called from I/O thread context */
rtpoll_work_cb(pa_rtpoll_item * i)205 static int rtpoll_work_cb(pa_rtpoll_item *i) {
206 pa_memchunk chunk;
207 uint32_t timestamp;
208 int64_t k, j, delta;
209 struct timeval now = { 0, 0 };
210 struct session *s;
211 struct pollfd *p;
212
213 pa_assert_se(s = pa_rtpoll_item_get_work_userdata(i));
214
215 p = pa_rtpoll_item_get_pollfd(i, NULL);
216
217 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
218 pa_log("poll() signalled bad revents.");
219 return -1;
220 }
221
222 if ((p->revents & POLLIN) == 0)
223 return 0;
224
225 p->revents = 0;
226
227 if (pa_rtp_recv(s->rtp_context, &chunk, s->userdata->module->core->mempool, ×tamp, &now) < 0)
228 return 0;
229
230 if (!PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
231 pa_memblock_unref(chunk.memblock);
232 return 0;
233 }
234
235 if (!s->first_packet) {
236 s->first_packet = true;
237 s->offset = timestamp;
238 }
239
240 /* Check whether there was a timestamp overflow */
241 k = (int64_t) timestamp - (int64_t) s->offset;
242 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) timestamp;
243
244 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
245 delta = k;
246 else
247 delta = j;
248
249 pa_memblockq_seek(s->memblockq, delta * (int64_t) pa_rtp_context_get_frame_size(s->rtp_context), PA_SEEK_RELATIVE,
250 true);
251
252 if (now.tv_sec == 0) {
253 PA_ONCE_BEGIN {
254 pa_log_warn("Using artificial time instead of timestamp");
255 } PA_ONCE_END;
256 pa_rtclock_get(&now);
257 } else
258 pa_rtclock_from_wallclock(&now);
259
260 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
261 pa_log_warn("Queue overrun");
262 pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true);
263 }
264
265 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
266
267 pa_memblock_unref(chunk.memblock);
268
269 /* The next timestamp we expect */
270 s->offset = timestamp + (uint32_t) (chunk.length / pa_rtp_context_get_frame_size(s->rtp_context));
271
272 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
273
274 if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
275 pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
276 uint32_t current_rate = s->sink_input->sample_spec.rate;
277 uint32_t new_rate;
278 double estimated_rate, alpha = 0.02;
279
280 pa_log_debug("Updating sample rate");
281
282 wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec);
283 ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
284
285 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
286
287 sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink, false);
288 render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
289
290 if (ri > render_delay+sink_delay)
291 ri -= render_delay+sink_delay;
292 else
293 ri = 0;
294
295 if (wi < ri)
296 latency = 0;
297 else
298 latency = wi - ri;
299
300 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
301
302 /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
303 * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
304 * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
305 * T
306 * R̂ = ─────────────── Rⁿ . (1)
307 * T - (Lⁿ - Lⁿ⁻ⁱ)
308 *
309 * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
310 * is correct). But there is also the requirement to keep the buffer at a predefined target
311 * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
312 * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
313 * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements
314 * ₐ R̂ - Rⁿ⁺ʲ a-j+1 j-1
315 * Σ T ────────── = L̂ - Lⁿ with Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
316 * ʲ⁼ⁱ R̂ a a
317 * Solving for Rⁿ⁺ⁱ gives
318 * T - ²∕ₐ₊₁(L̂ - Lⁿ)
319 * Rⁿ⁺ⁱ = ───────────────── R̂ . (2)
320 * T
321 * In the code below a = 7 is used.
322 *
323 * Equation (1) is not directly used in (2), but instead an exponentially weighted average
324 * of the estimated rate R̂ is used. This average R̅ is defined as
325 * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
326 * Because it is difficult to find a fixed value for the coefficient α such that the
327 * averaging is without significant lag but oscillations are filtered out, a heuristic is
328 * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
329 * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
330 */
331 estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
332 if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
333 double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
334 alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
335 }
336 s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
337 s->estimated_rate = estimated_rate;
338 pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
339 new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
340 s->last_latency = latency;
341
342 if (new_rate < (uint32_t) (s->base_rate*0.8) || new_rate > (uint32_t) (s->base_rate*1.25)) {
343 pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", s->base_rate, new_rate);
344 new_rate = s->base_rate;
345 } else {
346 if (s->base_rate < new_rate + 20 && new_rate < s->base_rate + 20)
347 new_rate = s->base_rate;
348 /* Do the adjustment in small steps; 2‰ can be considered inaudible */
349 if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
350 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
351 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
352 }
353 }
354 s->sink_input->sample_spec.rate = new_rate;
355
356 pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
357
358 pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
359
360 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
361
362 s->last_rate_update = pa_timeval_load(&now);
363 }
364
365 if (pa_memblockq_is_readable(s->memblockq) &&
366 s->sink_input->thread_info.underrun_for > 0) {
367 pa_log_debug("Requesting rewind due to end of underrun");
368 pa_sink_input_request_rewind(s->sink_input,
369 (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
370 false, true, false);
371 }
372
373 return 1;
374 }
375
376 /* Called from I/O thread context */
sink_input_attach(pa_sink_input * i)377 static void sink_input_attach(pa_sink_input *i) {
378 struct session *s;
379
380 pa_sink_input_assert_ref(i);
381 pa_assert_se(s = i->userdata);
382
383 pa_assert(!s->rtpoll_item);
384 s->rtpoll_item = pa_rtp_context_get_rtpoll_item(s->rtp_context, i->sink->thread_info.rtpoll);
385
386 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb, s);
387 }
388
389 /* Called from I/O thread context */
sink_input_detach(pa_sink_input * i)390 static void sink_input_detach(pa_sink_input *i) {
391 struct session *s;
392 pa_sink_input_assert_ref(i);
393 pa_assert_se(s = i->userdata);
394
395 pa_assert(s->rtpoll_item);
396 pa_rtpoll_item_free(s->rtpoll_item);
397 s->rtpoll_item = NULL;
398 }
399
mcast_socket(const struct sockaddr * sa,socklen_t salen)400 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
401 int af, fd = -1, r, one;
402
403 pa_assert(sa);
404 pa_assert(salen > 0);
405
406 af = sa->sa_family;
407 if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
408 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
409 goto fail;
410 }
411
412 pa_make_udp_socket_low_delay(fd);
413
414 #ifdef SO_TIMESTAMP
415 one = 1;
416 if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
417 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
418 goto fail;
419 }
420 #else
421 pa_log("SO_TIMESTAMP unsupported on this platform");
422 goto fail;
423 #endif
424
425 one = 1;
426 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
427 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
428 goto fail;
429 }
430
431 r = 0;
432 if (af == AF_INET) {
433 /* IPv4 multicast addresses are in the 224.0.0.0-239.255.255.255 range */
434 static const uint32_t ipv4_mcast_mask = 0xe0000000;
435
436 if ((ntohl(((const struct sockaddr_in*) sa)->sin_addr.s_addr) & ipv4_mcast_mask) == ipv4_mcast_mask) {
437 struct ip_mreq mr4;
438 memset(&mr4, 0, sizeof(mr4));
439 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
440 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
441 }
442 #ifdef HAVE_IPV6
443 } else if (af == AF_INET6) {
444 /* IPv6 multicast addresses have 255 as the most significant byte */
445 if (((const struct sockaddr_in6*) sa)->sin6_addr.s6_addr[0] == 0xff) {
446 struct ipv6_mreq mr6;
447 memset(&mr6, 0, sizeof(mr6));
448 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
449 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
450 }
451 #endif
452 } else
453 pa_assert_not_reached();
454
455 if (r < 0) {
456 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
457 goto fail;
458 }
459
460 if (bind(fd, sa, salen) < 0) {
461 pa_log("bind() failed: %s", pa_cstrerror(errno));
462 goto fail;
463 }
464
465 return fd;
466
467 fail:
468 if (fd >= 0)
469 close(fd);
470
471 return -1;
472 }
473
session_new(struct userdata * u,const pa_sdp_info * sdp_info)474 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
475 struct session *s = NULL;
476 pa_sink *sink;
477 int fd = -1;
478 pa_memchunk silence;
479 pa_sink_input_new_data data;
480 struct timeval now;
481
482 pa_assert(u);
483 pa_assert(sdp_info);
484
485 if (u->n_sessions >= MAX_SESSIONS) {
486 pa_log("Session limit reached.");
487 goto fail;
488 }
489
490 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
491 pa_log("Sink does not exist.");
492 goto fail;
493 }
494
495 pa_rtclock_get(&now);
496
497 s = pa_xnew0(struct session, 1);
498 s->userdata = u;
499 s->first_packet = false;
500 s->sdp_info = *sdp_info;
501 s->rtpoll_item = NULL;
502 s->intended_latency = u->latency;
503 s->last_rate_update = pa_timeval_load(&now);
504 s->last_latency = u->latency;
505 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
506
507 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
508 goto fail;
509
510 pa_sink_input_new_data_init(&data);
511 pa_sink_input_new_data_set_sink(&data, sink, false, true);
512 data.driver = __FILE__;
513 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
514 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
515 "RTP Stream%s%s%s",
516 sdp_info->session_name ? " (" : "",
517 sdp_info->session_name ? sdp_info->session_name : "",
518 sdp_info->session_name ? ")" : "");
519
520 if (sdp_info->session_name)
521 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
522 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
523 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
524 data.module = u->module;
525 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
526 data.flags = PA_SINK_INPUT_VARIABLE_RATE;
527
528 pa_sink_input_new(&s->sink_input, u->module->core, &data);
529 pa_sink_input_new_data_done(&data);
530
531 if (!s->sink_input) {
532 pa_log("Failed to create sink input.");
533 goto fail;
534 }
535
536 s->base_rate = (double) s->sink_input->sample_spec.rate;
537 s->estimated_rate = (double) s->sink_input->sample_spec.rate;
538 s->avg_estimated_rate = (double) s->sink_input->sample_spec.rate;
539
540 s->sink_input->userdata = s;
541
542 s->sink_input->parent.process_msg = sink_input_process_msg;
543 s->sink_input->pop = sink_input_pop_cb;
544 s->sink_input->process_rewind = sink_input_process_rewind_cb;
545 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
546 s->sink_input->kill = sink_input_kill;
547 s->sink_input->attach = sink_input_attach;
548 s->sink_input->detach = sink_input_detach;
549 s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
550
551 pa_sink_input_get_silence(s->sink_input, &silence);
552
553 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
554
555 if (s->intended_latency < s->sink_latency*2)
556 s->intended_latency = s->sink_latency*2;
557
558 s->memblockq = pa_memblockq_new(
559 "module-rtp-recv memblockq",
560 0,
561 MEMBLOCKQ_MAXLENGTH,
562 MEMBLOCKQ_MAXLENGTH,
563 &s->sink_input->sample_spec,
564 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
565 0,
566 0,
567 &silence);
568
569 pa_memblock_unref(silence.memblock);
570
571 if (!(s->rtp_context = pa_rtp_context_new_recv(fd, sdp_info->payload, &s->sdp_info.sample_spec)))
572 goto fail;
573
574 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
575 u->n_sessions++;
576 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
577
578 pa_sink_input_put(s->sink_input);
579
580 pa_log_info("New session '%s'", s->sdp_info.session_name);
581
582 return s;
583
584 fail:
585 pa_xfree(s);
586
587 if (fd >= 0)
588 pa_close(fd);
589
590 return NULL;
591 }
592
session_free(struct session * s)593 static void session_free(struct session *s) {
594 pa_assert(s);
595
596 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
597
598 pa_sink_input_unlink(s->sink_input);
599 pa_sink_input_unref(s->sink_input);
600
601 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
602 pa_assert(s->userdata->n_sessions >= 1);
603 s->userdata->n_sessions--;
604
605 pa_memblockq_free(s->memblockq);
606 pa_sdp_info_destroy(&s->sdp_info);
607 pa_rtp_context_free(s->rtp_context);
608
609 pa_xfree(s);
610 }
611
/* Main loop I/O callback for the SAP socket. Receives one SAP packet and
 * parses its SDP payload, then:
 *  - for a goodbye packet, removes the session with that origin;
 *  - for an origin that already has a session, refreshes its activity time;
 *  - otherwise tries to create a new session.
 * The parsed SDP information is destroyed here unless a new session took
 * it over. */
static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
    struct userdata *u = userdata;
    struct session *existing;
    pa_sdp_info info;
    bool goodbye = false;

    pa_assert(m);
    pa_assert(e);
    pa_assert(u);
    pa_assert(fd == u->sap_context.fd);
    pa_assert(flags == PA_IO_EVENT_INPUT);

    if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
        return;

    if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
        return;

    if (goodbye) {
        pa_hashmap_remove_and_free(u->by_origin, info.origin);
        pa_sdp_info_destroy(&info);
        return;
    }

    existing = pa_hashmap_get(u->by_origin, info.origin);

    if (existing) {
        /* Known stream: just note that it is still being announced */
        struct timeval now;

        pa_rtclock_get(&now);
        pa_atomic_store(&existing->timestamp, (int) now.tv_sec);

        pa_sdp_info_destroy(&info);
        return;
    }

    /* Unknown stream: on success the session keeps the contents of info */
    if (!session_new(u, &info))
        pa_sdp_info_destroy(&info);
}
648
/* Timer callback. Removes every session whose last recorded activity is
 * more than DEATH_TIMEOUT seconds in the past, then re-arms the timer to
 * fire again in DEATH_TIMEOUT seconds. 'tv' is not used. */
static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
    struct userdata *u = userdata;
    struct session *cur;
    struct timeval now;

    pa_assert(m);
    pa_assert(t);
    pa_assert(u);

    pa_rtclock_get(&now);

    pa_log_debug("Checking for dead streams ...");

    cur = u->sessions;
    while (cur) {
        /* Fetch the successor first, removal frees the current session */
        struct session *following = cur->next;
        int last_seen = pa_atomic_load(&cur->timestamp);

        if (last_seen + DEATH_TIMEOUT < now.tv_sec)
            pa_hashmap_remove_and_free(u->by_origin, cur->sdp_info.origin);

        cur = following;
    }

    /* Restart timer */
    pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
}
675
/* Module entry point. Parses the module arguments ("sink", "sap_address",
 * "latency_msec"), opens the SAP socket on SAP_PORT of the requested
 * address, allocates the module state and registers the SAP I/O event and
 * the periodic death check timer.
 *
 * Returns 0 on success. Returns -1 if the arguments cannot be parsed, the
 * SAP address is neither a valid IPv4 nor (with HAVE_IPV6) IPv6 address,
 * the latency is outside 1..300000 ms, or the socket cannot be set up. */
int pa__init(pa_module*m) {
    struct userdata *u;
    pa_modargs *ma = NULL;
    struct sockaddr_in sa4;
#ifdef HAVE_IPV6
    struct sockaddr_in6 sa6;
#endif
    struct sockaddr *sa;
    socklen_t salen;
    const char *sap_address;
    uint32_t latency_msec;
    int fd = -1;

    pa_assert(m);

    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
        pa_log("failed to parse module arguments");
        goto fail;
    }

    sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);

    /* Zero the address structures first: only family, port and address are
     * assigned below, and the remaining members (e.g. sin6_flowinfo and
     * sin6_scope_id) must not reach bind() with indeterminate values. */
    memset(&sa4, 0, sizeof(sa4));
#ifdef HAVE_IPV6
    memset(&sa6, 0, sizeof(sa6));
#endif

    if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
        sa4.sin_family = AF_INET;
        sa4.sin_port = htons(SAP_PORT);
        sa = (struct sockaddr*) &sa4;
        salen = sizeof(sa4);
#ifdef HAVE_IPV6
    } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
        sa6.sin6_family = AF_INET6;
        sa6.sin6_port = htons(SAP_PORT);
        sa = (struct sockaddr*) &sa6;
        salen = sizeof(sa6);
#endif
    } else {
        pa_log("Invalid SAP address '%s'", sap_address);
        goto fail;
    }

    latency_msec = DEFAULT_LATENCY_MSEC;
    if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 300000) {
        pa_log("Invalid latency specification");
        goto fail;
    }

    if ((fd = mcast_socket(sa, salen)) < 0)
        goto fail;

    m->userdata = u = pa_xnew(struct userdata, 1);
    u->module = m;
    u->core = m->core;
    /* NULL if no "sink" argument was given */
    u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
    u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;

    u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
    pa_sap_context_init_recv(&u->sap_context, fd);

    PA_LLIST_HEAD_INIT(struct session, u->sessions);
    u->n_sessions = 0;
    /* Sessions are released through session_free() when removed from the map */
    u->by_origin = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) session_free);

    u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);

    pa_modargs_free(ma);

    return 0;

fail:
    if (ma)
        pa_modargs_free(ma);

    if (fd >= 0)
        pa_close(fd);

    return -1;
}
752
/* Module teardown. Does nothing if the module has no userdata. Otherwise
 * frees the SAP I/O event and the death check timer, destroys the SAP
 * context, frees the by_origin map and finally the sink name and the
 * module state itself. */
void pa__done(pa_module*m) {
    struct userdata *u;

    pa_assert(m);

    u = m->userdata;
    if (!u)
        return;

    if (u->sap_event)
        m->core->mainloop->io_free(u->sap_event);

    if (u->check_death_event)
        m->core->mainloop->time_free(u->check_death_event);

    pa_sap_context_destroy(&u->sap_context);

    if (u->by_origin)
        pa_hashmap_free(u->by_origin);

    pa_xfree(u->sink_name);
    pa_xfree(u);
}
775