• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2006 Lennart Poettering
5 
6   PulseAudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as published
8   by the Free Software Foundation; either version 2.1 of the License,
9   or (at your option) any later version.
10 
11   PulseAudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   General Public License for more details.
15 
16   You should have received a copy of the GNU Lesser General Public License
17   along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19 
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 
24 #include <stdlib.h>
25 #include <string.h>
26 #include <errno.h>
27 #include <unistd.h>
28 #include <sys/ioctl.h>
29 
30 #ifdef HAVE_SYS_FILIO_H
31 #include <sys/filio.h>
32 #endif
33 
34 #ifdef HAVE_SYS_UIO_H
35 #include <sys/uio.h>
36 #endif
37 
38 #include <pulsecore/core-error.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/arpa-inet.h>
43 #include <pulsecore/poll.h>
44 
45 #include "rtp.h"
46 
47 typedef struct pa_rtp_context {
48     int fd;
49     uint16_t sequence;
50     uint32_t timestamp;
51     uint32_t ssrc;
52     uint8_t payload;
53     size_t frame_size;
54     size_t mtu;
55 
56     uint8_t *recv_buf;
57     size_t recv_buf_size;
58     pa_memchunk memchunk;
59 } pa_rtp_context;
60 
pa_rtp_context_new_send(int fd,uint8_t payload,size_t mtu,const pa_sample_spec * ss,bool enable_opus)61 pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss, bool enable_opus) {
62     pa_rtp_context *c;
63 
64     pa_assert(fd >= 0);
65 
66     pa_log_info("Initialising native RTP backend for send");
67 
68     c = pa_xnew0(pa_rtp_context, 1);
69 
70     c->fd = fd;
71     c->sequence = (uint16_t) (rand()*rand());
72     c->timestamp = 0;
73     c->ssrc = (uint32_t) (rand()*rand());
74     c->payload = (uint8_t) (payload & 127U);
75     c->frame_size = pa_frame_size(ss);
76     c->mtu = mtu;
77 
78     c->recv_buf = NULL;
79     c->recv_buf_size = 0;
80     pa_memchunk_reset(&c->memchunk);
81 
82     return c;
83 }
84 
85 #define MAX_IOVECS 16
86 
/* Send as many full RTP packets as the queue q can currently provide.
 *
 * Nothing is sent while q holds less than c->mtu bytes. Otherwise audio is
 * peeked from q, dropped from it and gathered (without copying) into
 * packets of up to c->mtu payload bytes, each prefixed with a 12 byte RTP
 * header. Data is dropped from q before sendmsg() is called, so it is gone
 * from the queue whether or not the send succeeds.
 *
 * Returns 0 on success (including "not enough data yet") and -1 if
 * sendmsg() failed; in the latter case errno holds the sendmsg() error. */
int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
    struct iovec iov[MAX_IOVECS];
    pa_memblock* mb[MAX_IOVECS];
    int iov_idx = 1; /* iov[0] is reserved for the RTP header */
    size_t n = 0;    /* payload bytes gathered for the current packet */

    pa_assert(c);
    pa_assert(q);

    if (pa_memblockq_get_length(q) < c->mtu)
        return 0;

    for (;;) {
        int r;
        pa_memchunk chunk;

        pa_memchunk_reset(&chunk);

        if ((r = pa_memblockq_peek(q, &chunk)) >= 0) {

            /* Take the whole chunk, or only what still fits into this packet */
            size_t k = n + chunk.length > c->mtu ? c->mtu - n : chunk.length;

            pa_assert(chunk.memblock);

            iov[iov_idx].iov_base = pa_memblock_acquire_chunk(&chunk);
            iov[iov_idx].iov_len = k;
            mb[iov_idx] = chunk.memblock;
            iov_idx ++;

            n += k;
            pa_memblockq_drop(q, k);
        }

        pa_assert(n % c->frame_size == 0);

        /* Flush when the queue ran dry, the packet is full or we are out of iovec slots */
        if (r < 0 || n >= c->mtu || iov_idx >= MAX_IOVECS) {
            uint32_t header[3];
            struct msghdr m;
            ssize_t sent;
            int send_errno = 0;
            int i;

            if (n > 0) {
                /* Version 2, no padding/extension/CSRCs, payload type and sequence number */
                header[0] = htonl(((uint32_t) 2 << 30) | ((uint32_t) c->payload << 16) | ((uint32_t) c->sequence));
                header[1] = htonl(c->timestamp);
                header[2] = htonl(c->ssrc);

                iov[0].iov_base = (void*)header;
                iov[0].iov_len = sizeof(header);

                m.msg_name = NULL;
                m.msg_namelen = 0;
                m.msg_iov = iov;
                m.msg_iovlen = (size_t) iov_idx;
                m.msg_control = NULL;
                m.msg_controllen = 0;
                m.msg_flags = 0;

                sent = sendmsg(c->fd, &m, MSG_DONTWAIT);

                /* Remember the error right away: the memblock calls below
                 * may modify errno before it is inspected. */
                if (sent < 0)
                    send_errno = errno;

                for (i = 1; i < iov_idx; i++) {
                    pa_memblock_release(mb[i]);
                    pa_memblock_unref(mb[i]);
                }

                c->sequence++;
            } else
                sent = 0;

            /* The timestamp counts frames and advances even if the packet was lost */
            c->timestamp += (unsigned) (n/c->frame_size);

            if (sent < 0) {
                if (send_errno != EAGAIN && send_errno != EINTR) /* If the queue is full, just ignore it */
                    pa_log("sendmsg() failed: %s", pa_cstrerror(send_errno));
                errno = send_errno;
                return -1;
            }

            if (r < 0 || pa_memblockq_get_length(q) < c->mtu)
                break;

            /* Start gathering the next packet */
            n = 0;
            iov_idx = 1;
        }
    }

    return 0;
}
173 
pa_rtp_context_new_recv(int fd,uint8_t payload,const pa_sample_spec * ss,bool enable_opus)174 pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample_spec *ss, bool enable_opus) {
175     pa_rtp_context *c;
176 
177     pa_log_info("Initialising native RTP backend for receive");
178 
179     c = pa_xnew0(pa_rtp_context, 1);
180 
181     c->fd = fd;
182     c->payload = payload;
183     c->frame_size = pa_frame_size(ss);
184 
185     c->recv_buf_size = 2000;
186     c->recv_buf = pa_xmalloc(c->recv_buf_size);
187     pa_memchunk_reset(&c->memchunk);
188 
189     return c;
190 }
191 
/* Receive one RTP packet from c->fd and hand out its audio payload.
 *
 * On success 0 is returned, *chunk holds a new reference to a memblock
 * containing the payload, *rtp_tstamp the RTP timestamp of the packet in
 * host byte order and *tstamp the SCM_TIMESTAMP reception time (zeroed if
 * the kernel did not supply one). pool is used to allocate memblocks.
 *
 * On any failure (socket error, malformed or unsupported packet, wrong
 * payload type, packet without audio data) -1 is returned and *chunk is
 * left reset. */
int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) {
    int size;
    size_t audio_length;
    size_t metadata_length;
    struct msghdr m;
    struct cmsghdr *cm;
    struct iovec iov;
    uint32_t header;
    uint32_t ssrc;
    uint8_t payload;
    unsigned cc;
    uint8_t aux[1024];
    bool found_tstamp = false;

    pa_assert(c);
    pa_assert(chunk);

    pa_memchunk_reset(chunk);

    /* FIONREAD works on both BSD and Linux, but they do something different:
     * - on Linux it returns the amount of bytes in the next datagram
     * - on BSDs it returns the total amount of bytes in the output buffer; this can be
     *   more than one datagram and includes headers
     *
     * So the result will be a lower bound of how many bytes are needed, but might not be
     * the exact size of the buffer size needed.
     */

    if (ioctl(c->fd, FIONREAD, &size) < 0) {
        pa_log_warn("FIONREAD failed: %s", pa_cstrerror(errno));
        goto fail;
    }

    if (size <= 0) {
        /* size can be 0 due to any of the following reasons:
         *
         * 1. Somebody sent us a perfectly valid zero-length UDP packet.
         * 2. Somebody sent us a UDP packet with a bad CRC.
         *
         * It is unknown whether size can actually be less than zero.
         *
         * In the first case, the packet has to be read out, otherwise the
         * kernel will tell us again and again about it, thus preventing
         * reception of any further packets. So let's just read it out
         * now and discard it later, when comparing the number of bytes
         * received (0) with the number of bytes wanted (1, see below).
         *
         * In the second case, recvmsg() will fail, thus allowing us to
         * return the error.
         *
         * Just to avoid passing zero-sized memchunks and NULL pointers to
         * recvmsg(), let's force allocation of at least one byte by setting
         * size to 1.
         */
        size = 1;
    }

    /* Since size is a lower bound, also constrain it to an upper bound */
    size = PA_MIN(size, 1<<16);

    if (c->recv_buf_size < (size_t) size) {
        /* Doubling a size of zero never terminates, so seed the size first
         * in case the context has no receive buffer yet. */
        if (c->recv_buf_size == 0)
            c->recv_buf_size = (size_t) size;

        while (c->recv_buf_size < (size_t) size)
            c->recv_buf_size *= 2;

        c->recv_buf = pa_xrealloc(c->recv_buf, c->recv_buf_size);
    }

    pa_assert(c->recv_buf_size >= (size_t) size);

    iov.iov_base = c->recv_buf;
    iov.iov_len = (size_t) size;

    m.msg_name = NULL;
    m.msg_namelen = 0;
    m.msg_iov = &iov;
    m.msg_iovlen = 1;
    m.msg_control = aux;
    m.msg_controllen = sizeof(aux);
    m.msg_flags = 0;

    /* At most iov.iov_len (<= 1<<16) bytes are returned, so this fits an int */
    size = (int) recvmsg(c->fd, &m, 0);

    if (size < 0) {
        if (errno != EAGAIN && errno != EINTR)
            pa_log_warn("recvmsg() failed: %s", pa_cstrerror(errno));
        goto fail;
    }

    /* The fixed RTP header is 12 bytes long */
    if (size < 12) {
        pa_log_warn("RTP packet too short.");
        goto fail;
    }

    /* memcpy() because the buffer is not guaranteed to be suitably aligned */
    memcpy(&header, iov.iov_base, sizeof(uint32_t));
    memcpy(rtp_tstamp, (uint8_t*) iov.iov_base + 4, sizeof(uint32_t));
    memcpy(&ssrc, (uint8_t*) iov.iov_base + 8, sizeof(uint32_t));

    header = ntohl(header);
    *rtp_tstamp = ntohl(*rtp_tstamp);

    /* NOTE(review): this replaces the SSRC just copied out of the packet
     * with a byte-swapped copy of c->ssrc, so the "unexpected SSRC" check
     * below never looks at the sender's SSRC. pa_rtp_context_new_recv()
     * does not assign c->ssrc (presumably left zero by pa_xnew0()), in
     * which case the check can never trigger. Decoding the packet value
     * instead (ntohl(ssrc)) would reject every packet unless the context
     * also learns the peer's SSRC, so the behaviour is left unchanged here.
     * TODO: confirm the intended SSRC handling with the callers. */
    ssrc = ntohl(c->ssrc);

    /* Bits 31-30: version */
    if ((header >> 30) != 2) {
        pa_log_warn("Unsupported RTP version.");
        goto fail;
    }

    /* Bit 29: padding flag */
    if ((header >> 29) & 1) {
        pa_log_warn("RTP padding not supported.");
        goto fail;
    }

    /* Bit 28: header extension flag */
    if ((header >> 28) & 1) {
        pa_log_warn("RTP header extensions not supported.");
        goto fail;
    }

    if (ssrc != c->ssrc) {
        pa_log_debug("Got unexpected SSRC");
        goto fail;
    }

    /* Bits 27-24: CSRC count, bits 22-16: payload type, bits 15-0: sequence number */
    cc = (header >> 24) & 0xF;
    payload = (uint8_t) ((header >> 16) & 127U);
    c->sequence = (uint16_t) (header & 0xFFFFU);

    /* Fixed header plus one 32 bit word per CSRC entry */
    metadata_length = 12 + cc * 4;

    if (payload != c->payload) {
        pa_log_debug("Got unexpected payload: %u", payload);
        goto fail;
    }

    if (metadata_length > (unsigned) size) {
        pa_log_warn("RTP packet too short. (CSRC)");
        goto fail;
    }

    audio_length = size - metadata_length;

    if (audio_length % c->frame_size != 0) {
        pa_log_warn("Bad RTP packet size.");
        goto fail;
    }

    /* A packet consisting of headers only carries nothing to hand out. It
     * must not reach the code below: with an empty c->memchunk no memblock
     * would be allocated for it and a NULL memblock would be acquired. */
    if (audio_length == 0) {
        pa_log_debug("Got RTP packet without audio data, ignoring.");
        goto fail;
    }

    /* Allocate a fresh memblock if the remainder of the current one is too small */
    if (c->memchunk.length < (unsigned) audio_length) {
        size_t l;

        if (c->memchunk.memblock)
            pa_memblock_unref(c->memchunk.memblock);

        l = PA_MAX((size_t) audio_length, pa_mempool_block_size_max(pool));

        c->memchunk.memblock = pa_memblock_new(pool, l);
        c->memchunk.index = 0;
        c->memchunk.length = pa_memblock_get_length(c->memchunk.memblock);
    }

    memcpy(pa_memblock_acquire_chunk(&c->memchunk), c->recv_buf + metadata_length, audio_length);
    pa_memblock_release(c->memchunk.memblock);

    /* The caller gets its own reference to the part just filled */
    chunk->memblock = pa_memblock_ref(c->memchunk.memblock);
    chunk->index = c->memchunk.index;
    chunk->length = audio_length;

    c->memchunk.index += audio_length;
    c->memchunk.length -= audio_length;

    /* Drop our reference once the memblock is used up completely */
    if (c->memchunk.length == 0) {
        pa_memblock_unref(c->memchunk.memblock);
        pa_memchunk_reset(&c->memchunk);
    }

    /* Look for the kernel's reception timestamp in the control messages */
    for (cm = CMSG_FIRSTHDR(&m); cm; cm = CMSG_NXTHDR(&m, cm))
        if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_TIMESTAMP) {
            memcpy(tstamp, CMSG_DATA(cm), sizeof(struct timeval));
            found_tstamp = true;
            break;
        }

    if (!found_tstamp) {
        pa_log_warn("Couldn't find SCM_TIMESTAMP data in auxiliary recvmsg() data!");
        pa_zero(*tstamp);
    }

    return 0;

fail:
    if (chunk->memblock) {
        pa_memblock_unref(chunk->memblock);
        /* Do not leave a dangling memblock pointer in the caller's chunk */
        pa_memchunk_reset(chunk);
    }

    return -1;
}
385 
/* Destroy a context created by pa_rtp_context_new_send() or
 * pa_rtp_context_new_recv(): closes the socket, drops the reference to a
 * partially used receive memblock (if any) and frees all memory. */
void pa_rtp_context_free(pa_rtp_context *c) {
    pa_memblock *leftover;

    pa_assert(c);

    /* The context owns the socket */
    pa_assert_se(pa_close(c->fd) == 0);

    leftover = c->memchunk.memblock;
    if (leftover != NULL)
        pa_memblock_unref(leftover);

    pa_xfree(c->recv_buf);
    pa_xfree(c);
}
397 
/* Return the frame size in bytes that was stored when the context was created. */
size_t pa_rtp_context_get_frame_size(pa_rtp_context *c) {
    const size_t bytes_per_frame = c->frame_size;

    return bytes_per_frame;
}
401 
pa_rtp_context_get_rtpoll_item(pa_rtp_context * c,pa_rtpoll * rtpoll)402 pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll) {
403     pa_rtpoll_item *item;
404     struct pollfd *p;
405 
406     item = pa_rtpoll_item_new(rtpoll, PA_RTPOLL_LATE, 1);
407 
408     p = pa_rtpoll_item_get_pollfd(item, NULL);
409     p->fd = c->fd;
410     p->events = POLLIN;
411     p->revents = 0;
412 
413     return item;
414 }
415