1 /* GStreamer
2 * Copyright (C) <2009> Collabora Ltd
3 * @author: Olivier Crete <olivier.crete@collabora.co.uk
4 * Copyright (C) <2009> Nokia Inc
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 * THE SOFTWARE.
23 */
24
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29
30 #ifdef HAVE_OSX
31 #ifndef MSG_NOSIGNAL
32 #define MSG_NOSIGNAL SO_NOSIGPIPE
33 #endif
34 #endif
35
36 #include "shmpipe.h"
37
38 #include <sys/types.h>
39 #include <sys/socket.h>
40 #include <sys/un.h>
41 #include <string.h>
42 #include <stdio.h>
43 #include <errno.h>
44 #include <unistd.h>
45 #include <fcntl.h>
46 #include <limits.h>
47 #include <sys/mman.h>
48 #include <assert.h>
49
50 #include "shmalloc.h"
51
52 /*
53 * The protocol over the pipe is in packets
54 *
55 * The defined types are:
56 * type 1: new shm area
57 * Area length
58 * Size of path (followed by path)
59 *
60 * type 2: Close shm area:
61 * No payload
62 *
63 * type 3: shm buffer
64 * offset
65 * bufsize
66 *
67 * type 4: ack buffer
68 * offset
69 *
70 * Type 4 goes from the client to the server
71 * The rest are from the server to the client
72 * The client should never write in the SHM
73 */
74
75
76 #define LISTEN_BACKLOG 10
77
78 enum
79 {
80 COMMAND_NEW_SHM_AREA = 1,
81 COMMAND_CLOSE_SHM_AREA = 2,
82 COMMAND_NEW_BUFFER = 3,
83 COMMAND_ACK_BUFFER = 4
84 };
85
86 typedef struct _ShmArea ShmArea;
87
88 struct _ShmArea
89 {
90 int id;
91
92 int use_count;
93 int is_writer;
94
95 int shm_fd;
96
97 char *shm_area_buf;
98 size_t shm_area_len;
99
100 char *shm_area_name;
101
102 ShmAllocSpace *allocspace;
103
104 ShmArea *next;
105 };
106
107 struct _ShmBuffer
108 {
109 int use_count;
110
111 ShmArea *shm_area;
112 unsigned long offset;
113 size_t size;
114
115 ShmAllocBlock *ablock;
116
117 ShmBuffer *next;
118
119 void *tag;
120
121 int num_clients;
122 /* This must ALWAYS stay last in the struct */
123 int clients[0];
124 };
125
126
127 struct _ShmPipe
128 {
129 int main_socket;
130 char *socket_path;
131 int use_count;
132 void *data;
133
134 ShmArea *shm_area;
135
136 int next_area_id;
137
138 ShmBuffer *buffers;
139
140 int num_clients;
141 ShmClient *clients;
142
143 mode_t perms;
144 };
145
146 struct _ShmClient
147 {
148 int fd;
149
150 ShmClient *next;
151 };
152
153 struct _ShmBlock
154 {
155 ShmPipe *pipe;
156 ShmArea *area;
157 ShmAllocBlock *ablock;
158 };
159
160 struct CommandBuffer
161 {
162 unsigned int type;
163 int area_id;
164
165 union
166 {
167 struct
168 {
169 size_t size;
170 unsigned int path_size;
171 /* Followed by path */
172 } new_shm_area;
173 struct
174 {
175 unsigned long offset;
176 unsigned long size;
177 } buffer;
178 struct
179 {
180 unsigned long offset;
181 } ack_buffer;
182 } payload;
183 };
184
185 static ShmArea *sp_open_shm (char *path, int id, mode_t perms, size_t size);
186 static void sp_close_shm (ShmArea * area);
187 static int sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf,
188 ShmBuffer * prev_buf, ShmClient * client, void **tag);
189 static void sp_shm_area_dec (ShmPipe * self, ShmArea * area);
190
191
192
193 #define RETURN_ERROR(format, ...) do { \
194 fprintf (stderr, format, __VA_ARGS__); \
195 sp_writer_close (self, NULL, NULL); \
196 return NULL; \
197 } while (0)
198
199 ShmPipe *
sp_writer_create(const char * path,size_t size,mode_t perms)200 sp_writer_create (const char *path, size_t size, mode_t perms)
201 {
202 ShmPipe *self = spalloc_new (ShmPipe);
203 int flags;
204 struct sockaddr_un sock_un;
205 int i = 0;
206
207 memset (self, 0, sizeof (ShmPipe));
208
209 self->main_socket = socket (PF_UNIX, SOCK_STREAM, 0);
210 self->use_count = 1;
211
212 if (self->main_socket < 0)
213 RETURN_ERROR ("Could not create socket (%d): %s\n", errno,
214 strerror (errno));
215
216 flags = fcntl (self->main_socket, F_GETFL, 0);
217 if (flags < 0)
218 RETURN_ERROR ("fcntl(F_GETFL) failed (%d): %s\n", errno, strerror (errno));
219
220 if (fcntl (self->main_socket, F_SETFL, flags | O_NONBLOCK | FD_CLOEXEC) < 0)
221 RETURN_ERROR ("fcntl(F_SETFL) failed (%d): %s\n", errno, strerror (errno));
222
223 sock_un.sun_family = AF_UNIX;
224 strncpy (sock_un.sun_path, path, sizeof (sock_un.sun_path) - 1);
225
226 while (bind (self->main_socket, (struct sockaddr *) &sock_un,
227 sizeof (struct sockaddr_un)) < 0) {
228 if (errno != EADDRINUSE)
229 RETURN_ERROR ("bind() failed (%d): %s\n", errno, strerror (errno));
230
231 if (i > 256)
232 RETURN_ERROR ("Could not find a free socket name for %s", path);
233
234 snprintf (sock_un.sun_path, sizeof (sock_un.sun_path), "%s.%d", path, i);
235 i++;
236 }
237
238 self->socket_path = strdup (sock_un.sun_path);
239
240 if (chmod (self->socket_path, perms) < 0)
241 RETURN_ERROR ("failed to set socket permissions (%d): %s\n", errno,
242 strerror (errno));
243
244 if (listen (self->main_socket, LISTEN_BACKLOG) < 0)
245 RETURN_ERROR ("listen() failed (%d): %s\n", errno, strerror (errno));
246
247 self->shm_area = sp_open_shm (NULL, ++self->next_area_id, perms, size);
248
249 self->perms = perms;
250
251 if (!self->shm_area)
252 RETURN_ERROR ("Could not open shm area (%d): %s", errno, strerror (errno));
253
254 return self;
255 }
256
257 #undef RETURN_ERROR
258
259 #define RETURN_ERROR(format, ...) do { \
260 fprintf (stderr, format, __VA_ARGS__); \
261 area->use_count--; \
262 sp_close_shm (area); \
263 return NULL; \
264 } while (0)
265
266 /* sp_open_shm:
267 * @path: Path of the shm area for a reader,
268 * NULL if this is a writer (then it will allocate its own path)
269 *
270 * Opens a ShmArea
271 */
272
273 static ShmArea *
sp_open_shm(char * path,int id,mode_t perms,size_t size)274 sp_open_shm (char *path, int id, mode_t perms, size_t size)
275 {
276 ShmArea *area = spalloc_new (ShmArea);
277 char tmppath[32];
278 int flags;
279 int prot;
280 int i = 0;
281
282 memset (area, 0, sizeof (ShmArea));
283
284 area->shm_area_buf = MAP_FAILED;
285 area->use_count = 1;
286
287 area->shm_area_len = size;
288
289 area->is_writer = (path == NULL);
290
291
292 if (path)
293 flags = O_RDONLY;
294 else
295 #ifdef HAVE_OSX
296 flags = O_RDWR | O_CREAT | O_EXCL;
297 #else
298 flags = O_RDWR | O_CREAT | O_TRUNC | O_EXCL;
299 #endif
300
301 area->shm_fd = -1;
302
303 if (path) {
304 area->shm_fd = shm_open (path, flags, perms);
305 } else {
306 do {
307 snprintf (tmppath, sizeof (tmppath), "/shmpipe.%5d.%5d", getpid (), i++);
308 area->shm_fd = shm_open (tmppath, flags, perms);
309 } while (area->shm_fd < 0 && errno == EEXIST);
310 }
311
312 if (area->shm_fd < 0)
313 RETURN_ERROR ("shm_open failed on %s (%d): %s\n",
314 path ? path : tmppath, errno, strerror (errno));
315
316 if (!path) {
317 area->shm_area_name = strdup (tmppath);
318
319 if (ftruncate (area->shm_fd, size))
320 RETURN_ERROR ("Could not resize memory area to header size,"
321 " ftruncate failed (%d): %s\n", errno, strerror (errno));
322
323 prot = PROT_READ | PROT_WRITE;
324 } else {
325 area->shm_area_name = strdup (path);
326 prot = PROT_READ;
327 }
328
329 area->shm_area_buf = mmap (NULL, size, prot, MAP_SHARED, area->shm_fd, 0);
330
331 if (area->shm_area_buf == MAP_FAILED)
332 RETURN_ERROR ("mmap failed (%d): %s\n", errno, strerror (errno));
333
334 area->id = id;
335
336 if (!path)
337 area->allocspace = shm_alloc_space_new (area->shm_area_len);
338
339 return area;
340 }
341
342 #undef RETURN_ERROR
343
344 static void
sp_close_shm(ShmArea * area)345 sp_close_shm (ShmArea * area)
346 {
347 assert (area->use_count == 0);
348
349 if (area->allocspace)
350 shm_alloc_space_free (area->allocspace);
351
352 if (area->shm_area_buf != MAP_FAILED)
353 munmap (area->shm_area_buf, area->shm_area_len);
354
355 if (area->shm_fd >= 0)
356 close (area->shm_fd);
357
358 if (area->shm_area_name) {
359 if (area->is_writer)
360 shm_unlink (area->shm_area_name);
361 free (area->shm_area_name);
362 }
363
364 spalloc_free (ShmArea, area);
365 }
366
367 static void
sp_shm_area_inc(ShmArea * area)368 sp_shm_area_inc (ShmArea * area)
369 {
370 area->use_count++;
371 }
372
373 static void
sp_shm_area_dec(ShmPipe * self,ShmArea * area)374 sp_shm_area_dec (ShmPipe * self, ShmArea * area)
375 {
376 assert (area->use_count > 0);
377 area->use_count--;
378
379 if (area->use_count == 0) {
380 ShmArea *item = NULL;
381 ShmArea *prev_item = NULL;
382
383 for (item = self->shm_area; item; item = item->next) {
384 if (item == area) {
385 if (prev_item)
386 prev_item->next = item->next;
387 else
388 self->shm_area = item->next;
389 break;
390 }
391 prev_item = item;
392 }
393 assert (item);
394
395 sp_close_shm (area);
396 }
397 }
398
399 void *
sp_get_data(ShmPipe * self)400 sp_get_data (ShmPipe * self)
401 {
402 return self->data;
403 }
404
405 void
sp_set_data(ShmPipe * self,void * data)406 sp_set_data (ShmPipe * self, void *data)
407 {
408 self->data = data;
409 }
410
411 static void
sp_inc(ShmPipe * self)412 sp_inc (ShmPipe * self)
413 {
414 self->use_count++;
415 }
416
417 static void
sp_dec(ShmPipe * self)418 sp_dec (ShmPipe * self)
419 {
420 self->use_count--;
421
422 if (self->use_count > 0)
423 return;
424
425 while (self->shm_area)
426 sp_shm_area_dec (self, self->shm_area);
427
428 spalloc_free (ShmPipe, self);
429 }
430
431 void
sp_writer_close(ShmPipe * self,sp_buffer_free_callback callback,void * user_data)432 sp_writer_close (ShmPipe * self, sp_buffer_free_callback callback,
433 void *user_data)
434 {
435 if (self->main_socket >= 0) {
436 shutdown (self->main_socket, SHUT_RDWR);
437 close (self->main_socket);
438 }
439
440 if (self->socket_path) {
441 unlink (self->socket_path);
442 free (self->socket_path);
443 }
444
445 while (self->clients)
446 sp_writer_close_client (self, self->clients, callback, user_data);
447
448 sp_dec (self);
449 }
450
451 void
sp_client_close(ShmPipe * self)452 sp_client_close (ShmPipe * self)
453 {
454 sp_writer_close (self, NULL, NULL);
455 }
456
457
458 int
sp_writer_setperms_shm(ShmPipe * self,mode_t perms)459 sp_writer_setperms_shm (ShmPipe * self, mode_t perms)
460 {
461 int ret = 0;
462 ShmArea *area;
463
464 self->perms = perms;
465 for (area = self->shm_area; area; area = area->next)
466 ret |= fchmod (area->shm_fd, perms);
467
468 ret |= chmod (self->socket_path, perms);
469
470 return ret;
471 }
472
473 static int
send_command(int fd,struct CommandBuffer * cb,unsigned short int type,int area_id)474 send_command (int fd, struct CommandBuffer *cb, unsigned short int type,
475 int area_id)
476 {
477 cb->type = type;
478 cb->area_id = area_id;
479
480 if (send (fd, cb, sizeof (struct CommandBuffer), MSG_NOSIGNAL) !=
481 sizeof (struct CommandBuffer))
482 return 0;
483
484 return 1;
485 }
486
487 int
sp_writer_resize(ShmPipe * self,size_t size)488 sp_writer_resize (ShmPipe * self, size_t size)
489 {
490 ShmArea *newarea;
491 ShmArea *old_current;
492 ShmClient *client;
493 int c = 0;
494 int pathlen;
495
496 if (self->shm_area->shm_area_len == size)
497 return 0;
498
499 newarea = sp_open_shm (NULL, ++self->next_area_id, self->perms, size);
500
501 if (!newarea)
502 return -1;
503
504 old_current = self->shm_area;
505 newarea->next = self->shm_area;
506 self->shm_area = newarea;
507
508 pathlen = strlen (newarea->shm_area_name) + 1;
509
510 for (client = self->clients; client; client = client->next) {
511 struct CommandBuffer cb = { 0 };
512
513 if (!send_command (client->fd, &cb, COMMAND_CLOSE_SHM_AREA,
514 old_current->id))
515 continue;
516
517 cb.payload.new_shm_area.size = newarea->shm_area_len;
518 cb.payload.new_shm_area.path_size = pathlen;
519 if (!send_command (client->fd, &cb, COMMAND_NEW_SHM_AREA, newarea->id))
520 continue;
521
522 if (send (client->fd, newarea->shm_area_name, pathlen, MSG_NOSIGNAL) !=
523 pathlen)
524 continue;
525 c++;
526 }
527
528 sp_shm_area_dec (self, old_current);
529
530
531 return c;
532 }
533
534 ShmBlock *
sp_writer_alloc_block(ShmPipe * self,size_t size)535 sp_writer_alloc_block (ShmPipe * self, size_t size)
536 {
537 ShmBlock *block;
538 ShmAllocBlock *ablock =
539 shm_alloc_space_alloc_block (self->shm_area->allocspace, size);
540
541 if (!ablock)
542 return NULL;
543
544 block = spalloc_new (ShmBlock);
545 sp_shm_area_inc (self->shm_area);
546 block->pipe = self;
547 block->area = self->shm_area;
548 block->ablock = ablock;
549 sp_inc (self);
550 return block;
551 }
552
553 char *
sp_writer_block_get_buf(ShmBlock * block)554 sp_writer_block_get_buf (ShmBlock * block)
555 {
556 return block->area->shm_area_buf +
557 shm_alloc_space_alloc_block_get_offset (block->ablock);
558 }
559
560 ShmPipe *
sp_writer_block_get_pipe(ShmBlock * block)561 sp_writer_block_get_pipe (ShmBlock * block)
562 {
563 return block->pipe;
564 }
565
566 void
sp_writer_free_block(ShmBlock * block)567 sp_writer_free_block (ShmBlock * block)
568 {
569 shm_alloc_space_block_dec (block->ablock);
570 sp_shm_area_dec (block->pipe, block->area);
571 sp_dec (block->pipe);
572 spalloc_free (ShmBlock, block);
573 }
574
575 /* Returns the number of client this has successfully been sent to */
576
577 int
sp_writer_send_buf(ShmPipe * self,char * buf,size_t size,void * tag)578 sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag)
579 {
580 ShmArea *area = NULL;
581 unsigned long offset = 0;
582 unsigned long bsize = size;
583 ShmBuffer *sb;
584 ShmClient *client = NULL;
585 ShmAllocBlock *ablock = NULL;
586 int i = 0;
587 int c = 0;
588
589 if (self->num_clients == 0)
590 return 0;
591
592 for (area = self->shm_area; area; area = area->next) {
593 if (buf >= area->shm_area_buf &&
594 buf < (area->shm_area_buf + area->shm_area_len)) {
595 offset = buf - area->shm_area_buf;
596 ablock = shm_alloc_space_block_get (area->allocspace, offset);
597 assert (ablock);
598 break;
599 }
600 }
601
602 if (!ablock)
603 return -1;
604
605 sb = spalloc_alloc (sizeof (ShmBuffer) + sizeof (int) * self->num_clients);
606 memset (sb, 0, sizeof (ShmBuffer));
607 memset (sb->clients, -1, sizeof (int) * self->num_clients);
608 sb->shm_area = area;
609 sb->offset = offset;
610 sb->size = size;
611 sb->num_clients = self->num_clients;
612 sb->ablock = ablock;
613 sb->tag = tag;
614
615 for (client = self->clients; client; client = client->next) {
616 struct CommandBuffer cb = { 0 };
617 cb.payload.buffer.offset = offset;
618 cb.payload.buffer.size = bsize;
619 if (!send_command (client->fd, &cb, COMMAND_NEW_BUFFER, self->shm_area->id))
620 continue;
621 sb->clients[i++] = client->fd;
622 c++;
623 }
624
625 if (c == 0) {
626 spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * sb->num_clients, sb);
627 return 0;
628 }
629
630 sp_shm_area_inc (area);
631 shm_alloc_space_block_inc (ablock);
632
633 sb->use_count = c;
634
635 sb->next = self->buffers;
636 self->buffers = sb;
637
638 return c;
639 }
640
641 static int
recv_command(int fd,struct CommandBuffer * cb)642 recv_command (int fd, struct CommandBuffer *cb)
643 {
644 int retval;
645
646 retval = recv (fd, cb, sizeof (struct CommandBuffer), MSG_DONTWAIT);
647 if (retval == sizeof (struct CommandBuffer)) {
648 return 1;
649 } else {
650 return 0;
651 }
652 }
653
654 long int
sp_client_recv(ShmPipe * self,char ** buf)655 sp_client_recv (ShmPipe * self, char **buf)
656 {
657 char *area_name = NULL;
658 ShmArea *newarea;
659 ShmArea *area;
660 struct CommandBuffer cb;
661 int retval;
662
663 if (!recv_command (self->main_socket, &cb))
664 return -1;
665
666 switch (cb.type) {
667 case COMMAND_NEW_SHM_AREA:
668 assert (cb.payload.new_shm_area.path_size > 0);
669 assert (cb.payload.new_shm_area.size > 0);
670
671 area_name = malloc (cb.payload.new_shm_area.path_size + 1);
672 retval = recv (self->main_socket, area_name,
673 cb.payload.new_shm_area.path_size, 0);
674 if (retval != cb.payload.new_shm_area.path_size) {
675 free (area_name);
676 return -3;
677 }
678 /* Ensure area_name is NULL terminated */
679 area_name[retval] = 0;
680
681 newarea = sp_open_shm (area_name, cb.area_id, 0,
682 cb.payload.new_shm_area.size);
683 free (area_name);
684 if (!newarea)
685 return -4;
686
687 newarea->next = self->shm_area;
688 self->shm_area = newarea;
689 break;
690
691 case COMMAND_CLOSE_SHM_AREA:
692 for (area = self->shm_area; area; area = area->next) {
693 if (area->id == cb.area_id) {
694 sp_shm_area_dec (self, area);
695 break;
696 }
697 }
698 break;
699
700 case COMMAND_NEW_BUFFER:
701 assert (buf);
702 for (area = self->shm_area; area; area = area->next) {
703 if (area->id == cb.area_id) {
704 *buf = area->shm_area_buf + cb.payload.buffer.offset;
705 sp_shm_area_inc (area);
706 return cb.payload.buffer.size;
707 }
708 }
709 return -23;
710
711 default:
712 return -99;
713 }
714
715 return 0;
716 }
717
718 int
sp_writer_recv(ShmPipe * self,ShmClient * client,void ** tag)719 sp_writer_recv (ShmPipe * self, ShmClient * client, void **tag)
720 {
721 ShmBuffer *buf = NULL, *prev_buf = NULL;
722 struct CommandBuffer cb;
723
724 if (!recv_command (client->fd, &cb))
725 return -1;
726
727 switch (cb.type) {
728 case COMMAND_ACK_BUFFER:
729
730 for (buf = self->buffers; buf; buf = buf->next) {
731 if (buf->shm_area->id == cb.area_id &&
732 buf->offset == cb.payload.ack_buffer.offset) {
733 return sp_shmbuf_dec (self, buf, prev_buf, client, tag);
734 }
735 prev_buf = buf;
736 }
737
738 return -2;
739 default:
740 return -99;
741 }
742
743 return 0;
744 }
745
746 int
sp_client_recv_finish(ShmPipe * self,char * buf)747 sp_client_recv_finish (ShmPipe * self, char *buf)
748 {
749 ShmArea *shm_area = NULL;
750 unsigned long offset;
751 struct CommandBuffer cb = { 0 };
752
753 for (shm_area = self->shm_area; shm_area; shm_area = shm_area->next) {
754 if (buf >= shm_area->shm_area_buf &&
755 buf < shm_area->shm_area_buf + shm_area->shm_area_len)
756 break;
757 }
758
759 assert (shm_area);
760
761 offset = buf - shm_area->shm_area_buf;
762
763 sp_shm_area_dec (self, shm_area);
764
765 cb.payload.ack_buffer.offset = offset;
766 return send_command (self->main_socket, &cb, COMMAND_ACK_BUFFER,
767 self->shm_area->id);
768 }
769
770 ShmPipe *
sp_client_open(const char * path)771 sp_client_open (const char *path)
772 {
773 ShmPipe *self = spalloc_new (ShmPipe);
774 struct sockaddr_un sock_un;
775 int flags;
776
777 memset (self, 0, sizeof (ShmPipe));
778
779 self->main_socket = socket (PF_UNIX, SOCK_STREAM, 0);
780 self->use_count = 1;
781
782 if (self->main_socket < 0)
783 goto error;
784
785 flags = fcntl (self->main_socket, F_GETFL, 0);
786 if (flags < 0)
787 goto error;
788
789 if (fcntl (self->main_socket, F_SETFL, flags | FD_CLOEXEC) < 0)
790 goto error;
791
792 sock_un.sun_family = AF_UNIX;
793 strncpy (sock_un.sun_path, path, sizeof (sock_un.sun_path) - 1);
794
795 if (connect (self->main_socket, (struct sockaddr *) &sock_un,
796 sizeof (struct sockaddr_un)) < 0)
797 goto error;
798
799 return self;
800
801 error:
802 sp_client_close (self);
803 return NULL;
804 }
805
806
807 ShmClient *
sp_writer_accept_client(ShmPipe * self)808 sp_writer_accept_client (ShmPipe * self)
809 {
810 ShmClient *client = NULL;
811 int fd;
812 struct CommandBuffer cb = { 0 };
813 int pathlen = strlen (self->shm_area->shm_area_name) + 1;
814
815
816 fd = accept (self->main_socket, NULL, NULL);
817
818 if (fd < 0) {
819 fprintf (stderr, "Could not client connection");
820 return NULL;
821 }
822
823 cb.payload.new_shm_area.size = self->shm_area->shm_area_len;
824 cb.payload.new_shm_area.path_size = pathlen;
825 if (!send_command (fd, &cb, COMMAND_NEW_SHM_AREA, self->shm_area->id)) {
826 fprintf (stderr, "Sending new shm area failed: %s", strerror (errno));
827 goto error;
828 }
829
830 if (send (fd, self->shm_area->shm_area_name, pathlen, MSG_NOSIGNAL) !=
831 pathlen) {
832 fprintf (stderr, "Sending new shm area path failed: %s", strerror (errno));
833 goto error;
834 }
835
836 client = spalloc_new (ShmClient);
837 client->fd = fd;
838
839 /* Prepend ot linked list */
840 client->next = self->clients;
841 self->clients = client;
842 self->num_clients++;
843
844 return client;
845
846 error:
847 shutdown (fd, SHUT_RDWR);
848 close (fd);
849 return NULL;
850 }
851
852 static int
sp_shmbuf_dec(ShmPipe * self,ShmBuffer * buf,ShmBuffer * prev_buf,ShmClient * client,void ** tag)853 sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, ShmBuffer * prev_buf,
854 ShmClient * client, void **tag)
855 {
856 int i;
857 int had_client = 0;
858
859 /* Remove client from the list of buffer users. Here we make sure that
860 * if a client closes connection but already decremented the use count
861 * for this buffer, but other clients didn't have time to decrement
862 * buffer will not be freed too early in sp_writer_close_client.
863 */
864 for (i = 0; i < buf->num_clients; i++) {
865 if (buf->clients[i] == client->fd) {
866 buf->clients[i] = -1;
867 had_client = 1;
868 break;
869 }
870 }
871 assert (had_client);
872
873 buf->use_count--;
874
875 if (buf->use_count == 0) {
876 /* Remove from linked list */
877 if (prev_buf)
878 prev_buf->next = buf->next;
879 else
880 self->buffers = buf->next;
881
882 if (tag)
883 *tag = buf->tag;
884 shm_alloc_space_block_dec (buf->ablock);
885 sp_shm_area_dec (self, buf->shm_area);
886 spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * buf->num_clients, buf);
887 return 0;
888 }
889 return 1;
890 }
891
892 void
sp_writer_close_client(ShmPipe * self,ShmClient * client,sp_buffer_free_callback callback,void * user_data)893 sp_writer_close_client (ShmPipe * self, ShmClient * client,
894 sp_buffer_free_callback callback, void *user_data)
895 {
896 ShmBuffer *buffer = NULL, *prev_buf = NULL;
897 ShmClient *item = NULL, *prev_item = NULL;
898
899 shutdown (client->fd, SHUT_RDWR);
900 close (client->fd);
901
902 again:
903 for (buffer = self->buffers; buffer; buffer = buffer->next) {
904 int i;
905 void *tag = NULL;
906
907 for (i = 0; i < buffer->num_clients; i++) {
908 if (buffer->clients[i] == client->fd) {
909 if (!sp_shmbuf_dec (self, buffer, prev_buf, client, &tag)) {
910 if (callback)
911 callback (tag, user_data);
912 goto again;
913 }
914 break;
915 }
916 }
917 prev_buf = buffer;
918 }
919
920 for (item = self->clients; item; item = item->next) {
921 if (item == client)
922 break;
923 prev_item = item;
924 }
925 assert (item);
926
927 if (prev_item)
928 prev_item->next = client->next;
929 else
930 self->clients = client->next;
931
932 self->num_clients--;
933
934 spalloc_free (ShmClient, client);
935 }
936
937 int
sp_get_fd(ShmPipe * self)938 sp_get_fd (ShmPipe * self)
939 {
940 return self->main_socket;
941 }
942
943 const char *
sp_get_shm_area_name(ShmPipe * self)944 sp_get_shm_area_name (ShmPipe * self)
945 {
946 if (self->shm_area)
947 return self->shm_area->shm_area_name;
948
949 return NULL;
950 }
951
952 int
sp_writer_get_client_fd(ShmClient * client)953 sp_writer_get_client_fd (ShmClient * client)
954 {
955 return client->fd;
956 }
957
958 int
sp_writer_pending_writes(ShmPipe * self)959 sp_writer_pending_writes (ShmPipe * self)
960 {
961 return (self->buffers != NULL);
962 }
963
964 const char *
sp_writer_get_path(ShmPipe * pipe)965 sp_writer_get_path (ShmPipe * pipe)
966 {
967 return pipe->socket_path;
968 }
969
970 ShmBuffer *
sp_writer_get_pending_buffers(ShmPipe * self)971 sp_writer_get_pending_buffers (ShmPipe * self)
972 {
973 return self->buffers;
974 }
975
976 ShmBuffer *
sp_writer_get_next_buffer(ShmBuffer * buffer)977 sp_writer_get_next_buffer (ShmBuffer * buffer)
978 {
979 return buffer->next;
980 }
981
982 void *
sp_writer_buf_get_tag(ShmBuffer * buffer)983 sp_writer_buf_get_tag (ShmBuffer * buffer)
984 {
985 return buffer->tag;
986 }
987
988 size_t
sp_writer_get_max_buf_size(ShmPipe * self)989 sp_writer_get_max_buf_size (ShmPipe * self)
990 {
991 if (self->shm_area == NULL)
992 return 0;
993
994 return self->shm_area->shm_area_len;
995 }
996