1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
28
29 #ifdef HAVE_NETINET_IN_H
30 #include <netinet/in.h>
31 #endif
32
33 #include <pulse/xmalloc.h>
34
35 #include <pulsecore/idxset.h>
36 #include <pulsecore/socket.h>
37 #include <pulsecore/queue.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/creds.h>
40 #include <pulsecore/refcnt.h>
41 #include <pulsecore/flist.h>
42 #include <pulsecore/macro.h>
43
44 #include "pstream.h"
45
46 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
47 #define PA_FLAG_SHMDATA 0x80000000LU
48 #define PA_FLAG_SHMDATA_MEMFD_BLOCK 0x20000000LU
49 #define PA_FLAG_SHMRELEASE 0x40000000LU
50 #define PA_FLAG_SHMREVOKE 0xC0000000LU
51 #define PA_FLAG_SHMMASK 0xFF000000LU
52 #define PA_FLAG_SEEKMASK 0x000000FFLU
53 #define PA_FLAG_SHMWRITABLE 0x00800000LU
54
55 /* The sequence descriptor header consists of 5 32bit integers: */
56 enum {
57 PA_PSTREAM_DESCRIPTOR_LENGTH,
58 PA_PSTREAM_DESCRIPTOR_CHANNEL,
59 PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
60 PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
61 PA_PSTREAM_DESCRIPTOR_FLAGS,
62 PA_PSTREAM_DESCRIPTOR_MAX
63 };
64
65 /* If we have an SHM block, this info follows the descriptor */
66 enum {
67 PA_PSTREAM_SHM_BLOCKID,
68 PA_PSTREAM_SHM_SHMID,
69 PA_PSTREAM_SHM_INDEX,
70 PA_PSTREAM_SHM_LENGTH,
71 PA_PSTREAM_SHM_MAX
72 };
73
74 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
75
76 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
77
78 #define MINIBUF_SIZE (256)
79
80 /* To allow uploading a single sample in one frame, this value should be the
81 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
82 */
83 #define FRAME_SIZE_MAX_ALLOW (1024*1024*16)
84
85 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
86
87 struct item_info {
88 enum {
89 PA_PSTREAM_ITEM_PACKET,
90 PA_PSTREAM_ITEM_MEMBLOCK,
91 PA_PSTREAM_ITEM_SHMRELEASE,
92 PA_PSTREAM_ITEM_SHMREVOKE
93 } type;
94
95 /* packet info */
96 pa_packet *packet;
97 #ifdef HAVE_CREDS
98 bool with_ancil_data;
99 pa_cmsg_ancil_data ancil_data;
100 #endif
101
102 /* memblock info */
103 pa_memchunk chunk;
104 uint32_t channel;
105 int64_t offset;
106 pa_seek_mode_t seek_mode;
107
108 /* release/revoke info */
109 uint32_t block_id;
110 };
111
112 struct pstream_read {
113 pa_pstream_descriptor descriptor;
114 pa_memblock *memblock;
115 pa_packet *packet;
116 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
117 void *data;
118 size_t index;
119 };
120
121 struct pa_pstream {
122 PA_REFCNT_DECLARE;
123
124 pa_mainloop_api *mainloop;
125 pa_defer_event *defer_event;
126 pa_iochannel *io;
127 pa_srbchannel *srb, *srbpending;
128 bool is_srbpending;
129
130 pa_queue *send_queue;
131
132 bool dead;
133
134 struct {
135 union {
136 uint8_t minibuf[MINIBUF_SIZE];
137 pa_pstream_descriptor descriptor;
138 };
139 struct item_info* current;
140 void *data;
141 size_t index;
142 int minibuf_validsize;
143 pa_memchunk memchunk;
144 } write;
145
146 struct pstream_read readio, readsrb;
147
148 /* @use_shm: beside copying the full audio data to the other
149 * PA end, this pipe supports just sending references of the
150 * same audio data blocks if they reside in a SHM pool.
151 *
152 * @use_memfd: pipe supports sending SHM memfd block references
153 *
154 * @registered_memfd_ids: registered memfd pools SHM IDs. Check
155 * pa_pstream_register_memfd_mempool() for more information. */
156 bool use_shm, use_memfd;
157 bool non_registered_memfd_id_error_logged;
158 pa_idxset *registered_memfd_ids;
159
160 pa_memimport *import;
161 pa_memexport *export;
162
163 pa_pstream_packet_cb_t receive_packet_callback;
164 void *receive_packet_callback_userdata;
165
166 pa_pstream_memblock_cb_t receive_memblock_callback;
167 void *receive_memblock_callback_userdata;
168
169 pa_pstream_notify_cb_t drain_callback;
170 void *drain_callback_userdata;
171
172 pa_pstream_notify_cb_t die_callback;
173 void *die_callback_userdata;
174
175 pa_pstream_block_id_cb_t revoke_callback;
176 void *revoke_callback_userdata;
177
178 pa_pstream_block_id_cb_t release_callback;
179 void *release_callback_userdata;
180
181 pa_mempool *mempool;
182
183 #ifdef HAVE_CREDS
184 pa_cmsg_ancil_data read_ancil_data, *write_ancil_data;
185 bool send_ancil_data_now;
186 #endif
187 };
188
189 #ifdef HAVE_CREDS
190 /*
191 * memfd-backed SHM pools blocks transfer occur without passing the pool's
192 * fd every time, thus minimizing overhead and avoiding fd leaks. A
193 * REGISTER_MEMFD_SHMID command is sent, with the pool's memfd fd, very early
194 * on. This command has an ID that uniquely identifies the pool in question.
195 * Further pool's block references can then be exclusively done using such ID;
196 * the fd can be safely closed – on both ends – afterwards.
197 *
198 * On the sending side of this command, we want to close the passed fds
199 * directly after being sent. Meanwhile we're only allowed to asynchronously
200 * schedule packet writes to the pstream, so the job of closing passed fds is
201 * left to the pstream's actual writing function do_write(): it knows the
202 * exact point in time where the fds are passed to the other end through
203 * iochannels and the sendmsg() system call.
204 *
205 * Nonetheless not all code paths in the system desire their socket-passed
206 * fds to be closed after the send. srbchannel needs the passed fds to still
207 * be open for further communication. System-wide global memfd-backed pools
208 * also require the passed fd to be open: they pass the same fd, with the same
209 * ID registration mechanism, for each newly connected client to the system.
210 *
211 * So from all of the above, never close the ancillary fds by your own and
212 * always call below method instead. It takes care of closing the passed fds
213 * _only if allowed_ by the code paths that originally created them to do so.
214 * Moreover, it is multiple-invocations safe: failure handlers can, and
215 * should, call it for passed fds cleanup without worrying too much about
216 * the system state.
217 */
pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data * ancil)218 void pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data *ancil) {
219 if (ancil && ancil->nfd > 0 && ancil->close_fds_on_cleanup) {
220 int i;
221
222 pa_assert(ancil->nfd <= MAX_ANCIL_DATA_FDS);
223
224 for (i = 0; i < ancil->nfd; i++)
225 if (ancil->fds[i] != -1) {
226 pa_assert_se(pa_close(ancil->fds[i]) == 0);
227 ancil->fds[i] = -1;
228 }
229
230 ancil->nfd = 0;
231 ancil->close_fds_on_cleanup = false;
232 }
233 }
234 #endif
235
236 static int do_write(pa_pstream *p);
237 static int do_read(pa_pstream *p, struct pstream_read *re);
238
do_pstream_read_write(pa_pstream * p)239 static void do_pstream_read_write(pa_pstream *p) {
240 pa_assert(p);
241 pa_assert(PA_REFCNT_VALUE(p) > 0);
242
243 pa_pstream_ref(p);
244
245 p->mainloop->defer_enable(p->defer_event, 0);
246
247 if (!p->dead && p->srb) {
248 int r = 0;
249
250 if(do_write(p) < 0)
251 goto fail;
252
253 while (!p->dead && r == 0) {
254 r = do_read(p, &p->readsrb);
255 if (r < 0)
256 goto fail;
257 }
258 }
259
260 if (!p->dead && pa_iochannel_is_readable(p->io)) {
261 if (do_read(p, &p->readio) < 0)
262 goto fail;
263 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
264 goto fail;
265
266 while (!p->dead && pa_iochannel_is_writable(p->io)) {
267 int r = do_write(p);
268 if (r < 0)
269 goto fail;
270 if (r == 0)
271 break;
272 }
273
274 pa_pstream_unref(p);
275 return;
276
277 fail:
278
279 if (p->die_callback)
280 p->die_callback(p, p->die_callback_userdata);
281
282 pa_pstream_unlink(p);
283 pa_pstream_unref(p);
284 }
285
srb_callback(pa_srbchannel * srb,void * userdata)286 static bool srb_callback(pa_srbchannel *srb, void *userdata) {
287 bool b;
288 pa_pstream *p = userdata;
289
290 pa_assert(p);
291 pa_assert(PA_REFCNT_VALUE(p) > 0);
292 pa_assert(p->srb == srb);
293
294 pa_pstream_ref(p);
295
296 do_pstream_read_write(p);
297
298 /* If either pstream or the srb is going away, return false.
299 We need to check this before p is destroyed. */
300 b = (PA_REFCNT_VALUE(p) > 1) && (p->srb == srb);
301 pa_pstream_unref(p);
302
303 return b;
304 }
305
io_callback(pa_iochannel * io,void * userdata)306 static void io_callback(pa_iochannel*io, void *userdata) {
307 pa_pstream *p = userdata;
308
309 pa_assert(p);
310 pa_assert(PA_REFCNT_VALUE(p) > 0);
311 pa_assert(p->io == io);
312
313 do_pstream_read_write(p);
314 }
315
defer_callback(pa_mainloop_api * m,pa_defer_event * e,void * userdata)316 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
317 pa_pstream *p = userdata;
318
319 pa_assert(p);
320 pa_assert(PA_REFCNT_VALUE(p) > 0);
321 pa_assert(p->defer_event == e);
322 pa_assert(p->mainloop == m);
323
324 do_pstream_read_write(p);
325 }
326
327 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
328
pa_pstream_new(pa_mainloop_api * m,pa_iochannel * io,pa_mempool * pool)329 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
330 pa_pstream *p;
331
332 pa_assert(m);
333 pa_assert(io);
334 pa_assert(pool);
335
336 p = pa_xnew0(pa_pstream, 1);
337 PA_REFCNT_INIT(p);
338 p->io = io;
339 pa_iochannel_set_callback(io, io_callback, p);
340
341 p->mainloop = m;
342 p->defer_event = m->defer_new(m, defer_callback, p);
343 m->defer_enable(p->defer_event, 0);
344
345 p->send_queue = pa_queue_new();
346
347 p->mempool = pool;
348
349 /* We do importing unconditionally */
350 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
351
352 pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
353 pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
354
355 return p;
356 }
357
358 /* Attach memfd<->SHM_ID mapping to given pstream and its memimport.
359 * Check pa_pstream_register_memfd_mempool() for further info.
360 *
361 * Caller owns the passed @memfd_fd and must close it down when appropriate. */
pa_pstream_attach_memfd_shmid(pa_pstream * p,unsigned shm_id,int memfd_fd)362 int pa_pstream_attach_memfd_shmid(pa_pstream *p, unsigned shm_id, int memfd_fd) {
363 int err = -1;
364
365 pa_assert(memfd_fd != -1);
366
367 if (!p->use_memfd) {
368 pa_log_warn("Received memfd ID registration request over a pipe "
369 "that does not support memfds");
370 return err;
371 }
372
373 if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
374 pa_log_warn("previously registered memfd SHM ID = %u", shm_id);
375 return err;
376 }
377
378 if (pa_memimport_attach_memfd(p->import, shm_id, memfd_fd, true)) {
379 pa_log("Failed to create permanent mapping for memfd region with ID = %u", shm_id);
380 return err;
381 }
382
383 pa_assert_se(pa_idxset_put(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL) == 0);
384 return 0;
385 }
386
item_free(void * item)387 static void item_free(void *item) {
388 struct item_info *i = item;
389 pa_assert(i);
390
391 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
392 pa_assert(i->chunk.memblock);
393 pa_memblock_unref(i->chunk.memblock);
394 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
395 pa_assert(i->packet);
396 pa_packet_unref(i->packet);
397 }
398
399 #ifdef HAVE_CREDS
400 /* On error recovery paths, there might be lingering items
401 * on the pstream send queue and they are usually freed with
402 * a call to 'pa_queue_free(p->send_queue, item_free)'. Make
403 * sure we do not leak any fds in that case! */
404 if (i->with_ancil_data)
405 pa_cmsg_ancil_data_close_fds(&i->ancil_data);
406 #endif
407
408 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
409 pa_xfree(i);
410 }
411
pstream_free(pa_pstream * p)412 static void pstream_free(pa_pstream *p) {
413 pa_assert(p);
414
415 pa_pstream_unlink(p);
416
417 pa_queue_free(p->send_queue, item_free);
418
419 if (p->write.current)
420 item_free(p->write.current);
421
422 if (p->write.memchunk.memblock)
423 pa_memblock_unref(p->write.memchunk.memblock);
424
425 if (p->readsrb.memblock)
426 pa_memblock_unref(p->readsrb.memblock);
427
428 if (p->readsrb.packet)
429 pa_packet_unref(p->readsrb.packet);
430
431 if (p->readio.memblock)
432 pa_memblock_unref(p->readio.memblock);
433
434 if (p->readio.packet)
435 pa_packet_unref(p->readio.packet);
436
437 if (p->registered_memfd_ids)
438 pa_idxset_free(p->registered_memfd_ids, NULL);
439
440 pa_xfree(p);
441 }
442
pa_pstream_send_packet(pa_pstream * p,pa_packet * packet,pa_cmsg_ancil_data * ancil_data)443 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data) {
444 struct item_info *i;
445
446 pa_assert(p);
447 pa_assert(PA_REFCNT_VALUE(p) > 0);
448 pa_assert(packet);
449
450 if (p->dead) {
451 #ifdef HAVE_CREDS
452 pa_cmsg_ancil_data_close_fds(ancil_data);
453 #endif
454 return;
455 }
456
457 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
458 i = pa_xnew(struct item_info, 1);
459
460 i->type = PA_PSTREAM_ITEM_PACKET;
461 i->packet = pa_packet_ref(packet);
462
463 #ifdef HAVE_CREDS
464 if ((i->with_ancil_data = !!ancil_data)) {
465 i->ancil_data = *ancil_data;
466 if (ancil_data->creds_valid)
467 pa_assert(ancil_data->nfd == 0);
468 else
469 pa_assert(ancil_data->nfd > 0);
470 }
471 #endif
472
473 pa_queue_push(p->send_queue, i);
474
475 p->mainloop->defer_enable(p->defer_event, 1);
476 }
477
pa_pstream_send_memblock(pa_pstream * p,uint32_t channel,int64_t offset,pa_seek_mode_t seek_mode,const pa_memchunk * chunk)478 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
479 size_t length, idx;
480 size_t bsm;
481
482 pa_assert(p);
483 pa_assert(PA_REFCNT_VALUE(p) > 0);
484 pa_assert(channel != (uint32_t) -1);
485 pa_assert(chunk);
486
487 if (p->dead)
488 return;
489
490 idx = 0;
491 length = chunk->length;
492
493 bsm = pa_mempool_block_size_max(p->mempool);
494
495 while (length > 0) {
496 struct item_info *i;
497 size_t n;
498
499 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
500 i = pa_xnew(struct item_info, 1);
501 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
502
503 n = PA_MIN(length, bsm);
504 i->chunk.index = chunk->index + idx;
505 i->chunk.length = n;
506 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
507
508 i->channel = channel;
509 i->offset = offset;
510 i->seek_mode = seek_mode;
511 #ifdef HAVE_CREDS
512 i->with_ancil_data = false;
513 #endif
514
515 pa_queue_push(p->send_queue, i);
516
517 idx += n;
518 length -= n;
519 }
520
521 p->mainloop->defer_enable(p->defer_event, 1);
522 }
523
pa_pstream_send_release(pa_pstream * p,uint32_t block_id)524 void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
525 struct item_info *item;
526 pa_assert(p);
527 pa_assert(PA_REFCNT_VALUE(p) > 0);
528
529 if (p->dead)
530 return;
531
532 /* pa_log("Releasing block %u", block_id); */
533
534 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
535 item = pa_xnew(struct item_info, 1);
536 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
537 item->block_id = block_id;
538 #ifdef HAVE_CREDS
539 item->with_ancil_data = false;
540 #endif
541
542 pa_queue_push(p->send_queue, item);
543 p->mainloop->defer_enable(p->defer_event, 1);
544 }
545
546 /* might be called from thread context */
memimport_release_cb(pa_memimport * i,uint32_t block_id,void * userdata)547 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
548 pa_pstream *p = userdata;
549
550 pa_assert(p);
551 pa_assert(PA_REFCNT_VALUE(p) > 0);
552
553 if (p->dead)
554 return;
555
556 if (p->release_callback)
557 p->release_callback(p, block_id, p->release_callback_userdata);
558 else
559 pa_pstream_send_release(p, block_id);
560 }
561
pa_pstream_send_revoke(pa_pstream * p,uint32_t block_id)562 void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
563 struct item_info *item;
564 pa_assert(p);
565 pa_assert(PA_REFCNT_VALUE(p) > 0);
566
567 if (p->dead)
568 return;
569 /* pa_log("Revoking block %u", block_id); */
570
571 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
572 item = pa_xnew(struct item_info, 1);
573 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
574 item->block_id = block_id;
575 #ifdef HAVE_CREDS
576 item->with_ancil_data = false;
577 #endif
578
579 pa_queue_push(p->send_queue, item);
580 p->mainloop->defer_enable(p->defer_event, 1);
581 }
582
583 /* might be called from thread context */
memexport_revoke_cb(pa_memexport * e,uint32_t block_id,void * userdata)584 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
585 pa_pstream *p = userdata;
586
587 pa_assert(p);
588 pa_assert(PA_REFCNT_VALUE(p) > 0);
589
590 if (p->revoke_callback)
591 p->revoke_callback(p, block_id, p->revoke_callback_userdata);
592 else
593 pa_pstream_send_revoke(p, block_id);
594 }
595
prepare_next_write_item(pa_pstream * p)596 static void prepare_next_write_item(pa_pstream *p) {
597 pa_assert(p);
598 pa_assert(PA_REFCNT_VALUE(p) > 0);
599
600 p->write.current = pa_queue_pop(p->send_queue);
601
602 if (!p->write.current)
603 return;
604 p->write.index = 0;
605 p->write.data = NULL;
606 p->write.minibuf_validsize = 0;
607 pa_memchunk_reset(&p->write.memchunk);
608
609 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
610 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
611 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
612 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
613 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
614
615 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
616 size_t plen;
617
618 pa_assert(p->write.current->packet);
619
620 p->write.data = (void *) pa_packet_data(p->write.current->packet, &plen);
621 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) plen);
622
623 if (plen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
624 memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
625 p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
626 }
627
628 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
629
630 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
631 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
632
633 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
634
635 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
636 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
637
638 } else {
639 uint32_t flags;
640 bool send_payload = true;
641
642 pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
643 pa_assert(p->write.current->chunk.memblock);
644
645 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
646 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
647 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
648
649 flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);
650
651 if (p->use_shm) {
652 pa_mem_type_t type;
653 uint32_t block_id, shm_id;
654 size_t offset, length;
655 uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
656 size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
657 pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock);
658 pa_memexport *current_export;
659
660 if (p->mempool == current_pool)
661 pa_assert_se(current_export = p->export);
662 else
663 pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));
664
665 if (pa_memexport_put(current_export,
666 p->write.current->chunk.memblock,
667 &type,
668 &block_id,
669 &shm_id,
670 &offset,
671 &length) >= 0) {
672
673 if (type == PA_MEM_TYPE_SHARED_POSIX)
674 send_payload = false;
675
676 if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd) {
677 if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
678 flags |= PA_FLAG_SHMDATA_MEMFD_BLOCK;
679 send_payload = false;
680 } else {
681 if (!p->non_registered_memfd_id_error_logged) {
682 pa_log("Cannot send block reference with non-registered memfd ID = %u", shm_id);
683 pa_log("Falling back to copying full block data over socket");
684 pa_log("There's a bug report about this: https://gitlab.freedesktop.org/pulseaudio/pulseaudio/issues/824");
685 p->non_registered_memfd_id_error_logged = true;
686 }
687 }
688 }
689
690 if (send_payload) {
691 pa_assert_se(pa_memexport_process_release(current_export, block_id) == 0);
692 } else {
693 flags |= PA_FLAG_SHMDATA;
694 if (pa_mempool_is_remote_writable(current_pool))
695 flags |= PA_FLAG_SHMWRITABLE;
696
697 shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
698 shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
699 shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
700 shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
701
702 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
703 p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
704 }
705 }
706 /* else */
707 /* FIXME: Avoid memexport slot leaks. Call pa_memexport_process_release() */
708 /* pa_log_warn("Failed to export memory block."); */
709
710 if (current_export != p->export)
711 pa_memexport_free(current_export);
712 pa_mempool_unref(current_pool);
713 }
714
715 if (send_payload) {
716 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
717 p->write.memchunk = p->write.current->chunk;
718 pa_memblock_ref(p->write.memchunk.memblock);
719 }
720
721 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
722 }
723
724 #ifdef HAVE_CREDS
725 if ((p->send_ancil_data_now = p->write.current->with_ancil_data))
726 p->write_ancil_data = &p->write.current->ancil_data;
727 #endif
728 }
729
check_srbpending(pa_pstream * p)730 static void check_srbpending(pa_pstream *p) {
731 if (!p->is_srbpending)
732 return;
733
734 if (p->srb)
735 pa_srbchannel_free(p->srb);
736
737 p->srb = p->srbpending;
738 p->is_srbpending = false;
739
740 if (p->srb)
741 pa_srbchannel_set_callback(p->srb, srb_callback, p);
742 }
743
do_write(pa_pstream * p)744 static int do_write(pa_pstream *p) {
745 void *d;
746 size_t l;
747 ssize_t r;
748 pa_memblock *release_memblock = NULL;
749
750 pa_assert(p);
751 pa_assert(PA_REFCNT_VALUE(p) > 0);
752
753 if (!p->write.current)
754 prepare_next_write_item(p);
755
756 if (!p->write.current) {
757 /* The out queue is empty, so switching channels is safe */
758 check_srbpending(p);
759 return 0;
760 }
761
762 if (p->write.minibuf_validsize > 0) {
763 d = p->write.minibuf + p->write.index;
764 l = p->write.minibuf_validsize - p->write.index;
765 } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
766 d = (uint8_t*) p->write.descriptor + p->write.index;
767 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
768 } else {
769 pa_assert(p->write.data || p->write.memchunk.memblock);
770
771 if (p->write.data)
772 d = p->write.data;
773 else {
774 d = pa_memblock_acquire_chunk(&p->write.memchunk);
775 release_memblock = p->write.memchunk.memblock;
776 }
777
778 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
779 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
780 }
781
782 pa_assert(l > 0);
783
784 #ifdef HAVE_CREDS
785 if (p->send_ancil_data_now) {
786 if (p->write_ancil_data->creds_valid) {
787 pa_assert(p->write_ancil_data->nfd == 0);
788 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_ancil_data->creds)) < 0)
789 goto fail;
790 }
791 else
792 if ((r = pa_iochannel_write_with_fds(p->io, d, l, p->write_ancil_data->nfd, p->write_ancil_data->fds)) < 0)
793 goto fail;
794
795 pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
796 p->send_ancil_data_now = false;
797 } else
798 #endif
799 if (p->srb)
800 r = pa_srbchannel_write(p->srb, d, l);
801 else if ((r = pa_iochannel_write(p->io, d, l)) < 0)
802 goto fail;
803
804 if (release_memblock)
805 pa_memblock_release(release_memblock);
806
807 p->write.index += (size_t) r;
808
809 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
810 pa_assert(p->write.current);
811 item_free(p->write.current);
812 p->write.current = NULL;
813
814 if (p->write.memchunk.memblock)
815 pa_memblock_unref(p->write.memchunk.memblock);
816
817 pa_memchunk_reset(&p->write.memchunk);
818
819 if (p->drain_callback && !pa_pstream_is_pending(p))
820 p->drain_callback(p, p->drain_callback_userdata);
821 }
822
823 return (size_t) r == l ? 1 : 0;
824
825 fail:
826 #ifdef HAVE_CREDS
827 if (p->send_ancil_data_now)
828 pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
829 #endif
830
831 if (release_memblock)
832 pa_memblock_release(release_memblock);
833
834 return -1;
835 }
836
memblock_complete(pa_pstream * p,struct pstream_read * re)837 static void memblock_complete(pa_pstream *p, struct pstream_read *re) {
838 pa_memchunk chunk;
839 int64_t offset;
840
841 if (!p->receive_memblock_callback)
842 return;
843
844 chunk.memblock = re->memblock;
845 chunk.index = 0;
846 chunk.length = re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
847
848 offset = (int64_t) (
849 (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
850 (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
851
852 p->receive_memblock_callback(
853 p,
854 ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
855 offset,
856 ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
857 &chunk,
858 p->receive_memblock_callback_userdata);
859 }
860
do_read(pa_pstream * p,struct pstream_read * re)861 static int do_read(pa_pstream *p, struct pstream_read *re) {
862 void *d;
863 size_t l;
864 ssize_t r;
865 pa_memblock *release_memblock = NULL;
866 pa_assert(p);
867 pa_assert(PA_REFCNT_VALUE(p) > 0);
868
869 if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
870 d = (uint8_t*) re->descriptor + re->index;
871 l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
872 } else {
873 pa_assert(re->data || re->memblock);
874
875 if (re->data)
876 d = re->data;
877 else {
878 d = pa_memblock_acquire(re->memblock);
879 release_memblock = re->memblock;
880 }
881
882 d = (uint8_t*) d + re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
883 l = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (re->index - PA_PSTREAM_DESCRIPTOR_SIZE);
884 }
885
886 if (re == &p->readsrb) {
887 r = pa_srbchannel_read(p->srb, d, l);
888 if (r == 0) {
889 if (release_memblock)
890 pa_memblock_release(release_memblock);
891 return 1;
892 }
893 }
894 else
895 #ifdef HAVE_CREDS
896 {
897 pa_cmsg_ancil_data b;
898
899 if ((r = pa_iochannel_read_with_ancil_data(p->io, d, l, &b)) <= 0)
900 goto fail;
901
902 if (b.creds_valid) {
903 p->read_ancil_data.creds_valid = true;
904 p->read_ancil_data.creds = b.creds;
905 }
906 if (b.nfd > 0) {
907 pa_assert(b.nfd <= MAX_ANCIL_DATA_FDS);
908 p->read_ancil_data.nfd = b.nfd;
909 memcpy(p->read_ancil_data.fds, b.fds, sizeof(int) * b.nfd);
910 p->read_ancil_data.close_fds_on_cleanup = b.close_fds_on_cleanup;
911 }
912 }
913 #else
914 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
915 goto fail;
916 #endif
917
918 if (release_memblock)
919 pa_memblock_release(release_memblock);
920
921 re->index += (size_t) r;
922
923 if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
924 uint32_t flags, length, channel;
925 /* Reading of frame descriptor complete */
926
927 flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
928
929 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
930 pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
931 return -1;
932 }
933
934 if (flags == PA_FLAG_SHMRELEASE) {
935
936 /* This is a SHM memblock release frame with no payload */
937
938 /* pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
939
940 pa_assert(p->export);
941 pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
942
943 goto frame_done;
944
945 } else if (flags == PA_FLAG_SHMREVOKE) {
946
947 /* This is a SHM memblock revoke frame with no payload */
948
949 /* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
950
951 pa_assert(p->import);
952 pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
953
954 goto frame_done;
955 }
956
957 length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
958
959 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
960 pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
961 return -1;
962 }
963
964 pa_assert(!re->packet && !re->memblock);
965
966 channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
967
968 if (channel == (uint32_t) -1) {
969 size_t plen;
970
971 if (flags != 0) {
972 pa_log_warn("Received packet frame with invalid flags value.");
973 return -1;
974 }
975
976 /* Frame is a packet frame */
977 re->packet = pa_packet_new(length);
978 re->data = (void *) pa_packet_data(re->packet, &plen);
979
980 } else {
981
982 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
983 pa_log_warn("Received memblock frame with invalid seek mode.");
984 return -1;
985 }
986
987 if (((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0) {
988
989 if (length != sizeof(re->shm_info)) {
990 pa_log_warn("Received SHM memblock frame with invalid frame length.");
991 return -1;
992 }
993
994 /* Frame is a memblock frame referencing an SHM memblock */
995 re->data = re->shm_info;
996
997 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
998
999 /* Frame is a memblock frame */
1000
1001 re->memblock = pa_memblock_new(p->mempool, length);
1002 re->data = NULL;
1003 } else {
1004
1005 pa_log_warn("Received memblock frame with invalid flags value.");
1006 return -1;
1007 }
1008 }
1009
1010 } else if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
1011 /* Frame complete */
1012
1013 if (re->memblock) {
1014 memblock_complete(p, re);
1015
1016 /* This was a memblock frame. We can unref the memblock now */
1017 pa_memblock_unref(re->memblock);
1018
1019 } else if (re->packet) {
1020
1021 if (p->receive_packet_callback)
1022 #ifdef HAVE_CREDS
1023 p->receive_packet_callback(p, re->packet, &p->read_ancil_data, p->receive_packet_callback_userdata);
1024 #else
1025 p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
1026 #endif
1027
1028 pa_packet_unref(re->packet);
1029 } else {
1030 pa_memblock *b = NULL;
1031 uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
1032 uint32_t shm_id = ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]);
1033 pa_mem_type_t type = (flags & PA_FLAG_SHMDATA_MEMFD_BLOCK) ?
1034 PA_MEM_TYPE_SHARED_MEMFD : PA_MEM_TYPE_SHARED_POSIX;
1035
1036 pa_assert(((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0);
1037 pa_assert(p->import);
1038
1039 if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd &&
1040 !pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
1041
1042 if (pa_log_ratelimit(PA_LOG_ERROR))
1043 pa_log("Ignoring received block reference with non-registered memfd ID = %u", shm_id);
1044
1045 } else if (!(b = pa_memimport_get(p->import,
1046 type,
1047 ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
1048 shm_id,
1049 ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
1050 ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
1051 !!(flags & PA_FLAG_SHMWRITABLE)))) {
1052
1053 if (pa_log_ratelimit(PA_LOG_DEBUG))
1054 pa_log_debug("Failed to import memory block.");
1055 }
1056
1057 if (p->receive_memblock_callback) {
1058 int64_t offset;
1059 pa_memchunk chunk;
1060
1061 chunk.memblock = b;
1062 chunk.index = 0;
1063 chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);
1064
1065 offset = (int64_t) (
1066 (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
1067 (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
1068
1069 p->receive_memblock_callback(
1070 p,
1071 ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
1072 offset,
1073 ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
1074 &chunk,
1075 p->receive_memblock_callback_userdata);
1076 }
1077
1078 if (b)
1079 pa_memblock_unref(b);
1080 }
1081
1082 goto frame_done;
1083 }
1084
1085 return 0;
1086
1087 frame_done:
1088 re->memblock = NULL;
1089 re->packet = NULL;
1090 re->index = 0;
1091 re->data = NULL;
1092
1093 #ifdef HAVE_CREDS
1094 /* FIXME: Close received ancillary data fds if the pstream's
1095 * receive_packet_callback did not do so.
1096 *
1097 * Malicious clients can attach fds to unknown commands, or attach them
1098 * to commands that does not expect fds. By doing so, server will reach
1099 * its open fd limit and future clients' SHM transfers will always fail.
1100 */
1101 p->read_ancil_data.creds_valid = false;
1102 p->read_ancil_data.nfd = 0;
1103 #endif
1104
1105 return 0;
1106
1107 fail:
1108 if (release_memblock)
1109 pa_memblock_release(release_memblock);
1110
1111 return -1;
1112 }
1113
pa_pstream_set_die_callback(pa_pstream * p,pa_pstream_notify_cb_t cb,void * userdata)1114 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
1115 pa_assert(p);
1116 pa_assert(PA_REFCNT_VALUE(p) > 0);
1117
1118 p->die_callback = cb;
1119 p->die_callback_userdata = userdata;
1120 }
1121
pa_pstream_set_drain_callback(pa_pstream * p,pa_pstream_notify_cb_t cb,void * userdata)1122 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
1123 pa_assert(p);
1124 pa_assert(PA_REFCNT_VALUE(p) > 0);
1125
1126 p->drain_callback = cb;
1127 p->drain_callback_userdata = userdata;
1128 }
1129
pa_pstream_set_receive_packet_callback(pa_pstream * p,pa_pstream_packet_cb_t cb,void * userdata)1130 void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
1131 pa_assert(p);
1132 pa_assert(PA_REFCNT_VALUE(p) > 0);
1133
1134 p->receive_packet_callback = cb;
1135 p->receive_packet_callback_userdata = userdata;
1136 }
1137
pa_pstream_set_receive_memblock_callback(pa_pstream * p,pa_pstream_memblock_cb_t cb,void * userdata)1138 void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
1139 pa_assert(p);
1140 pa_assert(PA_REFCNT_VALUE(p) > 0);
1141
1142 p->receive_memblock_callback = cb;
1143 p->receive_memblock_callback_userdata = userdata;
1144 }
1145
pa_pstream_set_release_callback(pa_pstream * p,pa_pstream_block_id_cb_t cb,void * userdata)1146 void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
1147 pa_assert(p);
1148 pa_assert(PA_REFCNT_VALUE(p) > 0);
1149
1150 p->release_callback = cb;
1151 p->release_callback_userdata = userdata;
1152 }
1153
pa_pstream_set_revoke_callback(pa_pstream * p,pa_pstream_block_id_cb_t cb,void * userdata)1154 void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
1155 pa_assert(p);
1156 pa_assert(PA_REFCNT_VALUE(p) > 0);
1157
1158 p->revoke_callback = cb;
1159 p->revoke_callback_userdata = userdata;
1160 }
1161
pa_pstream_is_pending(pa_pstream * p)1162 bool pa_pstream_is_pending(pa_pstream *p) {
1163 bool b;
1164
1165 pa_assert(p);
1166 pa_assert(PA_REFCNT_VALUE(p) > 0);
1167
1168 if (p->dead)
1169 b = false;
1170 else
1171 b = p->write.current || !pa_queue_isempty(p->send_queue);
1172
1173 return b;
1174 }
1175
pa_pstream_unref(pa_pstream * p)1176 void pa_pstream_unref(pa_pstream*p) {
1177 pa_assert(p);
1178 pa_assert(PA_REFCNT_VALUE(p) > 0);
1179
1180 if (PA_REFCNT_DEC(p) <= 0)
1181 pstream_free(p);
1182 }
1183
pa_pstream_ref(pa_pstream * p)1184 pa_pstream* pa_pstream_ref(pa_pstream*p) {
1185 pa_assert(p);
1186 pa_assert(PA_REFCNT_VALUE(p) > 0);
1187
1188 PA_REFCNT_INC(p);
1189 return p;
1190 }
1191
pa_pstream_unlink(pa_pstream * p)1192 void pa_pstream_unlink(pa_pstream *p) {
1193 pa_assert(p);
1194
1195 if (p->dead)
1196 return;
1197
1198 p->dead = true;
1199
1200 while (p->srb || p->is_srbpending) /* In theory there could be one active and one pending */
1201 pa_pstream_set_srbchannel(p, NULL);
1202
1203 if (p->import) {
1204 pa_memimport_free(p->import);
1205 p->import = NULL;
1206 }
1207
1208 if (p->export) {
1209 pa_memexport_free(p->export);
1210 p->export = NULL;
1211 }
1212
1213 if (p->io) {
1214 pa_iochannel_free(p->io);
1215 p->io = NULL;
1216 }
1217
1218 if (p->defer_event) {
1219 p->mainloop->defer_free(p->defer_event);
1220 p->defer_event = NULL;
1221 }
1222
1223 p->die_callback = NULL;
1224 p->drain_callback = NULL;
1225 p->receive_packet_callback = NULL;
1226 p->receive_memblock_callback = NULL;
1227 }
1228
pa_pstream_enable_shm(pa_pstream * p,bool enable)1229 void pa_pstream_enable_shm(pa_pstream *p, bool enable) {
1230 pa_assert(p);
1231 pa_assert(PA_REFCNT_VALUE(p) > 0);
1232
1233 p->use_shm = enable;
1234
1235 if (enable) {
1236
1237 if (!p->export)
1238 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
1239
1240 } else {
1241
1242 if (p->export) {
1243 pa_memexport_free(p->export);
1244 p->export = NULL;
1245 }
1246 }
1247 }
1248
pa_pstream_enable_memfd(pa_pstream * p)1249 void pa_pstream_enable_memfd(pa_pstream *p) {
1250 pa_assert(p);
1251 pa_assert(PA_REFCNT_VALUE(p) > 0);
1252 pa_assert(p->use_shm);
1253
1254 p->use_memfd = true;
1255
1256 if (!p->registered_memfd_ids) {
1257 p->registered_memfd_ids = pa_idxset_new(NULL, NULL);
1258 }
1259 }
1260
pa_pstream_get_shm(pa_pstream * p)1261 bool pa_pstream_get_shm(pa_pstream *p) {
1262 pa_assert(p);
1263 pa_assert(PA_REFCNT_VALUE(p) > 0);
1264
1265 return p->use_shm;
1266 }
1267
pa_pstream_get_memfd(pa_pstream * p)1268 bool pa_pstream_get_memfd(pa_pstream *p) {
1269 pa_assert(p);
1270 pa_assert(PA_REFCNT_VALUE(p) > 0);
1271
1272 return p->use_memfd;
1273 }
1274
pa_pstream_set_srbchannel(pa_pstream * p,pa_srbchannel * srb)1275 void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb) {
1276 pa_assert(p);
1277 pa_assert(PA_REFCNT_VALUE(p) > 0 || srb == NULL);
1278
1279 if (srb == p->srb)
1280 return;
1281
1282 /* We can't handle quick switches between srbchannels. */
1283 pa_assert(!p->is_srbpending);
1284
1285 p->srbpending = srb;
1286 p->is_srbpending = true;
1287
1288 /* Switch immediately, if possible. */
1289 if (p->dead)
1290 check_srbpending(p);
1291 else
1292 do_write(p);
1293 }
1294