1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
28
29 #ifdef HAVE_NETINET_IN_H
30 #include <netinet/in.h>
31 #endif
32
33 #include <pulse/xmalloc.h>
34
35 #include <pulsecore/idxset.h>
36 #include <pulsecore/socket.h>
37 #include <pulsecore/queue.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/creds.h>
40 #include <pulsecore/refcnt.h>
41 #include <pulsecore/flist.h>
42 #include <pulsecore/macro.h>
43
44 #include "pstream.h"
45 #include "log/audio_log.h"
46
/* We piggyback information if audio data blocks are stored in SHM on the seek mode */
/* The seek mode occupies the low byte (PA_FLAG_SEEKMASK) of the descriptor's
 * FLAGS word; the bits below are OR-ed into, or compared against, that same
 * 32-bit word. */

/* Memblock frame whose payload is an SHM block reference, not the audio data */
#define PA_FLAG_SHMDATA 0x80000000LU
/* Set in addition to PA_FLAG_SHMDATA when the referenced block is memfd-backed */
#define PA_FLAG_SHMDATA_MEMFD_BLOCK 0x20000000LU
/* Payload-less release frame; the block id travels in the OFFSET_HI field */
#define PA_FLAG_SHMRELEASE 0x40000000LU
/* Payload-less revoke frame; the block id travels in the OFFSET_HI field */
#define PA_FLAG_SHMREVOKE 0xC0000000LU
/* Bits that must all be zero on a connection where SHM is disabled */
#define PA_FLAG_SHMMASK 0xFF000000LU
/* Bits holding the seek mode of a memblock frame */
#define PA_FLAG_SEEKMASK 0x000000FFLU
/* Set in addition to PA_FLAG_SHMDATA when the exporting pool is remote
 * writable. Note that this bit lies outside PA_FLAG_SHMMASK. */
#define PA_FLAG_SHMWRITABLE 0x00800000LU
55
/* The sequence descriptor header consists of 5 32bit integers: */
/* The values are indices into a pa_pstream_descriptor array; this file
 * stores every field with htonl() and reads it back with ntohl(). */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH,    /* number of payload bytes following the descriptor */
    PA_PSTREAM_DESCRIPTOR_CHANNEL,   /* (uint32_t) -1 for packet frames, else the memblock channel */
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI, /* upper 32 bits of the offset; block id for release/revoke frames */
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO, /* lower 32 bits of the offset */
    PA_PSTREAM_DESCRIPTOR_FLAGS,     /* seek mode plus PA_FLAG_* bits */
    PA_PSTREAM_DESCRIPTOR_MAX        /* number of fields, not a field itself */
};
65
/* If we have an SHM block, this info follows the descriptor */
/* The values are indices into an array of PA_PSTREAM_SHM_MAX uint32_t
 * words, each stored with htonl(). */
enum {
    PA_PSTREAM_SHM_BLOCKID, /* block id handed out by pa_memexport_put() */
    PA_PSTREAM_SHM_SHMID,   /* id of the SHM segment/pool the block lives in */
    PA_PSTREAM_SHM_INDEX,   /* export offset plus the chunk's index */
    PA_PSTREAM_SHM_LENGTH,  /* length of the chunk */
    PA_PSTREAM_SHM_MAX      /* number of words, not a word itself */
};
74
/* One frame descriptor: PA_PSTREAM_DESCRIPTOR_MAX 32-bit words */
typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];

/* Size of a frame descriptor in bytes */
#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))

/* Size of the write-side buffer into which a descriptor and a small payload
 * are coalesced so that both can go out in a single write */
#define MINIBUF_SIZE (256)

/* To allow uploading a single sample in one frame, this value should be the
 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
 */
#define FRAME_SIZE_MAX_ALLOW (1024*1024*16)

/* Free list used to recycle struct item_info allocations */
PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
87
/* One entry of the pstream send queue. 'type' selects which of the field
 * groups below is meaningful; the other groups are left untouched by the
 * functions that enqueue items. */
struct item_info {
    enum {
        PA_PSTREAM_ITEM_PACKET,     /* control packet, see 'packet' */
        PA_PSTREAM_ITEM_MEMBLOCK,   /* audio data, see 'chunk' and friends */
        PA_PSTREAM_ITEM_SHMRELEASE, /* release of 'block_id' */
        PA_PSTREAM_ITEM_SHMREVOKE   /* revocation of 'block_id' */
    } type;

    /* packet info */
    pa_packet *packet;
#ifdef HAVE_CREDS
    /* True if 'ancil_data' is valid and has to be sent along with this item */
    bool with_ancil_data;
    pa_cmsg_ancil_data ancil_data;
#endif

    /* memblock info */
    pa_memchunk chunk;
    uint32_t channel;
    int64_t offset;
    pa_seek_mode_t seek_mode;

    /* release/revoke info */
    uint32_t block_id;
};
112
/* State of the frame currently being received on one channel. */
struct pstream_read {
    pa_pstream_descriptor descriptor;     /* descriptor of the frame, in network byte order */
    pa_memblock *memblock;                /* destination block when the payload is audio data */
    pa_packet *packet;                    /* destination packet when the frame is a packet */
    uint32_t shm_info[PA_PSTREAM_SHM_MAX]; /* destination when the payload is an SHM block reference */
    void *data;                           /* payload destination if not 'memblock' (packet data or shm_info) */
    size_t index;                         /* bytes of this frame read so far, descriptor included */
};
121
/* A reference-counted packet stream: frames are queued on 'send_queue' and
 * written to / read from an iochannel and, optionally, an srbchannel. */
struct pa_pstream {
    PA_REFCNT_DECLARE;

    pa_mainloop_api *mainloop;
    /* Enabled whenever something is queued so that the I/O round runs from the main loop */
    pa_defer_event *defer_event;
    pa_iochannel *io;
    /* 'srb' is the active srbchannel; 'srbpending' replaces it once the
     * send queue has drained while 'is_srbpending' is set */
    pa_srbchannel *srb, *srbpending;
    bool is_srbpending;

    /* Queue of struct item_info entries waiting to be written */
    pa_queue *send_queue;

    /* Once set, the send functions drop whatever they are asked to queue */
    bool dead;

    /* State of the frame currently being written */
    struct {
        union {
            /* descriptor plus small payload, valid if minibuf_validsize > 0 */
            uint8_t minibuf[MINIBUF_SIZE];
            pa_pstream_descriptor descriptor;
        };
        struct item_info* current; /* item being written, NULL if none */
        void *data;                /* packet payload, NULL for other items */
        size_t index;              /* bytes of this frame written so far, descriptor included */
        int minibuf_validsize;     /* number of valid bytes in minibuf, 0 if unused */
        pa_memchunk memchunk;      /* audio payload when it is sent by value */
    } write;

    /* Read state for the iochannel and for the srbchannel, respectively */
    struct pstream_read readio, readsrb;

    /* @use_shm: beside copying the full audio data to the other
     * PA end, this pipe supports just sending references of the
     * same audio data blocks if they reside in a SHM pool.
     *
     * @use_memfd: pipe supports sending SHM memfd block references
     *
     * @registered_memfd_ids: registered memfd pools SHM IDs. Check
     * pa_pstream_register_memfd_mempool() for more information. */
    bool use_shm, use_memfd;
    /* Ensures the "non-registered memfd ID" error is logged only once */
    bool non_registered_memfd_id_error_logged;
    pa_idxset *registered_memfd_ids;

    pa_memimport *import;
    pa_memexport *export;

    pa_pstream_packet_cb_t receive_packet_callback;
    void *receive_packet_callback_userdata;

    pa_pstream_memblock_cb_t receive_memblock_callback;
    void *receive_memblock_callback_userdata;

    /* Called after an item has been fully written and nothing is pending anymore */
    pa_pstream_notify_cb_t drain_callback;
    void *drain_callback_userdata;

    /* Called when an I/O round fails, right before the pstream is unlinked */
    pa_pstream_notify_cb_t die_callback;
    void *die_callback_userdata;

    /* If set, called instead of queueing a revoke frame */
    pa_pstream_block_id_cb_t revoke_callback;
    void *revoke_callback_userdata;

    /* If set, called instead of queueing a release frame */
    pa_pstream_block_id_cb_t release_callback;
    void *release_callback_userdata;

    pa_mempool *mempool;

#ifdef HAVE_CREDS
    /* 'write_ancil_data' points into the item currently being written */
    pa_cmsg_ancil_data read_ancil_data, *write_ancil_data;
    /* True while the ancillary data of the current item still has to be sent */
    bool send_ancil_data_now;
#endif
};
189
190 #ifdef HAVE_CREDS
191 /*
 * Block transfers for memfd-backed SHM pools occur without passing the pool's
 * fd every time, thus minimizing overhead and avoiding fd leaks. A
194 * REGISTER_MEMFD_SHMID command is sent, with the pool's memfd fd, very early
195 * on. This command has an ID that uniquely identifies the pool in question.
196 * Further pool's block references can then be exclusively done using such ID;
197 * the fd can be safely closed – on both ends – afterwards.
198 *
199 * On the sending side of this command, we want to close the passed fds
200 * directly after being sent. Meanwhile we're only allowed to asynchronously
201 * schedule packet writes to the pstream, so the job of closing passed fds is
202 * left to the pstream's actual writing function do_write(): it knows the
203 * exact point in time where the fds are passed to the other end through
204 * iochannels and the sendmsg() system call.
205 *
206 * Nonetheless not all code paths in the system desire their socket-passed
207 * fds to be closed after the send. srbchannel needs the passed fds to still
208 * be open for further communication. System-wide global memfd-backed pools
209 * also require the passed fd to be open: they pass the same fd, with the same
210 * ID registration mechanism, for each newly connected client to the system.
211 *
 * So from all of the above, never close the ancillary fds on your own and
 * always call the method below instead. It takes care of closing the passed fds
214 * _only if allowed_ by the code paths that originally created them to do so.
215 * Moreover, it is multiple-invocations safe: failure handlers can, and
216 * should, call it for passed fds cleanup without worrying too much about
217 * the system state.
218 */
pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data * ancil)219 void pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data *ancil) {
220 if (ancil && ancil->nfd > 0 && ancil->close_fds_on_cleanup) {
221 int i;
222
223 pa_assert(ancil->nfd <= MAX_ANCIL_DATA_FDS);
224
225 for (i = 0; i < ancil->nfd; i++)
226 if (ancil->fds[i] != -1) {
227 pa_assert_se(pa_close(ancil->fds[i]) == 0);
228 ancil->fds[i] = -1;
229 }
230
231 ancil->nfd = 0;
232 ancil->close_fds_on_cleanup = false;
233 }
234 }
235 #endif
236
237 static int do_write(pa_pstream *p);
238 static int do_read(pa_pstream *p, struct pstream_read *re);
239
do_pstream_read_write(pa_pstream * p)240 static void do_pstream_read_write(pa_pstream *p) {
241 pa_assert(p);
242 pa_assert(PA_REFCNT_VALUE(p) > 0);
243
244 pa_pstream_ref(p);
245
246 p->mainloop->defer_enable(p->defer_event, 0);
247
248 if (!p->dead && p->srb) {
249 int r = 0;
250
251 if(do_write(p) < 0)
252 goto fail;
253
254 while (!p->dead && r == 0) {
255 r = do_read(p, &p->readsrb);
256 if (r < 0)
257 goto fail;
258 }
259 }
260
261 if (!p->dead && pa_iochannel_is_readable(p->io)) {
262 if (do_read(p, &p->readio) < 0)
263 goto fail;
264 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
265 goto fail;
266
267 while (!p->dead && pa_iochannel_is_writable(p->io)) {
268 int r = do_write(p);
269 if (r < 0)
270 goto fail;
271 if (r == 0)
272 break;
273 }
274
275 pa_pstream_unref(p);
276 return;
277
278 fail:
279
280 if (p->die_callback)
281 p->die_callback(p, p->die_callback_userdata);
282
283 pa_pstream_unlink(p);
284 pa_pstream_unref(p);
285 }
286
/* srbchannel callback: runs one I/O round on the pstream passed as userdata.
 * Returns false if the pstream is about to be destroyed or no longer uses
 * 'srb', true otherwise. */
static bool srb_callback(pa_srbchannel *srb, void *userdata) {
    pa_pstream *stream = userdata;
    bool still_valid = false;

    pa_assert(stream);
    pa_assert(PA_REFCNT_VALUE(stream) > 0);
    pa_assert(stream->srb == srb);

    pa_pstream_ref(stream);

    do_pstream_read_write(stream);

    /* This has to be evaluated while we still hold our reference: a count of
     * one means that ours is the last one left. */
    if (PA_REFCNT_VALUE(stream) > 1)
        still_valid = (stream->srb == srb);

    pa_pstream_unref(stream);

    return still_valid;
}
306
/* iochannel callback: runs one I/O round on the pstream passed as userdata. */
static void io_callback(pa_iochannel*io, void *userdata) {
    pa_pstream *stream = userdata;

    pa_assert(stream);
    pa_assert(PA_REFCNT_VALUE(stream) > 0);
    pa_assert(stream->io == io);

    do_pstream_read_write(stream);
}
316
/* Defer event callback: runs one I/O round on the pstream passed as userdata. */
static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
    pa_pstream *stream = userdata;

    pa_assert(stream);
    pa_assert(PA_REFCNT_VALUE(stream) > 0);
    pa_assert(stream->mainloop == m);
    pa_assert(stream->defer_event == e);

    do_pstream_read_write(stream);
}
327
328 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
329
pa_pstream_new(pa_mainloop_api * m,pa_iochannel * io,pa_mempool * pool)330 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
331 pa_pstream *p;
332
333 pa_assert(m);
334 pa_assert(io);
335 pa_assert(pool);
336
337 p = pa_xnew0(pa_pstream, 1);
338 PA_REFCNT_INIT(p);
339 p->io = io;
340 pa_iochannel_set_callback(io, io_callback, p);
341
342 p->mainloop = m;
343 p->defer_event = m->defer_new(m, defer_callback, p);
344 m->defer_enable(p->defer_event, 0);
345
346 p->send_queue = pa_queue_new();
347
348 p->mempool = pool;
349
350 /* We do importing unconditionally */
351 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
352
353 pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
354 pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
355
356 return p;
357 }
358
359 /* Attach memfd<->SHM_ID mapping to given pstream and its memimport.
360 * Check pa_pstream_register_memfd_mempool() for further info.
361 *
362 * Caller owns the passed @memfd_fd and must close it down when appropriate. */
pa_pstream_attach_memfd_shmid(pa_pstream * p,unsigned shm_id,int memfd_fd)363 int pa_pstream_attach_memfd_shmid(pa_pstream *p, unsigned shm_id, int memfd_fd) {
364 int err = -1;
365
366 pa_assert(memfd_fd != -1);
367
368 if (!p->use_memfd) {
369 pa_log_warn("Received memfd ID registration request over a pipe "
370 "that does not support memfds");
371 return err;
372 }
373
374 if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
375 pa_log_warn("previously registered memfd SHM ID = %u", shm_id);
376 return err;
377 }
378
379 if (pa_memimport_attach_memfd(p->import, shm_id, memfd_fd, true)) {
380 pa_log("Failed to create permanent mapping for memfd region with ID = %u", shm_id);
381 return err;
382 }
383
384 pa_assert_se(pa_idxset_put(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL) == 0);
385 return 0;
386 }
387
item_free(void * item)388 static void item_free(void *item) {
389 struct item_info *i = item;
390 pa_assert(i);
391
392 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
393 pa_assert(i->chunk.memblock);
394 pa_memblock_unref(i->chunk.memblock);
395 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
396 pa_assert(i->packet);
397 pa_packet_unref(i->packet);
398 }
399
400 #ifdef HAVE_CREDS
401 /* On error recovery paths, there might be lingering items
402 * on the pstream send queue and they are usually freed with
403 * a call to 'pa_queue_free(p->send_queue, item_free)'. Make
404 * sure we do not leak any fds in that case! */
405 if (i->with_ancil_data)
406 pa_cmsg_ancil_data_close_fds(&i->ancil_data);
407 #endif
408
409 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
410 pa_xfree(i);
411 }
412
/* Unlink the pstream, release everything it still holds (queued items, the
 * item being written, partially received frames, the memfd ID set) and free
 * the object itself. */
static void pstream_free(pa_pstream *p) {
    struct pstream_read *readers[2];
    size_t k;

    pa_assert(p);

    pa_pstream_unlink(p);

    pa_queue_free(p->send_queue, item_free);

    if (p->write.current)
        item_free(p->write.current);

    if (p->write.memchunk.memblock)
        pa_memblock_unref(p->write.memchunk.memblock);

    /* Drop partially received frames: srbchannel state first, then iochannel state */
    readers[0] = &p->readsrb;
    readers[1] = &p->readio;

    for (k = 0; k < 2; k++) {
        if (readers[k]->memblock)
            pa_memblock_unref(readers[k]->memblock);

        if (readers[k]->packet)
            pa_packet_unref(readers[k]->packet);
    }

    if (p->registered_memfd_ids)
        pa_idxset_free(p->registered_memfd_ids, NULL);

    pa_xfree(p);
}
443
/* Queue 'packet' for sending and enable the defer event so that it gets
 * written from the main loop. The queue takes its own reference to the
 * packet. If 'ancil_data' is given, a copy of it travels with the packet.
 *
 * On a dead pstream nothing is queued; the fds in 'ancil_data' are closed
 * (if that is allowed) instead. */
void pa_pstream_send_packet(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data) {
    /* Send queue length from which on a possible message backlog is logged */
    enum { BACKLOG_WARN_LEN = 3 };
    struct item_info *entry;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(packet);

    if (p->dead) {
#ifdef HAVE_CREDS
        pa_cmsg_ancil_data_close_fds(ancil_data);
#endif
        return;
    }

    /* Recycle an item from the free list if possible */
    entry = pa_flist_pop(PA_STATIC_FLIST_GET(items));
    if (!entry)
        entry = pa_xnew(struct item_info, 1);

    entry->type = PA_PSTREAM_ITEM_PACKET;
    entry->packet = pa_packet_ref(packet);

#ifdef HAVE_CREDS
    entry->with_ancil_data = !!ancil_data;

    if (entry->with_ancil_data) {
        entry->ancil_data = *ancil_data;

        /* Ancillary data carries either credentials or fds, never both */
        if (ancil_data->creds_valid)
            pa_assert(ancil_data->nfd == 0);
        else
            pa_assert(ancil_data->nfd > 0);
    }
#endif

    pa_queue_push(p->send_queue, entry);

    if (PaQueueGetLen(p->send_queue) >= BACKLOG_WARN_LEN) {
        AUDIO_WARNING_LOG("[MSG backlog]: @<%{public}p> PaQueueLen = %{public}u", p, PaQueueGetLen(p->send_queue));
    }

    p->mainloop->defer_enable(p->defer_event, 1);
}
481
/* Queue the audio data in 'chunk' for sending on 'channel'. The chunk is
 * split into pieces no larger than the maximum block size of the pstream's
 * mempool; every piece becomes a queue item of its own that holds a
 * reference to the memblock. All pieces carry the same 'offset' and
 * 'seek_mode'. Nothing happens on a dead pstream. */
void pa_pstream_send_memblock(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
    size_t remaining, consumed;
    size_t max_piece;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(channel != (uint32_t) -1);
    pa_assert(chunk);

    if (p->dead)
        return;

    consumed = 0;
    remaining = chunk->length;
    max_piece = pa_mempool_block_size_max(p->mempool);

    while (remaining > 0) {
        struct item_info *entry;
        size_t piece;

        entry = pa_flist_pop(PA_STATIC_FLIST_GET(items));
        if (!entry)
            entry = pa_xnew(struct item_info, 1);

        entry->type = PA_PSTREAM_ITEM_MEMBLOCK;

        piece = PA_MIN(remaining, max_piece);

        entry->chunk.index = chunk->index + consumed;
        entry->chunk.length = piece;
        entry->chunk.memblock = pa_memblock_ref(chunk->memblock);

        entry->channel = channel;
        entry->offset = offset;
        entry->seek_mode = seek_mode;
#ifdef HAVE_CREDS
        entry->with_ancil_data = false;
#endif

        pa_queue_push(p->send_queue, entry);

        consumed += piece;
        remaining -= piece;
    }

    p->mainloop->defer_enable(p->defer_event, 1);
}
527
/* Queue a release frame for 'block_id' and enable the defer event.
 * Nothing happens on a dead pstream. */
void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
    struct item_info *entry;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

    /* Recycle an item from the free list if possible */
    entry = pa_flist_pop(PA_STATIC_FLIST_GET(items));
    if (!entry)
        entry = pa_xnew(struct item_info, 1);

    entry->type = PA_PSTREAM_ITEM_SHMRELEASE;
    entry->block_id = block_id;
#ifdef HAVE_CREDS
    entry->with_ancil_data = false;
#endif

    pa_queue_push(p->send_queue, entry);
    p->mainloop->defer_enable(p->defer_event, 1);
}
549
550 /* might be called from thread context */
memimport_release_cb(pa_memimport * i,uint32_t block_id,void * userdata)551 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
552 pa_pstream *p = userdata;
553
554 pa_assert(p);
555 pa_assert(PA_REFCNT_VALUE(p) > 0);
556
557 if (p->dead)
558 return;
559
560 if (p->release_callback)
561 p->release_callback(p, block_id, p->release_callback_userdata);
562 else
563 pa_pstream_send_release(p, block_id);
564 }
565
/* Queue a revoke frame for 'block_id' and enable the defer event.
 * Nothing happens on a dead pstream. */
void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (!p->dead) {
        struct item_info *entry = pa_flist_pop(PA_STATIC_FLIST_GET(items));

        /* Fall back to a fresh allocation if the free list is empty */
        if (!entry)
            entry = pa_xnew(struct item_info, 1);

        entry->type = PA_PSTREAM_ITEM_SHMREVOKE;
        entry->block_id = block_id;
#ifdef HAVE_CREDS
        entry->with_ancil_data = false;
#endif

        pa_queue_push(p->send_queue, entry);
        p->mainloop->defer_enable(p->defer_event, 1);
    }
}
586
587 /* might be called from thread context */
memexport_revoke_cb(pa_memexport * e,uint32_t block_id,void * userdata)588 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
589 pa_pstream *p = userdata;
590
591 pa_assert(p);
592 pa_assert(PA_REFCNT_VALUE(p) > 0);
593
594 if (p->revoke_callback)
595 p->revoke_callback(p, block_id, p->revoke_callback_userdata);
596 else
597 pa_pstream_send_revoke(p, block_id);
598 }
599
/* Pop the next item off the send queue and set up p->write for it: reset the
 * write state, fill in the frame descriptor (in network byte order) and
 * decide where the payload, if any, comes from. If the queue is empty,
 * p->write.current is left NULL and nothing else is touched. */
static void prepare_next_write_item(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->write.current = pa_queue_pop(p->send_queue);

    if (!p->write.current)
        return;
    p->write.index = 0;
    p->write.data = NULL;
    p->write.minibuf_validsize = 0;
    pa_memchunk_reset(&p->write.memchunk);

    /* Defaults: no payload, "packet" channel, zero offset, no flags */
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;

    if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
        size_t plen;

        pa_assert(p->write.current->packet);

        p->write.data = (void *) pa_packet_data(p->write.current->packet, &plen);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) plen);

        /* Small packets are copied right behind the descriptor (which shares
         * its storage with minibuf), so that do_write() can send descriptor
         * and payload from one contiguous buffer */
        if (plen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
            memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
            p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
        }

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {

        /* Payload-less frame: the block id travels in the OFFSET_HI field */
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {

        /* Payload-less frame: the block id travels in the OFFSET_HI field */
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else {
        uint32_t flags;
        /* Stays true unless a reference to the block can be sent instead of its data */
        bool send_payload = true;

        pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
        pa_assert(p->write.current->chunk.memblock);

        /* The 64 bit offset is split into two 32 bit descriptor fields */
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));

        flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);

        if (p->use_shm) {
            pa_mem_type_t type;
            uint32_t block_id, shm_id;
            size_t offset, length;
            /* The SHM info words are written right behind the descriptor in minibuf */
            uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
            size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
            /* The pool obtained here is unref'ed at the end of this branch */
            pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock);
            pa_memexport *current_export;

            /* Blocks of the pstream's own pool go through p->export; blocks
             * of any other pool get a temporary export for that pool */
            if (p->mempool == current_pool)
                pa_assert_se(current_export = p->export);
            else
                pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));

            if (pa_memexport_put(current_export,
                                 p->write.current->chunk.memblock,
                                 &type,
                                 &block_id,
                                 &shm_id,
                                 &offset,
                                 &length) >= 0) {

                /* POSIX SHM blocks are always sent by reference */
                if (type == PA_MEM_TYPE_SHARED_POSIX)
                    send_payload = false;

                /* memfd blocks are sent by reference only if the pipe supports
                 * memfds and the pool's ID has been registered on this pstream */
                if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd) {
                    if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
                        flags |= PA_FLAG_SHMDATA_MEMFD_BLOCK;
                        send_payload = false;
                    } else {
                        if (!p->non_registered_memfd_id_error_logged) {
                            pa_log("Cannot send block reference with non-registered memfd ID = %u", shm_id);
                            pa_log("Falling back to copying full block data over socket");
                            pa_log("There's a bug report about this: https://gitlab.freedesktop.org/pulseaudio/pulseaudio/issues/824");
                            p->non_registered_memfd_id_error_logged = true;
                        }
                    }
                }

                if (send_payload) {
                    /* The data goes over the socket after all, so give the
                     * export slot taken by pa_memexport_put() back */
                    pa_assert_se(pa_memexport_process_release(current_export, block_id) == 0);
                } else {
                    flags |= PA_FLAG_SHMDATA;
                    if (pa_mempool_is_remote_writable(current_pool))
                        flags |= PA_FLAG_SHMWRITABLE;

                    shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
                    shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
                    shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
                    shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);

                    /* The frame payload is the SHM info, sent together with
                     * the descriptor out of minibuf */
                    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
                    p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
                }
            }
/* else */
/* FIXME: Avoid memexport slot leaks. Call pa_memexport_process_release() */
/* pa_log_warn("Failed to export memory block."); */

            /* NOTE(review): a temporary export is freed here even if a block
             * reference was just set up through it -- confirm that this is
             * what pa_memexport_free() is meant to be used for */
            if (current_export != p->export)
                pa_memexport_free(current_export);
            pa_mempool_unref(current_pool);
        }

        if (send_payload) {
            /* Send the audio data itself; p->write.memchunk holds a
             * reference of its own to the block */
            p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
            p->write.memchunk = p->write.current->chunk;
            pa_memblock_ref(p->write.memchunk.memblock);
        }

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
    }

#ifdef HAVE_CREDS
    /* Tell do_write() whether this item has ancillary data to send along */
    if ((p->send_ancil_data_now = p->write.current->with_ancil_data))
        p->write_ancil_data = &p->write.current->ancil_data;
#endif
}
733
/* If a switch of srbchannels is pending, carry it out: free the current
 * srbchannel (if any), make the pending one (which may be NULL) current and
 * hook up our callback on it. */
static void check_srbpending(pa_pstream *p) {
    if (p->is_srbpending) {
        if (p->srb)
            pa_srbchannel_free(p->srb);

        p->is_srbpending = false;
        p->srb = p->srbpending;

        if (p->srb)
            pa_srbchannel_set_callback(p->srb, srb_callback, p);
    }
}
747
/* Write as much of the current item as a single write call takes, preparing
 * the next item first if none is in progress.
 *
 * Returns 1 if everything handed to the write call was written, 0 if less
 * was written or if there was nothing to send, and -1 on failure. */
static int do_write(pa_pstream *p) {
    void *buf;
    size_t todo;
    ssize_t done;
    pa_memblock *acquired = NULL;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (!p->write.current) {
        prepare_next_write_item(p);

        if (!p->write.current) {
            /* The out queue is empty, so switching channels is safe */
            check_srbpending(p);
            return 0;
        }
    }

    if (p->write.minibuf_validsize > 0) {
        /* Descriptor and payload both sit in the minibuf */
        buf = p->write.minibuf + p->write.index;
        todo = p->write.minibuf_validsize - p->write.index;
    } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        /* Still busy with the descriptor */
        buf = (uint8_t*) p->write.descriptor + p->write.index;
        todo = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
    } else {
        /* Busy with the payload: packet data or an acquired memblock */
        const size_t payload_done = p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
        uint8_t *payload;

        pa_assert(p->write.data || p->write.memchunk.memblock);

        if (p->write.data)
            payload = p->write.data;
        else {
            payload = pa_memblock_acquire_chunk(&p->write.memchunk);
            acquired = p->write.memchunk.memblock;
        }

        buf = payload + payload_done;
        todo = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - payload_done;
    }

    pa_assert(todo > 0);

#ifdef HAVE_CREDS
    if (p->send_ancil_data_now) {
        /* Ancillary data always goes through the iochannel */
        if (p->write_ancil_data->creds_valid) {
            pa_assert(p->write_ancil_data->nfd == 0);

            done = pa_iochannel_write_with_creds(p->io, buf, todo, &p->write_ancil_data->creds);
            if (done < 0)
                goto fail;
        } else {
            done = pa_iochannel_write_with_fds(p->io, buf, todo, p->write_ancil_data->nfd, p->write_ancil_data->fds);
            if (done < 0)
                goto fail;
        }

        /* The fds have been passed on; close our copies if that is allowed */
        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
        p->send_ancil_data_now = false;
    } else
#endif
    {
        if (p->srb)
            done = pa_srbchannel_write(p->srb, buf, todo);
        else {
            done = pa_iochannel_write(p->io, buf, todo);
            if (done < 0)
                goto fail;
        }
    }

    if (acquired)
        pa_memblock_release(acquired);

    p->write.index += (size_t) done;

    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
        /* The frame is complete: dispose of the item and its payload reference */
        pa_assert(p->write.current);
        item_free(p->write.current);
        p->write.current = NULL;

        if (p->write.memchunk.memblock)
            pa_memblock_unref(p->write.memchunk.memblock);

        pa_memchunk_reset(&p->write.memchunk);

        if (p->drain_callback && !pa_pstream_is_pending(p))
            p->drain_callback(p, p->drain_callback_userdata);
    }

    if ((size_t) done != todo)
        return 0;

    return 1;

fail:
#ifdef HAVE_CREDS
    if (p->send_ancil_data_now)
        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
#endif

    if (acquired)
        pa_memblock_release(acquired);

    return -1;
}
840
/* Hand a completely received memblock frame to the receive_memblock
 * callback, if one is set. The chunk covers the block from index 0 over the
 * number of payload bytes read; channel, offset and seek mode are decoded
 * from the frame descriptor. */
static void memblock_complete(pa_pstream *p, struct pstream_read *re) {
    pa_memchunk received;
    uint64_t offset_hi, offset_lo;
    uint32_t channel, seek_mode;

    if (!p->receive_memblock_callback)
        return;

    received.memblock = re->memblock;
    received.index = 0;
    received.length = re->index - PA_PSTREAM_DESCRIPTOR_SIZE;

    /* The 64 bit offset arrives as two 32 bit halves */
    offset_hi = (uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]);
    offset_lo = (uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]);

    channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
    seek_mode = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK;

    p->receive_memblock_callback(
        p,
        channel,
        (int64_t) ((offset_hi << 32) | offset_lo),
        seek_mode,
        &received,
        p->receive_memblock_callback_userdata);
}
864
do_read(pa_pstream * p,struct pstream_read * re)865 static int do_read(pa_pstream *p, struct pstream_read *re) {
866 void *d;
867 size_t l;
868 ssize_t r;
869 pa_memblock *release_memblock = NULL;
870 pa_assert(p);
871 pa_assert(PA_REFCNT_VALUE(p) > 0);
872
873 if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
874 d = (uint8_t*) re->descriptor + re->index;
875 l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
876 } else {
877 pa_assert(re->data || re->memblock);
878
879 if (re->data)
880 d = re->data;
881 else {
882 d = pa_memblock_acquire(re->memblock);
883 release_memblock = re->memblock;
884 }
885
886 d = (uint8_t*) d + re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
887 l = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (re->index - PA_PSTREAM_DESCRIPTOR_SIZE);
888 }
889
890 if (re == &p->readsrb) {
891 r = pa_srbchannel_read(p->srb, d, l);
892 if (r == 0) {
893 if (release_memblock)
894 pa_memblock_release(release_memblock);
895 return 1;
896 }
897 }
898 else
899 #ifdef HAVE_CREDS
900 {
901 pa_cmsg_ancil_data b;
902
903 if ((r = pa_iochannel_read_with_ancil_data(p->io, d, l, &b)) <= 0)
904 goto fail;
905
906 if (b.creds_valid) {
907 p->read_ancil_data.creds_valid = true;
908 p->read_ancil_data.creds = b.creds;
909 }
910 if (b.nfd > 0) {
911 pa_assert(b.nfd <= MAX_ANCIL_DATA_FDS);
912 p->read_ancil_data.nfd = b.nfd;
913 memcpy(p->read_ancil_data.fds, b.fds, sizeof(int) * b.nfd);
914 p->read_ancil_data.close_fds_on_cleanup = b.close_fds_on_cleanup;
915 }
916 }
917 #else
918 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
919 goto fail;
920 #endif
921
922 if (release_memblock)
923 pa_memblock_release(release_memblock);
924
925 re->index += (size_t) r;
926
927 if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
928 uint32_t flags, length, channel;
929 /* Reading of frame descriptor complete */
930
931 flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
932
933 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
934 pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
935 return -1;
936 }
937
938 if (flags == PA_FLAG_SHMRELEASE) {
939
940 /* This is a SHM memblock release frame with no payload */
941
942 /* pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
943
944 pa_assert(p->export);
945 pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
946
947 goto frame_done;
948
949 } else if (flags == PA_FLAG_SHMREVOKE) {
950
951 /* This is a SHM memblock revoke frame with no payload */
952
953 /* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
954
955 pa_assert(p->import);
956 pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
957
958 goto frame_done;
959 }
960
961 length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
962
963 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
964 pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
965 return -1;
966 }
967
968 pa_assert(!re->packet && !re->memblock);
969
970 channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
971
972 if (channel == (uint32_t) -1) {
973 size_t plen;
974
975 if (flags != 0) {
976 pa_log_warn("Received packet frame with invalid flags value.");
977 return -1;
978 }
979
980 /* Frame is a packet frame */
981 re->packet = pa_packet_new(length);
982 re->data = (void *) pa_packet_data(re->packet, &plen);
983
984 } else {
985
986 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
987 pa_log_warn("Received memblock frame with invalid seek mode.");
988 return -1;
989 }
990
991 if (((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0) {
992
993 if (length != sizeof(re->shm_info)) {
994 pa_log_warn("Received SHM memblock frame with invalid frame length.");
995 return -1;
996 }
997
998 /* Frame is a memblock frame referencing an SHM memblock */
999 re->data = re->shm_info;
1000
1001 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
1002
1003 /* Frame is a memblock frame */
1004
1005 re->memblock = pa_memblock_new(p->mempool, length);
1006 re->data = NULL;
1007 } else {
1008
1009 pa_log_warn("Received memblock frame with invalid flags value.");
1010 return -1;
1011 }
1012 }
1013
1014 } else if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
1015 /* Frame complete */
1016
1017 if (re->memblock) {
1018 memblock_complete(p, re);
1019
1020 /* This was a memblock frame. We can unref the memblock now */
1021 pa_memblock_unref(re->memblock);
1022
1023 } else if (re->packet) {
1024
1025 if (p->receive_packet_callback)
1026 #ifdef HAVE_CREDS
1027 p->receive_packet_callback(p, re->packet, &p->read_ancil_data, p->receive_packet_callback_userdata);
1028 #else
1029 p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
1030 #endif
1031
1032 pa_packet_unref(re->packet);
1033 } else {
1034 pa_memblock *b = NULL;
1035 uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
1036 uint32_t shm_id = ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]);
1037 pa_mem_type_t type = (flags & PA_FLAG_SHMDATA_MEMFD_BLOCK) ?
1038 PA_MEM_TYPE_SHARED_MEMFD : PA_MEM_TYPE_SHARED_POSIX;
1039
1040 pa_assert(((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0);
1041 pa_assert(p->import);
1042
1043 if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd &&
1044 !pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
1045
1046 if (pa_log_ratelimit(PA_LOG_ERROR))
1047 pa_log("Ignoring received block reference with non-registered memfd ID = %u", shm_id);
1048
1049 } else if (!(b = pa_memimport_get(p->import,
1050 type,
1051 ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
1052 shm_id,
1053 ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
1054 ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
1055 !!(flags & PA_FLAG_SHMWRITABLE)))) {
1056
1057 if (pa_log_ratelimit(PA_LOG_DEBUG))
1058 pa_log_debug("Failed to import memory block.");
1059 }
1060
1061 if (p->receive_memblock_callback) {
1062 int64_t offset;
1063 pa_memchunk chunk;
1064
1065 chunk.memblock = b;
1066 chunk.index = 0;
1067 chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);
1068
1069 offset = (int64_t) (
1070 (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
1071 (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
1072
1073 p->receive_memblock_callback(
1074 p,
1075 ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
1076 offset,
1077 ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
1078 &chunk,
1079 p->receive_memblock_callback_userdata);
1080 }
1081
1082 if (b)
1083 pa_memblock_unref(b);
1084 }
1085
1086 goto frame_done;
1087 }
1088
1089 return 0;
1090
1091 frame_done:
1092 re->memblock = NULL;
1093 re->packet = NULL;
1094 re->index = 0;
1095 re->data = NULL;
1096
1097 #ifdef HAVE_CREDS
1098 /* FIXME: Close received ancillary data fds if the pstream's
1099 * receive_packet_callback did not do so.
1100 *
1101 * Malicious clients can attach fds to unknown commands, or attach them
1102 * to commands that does not expect fds. By doing so, server will reach
1103 * its open fd limit and future clients' SHM transfers will always fail.
1104 */
1105 p->read_ancil_data.creds_valid = false;
1106 p->read_ancil_data.nfd = 0;
1107 #endif
1108
1109 return 0;
1110
1111 fail:
1112 if (release_memblock)
1113 pa_memblock_release(release_memblock);
1114
1115 return -1;
1116 }
1117
/* Store the "die" notification handler for this pstream.
 *
 * p:        pstream to configure; must be non-NULL with a positive refcount.
 * cb:       handler to store in p->die_callback (NULL is stored as-is).
 * userdata: opaque pointer stored next to the handler.
 *
 * Whatever handler/userdata pair was stored before is overwritten. */
void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->die_callback_userdata = userdata;
    p->die_callback = cb;
}
1125
/* Store the "drain" notification handler for this pstream.
 *
 * p:        pstream to configure; must be non-NULL with a positive refcount.
 * cb:       handler to store in p->drain_callback (NULL is stored as-is).
 * userdata: opaque pointer stored next to the handler.
 *
 * Whatever handler/userdata pair was stored before is overwritten. */
void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->drain_callback_userdata = userdata;
    p->drain_callback = cb;
}
1133
/* Store the handler that the read path calls once a packet frame has been
 * received completely. The handler is passed the pstream, the packet, the
 * received ancillary data (NULL when built without HAVE_CREDS) and userdata.
 *
 * p:        pstream to configure; must be non-NULL with a positive refcount.
 * cb:       handler to store in p->receive_packet_callback.
 * userdata: opaque pointer handed back to the handler.
 *
 * Whatever handler/userdata pair was stored before is overwritten. */
void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->receive_packet_callback_userdata = userdata;
    p->receive_packet_callback = cb;
}
1141
/* Store the handler that the read path calls with a received memory chunk.
 * The handler is passed the pstream, the channel, the 64 bit offset, the
 * seek mode taken from the frame flags, the chunk and userdata.
 *
 * p:        pstream to configure; must be non-NULL with a positive refcount.
 * cb:       handler to store in p->receive_memblock_callback.
 * userdata: opaque pointer handed back to the handler.
 *
 * Whatever handler/userdata pair was stored before is overwritten. */
void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->receive_memblock_callback_userdata = userdata;
    p->receive_memblock_callback = cb;
}
1149
/* Store the block-id handler used for "release" notifications.
 *
 * p:        pstream to configure; must be non-NULL with a positive refcount.
 * cb:       handler to store in p->release_callback (NULL is stored as-is).
 * userdata: opaque pointer stored next to the handler.
 *
 * Whatever handler/userdata pair was stored before is overwritten. */
void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->release_callback_userdata = userdata;
    p->release_callback = cb;
}
1157
/* Store the block-id handler used for "revoke" notifications.
 *
 * p:        pstream to configure; must be non-NULL with a positive refcount.
 * cb:       handler to store in p->revoke_callback (NULL is stored as-is).
 * userdata: opaque pointer stored next to the handler.
 *
 * Whatever handler/userdata pair was stored before is overwritten. */
void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->revoke_callback_userdata = userdata;
    p->revoke_callback = cb;
}
1165
/* Report whether the pstream still has outgoing data to send.
 *
 * Returns false for a dead pstream. Otherwise returns true when an item is
 * currently being written (p->write.current) or the send queue is not empty.
 *
 * p: pstream to query; must be non-NULL with a positive refcount. */
bool pa_pstream_is_pending(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    /* A dead stream never reports pending work, whatever is still queued. */
    if (p->dead)
        return false;

    return p->write.current || !pa_queue_isempty(p->send_queue);
}
1179
/* Drop one reference to the pstream.
 *
 * When PA_REFCNT_DEC() yields a value <= 0 the object is handed to
 * pstream_free(); the caller must not touch p afterwards in that case.
 *
 * p: pstream to release; must be non-NULL with a positive refcount. */
void pa_pstream_unref(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    /* Other holders remain: nothing more to do. */
    if (PA_REFCNT_DEC(p) > 0)
        return;

    pstream_free(p);
}
1187
/* Take an additional reference to the pstream.
 *
 * p: pstream to reference; must be non-NULL with a positive refcount.
 *
 * Returns p itself, so the call can be used inline in an expression. */
pa_pstream *pa_pstream_ref(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    PA_REFCNT_INC(p);

    return p;
}
1195
/* Detach the pstream from its resources and mark it dead.
 *
 * Calling this on an already dead pstream is a no-op. Otherwise the stream
 * is flagged dead first, then (in this order) the srbchannels, the memory
 * import, the memory export, the iochannel and the defer event are released,
 * and finally the die, drain, receive-packet and receive-memblock handlers
 * are cleared. The release and revoke handlers are left untouched.
 *
 * Unlike most entry points in this file, no refcount assertion is made here.
 *
 * p: pstream to unlink; must be non-NULL. */
void pa_pstream_unlink(pa_pstream *p) {
    pa_assert(p);

    if (p->dead)
        return;

    /* Must be set before the srbchannel teardown below: with the dead flag
     * set, pa_pstream_set_srbchannel() calls check_srbpending() instead of
     * do_write(). */
    p->dead = true;

    /* In theory there could be one active and one pending srbchannel.
     *
     * NOTE(review): pa_pstream_set_srbchannel(p, NULL) returns early when
     * p->srb is already NULL, without touching is_srbpending. If a switch is
     * pending while no channel is active this loop looks like it cannot
     * terminate -- confirm whether that state is reachable. */
    while (p->srb || p->is_srbpending)
        pa_pstream_set_srbchannel(p, NULL);

    if (p->import) {
        pa_memimport_free(p->import);
        p->import = NULL;
    }

    if (p->export) {
        pa_memexport_free(p->export);
        p->export = NULL;
    }

    if (p->io) {
        pa_iochannel_free(p->io);
        p->io = NULL;
    }

    if (p->defer_event) {
        p->mainloop->defer_free(p->defer_event);
        p->defer_event = NULL;
    }

    /* Forget the notification handlers. */
    p->receive_memblock_callback = NULL;
    p->receive_packet_callback = NULL;
    p->drain_callback = NULL;
    p->die_callback = NULL;
}
1232
/* Switch shared-memory usage on or off for this pstream.
 *
 * The flag p->use_shm always takes the new value. When enabling, a memory
 * export bound to p->mempool (with memexport_revoke_cb as its callback) is
 * created unless one already exists. When disabling, an existing export is
 * freed and the pointer reset to NULL.
 *
 * p:      pstream to configure; must be non-NULL with a positive refcount.
 * enable: true to enable SHM, false to disable it. */
void pa_pstream_enable_shm(pa_pstream *p, bool enable) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->use_shm = enable;

    if (!enable) {
        /* Disabling: tear down the export, if there is one. */
        if (p->export) {
            pa_memexport_free(p->export);
            p->export = NULL;
        }

        return;
    }

    /* Enabling: keep an existing export, otherwise create one. */
    if (!p->export)
        p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
}
1252
/* Turn on memfd usage for this pstream.
 *
 * SHM must already be enabled (asserted). Sets p->use_memfd and creates the
 * idxset p->registered_memfd_ids on first use; an existing set is kept.
 *
 * p: pstream to configure; must be non-NULL with a positive refcount. */
void pa_pstream_enable_memfd(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->use_shm);

    p->use_memfd = true;

    /* The ID set survives repeated calls. */
    if (p->registered_memfd_ids)
        return;

    p->registered_memfd_ids = pa_idxset_new(NULL, NULL);
}
1264
/* Query the SHM flag of this pstream.
 *
 * p: pstream to query; must be non-NULL with a positive refcount.
 *
 * Returns the current value of p->use_shm. */
bool pa_pstream_get_shm(pa_pstream *p) {
    bool shm_enabled;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    shm_enabled = p->use_shm;

    return shm_enabled;
}
1271
/* Query the memfd flag of this pstream.
 *
 * p: pstream to query; must be non-NULL with a positive refcount.
 *
 * Returns the current value of p->use_memfd. */
bool pa_pstream_get_memfd(pa_pstream *p) {
    bool memfd_enabled;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    memfd_enabled = p->use_memfd;

    return memfd_enabled;
}
1278
/* Request a switch of the pstream to another srbchannel.
 *
 * If srb is already the active channel (p->srb) nothing happens. Otherwise
 * srb is recorded as the pending channel and the switch is attempted right
 * away: through check_srbpending() on a dead pstream, through do_write()
 * on a live one.
 *
 * p:   pstream to configure; must be non-NULL. A positive refcount is
 *      required unless srb is NULL.
 * srb: channel to switch to, or NULL to request dropping the channel.
 *
 * Only one switch may be outstanding: calling this while a previous switch
 * is still pending trips an assertion. */
void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0 || srb == NULL);

    /* Already the active channel: nothing to switch. */
    if (p->srb == srb)
        return;

    /* We can't handle quick switches between srbchannels. */
    pa_assert(!p->is_srbpending);

    p->is_srbpending = true;
    p->srbpending = srb;

    /* Switch immediately, if possible. */
    if (!p->dead)
        do_write(p);
    else
        check_srbpending(p);
}
1298