/***
  This file is part of PulseAudio.

  Copyright 2004-2006 Lennart Poettering
  Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation; either version 2.1 of the
  License, or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
***/

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif

#include <pulse/xmalloc.h>

#include <pulsecore/idxset.h>
#include <pulsecore/socket.h>
#include <pulsecore/queue.h>
#include <pulsecore/log.h>
#include <pulsecore/creds.h>
#include <pulsecore/refcnt.h>
#include <pulsecore/flist.h>
#include <pulsecore/macro.h>

#include "pstream.h"
#include "log/audio_log.h"

/* We piggyback information about whether audio data blocks are stored in SHM onto the seek mode field */
#define PA_FLAG_SHMDATA     0x80000000LU
#define PA_FLAG_SHMDATA_MEMFD_BLOCK         0x20000000LU
#define PA_FLAG_SHMRELEASE  0x40000000LU
#define PA_FLAG_SHMREVOKE   0xC0000000LU
#define PA_FLAG_SHMMASK     0xFF000000LU
#define PA_FLAG_SEEKMASK    0x000000FFLU
#define PA_FLAG_SHMWRITABLE 0x00800000LU

/* The sequence descriptor header consists of five 32-bit integers: */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH,
    PA_PSTREAM_DESCRIPTOR_CHANNEL,
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
    PA_PSTREAM_DESCRIPTOR_FLAGS,
    PA_PSTREAM_DESCRIPTOR_MAX
};

/* If we have an SHM block, this info follows the descriptor */
enum {
    PA_PSTREAM_SHM_BLOCKID,
    PA_PSTREAM_SHM_SHMID,
    PA_PSTREAM_SHM_INDEX,
    PA_PSTREAM_SHM_LENGTH,
    PA_PSTREAM_SHM_MAX
};

typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];

#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
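
/* Illustrative sketch (added summary, not part of the original source):
 * every frame on the wire starts with the 20-byte descriptor above, each
 * field encoded in network byte order, optionally followed by a payload:
 *
 *     +--------+---------+-----------+-----------+-------+-------------+
 *     | LENGTH | CHANNEL | OFFSET_HI | OFFSET_LO | FLAGS | payload ... |
 *     +--------+---------+-----------+-----------+-------+-------------+
 *
 * For packet frames CHANNEL is (uint32_t) -1 and the payload is the packet
 * data; for SHM reference frames the payload is the four PA_PSTREAM_SHM_*
 * words instead of audio data. See prepare_next_write_item() and do_read(). */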

#define MINIBUF_SIZE (256)

/* To allow uploading a single sample in one frame, this value should be the
 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
 */
#define FRAME_SIZE_MAX_ALLOW (1024*1024*16)

PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);

struct item_info {
    enum {
        PA_PSTREAM_ITEM_PACKET,
        PA_PSTREAM_ITEM_MEMBLOCK,
        PA_PSTREAM_ITEM_SHMRELEASE,
        PA_PSTREAM_ITEM_SHMREVOKE
    } type;

    /* packet info */
    pa_packet *packet;
#ifdef HAVE_CREDS
    bool with_ancil_data;
    pa_cmsg_ancil_data ancil_data;
#endif

    /* memblock info */
    pa_memchunk chunk;
    uint32_t channel;
    int64_t offset;
    pa_seek_mode_t seek_mode;

    /* release/revoke info */
    uint32_t block_id;
};

struct pstream_read {
    pa_pstream_descriptor descriptor;
    pa_memblock *memblock;
    pa_packet *packet;
    uint32_t shm_info[PA_PSTREAM_SHM_MAX];
    void *data;
    size_t index;
};

struct pa_pstream {
    PA_REFCNT_DECLARE;

    pa_mainloop_api *mainloop;
    pa_defer_event *defer_event;
    pa_iochannel *io;
    pa_srbchannel *srb, *srbpending;
    bool is_srbpending;

    pa_queue *send_queue;

    bool dead;

    struct {
        union {
            uint8_t minibuf[MINIBUF_SIZE];
            pa_pstream_descriptor descriptor;
        };
        struct item_info* current;
        void *data;
        size_t index;
        int minibuf_validsize;
        pa_memchunk memchunk;
    } write;

    struct pstream_read readio, readsrb;

    /* @use_shm: besides copying the full audio data to the other
     * PA end, this pipe supports sending mere references to the
     * same audio data blocks if they reside in a SHM pool.
     *
     * @use_memfd: pipe supports sending SHM memfd block references
     *
     * @registered_memfd_ids: SHM IDs of registered memfd pools. Check
     * pa_pstream_register_memfd_mempool() for more information. */
    bool use_shm, use_memfd;
    bool non_registered_memfd_id_error_logged;
    pa_idxset *registered_memfd_ids;

    pa_memimport *import;
    pa_memexport *export;

    pa_pstream_packet_cb_t receive_packet_callback;
    void *receive_packet_callback_userdata;

    pa_pstream_memblock_cb_t receive_memblock_callback;
    void *receive_memblock_callback_userdata;

    pa_pstream_notify_cb_t drain_callback;
    void *drain_callback_userdata;

    pa_pstream_notify_cb_t die_callback;
    void *die_callback_userdata;

    pa_pstream_block_id_cb_t revoke_callback;
    void *revoke_callback_userdata;

    pa_pstream_block_id_cb_t release_callback;
    void *release_callback_userdata;

    pa_mempool *mempool;

#ifdef HAVE_CREDS
    pa_cmsg_ancil_data read_ancil_data, *write_ancil_data;
    bool send_ancil_data_now;
#endif
};

#ifdef HAVE_CREDS
/*
 * Transfers of memfd-backed SHM pool blocks occur without passing the pool's
 * fd every time, thus minimizing overhead and avoiding fd leaks. A
 * REGISTER_MEMFD_SHMID command is sent, with the pool's memfd fd, very early
 * on. This command has an ID that uniquely identifies the pool in question.
 * Further block references for that pool can then be made using this ID
 * exclusively; the fd can be safely closed – on both ends – afterwards.
 *
 * On the sending side of this command, we want to close the passed fds
 * directly after they have been sent. Meanwhile we're only allowed to
 * asynchronously schedule packet writes to the pstream, so the job of
 * closing passed fds is left to the pstream's actual writing function
 * do_write(): it knows the exact point in time where the fds are passed to
 * the other end through iochannels and the sendmsg() system call.
 *
 * Nonetheless, not all code paths in the system want their socket-passed
 * fds to be closed after the send. srbchannel needs the passed fds to still
 * be open for further communication. System-wide global memfd-backed pools
 * also require the passed fd to be open: they pass the same fd, with the same
 * ID registration mechanism, for each newly connected client to the system.
 *
 * So, given all of the above, never close the ancillary fds on your own;
 * always call the method below instead. It takes care of closing the passed
 * fds _only if allowed_ by the code paths that originally created them.
 * Moreover, it is safe to invoke multiple times: failure handlers can, and
 * should, call it to clean up passed fds without worrying too much about
 * the system state.
 */
void pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data *ancil) {
    if (ancil && ancil->nfd > 0 && ancil->close_fds_on_cleanup) {
        int i;

        pa_assert(ancil->nfd <= MAX_ANCIL_DATA_FDS);

        for (i = 0; i < ancil->nfd; i++)
            if (ancil->fds[i] != -1) {
                pa_assert_se(pa_close(ancil->fds[i]) == 0);
                ancil->fds[i] = -1;
            }

        ancil->nfd = 0;
        ancil->close_fds_on_cleanup = false;
    }
}
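
/* Hypothetical usage sketch (illustration only, not part of the original
 * file): an error path may call this cleanup unconditionally, and calling it
 * again later is harmless since nfd is reset to 0 on the first invocation:
 *
 *     if (send_register_command(p, &ancil) < 0)
 *         pa_cmsg_ancil_data_close_fds(&ancil);   // closes fds if allowed
 *     ...
 *     pa_cmsg_ancil_data_close_fds(&ancil);       // safe no-op afterwards
 *
 * send_register_command() above is a made-up placeholder. */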
#endif

static int do_write(pa_pstream *p);
static int do_read(pa_pstream *p, struct pstream_read *re);

static void do_pstream_read_write(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    pa_pstream_ref(p);

    p->mainloop->defer_enable(p->defer_event, 0);

    if (!p->dead && p->srb) {
        int r = 0;

        if (do_write(p) < 0)
            goto fail;

        while (!p->dead && r == 0) {
            r = do_read(p, &p->readsrb);
            if (r < 0)
                goto fail;
        }
    }

    if (!p->dead && pa_iochannel_is_readable(p->io)) {
        if (do_read(p, &p->readio) < 0)
            goto fail;
    } else if (!p->dead && pa_iochannel_is_hungup(p->io))
        goto fail;

    while (!p->dead && pa_iochannel_is_writable(p->io)) {
        int r = do_write(p);
        if (r < 0)
            goto fail;
        if (r == 0)
            break;
    }

    pa_pstream_unref(p);
    return;

fail:

    if (p->die_callback)
        p->die_callback(p, p->die_callback_userdata);

    pa_pstream_unlink(p);
    pa_pstream_unref(p);
}

static bool srb_callback(pa_srbchannel *srb, void *userdata) {
    bool b;
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->srb == srb);

    pa_pstream_ref(p);

    do_pstream_read_write(p);

    /* If either pstream or the srb is going away, return false.
       We need to check this before p is destroyed. */
    b = (PA_REFCNT_VALUE(p) > 1) && (p->srb == srb);
    pa_pstream_unref(p);

    return b;
}

static void io_callback(pa_iochannel*io, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->io == io);

    do_pstream_read_write(p);
}

static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->defer_event == e);
    pa_assert(p->mainloop == m);

    do_pstream_read_write(p);
}

static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);

pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
    pa_pstream *p;

    pa_assert(m);
    pa_assert(io);
    pa_assert(pool);

    p = pa_xnew0(pa_pstream, 1);
    PA_REFCNT_INIT(p);
    p->io = io;
    pa_iochannel_set_callback(io, io_callback, p);

    p->mainloop = m;
    p->defer_event = m->defer_new(m, defer_callback, p);
    m->defer_enable(p->defer_event, 0);

    p->send_queue = pa_queue_new();

    p->mempool = pool;

    /* We do importing unconditionally */
    p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);

    pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
    pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));

    return p;
}
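
/* Illustrative sketch (added, not from the original source): a typical owner
 * creates the pstream on top of an existing iochannel and mempool, then
 * registers the callbacks it cares about:
 *
 *     pa_pstream *ps = pa_pstream_new(mainloop_api, io, pool);
 *     pa_pstream_set_receive_packet_callback(ps, on_packet, userdata);
 *     pa_pstream_set_receive_memblock_callback(ps, on_memblock, userdata);
 *     pa_pstream_set_die_callback(ps, on_die, userdata);
 *     pa_pstream_enable_shm(ps, true);  // optional, if both ends support SHM
 *
 * on_packet, on_memblock, on_die and the surrounding objects are assumed to
 * be provided by the caller. */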

/* Attach memfd<->SHM_ID mapping to given pstream and its memimport.
 * Check pa_pstream_register_memfd_mempool() for further info.
 *
 * Caller owns the passed @memfd_fd and must close it down when appropriate. */
int pa_pstream_attach_memfd_shmid(pa_pstream *p, unsigned shm_id, int memfd_fd) {
    int err = -1;

    pa_assert(memfd_fd != -1);

    if (!p->use_memfd) {
        pa_log_warn("Received memfd ID registration request over a pipe "
                    "that does not support memfds");
        return err;
    }

    if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
        pa_log_warn("previously registered memfd SHM ID = %u", shm_id);
        return err;
    }

    if (pa_memimport_attach_memfd(p->import, shm_id, memfd_fd, true)) {
        pa_log("Failed to create permanent mapping for memfd region with ID = %u", shm_id);
        return err;
    }

    pa_assert_se(pa_idxset_put(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL) == 0);
    return 0;
}

static void item_free(void *item) {
    struct item_info *i = item;
    pa_assert(i);

    if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
        pa_assert(i->chunk.memblock);
        pa_memblock_unref(i->chunk.memblock);
    } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
        pa_assert(i->packet);
        pa_packet_unref(i->packet);
    }

#ifdef HAVE_CREDS
    /* On error recovery paths, there might be lingering items
     * on the pstream send queue and they are usually freed with
     * a call to 'pa_queue_free(p->send_queue, item_free)'. Make
     * sure we do not leak any fds in that case! */
    if (i->with_ancil_data)
        pa_cmsg_ancil_data_close_fds(&i->ancil_data);
#endif

    if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
        pa_xfree(i);
}

static void pstream_free(pa_pstream *p) {
    pa_assert(p);

    pa_pstream_unlink(p);

    pa_queue_free(p->send_queue, item_free);

    if (p->write.current)
        item_free(p->write.current);

    if (p->write.memchunk.memblock)
        pa_memblock_unref(p->write.memchunk.memblock);

    if (p->readsrb.memblock)
        pa_memblock_unref(p->readsrb.memblock);

    if (p->readsrb.packet)
        pa_packet_unref(p->readsrb.packet);

    if (p->readio.memblock)
        pa_memblock_unref(p->readio.memblock);

    if (p->readio.packet)
        pa_packet_unref(p->readio.packet);

    if (p->registered_memfd_ids)
        pa_idxset_free(p->registered_memfd_ids, NULL);

    pa_xfree(p);
}

void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data) {
    struct item_info *i;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(packet);

    if (p->dead) {
#ifdef HAVE_CREDS
        pa_cmsg_ancil_data_close_fds(ancil_data);
#endif
        return;
    }

    if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        i = pa_xnew(struct item_info, 1);

    i->type = PA_PSTREAM_ITEM_PACKET;
    i->packet = pa_packet_ref(packet);

#ifdef HAVE_CREDS
    if ((i->with_ancil_data = !!ancil_data)) {
        i->ancil_data = *ancil_data;
        if (ancil_data->creds_valid)
            pa_assert(ancil_data->nfd == 0);
        else
            pa_assert(ancil_data->nfd > 0);
    }
#endif

    pa_queue_push(p->send_queue, i);
    /* A send-queue length of 3 or more may indicate a message backlog */
    if (PaQueueGetLen(p->send_queue) >= 3) {
        AUDIO_WARNING_LOG("[MSG backlog]: @<%{public}p> PaQueueLen = %{public}u", p, PaQueueGetLen(p->send_queue));
    }

    p->mainloop->defer_enable(p->defer_event, 1);
}

void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
    size_t length, idx;
    size_t bsm;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(channel != (uint32_t) -1);
    pa_assert(chunk);

    if (p->dead)
        return;

    idx = 0;
    length = chunk->length;

    bsm = pa_mempool_block_size_max(p->mempool);

    while (length > 0) {
        struct item_info *i;
        size_t n;

        if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
            i = pa_xnew(struct item_info, 1);
        i->type = PA_PSTREAM_ITEM_MEMBLOCK;

        n = PA_MIN(length, bsm);
        i->chunk.index = chunk->index + idx;
        i->chunk.length = n;
        i->chunk.memblock = pa_memblock_ref(chunk->memblock);

        i->channel = channel;
        i->offset = offset;
        i->seek_mode = seek_mode;
#ifdef HAVE_CREDS
        i->with_ancil_data = false;
#endif

        pa_queue_push(p->send_queue, i);

        idx += n;
        length -= n;
    }

    p->mainloop->defer_enable(p->defer_event, 1);
}
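
/* Worked example (added for illustration): with pa_mempool_block_size_max()
 * returning 64 KiB, a 150 KiB chunk is queued as three items of 64, 64 and
 * 22 KiB. Each item references the same underlying memblock, only with an
 * advancing chunk.index, so no audio data is copied here. */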

void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
    struct item_info *item;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

/*     pa_log("Releasing block %u", block_id); */

    if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        item = pa_xnew(struct item_info, 1);
    item->type = PA_PSTREAM_ITEM_SHMRELEASE;
    item->block_id = block_id;
#ifdef HAVE_CREDS
    item->with_ancil_data = false;
#endif

    pa_queue_push(p->send_queue, item);
    p->mainloop->defer_enable(p->defer_event, 1);
}

/* might be called from thread context */
static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

    if (p->release_callback)
        p->release_callback(p, block_id, p->release_callback_userdata);
    else
        pa_pstream_send_release(p, block_id);
}

void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
    struct item_info *item;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;
/*     pa_log("Revoking block %u", block_id); */

    if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        item = pa_xnew(struct item_info, 1);
    item->type = PA_PSTREAM_ITEM_SHMREVOKE;
    item->block_id = block_id;
#ifdef HAVE_CREDS
    item->with_ancil_data = false;
#endif

    pa_queue_push(p->send_queue, item);
    p->mainloop->defer_enable(p->defer_event, 1);
}

/* might be called from thread context */
static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->revoke_callback)
        p->revoke_callback(p, block_id, p->revoke_callback_userdata);
    else
        pa_pstream_send_revoke(p, block_id);
}

static void prepare_next_write_item(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->write.current = pa_queue_pop(p->send_queue);

    if (!p->write.current)
        return;
    p->write.index = 0;
    p->write.data = NULL;
    p->write.minibuf_validsize = 0;
    pa_memchunk_reset(&p->write.memchunk);

    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;

    if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
        size_t plen;

        pa_assert(p->write.current->packet);

        p->write.data = (void *) pa_packet_data(p->write.current->packet, &plen);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) plen);

        if (plen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
            memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
            p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
        }

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else {
        uint32_t flags;
        bool send_payload = true;

        pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
        pa_assert(p->write.current->chunk.memblock);

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));

        flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);

        if (p->use_shm) {
            pa_mem_type_t type;
            uint32_t block_id, shm_id;
            size_t offset, length;
            uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
            size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
            pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock);
            pa_memexport *current_export;

            if (p->mempool == current_pool)
                pa_assert_se(current_export = p->export);
            else
                pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));

            if (pa_memexport_put(current_export,
                                 p->write.current->chunk.memblock,
                                 &type,
                                 &block_id,
                                 &shm_id,
                                 &offset,
                                 &length) >= 0) {

                if (type == PA_MEM_TYPE_SHARED_POSIX)
                    send_payload = false;

                if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd) {
                    if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
                        flags |= PA_FLAG_SHMDATA_MEMFD_BLOCK;
                        send_payload = false;
                    } else {
                        if (!p->non_registered_memfd_id_error_logged) {
                            pa_log("Cannot send block reference with non-registered memfd ID = %u", shm_id);
                            pa_log("Falling back to copying full block data over socket");
                            pa_log("There's a bug report about this: https://gitlab.freedesktop.org/pulseaudio/pulseaudio/issues/824");
                            p->non_registered_memfd_id_error_logged = true;
                        }
                    }
                }

                if (send_payload) {
                    pa_assert_se(pa_memexport_process_release(current_export, block_id) == 0);
                } else {
                    flags |= PA_FLAG_SHMDATA;
                    if (pa_mempool_is_remote_writable(current_pool))
                        flags |= PA_FLAG_SHMWRITABLE;

                    shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
                    shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
                    shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
                    shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);

                    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
                    p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
                }
            }
/*             else */
/*                 FIXME: Avoid memexport slot leaks. Call pa_memexport_process_release() */
/*                 pa_log_warn("Failed to export memory block."); */

            if (current_export != p->export)
                pa_memexport_free(current_export);
            pa_mempool_unref(current_pool);
        }

        if (send_payload) {
            p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
            p->write.memchunk = p->write.current->chunk;
            pa_memblock_ref(p->write.memchunk.memblock);
        }

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
    }

#ifdef HAVE_CREDS
    if ((p->send_ancil_data_now = p->write.current->with_ancil_data))
        p->write_ancil_data = &p->write.current->ancil_data;
#endif
}
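
/* Summary sketch (added, not from the original source) of what the function
 * above encodes for a memblock item when SHM is usable:
 *
 *   - block exportable as SHM:  descriptor + 16-byte shm_info, all of which
 *     fits in write.minibuf; the audio payload itself is never sent.
 *   - otherwise:                descriptor + full chunk payload, streamed
 *     from write.memchunk.
 *
 * Packets small enough for the minibuf fast path are likewise coalesced into
 * a single write. */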

static void check_srbpending(pa_pstream *p) {
    if (!p->is_srbpending)
        return;

    if (p->srb)
        pa_srbchannel_free(p->srb);

    p->srb = p->srbpending;
    p->is_srbpending = false;

    if (p->srb)
        pa_srbchannel_set_callback(p->srb, srb_callback, p);
}

static int do_write(pa_pstream *p) {
    void *d;
    size_t l;
    ssize_t r;
    pa_memblock *release_memblock = NULL;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (!p->write.current)
        prepare_next_write_item(p);

    if (!p->write.current) {
        /* The out queue is empty, so switching channels is safe */
        check_srbpending(p);
        return 0;
    }

    if (p->write.minibuf_validsize > 0) {
        d = p->write.minibuf + p->write.index;
        l = p->write.minibuf_validsize - p->write.index;
    } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        d = (uint8_t*) p->write.descriptor + p->write.index;
        l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
    } else {
        pa_assert(p->write.data || p->write.memchunk.memblock);

        if (p->write.data)
            d = p->write.data;
        else {
            d = pa_memblock_acquire_chunk(&p->write.memchunk);
            release_memblock = p->write.memchunk.memblock;
        }

        d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
        l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
    }

    pa_assert(l > 0);

#ifdef HAVE_CREDS
    if (p->send_ancil_data_now) {
        if (p->write_ancil_data->creds_valid) {
            pa_assert(p->write_ancil_data->nfd == 0);
            if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_ancil_data->creds)) < 0)
                goto fail;
        }
        else
            if ((r = pa_iochannel_write_with_fds(p->io, d, l, p->write_ancil_data->nfd, p->write_ancil_data->fds)) < 0)
                goto fail;

        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
        p->send_ancil_data_now = false;
    } else
#endif
    if (p->srb)
        r = pa_srbchannel_write(p->srb, d, l);
    else if ((r = pa_iochannel_write(p->io, d, l)) < 0)
        goto fail;

    if (release_memblock)
        pa_memblock_release(release_memblock);

    p->write.index += (size_t) r;

    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
        pa_assert(p->write.current);
        item_free(p->write.current);
        p->write.current = NULL;

        if (p->write.memchunk.memblock)
            pa_memblock_unref(p->write.memchunk.memblock);

        pa_memchunk_reset(&p->write.memchunk);

        if (p->drain_callback && !pa_pstream_is_pending(p))
            p->drain_callback(p, p->drain_callback_userdata);
    }

    return (size_t) r == l ? 1 : 0;

fail:
#ifdef HAVE_CREDS
    if (p->send_ancil_data_now)
        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
#endif

    if (release_memblock)
        pa_memblock_release(release_memblock);

    return -1;
}
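
/* Return-value convention of do_write() above, as used by the callers in
 * do_pstream_read_write() (summary added for clarity, not in the original):
 * 1 means the requested span was written completely and another round may
 * follow, 0 means nothing more can be written for now (empty queue or short
 * write), and -1 means the connection failed and the pstream should die. */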

static void memblock_complete(pa_pstream *p, struct pstream_read *re) {
    pa_memchunk chunk;
    int64_t offset;

    if (!p->receive_memblock_callback)
        return;

    chunk.memblock = re->memblock;
    chunk.index = 0;
    chunk.length = re->index - PA_PSTREAM_DESCRIPTOR_SIZE;

    offset = (int64_t) (
             (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
             (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));

    p->receive_memblock_callback(
        p,
        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
        offset,
        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
        &chunk,
        p->receive_memblock_callback_userdata);
}

static int do_read(pa_pstream *p, struct pstream_read *re) {
    void *d;
    size_t l;
    ssize_t r;
    pa_memblock *release_memblock = NULL;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        d = (uint8_t*) re->descriptor + re->index;
        l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
    } else {
        pa_assert(re->data || re->memblock);

        if (re->data)
            d = re->data;
        else {
            d = pa_memblock_acquire(re->memblock);
            release_memblock = re->memblock;
        }

        d = (uint8_t*) d + re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
        l = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (re->index - PA_PSTREAM_DESCRIPTOR_SIZE);
    }

    if (re == &p->readsrb) {
        r = pa_srbchannel_read(p->srb, d, l);
        if (r == 0) {
            if (release_memblock)
                pa_memblock_release(release_memblock);
            return 1;
        }
    }
    else
#ifdef HAVE_CREDS
    {
        pa_cmsg_ancil_data b;

        if ((r = pa_iochannel_read_with_ancil_data(p->io, d, l, &b)) <= 0)
            goto fail;

        if (b.creds_valid) {
            p->read_ancil_data.creds_valid = true;
            p->read_ancil_data.creds = b.creds;
        }
        if (b.nfd > 0) {
            pa_assert(b.nfd <= MAX_ANCIL_DATA_FDS);
            p->read_ancil_data.nfd = b.nfd;
            memcpy(p->read_ancil_data.fds, b.fds, sizeof(int) * b.nfd);
            p->read_ancil_data.close_fds_on_cleanup = b.close_fds_on_cleanup;
        }
    }
#else
    if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
        goto fail;
#endif

    if (release_memblock)
        pa_memblock_release(release_memblock);

    re->index += (size_t) r;

    if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
        uint32_t flags, length, channel;
        /* Reading of frame descriptor complete */

        flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);

        if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
            pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
            return -1;
        }

        if (flags == PA_FLAG_SHMRELEASE) {

            /* This is a SHM memblock release frame with no payload */

/*             pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */

            pa_assert(p->export);
            pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));

            goto frame_done;

        } else if (flags == PA_FLAG_SHMREVOKE) {

            /* This is a SHM memblock revoke frame with no payload */

/*             pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */

            pa_assert(p->import);
            pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));

            goto frame_done;
        }

        length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);

        if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
            pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
            return -1;
        }

        pa_assert(!re->packet && !re->memblock);

        channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);

        if (channel == (uint32_t) -1) {
            size_t plen;

            if (flags != 0) {
                pa_log_warn("Received packet frame with invalid flags value.");
                return -1;
            }

            /* Frame is a packet frame */
            re->packet = pa_packet_new(length);
            re->data = (void *) pa_packet_data(re->packet, &plen);

        } else {

            if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
                pa_log_warn("Received memblock frame with invalid seek mode.");
                return -1;
            }

            if (((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0) {

                if (length != sizeof(re->shm_info)) {
                    pa_log_warn("Received SHM memblock frame with invalid frame length.");
                    return -1;
                }

                /* Frame is a memblock frame referencing an SHM memblock */
                re->data = re->shm_info;

            } else if ((flags & PA_FLAG_SHMMASK) == 0) {

                /* Frame is a memblock frame */

                re->memblock = pa_memblock_new(p->mempool, length);
                re->data = NULL;
            } else {

                pa_log_warn("Received memblock frame with invalid flags value.");
                return -1;
            }
        }

    } else if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
        /* Frame complete */

        if (re->memblock) {
            memblock_complete(p, re);

            /* This was a memblock frame. We can unref the memblock now */
            pa_memblock_unref(re->memblock);

        } else if (re->packet) {

            if (p->receive_packet_callback)
#ifdef HAVE_CREDS
                p->receive_packet_callback(p, re->packet, &p->read_ancil_data, p->receive_packet_callback_userdata);
#else
                p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
#endif

            pa_packet_unref(re->packet);
        } else {
            pa_memblock *b = NULL;
            uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
            uint32_t shm_id = ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]);
            pa_mem_type_t type = (flags & PA_FLAG_SHMDATA_MEMFD_BLOCK) ?
                                 PA_MEM_TYPE_SHARED_MEMFD : PA_MEM_TYPE_SHARED_POSIX;

            pa_assert(((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0);
            pa_assert(p->import);

            if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd &&
                !pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {

                if (pa_log_ratelimit(PA_LOG_ERROR))
                    pa_log("Ignoring received block reference with non-registered memfd ID = %u", shm_id);

            } else if (!(b = pa_memimport_get(p->import,
                                              type,
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
                                              shm_id,
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
                                              !!(flags & PA_FLAG_SHMWRITABLE)))) {

                if (pa_log_ratelimit(PA_LOG_DEBUG))
                    pa_log_debug("Failed to import memory block.");
            }

            if (p->receive_memblock_callback) {
                int64_t offset;
                pa_memchunk chunk;

                chunk.memblock = b;
                chunk.index = 0;
                chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);

                offset = (int64_t) (
                        (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
                        (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));

                p->receive_memblock_callback(
                        p,
                        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
                        offset,
                        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
                        &chunk,
                        p->receive_memblock_callback_userdata);
            }

            if (b)
                pa_memblock_unref(b);
        }

        goto frame_done;
    }

    return 0;

frame_done:
    re->memblock = NULL;
    re->packet = NULL;
    re->index = 0;
    re->data = NULL;

#ifdef HAVE_CREDS
    /* FIXME: Close received ancillary data fds if the pstream's
     * receive_packet_callback did not do so.
     *
     * Malicious clients can attach fds to unknown commands, or attach them
     * to commands that do not expect fds. By doing so, the server will reach
     * its open fd limit and future clients' SHM transfers will always fail.
     */
    p->read_ancil_data.creds_valid = false;
    p->read_ancil_data.nfd = 0;
#endif

    return 0;

fail:
    if (release_memblock)
        pa_memblock_release(release_memblock);

    return -1;
}
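
/* Return-value convention of do_read() above (summary added for clarity,
 * not in the original): 0 means progress was made and the caller may loop
 * again, 1 means the srbchannel had nothing left to read, and -1 means the
 * frame was invalid or the connection failed. */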

void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->die_callback = cb;
    p->die_callback_userdata = userdata;
}

void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->drain_callback = cb;
    p->drain_callback_userdata = userdata;
}

void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->receive_packet_callback = cb;
    p->receive_packet_callback_userdata = userdata;
}

void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->receive_memblock_callback = cb;
    p->receive_memblock_callback_userdata = userdata;
}

void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->release_callback = cb;
    p->release_callback_userdata = userdata;
}

void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->revoke_callback = cb;
    p->revoke_callback_userdata = userdata;
}

bool pa_pstream_is_pending(pa_pstream *p) {
    bool b;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        b = false;
    else
        b = p->write.current || !pa_queue_isempty(p->send_queue);

    return b;
}

void pa_pstream_unref(pa_pstream*p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (PA_REFCNT_DEC(p) <= 0)
        pstream_free(p);
}

pa_pstream* pa_pstream_ref(pa_pstream*p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    PA_REFCNT_INC(p);
    return p;
}

void pa_pstream_unlink(pa_pstream *p) {
    pa_assert(p);

    if (p->dead)
        return;

    p->dead = true;

    while (p->srb || p->is_srbpending) /* In theory there could be one active and one pending */
        pa_pstream_set_srbchannel(p, NULL);

    if (p->import) {
        pa_memimport_free(p->import);
        p->import = NULL;
    }

    if (p->export) {
        pa_memexport_free(p->export);
        p->export = NULL;
    }

    if (p->io) {
        pa_iochannel_free(p->io);
        p->io = NULL;
    }

    if (p->defer_event) {
        p->mainloop->defer_free(p->defer_event);
        p->defer_event = NULL;
    }

    p->die_callback = NULL;
    p->drain_callback = NULL;
    p->receive_packet_callback = NULL;
    p->receive_memblock_callback = NULL;
}

void pa_pstream_enable_shm(pa_pstream *p, bool enable) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->use_shm = enable;

    if (enable) {

        if (!p->export)
            p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);

    } else {

        if (p->export) {
            pa_memexport_free(p->export);
            p->export = NULL;
        }
    }
}

void pa_pstream_enable_memfd(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->use_shm);

    p->use_memfd = true;

    if (!p->registered_memfd_ids) {
        p->registered_memfd_ids = pa_idxset_new(NULL, NULL);
    }
}

bool pa_pstream_get_shm(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    return p->use_shm;
}

bool pa_pstream_get_memfd(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    return p->use_memfd;
}

void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0 || srb == NULL);

    if (srb == p->srb)
        return;

    /* We can't handle quick switches between srbchannels. */
    pa_assert(!p->is_srbpending);

    p->srbpending = srb;
    p->is_srbpending = true;

    /* Switch immediately, if possible. */
    if (p->dead)
        check_srbpending(p);
    else
        do_write(p);
}