• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6 
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as
9   published by the Free Software Foundation; either version 2.1 of the
10   License, or (at your option) any later version.
11 
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   Lesser General Public License for more details.
16 
17   You should have received a copy of the GNU Lesser General Public
18   License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20 
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24 
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
28 
29 #ifdef HAVE_NETINET_IN_H
30 #include <netinet/in.h>
31 #endif
32 
33 #include <pulse/xmalloc.h>
34 
35 #include <pulsecore/idxset.h>
36 #include <pulsecore/socket.h>
37 #include <pulsecore/queue.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/creds.h>
40 #include <pulsecore/refcnt.h>
41 #include <pulsecore/flist.h>
42 #include <pulsecore/macro.h>
43 
44 #include "pstream.h"
45 
/* We piggyback information if audio data blocks are stored in SHM on the seek mode.
 * Only the low byte is needed for the seek mode, so the upper bits of the 32 bit
 * FLAGS descriptor field carry the SHM-related frame type. */
#define PA_FLAG_SHMDATA     0x80000000LU
#define PA_FLAG_SHMDATA_MEMFD_BLOCK         0x20000000LU
#define PA_FLAG_SHMRELEASE  0x40000000LU
#define PA_FLAG_SHMREVOKE   0xC0000000LU
#define PA_FLAG_SHMMASK     0xFF000000LU
#define PA_FLAG_SEEKMASK    0x000000FFLU
#define PA_FLAG_SHMWRITABLE 0x00800000LU

/* The sequence descriptor header consists of 5 32bit integers: */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH,    /* payload length in bytes (network byte order on the wire) */
    PA_PSTREAM_DESCRIPTOR_CHANNEL,   /* channel of a memblock frame; (uint32_t) -1 for packets */
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI, /* high 32 bits of the seek offset; release/revoke frames carry the block ID here */
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO, /* low 32 bits of the seek offset */
    PA_PSTREAM_DESCRIPTOR_FLAGS,     /* seek mode | PA_FLAG_SHM* bits */
    PA_PSTREAM_DESCRIPTOR_MAX
};

/* If we have an SHM block, this info follows the descriptor */
enum {
    PA_PSTREAM_SHM_BLOCKID, /* export-assigned ID of the referenced block */
    PA_PSTREAM_SHM_SHMID,   /* ID of the SHM pool (POSIX SHM or registered memfd) */
    PA_PSTREAM_SHM_INDEX,   /* start offset of the chunk inside the pool */
    PA_PSTREAM_SHM_LENGTH,  /* length of the chunk in bytes */
    PA_PSTREAM_SHM_MAX
};

typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];

#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))

/* Frames no larger than this (descriptor included) are assembled in a single
 * buffer so they can go out in one write (see prepare_next_write_item()). */
#define MINIBUF_SIZE (256)

/* To allow uploading a single sample in one frame, this value should be the
 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
 */
#define FRAME_SIZE_MAX_ALLOW (1024*1024*16)

/* Free list used to recycle struct item_info allocations */
PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
86 
/* One entry of the pstream send queue. Which of the fields below are
 * meaningful depends on @type. */
struct item_info {
    enum {
        PA_PSTREAM_ITEM_PACKET,     /* a control packet */
        PA_PSTREAM_ITEM_MEMBLOCK,   /* an audio data memchunk */
        PA_PSTREAM_ITEM_SHMRELEASE, /* release notification for an imported SHM block */
        PA_PSTREAM_ITEM_SHMREVOKE   /* revoke notification for an exported SHM block */
    } type;

    /* packet info */
    pa_packet *packet;              /* referenced; unreffed in item_free() */
#ifdef HAVE_CREDS
    bool with_ancil_data;           /* whether ancil_data below is valid */
    pa_cmsg_ancil_data ancil_data;  /* credentials or fds sent along with the packet */
#endif

    /* memblock info */
    pa_memchunk chunk;              /* memblock is referenced; unreffed in item_free() */
    uint32_t channel;
    int64_t offset;
    pa_seek_mode_t seek_mode;

    /* release/revoke info */
    uint32_t block_id;
};
111 
/* Read-side state. There is one instance per input channel: one for the
 * iochannel (readio) and one for the srbchannel (readsrb). */
struct pstream_read {
    pa_pstream_descriptor descriptor;      /* frame header being (or fully) read */
    pa_memblock *memblock;                 /* destination block while reading memblock payload */
    pa_packet *packet;                     /* destination packet while reading packet payload */
    uint32_t shm_info[PA_PSTREAM_SHM_MAX]; /* SHM reference info for SHM frames */
    void *data;                            /* raw destination pointer, used instead of memblock */
    size_t index;                          /* bytes consumed so far, including the descriptor */
};
120 
/* A packet stream: frames (packets, memblocks, SHM release/revoke
 * notifications) multiplexed over an iochannel and, optionally, an
 * srbchannel. Reference counted. */
struct pa_pstream {
    PA_REFCNT_DECLARE;

    pa_mainloop_api *mainloop;
    pa_defer_event *defer_event;     /* enabled whenever queued data is waiting to be flushed */
    pa_iochannel *io;
    pa_srbchannel *srb, *srbpending; /* srbpending is swapped in once the send queue drains (check_srbpending()) */
    bool is_srbpending;

    pa_queue *send_queue;            /* queue of struct item_info */

    bool dead;

    /* State of the frame currently being transmitted */
    struct {
        union {
            uint8_t minibuf[MINIBUF_SIZE]; /* small frames are assembled here in one piece */
            pa_pstream_descriptor descriptor;
        };
        struct item_info* current;   /* item being written; already popped off send_queue */
        void *data;                  /* payload pointer for packet items */
        size_t index;                /* bytes written so far, including the descriptor */
        int minibuf_validsize;       /* > 0: minibuf holds the complete frame */
        pa_memchunk memchunk;        /* referenced payload for memblock items */
    } write;

    /* Independent read state for the iochannel and the srbchannel */
    struct pstream_read readio, readsrb;

    /* @use_shm: beside copying the full audio data to the other
     * PA end, this pipe supports just sending references of the
     * same audio data blocks if they reside in a SHM pool.
     *
     * @use_memfd: pipe supports sending SHM memfd block references
     *
     * @registered_memfd_ids: registered memfd pools SHM IDs. Check
     * pa_pstream_register_memfd_mempool() for more information. */
    bool use_shm, use_memfd;
    bool non_registered_memfd_id_error_logged; /* rate-limits the fallback warning to one occurrence */
    pa_idxset *registered_memfd_ids;

    pa_memimport *import;            /* created unconditionally in pa_pstream_new() */
    pa_memexport *export;

    pa_pstream_packet_cb_t receive_packet_callback;
    void *receive_packet_callback_userdata;

    pa_pstream_memblock_cb_t receive_memblock_callback;
    void *receive_memblock_callback_userdata;

    pa_pstream_notify_cb_t drain_callback;   /* fired from do_write() when nothing is pending anymore */
    void *drain_callback_userdata;

    pa_pstream_notify_cb_t die_callback;     /* fired on read/write errors or hangup */
    void *die_callback_userdata;

    pa_pstream_block_id_cb_t revoke_callback;
    void *revoke_callback_userdata;

    pa_pstream_block_id_cb_t release_callback;
    void *release_callback_userdata;

    pa_mempool *mempool;

#ifdef HAVE_CREDS
    pa_cmsg_ancil_data read_ancil_data, *write_ancil_data;
    bool send_ancil_data_now;        /* send *write_ancil_data with the next write */
#endif
};
188 
189 #ifdef HAVE_CREDS
190 /*
191  * memfd-backed SHM pools blocks transfer occur without passing the pool's
192  * fd every time, thus minimizing overhead and avoiding fd leaks. A
193  * REGISTER_MEMFD_SHMID command is sent, with the pool's memfd fd, very early
194  * on. This command has an ID that uniquely identifies the pool in question.
195  * Further pool's block references can then be exclusively done using such ID;
196  * the fd can be safely closed – on both ends – afterwards.
197  *
198  * On the sending side of this command, we want to close the passed fds
199  * directly after being sent. Meanwhile we're only allowed to asynchronously
200  * schedule packet writes to the pstream, so the job of closing passed fds is
201  * left to the pstream's actual writing function do_write(): it knows the
202  * exact point in time where the fds are passed to the other end through
203  * iochannels and the sendmsg() system call.
204  *
205  * Nonetheless not all code paths in the system desire their socket-passed
206  * fds to be closed after the send. srbchannel needs the passed fds to still
207  * be open for further communication. System-wide global memfd-backed pools
208  * also require the passed fd to be open: they pass the same fd, with the same
209  * ID registration mechanism, for each newly connected client to the system.
210  *
211  * So from all of the above, never close the ancillary fds by your own and
212  * always call below method instead. It takes care of closing the passed fds
213  * _only if allowed_ by the code paths that originally created them to do so.
214  * Moreover, it is multiple-invocations safe: failure handlers can, and
215  * should, call it for passed fds cleanup without worrying too much about
216  * the system state.
217  */
pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data * ancil)218 void pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data *ancil) {
219     if (ancil && ancil->nfd > 0 && ancil->close_fds_on_cleanup) {
220         int i;
221 
222         pa_assert(ancil->nfd <= MAX_ANCIL_DATA_FDS);
223 
224         for (i = 0; i < ancil->nfd; i++)
225             if (ancil->fds[i] != -1) {
226                 pa_assert_se(pa_close(ancil->fds[i]) == 0);
227                 ancil->fds[i] = -1;
228             }
229 
230         ancil->nfd = 0;
231         ancil->close_fds_on_cleanup = false;
232     }
233 }
234 #endif
235 
236 static int do_write(pa_pstream *p);
237 static int do_read(pa_pstream *p, struct pstream_read *re);
238 
/* Flush pending writes and drain readable input on both the srbchannel
 * and the iochannel. Any failure fires the die callback and unlinks the
 * stream. */
static void do_pstream_read_write(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    /* Keep the stream alive while user callbacks run */
    pa_pstream_ref(p);

    p->mainloop->defer_enable(p->defer_event, 0);

    if (!p->dead && p->srb) {
        int rc;

        if (do_write(p) < 0)
            goto fail;

        /* Drain the srbchannel: do_read() yields 0 while more data may
         * be pending and 1 once the channel is exhausted. */
        for (rc = 0; !p->dead && rc == 0;)
            if ((rc = do_read(p, &p->readsrb)) < 0)
                goto fail;
    }

    if (!p->dead) {
        if (pa_iochannel_is_readable(p->io)) {
            if (do_read(p, &p->readio) < 0)
                goto fail;
        } else if (pa_iochannel_is_hungup(p->io))
            goto fail;
    }

    while (!p->dead && pa_iochannel_is_writable(p->io)) {
        int rc = do_write(p);

        if (rc < 0)
            goto fail;
        if (rc == 0)
            break;
    }

    pa_pstream_unref(p);
    return;

fail:

    if (p->die_callback)
        p->die_callback(p, p->die_callback_userdata);

    pa_pstream_unlink(p);
    pa_pstream_unref(p);
}
285 
srb_callback(pa_srbchannel * srb,void * userdata)286 static bool srb_callback(pa_srbchannel *srb, void *userdata) {
287     bool b;
288     pa_pstream *p = userdata;
289 
290     pa_assert(p);
291     pa_assert(PA_REFCNT_VALUE(p) > 0);
292     pa_assert(p->srb == srb);
293 
294     pa_pstream_ref(p);
295 
296     do_pstream_read_write(p);
297 
298     /* If either pstream or the srb is going away, return false.
299        We need to check this before p is destroyed. */
300     b = (PA_REFCNT_VALUE(p) > 1) && (p->srb == srb);
301     pa_pstream_unref(p);
302 
303     return b;
304 }
305 
/* iochannel activity handler: delegate to the common read/write pump. */
static void io_callback(pa_iochannel *io, void *userdata) {
    pa_pstream *ps = userdata;

    pa_assert(ps);
    pa_assert(PA_REFCNT_VALUE(ps) > 0);
    pa_assert(ps->io == io);

    do_pstream_read_write(ps);
}
315 
/* Deferred-event handler: flush queued frames outside the I/O callbacks. */
static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void *userdata) {
    pa_pstream *ps = userdata;

    pa_assert(ps);
    pa_assert(PA_REFCNT_VALUE(ps) > 0);
    pa_assert(ps->defer_event == e);
    pa_assert(ps->mainloop == m);

    do_pstream_read_write(ps);
}
326 
327 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
328 
pa_pstream_new(pa_mainloop_api * m,pa_iochannel * io,pa_mempool * pool)329 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
330     pa_pstream *p;
331 
332     pa_assert(m);
333     pa_assert(io);
334     pa_assert(pool);
335 
336     p = pa_xnew0(pa_pstream, 1);
337     PA_REFCNT_INIT(p);
338     p->io = io;
339     pa_iochannel_set_callback(io, io_callback, p);
340 
341     p->mainloop = m;
342     p->defer_event = m->defer_new(m, defer_callback, p);
343     m->defer_enable(p->defer_event, 0);
344 
345     p->send_queue = pa_queue_new();
346 
347     p->mempool = pool;
348 
349     /* We do importing unconditionally */
350     p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
351 
352     pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
353     pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
354 
355     return p;
356 }
357 
358 /* Attach memfd<->SHM_ID mapping to given pstream and its memimport.
359  * Check pa_pstream_register_memfd_mempool() for further info.
360  *
361  * Caller owns the passed @memfd_fd and must close it down when appropriate. */
int pa_pstream_attach_memfd_shmid(pa_pstream *p, unsigned shm_id, int memfd_fd) {
    pa_assert(memfd_fd != -1);

    /* Reject registrations on pipes that never negotiated memfd support */
    if (!p->use_memfd) {
        pa_log_warn("Received memfd ID registration request over a pipe "
                    "that does not support memfds");
        return -1;
    }

    /* Each pool ID may be registered at most once per pipe */
    if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
        pa_log_warn("previously registered memfd SHM ID = %u", shm_id);
        return -1;
    }

    if (pa_memimport_attach_memfd(p->import, shm_id, memfd_fd, true)) {
        pa_log("Failed to create permanent mapping for memfd region with ID = %u", shm_id);
        return -1;
    }

    pa_assert_se(pa_idxset_put(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL) == 0);
    return 0;
}
386 
item_free(void * item)387 static void item_free(void *item) {
388     struct item_info *i = item;
389     pa_assert(i);
390 
391     if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
392         pa_assert(i->chunk.memblock);
393         pa_memblock_unref(i->chunk.memblock);
394     } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
395         pa_assert(i->packet);
396         pa_packet_unref(i->packet);
397     }
398 
399 #ifdef HAVE_CREDS
400     /* On error recovery paths, there might be lingering items
401      * on the pstream send queue and they are usually freed with
402      * a call to 'pa_queue_free(p->send_queue, item_free)'. Make
403      * sure we do not leak any fds in that case! */
404     if (i->with_ancil_data)
405         pa_cmsg_ancil_data_close_fds(&i->ancil_data);
406 #endif
407 
408     if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
409         pa_xfree(i);
410 }
411 
/* Release everything the pstream still references and free the struct
 * itself. The stream must not be used afterwards. */
static void pstream_free(pa_pstream *p) {
    pa_assert(p);

    pa_pstream_unlink(p);

    pa_queue_free(p->send_queue, item_free);

    /* The item currently being written was already popped off the
     * queue, so it has to be freed separately */
    if (p->write.current)
        item_free(p->write.current);

    if (p->write.memchunk.memblock)
        pa_memblock_unref(p->write.memchunk.memblock);

    /* Drop partially read frames on both input channels */
    if (p->readsrb.memblock)
        pa_memblock_unref(p->readsrb.memblock);

    if (p->readsrb.packet)
        pa_packet_unref(p->readsrb.packet);

    if (p->readio.memblock)
        pa_memblock_unref(p->readio.memblock);

    if (p->readio.packet)
        pa_packet_unref(p->readio.packet);

    if (p->registered_memfd_ids)
        pa_idxset_free(p->registered_memfd_ids, NULL);

    pa_xfree(p);
}
442 
/* Queue a control packet (with optional ancillary data) for sending.
 * The packet is referenced; transmission happens asynchronously. */
void pa_pstream_send_packet(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data) {
    struct item_info *i;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(packet);

    if (p->dead) {
#ifdef HAVE_CREDS
        /* Never leak fds that were meant to be passed along */
        pa_cmsg_ancil_data_close_fds(ancil_data);
#endif
        return;
    }

    i = pa_flist_pop(PA_STATIC_FLIST_GET(items));
    if (!i)
        i = pa_xnew(struct item_info, 1);

    i->type = PA_PSTREAM_ITEM_PACKET;
    i->packet = pa_packet_ref(packet);

#ifdef HAVE_CREDS
    i->with_ancil_data = !!ancil_data;
    if (i->with_ancil_data) {
        i->ancil_data = *ancil_data;

        /* A single ancillary blob carries either credentials or fds,
         * never both */
        if (ancil_data->creds_valid)
            pa_assert(ancil_data->nfd == 0);
        else
            pa_assert(ancil_data->nfd > 0);
    }
#endif

    pa_queue_push(p->send_queue, i);
    p->mainloop->defer_enable(p->defer_event, 1);
}
477 
/* Queue an audio memchunk for sending on @channel. The chunk is split
 * into fragments no larger than the mempool's maximum block size; each
 * fragment references the same underlying memblock. */
void pa_pstream_send_memblock(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
    size_t consumed, remaining, max_frag;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(channel != (uint32_t) -1);
    pa_assert(chunk);

    if (p->dead)
        return;

    max_frag = pa_mempool_block_size_max(p->mempool);

    for (consumed = 0, remaining = chunk->length; remaining > 0;) {
        struct item_info *i;
        size_t frag = PA_MIN(remaining, max_frag);

        if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
            i = pa_xnew(struct item_info, 1);

        i->type = PA_PSTREAM_ITEM_MEMBLOCK;
        i->chunk.index = chunk->index + consumed;
        i->chunk.length = frag;
        i->chunk.memblock = pa_memblock_ref(chunk->memblock);
        i->channel = channel;
        i->offset = offset;
        i->seek_mode = seek_mode;
#ifdef HAVE_CREDS
        i->with_ancil_data = false;
#endif

        pa_queue_push(p->send_queue, i);

        consumed += frag;
        remaining -= frag;
    }

    p->mainloop->defer_enable(p->defer_event, 1);
}
523 
/* Queue a payload-less SHMRELEASE frame telling the peer that the
 * imported block @block_id is no longer in use. */
void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
    struct item_info *item;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

/*     pa_log("Releasing block %u", block_id); */

    item = pa_flist_pop(PA_STATIC_FLIST_GET(items));
    if (!item)
        item = pa_xnew(struct item_info, 1);

    item->type = PA_PSTREAM_ITEM_SHMRELEASE;
    item->block_id = block_id;
#ifdef HAVE_CREDS
    item->with_ancil_data = false;
#endif

    pa_queue_push(p->send_queue, item);
    p->mainloop->defer_enable(p->defer_event, 1);
}
545 
546 /* might be called from thread context */
static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

    /* Prefer the user-supplied release hook; otherwise send the release
     * frame ourselves. */
    if (!p->release_callback)
        pa_pstream_send_release(p, block_id);
    else
        p->release_callback(p, block_id, p->release_callback_userdata);
}
561 
/* Queue a payload-less SHMREVOKE frame invalidating the peer's reference
 * to exported block @block_id. */
void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
    struct item_info *item;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

/*     pa_log("Revoking block %u", block_id); */

    item = pa_flist_pop(PA_STATIC_FLIST_GET(items));
    if (!item)
        item = pa_xnew(struct item_info, 1);

    item->type = PA_PSTREAM_ITEM_SHMREVOKE;
    item->block_id = block_id;
#ifdef HAVE_CREDS
    item->with_ancil_data = false;
#endif

    pa_queue_push(p->send_queue, item);
    p->mainloop->defer_enable(p->defer_event, 1);
}
582 
583 /* might be called from thread context */
static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    /* Prefer the user-supplied revoke hook; otherwise send the revoke
     * frame ourselves. */
    if (!p->revoke_callback)
        pa_pstream_send_revoke(p, block_id);
    else
        p->revoke_callback(p, block_id, p->revoke_callback_userdata);
}
595 
/* Pop the next item off the send queue and initialize the write state
 * (p->write.*) for it: fill in the frame descriptor and, for small or
 * SHM-referenced frames, assemble the complete frame in the minibuf. */
static void prepare_next_write_item(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->write.current = pa_queue_pop(p->send_queue);

    if (!p->write.current)
        return;

    /* Reset write progress for the new frame */
    p->write.index = 0;
    p->write.data = NULL;
    p->write.minibuf_validsize = 0;
    pa_memchunk_reset(&p->write.memchunk);

    /* Default descriptor: empty payload, no channel, no offset, no flags */
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;

    if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
        size_t plen;

        pa_assert(p->write.current->packet);

        p->write.data = (void *) pa_packet_data(p->write.current->packet, &plen);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) plen);

        /* Small packets are copied right behind the descriptor so the
         * whole frame goes out in a single write */
        if (plen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
            memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
            p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
        }

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {

        /* Payload-less frame; the block ID travels in OFFSET_HI */
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {

        /* Payload-less frame; the block ID travels in OFFSET_HI */
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else {
        uint32_t flags;
        bool send_payload = true;

        pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
        pa_assert(p->write.current->chunk.memblock);

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));

        flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);

        if (p->use_shm) {
            /* Try to send only a reference to the SHM block instead of
             * copying the audio data over the socket */
            pa_mem_type_t type;
            uint32_t block_id, shm_id;
            size_t offset, length;
            uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
            size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
            pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock);
            pa_memexport *current_export;

            /* Blocks from a foreign pool need their own (temporary) exporter */
            if (p->mempool == current_pool)
                pa_assert_se(current_export = p->export);
            else
                pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));

            if (pa_memexport_put(current_export,
                                 p->write.current->chunk.memblock,
                                 &type,
                                 &block_id,
                                 &shm_id,
                                 &offset,
                                 &length) >= 0) {

                if (type == PA_MEM_TYPE_SHARED_POSIX)
                    send_payload = false;

                if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd) {
                    /* memfd references only work if the pool's ID was
                     * registered with the other end beforehand */
                    if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
                        flags |= PA_FLAG_SHMDATA_MEMFD_BLOCK;
                        send_payload = false;
                    } else {
                        if (!p->non_registered_memfd_id_error_logged) {
                            pa_log("Cannot send block reference with non-registered memfd ID = %u", shm_id);
                            pa_log("Falling back to copying full block data over socket");
                            pa_log("There's a bug report about this: https://gitlab.freedesktop.org/pulseaudio/pulseaudio/issues/824");
                            p->non_registered_memfd_id_error_logged = true;
                        }
                    }
                }

                if (send_payload) {
                    /* The reference is unusable; undo the export */
                    pa_assert_se(pa_memexport_process_release(current_export, block_id) == 0);
                } else {
                    /* Only the SHM reference info goes out as payload */
                    flags |= PA_FLAG_SHMDATA;
                    if (pa_mempool_is_remote_writable(current_pool))
                        flags |= PA_FLAG_SHMWRITABLE;

                    shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
                    shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
                    shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
                    shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);

                    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
                    p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
                }
            }
/*             else */
/*                 FIXME: Avoid memexport slot leaks. Call pa_memexport_process_release() */
/*                 pa_log_warn("Failed to export memory block."); */

            if (current_export != p->export)
                pa_memexport_free(current_export);
            pa_mempool_unref(current_pool);
        }

        if (send_payload) {
            /* Plain copy: the full chunk is the payload */
            p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
            p->write.memchunk = p->write.current->chunk;
            pa_memblock_ref(p->write.memchunk.memblock);
        }

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
    }

#ifdef HAVE_CREDS
    if ((p->send_ancil_data_now = p->write.current->with_ancil_data))
        p->write_ancil_data = &p->write.current->ancil_data;
#endif
}
729 
/* Swap in the pending srbchannel, if one was scheduled. Called from
 * do_write() only while the send queue is empty, so no in-flight frame
 * can straddle the channel switch. */
static void check_srbpending(pa_pstream *p) {
    if (!p->is_srbpending)
        return;

    if (p->srb)
        pa_srbchannel_free(p->srb);

    /* NOTE(review): srbpending may be NULL, which simply disables the
     * srbchannel; the is_srbpending flag alone guards re-entry. */
    p->srb = p->srbpending;
    p->is_srbpending = false;

    if (p->srb)
        pa_srbchannel_set_callback(p->srb, srb_callback, p);
}
743 
/* Write the next region of the current frame to the srbchannel (if any)
 * or the iochannel. Returns 1 when the region was written completely,
 * 0 when the queue is empty or the write was short, -1 on error. */
static int do_write(pa_pstream *p) {
    void *d;
    size_t l;
    ssize_t r;
    pa_memblock *release_memblock = NULL;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (!p->write.current)
        prepare_next_write_item(p);

    if (!p->write.current) {
        /* The out queue is empty, so switching channels is safe */
        check_srbpending(p);
        return 0;
    }

    /* Select what to write next: a fully assembled minibuf frame, the
     * remainder of the descriptor, or the payload proper */
    if (p->write.minibuf_validsize > 0) {
        d = p->write.minibuf + p->write.index;
        l = p->write.minibuf_validsize - p->write.index;
    } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        d = (uint8_t*) p->write.descriptor + p->write.index;
        l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
    } else {
        pa_assert(p->write.data || p->write.memchunk.memblock);

        if (p->write.data)
            d = p->write.data;
        else {
            /* Memblock payload must be mapped before access */
            d = pa_memblock_acquire_chunk(&p->write.memchunk);
            release_memblock = p->write.memchunk.memblock;
        }

        d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
        l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
    }

    pa_assert(l > 0);

#ifdef HAVE_CREDS
    if (p->send_ancil_data_now) {
        /* Ancillary data (credentials or fds) rides along with this write */
        if (p->write_ancil_data->creds_valid) {
            pa_assert(p->write_ancil_data->nfd == 0);
            if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_ancil_data->creds)) < 0)
                goto fail;
        }
        else
            if ((r = pa_iochannel_write_with_fds(p->io, d, l, p->write_ancil_data->nfd, p->write_ancil_data->fds)) < 0)
                goto fail;

        /* The fds were handed to the kernel; close our copies if the
         * originating code path allows it */
        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
        p->send_ancil_data_now = false;
    } else
#endif
    if (p->srb)
        r = pa_srbchannel_write(p->srb, d, l);
    else if ((r = pa_iochannel_write(p->io, d, l)) < 0)
        goto fail;

    if (release_memblock)
        pa_memblock_release(release_memblock);

    p->write.index += (size_t) r;

    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
        /* Frame fully transmitted: drop the item and notify drain waiters */
        pa_assert(p->write.current);
        item_free(p->write.current);
        p->write.current = NULL;

        if (p->write.memchunk.memblock)
            pa_memblock_unref(p->write.memchunk.memblock);

        pa_memchunk_reset(&p->write.memchunk);

        if (p->drain_callback && !pa_pstream_is_pending(p))
            p->drain_callback(p, p->drain_callback_userdata);
    }

    return (size_t) r == l ? 1 : 0;

fail:
#ifdef HAVE_CREDS
    /* Make sure fds are not leaked on a failed write */
    if (p->send_ancil_data_now)
        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
#endif

    if (release_memblock)
        pa_memblock_release(release_memblock);

    return -1;
}
836 
/* A full memblock frame has been read on @re: hand the resulting chunk
 * to the receive_memblock callback, reassembling the 64 bit seek offset
 * from the two 32 bit descriptor fields. */
static void memblock_complete(pa_pstream *p, struct pstream_read *re) {
    pa_memchunk chunk;
    uint64_t offs_hi, offs_lo;

    if (!p->receive_memblock_callback)
        return;

    chunk.memblock = re->memblock;
    chunk.index = 0;
    chunk.length = re->index - PA_PSTREAM_DESCRIPTOR_SIZE;

    offs_hi = (uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]);
    offs_lo = (uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]);

    p->receive_memblock_callback(
        p,
        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
        (int64_t) ((offs_hi << 32) | offs_lo),
        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
        &chunk,
        p->receive_memblock_callback_userdata);
}
860 
/* Pull the next chunk of incoming bytes into the current read frame (re)
 * and, once a frame is complete, validate and dispatch it.
 *
 * A frame is a 5-word network-byte-order descriptor optionally followed by
 * a payload (packet data, raw memblock data, or a pa_pstream_shm_info
 * block referencing shared memory).
 *
 * Returns 0 on progress or frame completion, 1 when the srbchannel had
 * nothing to read, and -1 on I/O error or protocol violation. */
static int do_read(pa_pstream *p, struct pstream_read *re) {
    void *d;
    size_t l;
    ssize_t r;
    pa_memblock *release_memblock = NULL;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    /* Phase 1: still reading the descriptor header itself. */
    if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        d = (uint8_t*) re->descriptor + re->index;
        l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
    } else {
        /* Phase 2: reading the payload, either into a plain buffer
         * (packet / shm_info) or into a freshly allocated memblock. */
        pa_assert(re->data || re->memblock);

        if (re->data)
            d = re->data;
        else {
            /* Map the memblock for writing; released again below. */
            d = pa_memblock_acquire(re->memblock);
            release_memblock = re->memblock;
        }

        d = (uint8_t*) d + re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
        l = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (re->index - PA_PSTREAM_DESCRIPTOR_SIZE);
    }

    /* Read from the srbchannel if this is the srb read state, otherwise
     * from the iochannel (with ancillary creds/fds when available). */
    if (re == &p->readsrb) {
        r = pa_srbchannel_read(p->srb, d, l);
        if (r == 0) {
            /* Nothing available right now — not an error. */
            if (release_memblock)
                pa_memblock_release(release_memblock);
            return 1;
        }
    }
    else
#ifdef HAVE_CREDS
    {
        pa_cmsg_ancil_data b;

        if ((r = pa_iochannel_read_with_ancil_data(p->io, d, l, &b)) <= 0)
            goto fail;

        /* Stash received credentials/fds; consumed when the frame is
         * dispatched and cleared again in frame_done below. */
        if (b.creds_valid) {
            p->read_ancil_data.creds_valid = true;
            p->read_ancil_data.creds = b.creds;
        }
        if (b.nfd > 0) {
            pa_assert(b.nfd <= MAX_ANCIL_DATA_FDS);
            p->read_ancil_data.nfd = b.nfd;
            memcpy(p->read_ancil_data.fds, b.fds, sizeof(int) * b.nfd);
            p->read_ancil_data.close_fds_on_cleanup = b.close_fds_on_cleanup;
        }
    }
#else
    if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
        goto fail;
#endif

    if (release_memblock)
        pa_memblock_release(release_memblock);

    re->index += (size_t) r;

    if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
        uint32_t flags, length, channel;
        /* Reading of frame descriptor complete */

        flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);

        /* Reject SHM frames from peers we never negotiated SHM with. */
        if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
            pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
            return -1;
        }

        if (flags == PA_FLAG_SHMRELEASE) {

            /* This is a SHM memblock release frame with no payload */

/*             pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */

            pa_assert(p->export);
            pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));

            goto frame_done;

        } else if (flags == PA_FLAG_SHMREVOKE) {

            /* This is a SHM memblock revoke frame with no payload */

/*             pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */

            pa_assert(p->import);
            pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));

            goto frame_done;
        }

        length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);

        /* NOTE(review): length is uint32_t, so "<= 0" is effectively
         * "== 0" — zero-length and oversized frames are both rejected. */
        if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
            pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
            return -1;
        }

        pa_assert(!re->packet && !re->memblock);

        channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);

        /* Channel (uint32_t)-1 marks a control packet frame; anything
         * else is an audio memblock frame for that channel. */
        if (channel == (uint32_t) -1) {
            size_t plen;

            if (flags != 0) {
                pa_log_warn("Received packet frame with invalid flags value.");
                return -1;
            }

            /* Frame is a packet frame */
            re->packet = pa_packet_new(length);
            re->data = (void *) pa_packet_data(re->packet, &plen);

        } else {

            if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
                pa_log_warn("Received memblock frame with invalid seek mode.");
                return -1;
            }

            if (((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0) {

                /* Payload must be exactly the shm_info descriptor. */
                if (length != sizeof(re->shm_info)) {
                    pa_log_warn("Received SHM memblock frame with invalid frame length.");
                    return -1;
                }

                /* Frame is a memblock frame referencing an SHM memblock */
                re->data = re->shm_info;

            } else if ((flags & PA_FLAG_SHMMASK) == 0) {

                /* Frame is a memblock frame */

                re->memblock = pa_memblock_new(p->mempool, length);
                re->data = NULL;
            } else {

                pa_log_warn("Received memblock frame with invalid flags value.");
                return -1;
            }
        }

    } else if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
        /* Frame complete */

        if (re->memblock) {
            memblock_complete(p, re);

            /* This was a memblock frame. We can unref the memblock now */
            pa_memblock_unref(re->memblock);

        } else if (re->packet) {

            if (p->receive_packet_callback)
#ifdef HAVE_CREDS
                p->receive_packet_callback(p, re->packet, &p->read_ancil_data, p->receive_packet_callback_userdata);
#else
                p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
#endif

            pa_packet_unref(re->packet);
        } else {
            /* Neither memblock nor packet: this was an SHM reference
             * frame; resolve the shm_info into an imported memblock. */
            pa_memblock *b = NULL;
            uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
            uint32_t shm_id = ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]);
            pa_mem_type_t type = (flags & PA_FLAG_SHMDATA_MEMFD_BLOCK) ?
                                 PA_MEM_TYPE_SHARED_MEMFD : PA_MEM_TYPE_SHARED_POSIX;

            pa_assert(((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0);
            pa_assert(p->import);

            /* memfd IDs must have been registered beforehand; ignore
             * references to unknown IDs instead of importing them. */
            if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd &&
                !pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {

                if (pa_log_ratelimit(PA_LOG_ERROR))
                    pa_log("Ignoring received block reference with non-registered memfd ID = %u", shm_id);

            } else if (!(b = pa_memimport_get(p->import,
                                              type,
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
                                              shm_id,
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
                                              !!(flags & PA_FLAG_SHMWRITABLE)))) {

                if (pa_log_ratelimit(PA_LOG_DEBUG))
                    pa_log_debug("Failed to import memory block.");
            }

            /* Dispatch even on failed import: b may be NULL, in which
             * case chunk.length falls back to the advertised length. */
            if (p->receive_memblock_callback) {
                int64_t offset;
                pa_memchunk chunk;

                chunk.memblock = b;
                chunk.index = 0;
                chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);

                /* Reassemble the 64-bit seek offset from the two
                 * 32-bit descriptor words. */
                offset = (int64_t) (
                        (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
                        (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));

                p->receive_memblock_callback(
                        p,
                        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
                        offset,
                        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
                        &chunk,
                        p->receive_memblock_callback_userdata);
            }

            if (b)
                pa_memblock_unref(b);
        }

        goto frame_done;
    }

    return 0;

frame_done:
    /* Reset the read state for the next frame. */
    re->memblock = NULL;
    re->packet = NULL;
    re->index = 0;
    re->data = NULL;

#ifdef HAVE_CREDS
    /* FIXME: Close received ancillary data fds if the pstream's
     * receive_packet_callback did not do so.
     *
     * Malicious clients can attach fds to unknown commands, or attach them
     * to commands that does not expect fds. By doing so, server will reach
     * its open fd limit and future clients' SHM transfers will always fail.
     */
    p->read_ancil_data.creds_valid = false;
    p->read_ancil_data.nfd = 0;
#endif

    return 0;

fail:
    if (release_memblock)
        pa_memblock_release(release_memblock);

    return -1;
}
1113 
/* Register the callback invoked when the stream dies. */
void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->die_callback_userdata = userdata;
    p->die_callback = cb;
}
1121 
/* Register the callback invoked when the send queue drains. */
void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->drain_callback_userdata = userdata;
    p->drain_callback = cb;
}
1129 
/* Register the callback invoked for each received packet frame. */
void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->receive_packet_callback_userdata = userdata;
    p->receive_packet_callback = cb;
}
1137 
/* Register the callback invoked for each received memblock frame. */
void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->receive_memblock_callback_userdata = userdata;
    p->receive_memblock_callback = cb;
}
1145 
/* Register the callback invoked when the peer releases an SHM block. */
void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->release_callback_userdata = userdata;
    p->release_callback = cb;
}
1153 
/* Register the callback invoked when the peer revokes an SHM block. */
void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->revoke_callback_userdata = userdata;
    p->revoke_callback = cb;
}
1161 
pa_pstream_is_pending(pa_pstream * p)1162 bool pa_pstream_is_pending(pa_pstream *p) {
1163     bool b;
1164 
1165     pa_assert(p);
1166     pa_assert(PA_REFCNT_VALUE(p) > 0);
1167 
1168     if (p->dead)
1169         b = false;
1170     else
1171         b = p->write.current || !pa_queue_isempty(p->send_queue);
1172 
1173     return b;
1174 }
1175 
/* Drop one reference; frees the pstream when the count reaches zero. */
void pa_pstream_unref(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (PA_REFCNT_DEC(p) <= 0)
        pstream_free(p);
}
1183 
/* Take an additional reference and hand the pstream back for chaining. */
pa_pstream* pa_pstream_ref(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    PA_REFCNT_INC(p);

    return p;
}
1191 
/* Detach the pstream from its channels and free associated resources.
 * Idempotent: a second call returns immediately. Note this releases
 * resources but does not drop references — callers still unref. */
void pa_pstream_unlink(pa_pstream *p) {
    pa_assert(p);

    if (p->dead)
        return;

    p->dead = true;

    /* Tear down the srbchannel(s) first, before the iochannel goes away. */
    while (p->srb || p->is_srbpending) /* In theory there could be one active and one pending */
        pa_pstream_set_srbchannel(p, NULL);

    if (p->import) {
        pa_memimport_free(p->import);
        p->import = NULL;
    }

    if (p->export) {
        pa_memexport_free(p->export);
        p->export = NULL;
    }

    if (p->io) {
        pa_iochannel_free(p->io);
        p->io = NULL;
    }

    if (p->defer_event) {
        p->mainloop->defer_free(p->defer_event);
        p->defer_event = NULL;
    }

    /* Clear callbacks so nothing fires on a dead stream. */
    p->die_callback = NULL;
    p->drain_callback = NULL;
    p->receive_packet_callback = NULL;
    p->receive_memblock_callback = NULL;
}
1228 
/* Enable or disable SHM transport on this pstream. Enabling lazily
 * creates the memexport used to hand local blocks to the peer;
 * disabling frees it again. */
void pa_pstream_enable_shm(pa_pstream *p, bool enable) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->use_shm = enable;

    if (enable) {
        if (!p->export)
            p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
    } else if (p->export) {
        pa_memexport_free(p->export);
        p->export = NULL;
    }
}
1248 
/* Enable memfd block transport. Only valid once SHM is already on. */
void pa_pstream_enable_memfd(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->use_shm);

    p->use_memfd = true;

    /* Lazily create the set tracking memfd IDs registered by the peer. */
    if (!p->registered_memfd_ids)
        p->registered_memfd_ids = pa_idxset_new(NULL, NULL);
}
1260 
pa_pstream_get_shm(pa_pstream * p)1261 bool pa_pstream_get_shm(pa_pstream *p) {
1262     pa_assert(p);
1263     pa_assert(PA_REFCNT_VALUE(p) > 0);
1264 
1265     return p->use_shm;
1266 }
1267 
pa_pstream_get_memfd(pa_pstream * p)1268 bool pa_pstream_get_memfd(pa_pstream *p) {
1269     pa_assert(p);
1270     pa_assert(PA_REFCNT_VALUE(p) > 0);
1271 
1272     return p->use_memfd;
1273 }
1274 
/* Install (or, with NULL, remove) an srbchannel on this pstream.
 * The switch is deferred: the new channel is parked in srbpending and
 * activated from the write path, so it must be staged before do_write()
 * runs. */
void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb) {
    pa_assert(p);
    /* Removal (srb == NULL) is allowed even during teardown when the
     * refcount may already be gone. */
    pa_assert(PA_REFCNT_VALUE(p) > 0 || srb == NULL);

    if (srb == p->srb)
        return;

    /* We can't handle quick switches between srbchannels. */
    pa_assert(!p->is_srbpending);

    p->srbpending = srb;
    p->is_srbpending = true;

    /* Switch immediately, if possible. */
    if (p->dead)
        check_srbpending(p);
    else
        do_write(p);
}
1294