/***
  This file is part of PulseAudio.

  Copyright 2004-2006 Lennart Poettering
  Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation; either version 2.1 of the
  License, or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
***/

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif

#include <pulse/xmalloc.h>

#include <pulsecore/idxset.h>
#include <pulsecore/socket.h>
#include <pulsecore/queue.h>
#include <pulsecore/log.h>
#include <pulsecore/creds.h>
#include <pulsecore/refcnt.h>
#include <pulsecore/flist.h>
#include <pulsecore/macro.h>

#include "pstream.h"

/* We piggyback the information whether audio data blocks are stored in SHM
 * onto the seek mode field */
#define PA_FLAG_SHMDATA     0x80000000LU
#define PA_FLAG_SHMDATA_MEMFD_BLOCK         0x20000000LU
#define PA_FLAG_SHMRELEASE  0x40000000LU
#define PA_FLAG_SHMREVOKE   0xC0000000LU
#define PA_FLAG_SHMMASK     0xFF000000LU
#define PA_FLAG_SEEKMASK    0x000000FFLU
#define PA_FLAG_SHMWRITABLE 0x00800000LU
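
/* An illustrative sketch (not from the original source): the FLAGS field of
 * the descriptor packs the seek mode into its lowest byte and the SHM flags
 * into the upper bits. A writable memfd SHM reference with absolute seek
 * would therefore be encoded roughly as:
 *
 *     uint32_t flags = (PA_SEEK_ABSOLUTE & PA_FLAG_SEEKMASK)
 *                      | PA_FLAG_SHMDATA
 *                      | PA_FLAG_SHMDATA_MEMFD_BLOCK
 *                      | PA_FLAG_SHMWRITABLE;
 *
 * See prepare_next_write_item() below for the authoritative encoding. */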

/* The sequence descriptor header consists of 5 32bit integers: */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH,
    PA_PSTREAM_DESCRIPTOR_CHANNEL,
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
    PA_PSTREAM_DESCRIPTOR_FLAGS,
    PA_PSTREAM_DESCRIPTOR_MAX
};

/* If we have an SHM block, this info follows the descriptor */
enum {
    PA_PSTREAM_SHM_BLOCKID,
    PA_PSTREAM_SHM_SHMID,
    PA_PSTREAM_SHM_INDEX,
    PA_PSTREAM_SHM_LENGTH,
    PA_PSTREAM_SHM_MAX
};

typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];

#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
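
/* For orientation, a sketch of the resulting wire format (all fields travel
 * in network byte order):
 *
 *     +--------+---------+-----------+-----------+-------+--------------+
 *     | LENGTH | CHANNEL | OFFSET_HI | OFFSET_LO | FLAGS | payload ...  |
 *     +--------+---------+-----------+-----------+-------+--------------+
 *      <----------- 5 * 32-bit descriptor ------------->   LENGTH bytes
 *
 * For SHM reference frames the payload is the 4 * 32-bit shm_info block
 * above instead of the audio data itself. */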

#define MINIBUF_SIZE (256)

/* To allow uploading a single sample in one frame, this value should be the
 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
 */
#define FRAME_SIZE_MAX_ALLOW (1024*1024*16)

PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);

struct item_info {
    enum {
        PA_PSTREAM_ITEM_PACKET,
        PA_PSTREAM_ITEM_MEMBLOCK,
        PA_PSTREAM_ITEM_SHMRELEASE,
        PA_PSTREAM_ITEM_SHMREVOKE
    } type;

    /* packet info */
    pa_packet *packet;
#ifdef HAVE_CREDS
    bool with_ancil_data;
    pa_cmsg_ancil_data ancil_data;
#endif

    /* memblock info */
    pa_memchunk chunk;
    uint32_t channel;
    int64_t offset;
    pa_seek_mode_t seek_mode;

    /* release/revoke info */
    uint32_t block_id;
};

struct pstream_read {
    pa_pstream_descriptor descriptor;
    pa_memblock *memblock;
    pa_packet *packet;
    uint32_t shm_info[PA_PSTREAM_SHM_MAX];
    void *data;
    size_t index;
};

struct pa_pstream {
    PA_REFCNT_DECLARE;

    pa_mainloop_api *mainloop;
    pa_defer_event *defer_event;
    pa_iochannel *io;
    pa_srbchannel *srb, *srbpending;
    bool is_srbpending;

    pa_queue *send_queue;

    bool dead;

    struct {
        union {
            uint8_t minibuf[MINIBUF_SIZE];
            pa_pstream_descriptor descriptor;
        };
        struct item_info* current;
        void *data;
        size_t index;
        int minibuf_validsize;
        pa_memchunk memchunk;
    } write;

    struct pstream_read readio, readsrb;

    /* @use_shm: besides copying the full audio data to the other
     * PA end, this pipe can also send mere references to the same
     * audio data blocks if they reside in a SHM pool.
     *
     * @use_memfd: the pipe supports sending SHM memfd block references
     *
     * @registered_memfd_ids: SHM IDs of registered memfd pools. Check
     * pa_pstream_register_memfd_mempool() for more information. */
    bool use_shm, use_memfd;
    pa_idxset *registered_memfd_ids;

    pa_memimport *import;
    pa_memexport *export;

    pa_pstream_packet_cb_t receive_packet_callback;
    void *receive_packet_callback_userdata;

    pa_pstream_memblock_cb_t receive_memblock_callback;
    void *receive_memblock_callback_userdata;

    pa_pstream_notify_cb_t drain_callback;
    void *drain_callback_userdata;

    pa_pstream_notify_cb_t die_callback;
    void *die_callback_userdata;

    pa_pstream_block_id_cb_t revoke_callback;
    void *revoke_callback_userdata;

    pa_pstream_block_id_cb_t release_callback;
    void *release_callback_userdata;

    pa_mempool *mempool;

#ifdef HAVE_CREDS
    pa_cmsg_ancil_data read_ancil_data, *write_ancil_data;
    bool send_ancil_data_now;
#endif
};

#ifdef HAVE_CREDS
/*
 * Transfers of memfd-backed SHM pool blocks occur without passing the pool's
 * fd every time, thus minimizing overhead and avoiding fd leaks. A
 * REGISTER_MEMFD_SHMID command is sent, with the pool's memfd fd, very early
 * on. This command carries an ID that uniquely identifies the pool in
 * question. Further block references for that pool can then be made using
 * this ID alone; the fd can be safely closed – on both ends – afterwards.
 *
 * On the sending side of this command, we want to close the passed fds
 * directly after they have been sent. Meanwhile we are only allowed to
 * asynchronously schedule packet writes to the pstream, so the job of
 * closing passed fds is left to the pstream's actual writing function
 * do_write(): it knows the exact point in time where the fds are passed to
 * the other end through iochannels and the sendmsg() system call.
 *
 * Nonetheless, not all code paths in the system want their socket-passed
 * fds to be closed after the send. srbchannel needs the passed fds to stay
 * open for further communication. System-wide global memfd-backed pools
 * also require the passed fd to stay open: they pass the same fd, with the
 * same ID registration mechanism, for each newly connected client to the
 * system.
 *
 * So, given all of the above, never close ancillary fds on your own; always
 * call the method below instead. It takes care of closing the passed fds
 * _only if allowed_ to do so by the code paths that originally created them.
 * Moreover, it is safe to invoke multiple times: failure handlers can, and
 * should, call it to clean up passed fds without worrying too much about
 * the system state.
 */
void pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data *ancil) {
    if (ancil && ancil->nfd > 0 && ancil->close_fds_on_cleanup) {
        int i;

        pa_assert(ancil->nfd <= MAX_ANCIL_DATA_FDS);

        for (i = 0; i < ancil->nfd; i++)
            if (ancil->fds[i] != -1) {
                pa_assert_se(pa_close(ancil->fds[i]) == 0);
                ancil->fds[i] = -1;
            }

        ancil->nfd = 0;
        ancil->close_fds_on_cleanup = false;
    }
}
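
/* A hypothetical error-recovery sketch showing the intended usage; the
 * command-sending helper named here is illustrative, not part of this file:
 *
 *     if (send_register_memfd_shmid(pstream, &ancil) < 0) {
 *         pa_cmsg_ancil_data_close_fds(&ancil);  // no-op if already closed
 *         return -1;
 *     }
 *
 * Calling it twice is harmless: the first invocation clears nfd and
 * close_fds_on_cleanup and sets the fd slots to -1. */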
#endif

static int do_write(pa_pstream *p);
static int do_read(pa_pstream *p, struct pstream_read *re);
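
/* Return conventions (derived from the definitions below): do_write()
 * returns 1 when the pending buffer was written out completely, 0 on a
 * short write or an empty send queue, and -1 on error; do_read() returns 0
 * after making progress, 1 when the srbchannel had no data to offer, and
 * -1 on error or EOF. */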

static void do_pstream_read_write(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    pa_pstream_ref(p);

    p->mainloop->defer_enable(p->defer_event, 0);

    if (!p->dead && p->srb) {
        int r = 0;

        if (do_write(p) < 0)
            goto fail;

        while (!p->dead && r == 0) {
            r = do_read(p, &p->readsrb);
            if (r < 0)
                goto fail;
        }
    }

    if (!p->dead && pa_iochannel_is_readable(p->io)) {
        if (do_read(p, &p->readio) < 0)
            goto fail;
    } else if (!p->dead && pa_iochannel_is_hungup(p->io))
        goto fail;

    while (!p->dead && pa_iochannel_is_writable(p->io)) {
        int r = do_write(p);
        if (r < 0)
            goto fail;
        if (r == 0)
            break;
    }

    pa_pstream_unref(p);
    return;

fail:

    if (p->die_callback)
        p->die_callback(p, p->die_callback_userdata);

    pa_pstream_unlink(p);
    pa_pstream_unref(p);
}

static bool srb_callback(pa_srbchannel *srb, void *userdata) {
    bool b;
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->srb == srb);

    pa_pstream_ref(p);

    do_pstream_read_write(p);

    /* If either pstream or the srb is going away, return false.
       We need to check this before p is destroyed. */
    b = (PA_REFCNT_VALUE(p) > 1) && (p->srb == srb);
    pa_pstream_unref(p);

    return b;
}

static void io_callback(pa_iochannel *io, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->io == io);

    do_pstream_read_write(p);
}

static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->defer_event == e);
    pa_assert(p->mainloop == m);

    do_pstream_read_write(p);
}

static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);

pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
    pa_pstream *p;

    pa_assert(m);
    pa_assert(io);
    pa_assert(pool);

    p = pa_xnew0(pa_pstream, 1);
    PA_REFCNT_INIT(p);
    p->io = io;
    pa_iochannel_set_callback(io, io_callback, p);

    p->mainloop = m;
    p->defer_event = m->defer_new(m, defer_callback, p);
    m->defer_enable(p->defer_event, 0);

    p->send_queue = pa_queue_new();

    p->mempool = pool;

    /* We do importing unconditionally */
    p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);

    pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
    pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));

    return p;
}
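
/* A minimal construction sketch, assuming an iochannel and a mempool have
 * been set up elsewhere; the callback names are illustrative:
 *
 *     pa_pstream *ps = pa_pstream_new(mainloop_api, io, pool);
 *     pa_pstream_set_receive_packet_callback(ps, on_packet, userdata);
 *     pa_pstream_set_receive_memblock_callback(ps, on_memblock, userdata);
 *     pa_pstream_set_die_callback(ps, on_die, userdata);
 *
 * The pstream takes ownership of the iochannel and frees it in
 * pa_pstream_unlink(). */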

/* Attach a memfd<->SHM_ID mapping to the given pstream and its memimport.
 * Check pa_pstream_register_memfd_mempool() for further info.
 *
 * The caller owns the passed @memfd_fd and must close it down when appropriate. */
int pa_pstream_attach_memfd_shmid(pa_pstream *p, unsigned shm_id, int memfd_fd) {
    int err = -1;

    pa_assert(memfd_fd != -1);

    if (!p->use_memfd) {
        pa_log_warn("Received memfd ID registration request over a pipe "
                    "that does not support memfds");
        return err;
    }

    if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
        pa_log_warn("Memfd SHM ID = %u has already been registered", shm_id);
        return err;
    }

    if (pa_memimport_attach_memfd(p->import, shm_id, memfd_fd, true)) {
        pa_log("Failed to create permanent mapping for memfd region with ID = %u", shm_id);
        return err;
    }

    pa_assert_se(pa_idxset_put(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL) == 0);
    return 0;
}
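
/* An illustrative call sequence on the receiving side, assuming shm_id and
 * memfd_fd were just extracted from a REGISTER_MEMFD_SHMID command's
 * ancillary data:
 *
 *     if (pa_pstream_attach_memfd_shmid(p, shm_id, memfd_fd) < 0)
 *         pa_log("memfd ID registration failed");
 *     pa_assert_se(pa_close(memfd_fd) == 0);  // fd stays owned by the caller
 */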

static void item_free(void *item) {
    struct item_info *i = item;
    pa_assert(i);

    if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
        pa_assert(i->chunk.memblock);
        pa_memblock_unref(i->chunk.memblock);
    } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
        pa_assert(i->packet);
        pa_packet_unref(i->packet);
    }

#ifdef HAVE_CREDS
    /* On error recovery paths, there might be lingering items
     * on the pstream send queue and they are usually freed with
     * a call to 'pa_queue_free(p->send_queue, item_free)'. Make
     * sure we do not leak any fds in that case! */
    if (i->with_ancil_data)
        pa_cmsg_ancil_data_close_fds(&i->ancil_data);
#endif

    if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
        pa_xfree(i);
}

static void pstream_free(pa_pstream *p) {
    pa_assert(p);

    pa_pstream_unlink(p);

    pa_queue_free(p->send_queue, item_free);

    if (p->write.current)
        item_free(p->write.current);

    if (p->write.memchunk.memblock)
        pa_memblock_unref(p->write.memchunk.memblock);

    if (p->readsrb.memblock)
        pa_memblock_unref(p->readsrb.memblock);

    if (p->readsrb.packet)
        pa_packet_unref(p->readsrb.packet);

    if (p->readio.memblock)
        pa_memblock_unref(p->readio.memblock);

    if (p->readio.packet)
        pa_packet_unref(p->readio.packet);

    if (p->registered_memfd_ids)
        pa_idxset_free(p->registered_memfd_ids, NULL);

    pa_xfree(p);
}

void pa_pstream_send_packet(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data) {
    struct item_info *i;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(packet);

    if (p->dead) {
#ifdef HAVE_CREDS
        pa_cmsg_ancil_data_close_fds(ancil_data);
#endif
        return;
    }

    if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        i = pa_xnew(struct item_info, 1);

    i->type = PA_PSTREAM_ITEM_PACKET;
    i->packet = pa_packet_ref(packet);

#ifdef HAVE_CREDS
    if ((i->with_ancil_data = !!ancil_data)) {
        i->ancil_data = *ancil_data;
        if (ancil_data->creds_valid)
            pa_assert(ancil_data->nfd == 0);
        else
            pa_assert(ancil_data->nfd > 0);
    }
#endif

    pa_queue_push(p->send_queue, i);

    p->mainloop->defer_enable(p->defer_event, 1);
}

void pa_pstream_send_memblock(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
    size_t length, idx;
    size_t bsm;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(channel != (uint32_t) -1);
    pa_assert(chunk);

    if (p->dead)
        return;

    idx = 0;
    length = chunk->length;

    bsm = pa_mempool_block_size_max(p->mempool);

    while (length > 0) {
        struct item_info *i;
        size_t n;

        if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
            i = pa_xnew(struct item_info, 1);
        i->type = PA_PSTREAM_ITEM_MEMBLOCK;

        n = PA_MIN(length, bsm);
        i->chunk.index = chunk->index + idx;
        i->chunk.length = n;
        i->chunk.memblock = pa_memblock_ref(chunk->memblock);

        i->channel = channel;
        i->offset = offset;
        i->seek_mode = seek_mode;
#ifdef HAVE_CREDS
        i->with_ancil_data = false;
#endif

        pa_queue_push(p->send_queue, i);

        idx += n;
        length -= n;
    }

    p->mainloop->defer_enable(p->defer_event, 1);
}
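
/* Worked example of the splitting above: with a maximum pool block size of
 * 64 kB, a 150 kB chunk is queued as three items of 64 kB, 64 kB and 22 kB.
 * Each item references the same underlying memblock with its own index into
 * it, and each carries the caller's channel, offset and seek mode verbatim. */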

void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
    struct item_info *item;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

/*     pa_log("Releasing block %u", block_id); */

    if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        item = pa_xnew(struct item_info, 1);
    item->type = PA_PSTREAM_ITEM_SHMRELEASE;
    item->block_id = block_id;
#ifdef HAVE_CREDS
    item->with_ancil_data = false;
#endif

    pa_queue_push(p->send_queue, item);
    p->mainloop->defer_enable(p->defer_event, 1);
}

/* might be called from thread context */
static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

    if (p->release_callback)
        p->release_callback(p, block_id, p->release_callback_userdata);
    else
        pa_pstream_send_release(p, block_id);
}

void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
    struct item_info *item;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;
/*     pa_log("Revoking block %u", block_id); */

    if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        item = pa_xnew(struct item_info, 1);
    item->type = PA_PSTREAM_ITEM_SHMREVOKE;
    item->block_id = block_id;
#ifdef HAVE_CREDS
    item->with_ancil_data = false;
#endif

    pa_queue_push(p->send_queue, item);
    p->mainloop->defer_enable(p->defer_event, 1);
}

/* might be called from thread context */
static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->revoke_callback)
        p->revoke_callback(p, block_id, p->revoke_callback_userdata);
    else
        pa_pstream_send_revoke(p, block_id);
}

static void prepare_next_write_item(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->write.current = pa_queue_pop(p->send_queue);

    if (!p->write.current)
        return;
    p->write.index = 0;
    p->write.data = NULL;
    p->write.minibuf_validsize = 0;
    pa_memchunk_reset(&p->write.memchunk);

    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;

    if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
        size_t plen;

        pa_assert(p->write.current->packet);

        p->write.data = (void *) pa_packet_data(p->write.current->packet, &plen);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) plen);

        if (plen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
            memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
            p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
        }

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else {
        uint32_t flags;
        bool send_payload = true;

        pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
        pa_assert(p->write.current->chunk.memblock);

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));

        flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);

        if (p->use_shm) {
            pa_mem_type_t type;
            uint32_t block_id, shm_id;
            size_t offset, length;
            uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
            size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
            pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock);
            pa_memexport *current_export;

            if (p->mempool == current_pool)
                pa_assert_se(current_export = p->export);
            else
                pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));

            if (pa_memexport_put(current_export,
                                 p->write.current->chunk.memblock,
                                 &type,
                                 &block_id,
                                 &shm_id,
                                 &offset,
                                 &length) >= 0) {

                if (type == PA_MEM_TYPE_SHARED_POSIX)
                    send_payload = false;

                if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd) {
                    if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
                        flags |= PA_FLAG_SHMDATA_MEMFD_BLOCK;
                        send_payload = false;
                    } else {
                        if (pa_log_ratelimit(PA_LOG_ERROR)) {
                            pa_log("Cannot send block reference with non-registered memfd ID = %u", shm_id);
                            pa_log("Falling back to copying full block data over socket");
                        }
                    }
                }

                if (send_payload) {
                    pa_assert_se(pa_memexport_process_release(current_export, block_id) == 0);
                } else {
                    flags |= PA_FLAG_SHMDATA;
                    if (pa_mempool_is_remote_writable(current_pool))
                        flags |= PA_FLAG_SHMWRITABLE;

                    shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
                    shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
                    shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
                    shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);

                    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
                    p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
                }
            }
/*             else */
/*                 FIXME: Avoid memexport slot leaks. Call pa_memexport_process_release() */
/*                 pa_log_warn("Failed to export memory block."); */

            if (current_export != p->export)
                pa_memexport_free(current_export);
            pa_mempool_unref(current_pool);
        }

        if (send_payload) {
            p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
            p->write.memchunk = p->write.current->chunk;
            pa_memblock_ref(p->write.memchunk.memblock);
        }

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
    }

#ifdef HAVE_CREDS
    if ((p->send_ancil_data_now = p->write.current->with_ancil_data))
        p->write_ancil_data = &p->write.current->ancil_data;
#endif
}

static void check_srbpending(pa_pstream *p) {
    if (!p->is_srbpending)
        return;

    if (p->srb)
        pa_srbchannel_free(p->srb);

    p->srb = p->srbpending;
    p->is_srbpending = false;

    if (p->srb)
        pa_srbchannel_set_callback(p->srb, srb_callback, p);
}

static int do_write(pa_pstream *p) {
    void *d;
    size_t l;
    ssize_t r;
    pa_memblock *release_memblock = NULL;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (!p->write.current)
        prepare_next_write_item(p);

    if (!p->write.current) {
        /* The out queue is empty, so switching channels is safe */
        check_srbpending(p);
        return 0;
    }

    if (p->write.minibuf_validsize > 0) {
        d = p->write.minibuf + p->write.index;
        l = p->write.minibuf_validsize - p->write.index;
    } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        d = (uint8_t*) p->write.descriptor + p->write.index;
        l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
    } else {
        pa_assert(p->write.data || p->write.memchunk.memblock);

        if (p->write.data)
            d = p->write.data;
        else {
            d = pa_memblock_acquire_chunk(&p->write.memchunk);
            release_memblock = p->write.memchunk.memblock;
        }

        d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
        l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
    }

    pa_assert(l > 0);

#ifdef HAVE_CREDS
    if (p->send_ancil_data_now) {
        if (p->write_ancil_data->creds_valid) {
            pa_assert(p->write_ancil_data->nfd == 0);
            if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_ancil_data->creds)) < 0)
                goto fail;
        }
        else
            if ((r = pa_iochannel_write_with_fds(p->io, d, l, p->write_ancil_data->nfd, p->write_ancil_data->fds)) < 0)
                goto fail;

        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
        p->send_ancil_data_now = false;
    } else
#endif
    if (p->srb)
        r = pa_srbchannel_write(p->srb, d, l);
    else if ((r = pa_iochannel_write(p->io, d, l)) < 0)
        goto fail;

    if (release_memblock)
        pa_memblock_release(release_memblock);

    p->write.index += (size_t) r;

    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
        pa_assert(p->write.current);
        item_free(p->write.current);
        p->write.current = NULL;

        if (p->write.memchunk.memblock)
            pa_memblock_unref(p->write.memchunk.memblock);

        pa_memchunk_reset(&p->write.memchunk);

        if (p->drain_callback && !pa_pstream_is_pending(p))
            p->drain_callback(p, p->drain_callback_userdata);
    }

    return (size_t) r == l ? 1 : 0;

fail:
#ifdef HAVE_CREDS
    if (p->send_ancil_data_now)
        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
#endif

    if (release_memblock)
        pa_memblock_release(release_memblock);

    return -1;
}

static void memblock_complete(pa_pstream *p, struct pstream_read *re) {
    pa_memchunk chunk;
    int64_t offset;

    if (!p->receive_memblock_callback)
        return;

    chunk.memblock = re->memblock;
    chunk.index = 0;
    chunk.length = re->index - PA_PSTREAM_DESCRIPTOR_SIZE;

    offset = (int64_t) (
             (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
             (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));

    p->receive_memblock_callback(
        p,
        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
        offset,
        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
        &chunk,
        p->receive_memblock_callback_userdata);
}

static int do_read(pa_pstream *p, struct pstream_read *re) {
    void *d;
    size_t l;
    ssize_t r;
    pa_memblock *release_memblock = NULL;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        d = (uint8_t*) re->descriptor + re->index;
        l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
    } else {
        pa_assert(re->data || re->memblock);

        if (re->data)
            d = re->data;
        else {
            d = pa_memblock_acquire(re->memblock);
            release_memblock = re->memblock;
        }

        d = (uint8_t*) d + re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
        l = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (re->index - PA_PSTREAM_DESCRIPTOR_SIZE);
    }

    if (re == &p->readsrb) {
        r = pa_srbchannel_read(p->srb, d, l);
        if (r == 0) {
            if (release_memblock)
                pa_memblock_release(release_memblock);
            return 1;
        }
    }
    else
#ifdef HAVE_CREDS
    {
        pa_cmsg_ancil_data b;

        if ((r = pa_iochannel_read_with_ancil_data(p->io, d, l, &b)) <= 0)
            goto fail;

        if (b.creds_valid) {
            p->read_ancil_data.creds_valid = true;
            p->read_ancil_data.creds = b.creds;
        }
        if (b.nfd > 0) {
            pa_assert(b.nfd <= MAX_ANCIL_DATA_FDS);
            p->read_ancil_data.nfd = b.nfd;
            memcpy(p->read_ancil_data.fds, b.fds, sizeof(int) * b.nfd);
            p->read_ancil_data.close_fds_on_cleanup = b.close_fds_on_cleanup;
        }
    }
#else
    if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
        goto fail;
#endif

    if (release_memblock)
        pa_memblock_release(release_memblock);

    re->index += (size_t) r;

    if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
        uint32_t flags, length, channel;
        /* Reading of frame descriptor complete */

        flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);

        if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
            pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
            return -1;
        }

        if (flags == PA_FLAG_SHMRELEASE) {

            /* This is a SHM memblock release frame with no payload */

/*             pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */

            pa_assert(p->export);
            pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));

            goto frame_done;

        } else if (flags == PA_FLAG_SHMREVOKE) {

            /* This is a SHM memblock revoke frame with no payload */

/*             pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */

            pa_assert(p->import);
            pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));

            goto frame_done;
        }

        length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);

        if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
            pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
            return -1;
        }

        pa_assert(!re->packet && !re->memblock);

        channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);

        if (channel == (uint32_t) -1) {
            size_t plen;

            if (flags != 0) {
                pa_log_warn("Received packet frame with invalid flags value.");
                return -1;
            }

            /* Frame is a packet frame */
            re->packet = pa_packet_new(length);
            re->data = (void *) pa_packet_data(re->packet, &plen);

        } else {

            if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
                pa_log_warn("Received memblock frame with invalid seek mode.");
                return -1;
            }

            if (((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0) {

                if (length != sizeof(re->shm_info)) {
                    pa_log_warn("Received SHM memblock frame with invalid frame length.");
                    return -1;
                }

                /* Frame is a memblock frame referencing an SHM memblock */
                re->data = re->shm_info;

            } else if ((flags & PA_FLAG_SHMMASK) == 0) {

                /* Frame is a memblock frame */

                re->memblock = pa_memblock_new(p->mempool, length);
                re->data = NULL;
            } else {

                pa_log_warn("Received memblock frame with invalid flags value.");
                return -1;
            }
        }

    } else if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
        /* Frame complete */

        if (re->memblock) {
            memblock_complete(p, re);

            /* This was a memblock frame. We can unref the memblock now */
            pa_memblock_unref(re->memblock);

        } else if (re->packet) {

            if (p->receive_packet_callback)
#ifdef HAVE_CREDS
                p->receive_packet_callback(p, re->packet, &p->read_ancil_data, p->receive_packet_callback_userdata);
#else
                p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
#endif

            pa_packet_unref(re->packet);
        } else {
            pa_memblock *b = NULL;
            uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
            uint32_t shm_id = ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]);
            pa_mem_type_t type = (flags & PA_FLAG_SHMDATA_MEMFD_BLOCK) ?
                                 PA_MEM_TYPE_SHARED_MEMFD : PA_MEM_TYPE_SHARED_POSIX;

            pa_assert(((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0);
            pa_assert(p->import);

            if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd &&
                !pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {

                if (pa_log_ratelimit(PA_LOG_ERROR))
                    pa_log("Ignoring received block reference with non-registered memfd ID = %u", shm_id);

            } else if (!(b = pa_memimport_get(p->import,
                                              type,
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
                                              shm_id,
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
                                              !!(flags & PA_FLAG_SHMWRITABLE)))) {

                if (pa_log_ratelimit(PA_LOG_DEBUG))
                    pa_log_debug("Failed to import memory block.");
            }

            if (p->receive_memblock_callback) {
                int64_t offset;
                pa_memchunk chunk;

                chunk.memblock = b;
                chunk.index = 0;
                chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);

                offset = (int64_t) (
                        (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
                        (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));

                p->receive_memblock_callback(
                        p,
                        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
                        offset,
                        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
                        &chunk,
                        p->receive_memblock_callback_userdata);
            }

            if (b)
                pa_memblock_unref(b);
        }

        goto frame_done;
    }

    return 0;

frame_done:
    re->memblock = NULL;
    re->packet = NULL;
    re->index = 0;
    re->data = NULL;

#ifdef HAVE_CREDS
    /* FIXME: Close received ancillary data fds if the pstream's
     * receive_packet_callback did not do so.
     *
     * Malicious clients can attach fds to unknown commands, or attach them
     * to commands that do not expect fds. By doing so, the server will reach
     * its open fd limit and future clients' SHM transfers will always fail.
     */
    p->read_ancil_data.creds_valid = false;
    p->read_ancil_data.nfd = 0;
#endif

    return 0;

fail:
    if (release_memblock)
        pa_memblock_release(release_memblock);

    return -1;
}

void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->die_callback = cb;
    p->die_callback_userdata = userdata;
}

void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->drain_callback = cb;
    p->drain_callback_userdata = userdata;
}

void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->receive_packet_callback = cb;
    p->receive_packet_callback_userdata = userdata;
}

void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->receive_memblock_callback = cb;
    p->receive_memblock_callback_userdata = userdata;
}

void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->release_callback = cb;
    p->release_callback_userdata = userdata;
}

void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->revoke_callback = cb;
    p->revoke_callback_userdata = userdata;
}

bool pa_pstream_is_pending(pa_pstream *p) {
    bool b;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        b = false;
    else
        b = p->write.current || !pa_queue_isempty(p->send_queue);

    return b;
}

void pa_pstream_unref(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (PA_REFCNT_DEC(p) <= 0)
        pstream_free(p);
}

pa_pstream* pa_pstream_ref(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    PA_REFCNT_INC(p);
    return p;
}

void pa_pstream_unlink(pa_pstream *p) {
    pa_assert(p);

    if (p->dead)
        return;

    p->dead = true;

    while (p->srb || p->is_srbpending) /* In theory there could be one active and one pending */
        pa_pstream_set_srbchannel(p, NULL);

    if (p->import) {
        pa_memimport_free(p->import);
        p->import = NULL;
    }

    if (p->export) {
        pa_memexport_free(p->export);
        p->export = NULL;
    }

    if (p->io) {
        pa_iochannel_free(p->io);
        p->io = NULL;
    }

    if (p->defer_event) {
        p->mainloop->defer_free(p->defer_event);
        p->defer_event = NULL;
    }

    p->die_callback = NULL;
    p->drain_callback = NULL;
    p->receive_packet_callback = NULL;
    p->receive_memblock_callback = NULL;
}

void pa_pstream_enable_shm(pa_pstream *p, bool enable) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->use_shm = enable;

    if (enable) {

        if (!p->export)
            p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);

    } else {

        if (p->export) {
            pa_memexport_free(p->export);
            p->export = NULL;
        }
    }
}

void pa_pstream_enable_memfd(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->use_shm);

    p->use_memfd = true;

    if (!p->registered_memfd_ids) {
        p->registered_memfd_ids = pa_idxset_new(NULL, NULL);
    }
}

bool pa_pstream_get_shm(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    return p->use_shm;
}

bool pa_pstream_get_memfd(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    return p->use_memfd;
}
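
/* Note on the swap protocol below: the new channel is parked in srbpending
 * and only becomes active in check_srbpending() once do_write() has fully
 * drained the send queue, so no frame ever straddles two transports.
 * Passing NULL tears the active channel down through the same path. */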

void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0 || srb == NULL);

    if (srb == p->srb)
        return;

    /* We can't handle quick switches between srbchannels. */
    pa_assert(!p->is_srbpending);

    p->srbpending = srb;
    p->is_srbpending = true;

    /* Switch immediately, if possible. */
    if (p->dead)
        check_srbpending(p);
    else
        do_write(p);
}