1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
28
29 #ifdef HAVE_NETINET_IN_H
30 #include <netinet/in.h>
31 #endif
32
33 #include <pulse/xmalloc.h>
34
35 #include <pulsecore/idxset.h>
36 #include <pulsecore/socket.h>
37 #include <pulsecore/queue.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/creds.h>
40 #include <pulsecore/refcnt.h>
41 #include <pulsecore/flist.h>
42 #include <pulsecore/macro.h>
43
44 #include "pstream.h"
45
/* We piggyback information if audio data blocks are stored in SHM on the seek mode.
 *
 * All of the values below live in the FLAGS word of the frame descriptor,
 * next to the seek mode selected by PA_FLAG_SEEKMASK. */

/* Memblock frame whose payload is an SHM block reference instead of audio data */
#define PA_FLAG_SHMDATA 0x80000000LU
/* Set in addition to PA_FLAG_SHMDATA when the referenced block is memfd-backed */
#define PA_FLAG_SHMDATA_MEMFD_BLOCK 0x20000000LU
/* Payload-less frame releasing the block whose ID is stored in OFFSET_HI */
#define PA_FLAG_SHMRELEASE 0x40000000LU
/* Payload-less frame revoking the block whose ID is stored in OFFSET_HI */
#define PA_FLAG_SHMREVOKE 0xC0000000LU
/* Bits that must all be zero on a pipe where SHM is disabled */
#define PA_FLAG_SHMMASK 0xFF000000LU
/* Bits carrying the seek mode of a memblock frame */
#define PA_FLAG_SEEKMASK 0x000000FFLU
/* Set on an SHM reference when the sending pool is remote-writable */
#define PA_FLAG_SHMWRITABLE 0x00800000LU
54
/* The sequence descriptor header consists of 5 32bit integers.
 * Every word is converted with htonl() before sending and ntohl() after
 * receiving. */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH,    /* number of payload bytes following the descriptor */
    PA_PSTREAM_DESCRIPTOR_CHANNEL,   /* channel of a memblock frame, (uint32_t) -1 for packet frames */
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI, /* upper 32 bits of the offset; block ID in release/revoke frames */
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO, /* lower 32 bits of the offset */
    PA_PSTREAM_DESCRIPTOR_FLAGS,     /* seek mode plus the PA_FLAG_* bits */
    PA_PSTREAM_DESCRIPTOR_MAX        /* number of descriptor words */
};
64
/* If we have an SHM block, this info follows the descriptor.
 * Like the descriptor, each word is sent through htonl(). */
enum {
    PA_PSTREAM_SHM_BLOCKID, /* block ID handed out by pa_memexport_put() */
    PA_PSTREAM_SHM_SHMID,   /* ID of the SHM segment the block lives in */
    PA_PSTREAM_SHM_INDEX,   /* exported offset plus the chunk's index */
    PA_PSTREAM_SHM_LENGTH,  /* length of the chunk */
    PA_PSTREAM_SHM_MAX      /* number of SHM info words */
};
73
/* In-memory form of one frame descriptor, indexed by PA_PSTREAM_DESCRIPTOR_* */
typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];

/* Size in bytes of a frame descriptor on the wire */
#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))

/* Size of the write-side buffer that holds the descriptor together with a
 * small payload (short packets, SHM info), so both go out in one write. */
#define MINIBUF_SIZE (256)

/* To allow uploading a single sample in one frame, this value should be the
 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
 *
 * do_read() rejects any frame whose announced length exceeds this value.
 */
#define FRAME_SIZE_MAX_ALLOW (1024*1024*16)

/* Free list used to recycle struct item_info allocations */
PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
86
/* One entry of the send queue. Which members are meaningful depends on
 * @type; the send functions only fill in the members of their own kind. */
struct item_info {
    enum {
        PA_PSTREAM_ITEM_PACKET,     /* uses @packet (and the ancillary data, if any) */
        PA_PSTREAM_ITEM_MEMBLOCK,   /* uses @chunk, @channel, @offset, @seek_mode */
        PA_PSTREAM_ITEM_SHMRELEASE, /* uses @block_id */
        PA_PSTREAM_ITEM_SHMREVOKE   /* uses @block_id */
    } type;

    /* packet info */
    pa_packet *packet;
#ifdef HAVE_CREDS
    /* true if @ancil_data has to be sent together with the packet */
    bool with_ancil_data;
    pa_cmsg_ancil_data ancil_data;
#endif

    /* memblock info */
    pa_memchunk chunk;
    uint32_t channel;
    int64_t offset;
    pa_seek_mode_t seek_mode;

    /* release/revoke info */
    uint32_t block_id;
};
111
/* State of one partially received frame. A pstream keeps one of these per
 * input path (iochannel and srbchannel). */
struct pstream_read {
    pa_pstream_descriptor descriptor;     /* descriptor of the current frame, as received */
    pa_memblock *memblock;                /* target of a plain memblock frame, else NULL */
    pa_packet *packet;                    /* target of a packet frame, else NULL */
    uint32_t shm_info[PA_PSTREAM_SHM_MAX]; /* target of an SHM reference frame */
    void *data;                           /* payload destination; NULL when reading into @memblock */
    size_t index;                         /* bytes of the frame (descriptor included) read so far */
};
120
/* A packet stream: frames packets and memblocks over an iochannel and,
 * when one is attached, an srbchannel. */
struct pa_pstream {
    PA_REFCNT_DECLARE;

    pa_mainloop_api *mainloop;
    /* Enabled whenever something is queued for sending, so that the I/O
     * pass runs from the main loop */
    pa_defer_event *defer_event;
    pa_iochannel *io;
    /* @srb is the active srbchannel. @srbpending replaces it once the
     * send queue has drained, if @is_srbpending is set. */
    pa_srbchannel *srb, *srbpending;
    bool is_srbpending;

    /* Queue of struct item_info waiting to be written */
    pa_queue *send_queue;

    /* When set, the send functions drop their input and I/O stops */
    bool dead;

    /* State of the frame currently being written */
    struct {
        /* The descriptor aliases the start of the minibuf, so a small
         * payload stored right behind it goes out in the same write */
        union {
            uint8_t minibuf[MINIBUF_SIZE];
            pa_pstream_descriptor descriptor;
        };
        struct item_info* current; /* item being written, NULL if none */
        void *data;                /* packet payload, NULL for other items */
        size_t index;              /* bytes of the frame written so far */
        int minibuf_validsize;     /* bytes of minibuf to send, 0 if minibuf is unused */
        pa_memchunk memchunk;      /* memblock payload when sent by copy */
    } write;

    /* Read state for the iochannel and for the srbchannel */
    struct pstream_read readio, readsrb;

    /* @use_shm: beside copying the full audio data to the other
     * PA end, this pipe supports just sending references of the
     * same audio data blocks if they reside in a SHM pool.
     *
     * @use_memfd: pipe supports sending SHM memfd block references
     *
     * @registered_memfd_ids: registered memfd pools SHM IDs. Check
     * pa_pstream_register_memfd_mempool() for more information. */
    bool use_shm, use_memfd;
    pa_idxset *registered_memfd_ids;

    /* @import is created unconditionally in pa_pstream_new(); @export is
     * used for blocks that belong to @mempool */
    pa_memimport *import;
    pa_memexport *export;

    pa_pstream_packet_cb_t receive_packet_callback;
    void *receive_packet_callback_userdata;

    pa_pstream_memblock_cb_t receive_memblock_callback;
    void *receive_memblock_callback_userdata;

    /* Called after an item has been fully written and nothing is pending */
    pa_pstream_notify_cb_t drain_callback;
    void *drain_callback_userdata;

    /* Called when an I/O pass fails, right before the stream is unlinked */
    pa_pstream_notify_cb_t die_callback;
    void *die_callback_userdata;

    /* If set, called instead of queueing a revoke frame */
    pa_pstream_block_id_cb_t revoke_callback;
    void *revoke_callback_userdata;

    /* If set, called instead of queueing a release frame */
    pa_pstream_block_id_cb_t release_callback;
    void *release_callback_userdata;

    pa_mempool *mempool;

#ifdef HAVE_CREDS
    /* Ancillary data of the frame being read, and a pointer to the
     * ancillary data of the item being written */
    pa_cmsg_ancil_data read_ancil_data, *write_ancil_data;
    /* true while the ancillary data of the current item is still unsent */
    bool send_ancil_data_now;
#endif
};
187
188 #ifdef HAVE_CREDS
189 /*
 * Block transfers for memfd-backed SHM pools occur without passing the pool's
 * fd every time, thus minimizing overhead and avoiding fd leaks. A
 * REGISTER_MEMFD_SHMID command is sent, with the pool's memfd fd, very early
 * on. This command carries an ID that uniquely identifies the pool in question.
 * Further references to the pool's blocks can then be made using that ID alone;
 * the fd can be safely closed – on both ends – afterwards.
196 *
197 * On the sending side of this command, we want to close the passed fds
198 * directly after being sent. Meanwhile we're only allowed to asynchronously
199 * schedule packet writes to the pstream, so the job of closing passed fds is
200 * left to the pstream's actual writing function do_write(): it knows the
201 * exact point in time where the fds are passed to the other end through
202 * iochannels and the sendmsg() system call.
203 *
204 * Nonetheless not all code paths in the system desire their socket-passed
205 * fds to be closed after the send. srbchannel needs the passed fds to still
206 * be open for further communication. System-wide global memfd-backed pools
207 * also require the passed fd to be open: they pass the same fd, with the same
208 * ID registration mechanism, for each newly connected client to the system.
209 *
 * So from all of the above, never close the ancillary fds on your own;
 * always call the function below instead. It closes the passed fds
 * _only if allowed_ to do so by the code paths that originally created them.
 * Moreover, it is safe to invoke multiple times: failure handlers can, and
 * should, call it to clean up passed fds without worrying too much about
 * the system state.
216 */
pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data * ancil)217 void pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data *ancil) {
218 if (ancil && ancil->nfd > 0 && ancil->close_fds_on_cleanup) {
219 int i;
220
221 pa_assert(ancil->nfd <= MAX_ANCIL_DATA_FDS);
222
223 for (i = 0; i < ancil->nfd; i++)
224 if (ancil->fds[i] != -1) {
225 pa_assert_se(pa_close(ancil->fds[i]) == 0);
226 ancil->fds[i] = -1;
227 }
228
229 ancil->nfd = 0;
230 ancil->close_fds_on_cleanup = false;
231 }
232 }
233 #endif
234
235 static int do_write(pa_pstream *p);
236 static int do_read(pa_pstream *p, struct pstream_read *re);
237
/* Run one I/O pass over the stream: service the srbchannel (if any), then
 * read from and write to the iochannel. On any failure the die callback
 * is invoked and the stream is unlinked. */
static void do_pstream_read_write(pa_pstream *p) {
    int status;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    /* Hold our own reference for the duration of the pass */
    pa_pstream_ref(p);

    p->mainloop->defer_enable(p->defer_event, 0);

    if (!p->dead && p->srb) {
        if (do_write(p) < 0)
            goto fail;

        /* Keep reading from the srbchannel for as long as do_read()
         * returns 0 */
        for (;;) {
            if (p->dead)
                break;

            status = do_read(p, &p->readsrb);
            if (status < 0)
                goto fail;
            if (status != 0)
                break;
        }
    }

    if (!p->dead) {
        if (pa_iochannel_is_readable(p->io)) {
            if (do_read(p, &p->readio) < 0)
                goto fail;
        } else if (pa_iochannel_is_hungup(p->io))
            goto fail;
    }

    /* Write until the channel stops accepting data or do_write()
     * reports there is nothing more to push right now */
    while (!p->dead && pa_iochannel_is_writable(p->io)) {
        status = do_write(p);
        if (status < 0)
            goto fail;
        if (status == 0)
            break;
    }

    pa_pstream_unref(p);
    return;

fail:

    if (p->die_callback)
        p->die_callback(p, p->die_callback_userdata);

    pa_pstream_unlink(p);
    pa_pstream_unref(p);
}
284
/* srbchannel callback: runs an I/O pass and returns whether both the
 * stream and this srbchannel are still in use afterwards. */
static bool srb_callback(pa_srbchannel *srb, void *userdata) {
    pa_pstream *stream = userdata;
    bool still_valid;

    pa_assert(stream);
    pa_assert(PA_REFCNT_VALUE(stream) > 0);
    pa_assert(stream->srb == srb);

    pa_pstream_ref(stream);
    do_pstream_read_write(stream);

    /* Evaluate this while we still hold our reference: if ours is the
     * only one left, or the srbchannel was swapped out, report false. */
    still_valid = PA_REFCNT_VALUE(stream) > 1 && stream->srb == srb;

    pa_pstream_unref(stream);
    return still_valid;
}
304
/* iochannel callback: the channel changed state, so run an I/O pass. */
static void io_callback(pa_iochannel*io, void *userdata) {
    pa_pstream *stream = userdata;

    pa_assert(stream);
    pa_assert(PA_REFCNT_VALUE(stream) > 0);
    pa_assert(stream->io == io);

    do_pstream_read_write(stream);
}
314
/* Deferred-event callback: something was queued for sending, so run an
 * I/O pass from the main loop. */
static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
    pa_pstream *stream = userdata;

    pa_assert(stream);
    pa_assert(PA_REFCNT_VALUE(stream) > 0);
    pa_assert(stream->defer_event == e);
    pa_assert(stream->mainloop == m);

    do_pstream_read_write(stream);
}
325
326 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
327
pa_pstream_new(pa_mainloop_api * m,pa_iochannel * io,pa_mempool * pool)328 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
329 pa_pstream *p;
330
331 pa_assert(m);
332 pa_assert(io);
333 pa_assert(pool);
334
335 p = pa_xnew0(pa_pstream, 1);
336 PA_REFCNT_INIT(p);
337 p->io = io;
338 pa_iochannel_set_callback(io, io_callback, p);
339
340 p->mainloop = m;
341 p->defer_event = m->defer_new(m, defer_callback, p);
342 m->defer_enable(p->defer_event, 0);
343
344 p->send_queue = pa_queue_new();
345
346 p->mempool = pool;
347
348 /* We do importing unconditionally */
349 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
350
351 pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
352 pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
353
354 return p;
355 }
356
357 /* Attach memfd<->SHM_ID mapping to given pstream and its memimport.
358 * Check pa_pstream_register_memfd_mempool() for further info.
359 *
360 * Caller owns the passed @memfd_fd and must close it down when appropriate. */
pa_pstream_attach_memfd_shmid(pa_pstream * p,unsigned shm_id,int memfd_fd)361 int pa_pstream_attach_memfd_shmid(pa_pstream *p, unsigned shm_id, int memfd_fd) {
362 int err = -1;
363
364 pa_assert(memfd_fd != -1);
365
366 if (!p->use_memfd) {
367 pa_log_warn("Received memfd ID registration request over a pipe "
368 "that does not support memfds");
369 return err;
370 }
371
372 if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
373 pa_log_warn("previously registered memfd SHM ID = %u", shm_id);
374 return err;
375 }
376
377 if (pa_memimport_attach_memfd(p->import, shm_id, memfd_fd, true)) {
378 pa_log("Failed to create permanent mapping for memfd region with ID = %u", shm_id);
379 return err;
380 }
381
382 pa_assert_se(pa_idxset_put(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL) == 0);
383 return 0;
384 }
385
item_free(void * item)386 static void item_free(void *item) {
387 struct item_info *i = item;
388 pa_assert(i);
389
390 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
391 pa_assert(i->chunk.memblock);
392 pa_memblock_unref(i->chunk.memblock);
393 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
394 pa_assert(i->packet);
395 pa_packet_unref(i->packet);
396 }
397
398 #ifdef HAVE_CREDS
399 /* On error recovery paths, there might be lingering items
400 * on the pstream send queue and they are usually freed with
401 * a call to 'pa_queue_free(p->send_queue, item_free)'. Make
402 * sure we do not leak any fds in that case! */
403 if (i->with_ancil_data)
404 pa_cmsg_ancil_data_close_fds(&i->ancil_data);
405 #endif
406
407 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
408 pa_xfree(i);
409 }
410
/* Unlink the stream and free it together with everything it still owns:
 * queued items, the item being written, partially read frames and the
 * set of registered memfd IDs. */
static void pstream_free(pa_pstream *p) {
    struct pstream_read *readers[2];
    size_t n;

    pa_assert(p);

    pa_pstream_unlink(p);

    pa_queue_free(p->send_queue, item_free);

    if (p->write.current)
        item_free(p->write.current);

    if (p->write.memchunk.memblock)
        pa_memblock_unref(p->write.memchunk.memblock);

    /* Drop whatever the two read states were in the middle of receiving */
    readers[0] = &p->readsrb;
    readers[1] = &p->readio;

    for (n = 0; n < sizeof(readers) / sizeof(readers[0]); n++) {
        if (readers[n]->memblock)
            pa_memblock_unref(readers[n]->memblock);

        if (readers[n]->packet)
            pa_packet_unref(readers[n]->packet);
    }

    if (p->registered_memfd_ids)
        pa_idxset_free(p->registered_memfd_ids, NULL);

    pa_xfree(p);
}
441
/* Queue @packet for sending, optionally together with @ancil_data
 * (credentials or fds, never both). The packet gets its own reference.
 * On a dead stream nothing is queued and the fds of @ancil_data are
 * closed, where that is allowed. */
void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data) {
    struct item_info *item;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(packet);

    if (p->dead) {
#ifdef HAVE_CREDS
        pa_cmsg_ancil_data_close_fds(ancil_data);
#endif
        return;
    }

    /* Recycle an item from the free list, or allocate a new one */
    item = pa_flist_pop(PA_STATIC_FLIST_GET(items));
    if (!item)
        item = pa_xnew(struct item_info, 1);

    item->type = PA_PSTREAM_ITEM_PACKET;
    item->packet = pa_packet_ref(packet);

#ifdef HAVE_CREDS
    item->with_ancil_data = !!ancil_data;

    if (item->with_ancil_data) {
        item->ancil_data = *ancil_data;

        /* Credentials and fds are mutually exclusive, and one of the
         * two has to be present */
        if (ancil_data->creds_valid)
            pa_assert(ancil_data->nfd == 0);
        else
            pa_assert(ancil_data->nfd > 0);
    }
#endif

    pa_queue_push(p->send_queue, item);

    /* Have the main loop run an I/O pass */
    p->mainloop->defer_enable(p->defer_event, 1);
}
476
/* Queue @chunk for sending on @channel. The chunk is split into pieces
 * no larger than the pool's maximum block size; each piece is queued as
 * its own item, holds its own memblock reference and carries the same
 * @offset and @seek_mode. Does nothing on a dead stream. */
void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
    size_t remaining, done, piece, max_piece;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(channel != (uint32_t) -1);
    pa_assert(chunk);

    if (p->dead)
        return;

    max_piece = pa_mempool_block_size_max(p->mempool);

    for (done = 0, remaining = chunk->length; remaining > 0; done += piece, remaining -= piece) {
        struct item_info *item;

        item = pa_flist_pop(PA_STATIC_FLIST_GET(items));
        if (!item)
            item = pa_xnew(struct item_info, 1);

        item->type = PA_PSTREAM_ITEM_MEMBLOCK;

        piece = PA_MIN(remaining, max_piece);

        item->chunk.index = chunk->index + done;
        item->chunk.length = piece;
        item->chunk.memblock = pa_memblock_ref(chunk->memblock);

        item->channel = channel;
        item->offset = offset;
        item->seek_mode = seek_mode;
#ifdef HAVE_CREDS
        item->with_ancil_data = false;
#endif

        pa_queue_push(p->send_queue, item);
    }

    /* Have the main loop run an I/O pass */
    p->mainloop->defer_enable(p->defer_event, 1);
}
522
/* Queue a frame releasing the block @block_id. Does nothing on a dead
 * stream. */
void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
    struct item_info *entry;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

    /* pa_log("Releasing block %u", block_id); */

    entry = pa_flist_pop(PA_STATIC_FLIST_GET(items));
    if (!entry)
        entry = pa_xnew(struct item_info, 1);

    entry->type = PA_PSTREAM_ITEM_SHMRELEASE;
    entry->block_id = block_id;
#ifdef HAVE_CREDS
    entry->with_ancil_data = false;
#endif

    pa_queue_push(p->send_queue, entry);
    p->mainloop->defer_enable(p->defer_event, 1);
}
544
545 /* might be called from thread context */
memimport_release_cb(pa_memimport * i,uint32_t block_id,void * userdata)546 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
547 pa_pstream *p = userdata;
548
549 pa_assert(p);
550 pa_assert(PA_REFCNT_VALUE(p) > 0);
551
552 if (p->dead)
553 return;
554
555 if (p->release_callback)
556 p->release_callback(p, block_id, p->release_callback_userdata);
557 else
558 pa_pstream_send_release(p, block_id);
559 }
560
/* Queue a frame revoking the block @block_id. Does nothing on a dead
 * stream. */
void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
    struct item_info *entry;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

    /* pa_log("Revoking block %u", block_id); */

    entry = pa_flist_pop(PA_STATIC_FLIST_GET(items));
    if (!entry)
        entry = pa_xnew(struct item_info, 1);

    entry->type = PA_PSTREAM_ITEM_SHMREVOKE;
    entry->block_id = block_id;
#ifdef HAVE_CREDS
    entry->with_ancil_data = false;
#endif

    pa_queue_push(p->send_queue, entry);
    p->mainloop->defer_enable(p->defer_event, 1);
}
581
582 /* might be called from thread context */
memexport_revoke_cb(pa_memexport * e,uint32_t block_id,void * userdata)583 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
584 pa_pstream *p = userdata;
585
586 pa_assert(p);
587 pa_assert(PA_REFCNT_VALUE(p) > 0);
588
589 if (p->revoke_callback)
590 p->revoke_callback(p, block_id, p->revoke_callback_userdata);
591 else
592 pa_pstream_send_revoke(p, block_id);
593 }
594
/* Pop the next item off the send queue and set up p->write for it:
 * build the frame descriptor and decide where the payload comes from
 * (minibuf, packet data, or a memchunk).
 *
 * Leaves p->write.current NULL if the queue is empty. */
static void prepare_next_write_item(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->write.current = pa_queue_pop(p->send_queue);

    if (!p->write.current)
        return;
    p->write.index = 0;
    p->write.data = NULL;
    p->write.minibuf_validsize = 0;
    pa_memchunk_reset(&p->write.memchunk);

    /* Defaults: no payload, packet channel, zero offset, no flags */
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;

    if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
        size_t plen;

        pa_assert(p->write.current->packet);

        p->write.data = (void *) pa_packet_data(p->write.current->packet, &plen);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) plen);

        /* Small packets are copied behind the descriptor in the minibuf,
         * so that descriptor and payload go out in a single write */
        if (plen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
            memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
            p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
        }

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {

        /* Descriptor-only frame; the block ID travels in OFFSET_HI */
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {

        /* Descriptor-only frame; the block ID travels in OFFSET_HI */
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else {
        uint32_t flags;
        /* Stays true unless the block can be sent as an SHM reference */
        bool send_payload = true;

        pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
        pa_assert(p->write.current->chunk.memblock);

        /* The 64 bit offset is split into two 32 bit descriptor words */
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));

        flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);

        if (p->use_shm) {
            pa_mem_type_t type;
            uint32_t block_id, shm_id;
            size_t offset, length;
            /* The SHM info is built in the minibuf, right behind the descriptor */
            uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
            size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
            pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock);
            pa_memexport *current_export;

            /* Blocks of our own pool go through p->export; for blocks of
             * any other pool a temporary export is created, and freed
             * again further down */
            if (p->mempool == current_pool)
                pa_assert_se(current_export = p->export);
            else
                pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));

            if (pa_memexport_put(current_export,
                                 p->write.current->chunk.memblock,
                                 &type,
                                 &block_id,
                                 &shm_id,
                                 &offset,
                                 &length) >= 0) {

                if (type == PA_MEM_TYPE_SHARED_POSIX)
                    send_payload = false;

                /* memfd blocks can only be referenced if their pool's ID
                 * has been registered on this pstream */
                if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd) {
                    if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
                        flags |= PA_FLAG_SHMDATA_MEMFD_BLOCK;
                        send_payload = false;
                    } else {
                        if (pa_log_ratelimit(PA_LOG_ERROR)) {
                            pa_log("Cannot send block reference with non-registered memfd ID = %u", shm_id);
                            pa_log("Fallig back to copying full block data over socket");
                        }
                    }
                }

                if (send_payload) {
                    /* The data is copied after all, so give the export
                     * slot back right away */
                    pa_assert_se(pa_memexport_process_release(current_export, block_id) == 0);
                } else {
                    flags |= PA_FLAG_SHMDATA;
                    if (pa_mempool_is_remote_writable(current_pool))
                        flags |= PA_FLAG_SHMWRITABLE;

                    shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
                    shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
                    shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
                    shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);

                    /* The payload of the frame is the SHM info itself */
                    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
                    p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
                }
            }
/*             else */
/*                 FIXME: Avoid memexport slot leaks. Call pa_memexport_process_release() */
/*                 pa_log_warn("Failed to export memory block."); */

            if (current_export != p->export)
                pa_memexport_free(current_export);
            pa_mempool_unref(current_pool);
        }

        if (send_payload) {
            /* Send the audio data itself; p->write.memchunk takes its own
             * reference on the block */
            p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
            p->write.memchunk = p->write.current->chunk;
            pa_memblock_ref(p->write.memchunk.memblock);
        }

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
    }

#ifdef HAVE_CREDS
    /* Tell do_write() whether this item's ancillary data is still to be sent */
    if ((p->send_ancil_data_now = p->write.current->with_ancil_data))
        p->write_ancil_data = &p->write.current->ancil_data;
#endif
}
726
/* If an srbchannel switch is pending, perform it now: free the current
 * srbchannel, install the pending one (which may be NULL) and hook up
 * its callback. */
static void check_srbpending(pa_pstream *p) {
    if (p->is_srbpending) {
        if (p->srb)
            pa_srbchannel_free(p->srb);

        p->srb = p->srbpending;
        p->is_srbpending = false;

        if (p->srb)
            pa_srbchannel_set_callback(p->srb, srb_callback, p);
    }
}
740
/* Write the next part of the current frame, preparing a new item from
 * the send queue first if none is in progress.
 *
 * Returns 1 if everything offered to the channel was written, 0 if the
 * write was partial or there was nothing to send, and -1 on failure. */
static int do_write(pa_pstream *p) {
    void *d;
    size_t l;
    ssize_t r;
    /* Set when d points into an acquired memblock that has to be released again */
    pa_memblock *release_memblock = NULL;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (!p->write.current)
        prepare_next_write_item(p);

    if (!p->write.current) {
        /* The out queue is empty, so switching channels is safe */
        check_srbpending(p);
        return 0;
    }

    /* Pick the source of the next bytes: the minibuf (descriptor plus a
     * small payload), the bare descriptor, or the payload proper */
    if (p->write.minibuf_validsize > 0) {
        d = p->write.minibuf + p->write.index;
        l = p->write.minibuf_validsize - p->write.index;
    } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        d = (uint8_t*) p->write.descriptor + p->write.index;
        l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
    } else {
        pa_assert(p->write.data || p->write.memchunk.memblock);

        if (p->write.data)
            d = p->write.data;
        else {
            d = pa_memblock_acquire_chunk(&p->write.memchunk);
            release_memblock = p->write.memchunk.memblock;
        }

        /* write.index counts the descriptor too, so subtract it to get
         * the position inside the payload */
        d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
        l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
    }

    pa_assert(l > 0);

#ifdef HAVE_CREDS
    /* Ancillary data always goes over the iochannel, and only with the
     * first write of the item */
    if (p->send_ancil_data_now) {
        if (p->write_ancil_data->creds_valid) {
            pa_assert(p->write_ancil_data->nfd == 0);
            if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_ancil_data->creds)) < 0)
                goto fail;
        }
        else
            if ((r = pa_iochannel_write_with_fds(p->io, d, l, p->write_ancil_data->nfd, p->write_ancil_data->fds)) < 0)
                goto fail;

        /* The fds have been passed on; close our copies where allowed */
        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
        p->send_ancil_data_now = false;
    } else
#endif
    if (p->srb)
        r = pa_srbchannel_write(p->srb, d, l);
    else if ((r = pa_iochannel_write(p->io, d, l)) < 0)
        goto fail;

    if (release_memblock)
        pa_memblock_release(release_memblock);

    p->write.index += (size_t) r;

    /* Whole frame (descriptor plus payload) written: finish the item */
    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
        pa_assert(p->write.current);
        item_free(p->write.current);
        p->write.current = NULL;

        if (p->write.memchunk.memblock)
            pa_memblock_unref(p->write.memchunk.memblock);

        pa_memchunk_reset(&p->write.memchunk);

        if (p->drain_callback && !pa_pstream_is_pending(p))
            p->drain_callback(p, p->drain_callback_userdata);
    }

    return (size_t) r == l ? 1 : 0;

fail:
#ifdef HAVE_CREDS
    /* Do not leak the fds that could not be sent */
    if (p->send_ancil_data_now)
        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
#endif

    if (release_memblock)
        pa_memblock_release(release_memblock);

    return -1;
}
833
/* Deliver the fully received memblock of @re to the memblock callback,
 * if one is set. Channel, offset and seek mode are taken from the frame
 * descriptor; the chunk covers everything read after the descriptor. */
static void memblock_complete(pa_pstream *p, struct pstream_read *re) {
    pa_memchunk received;
    uint64_t offset_hi, offset_lo;

    if (!p->receive_memblock_callback)
        return;

    received.memblock = re->memblock;
    received.index = 0;
    received.length = re->index - PA_PSTREAM_DESCRIPTOR_SIZE;

    /* Reassemble the 64 bit offset from its two descriptor words */
    offset_hi = (uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]);
    offset_lo = (uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]);

    p->receive_memblock_callback(
        p,
        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
        (int64_t) ((offset_hi << 32) | offset_lo),
        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
        &received,
        p->receive_memblock_callback_userdata);
}
857
/* Read the next part of the frame tracked by @re, from the srbchannel if
 * @re is p->readsrb and from the iochannel otherwise. Once the descriptor
 * is complete it is validated and a payload target is set up; once the
 * payload is complete the frame is dispatched and @re is reset.
 *
 * Returns 0 after a successful read, 1 if the srbchannel had no data,
 * and -1 on read failure or on an invalid frame. */
static int do_read(pa_pstream *p, struct pstream_read *re) {
    void *d;
    size_t l;
    ssize_t r;
    /* Set when d points into an acquired memblock that has to be released again */
    pa_memblock *release_memblock = NULL;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    /* Pick the destination of the next bytes: the descriptor, or the
     * payload target chosen when the descriptor was parsed */
    if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        d = (uint8_t*) re->descriptor + re->index;
        l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
    } else {
        pa_assert(re->data || re->memblock);

        if (re->data)
            d = re->data;
        else {
            d = pa_memblock_acquire(re->memblock);
            release_memblock = re->memblock;
        }

        /* re->index counts the descriptor too, so subtract it to get the
         * position inside the payload */
        d = (uint8_t*) d + re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
        l = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (re->index - PA_PSTREAM_DESCRIPTOR_SIZE);
    }

    if (re == &p->readsrb) {
        r = pa_srbchannel_read(p->srb, d, l);
        /* Nothing available: report it with 1 so the caller stops looping */
        if (r == 0) {
            if (release_memblock)
                pa_memblock_release(release_memblock);
            return 1;
        }
    }
    else
#ifdef HAVE_CREDS
    {
        pa_cmsg_ancil_data b;

        if ((r = pa_iochannel_read_with_ancil_data(p->io, d, l, &b)) <= 0)
            goto fail;

        /* Keep received credentials and fds until the frame is complete */
        if (b.creds_valid) {
            p->read_ancil_data.creds_valid = true;
            p->read_ancil_data.creds = b.creds;
        }
        if (b.nfd > 0) {
            pa_assert(b.nfd <= MAX_ANCIL_DATA_FDS);
            p->read_ancil_data.nfd = b.nfd;
            memcpy(p->read_ancil_data.fds, b.fds, sizeof(int) * b.nfd);
            p->read_ancil_data.close_fds_on_cleanup = b.close_fds_on_cleanup;
        }
    }
#else
    if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
        goto fail;
#endif

    if (release_memblock)
        pa_memblock_release(release_memblock);

    re->index += (size_t) r;

    if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
        uint32_t flags, length, channel;
        /* Reading of frame descriptor complete */

        flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);

        if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
            pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
            return -1;
        }

        if (flags == PA_FLAG_SHMRELEASE) {

            /* This is a SHM memblock release frame with no payload */

            /* pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */

            pa_assert(p->export);
            pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));

            goto frame_done;

        } else if (flags == PA_FLAG_SHMREVOKE) {

            /* This is a SHM memblock revoke frame with no payload */

            /* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */

            pa_assert(p->import);
            pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));

            goto frame_done;
        }

        length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);

        /* Every remaining frame type carries a payload of bounded size */
        if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
            pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
            return -1;
        }

        pa_assert(!re->packet && !re->memblock);

        channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);

        /* Channel (uint32_t) -1 marks a packet frame */
        if (channel == (uint32_t) -1) {
            size_t plen;

            if (flags != 0) {
                pa_log_warn("Received packet frame with invalid flags value.");
                return -1;
            }

            /* Frame is a packet frame */
            re->packet = pa_packet_new(length);
            re->data = (void *) pa_packet_data(re->packet, &plen);

        } else {

            if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
                pa_log_warn("Received memblock frame with invalid seek mode.");
                return -1;
            }

            if (((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0) {

                /* The payload of an SHM reference is exactly the SHM info */
                if (length != sizeof(re->shm_info)) {
                    pa_log_warn("Received SHM memblock frame with invalid frame length.");
                    return -1;
                }

                /* Frame is a memblock frame referencing an SHM memblock */
                re->data = re->shm_info;

            } else if ((flags & PA_FLAG_SHMMASK) == 0) {

                /* Frame is a memblock frame */

                re->memblock = pa_memblock_new(p->mempool, length);
                re->data = NULL;
            } else {

                pa_log_warn("Received memblock frame with invalid flags value.");
                return -1;
            }
        }

    } else if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
        /* Frame complete */

        if (re->memblock) {
            memblock_complete(p, re);

            /* This was a memblock frame. We can unref the memblock now */
            pa_memblock_unref(re->memblock);

        } else if (re->packet) {

            if (p->receive_packet_callback)
#ifdef HAVE_CREDS
                p->receive_packet_callback(p, re->packet, &p->read_ancil_data, p->receive_packet_callback_userdata);
#else
                p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
#endif

            pa_packet_unref(re->packet);
        } else {
            /* Neither memblock nor packet: the payload was SHM info */
            pa_memblock *b = NULL;
            uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
            uint32_t shm_id = ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]);
            pa_mem_type_t type = (flags & PA_FLAG_SHMDATA_MEMFD_BLOCK) ?
                                 PA_MEM_TYPE_SHARED_MEMFD : PA_MEM_TYPE_SHARED_POSIX;

            pa_assert(((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0);
            pa_assert(p->import);

            /* On both failure paths below b stays NULL and the callback
             * is still invoked, with a chunk that has no memblock */
            if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd &&
                !pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {

                if (pa_log_ratelimit(PA_LOG_ERROR))
                    pa_log("Ignoring received block reference with non-registered memfd ID = %u", shm_id);

            } else if (!(b = pa_memimport_get(p->import,
                                              type,
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
                                              shm_id,
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
                                              !!(flags & PA_FLAG_SHMWRITABLE)))) {

                if (pa_log_ratelimit(PA_LOG_DEBUG))
                    pa_log_debug("Failed to import memory block.");
            }

            if (p->receive_memblock_callback) {
                int64_t offset;
                pa_memchunk chunk;

                chunk.memblock = b;
                chunk.index = 0;
                /* Without a block, report the length announced by the peer */
                chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);

                offset = (int64_t) (
                        (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
                        (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));

                p->receive_memblock_callback(
                        p,
                        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
                        offset,
                        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
                        &chunk,
                        p->receive_memblock_callback_userdata);
            }

            if (b)
                pa_memblock_unref(b);
        }

        goto frame_done;
    }

    return 0;

frame_done:
    /* Reset the read state for the next frame */
    re->memblock = NULL;
    re->packet = NULL;
    re->index = 0;
    re->data = NULL;

#ifdef HAVE_CREDS
    /* FIXME: Close received ancillary data fds if the pstream's
     * receive_packet_callback did not do so.
     *
     * Malicious clients can attach fds to unknown commands, or attach them
     * to commands that does not expect fds. By doing so, server will reach
     * its open fd limit and future clients' SHM transfers will always fail.
     */
    p->read_ancil_data.creds_valid = false;
    p->read_ancil_data.nfd = 0;
#endif

    return 0;

fail:
    if (release_memblock)
        pa_memblock_release(release_memblock);

    return -1;
}
1110
/* Register the "die" notification callback of the pstream.
 *
 * Stores cb and userdata in p->die_callback and p->die_callback_userdata,
 * replacing whatever was stored there before. Passing cb == NULL removes the
 * callback. p must be a live (refcount > 0) pstream. */
void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->die_callback_userdata = userdata;
    p->die_callback = cb;
}
1118
/* Register the "drain" notification callback of the pstream.
 *
 * Stores cb and userdata in p->drain_callback and p->drain_callback_userdata,
 * replacing whatever was stored there before. Passing cb == NULL removes the
 * callback. p must be a live (refcount > 0) pstream. */
void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->drain_callback_userdata = userdata;
    p->drain_callback = cb;
}
1126
/* Register the callback that is handed every completely received packet
 * frame.
 *
 * Stores cb and userdata in p->receive_packet_callback and
 * p->receive_packet_callback_userdata. Passing cb == NULL removes the
 * callback. p must be a live (refcount > 0) pstream. */
void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->receive_packet_callback_userdata = userdata;
    p->receive_packet_callback = cb;
}
1134
/* Register the callback that is handed every completely received memblock
 * frame.
 *
 * Stores cb and userdata in p->receive_memblock_callback and
 * p->receive_memblock_callback_userdata. Passing cb == NULL removes the
 * callback. p must be a live (refcount > 0) pstream. */
void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->receive_memblock_callback_userdata = userdata;
    p->receive_memblock_callback = cb;
}
1142
/* Register the block-id callback used for memblock "release" handling.
 *
 * Stores cb and userdata in p->release_callback and
 * p->release_callback_userdata, replacing whatever was stored there before.
 * p must be a live (refcount > 0) pstream.
 *
 * NOTE(review): the place where this callback is invoked is outside this
 * part of the file - confirm the exact trigger before relying on it. */
void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->release_callback_userdata = userdata;
    p->release_callback = cb;
}
1150
/* Register the block-id callback used for memblock "revoke" handling.
 *
 * Stores cb and userdata in p->revoke_callback and
 * p->revoke_callback_userdata, replacing whatever was stored there before.
 * p must be a live (refcount > 0) pstream.
 *
 * NOTE(review): the place where this callback is invoked is outside this
 * part of the file - confirm the exact trigger before relying on it. */
void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->revoke_callback_userdata = userdata;
    p->revoke_callback = cb;
}
1158
/* Tell whether the pstream still has outgoing data to process.
 *
 * Returns false for a dead pstream. Otherwise returns true if a write item
 * is currently in progress (p->write.current) or the send queue is not
 * empty. */
bool pa_pstream_is_pending(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    /* A dead stream never reports pending output. */
    if (p->dead)
        return false;

    return p->write.current || !pa_queue_isempty(p->send_queue);
}
1172
/* Drop one reference to the pstream.
 *
 * When the reference count reaches zero the pstream is destroyed through
 * pstream_free(); the caller must not touch p afterwards in that case. */
void pa_pstream_unref(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    /* Other holders remain: nothing more to do. */
    if (PA_REFCNT_DEC(p) > 0)
        return;

    pstream_free(p);
}
1180
/* Acquire an additional reference to the pstream.
 *
 * Increments the reference count of p and returns p itself, so the call can
 * be used directly in an assignment. p must already be live
 * (refcount > 0). */
pa_pstream *pa_pstream_ref(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    PA_REFCNT_INC(p);

    return p;
}
1188
/* Mark the pstream as dead and detach it from everything it drives.
 *
 * The call is idempotent: a pstream that is already dead is left untouched.
 * Otherwise the srbchannel(s), the memimport, the memexport, the iochannel
 * and the defer event are released (each only if present) and all user
 * callbacks are cleared. The pstream object itself is not freed here. */
void pa_pstream_unlink(pa_pstream *p) {
    pa_assert(p);

    if (p->dead)
        return;

    p->dead = true;

    /* pa_pstream_set_srbchannel() returns early when asked to set the channel
     * that is already active, and asserts that no switch is pending. With a
     * switch still pending, the loop below would therefore either spin
     * forever (p->srb == NULL) or trip that assertion (p->srb != NULL).
     * Resolve the pending switch first: the stream is already marked dead,
     * and this is the same call pa_pstream_set_srbchannel() makes for a dead
     * stream to switch immediately. */
    if (p->is_srbpending)
        check_srbpending(p);

    while (p->srb || p->is_srbpending) /* In theory there could be one active and one pending */
        pa_pstream_set_srbchannel(p, NULL);

    if (p->import) {
        pa_memimport_free(p->import);
        p->import = NULL;
    }

    if (p->export) {
        pa_memexport_free(p->export);
        p->export = NULL;
    }

    if (p->io) {
        pa_iochannel_free(p->io);
        p->io = NULL;
    }

    if (p->defer_event) {
        p->mainloop->defer_free(p->defer_event);
        p->defer_event = NULL;
    }

    /* Forget every user callback, so that no stale function pointer outlives
     * the unlink. */
    p->die_callback = NULL;
    p->drain_callback = NULL;
    p->receive_packet_callback = NULL;
    p->receive_memblock_callback = NULL;
    p->release_callback = NULL;
    p->revoke_callback = NULL;
}
1225
/* Turn use of shared memory on or off for this pstream.
 *
 * Records the setting in p->use_shm. When enabling, a memexport bound to
 * p->mempool is created unless one already exists; when disabling, an
 * existing memexport is freed and the pointer reset to NULL. */
void pa_pstream_enable_shm(pa_pstream *p, bool enable) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->use_shm = enable;

    if (!enable) {
        /* SHM switched off: drop the exporter, if there is one. */
        if (p->export) {
            pa_memexport_free(p->export);
            p->export = NULL;
        }

        return;
    }

    /* SHM switched on: make sure an exporter exists. */
    if (!p->export)
        p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
}
1245
/* Enable memfd usage on this pstream.
 *
 * Requires that SHM has already been enabled (asserted). Sets p->use_memfd
 * and lazily creates the idxset p->registered_memfd_ids if it does not exist
 * yet. */
void pa_pstream_enable_memfd(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->use_shm);

    p->use_memfd = true;

    /* Keep an already existing ID set. */
    if (p->registered_memfd_ids)
        return;

    p->registered_memfd_ids = pa_idxset_new(NULL, NULL);
}
1257
/* Report whether shared memory use is enabled on this pstream, i.e. the
 * current value of p->use_shm as set by pa_pstream_enable_shm(). */
bool pa_pstream_get_shm(pa_pstream *p) {
    bool shm_enabled;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    shm_enabled = p->use_shm;

    return shm_enabled;
}
1264
/* Report whether memfd use is enabled on this pstream, i.e. the current
 * value of p->use_memfd as set by pa_pstream_enable_memfd(). */
bool pa_pstream_get_memfd(pa_pstream *p) {
    bool memfd_enabled;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    memfd_enabled = p->use_memfd;

    return memfd_enabled;
}
1271
/* Request a switch of the pstream to the given srbchannel.
 *
 * srb may be NULL to request that the current srbchannel be dropped; this is
 * the only value accepted when the refcount is no longer positive. Asking
 * for the channel that is already active is a no-op. Only one switch may be
 * outstanding at a time (asserted).
 *
 * The request is recorded in p->srbpending / p->is_srbpending. For a dead
 * pstream the switch is then carried out right away through
 * check_srbpending(); otherwise do_write() is called. */
void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0 || srb == NULL);

    /* Nothing to do if the requested channel is already the active one. */
    if (p->srb == srb)
        return;

    /* We can't handle quick switches between srbchannels. */
    pa_assert(!p->is_srbpending);

    p->is_srbpending = true;
    p->srbpending = srb;

    /* Switch immediately, if possible. */
    if (!p->dead) {
        do_write(p);
        return;
    }

    check_srbpending(p);
}
1291