/***
  This file is part of PulseAudio.

  Copyright 2004-2006 Lennart Poettering
  Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation; either version 2.1 of the
  License, or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
***/

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif

#include <pulse/xmalloc.h>

#include <pulsecore/idxset.h>
#include <pulsecore/socket.h>
#include <pulsecore/queue.h>
#include <pulsecore/log.h>
#include <pulsecore/creds.h>
#include <pulsecore/refcnt.h>
#include <pulsecore/flist.h>
#include <pulsecore/macro.h>

#include "pstream.h"
/* If audio data blocks are stored in SHM, we piggyback that information
 * on the seek mode field */
#define PA_FLAG_SHMDATA     0x80000000LU
#define PA_FLAG_SHMDATA_MEMFD_BLOCK         0x20000000LU
#define PA_FLAG_SHMRELEASE  0x40000000LU
#define PA_FLAG_SHMREVOKE   0xC0000000LU
#define PA_FLAG_SHMMASK     0xFF000000LU
#define PA_FLAG_SEEKMASK    0x000000FFLU
#define PA_FLAG_SHMWRITABLE 0x00800000LU
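
/* Illustrative sketch (not part of the protocol code): how the seek mode and
 * the SHM markers share one 32-bit flags word. Only the low byte carries the
 * seek mode; the masks above select the sub-fields. The helper names here
 * are hypothetical. */
#if 0
static uint32_t pack_flags_example(pa_seek_mode_t seek_mode, bool is_shm_data) {
    uint32_t flags = (uint32_t) (seek_mode & PA_FLAG_SEEKMASK);

    if (is_shm_data)
        flags |= PA_FLAG_SHMDATA;

    return flags;
}

static pa_seek_mode_t unpack_seek_mode_example(uint32_t flags) {
    return (pa_seek_mode_t) (flags & PA_FLAG_SEEKMASK);
}
#endif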

/* The sequence descriptor header consists of five 32-bit integers: */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH,
    PA_PSTREAM_DESCRIPTOR_CHANNEL,
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
    PA_PSTREAM_DESCRIPTOR_FLAGS,
    PA_PSTREAM_DESCRIPTOR_MAX
};

/* If we have an SHM block, this info follows the descriptor */
enum {
    PA_PSTREAM_SHM_BLOCKID,
    PA_PSTREAM_SHM_SHMID,
    PA_PSTREAM_SHM_INDEX,
    PA_PSTREAM_SHM_LENGTH,
    PA_PSTREAM_SHM_MAX
};

typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];

#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
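
/* Illustrative sketch: filling a descriptor for a plain packet frame, as
 * prepare_next_write_item() below does. All fields travel in network byte
 * order, and a packet frame is marked by channel (uint32_t) -1. The function
 * name is hypothetical. */
#if 0
static void fill_packet_descriptor_example(pa_pstream_descriptor d, uint32_t payload_length) {
    d[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(payload_length);
    d[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1); /* -1 == packet, not memblock */
    d[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
    d[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
    d[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
}
#endif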

#define MINIBUF_SIZE (256)

/* To allow uploading a single sample in one frame, this value should be the
 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h. */
#define FRAME_SIZE_MAX_ALLOW (1024*1024*16)

/* Default memblock alignment used with pa_pstream_send_memblock() */
#define DEFAULT_PSTREAM_MEMBLOCK_ALIGN (256)

PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);

struct item_info {
    enum {
        PA_PSTREAM_ITEM_PACKET,
        PA_PSTREAM_ITEM_MEMBLOCK,
        PA_PSTREAM_ITEM_SHMRELEASE,
        PA_PSTREAM_ITEM_SHMREVOKE
    } type;

    /* packet info */
    pa_packet *packet;
#ifdef HAVE_CREDS
    bool with_ancil_data;
    pa_cmsg_ancil_data ancil_data;
#endif

    /* memblock info */
    pa_memchunk chunk;
    uint32_t channel;
    int64_t offset;
    pa_seek_mode_t seek_mode;

    /* release/revoke info */
    uint32_t block_id;
};

struct pstream_read {
    pa_pstream_descriptor descriptor;
    pa_memblock *memblock;
    pa_packet *packet;
    uint32_t shm_info[PA_PSTREAM_SHM_MAX];
    void *data;
    size_t index;
};

struct pa_pstream {
    PA_REFCNT_DECLARE;

    pa_mainloop_api *mainloop;
    pa_defer_event *defer_event;
    pa_iochannel *io;
    pa_srbchannel *srb, *srbpending;
    bool is_srbpending;

    pa_queue *send_queue;

    bool dead;

    struct {
        union {
            uint8_t minibuf[MINIBUF_SIZE];
            pa_pstream_descriptor descriptor;
        };
        struct item_info *current;
        void *data;
        size_t index;
        int minibuf_validsize;
        pa_memchunk memchunk;
    } write;

    struct pstream_read readio, readsrb;

    /* @use_shm: besides copying the full audio data to the other
     * PA end, this pipe can send just references to the same audio
     * data blocks if they reside in a SHM pool.
     *
     * @use_memfd: pipe supports sending SHM memfd block references
     *
     * @registered_memfd_ids: SHM IDs of registered memfd pools. Check
     * pa_pstream_register_memfd_mempool() for more information. */
    bool use_shm, use_memfd;
    bool non_registered_memfd_id_error_logged;
    pa_idxset *registered_memfd_ids;

    pa_memimport *import;
    pa_memexport *export;

    pa_pstream_packet_cb_t receive_packet_callback;
    void *receive_packet_callback_userdata;

    pa_pstream_memblock_cb_t receive_memblock_callback;
    void *receive_memblock_callback_userdata;

    pa_pstream_notify_cb_t drain_callback;
    void *drain_callback_userdata;

    pa_pstream_notify_cb_t die_callback;
    void *die_callback_userdata;

    pa_pstream_block_id_cb_t revoke_callback;
    void *revoke_callback_userdata;

    pa_pstream_block_id_cb_t release_callback;
    void *release_callback_userdata;

    pa_mempool *mempool;

#ifdef HAVE_CREDS
    pa_cmsg_ancil_data read_ancil_data, *write_ancil_data;
    bool send_ancil_data_now;
#endif
};

#ifdef HAVE_CREDS
/*
 * Transfers of memfd-backed SHM pool blocks occur without passing the pool's
 * fd every time, thus minimizing overhead and avoiding fd leaks. A
 * REGISTER_MEMFD_SHMID command is sent, with the pool's memfd fd, very early
 * on. This command carries an ID that uniquely identifies the pool in
 * question. Further references to blocks of that pool can then be made using
 * the ID alone; the fd can be safely closed, on both ends, afterwards.
 *
 * On the sending side of this command, we want to close the passed fds
 * directly after they have been sent. Meanwhile we're only allowed to
 * asynchronously schedule packet writes to the pstream, so the job of
 * closing passed fds is left to the pstream's actual writing function
 * do_write(): it knows the exact point in time where the fds are passed to
 * the other end through iochannels and the sendmsg() system call.
 *
 * Nonetheless, not all code paths in the system want their socket-passed
 * fds to be closed after the send. srbchannel needs the passed fds to remain
 * open for further communication. System-wide global memfd-backed pools
 * also require the passed fd to stay open: they pass the same fd, with the
 * same ID registration mechanism, for each newly connected client to the
 * system.
 *
 * So, given all of the above, never close the ancillary fds on your own;
 * always call the method below instead. It takes care of closing the passed
 * fds _only if_ the code paths that originally created them allow it.
 * Moreover, it is safe to invoke multiple times: failure handlers can, and
 * should, call it to clean up passed fds without worrying too much about
 * the system state.
 */
void pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data *ancil) {
    if (ancil && ancil->nfd > 0 && ancil->close_fds_on_cleanup) {
        int i;

        pa_assert(ancil->nfd <= MAX_ANCIL_DATA_FDS);

        for (i = 0; i < ancil->nfd; i++)
            if (ancil->fds[i] != -1) {
                pa_assert_se(pa_close(ancil->fds[i]) == 0);
                ancil->fds[i] = -1;
            }

        ancil->nfd = 0;
        ancil->close_fds_on_cleanup = false;
    }
}
#endif
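
/* Illustrative sketch: the ownership rule described above in practice. A
 * sender that wants the fds closed after transmission marks the ancil data
 * accordingly; do_write() (or any failure handler) then calls
 * pa_cmsg_ancil_data_close_fds(), which is safe to invoke repeatedly. The
 * function name is hypothetical and error handling is omitted. */
#if 0
static void send_fd_example(pa_pstream *p, pa_packet *packet, int fd) {
    pa_cmsg_ancil_data ancil;

    pa_zero(ancil);
    ancil.nfd = 1;
    ancil.fds[0] = fd;
    ancil.close_fds_on_cleanup = true; /* pstream closes fd after sendmsg() */

    pa_pstream_send_packet(p, packet, &ancil);
}
#endif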

static int do_write(pa_pstream *p);
static int do_read(pa_pstream *p, struct pstream_read *re);

static void do_pstream_read_write(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    pa_pstream_ref(p);

    p->mainloop->defer_enable(p->defer_event, 0);

    if (!p->dead && p->srb) {
        int r = 0;

        if (do_write(p) < 0)
            goto fail;

        while (!p->dead && r == 0) {
            r = do_read(p, &p->readsrb);
            if (r < 0)
                goto fail;
        }
    }

    if (!p->dead && pa_iochannel_is_readable(p->io)) {
        if (do_read(p, &p->readio) < 0)
            goto fail;
    } else if (!p->dead && pa_iochannel_is_hungup(p->io))
        goto fail;

    while (!p->dead && pa_iochannel_is_writable(p->io)) {
        int r = do_write(p);
        if (r < 0)
            goto fail;
        if (r == 0)
            break;
    }

    pa_pstream_unref(p);
    return;

fail:

    if (p->die_callback)
        p->die_callback(p, p->die_callback_userdata);

    pa_pstream_unlink(p);
    pa_pstream_unref(p);
}

static bool srb_callback(pa_srbchannel *srb, void *userdata) {
    bool b;
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->srb == srb);

    pa_pstream_ref(p);

    do_pstream_read_write(p);

    /* If either the pstream or the srb is going away, return false.
       We need to check this before p is destroyed. */
    b = (PA_REFCNT_VALUE(p) > 1) && (p->srb == srb);
    pa_pstream_unref(p);

    return b;
}

static void io_callback(pa_iochannel *io, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->io == io);

    do_pstream_read_write(p);
}

static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->defer_event == e);
    pa_assert(p->mainloop == m);

    do_pstream_read_write(p);
}

static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);

pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
    pa_pstream *p;

    pa_assert(m);
    pa_assert(io);
    pa_assert(pool);

    p = pa_xnew0(pa_pstream, 1);
    PA_REFCNT_INIT(p);
    p->io = io;
    pa_iochannel_set_callback(io, io_callback, p);

    p->mainloop = m;
    p->defer_event = m->defer_new(m, defer_callback, p);
    m->defer_enable(p->defer_event, 0);

    p->send_queue = pa_queue_new();

    p->mempool = pool;

    /* We do importing unconditionally */
    p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);

    pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
    pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));

    return p;
}
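
/* Illustrative sketch: typical construction of a pstream on top of an
 * existing iochannel. The callback names are hypothetical placeholders and
 * error handling is omitted; this is not the canonical setup sequence. */
#if 0
static pa_pstream *setup_example(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
    pa_pstream *p = pa_pstream_new(m, io, pool);

    pa_pstream_set_receive_packet_callback(p, my_packet_cb, NULL);
    pa_pstream_set_receive_memblock_callback(p, my_memblock_cb, NULL);
    pa_pstream_set_die_callback(p, my_die_cb, NULL);

    return p;
}
#endif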

/* Attach memfd<->SHM_ID mapping to the given pstream and its memimport.
 * Check pa_pstream_register_memfd_mempool() for further info.
 *
 * Caller owns the passed @memfd_fd and must close it down when appropriate. */
int pa_pstream_attach_memfd_shmid(pa_pstream *p, unsigned shm_id, int memfd_fd) {
    int err = -1;

    pa_assert(memfd_fd != -1);

    if (!p->use_memfd) {
        pa_log_warn("Received memfd ID registration request over a pipe "
                    "that does not support memfds");
        return err;
    }

    if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
        pa_log_warn("Previously registered memfd SHM ID = %u", shm_id);
        return err;
    }

    if (pa_memimport_attach_memfd(p->import, shm_id, memfd_fd, true)) {
        pa_log("Failed to create permanent mapping for memfd region with ID = %u", shm_id);
        return err;
    }

    pa_assert_se(pa_idxset_put(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL) == 0);
    return 0;
}

static void item_free(void *item) {
    struct item_info *i = item;
    pa_assert(i);

    if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
        pa_assert(i->chunk.memblock);
        pa_memblock_unref(i->chunk.memblock);
    } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
        pa_assert(i->packet);
        pa_packet_unref(i->packet);
    }

#ifdef HAVE_CREDS
    /* On error recovery paths, there might be lingering items
     * on the pstream send queue and they are usually freed with
     * a call to 'pa_queue_free(p->send_queue, item_free)'. Make
     * sure we do not leak any fds in that case! */
    if (i->with_ancil_data)
        pa_cmsg_ancil_data_close_fds(&i->ancil_data);
#endif

    if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
        pa_xfree(i);
}

static void pstream_free(pa_pstream *p) {
    pa_assert(p);

    pa_pstream_unlink(p);

    pa_queue_free(p->send_queue, item_free);

    if (p->write.current)
        item_free(p->write.current);

    if (p->write.memchunk.memblock)
        pa_memblock_unref(p->write.memchunk.memblock);

    if (p->readsrb.memblock)
        pa_memblock_unref(p->readsrb.memblock);

    if (p->readsrb.packet)
        pa_packet_unref(p->readsrb.packet);

    if (p->readio.memblock)
        pa_memblock_unref(p->readio.memblock);

    if (p->readio.packet)
        pa_packet_unref(p->readio.packet);

    if (p->registered_memfd_ids)
        pa_idxset_free(p->registered_memfd_ids, NULL);

    pa_xfree(p);
}

void pa_pstream_send_packet(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data) {
    struct item_info *i;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(packet);

    if (p->dead) {
#ifdef HAVE_CREDS
        pa_cmsg_ancil_data_close_fds(ancil_data);
#endif
        return;
    }

    if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        i = pa_xnew(struct item_info, 1);

    i->type = PA_PSTREAM_ITEM_PACKET;
    i->packet = pa_packet_ref(packet);

#ifdef HAVE_CREDS
    if ((i->with_ancil_data = !!ancil_data)) {
        i->ancil_data = *ancil_data;
        if (ancil_data->creds_valid)
            pa_assert(ancil_data->nfd == 0);
        else
            pa_assert(ancil_data->nfd > 0);
    }
#endif

    pa_queue_push(p->send_queue, i);

    /* A queue length of 10 or more may indicate a message backlog */
    if (PaQueueGetLen(p->send_queue) >= 10)
        pa_log_warn("[MSG backlog]: PaQueueLen = %u", PaQueueGetLen(p->send_queue));

    p->mainloop->defer_enable(p->defer_event, 1);
}

void pa_pstream_send_memblock(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk, size_t align) {
    size_t length, idx;
    size_t bsm;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(channel != (uint32_t) -1);
    pa_assert(chunk);

    if (p->dead)
        return;

    idx = 0;
    length = chunk->length;

    bsm = pa_mempool_block_size_max(p->mempool);

    if (align == 0)
        align = DEFAULT_PSTREAM_MEMBLOCK_ALIGN;

    bsm = (bsm / align) * align;

    while (length > 0) {
        struct item_info *i;
        size_t n;

        if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
            i = pa_xnew(struct item_info, 1);
        i->type = PA_PSTREAM_ITEM_MEMBLOCK;

        n = PA_MIN(length, bsm);
        i->chunk.index = chunk->index + idx;
        i->chunk.length = n;
        i->chunk.memblock = pa_memblock_ref(chunk->memblock);

        i->channel = channel;
        i->offset = offset;
        i->seek_mode = seek_mode;
#ifdef HAVE_CREDS
        i->with_ancil_data = false;
#endif

        pa_queue_push(p->send_queue, i);

        idx += n;
        length -= n;
    }

    p->mainloop->defer_enable(p->defer_event, 1);
}
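
/* Worked sketch of the splitting arithmetic above: the per-item size is the
 * pool's maximum block size rounded down to the alignment, and the chunk is
 * sliced into pieces of at most that size, each referencing the same
 * underlying memblock. The concrete numbers are an assumed example; the
 * function name is hypothetical. */
#if 0
static void split_example(void) {
    size_t bsm = 65536;                            /* assumed pool max block size */
    size_t align = DEFAULT_PSTREAM_MEMBLOCK_ALIGN; /* 256 */
    size_t length = 150000, idx = 0;

    bsm = (bsm / align) * align;                   /* still 65536 here */

    while (length > 0) {
        size_t n = PA_MIN(length, bsm);            /* 65536, 65536, 18928 */
        idx += n;
        length -= n;
    }
}
#endif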

void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
    struct item_info *item;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

/*     pa_log("Releasing block %u", block_id); */

    if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        item = pa_xnew(struct item_info, 1);
    item->type = PA_PSTREAM_ITEM_SHMRELEASE;
    item->block_id = block_id;
#ifdef HAVE_CREDS
    item->with_ancil_data = false;
#endif

    pa_queue_push(p->send_queue, item);
    p->mainloop->defer_enable(p->defer_event, 1);
}

/* might be called from thread context */
static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

    if (p->release_callback)
        p->release_callback(p, block_id, p->release_callback_userdata);
    else
        pa_pstream_send_release(p, block_id);
}

void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
    struct item_info *item;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;
/*     pa_log("Revoking block %u", block_id); */

    if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        item = pa_xnew(struct item_info, 1);
    item->type = PA_PSTREAM_ITEM_SHMREVOKE;
    item->block_id = block_id;
#ifdef HAVE_CREDS
    item->with_ancil_data = false;
#endif

    pa_queue_push(p->send_queue, item);
    p->mainloop->defer_enable(p->defer_event, 1);
}

/* might be called from thread context */
static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->revoke_callback)
        p->revoke_callback(p, block_id, p->revoke_callback_userdata);
    else
        pa_pstream_send_revoke(p, block_id);
}

static void prepare_next_write_item(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->write.current = pa_queue_pop(p->send_queue);

    if (!p->write.current)
        return;
    p->write.index = 0;
    p->write.data = NULL;
    p->write.minibuf_validsize = 0;
    pa_memchunk_reset(&p->write.memchunk);

    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;

    if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
        size_t plen;

        pa_assert(p->write.current->packet);

        p->write.data = (void *) pa_packet_data(p->write.current->packet, &plen);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) plen);

        if (plen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
            memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
            p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
        }

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else {
        uint32_t flags;
        bool send_payload = true;

        pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
        pa_assert(p->write.current->chunk.memblock);

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));

        flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);

        if (p->use_shm) {
            pa_mem_type_t type;
            uint32_t block_id, shm_id;
            size_t offset, length;
            uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
            size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
            pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock);
            pa_memexport *current_export;

            if (p->mempool == current_pool)
                pa_assert_se(current_export = p->export);
            else
                pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));

            if (pa_memexport_put(current_export,
                                 p->write.current->chunk.memblock,
                                 &type,
                                 &block_id,
                                 &shm_id,
                                 &offset,
                                 &length) >= 0) {

                if (type == PA_MEM_TYPE_SHARED_POSIX)
                    send_payload = false;

                if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd) {
                    if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
                        flags |= PA_FLAG_SHMDATA_MEMFD_BLOCK;
                        send_payload = false;
                    } else {
                        if (!p->non_registered_memfd_id_error_logged) {
                            pa_log("Cannot send block reference with non-registered memfd ID = %u", shm_id);
                            pa_log("Falling back to copying full block data over socket");
                            pa_log("There's a bug report about this: https://gitlab.freedesktop.org/pulseaudio/pulseaudio/issues/824");
                            p->non_registered_memfd_id_error_logged = true;
                        }
                    }
                }

                if (send_payload) {
                    pa_assert_se(pa_memexport_process_release(current_export, block_id) == 0);
                } else {
                    flags |= PA_FLAG_SHMDATA;
                    if (pa_mempool_is_remote_writable(current_pool))
                        flags |= PA_FLAG_SHMWRITABLE;

                    shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
                    shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
                    shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
                    shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);

                    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
                    p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
                }
            }
/*             else */
/*                 FIXME: Avoid memexport slot leaks. Call pa_memexport_process_release() */
/*                 pa_log_warn("Failed to export memory block."); */

            if (current_export != p->export)
                pa_memexport_free(current_export);
            pa_mempool_unref(current_pool);
        }

        if (send_payload) {
            p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
            p->write.memchunk = p->write.current->chunk;
            pa_memblock_ref(p->write.memchunk.memblock);
        }

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
    }

#ifdef HAVE_CREDS
    if ((p->send_ancil_data_now = p->write.current->with_ancil_data))
        p->write_ancil_data = &p->write.current->ancil_data;
#endif
}

static void check_srbpending(pa_pstream *p) {
    if (!p->is_srbpending)
        return;

    if (p->srb)
        pa_srbchannel_free(p->srb);

    p->srb = p->srbpending;
    p->is_srbpending = false;

    if (p->srb)
        pa_srbchannel_set_callback(p->srb, srb_callback, p);
}

static int do_write(pa_pstream *p) {
    void *d;
    size_t l;
    ssize_t r;
    pa_memblock *release_memblock = NULL;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (!p->write.current)
        prepare_next_write_item(p);

    if (!p->write.current) {
        /* The out queue is empty, so switching channels is safe */
        check_srbpending(p);
        return 0;
    }

    if (p->write.minibuf_validsize > 0) {
        d = p->write.minibuf + p->write.index;
        l = p->write.minibuf_validsize - p->write.index;
    } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        d = (uint8_t*) p->write.descriptor + p->write.index;
        l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
    } else {
        pa_assert(p->write.data || p->write.memchunk.memblock);

        if (p->write.data)
            d = p->write.data;
        else {
            d = pa_memblock_acquire_chunk(&p->write.memchunk);
            release_memblock = p->write.memchunk.memblock;
        }

        d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
        l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
    }

    pa_assert(l > 0);

#ifdef HAVE_CREDS
    if (p->send_ancil_data_now) {
        if (p->write_ancil_data->creds_valid) {
            pa_assert(p->write_ancil_data->nfd == 0);
            if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_ancil_data->creds)) < 0)
                goto fail;
        }
        else
            if ((r = pa_iochannel_write_with_fds(p->io, d, l, p->write_ancil_data->nfd, p->write_ancil_data->fds)) < 0)
                goto fail;

        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
        p->send_ancil_data_now = false;
    } else
#endif
    if (p->srb)
        r = pa_srbchannel_write(p->srb, d, l);
    else if ((r = pa_iochannel_write(p->io, d, l)) < 0)
        goto fail;

    if (release_memblock)
        pa_memblock_release(release_memblock);

    p->write.index += (size_t) r;

    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
        pa_assert(p->write.current);
        item_free(p->write.current);
        p->write.current = NULL;

        if (p->write.memchunk.memblock)
            pa_memblock_unref(p->write.memchunk.memblock);

        pa_memchunk_reset(&p->write.memchunk);

        if (p->drain_callback && !pa_pstream_is_pending(p))
            p->drain_callback(p, p->drain_callback_userdata);
    }

    return (size_t) r == l ? 1 : 0;

fail:
#ifdef HAVE_CREDS
    if (p->send_ancil_data_now)
        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
#endif

    if (release_memblock)
        pa_memblock_release(release_memblock);

    return -1;
}

static void memblock_complete(pa_pstream *p, struct pstream_read *re) {
    pa_memchunk chunk;
    int64_t offset;

    if (!p->receive_memblock_callback)
        return;

    chunk.memblock = re->memblock;
    chunk.index = 0;
    chunk.length = re->index - PA_PSTREAM_DESCRIPTOR_SIZE;

    offset = (int64_t) (
             (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
             (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));

    p->receive_memblock_callback(
        p,
        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
        offset,
        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
        &chunk,
        p->receive_memblock_callback_userdata);
}
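
/* Illustrative sketch: the 64-bit seek offset travels as two 32-bit
 * descriptor fields, and the reconstruction in memblock_complete() above is
 * the inverse of this split, which mirrors what prepare_next_write_item()
 * does on the sending side. The function name is hypothetical. */
#if 0
static void split_offset_example(int64_t offset, uint32_t *hi, uint32_t *lo) {
    *hi = htonl((uint32_t) (((uint64_t) offset) >> 32)); /* upper half */
    *lo = htonl((uint32_t) ((uint64_t) offset));         /* lower half */
}
#endif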

static int do_read(pa_pstream *p, struct pstream_read *re) {
    void *d;
    size_t l;
    ssize_t r;
    pa_memblock *release_memblock = NULL;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        d = (uint8_t*) re->descriptor + re->index;
        l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
    } else {
        pa_assert(re->data || re->memblock);

        if (re->data)
            d = re->data;
        else {
            d = pa_memblock_acquire(re->memblock);
            release_memblock = re->memblock;
        }

        d = (uint8_t*) d + re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
        l = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (re->index - PA_PSTREAM_DESCRIPTOR_SIZE);
    }

    if (re == &p->readsrb) {
        r = pa_srbchannel_read(p->srb, d, l);
        if (r == 0) {
            if (release_memblock)
                pa_memblock_release(release_memblock);
            return 1;
        }
    }
    else
#ifdef HAVE_CREDS
    {
        pa_cmsg_ancil_data b;

        if ((r = pa_iochannel_read_with_ancil_data(p->io, d, l, &b)) <= 0)
            goto fail;

        if (b.creds_valid) {
            p->read_ancil_data.creds_valid = true;
            p->read_ancil_data.creds = b.creds;
        }
        if (b.nfd > 0) {
            pa_assert(b.nfd <= MAX_ANCIL_DATA_FDS);
            p->read_ancil_data.nfd = b.nfd;
            memcpy(p->read_ancil_data.fds, b.fds, sizeof(int) * b.nfd);
            p->read_ancil_data.close_fds_on_cleanup = b.close_fds_on_cleanup;
        }
    }
#else
    if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
        goto fail;
#endif

    if (release_memblock)
        pa_memblock_release(release_memblock);

    re->index += (size_t) r;

    if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
        uint32_t flags, length, channel;
        /* Reading of frame descriptor complete */

        flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);

        if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
            pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
            return -1;
        }

        if (flags == PA_FLAG_SHMRELEASE) {

            /* This is a SHM memblock release frame with no payload */

/*             pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */

            pa_assert(p->export);
            pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));

            goto frame_done;

        } else if (flags == PA_FLAG_SHMREVOKE) {

            /* This is a SHM memblock revoke frame with no payload */

/*             pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */

            pa_assert(p->import);
            pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));

            goto frame_done;
        }

        length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);

        if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
            pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
            return -1;
        }

        pa_assert(!re->packet && !re->memblock);

        channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);

        if (channel == (uint32_t) -1) {
            size_t plen;

            if (flags != 0) {
                pa_log_warn("Received packet frame with invalid flags value.");
                return -1;
            }

            /* Frame is a packet frame */
            re->packet = pa_packet_new(length);
            re->data = (void *) pa_packet_data(re->packet, &plen);

        } else {

            if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
                pa_log_warn("Received memblock frame with invalid seek mode.");
                return -1;
            }

            if (((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0) {

                if (length != sizeof(re->shm_info)) {
                    pa_log_warn("Received SHM memblock frame with invalid frame length.");
                    return -1;
                }

                /* Frame is a memblock frame referencing an SHM memblock */
                re->data = re->shm_info;

            } else if ((flags & PA_FLAG_SHMMASK) == 0) {

                /* Frame is a memblock frame */

                re->memblock = pa_memblock_new(p->mempool, length);
                re->data = NULL;
            } else {

                pa_log_warn("Received memblock frame with invalid flags value.");
                return -1;
            }
        }

    } else if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
        /* Frame complete */

        if (re->memblock) {
            memblock_complete(p, re);

            /* This was a memblock frame. We can unref the memblock now */
            pa_memblock_unref(re->memblock);

        } else if (re->packet) {

            if (p->receive_packet_callback)
#ifdef HAVE_CREDS
                p->receive_packet_callback(p, re->packet, &p->read_ancil_data, p->receive_packet_callback_userdata);
#else
                p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
#endif

            pa_packet_unref(re->packet);
        } else {
            pa_memblock *b = NULL;
            uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
            uint32_t shm_id = ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]);
            pa_mem_type_t type = (flags & PA_FLAG_SHMDATA_MEMFD_BLOCK) ?
                                 PA_MEM_TYPE_SHARED_MEMFD : PA_MEM_TYPE_SHARED_POSIX;

            pa_assert(((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0);
            pa_assert(p->import);

            if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd &&
                !pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {

                if (pa_log_ratelimit(PA_LOG_ERROR))
                    pa_log("Ignoring received block reference with non-registered memfd ID = %u", shm_id);

            } else if (!(b = pa_memimport_get(p->import,
                                              type,
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
                                              shm_id,
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
                                              !!(flags & PA_FLAG_SHMWRITABLE)))) {

                if (pa_log_ratelimit(PA_LOG_DEBUG))
                    pa_log_debug("Failed to import memory block.");
            }

            if (p->receive_memblock_callback) {
                int64_t offset;
                pa_memchunk chunk;

                chunk.memblock = b;
                chunk.index = 0;
                chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);

                offset = (int64_t) (
                        (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
                        (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));

                p->receive_memblock_callback(
                        p,
                        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
                        offset,
                        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
                        &chunk,
                        p->receive_memblock_callback_userdata);
            }

            if (b)
                pa_memblock_unref(b);
        }

        goto frame_done;
    }

    return 0;

frame_done:
    re->memblock = NULL;
    re->packet = NULL;
    re->index = 0;
    re->data = NULL;

1106     /* FIXME: Close received ancillary data fds if the pstream's
1107      * receive_packet_callback did not do so.
1108      *
1109      * Malicious clients can attach fds to unknown commands, or attach them
1110      * to commands that does not expect fds. By doing so, server will reach
1111      * its open fd limit and future clients' SHM transfers will always fail.
1112      */
1113     p->read_ancil_data.creds_valid = false;
1114     p->read_ancil_data.nfd = 0;
1115 #endif

    return 0;

fail:
    if (release_memblock)
        pa_memblock_release(release_memblock);

    return -1;
}

void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->die_callback = cb;
    p->die_callback_userdata = userdata;
}

void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->drain_callback = cb;
    p->drain_callback_userdata = userdata;
}

void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->receive_packet_callback = cb;
    p->receive_packet_callback_userdata = userdata;
}

void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->receive_memblock_callback = cb;
    p->receive_memblock_callback_userdata = userdata;
}

void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->release_callback = cb;
    p->release_callback_userdata = userdata;
}

void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->revoke_callback = cb;
    p->revoke_callback_userdata = userdata;
}

bool pa_pstream_is_pending(pa_pstream *p) {
    bool b;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        b = false;
    else
        b = p->write.current || !pa_queue_isempty(p->send_queue);

    return b;
}

void pa_pstream_unref(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (PA_REFCNT_DEC(p) <= 0)
        pstream_free(p);
}

pa_pstream* pa_pstream_ref(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    PA_REFCNT_INC(p);
    return p;
}

void pa_pstream_unlink(pa_pstream *p) {
    pa_assert(p);

    if (p->dead)
        return;

    p->dead = true;

    while (p->srb || p->is_srbpending) /* In theory there could be one active and one pending */
        pa_pstream_set_srbchannel(p, NULL);

    if (p->import) {
        pa_memimport_free(p->import);
        p->import = NULL;
    }

    if (p->export) {
        pa_memexport_free(p->export);
        p->export = NULL;
    }

    if (p->io) {
        pa_iochannel_free(p->io);
        p->io = NULL;
    }

    if (p->defer_event) {
        p->mainloop->defer_free(p->defer_event);
        p->defer_event = NULL;
    }

    p->die_callback = NULL;
    p->drain_callback = NULL;
    p->receive_packet_callback = NULL;
    p->receive_memblock_callback = NULL;
}

void pa_pstream_enable_shm(pa_pstream *p, bool enable) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->use_shm = enable;

    if (enable) {

        if (!p->export)
            p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);

    } else {

        if (p->export) {
            pa_memexport_free(p->export);
            p->export = NULL;
        }
    }
}

void pa_pstream_enable_memfd(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->use_shm);

    p->use_memfd = true;

    if (!p->registered_memfd_ids) {
        p->registered_memfd_ids = pa_idxset_new(NULL, NULL);
    }
}

bool pa_pstream_get_shm(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    return p->use_shm;
}

bool pa_pstream_get_memfd(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    return p->use_memfd;
}

void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0 || srb == NULL);

    if (srb == p->srb)
        return;

    /* We can't handle quick switches between srbchannels. */
    pa_assert(!p->is_srbpending);

    p->srbpending = srb;
    p->is_srbpending = true;

    /* Switch immediately, if possible. */
    if (p->dead)
        check_srbpending(p);
    else
        do_write(p);
}