1
2 /***
3 This file is part of PulseAudio.
4
5 Copyright 2006 Lennart Poettering
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <stdio.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
28 #include <errno.h>
29 #include <string.h>
30 #include <unistd.h>
31 #include <math.h>
32
33 #include <pulse/rtclock.h>
34 #include <pulse/timeval.h>
35 #include <pulse/xmalloc.h>
36
37 #include <pulsecore/core-error.h>
38 #include <pulsecore/module.h>
39 #include <pulsecore/llist.h>
40 #include <pulsecore/sink.h>
41 #include <pulsecore/sink-input.h>
42 #include <pulsecore/memblockq.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-rtclock.h>
45 #include <pulsecore/core-util.h>
46 #include <pulsecore/modargs.h>
47 #include <pulsecore/namereg.h>
48 #include <pulsecore/sample-util.h>
49 #include <pulsecore/macro.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/atomic.h>
52 #include <pulsecore/once.h>
53 #include <pulsecore/poll.h>
54 #include <pulsecore/arpa-inet.h>
55
56 #include "rtp.h"
57 #include "sdp.h"
58 #include "sap.h"
59
60 PA_MODULE_AUTHOR("Lennart Poettering");
61 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
62 PA_MODULE_VERSION(PACKAGE_VERSION);
63 PA_MODULE_LOAD_ONCE(false);
64 PA_MODULE_USAGE(
65 "sink=<name of the sink> "
66 "sap_address=<multicast address to listen on> "
67 "latency_msec=<latency in ms> "
68 );
69
70 #define SAP_PORT 9875
71 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
72 #define DEFAULT_LATENCY_MSEC 500
73 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
74 #define MAX_SESSIONS 16
75 #define DEATH_TIMEOUT 20
76 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
77
78 static const char* const valid_modargs[] = {
79 "sink",
80 "sap_address",
81 "latency_msec",
82 NULL
83 };
84
85 struct session {
86 struct userdata *userdata;
87 PA_LLIST_FIELDS(struct session);
88
89 pa_sink_input *sink_input;
90 pa_memblockq *memblockq;
91
92 bool first_packet;
93 uint32_t offset;
94
95 struct pa_sdp_info sdp_info;
96
97 pa_rtp_context *rtp_context;
98
99 pa_rtpoll_item *rtpoll_item;
100
101 pa_atomic_t timestamp;
102
103 pa_usec_t intended_latency;
104 pa_usec_t sink_latency;
105
106 unsigned int base_rate;
107 pa_usec_t last_rate_update;
108 pa_usec_t last_latency;
109 double estimated_rate;
110 double avg_estimated_rate;
111 };
112
113 struct userdata {
114 pa_module *module;
115 pa_core *core;
116
117 pa_sap_context sap_context;
118 pa_io_event* sap_event;
119
120 pa_time_event *check_death_event;
121
122 char *sink_name;
123
124 PA_LLIST_HEAD(struct session, sessions);
125 pa_hashmap *by_origin;
126 int n_sessions;
127
128 pa_usec_t latency;
129 };
130
131 static void session_free(struct session *s);
132
133 /* Called from I/O thread context */
sink_input_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)134 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
135 struct session *s = PA_SINK_INPUT(o)->userdata;
136
137 switch (code) {
138 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
139 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
140
141 /* Fall through, the default handler will add in the extra
142 * latency added by the resampler */
143 break;
144 }
145
146 return pa_sink_input_process_msg(o, code, data, offset, chunk);
147 }
148
149 /* Called from I/O thread context */
sink_input_pop_cb(pa_sink_input * i,size_t length,pa_memchunk * chunk)150 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
151 struct session *s;
152 pa_sink_input_assert_ref(i);
153 pa_assert_se(s = i->userdata);
154
155 if (pa_memblockq_peek(s->memblockq, chunk) < 0)
156 return -1;
157
158 pa_memblockq_drop(s->memblockq, chunk->length);
159
160 return 0;
161 }
162
163 /* Called from I/O thread context */
sink_input_process_rewind_cb(pa_sink_input * i,size_t nbytes)164 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
165 struct session *s;
166
167 pa_sink_input_assert_ref(i);
168 pa_assert_se(s = i->userdata);
169
170 pa_memblockq_rewind(s->memblockq, nbytes);
171 }
172
173 /* Called from I/O thread context */
sink_input_update_max_rewind_cb(pa_sink_input * i,size_t nbytes)174 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
175 struct session *s;
176
177 pa_sink_input_assert_ref(i);
178 pa_assert_se(s = i->userdata);
179
180 pa_memblockq_set_maxrewind(s->memblockq, nbytes);
181 }
182
183 /* Called from main context */
sink_input_kill(pa_sink_input * i)184 static void sink_input_kill(pa_sink_input* i) {
185 struct session *s;
186 pa_sink_input_assert_ref(i);
187 pa_assert_se(s = i->userdata);
188
189 pa_hashmap_remove_and_free(s->userdata->by_origin, s->sdp_info.origin);
190 }
191
192 /* Called from IO context */
sink_input_suspend_within_thread(pa_sink_input * i,bool b)193 static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) {
194 struct session *s;
195 pa_sink_input_assert_ref(i);
196 pa_assert_se(s = i->userdata);
197
198 if (b)
199 pa_memblockq_flush_read(s->memblockq);
200 else
201 s->first_packet = false;
202 }
203
204 /* Called from I/O thread context */
rtpoll_work_cb(pa_rtpoll_item * i)205 static int rtpoll_work_cb(pa_rtpoll_item *i) {
206 pa_memchunk chunk;
207 uint32_t timestamp;
208 int64_t k, j, delta;
209 struct timeval now = { 0, 0 };
210 struct session *s;
211 struct pollfd *p;
212
213 pa_assert_se(s = pa_rtpoll_item_get_work_userdata(i));
214
215 p = pa_rtpoll_item_get_pollfd(i, NULL);
216
217 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
218 pa_log("poll() signalled bad revents.");
219 return -1;
220 }
221
222 if ((p->revents & POLLIN) == 0)
223 return 0;
224
225 p->revents = 0;
226
227 if (pa_rtp_recv(s->rtp_context, &chunk, s->userdata->module->core->mempool, ×tamp, &now) < 0)
228 return 0;
229
230 if (!PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
231 pa_memblock_unref(chunk.memblock);
232 return 0;
233 }
234
235 if (!s->first_packet) {
236 s->first_packet = true;
237 s->offset = timestamp;
238 }
239
240 /* Check whether there was a timestamp overflow */
241 k = (int64_t) timestamp - (int64_t) s->offset;
242 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) timestamp;
243
244 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
245 delta = k;
246 else
247 delta = j;
248
249 pa_memblockq_seek(s->memblockq, delta * (int64_t) pa_rtp_context_get_frame_size(s->rtp_context), PA_SEEK_RELATIVE,
250 true);
251
252 if (now.tv_sec == 0) {
253 PA_ONCE_BEGIN {
254 pa_log_warn("Using artificial time instead of timestamp");
255 } PA_ONCE_END;
256 pa_rtclock_get(&now);
257 } else
258 pa_rtclock_from_wallclock(&now);
259
260 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
261 pa_log_warn("Queue overrun");
262 pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true);
263 }
264
265 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
266
267 pa_memblock_unref(chunk.memblock);
268
269 /* The next timestamp we expect */
270 s->offset = timestamp + (uint32_t) (chunk.length / pa_rtp_context_get_frame_size(s->rtp_context));
271
272 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
273
274 if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
275 pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
276 uint32_t current_rate = s->sink_input->sample_spec.rate;
277 uint32_t new_rate;
278 double estimated_rate, alpha = 0.02;
279
280 pa_log_debug("Updating sample rate");
281
282 wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec);
283 ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
284
285 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
286
287 sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink, false);
288 sink_delay += pa_resampler_get_delay_usec(s->sink_input->thread_info.resampler);
289 render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
290
291 if (ri > render_delay+sink_delay)
292 ri -= render_delay+sink_delay;
293 else
294 ri = 0;
295
296 if (wi < ri)
297 latency = 0;
298 else
299 latency = wi - ri;
300
301 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
302
303 /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
304 * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
305 * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
306 * T
307 * R̂ = ─────────────── Rⁿ . (1)
308 * T - (Lⁿ - Lⁿ⁻ⁱ)
309 *
310 * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
311 * is correct). But there is also the requirement to keep the buffer at a predefined target
312 * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
313 * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
314 * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements
315 * ₐ R̂ - Rⁿ⁺ʲ a-j+1 j-1
316 * Σ T ────────── = L̂ - Lⁿ with Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
317 * ʲ⁼ⁱ R̂ a a
318 * Solving for Rⁿ⁺ⁱ gives
319 * T - ²∕ₐ₊₁(L̂ - Lⁿ)
320 * Rⁿ⁺ⁱ = ───────────────── R̂ . (2)
321 * T
322 * In the code below a = 7 is used.
323 *
324 * Equation (1) is not directly used in (2), but instead an exponentially weighted average
325 * of the estimated rate R̂ is used. This average R̅ is defined as
326 * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
327 * Because it is difficult to find a fixed value for the coefficient α such that the
328 * averaging is without significant lag but oscillations are filtered out, a heuristic is
329 * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
330 * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
331 */
332 estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
333 if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
334 double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
335 alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
336 }
337 s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
338 s->estimated_rate = estimated_rate;
339 pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
340 new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
341 s->last_latency = latency;
342
343 if (new_rate < (uint32_t) (s->base_rate*0.8) || new_rate > (uint32_t) (s->base_rate*1.25)) {
344 pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", s->base_rate, new_rate);
345 new_rate = s->base_rate;
346 } else {
347 if (s->base_rate < new_rate + 20 && new_rate < s->base_rate + 20)
348 new_rate = s->base_rate;
349 /* Do the adjustment in small steps; 2‰ can be considered inaudible */
350 if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
351 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
352 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
353 }
354 }
355 s->sink_input->sample_spec.rate = new_rate;
356
357 pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
358
359 pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
360
361 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
362
363 s->last_rate_update = pa_timeval_load(&now);
364 }
365
366 if (pa_memblockq_is_readable(s->memblockq) &&
367 s->sink_input->thread_info.underrun_for > 0) {
368 pa_log_debug("Requesting rewind due to end of underrun");
369 pa_sink_input_request_rewind(s->sink_input,
370 (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
371 false, true, false);
372 }
373
374 return 1;
375 }
376
377 /* Called from I/O thread context */
sink_input_attach(pa_sink_input * i)378 static void sink_input_attach(pa_sink_input *i) {
379 struct session *s;
380
381 pa_sink_input_assert_ref(i);
382 pa_assert_se(s = i->userdata);
383
384 pa_assert(!s->rtpoll_item);
385 s->rtpoll_item = pa_rtp_context_get_rtpoll_item(s->rtp_context, i->sink->thread_info.rtpoll);
386
387 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb, s);
388 }
389
390 /* Called from I/O thread context */
sink_input_detach(pa_sink_input * i)391 static void sink_input_detach(pa_sink_input *i) {
392 struct session *s;
393 pa_sink_input_assert_ref(i);
394 pa_assert_se(s = i->userdata);
395
396 pa_assert(s->rtpoll_item);
397 pa_rtpoll_item_free(s->rtpoll_item);
398 s->rtpoll_item = NULL;
399 }
400
mcast_socket(const struct sockaddr * sa,socklen_t salen)401 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
402 int af, fd = -1, r, one;
403
404 pa_assert(sa);
405 pa_assert(salen > 0);
406
407 af = sa->sa_family;
408 if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
409 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
410 goto fail;
411 }
412
413 pa_make_udp_socket_low_delay(fd);
414
415 #ifdef SO_TIMESTAMP
416 one = 1;
417 if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
418 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
419 goto fail;
420 }
421 #else
422 pa_log("SO_TIMESTAMP unsupported on this platform");
423 goto fail;
424 #endif
425
426 one = 1;
427 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
428 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
429 goto fail;
430 }
431
432 r = 0;
433 if (af == AF_INET) {
434 /* IPv4 multicast addresses are in the 224.0.0.0-239.255.255.255 range */
435 static const uint32_t ipv4_mcast_mask = 0xe0000000;
436
437 if ((ntohl(((const struct sockaddr_in*) sa)->sin_addr.s_addr) & ipv4_mcast_mask) == ipv4_mcast_mask) {
438 struct ip_mreq mr4;
439 memset(&mr4, 0, sizeof(mr4));
440 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
441 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
442 }
443 #ifdef HAVE_IPV6
444 } else if (af == AF_INET6) {
445 /* IPv6 multicast addresses have 255 as the most significant byte */
446 if (((const struct sockaddr_in6*) sa)->sin6_addr.s6_addr[0] == 0xff) {
447 struct ipv6_mreq mr6;
448 memset(&mr6, 0, sizeof(mr6));
449 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
450 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
451 }
452 #endif
453 } else
454 pa_assert_not_reached();
455
456 if (r < 0) {
457 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
458 goto fail;
459 }
460
461 if (bind(fd, sa, salen) < 0) {
462 pa_log("bind() failed: %s", pa_cstrerror(errno));
463 goto fail;
464 }
465
466 return fd;
467
468 fail:
469 if (fd >= 0)
470 close(fd);
471
472 return -1;
473 }
474
session_new(struct userdata * u,const pa_sdp_info * sdp_info)475 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
476 struct session *s = NULL;
477 pa_sink *sink;
478 int fd = -1;
479 pa_memchunk silence;
480 pa_sink_input_new_data data;
481 struct timeval now;
482
483 pa_assert(u);
484 pa_assert(sdp_info);
485
486 if (u->n_sessions >= MAX_SESSIONS) {
487 pa_log("Session limit reached.");
488 goto fail;
489 }
490
491 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
492 pa_log("Sink does not exist.");
493 goto fail;
494 }
495
496 pa_rtclock_get(&now);
497
498 s = pa_xnew0(struct session, 1);
499 s->userdata = u;
500 s->first_packet = false;
501 s->sdp_info = *sdp_info;
502 s->rtpoll_item = NULL;
503 s->intended_latency = u->latency;
504 s->last_rate_update = pa_timeval_load(&now);
505 s->last_latency = u->latency;
506 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
507
508 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
509 goto fail;
510
511 pa_sink_input_new_data_init(&data);
512 pa_sink_input_new_data_set_sink(&data, sink, false, true);
513 data.driver = __FILE__;
514 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
515 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
516 "RTP Stream%s%s%s",
517 sdp_info->session_name ? " (" : "",
518 sdp_info->session_name ? sdp_info->session_name : "",
519 sdp_info->session_name ? ")" : "");
520
521 if (sdp_info->session_name)
522 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
523 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
524 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
525 data.module = u->module;
526 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
527 data.flags = PA_SINK_INPUT_VARIABLE_RATE;
528
529 pa_sink_input_new(&s->sink_input, u->module->core, &data);
530 pa_sink_input_new_data_done(&data);
531
532 if (!s->sink_input) {
533 pa_log("Failed to create sink input.");
534 goto fail;
535 }
536
537 s->base_rate = (double) s->sink_input->sample_spec.rate;
538 s->estimated_rate = (double) s->sink_input->sample_spec.rate;
539 s->avg_estimated_rate = (double) s->sink_input->sample_spec.rate;
540
541 s->sink_input->userdata = s;
542
543 s->sink_input->parent.process_msg = sink_input_process_msg;
544 s->sink_input->pop = sink_input_pop_cb;
545 s->sink_input->process_rewind = sink_input_process_rewind_cb;
546 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
547 s->sink_input->kill = sink_input_kill;
548 s->sink_input->attach = sink_input_attach;
549 s->sink_input->detach = sink_input_detach;
550 s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
551
552 pa_sink_input_get_silence(s->sink_input, &silence);
553
554 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
555
556 if (s->intended_latency < s->sink_latency*2)
557 s->intended_latency = s->sink_latency*2;
558
559 s->memblockq = pa_memblockq_new(
560 "module-rtp-recv memblockq",
561 0,
562 MEMBLOCKQ_MAXLENGTH,
563 MEMBLOCKQ_MAXLENGTH,
564 &s->sink_input->sample_spec,
565 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
566 0,
567 0,
568 &silence);
569
570 pa_memblock_unref(silence.memblock);
571
572 if (!(s->rtp_context = pa_rtp_context_new_recv(fd, sdp_info->payload, &s->sdp_info.sample_spec, sdp_info->enable_opus)))
573 goto fail;
574
575 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
576 u->n_sessions++;
577 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
578
579 pa_sink_input_put(s->sink_input);
580
581 pa_log_info("New session '%s'", s->sdp_info.session_name);
582
583 return s;
584
585 fail:
586 pa_xfree(s);
587
588 if (fd >= 0)
589 pa_close(fd);
590
591 return NULL;
592 }
593
session_free(struct session * s)594 static void session_free(struct session *s) {
595 pa_assert(s);
596
597 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
598
599 pa_sink_input_unlink(s->sink_input);
600 pa_sink_input_unref(s->sink_input);
601
602 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
603 pa_assert(s->userdata->n_sessions >= 1);
604 s->userdata->n_sessions--;
605
606 pa_memblockq_free(s->memblockq);
607 pa_sdp_info_destroy(&s->sdp_info);
608 pa_rtp_context_free(s->rtp_context);
609
610 pa_xfree(s);
611 }
612
/* Main loop I/O callback for the SAP socket.
 *
 * Receives one SAP packet and parses the SDP data it carries. A goodbye
 * announcement removes the session with the same origin; an announcement for
 * a known origin refreshes that session's activity timestamp; an announcement
 * for an unknown origin creates a new session. The parsed SDP info is
 * destroyed here in all cases except when session_new() succeeded. */
static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
    struct userdata *u = userdata;
    bool goodbye = false;
    pa_sdp_info info;
    struct session *existing;
    struct timeval now;

    pa_assert(m);
    pa_assert(e);
    pa_assert(u);
    pa_assert(fd == u->sap_context.fd);
    pa_assert(flags == PA_IO_EVENT_INPUT);

    if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
        return;

    if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
        return;

    if (goodbye) {
        /* The sender announced the end of the stream */
        pa_hashmap_remove_and_free(u->by_origin, info.origin);
        pa_sdp_info_destroy(&info);
        return;
    }

    existing = pa_hashmap_get(u->by_origin, info.origin);

    if (existing) {
        /* Known stream, just note that it is still alive */
        pa_rtclock_get(&now);
        pa_atomic_store(&existing->timestamp, (int) now.tv_sec);

        pa_sdp_info_destroy(&info);
        return;
    }

    /* Unknown stream; on success the new session keeps the SDP info */
    if (!session_new(u, &info))
        pa_sdp_info_destroy(&info);
}
649
/* Timer callback that drops sessions which have shown no activity for more
 * than DEATH_TIMEOUT seconds and then re-arms itself for another
 * DEATH_TIMEOUT seconds. */
static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
    struct userdata *u = userdata;
    struct session *cur;
    struct timeval now;

    pa_assert(m);
    pa_assert(t);
    pa_assert(u);

    pa_rtclock_get(&now);

    pa_log_debug("Checking for dead streams ...");

    cur = u->sessions;
    while (cur) {
        /* Removing the session frees it, so fetch the successor first */
        struct session *following = cur->next;
        int last_seen = pa_atomic_load(&cur->timestamp);

        if (last_seen + DEATH_TIMEOUT < now.tv_sec)
            pa_hashmap_remove_and_free(u->by_origin, cur->sdp_info.origin);

        cur = following;
    }

    /* Restart timer */
    pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
}
676
/* Module entry point.
 *
 * Parses the module arguments ("sink", "sap_address", "latency_msec"), opens
 * the SAP listening socket on the given address and SAP_PORT, allocates the
 * module state and installs the SAP I/O event and the periodic dead-session
 * timer.
 *
 * Returns 0 on success and -1 on failure (invalid arguments, invalid address,
 * latency outside of 1..300000 ms, or socket setup failure). */
int pa__init(pa_module*m) {
    struct userdata *u;
    pa_modargs *ma = NULL;
    struct sockaddr_in sa4;
#ifdef HAVE_IPV6
    struct sockaddr_in6 sa6;
#endif
    struct sockaddr *sa;
    socklen_t salen;
    const char *sap_address;
    uint32_t latency_msec;
    int fd = -1;

    pa_assert(m);

    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
        pa_log("failed to parse module arguments");
        goto fail;
    }

    sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);

    /* Zero the socket addresses first: only family, port and address are set
     * below, and the remaining members (e.g. sin6_scope_id, sin6_flowinfo)
     * must not reach bind() with indeterminate contents. */
    memset(&sa4, 0, sizeof(sa4));
#ifdef HAVE_IPV6
    memset(&sa6, 0, sizeof(sa6));
#endif

    if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
        sa4.sin_family = AF_INET;
        sa4.sin_port = htons(SAP_PORT);
        sa = (struct sockaddr*) &sa4;
        salen = sizeof(sa4);
#ifdef HAVE_IPV6
    } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
        sa6.sin6_family = AF_INET6;
        sa6.sin6_port = htons(SAP_PORT);
        sa = (struct sockaddr*) &sa6;
        salen = sizeof(sa6);
#endif
    } else {
        pa_log("Invalid SAP address '%s'", sap_address);
        goto fail;
    }

    latency_msec = DEFAULT_LATENCY_MSEC;
    if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 300000) {
        pa_log("Invalid latency specification");
        goto fail;
    }

    if ((fd = mcast_socket(sa, salen)) < 0)
        goto fail;

    /* Nothing below can fail; every member of *u is assigned explicitly. */
    m->userdata = u = pa_xnew(struct userdata, 1);
    u->module = m;
    u->core = m->core;
    u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
    u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;

    u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
    pa_sap_context_init_recv(&u->sap_context, fd);

    PA_LLIST_HEAD_INIT(struct session, u->sessions);
    u->n_sessions = 0;
    /* Sessions are destroyed by the hashmap through session_free() */
    u->by_origin = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) session_free);

    u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);

    pa_modargs_free(ma);

    return 0;

fail:
    if (ma)
        pa_modargs_free(ma);

    if (fd >= 0)
        pa_close(fd);

    return -1;
}
753
/* Module teardown.
 *
 * Does nothing when the module has no userdata. Otherwise frees the SAP I/O
 * event and the death check timer, destroys the SAP context, frees the
 * by_origin hashmap (which destroys all sessions through session_free()) and
 * finally releases the module state itself. */
void pa__done(pa_module*m) {
    struct userdata *u;

    pa_assert(m);

    u = m->userdata;
    if (!u)
        return;

    if (u->sap_event)
        m->core->mainloop->io_free(u->sap_event);

    if (u->check_death_event)
        m->core->mainloop->time_free(u->check_death_event);

    pa_sap_context_destroy(&u->sap_context);

    /* Frees all remaining sessions as well */
    if (u->by_origin)
        pa_hashmap_free(u->by_origin);

    pa_xfree(u->sink_name);
    pa_xfree(u);
}
776