1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
28
29 #ifdef HAVE_NETINET_IN_H
30 #include <netinet/in.h>
31 #endif
32
33 #include <pulse/xmalloc.h>
34
35 #include <pulsecore/idxset.h>
36 #include <pulsecore/socket.h>
37 #include <pulsecore/queue.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/creds.h>
40 #include <pulsecore/refcnt.h>
41 #include <pulsecore/flist.h>
42 #include <pulsecore/macro.h>
43
44 #include "pstream.h"
45
46 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
47 #define PA_FLAG_SHMDATA 0x80000000LU
48 #define PA_FLAG_SHMDATA_MEMFD_BLOCK 0x20000000LU
49 #define PA_FLAG_SHMRELEASE 0x40000000LU
50 #define PA_FLAG_SHMREVOKE 0xC0000000LU
51 #define PA_FLAG_SHMMASK 0xFF000000LU
52 #define PA_FLAG_SEEKMASK 0x000000FFLU
53 #define PA_FLAG_SHMWRITABLE 0x00800000LU
54
55 /* The sequence descriptor header consists of 5 32bit integers: */
56 enum {
57 PA_PSTREAM_DESCRIPTOR_LENGTH,
58 PA_PSTREAM_DESCRIPTOR_CHANNEL,
59 PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
60 PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
61 PA_PSTREAM_DESCRIPTOR_FLAGS,
62 PA_PSTREAM_DESCRIPTOR_MAX
63 };
64
65 /* If we have an SHM block, this info follows the descriptor */
66 enum {
67 PA_PSTREAM_SHM_BLOCKID,
68 PA_PSTREAM_SHM_SHMID,
69 PA_PSTREAM_SHM_INDEX,
70 PA_PSTREAM_SHM_LENGTH,
71 PA_PSTREAM_SHM_MAX
72 };
73
74 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
75
76 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
77
78 #define MINIBUF_SIZE (256)
79
80 /* To allow uploading a single sample in one frame, this value should be the
81 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
82 */
83 #define FRAME_SIZE_MAX_ALLOW (1024*1024*16)
84
85 /* Default memblock alignment used with pa_pstream_send_memblock()
86 */
87 #define DEFAULT_PSTREAM_MEMBLOCK_ALIGN (256)
88
89 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
90
91 struct item_info {
92 enum {
93 PA_PSTREAM_ITEM_PACKET,
94 PA_PSTREAM_ITEM_MEMBLOCK,
95 PA_PSTREAM_ITEM_SHMRELEASE,
96 PA_PSTREAM_ITEM_SHMREVOKE
97 } type;
98
99 /* packet info */
100 pa_packet *packet;
101 #ifdef HAVE_CREDS
102 bool with_ancil_data;
103 pa_cmsg_ancil_data ancil_data;
104 #endif
105
106 /* memblock info */
107 pa_memchunk chunk;
108 uint32_t channel;
109 int64_t offset;
110 pa_seek_mode_t seek_mode;
111
112 /* release/revoke info */
113 uint32_t block_id;
114 };
115
116 struct pstream_read {
117 pa_pstream_descriptor descriptor;
118 pa_memblock *memblock;
119 pa_packet *packet;
120 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
121 void *data;
122 size_t index;
123 };
124
125 struct pa_pstream {
126 PA_REFCNT_DECLARE;
127
128 pa_mainloop_api *mainloop;
129 pa_defer_event *defer_event;
130 pa_iochannel *io;
131 pa_srbchannel *srb, *srbpending;
132 bool is_srbpending;
133
134 pa_queue *send_queue;
135
136 bool dead;
137
138 struct {
139 union {
140 uint8_t minibuf[MINIBUF_SIZE];
141 pa_pstream_descriptor descriptor;
142 };
143 struct item_info* current;
144 void *data;
145 size_t index;
146 int minibuf_validsize;
147 pa_memchunk memchunk;
148 } write;
149
150 struct pstream_read readio, readsrb;
151
152 /* @use_shm: beside copying the full audio data to the other
153 * PA end, this pipe supports just sending references of the
154 * same audio data blocks if they reside in a SHM pool.
155 *
156 * @use_memfd: pipe supports sending SHM memfd block references
157 *
158 * @registered_memfd_ids: registered memfd pools SHM IDs. Check
159 * pa_pstream_register_memfd_mempool() for more information. */
160 bool use_shm, use_memfd;
161 bool non_registered_memfd_id_error_logged;
162 pa_idxset *registered_memfd_ids;
163
164 pa_memimport *import;
165 pa_memexport *export;
166
167 pa_pstream_packet_cb_t receive_packet_callback;
168 void *receive_packet_callback_userdata;
169
170 pa_pstream_memblock_cb_t receive_memblock_callback;
171 void *receive_memblock_callback_userdata;
172
173 pa_pstream_notify_cb_t drain_callback;
174 void *drain_callback_userdata;
175
176 pa_pstream_notify_cb_t die_callback;
177 void *die_callback_userdata;
178
179 pa_pstream_block_id_cb_t revoke_callback;
180 void *revoke_callback_userdata;
181
182 pa_pstream_block_id_cb_t release_callback;
183 void *release_callback_userdata;
184
185 pa_mempool *mempool;
186
187 #ifdef HAVE_CREDS
188 pa_cmsg_ancil_data read_ancil_data, *write_ancil_data;
189 bool send_ancil_data_now;
190 #endif
191 };
192
193 #ifdef HAVE_CREDS
194 /*
195 * memfd-backed SHM pools blocks transfer occur without passing the pool's
196 * fd every time, thus minimizing overhead and avoiding fd leaks. A
197 * REGISTER_MEMFD_SHMID command is sent, with the pool's memfd fd, very early
198 * on. This command has an ID that uniquely identifies the pool in question.
199 * Further pool's block references can then be exclusively done using such ID;
200 * the fd can be safely closed – on both ends – afterwards.
201 *
202 * On the sending side of this command, we want to close the passed fds
203 * directly after being sent. Meanwhile we're only allowed to asynchronously
204 * schedule packet writes to the pstream, so the job of closing passed fds is
205 * left to the pstream's actual writing function do_write(): it knows the
206 * exact point in time where the fds are passed to the other end through
207 * iochannels and the sendmsg() system call.
208 *
209 * Nonetheless not all code paths in the system desire their socket-passed
210 * fds to be closed after the send. srbchannel needs the passed fds to still
211 * be open for further communication. System-wide global memfd-backed pools
212 * also require the passed fd to be open: they pass the same fd, with the same
213 * ID registration mechanism, for each newly connected client to the system.
214 *
215 * So from all of the above, never close the ancillary fds by your own and
216 * always call below method instead. It takes care of closing the passed fds
217 * _only if allowed_ by the code paths that originally created them to do so.
218 * Moreover, it is multiple-invocations safe: failure handlers can, and
219 * should, call it for passed fds cleanup without worrying too much about
220 * the system state.
221 */
pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data * ancil)222 void pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data *ancil) {
223 if (ancil && ancil->nfd > 0 && ancil->close_fds_on_cleanup) {
224 int i;
225
226 pa_assert(ancil->nfd <= MAX_ANCIL_DATA_FDS);
227
228 for (i = 0; i < ancil->nfd; i++)
229 if (ancil->fds[i] != -1) {
230 pa_assert_se(pa_close(ancil->fds[i]) == 0);
231 ancil->fds[i] = -1;
232 }
233
234 ancil->nfd = 0;
235 ancil->close_fds_on_cleanup = false;
236 }
237 }
238 #endif
239
240 static int do_write(pa_pstream *p);
241 static int do_read(pa_pstream *p, struct pstream_read *re);
242
do_pstream_read_write(pa_pstream * p)243 static void do_pstream_read_write(pa_pstream *p) {
244 pa_assert(p);
245 pa_assert(PA_REFCNT_VALUE(p) > 0);
246
247 pa_pstream_ref(p);
248
249 p->mainloop->defer_enable(p->defer_event, 0);
250
251 if (!p->dead && p->srb) {
252 int r = 0;
253
254 if(do_write(p) < 0)
255 goto fail;
256
257 while (!p->dead && r == 0) {
258 r = do_read(p, &p->readsrb);
259 if (r < 0)
260 goto fail;
261 }
262 }
263
264 if (!p->dead && pa_iochannel_is_readable(p->io)) {
265 if (do_read(p, &p->readio) < 0)
266 goto fail;
267 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
268 goto fail;
269
270 while (!p->dead && pa_iochannel_is_writable(p->io)) {
271 int r = do_write(p);
272 if (r < 0)
273 goto fail;
274 if (r == 0)
275 break;
276 }
277
278 pa_pstream_unref(p);
279 return;
280
281 fail:
282
283 if (p->die_callback)
284 p->die_callback(p, p->die_callback_userdata);
285
286 pa_pstream_unlink(p);
287 pa_pstream_unref(p);
288 }
289
srb_callback(pa_srbchannel * srb,void * userdata)290 static bool srb_callback(pa_srbchannel *srb, void *userdata) {
291 bool b;
292 pa_pstream *p = userdata;
293
294 pa_assert(p);
295 pa_assert(PA_REFCNT_VALUE(p) > 0);
296 pa_assert(p->srb == srb);
297
298 pa_pstream_ref(p);
299
300 do_pstream_read_write(p);
301
302 /* If either pstream or the srb is going away, return false.
303 We need to check this before p is destroyed. */
304 b = (PA_REFCNT_VALUE(p) > 1) && (p->srb == srb);
305 pa_pstream_unref(p);
306
307 return b;
308 }
309
io_callback(pa_iochannel * io,void * userdata)310 static void io_callback(pa_iochannel*io, void *userdata) {
311 pa_pstream *p = userdata;
312
313 pa_assert(p);
314 pa_assert(PA_REFCNT_VALUE(p) > 0);
315 pa_assert(p->io == io);
316
317 do_pstream_read_write(p);
318 }
319
defer_callback(pa_mainloop_api * m,pa_defer_event * e,void * userdata)320 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
321 pa_pstream *p = userdata;
322
323 pa_assert(p);
324 pa_assert(PA_REFCNT_VALUE(p) > 0);
325 pa_assert(p->defer_event == e);
326 pa_assert(p->mainloop == m);
327
328 do_pstream_read_write(p);
329 }
330
331 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
332
pa_pstream_new(pa_mainloop_api * m,pa_iochannel * io,pa_mempool * pool)333 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
334 pa_pstream *p;
335
336 pa_assert(m);
337 pa_assert(io);
338 pa_assert(pool);
339
340 p = pa_xnew0(pa_pstream, 1);
341 PA_REFCNT_INIT(p);
342 p->io = io;
343 pa_iochannel_set_callback(io, io_callback, p);
344
345 p->mainloop = m;
346 p->defer_event = m->defer_new(m, defer_callback, p);
347 m->defer_enable(p->defer_event, 0);
348
349 p->send_queue = pa_queue_new();
350
351 p->mempool = pool;
352
353 /* We do importing unconditionally */
354 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
355
356 pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
357 pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
358
359 return p;
360 }
361
362 /* Attach memfd<->SHM_ID mapping to given pstream and its memimport.
363 * Check pa_pstream_register_memfd_mempool() for further info.
364 *
365 * Caller owns the passed @memfd_fd and must close it down when appropriate. */
pa_pstream_attach_memfd_shmid(pa_pstream * p,unsigned shm_id,int memfd_fd)366 int pa_pstream_attach_memfd_shmid(pa_pstream *p, unsigned shm_id, int memfd_fd) {
367 int err = -1;
368
369 pa_assert(memfd_fd != -1);
370
371 if (!p->use_memfd) {
372 pa_log_warn("Received memfd ID registration request over a pipe "
373 "that does not support memfds");
374 return err;
375 }
376
377 if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
378 pa_log_warn("previously registered memfd SHM ID = %u", shm_id);
379 return err;
380 }
381
382 if (pa_memimport_attach_memfd(p->import, shm_id, memfd_fd, true)) {
383 pa_log("Failed to create permanent mapping for memfd region with ID = %u", shm_id);
384 return err;
385 }
386
387 pa_assert_se(pa_idxset_put(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL) == 0);
388 return 0;
389 }
390
item_free(void * item)391 static void item_free(void *item) {
392 struct item_info *i = item;
393 pa_assert(i);
394
395 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
396 pa_assert(i->chunk.memblock);
397 pa_memblock_unref(i->chunk.memblock);
398 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
399 pa_assert(i->packet);
400 pa_packet_unref(i->packet);
401 }
402
403 #ifdef HAVE_CREDS
404 /* On error recovery paths, there might be lingering items
405 * on the pstream send queue and they are usually freed with
406 * a call to 'pa_queue_free(p->send_queue, item_free)'. Make
407 * sure we do not leak any fds in that case! */
408 if (i->with_ancil_data)
409 pa_cmsg_ancil_data_close_fds(&i->ancil_data);
410 #endif
411
412 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
413 pa_xfree(i);
414 }
415
pstream_free(pa_pstream * p)416 static void pstream_free(pa_pstream *p) {
417 pa_assert(p);
418
419 pa_pstream_unlink(p);
420
421 pa_queue_free(p->send_queue, item_free);
422
423 if (p->write.current)
424 item_free(p->write.current);
425
426 if (p->write.memchunk.memblock)
427 pa_memblock_unref(p->write.memchunk.memblock);
428
429 if (p->readsrb.memblock)
430 pa_memblock_unref(p->readsrb.memblock);
431
432 if (p->readsrb.packet)
433 pa_packet_unref(p->readsrb.packet);
434
435 if (p->readio.memblock)
436 pa_memblock_unref(p->readio.memblock);
437
438 if (p->readio.packet)
439 pa_packet_unref(p->readio.packet);
440
441 if (p->registered_memfd_ids)
442 pa_idxset_free(p->registered_memfd_ids, NULL);
443
444 pa_xfree(p);
445 }
446
pa_pstream_send_packet(pa_pstream * p,pa_packet * packet,pa_cmsg_ancil_data * ancil_data)447 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data) {
448 struct item_info *i;
449
450 pa_assert(p);
451 pa_assert(PA_REFCNT_VALUE(p) > 0);
452 pa_assert(packet);
453
454 if (p->dead) {
455 #ifdef HAVE_CREDS
456 pa_cmsg_ancil_data_close_fds(ancil_data);
457 #endif
458 return;
459 }
460
461 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
462 i = pa_xnew(struct item_info, 1);
463
464 i->type = PA_PSTREAM_ITEM_PACKET;
465 i->packet = pa_packet_ref(packet);
466
467 #ifdef HAVE_CREDS
468 if ((i->with_ancil_data = !!ancil_data)) {
469 i->ancil_data = *ancil_data;
470 if (ancil_data->creds_valid)
471 pa_assert(ancil_data->nfd == 0);
472 else
473 pa_assert(ancil_data->nfd > 0);
474 }
475 #endif
476
477 pa_queue_push(p->send_queue, i);
478 if (PaQueueGetLen(p->send_queue) >= 10) { // 10 maybe have msg backlog
479 pa_log_warn("[MSG backlog]: PaQueueLen = %u", PaQueueGetLen(p->send_queue));
480 }
481
482 p->mainloop->defer_enable(p->defer_event, 1);
483 }
484
pa_pstream_send_memblock(pa_pstream * p,uint32_t channel,int64_t offset,pa_seek_mode_t seek_mode,const pa_memchunk * chunk,size_t align)485 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk, size_t align) {
486 size_t length, idx;
487 size_t bsm;
488
489 pa_assert(p);
490 pa_assert(PA_REFCNT_VALUE(p) > 0);
491 pa_assert(channel != (uint32_t) -1);
492 pa_assert(chunk);
493
494 if (p->dead)
495 return;
496
497 idx = 0;
498 length = chunk->length;
499
500 bsm = pa_mempool_block_size_max(p->mempool);
501
502 if (align == 0)
503 align = DEFAULT_PSTREAM_MEMBLOCK_ALIGN;
504
505 bsm = (bsm / align) * align;
506
507 while (length > 0) {
508 struct item_info *i;
509 size_t n;
510
511 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
512 i = pa_xnew(struct item_info, 1);
513 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
514
515 n = PA_MIN(length, bsm);
516 i->chunk.index = chunk->index + idx;
517 i->chunk.length = n;
518 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
519
520 i->channel = channel;
521 i->offset = offset;
522 i->seek_mode = seek_mode;
523 #ifdef HAVE_CREDS
524 i->with_ancil_data = false;
525 #endif
526
527 pa_queue_push(p->send_queue, i);
528
529 idx += n;
530 length -= n;
531 }
532
533 p->mainloop->defer_enable(p->defer_event, 1);
534 }
535
pa_pstream_send_release(pa_pstream * p,uint32_t block_id)536 void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
537 struct item_info *item;
538 pa_assert(p);
539 pa_assert(PA_REFCNT_VALUE(p) > 0);
540
541 if (p->dead)
542 return;
543
544 /* pa_log("Releasing block %u", block_id); */
545
546 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
547 item = pa_xnew(struct item_info, 1);
548 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
549 item->block_id = block_id;
550 #ifdef HAVE_CREDS
551 item->with_ancil_data = false;
552 #endif
553
554 pa_queue_push(p->send_queue, item);
555 p->mainloop->defer_enable(p->defer_event, 1);
556 }
557
558 /* might be called from thread context */
memimport_release_cb(pa_memimport * i,uint32_t block_id,void * userdata)559 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
560 pa_pstream *p = userdata;
561
562 pa_assert(p);
563 pa_assert(PA_REFCNT_VALUE(p) > 0);
564
565 if (p->dead)
566 return;
567
568 if (p->release_callback)
569 p->release_callback(p, block_id, p->release_callback_userdata);
570 else
571 pa_pstream_send_release(p, block_id);
572 }
573
pa_pstream_send_revoke(pa_pstream * p,uint32_t block_id)574 void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
575 struct item_info *item;
576 pa_assert(p);
577 pa_assert(PA_REFCNT_VALUE(p) > 0);
578
579 if (p->dead)
580 return;
581 /* pa_log("Revoking block %u", block_id); */
582
583 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
584 item = pa_xnew(struct item_info, 1);
585 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
586 item->block_id = block_id;
587 #ifdef HAVE_CREDS
588 item->with_ancil_data = false;
589 #endif
590
591 pa_queue_push(p->send_queue, item);
592 p->mainloop->defer_enable(p->defer_event, 1);
593 }
594
595 /* might be called from thread context */
memexport_revoke_cb(pa_memexport * e,uint32_t block_id,void * userdata)596 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
597 pa_pstream *p = userdata;
598
599 pa_assert(p);
600 pa_assert(PA_REFCNT_VALUE(p) > 0);
601
602 if (p->revoke_callback)
603 p->revoke_callback(p, block_id, p->revoke_callback_userdata);
604 else
605 pa_pstream_send_revoke(p, block_id);
606 }
607
prepare_next_write_item(pa_pstream * p)608 static void prepare_next_write_item(pa_pstream *p) {
609 pa_assert(p);
610 pa_assert(PA_REFCNT_VALUE(p) > 0);
611
612 p->write.current = pa_queue_pop(p->send_queue);
613
614 if (!p->write.current)
615 return;
616 p->write.index = 0;
617 p->write.data = NULL;
618 p->write.minibuf_validsize = 0;
619 pa_memchunk_reset(&p->write.memchunk);
620
621 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
622 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
623 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
624 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
625 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
626
627 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
628 size_t plen;
629
630 pa_assert(p->write.current->packet);
631
632 p->write.data = (void *) pa_packet_data(p->write.current->packet, &plen);
633 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) plen);
634
635 if (plen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
636 memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
637 p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
638 }
639
640 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
641
642 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
643 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
644
645 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
646
647 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
648 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
649
650 } else {
651 uint32_t flags;
652 bool send_payload = true;
653
654 pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
655 pa_assert(p->write.current->chunk.memblock);
656
657 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
658 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
659 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
660
661 flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);
662
663 if (p->use_shm) {
664 pa_mem_type_t type;
665 uint32_t block_id, shm_id;
666 size_t offset, length;
667 uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
668 size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
669 pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock);
670 pa_memexport *current_export;
671
672 if (p->mempool == current_pool)
673 pa_assert_se(current_export = p->export);
674 else
675 pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));
676
677 if (pa_memexport_put(current_export,
678 p->write.current->chunk.memblock,
679 &type,
680 &block_id,
681 &shm_id,
682 &offset,
683 &length) >= 0) {
684
685 if (type == PA_MEM_TYPE_SHARED_POSIX)
686 send_payload = false;
687
688 if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd) {
689 if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
690 flags |= PA_FLAG_SHMDATA_MEMFD_BLOCK;
691 send_payload = false;
692 } else {
693 if (!p->non_registered_memfd_id_error_logged) {
694 pa_log("Cannot send block reference with non-registered memfd ID = %u", shm_id);
695 pa_log("Falling back to copying full block data over socket");
696 pa_log("There's a bug report about this: https://gitlab.freedesktop.org/pulseaudio/pulseaudio/issues/824");
697 p->non_registered_memfd_id_error_logged = true;
698 }
699 }
700 }
701
702 if (send_payload) {
703 pa_assert_se(pa_memexport_process_release(current_export, block_id) == 0);
704 } else {
705 flags |= PA_FLAG_SHMDATA;
706 if (pa_mempool_is_remote_writable(current_pool))
707 flags |= PA_FLAG_SHMWRITABLE;
708
709 shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
710 shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
711 shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
712 shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
713
714 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
715 p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
716 }
717 }
718 /* else */
719 /* FIXME: Avoid memexport slot leaks. Call pa_memexport_process_release() */
720 /* pa_log_warn("Failed to export memory block."); */
721
722 if (current_export != p->export)
723 pa_memexport_free(current_export);
724 pa_mempool_unref(current_pool);
725 }
726
727 if (send_payload) {
728 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
729 p->write.memchunk = p->write.current->chunk;
730 pa_memblock_ref(p->write.memchunk.memblock);
731 }
732
733 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
734 }
735
736 #ifdef HAVE_CREDS
737 if ((p->send_ancil_data_now = p->write.current->with_ancil_data))
738 p->write_ancil_data = &p->write.current->ancil_data;
739 #endif
740 }
741
check_srbpending(pa_pstream * p)742 static void check_srbpending(pa_pstream *p) {
743 if (!p->is_srbpending)
744 return;
745
746 if (p->srb)
747 pa_srbchannel_free(p->srb);
748
749 p->srb = p->srbpending;
750 p->is_srbpending = false;
751
752 if (p->srb)
753 pa_srbchannel_set_callback(p->srb, srb_callback, p);
754 }
755
do_write(pa_pstream * p)756 static int do_write(pa_pstream *p) {
757 void *d;
758 size_t l;
759 ssize_t r;
760 pa_memblock *release_memblock = NULL;
761
762 pa_assert(p);
763 pa_assert(PA_REFCNT_VALUE(p) > 0);
764
765 if (!p->write.current)
766 prepare_next_write_item(p);
767
768 if (!p->write.current) {
769 /* The out queue is empty, so switching channels is safe */
770 check_srbpending(p);
771 return 0;
772 }
773
774 if (p->write.minibuf_validsize > 0) {
775 d = p->write.minibuf + p->write.index;
776 l = p->write.minibuf_validsize - p->write.index;
777 } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
778 d = (uint8_t*) p->write.descriptor + p->write.index;
779 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
780 } else {
781 pa_assert(p->write.data || p->write.memchunk.memblock);
782
783 if (p->write.data)
784 d = p->write.data;
785 else {
786 d = pa_memblock_acquire_chunk(&p->write.memchunk);
787 release_memblock = p->write.memchunk.memblock;
788 }
789
790 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
791 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
792 }
793
794 pa_assert(l > 0);
795
796 #ifdef HAVE_CREDS
797 if (p->send_ancil_data_now) {
798 if (p->write_ancil_data->creds_valid) {
799 pa_assert(p->write_ancil_data->nfd == 0);
800 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_ancil_data->creds)) < 0)
801 goto fail;
802 }
803 else
804 if ((r = pa_iochannel_write_with_fds(p->io, d, l, p->write_ancil_data->nfd, p->write_ancil_data->fds)) < 0)
805 goto fail;
806
807 pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
808 p->send_ancil_data_now = false;
809 } else
810 #endif
811 if (p->srb)
812 r = pa_srbchannel_write(p->srb, d, l);
813 else if ((r = pa_iochannel_write(p->io, d, l)) < 0)
814 goto fail;
815
816 if (release_memblock)
817 pa_memblock_release(release_memblock);
818
819 p->write.index += (size_t) r;
820
821 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
822 pa_assert(p->write.current);
823 item_free(p->write.current);
824 p->write.current = NULL;
825
826 if (p->write.memchunk.memblock)
827 pa_memblock_unref(p->write.memchunk.memblock);
828
829 pa_memchunk_reset(&p->write.memchunk);
830
831 if (p->drain_callback && !pa_pstream_is_pending(p))
832 p->drain_callback(p, p->drain_callback_userdata);
833 }
834
835 return (size_t) r == l ? 1 : 0;
836
837 fail:
838 #ifdef HAVE_CREDS
839 if (p->send_ancil_data_now)
840 pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
841 #endif
842
843 if (release_memblock)
844 pa_memblock_release(release_memblock);
845
846 return -1;
847 }
848
memblock_complete(pa_pstream * p,struct pstream_read * re)849 static void memblock_complete(pa_pstream *p, struct pstream_read *re) {
850 pa_memchunk chunk;
851 int64_t offset;
852
853 if (!p->receive_memblock_callback)
854 return;
855
856 chunk.memblock = re->memblock;
857 chunk.index = 0;
858 chunk.length = re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
859
860 offset = (int64_t) (
861 (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
862 (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
863
864 p->receive_memblock_callback(
865 p,
866 ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
867 offset,
868 ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
869 &chunk,
870 p->receive_memblock_callback_userdata);
871 }
872
do_read(pa_pstream * p,struct pstream_read * re)873 static int do_read(pa_pstream *p, struct pstream_read *re) {
874 void *d;
875 size_t l;
876 ssize_t r;
877 pa_memblock *release_memblock = NULL;
878 pa_assert(p);
879 pa_assert(PA_REFCNT_VALUE(p) > 0);
880
881 if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
882 d = (uint8_t*) re->descriptor + re->index;
883 l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
884 } else {
885 pa_assert(re->data || re->memblock);
886
887 if (re->data)
888 d = re->data;
889 else {
890 d = pa_memblock_acquire(re->memblock);
891 release_memblock = re->memblock;
892 }
893
894 d = (uint8_t*) d + re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
895 l = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (re->index - PA_PSTREAM_DESCRIPTOR_SIZE);
896 }
897
898 if (re == &p->readsrb) {
899 r = pa_srbchannel_read(p->srb, d, l);
900 if (r == 0) {
901 if (release_memblock)
902 pa_memblock_release(release_memblock);
903 return 1;
904 }
905 }
906 else
907 #ifdef HAVE_CREDS
908 {
909 pa_cmsg_ancil_data b;
910
911 if ((r = pa_iochannel_read_with_ancil_data(p->io, d, l, &b)) <= 0)
912 goto fail;
913
914 if (b.creds_valid) {
915 p->read_ancil_data.creds_valid = true;
916 p->read_ancil_data.creds = b.creds;
917 }
918 if (b.nfd > 0) {
919 pa_assert(b.nfd <= MAX_ANCIL_DATA_FDS);
920 p->read_ancil_data.nfd = b.nfd;
921 memcpy(p->read_ancil_data.fds, b.fds, sizeof(int) * b.nfd);
922 p->read_ancil_data.close_fds_on_cleanup = b.close_fds_on_cleanup;
923 }
924 }
925 #else
926 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
927 goto fail;
928 #endif
929
930 if (release_memblock)
931 pa_memblock_release(release_memblock);
932
933 re->index += (size_t) r;
934
935 if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
936 uint32_t flags, length, channel;
937 /* Reading of frame descriptor complete */
938
939 flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
940
941 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
942 pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
943 return -1;
944 }
945
946 if (flags == PA_FLAG_SHMRELEASE) {
947
948 /* This is a SHM memblock release frame with no payload */
949
950 /* pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
951
952 pa_assert(p->export);
953 pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
954
955 goto frame_done;
956
957 } else if (flags == PA_FLAG_SHMREVOKE) {
958
959 /* This is a SHM memblock revoke frame with no payload */
960
961 /* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
962
963 pa_assert(p->import);
964 pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
965
966 goto frame_done;
967 }
968
969 length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
970
971 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
972 pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
973 return -1;
974 }
975
976 pa_assert(!re->packet && !re->memblock);
977
978 channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
979
980 if (channel == (uint32_t) -1) {
981 size_t plen;
982
983 if (flags != 0) {
984 pa_log_warn("Received packet frame with invalid flags value.");
985 return -1;
986 }
987
988 /* Frame is a packet frame */
989 re->packet = pa_packet_new(length);
990 re->data = (void *) pa_packet_data(re->packet, &plen);
991
992 } else {
993
994 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
995 pa_log_warn("Received memblock frame with invalid seek mode.");
996 return -1;
997 }
998
999 if (((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0) {
1000
1001 if (length != sizeof(re->shm_info)) {
1002 pa_log_warn("Received SHM memblock frame with invalid frame length.");
1003 return -1;
1004 }
1005
1006 /* Frame is a memblock frame referencing an SHM memblock */
1007 re->data = re->shm_info;
1008
1009 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
1010
1011 /* Frame is a memblock frame */
1012
1013 re->memblock = pa_memblock_new(p->mempool, length);
1014 re->data = NULL;
1015 } else {
1016
1017 pa_log_warn("Received memblock frame with invalid flags value.");
1018 return -1;
1019 }
1020 }
1021
1022 } else if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
1023 /* Frame complete */
1024
1025 if (re->memblock) {
1026 memblock_complete(p, re);
1027
1028 /* This was a memblock frame. We can unref the memblock now */
1029 pa_memblock_unref(re->memblock);
1030
1031 } else if (re->packet) {
1032
1033 if (p->receive_packet_callback)
1034 #ifdef HAVE_CREDS
1035 p->receive_packet_callback(p, re->packet, &p->read_ancil_data, p->receive_packet_callback_userdata);
1036 #else
1037 p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
1038 #endif
1039
1040 pa_packet_unref(re->packet);
1041 } else {
1042 pa_memblock *b = NULL;
1043 uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
1044 uint32_t shm_id = ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]);
1045 pa_mem_type_t type = (flags & PA_FLAG_SHMDATA_MEMFD_BLOCK) ?
1046 PA_MEM_TYPE_SHARED_MEMFD : PA_MEM_TYPE_SHARED_POSIX;
1047
1048 pa_assert(((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0);
1049 pa_assert(p->import);
1050
1051 if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd &&
1052 !pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
1053
1054 if (pa_log_ratelimit(PA_LOG_ERROR))
1055 pa_log("Ignoring received block reference with non-registered memfd ID = %u", shm_id);
1056
1057 } else if (!(b = pa_memimport_get(p->import,
1058 type,
1059 ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
1060 shm_id,
1061 ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
1062 ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
1063 !!(flags & PA_FLAG_SHMWRITABLE)))) {
1064
1065 if (pa_log_ratelimit(PA_LOG_DEBUG))
1066 pa_log_debug("Failed to import memory block.");
1067 }
1068
1069 if (p->receive_memblock_callback) {
1070 int64_t offset;
1071 pa_memchunk chunk;
1072
1073 chunk.memblock = b;
1074 chunk.index = 0;
1075 chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);
1076
1077 offset = (int64_t) (
1078 (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
1079 (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
1080
1081 p->receive_memblock_callback(
1082 p,
1083 ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
1084 offset,
1085 ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
1086 &chunk,
1087 p->receive_memblock_callback_userdata);
1088 }
1089
1090 if (b)
1091 pa_memblock_unref(b);
1092 }
1093
1094 goto frame_done;
1095 }
1096
1097 return 0;
1098
1099 frame_done:
1100 re->memblock = NULL;
1101 re->packet = NULL;
1102 re->index = 0;
1103 re->data = NULL;
1104
1105 #ifdef HAVE_CREDS
1106 /* FIXME: Close received ancillary data fds if the pstream's
1107 * receive_packet_callback did not do so.
1108 *
1109 * Malicious clients can attach fds to unknown commands, or attach them
1110 * to commands that does not expect fds. By doing so, server will reach
1111 * its open fd limit and future clients' SHM transfers will always fail.
1112 */
1113 p->read_ancil_data.creds_valid = false;
1114 p->read_ancil_data.nfd = 0;
1115 #endif
1116
1117 return 0;
1118
1119 fail:
1120 if (release_memblock)
1121 pa_memblock_release(release_memblock);
1122
1123 return -1;
1124 }
1125
pa_pstream_set_die_callback(pa_pstream * p,pa_pstream_notify_cb_t cb,void * userdata)1126 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
1127 pa_assert(p);
1128 pa_assert(PA_REFCNT_VALUE(p) > 0);
1129
1130 p->die_callback = cb;
1131 p->die_callback_userdata = userdata;
1132 }
1133
pa_pstream_set_drain_callback(pa_pstream * p,pa_pstream_notify_cb_t cb,void * userdata)1134 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
1135 pa_assert(p);
1136 pa_assert(PA_REFCNT_VALUE(p) > 0);
1137
1138 p->drain_callback = cb;
1139 p->drain_callback_userdata = userdata;
1140 }
1141
pa_pstream_set_receive_packet_callback(pa_pstream * p,pa_pstream_packet_cb_t cb,void * userdata)1142 void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
1143 pa_assert(p);
1144 pa_assert(PA_REFCNT_VALUE(p) > 0);
1145
1146 p->receive_packet_callback = cb;
1147 p->receive_packet_callback_userdata = userdata;
1148 }
1149
pa_pstream_set_receive_memblock_callback(pa_pstream * p,pa_pstream_memblock_cb_t cb,void * userdata)1150 void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
1151 pa_assert(p);
1152 pa_assert(PA_REFCNT_VALUE(p) > 0);
1153
1154 p->receive_memblock_callback = cb;
1155 p->receive_memblock_callback_userdata = userdata;
1156 }
1157
pa_pstream_set_release_callback(pa_pstream * p,pa_pstream_block_id_cb_t cb,void * userdata)1158 void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
1159 pa_assert(p);
1160 pa_assert(PA_REFCNT_VALUE(p) > 0);
1161
1162 p->release_callback = cb;
1163 p->release_callback_userdata = userdata;
1164 }
1165
pa_pstream_set_revoke_callback(pa_pstream * p,pa_pstream_block_id_cb_t cb,void * userdata)1166 void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
1167 pa_assert(p);
1168 pa_assert(PA_REFCNT_VALUE(p) > 0);
1169
1170 p->revoke_callback = cb;
1171 p->revoke_callback_userdata = userdata;
1172 }
1173
pa_pstream_is_pending(pa_pstream * p)1174 bool pa_pstream_is_pending(pa_pstream *p) {
1175 bool b;
1176
1177 pa_assert(p);
1178 pa_assert(PA_REFCNT_VALUE(p) > 0);
1179
1180 if (p->dead)
1181 b = false;
1182 else
1183 b = p->write.current || !pa_queue_isempty(p->send_queue);
1184
1185 return b;
1186 }
1187
pa_pstream_unref(pa_pstream * p)1188 void pa_pstream_unref(pa_pstream*p) {
1189 pa_assert(p);
1190 pa_assert(PA_REFCNT_VALUE(p) > 0);
1191
1192 if (PA_REFCNT_DEC(p) <= 0)
1193 pstream_free(p);
1194 }
1195
pa_pstream_ref(pa_pstream * p)1196 pa_pstream* pa_pstream_ref(pa_pstream*p) {
1197 pa_assert(p);
1198 pa_assert(PA_REFCNT_VALUE(p) > 0);
1199
1200 PA_REFCNT_INC(p);
1201 return p;
1202 }
1203
pa_pstream_unlink(pa_pstream * p)1204 void pa_pstream_unlink(pa_pstream *p) {
1205 pa_assert(p);
1206
1207 if (p->dead)
1208 return;
1209
1210 p->dead = true;
1211
1212 while (p->srb || p->is_srbpending) /* In theory there could be one active and one pending */
1213 pa_pstream_set_srbchannel(p, NULL);
1214
1215 if (p->import) {
1216 pa_memimport_free(p->import);
1217 p->import = NULL;
1218 }
1219
1220 if (p->export) {
1221 pa_memexport_free(p->export);
1222 p->export = NULL;
1223 }
1224
1225 if (p->io) {
1226 pa_iochannel_free(p->io);
1227 p->io = NULL;
1228 }
1229
1230 if (p->defer_event) {
1231 p->mainloop->defer_free(p->defer_event);
1232 p->defer_event = NULL;
1233 }
1234
1235 p->die_callback = NULL;
1236 p->drain_callback = NULL;
1237 p->receive_packet_callback = NULL;
1238 p->receive_memblock_callback = NULL;
1239 }
1240
pa_pstream_enable_shm(pa_pstream * p,bool enable)1241 void pa_pstream_enable_shm(pa_pstream *p, bool enable) {
1242 pa_assert(p);
1243 pa_assert(PA_REFCNT_VALUE(p) > 0);
1244
1245 p->use_shm = enable;
1246
1247 if (enable) {
1248
1249 if (!p->export)
1250 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
1251
1252 } else {
1253
1254 if (p->export) {
1255 pa_memexport_free(p->export);
1256 p->export = NULL;
1257 }
1258 }
1259 }
1260
pa_pstream_enable_memfd(pa_pstream * p)1261 void pa_pstream_enable_memfd(pa_pstream *p) {
1262 pa_assert(p);
1263 pa_assert(PA_REFCNT_VALUE(p) > 0);
1264 pa_assert(p->use_shm);
1265
1266 p->use_memfd = true;
1267
1268 if (!p->registered_memfd_ids) {
1269 p->registered_memfd_ids = pa_idxset_new(NULL, NULL);
1270 }
1271 }
1272
pa_pstream_get_shm(pa_pstream * p)1273 bool pa_pstream_get_shm(pa_pstream *p) {
1274 pa_assert(p);
1275 pa_assert(PA_REFCNT_VALUE(p) > 0);
1276
1277 return p->use_shm;
1278 }
1279
pa_pstream_get_memfd(pa_pstream * p)1280 bool pa_pstream_get_memfd(pa_pstream *p) {
1281 pa_assert(p);
1282 pa_assert(PA_REFCNT_VALUE(p) > 0);
1283
1284 return p->use_memfd;
1285 }
1286
pa_pstream_set_srbchannel(pa_pstream * p,pa_srbchannel * srb)1287 void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb) {
1288 pa_assert(p);
1289 pa_assert(PA_REFCNT_VALUE(p) > 0 || srb == NULL);
1290
1291 if (srb == p->srb)
1292 return;
1293
1294 /* We can't handle quick switches between srbchannels. */
1295 pa_assert(!p->is_srbpending);
1296
1297 p->srbpending = srb;
1298 p->is_srbpending = true;
1299
1300 /* Switch immediately, if possible. */
1301 if (p->dead)
1302 check_srbpending(p);
1303 else
1304 do_write(p);
1305 }
1306