1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2006 Lennart Poettering
5
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2.1 of the License,
9 or (at your option) any later version.
10
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23
24 #include <stdlib.h>
25 #include <string.h>
26 #include <errno.h>
27 #include <unistd.h>
28 #include <sys/ioctl.h>
29
30 #ifdef HAVE_SYS_FILIO_H
31 #include <sys/filio.h>
32 #endif
33
34 #ifdef HAVE_SYS_UIO_H
35 #include <sys/uio.h>
36 #endif
37
38 #include <pulsecore/core-error.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/arpa-inet.h>
43 #include <pulsecore/poll.h>
44
45 #include "rtp.h"
46
/* State of one native RTP endpoint. The same structure is used by the
 * sending side (pa_rtp_context_new_send()) and by the receiving side
 * (pa_rtp_context_new_recv()); each side only uses part of the fields. */
typedef struct pa_rtp_context {
    int fd;                 /* Socket passed to sendmsg()/recvmsg(); closed in pa_rtp_context_free() */
    uint16_t sequence;      /* Send: sequence number of the next packet. Recv: sequence number of the last parsed packet */
    uint32_t timestamp;     /* Send: running RTP timestamp, advanced by the number of frames sent */
    uint32_t ssrc;          /* Send: random source identifier written into every header */
    uint8_t payload;        /* RTP payload type (7 bit field in the header) */
    size_t frame_size;      /* Result of pa_frame_size() for the stream's sample spec */
    size_t mtu;             /* Send: number of audio bytes gathered into one packet */

    uint8_t *recv_buf;      /* Recv: buffer the raw datagram is read into (NULL on the send side) */
    size_t recv_buf_size;   /* Recv: allocated size of recv_buf, doubled on demand */
    pa_memchunk memchunk;   /* Recv: unused remainder of the memblock that payload data is copied into */
} pa_rtp_context;
60
pa_rtp_context_new_send(int fd,uint8_t payload,size_t mtu,const pa_sample_spec * ss,bool enable_opus)61 pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss, bool enable_opus) {
62 pa_rtp_context *c;
63
64 pa_assert(fd >= 0);
65
66 pa_log_info("Initialising native RTP backend for send");
67
68 c = pa_xnew0(pa_rtp_context, 1);
69
70 c->fd = fd;
71 c->sequence = (uint16_t) (rand()*rand());
72 c->timestamp = 0;
73 c->ssrc = (uint32_t) (rand()*rand());
74 c->payload = (uint8_t) (payload & 127U);
75 c->frame_size = pa_frame_size(ss);
76 c->mtu = mtu;
77
78 c->recv_buf = NULL;
79 c->recv_buf_size = 0;
80 pa_memchunk_reset(&c->memchunk);
81
82 return c;
83 }
84
85 #define MAX_IOVECS 16
86
pa_rtp_send(pa_rtp_context * c,pa_memblockq * q)87 int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
88 struct iovec iov[MAX_IOVECS];
89 pa_memblock* mb[MAX_IOVECS];
90 int iov_idx = 1;
91 size_t n = 0;
92
93 pa_assert(c);
94 pa_assert(q);
95
96 if (pa_memblockq_get_length(q) < c->mtu)
97 return 0;
98
99 for (;;) {
100 int r;
101 pa_memchunk chunk;
102
103 pa_memchunk_reset(&chunk);
104
105 if ((r = pa_memblockq_peek(q, &chunk)) >= 0) {
106
107 size_t k = n + chunk.length > c->mtu ? c->mtu - n : chunk.length;
108
109 pa_assert(chunk.memblock);
110
111 iov[iov_idx].iov_base = pa_memblock_acquire_chunk(&chunk);
112 iov[iov_idx].iov_len = k;
113 mb[iov_idx] = chunk.memblock;
114 iov_idx ++;
115
116 n += k;
117 pa_memblockq_drop(q, k);
118 }
119
120 pa_assert(n % c->frame_size == 0);
121
122 if (r < 0 || n >= c->mtu || iov_idx >= MAX_IOVECS) {
123 uint32_t header[3];
124 struct msghdr m;
125 ssize_t k;
126 int i;
127
128 if (n > 0) {
129 header[0] = htonl(((uint32_t) 2 << 30) | ((uint32_t) c->payload << 16) | ((uint32_t) c->sequence));
130 header[1] = htonl(c->timestamp);
131 header[2] = htonl(c->ssrc);
132
133 iov[0].iov_base = (void*)header;
134 iov[0].iov_len = sizeof(header);
135
136 m.msg_name = NULL;
137 m.msg_namelen = 0;
138 m.msg_iov = iov;
139 m.msg_iovlen = (size_t) iov_idx;
140 m.msg_control = NULL;
141 m.msg_controllen = 0;
142 m.msg_flags = 0;
143
144 k = sendmsg(c->fd, &m, MSG_DONTWAIT);
145
146 for (i = 1; i < iov_idx; i++) {
147 pa_memblock_release(mb[i]);
148 pa_memblock_unref(mb[i]);
149 }
150
151 c->sequence++;
152 } else
153 k = 0;
154
155 c->timestamp += (unsigned) (n/c->frame_size);
156
157 if (k < 0) {
158 if (errno != EAGAIN && errno != EINTR) /* If the queue is full, just ignore it */
159 pa_log("sendmsg() failed: %s", pa_cstrerror(errno));
160 return -1;
161 }
162
163 if (r < 0 || pa_memblockq_get_length(q) < c->mtu)
164 break;
165
166 n = 0;
167 iov_idx = 1;
168 }
169 }
170
171 return 0;
172 }
173
pa_rtp_context_new_recv(int fd,uint8_t payload,const pa_sample_spec * ss,bool enable_opus)174 pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample_spec *ss, bool enable_opus) {
175 pa_rtp_context *c;
176
177 pa_log_info("Initialising native RTP backend for receive");
178
179 c = pa_xnew0(pa_rtp_context, 1);
180
181 c->fd = fd;
182 c->payload = payload;
183 c->frame_size = pa_frame_size(ss);
184
185 c->recv_buf_size = 2000;
186 c->recv_buf = pa_xmalloc(c->recv_buf_size);
187 pa_memchunk_reset(&c->memchunk);
188
189 return c;
190 }
191
/*
 * Reads one datagram from c->fd and parses it as an RTP packet.
 *
 * On success 0 is returned, *chunk references the payload (copied into a
 * memblock from pool; the caller owns that reference), *rtp_tstamp holds the
 * RTP timestamp in host byte order and *tstamp the SCM_TIMESTAMP reception
 * time, or all zeros if the kernel delivered none. chunk->length may be 0
 * for a packet that consists of a header only.
 *
 * On any failure (ioctl/recvmsg error, short or unsupported packet, wrong
 * payload type, payload not a multiple of the frame size) -1 is returned and
 * *chunk is left reset.
 */
int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) {
    int size;
    size_t audio_length;
    size_t metadata_length;
    struct msghdr m;
    struct cmsghdr *cm;
    struct iovec iov;
    uint32_t header;
    uint32_t ssrc;
    uint8_t payload;
    unsigned cc;
    uint8_t aux[1024];
    bool found_tstamp = false;

    pa_assert(c);
    pa_assert(chunk);

    pa_memchunk_reset(chunk);

    /* FIONREAD works on both BSD and Linux, but they do something different:
     * - on Linux it returns the amount of bytes in the next datagram
     * - on BSDs it returns the total amount of bytes in the output buffer; this can be
     *   more than one datagram and includes headers
     *
     * So the result will be a lower bound of how many bytes are needed, but might not be
     * the exact size of the buffer size needed.
     */

    if (ioctl(c->fd, FIONREAD, &size) < 0) {
        pa_log_warn("FIONREAD failed: %s", pa_cstrerror(errno));
        goto fail;
    }

    if (size <= 0) {
        /* size can be 0 due to any of the following reasons:
         *
         * 1. Somebody sent us a perfectly valid zero-length UDP packet.
         * 2. Somebody sent us a UDP packet with a bad CRC.
         *
         * It is unknown whether size can actually be less than zero.
         *
         * In the first case, the packet has to be read out, otherwise the
         * kernel will tell us again and again about it, thus preventing
         * reception of any further packets. So let's just read it out
         * now and discard it later, when comparing the number of bytes
         * received (0) with the number of bytes wanted (1, see below).
         *
         * In the second case, recvmsg() will fail, thus allowing us to
         * return the error.
         *
         * Just to avoid passing zero-sized memchunks and NULL pointers to
         * recvmsg(), let's force allocation of at least one byte by setting
         * size to 1.
         */
        size = 1;
    }

    /* Since size is a lower bound, also constrain it to an upper bound */
    size = PA_MIN(size, 1<<16);

    if (c->recv_buf_size < (size_t) size) {
        do
            c->recv_buf_size *= 2;
        while (c->recv_buf_size < (size_t) size);

        c->recv_buf = pa_xrealloc(c->recv_buf, c->recv_buf_size);
    }

    pa_assert(c->recv_buf_size >= (size_t) size);

    iov.iov_base = c->recv_buf;
    iov.iov_len = (size_t) size;

    m.msg_name = NULL;
    m.msg_namelen = 0;
    m.msg_iov = &iov;
    m.msg_iovlen = 1;
    m.msg_control = aux;
    m.msg_controllen = sizeof(aux);
    m.msg_flags = 0;

    size = recvmsg(c->fd, &m, 0);

    if (size < 0) {
        if (errno != EAGAIN && errno != EINTR)
            pa_log_warn("recvmsg() failed: %s", pa_cstrerror(errno));
        goto fail;
    }

    /* The fixed part of an RTP header is 12 bytes */
    if (size < 12) {
        pa_log_warn("RTP packet too short.");
        goto fail;
    }

    /* Copy the three header words out of the buffer instead of casting, the
     * buffer is a plain byte array */
    memcpy(&header, iov.iov_base, sizeof(uint32_t));
    memcpy(rtp_tstamp, (uint8_t*) iov.iov_base + 4, sizeof(uint32_t));
    memcpy(&ssrc, (uint8_t*) iov.iov_base + 8, sizeof(uint32_t));

    header = ntohl(header);
    *rtp_tstamp = ntohl(*rtp_tstamp);
    /* NOTE(review): this converts the context's own ssrc, not the value just
     * copied out of the packet, so the "unexpected SSRC" test further down
     * compares c->ssrc with a byte-swapped copy of itself and the sender's
     * SSRC is never examined. Nothing in this file assigns c->ssrc on the
     * receive side, so changing this to ntohl(ssrc) alone would reject every
     * packet whose SSRC differs from that initial value. Left unchanged until
     * the intended policy (e.g. latching the first SSRC seen) is confirmed. */
    ssrc = ntohl(c->ssrc);

    if ((header >> 30) != 2) {
        pa_log_warn("Unsupported RTP version.");
        goto fail;
    }

    if ((header >> 29) & 1) {
        pa_log_warn("RTP padding not supported.");
        goto fail;
    }

    if ((header >> 28) & 1) {
        pa_log_warn("RTP header extensions not supported.");
        goto fail;
    }

    if (ssrc != c->ssrc) {
        pa_log_debug("Got unexpected SSRC");
        goto fail;
    }

    cc = (header >> 24) & 0xF;
    payload = (uint8_t) ((header >> 16) & 127U);
    c->sequence = (uint16_t) (header & 0xFFFFU);

    /* Fixed header plus one 32 bit word per CSRC entry */
    metadata_length = 12 + cc * 4;

    if (payload != c->payload) {
        pa_log_debug("Got unexpected payload: %u", payload);
        goto fail;
    }

    if (metadata_length > (unsigned) size) {
        pa_log_warn("RTP packet too short. (CSRC)");
        goto fail;
    }

    audio_length = size - metadata_length;

    if (audio_length % c->frame_size != 0) {
        pa_log_warn("Bad RTP packet size.");
        goto fail;
    }

    /* Get a fresh memblock when the current one has too little room left, or
     * when there is none at all. The latter matters for a header-only packet
     * (audio_length == 0): the size comparison alone would not trigger an
     * allocation and the code below would operate on a NULL memblock. */
    if (!c->memchunk.memblock || c->memchunk.length < (unsigned) audio_length) {
        size_t l;

        if (c->memchunk.memblock)
            pa_memblock_unref(c->memchunk.memblock);

        l = PA_MAX((size_t) audio_length, pa_mempool_block_size_max(pool));

        c->memchunk.memblock = pa_memblock_new(pool, l);
        c->memchunk.index = 0;
        c->memchunk.length = pa_memblock_get_length(c->memchunk.memblock);
    }

    memcpy(pa_memblock_acquire_chunk(&c->memchunk), c->recv_buf + metadata_length, audio_length);
    pa_memblock_release(c->memchunk.memblock);

    /* Hand out a reference to the part just written ... */
    chunk->memblock = pa_memblock_ref(c->memchunk.memblock);
    chunk->index = c->memchunk.index;
    chunk->length = audio_length;

    /* ... and keep the rest of the block for the following packets */
    c->memchunk.index += audio_length;
    c->memchunk.length -= audio_length;

    if (c->memchunk.length <= 0) {
        pa_memblock_unref(c->memchunk.memblock);
        pa_memchunk_reset(&c->memchunk);
    }

    for (cm = CMSG_FIRSTHDR(&m); cm; cm = CMSG_NXTHDR(&m, cm))
        if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_TIMESTAMP) {
            memcpy(tstamp, CMSG_DATA(cm), sizeof(struct timeval));
            found_tstamp = true;
            break;
        }

    if (!found_tstamp) {
        pa_log_warn("Couldn't find SCM_TIMESTAMP data in auxiliary recvmsg() data!");
        pa_zero(*tstamp);
    }

    return 0;

fail:
    if (chunk->memblock)
        pa_memblock_unref(chunk->memblock);

    return -1;
}
385
/*
 * Destroys a context created by pa_rtp_context_new_send() or
 * pa_rtp_context_new_recv(): closes its socket, drops the memblock still
 * held for reception (if any) and frees the receive buffer and the context.
 */
void pa_rtp_context_free(pa_rtp_context *c) {
    pa_memblock *held;

    pa_assert(c);

    /* The context owns the socket; closing it is expected to succeed */
    pa_assert_se(pa_close(c->fd) == 0);

    held = c->memchunk.memblock;
    if (held != NULL)
        pa_memblock_unref(held);

    pa_xfree(c->recv_buf);
    pa_xfree(c);
}
397
/* Returns the frame size in bytes that was computed from the sample spec
 * when the context was created. */
size_t pa_rtp_context_get_frame_size(pa_rtp_context *c) {
    pa_assert(c);

    return c->frame_size;
}
401
pa_rtp_context_get_rtpoll_item(pa_rtp_context * c,pa_rtpoll * rtpoll)402 pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll) {
403 pa_rtpoll_item *item;
404 struct pollfd *p;
405
406 item = pa_rtpoll_item_new(rtpoll, PA_RTPOLL_LATE, 1);
407
408 p = pa_rtpoll_item_get_pollfd(item, NULL);
409 p->fd = c->fd;
410 p->events = POLLIN;
411 p->revents = 0;
412
413 return item;
414 }
415