1 /* GStreamer
2 * Copyright (C) <2009> Collabora Ltd
3 * @author: Olivier Crete <olivier.crete@collabora.co.uk
4 * Copyright (C) <2009> Nokia Inc
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 * THE SOFTWARE.
23 */
24
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29
30 #ifdef HAVE_OSX
31 #ifndef MSG_NOSIGNAL
32 #define MSG_NOSIGNAL SO_NOSIGPIPE
33 #endif
34 #endif
35
36 #include "shmpipe.h"
37
38 #include <sys/types.h>
39 #include <sys/socket.h>
40 #include <sys/un.h>
41 #include <string.h>
42 #include <stdio.h>
43 #include <errno.h>
44 #include <unistd.h>
45 #include <fcntl.h>
46 #include <limits.h>
47 #include <sys/mman.h>
48 #include <assert.h>
49
50 #include "shmalloc.h"
51
52 /*
53 * The protocol over the pipe is in packets
54 *
55 * The defined types are:
56 * type 1: new shm area
57 * Area length
58 * Size of path (followed by path)
59 *
60 * type 2: Close shm area:
61 * No payload
62 *
63 * type 3: shm buffer
64 * offset
65 * bufsize
66 *
67 * type 4: ack buffer
68 * offset
69 *
70 * Type 4 goes from the client to the server
71 * The rest are from the server to the client
72 * The client should never write in the SHM
73 */
74
75
76 #define LISTEN_BACKLOG 10
77
78 enum
79 {
80 COMMAND_NEW_SHM_AREA = 1,
81 COMMAND_CLOSE_SHM_AREA = 2,
82 COMMAND_NEW_BUFFER = 3,
83 COMMAND_ACK_BUFFER = 4
84 };
85
86 typedef struct _ShmArea ShmArea;
87
88 struct _ShmArea
89 {
90 int id;
91
92 int use_count;
93 int is_writer;
94
95 int shm_fd;
96
97 char *shm_area_buf;
98 size_t shm_area_len;
99
100 char *shm_area_name;
101
102 ShmAllocSpace *allocspace;
103
104 ShmArea *next;
105 };
106
107 struct _ShmBuffer
108 {
109 int use_count;
110
111 ShmArea *shm_area;
112 unsigned long offset;
113 size_t size;
114
115 ShmAllocBlock *ablock;
116
117 ShmBuffer *next;
118
119 void *tag;
120
121 int num_clients;
122 /* This must ALWAYS stay last in the struct */
123 int clients[0];
124 };
125
126
127 struct _ShmPipe
128 {
129 int main_socket;
130 char *socket_path;
131 int use_count;
132 void *data;
133
134 ShmArea *shm_area;
135
136 int next_area_id;
137
138 ShmBuffer *buffers;
139
140 int num_clients;
141 ShmClient *clients;
142
143 mode_t perms;
144 };
145
146 struct _ShmClient
147 {
148 int fd;
149
150 ShmClient *next;
151 };
152
153 struct _ShmBlock
154 {
155 ShmPipe *pipe;
156 ShmArea *area;
157 ShmAllocBlock *ablock;
158 };
159
160 struct CommandBuffer
161 {
162 unsigned int type;
163 int area_id;
164
165 union
166 {
167 struct
168 {
169 size_t size;
170 unsigned int path_size;
171 /* Followed by path */
172 } new_shm_area;
173 struct
174 {
175 unsigned long offset;
176 unsigned long size;
177 } buffer;
178 struct
179 {
180 unsigned long offset;
181 } ack_buffer;
182 } payload;
183 };
184
185 static ShmArea *sp_open_shm (char *path, int id, mode_t perms, size_t size);
186 static void sp_close_shm (ShmArea * area);
187 static int sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf,
188 ShmBuffer * prev_buf, ShmClient * client, void **tag);
189 static void sp_shm_area_dec (ShmPipe * self, ShmArea * area);
190
191
192
193 #define RETURN_ERROR(format, ...) do { \
194 fprintf (stderr, format, __VA_ARGS__); \
195 sp_writer_close (self, NULL, NULL); \
196 return NULL; \
197 } while (0)
198
199 ShmPipe *
sp_writer_create(const char * path,size_t size,mode_t perms)200 sp_writer_create (const char *path, size_t size, mode_t perms)
201 {
202 ShmPipe *self = spalloc_new (ShmPipe);
203 int flags;
204 struct sockaddr_un sock_un;
205 int i = 0;
206
207 memset (self, 0, sizeof (ShmPipe));
208
209 self->main_socket = socket (PF_UNIX, SOCK_STREAM, 0);
210 self->use_count = 1;
211
212 if (self->main_socket < 0)
213 RETURN_ERROR ("Could not create socket (%d): %s\n", errno,
214 strerror (errno));
215
216 flags = fcntl (self->main_socket, F_GETFL, 0);
217 if (flags < 0)
218 RETURN_ERROR ("fcntl(F_GETFL) failed (%d): %s\n", errno, strerror (errno));
219
220 if (fcntl (self->main_socket, F_SETFL, flags | O_NONBLOCK | FD_CLOEXEC) < 0)
221 RETURN_ERROR ("fcntl(F_SETFL) failed (%d): %s\n", errno, strerror (errno));
222
223 sock_un.sun_family = AF_UNIX;
224 strncpy (sock_un.sun_path, path, sizeof (sock_un.sun_path) - 1);
225
226 while (bind (self->main_socket, (struct sockaddr *) &sock_un,
227 sizeof (struct sockaddr_un)) < 0) {
228 if (errno != EADDRINUSE)
229 RETURN_ERROR ("bind() failed (%d): %s\n", errno, strerror (errno));
230
231 if (i > 256)
232 RETURN_ERROR ("Could not find a free socket name for %s", path);
233
234 snprintf (sock_un.sun_path, sizeof (sock_un.sun_path), "%s.%d", path, i);
235 i++;
236 }
237
238 self->socket_path = strdup (sock_un.sun_path);
239
240 if (chmod (self->socket_path, perms) < 0)
241 RETURN_ERROR ("failed to set socket permissions (%d): %s\n", errno,
242 strerror (errno));
243
244 if (listen (self->main_socket, LISTEN_BACKLOG) < 0)
245 RETURN_ERROR ("listen() failed (%d): %s\n", errno, strerror (errno));
246
247 self->shm_area = sp_open_shm (NULL, ++self->next_area_id, perms, size);
248
249 self->perms = perms;
250
251 if (!self->shm_area)
252 RETURN_ERROR ("Could not open shm area (%d): %s", errno, strerror (errno));
253
254 return self;
255 }
256
257 #undef RETURN_ERROR
258
259 #define RETURN_ERROR(format, ...) do { \
260 fprintf (stderr, format, __VA_ARGS__); \
261 area->use_count--; \
262 sp_close_shm (area); \
263 return NULL; \
264 } while (0)
265
266 /**
267 * sp_open_shm:
268 * @path: Path of the shm area for a reader,
269 * NULL if this is a writer (then it will allocate its own path)
270 *
271 * Opens a ShmArea
272 */
273
274 static ShmArea *
sp_open_shm(char * path,int id,mode_t perms,size_t size)275 sp_open_shm (char *path, int id, mode_t perms, size_t size)
276 {
277 ShmArea *area = spalloc_new (ShmArea);
278 char tmppath[32];
279 int flags;
280 int prot;
281 int i = 0;
282
283 memset (area, 0, sizeof (ShmArea));
284
285 area->shm_area_buf = MAP_FAILED;
286 area->use_count = 1;
287
288 area->shm_area_len = size;
289
290 area->is_writer = (path == NULL);
291
292
293 if (path)
294 flags = O_RDONLY;
295 else
296 #ifdef HAVE_OSX
297 flags = O_RDWR | O_CREAT | O_EXCL;
298 #else
299 flags = O_RDWR | O_CREAT | O_TRUNC | O_EXCL;
300 #endif
301
302 area->shm_fd = -1;
303
304 if (path) {
305 area->shm_fd = shm_open (path, flags, perms);
306 } else {
307 do {
308 snprintf (tmppath, sizeof (tmppath), "/shmpipe.%5d.%5d", getpid (), i++);
309 area->shm_fd = shm_open (tmppath, flags, perms);
310 } while (area->shm_fd < 0 && errno == EEXIST);
311 }
312
313 if (area->shm_fd < 0)
314 RETURN_ERROR ("shm_open failed on %s (%d): %s\n",
315 path ? path : tmppath, errno, strerror (errno));
316
317 if (!path) {
318 area->shm_area_name = strdup (tmppath);
319
320 if (ftruncate (area->shm_fd, size))
321 RETURN_ERROR ("Could not resize memory area to header size,"
322 " ftruncate failed (%d): %s\n", errno, strerror (errno));
323
324 prot = PROT_READ | PROT_WRITE;
325 } else {
326 area->shm_area_name = strdup (path);
327 prot = PROT_READ;
328 }
329
330 area->shm_area_buf = mmap (NULL, size, prot, MAP_SHARED, area->shm_fd, 0);
331
332 if (area->shm_area_buf == MAP_FAILED)
333 RETURN_ERROR ("mmap failed (%d): %s\n", errno, strerror (errno));
334
335 area->id = id;
336
337 if (!path)
338 area->allocspace = shm_alloc_space_new (area->shm_area_len);
339
340 return area;
341 }
342
343 #undef RETURN_ERROR
344
345 static void
sp_close_shm(ShmArea * area)346 sp_close_shm (ShmArea * area)
347 {
348 assert (area->use_count == 0);
349
350 if (area->allocspace)
351 shm_alloc_space_free (area->allocspace);
352
353 if (area->shm_area_buf != MAP_FAILED)
354 munmap (area->shm_area_buf, area->shm_area_len);
355
356 if (area->shm_fd >= 0)
357 close (area->shm_fd);
358
359 if (area->shm_area_name) {
360 if (area->is_writer)
361 shm_unlink (area->shm_area_name);
362 free (area->shm_area_name);
363 }
364
365 spalloc_free (ShmArea, area);
366 }
367
368 static void
sp_shm_area_inc(ShmArea * area)369 sp_shm_area_inc (ShmArea * area)
370 {
371 area->use_count++;
372 }
373
374 static void
sp_shm_area_dec(ShmPipe * self,ShmArea * area)375 sp_shm_area_dec (ShmPipe * self, ShmArea * area)
376 {
377 assert (area->use_count > 0);
378 area->use_count--;
379
380 if (area->use_count == 0) {
381 ShmArea *item = NULL;
382 ShmArea *prev_item = NULL;
383
384 for (item = self->shm_area; item; item = item->next) {
385 if (item == area) {
386 if (prev_item)
387 prev_item->next = item->next;
388 else
389 self->shm_area = item->next;
390 break;
391 }
392 prev_item = item;
393 }
394 assert (item);
395
396 sp_close_shm (area);
397 }
398 }
399
400 void *
sp_get_data(ShmPipe * self)401 sp_get_data (ShmPipe * self)
402 {
403 return self->data;
404 }
405
406 void
sp_set_data(ShmPipe * self,void * data)407 sp_set_data (ShmPipe * self, void *data)
408 {
409 self->data = data;
410 }
411
412 static void
sp_inc(ShmPipe * self)413 sp_inc (ShmPipe * self)
414 {
415 self->use_count++;
416 }
417
418 static void
sp_dec(ShmPipe * self)419 sp_dec (ShmPipe * self)
420 {
421 self->use_count--;
422
423 if (self->use_count > 0)
424 return;
425
426 while (self->shm_area)
427 sp_shm_area_dec (self, self->shm_area);
428
429 spalloc_free (ShmPipe, self);
430 }
431
432 void
sp_writer_close(ShmPipe * self,sp_buffer_free_callback callback,void * user_data)433 sp_writer_close (ShmPipe * self, sp_buffer_free_callback callback,
434 void *user_data)
435 {
436 if (self->main_socket >= 0) {
437 shutdown (self->main_socket, SHUT_RDWR);
438 close (self->main_socket);
439 }
440
441 if (self->socket_path) {
442 unlink (self->socket_path);
443 free (self->socket_path);
444 }
445
446 while (self->clients)
447 sp_writer_close_client (self, self->clients, callback, user_data);
448
449 sp_dec (self);
450 }
451
452 void
sp_client_close(ShmPipe * self)453 sp_client_close (ShmPipe * self)
454 {
455 sp_writer_close (self, NULL, NULL);
456 }
457
458
459 int
sp_writer_setperms_shm(ShmPipe * self,mode_t perms)460 sp_writer_setperms_shm (ShmPipe * self, mode_t perms)
461 {
462 int ret = 0;
463 ShmArea *area;
464
465 self->perms = perms;
466 for (area = self->shm_area; area; area = area->next)
467 ret |= fchmod (area->shm_fd, perms);
468
469 ret |= chmod (self->socket_path, perms);
470
471 return ret;
472 }
473
474 static int
send_command(int fd,struct CommandBuffer * cb,unsigned short int type,int area_id)475 send_command (int fd, struct CommandBuffer *cb, unsigned short int type,
476 int area_id)
477 {
478 cb->type = type;
479 cb->area_id = area_id;
480
481 if (send (fd, cb, sizeof (struct CommandBuffer), MSG_NOSIGNAL) !=
482 sizeof (struct CommandBuffer))
483 return 0;
484
485 return 1;
486 }
487
488 int
sp_writer_resize(ShmPipe * self,size_t size)489 sp_writer_resize (ShmPipe * self, size_t size)
490 {
491 ShmArea *newarea;
492 ShmArea *old_current;
493 ShmClient *client;
494 int c = 0;
495 int pathlen;
496
497 if (self->shm_area->shm_area_len == size)
498 return 0;
499
500 newarea = sp_open_shm (NULL, ++self->next_area_id, self->perms, size);
501
502 if (!newarea)
503 return -1;
504
505 old_current = self->shm_area;
506 newarea->next = self->shm_area;
507 self->shm_area = newarea;
508
509 pathlen = strlen (newarea->shm_area_name) + 1;
510
511 for (client = self->clients; client; client = client->next) {
512 struct CommandBuffer cb = { 0 };
513
514 if (!send_command (client->fd, &cb, COMMAND_CLOSE_SHM_AREA,
515 old_current->id))
516 continue;
517
518 cb.payload.new_shm_area.size = newarea->shm_area_len;
519 cb.payload.new_shm_area.path_size = pathlen;
520 if (!send_command (client->fd, &cb, COMMAND_NEW_SHM_AREA, newarea->id))
521 continue;
522
523 if (send (client->fd, newarea->shm_area_name, pathlen, MSG_NOSIGNAL) !=
524 pathlen)
525 continue;
526 c++;
527 }
528
529 sp_shm_area_dec (self, old_current);
530
531
532 return c;
533 }
534
535 ShmBlock *
sp_writer_alloc_block(ShmPipe * self,size_t size)536 sp_writer_alloc_block (ShmPipe * self, size_t size)
537 {
538 ShmBlock *block;
539 ShmAllocBlock *ablock =
540 shm_alloc_space_alloc_block (self->shm_area->allocspace, size);
541
542 if (!ablock)
543 return NULL;
544
545 block = spalloc_new (ShmBlock);
546 sp_shm_area_inc (self->shm_area);
547 block->pipe = self;
548 block->area = self->shm_area;
549 block->ablock = ablock;
550 sp_inc (self);
551 return block;
552 }
553
554 char *
sp_writer_block_get_buf(ShmBlock * block)555 sp_writer_block_get_buf (ShmBlock * block)
556 {
557 return block->area->shm_area_buf +
558 shm_alloc_space_alloc_block_get_offset (block->ablock);
559 }
560
561 ShmPipe *
sp_writer_block_get_pipe(ShmBlock * block)562 sp_writer_block_get_pipe (ShmBlock * block)
563 {
564 return block->pipe;
565 }
566
567 void
sp_writer_free_block(ShmBlock * block)568 sp_writer_free_block (ShmBlock * block)
569 {
570 shm_alloc_space_block_dec (block->ablock);
571 sp_shm_area_dec (block->pipe, block->area);
572 sp_dec (block->pipe);
573 spalloc_free (ShmBlock, block);
574 }
575
576 /* Returns the number of client this has successfully been sent to */
577
578 int
sp_writer_send_buf(ShmPipe * self,char * buf,size_t size,void * tag)579 sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag)
580 {
581 ShmArea *area = NULL;
582 unsigned long offset = 0;
583 unsigned long bsize = size;
584 ShmBuffer *sb;
585 ShmClient *client = NULL;
586 ShmAllocBlock *ablock = NULL;
587 int i = 0;
588 int c = 0;
589
590 if (self->num_clients == 0)
591 return 0;
592
593 for (area = self->shm_area; area; area = area->next) {
594 if (buf >= area->shm_area_buf &&
595 buf < (area->shm_area_buf + area->shm_area_len)) {
596 offset = buf - area->shm_area_buf;
597 ablock = shm_alloc_space_block_get (area->allocspace, offset);
598 assert (ablock);
599 break;
600 }
601 }
602
603 if (!ablock)
604 return -1;
605
606 sb = spalloc_alloc (sizeof (ShmBuffer) + sizeof (int) * self->num_clients);
607 memset (sb, 0, sizeof (ShmBuffer));
608 memset (sb->clients, -1, sizeof (int) * self->num_clients);
609 sb->shm_area = area;
610 sb->offset = offset;
611 sb->size = size;
612 sb->num_clients = self->num_clients;
613 sb->ablock = ablock;
614 sb->tag = tag;
615
616 for (client = self->clients; client; client = client->next) {
617 struct CommandBuffer cb = { 0 };
618 cb.payload.buffer.offset = offset;
619 cb.payload.buffer.size = bsize;
620 if (!send_command (client->fd, &cb, COMMAND_NEW_BUFFER, self->shm_area->id))
621 continue;
622 sb->clients[i++] = client->fd;
623 c++;
624 }
625
626 if (c == 0) {
627 spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * sb->num_clients, sb);
628 return 0;
629 }
630
631 sp_shm_area_inc (area);
632 shm_alloc_space_block_inc (ablock);
633
634 sb->use_count = c;
635
636 sb->next = self->buffers;
637 self->buffers = sb;
638
639 return c;
640 }
641
642 static int
recv_command(int fd,struct CommandBuffer * cb)643 recv_command (int fd, struct CommandBuffer *cb)
644 {
645 int retval;
646
647 retval = recv (fd, cb, sizeof (struct CommandBuffer), MSG_DONTWAIT);
648 if (retval == sizeof (struct CommandBuffer)) {
649 return 1;
650 } else {
651 return 0;
652 }
653 }
654
655 long int
sp_client_recv(ShmPipe * self,char ** buf)656 sp_client_recv (ShmPipe * self, char **buf)
657 {
658 char *area_name = NULL;
659 ShmArea *newarea;
660 ShmArea *area;
661 struct CommandBuffer cb;
662 int retval;
663
664 if (!recv_command (self->main_socket, &cb))
665 return -1;
666
667 switch (cb.type) {
668 case COMMAND_NEW_SHM_AREA:
669 assert (cb.payload.new_shm_area.path_size > 0);
670 assert (cb.payload.new_shm_area.size > 0);
671
672 area_name = malloc (cb.payload.new_shm_area.path_size + 1);
673 retval = recv (self->main_socket, area_name,
674 cb.payload.new_shm_area.path_size, 0);
675 if (retval != cb.payload.new_shm_area.path_size) {
676 free (area_name);
677 return -3;
678 }
679 /* Ensure area_name is NULL terminated */
680 area_name[retval] = 0;
681
682 newarea = sp_open_shm (area_name, cb.area_id, 0,
683 cb.payload.new_shm_area.size);
684 free (area_name);
685 if (!newarea)
686 return -4;
687
688 newarea->next = self->shm_area;
689 self->shm_area = newarea;
690 break;
691
692 case COMMAND_CLOSE_SHM_AREA:
693 for (area = self->shm_area; area; area = area->next) {
694 if (area->id == cb.area_id) {
695 sp_shm_area_dec (self, area);
696 break;
697 }
698 }
699 break;
700
701 case COMMAND_NEW_BUFFER:
702 assert (buf);
703 for (area = self->shm_area; area; area = area->next) {
704 if (area->id == cb.area_id) {
705 *buf = area->shm_area_buf + cb.payload.buffer.offset;
706 sp_shm_area_inc (area);
707 return cb.payload.buffer.size;
708 }
709 }
710 return -23;
711
712 default:
713 return -99;
714 }
715
716 return 0;
717 }
718
719 int
sp_writer_recv(ShmPipe * self,ShmClient * client,void ** tag)720 sp_writer_recv (ShmPipe * self, ShmClient * client, void **tag)
721 {
722 ShmBuffer *buf = NULL, *prev_buf = NULL;
723 struct CommandBuffer cb;
724
725 if (!recv_command (client->fd, &cb))
726 return -1;
727
728 switch (cb.type) {
729 case COMMAND_ACK_BUFFER:
730
731 for (buf = self->buffers; buf; buf = buf->next) {
732 if (buf->shm_area->id == cb.area_id &&
733 buf->offset == cb.payload.ack_buffer.offset) {
734 return sp_shmbuf_dec (self, buf, prev_buf, client, tag);
735 }
736 prev_buf = buf;
737 }
738
739 return -2;
740 default:
741 return -99;
742 }
743
744 return 0;
745 }
746
747 int
sp_client_recv_finish(ShmPipe * self,char * buf)748 sp_client_recv_finish (ShmPipe * self, char *buf)
749 {
750 ShmArea *shm_area = NULL;
751 unsigned long offset;
752 struct CommandBuffer cb = { 0 };
753
754 for (shm_area = self->shm_area; shm_area; shm_area = shm_area->next) {
755 if (buf >= shm_area->shm_area_buf &&
756 buf < shm_area->shm_area_buf + shm_area->shm_area_len)
757 break;
758 }
759
760 assert (shm_area);
761
762 offset = buf - shm_area->shm_area_buf;
763
764 sp_shm_area_dec (self, shm_area);
765
766 cb.payload.ack_buffer.offset = offset;
767 return send_command (self->main_socket, &cb, COMMAND_ACK_BUFFER,
768 self->shm_area->id);
769 }
770
771 ShmPipe *
sp_client_open(const char * path)772 sp_client_open (const char *path)
773 {
774 ShmPipe *self = spalloc_new (ShmPipe);
775 struct sockaddr_un sock_un;
776 int flags;
777
778 memset (self, 0, sizeof (ShmPipe));
779
780 self->main_socket = socket (PF_UNIX, SOCK_STREAM, 0);
781 self->use_count = 1;
782
783 if (self->main_socket < 0)
784 goto error;
785
786 flags = fcntl (self->main_socket, F_GETFL, 0);
787 if (flags < 0)
788 goto error;
789
790 if (fcntl (self->main_socket, F_SETFL, flags | FD_CLOEXEC) < 0)
791 goto error;
792
793 sock_un.sun_family = AF_UNIX;
794 strncpy (sock_un.sun_path, path, sizeof (sock_un.sun_path) - 1);
795
796 if (connect (self->main_socket, (struct sockaddr *) &sock_un,
797 sizeof (struct sockaddr_un)) < 0)
798 goto error;
799
800 return self;
801
802 error:
803 sp_client_close (self);
804 return NULL;
805 }
806
807
808 ShmClient *
sp_writer_accept_client(ShmPipe * self)809 sp_writer_accept_client (ShmPipe * self)
810 {
811 ShmClient *client = NULL;
812 int fd;
813 struct CommandBuffer cb = { 0 };
814 int pathlen = strlen (self->shm_area->shm_area_name) + 1;
815
816
817 fd = accept (self->main_socket, NULL, NULL);
818
819 if (fd < 0) {
820 fprintf (stderr, "Could not client connection");
821 return NULL;
822 }
823
824 cb.payload.new_shm_area.size = self->shm_area->shm_area_len;
825 cb.payload.new_shm_area.path_size = pathlen;
826 if (!send_command (fd, &cb, COMMAND_NEW_SHM_AREA, self->shm_area->id)) {
827 fprintf (stderr, "Sending new shm area failed: %s", strerror (errno));
828 goto error;
829 }
830
831 if (send (fd, self->shm_area->shm_area_name, pathlen, MSG_NOSIGNAL) !=
832 pathlen) {
833 fprintf (stderr, "Sending new shm area path failed: %s", strerror (errno));
834 goto error;
835 }
836
837 client = spalloc_new (ShmClient);
838 client->fd = fd;
839
840 /* Prepend ot linked list */
841 client->next = self->clients;
842 self->clients = client;
843 self->num_clients++;
844
845 return client;
846
847 error:
848 shutdown (fd, SHUT_RDWR);
849 close (fd);
850 return NULL;
851 }
852
853 static int
sp_shmbuf_dec(ShmPipe * self,ShmBuffer * buf,ShmBuffer * prev_buf,ShmClient * client,void ** tag)854 sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, ShmBuffer * prev_buf,
855 ShmClient * client, void **tag)
856 {
857 int i;
858 int had_client = 0;
859
860 /**
861 * Remove client from the list of buffer users. Here we make sure that
862 * if a client closes connection but already decremented the use count
863 * for this buffer, but other clients didn't have time to decrement
864 * buffer will not be freed too early in sp_writer_close_client.
865 */
866 for (i = 0; i < buf->num_clients; i++) {
867 if (buf->clients[i] == client->fd) {
868 buf->clients[i] = -1;
869 had_client = 1;
870 break;
871 }
872 }
873 assert (had_client);
874
875 buf->use_count--;
876
877 if (buf->use_count == 0) {
878 /* Remove from linked list */
879 if (prev_buf)
880 prev_buf->next = buf->next;
881 else
882 self->buffers = buf->next;
883
884 if (tag)
885 *tag = buf->tag;
886 shm_alloc_space_block_dec (buf->ablock);
887 sp_shm_area_dec (self, buf->shm_area);
888 spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * buf->num_clients, buf);
889 return 0;
890 }
891 return 1;
892 }
893
894 void
sp_writer_close_client(ShmPipe * self,ShmClient * client,sp_buffer_free_callback callback,void * user_data)895 sp_writer_close_client (ShmPipe * self, ShmClient * client,
896 sp_buffer_free_callback callback, void *user_data)
897 {
898 ShmBuffer *buffer = NULL, *prev_buf = NULL;
899 ShmClient *item = NULL, *prev_item = NULL;
900
901 shutdown (client->fd, SHUT_RDWR);
902 close (client->fd);
903
904 again:
905 for (buffer = self->buffers; buffer; buffer = buffer->next) {
906 int i;
907 void *tag = NULL;
908
909 for (i = 0; i < buffer->num_clients; i++) {
910 if (buffer->clients[i] == client->fd) {
911 if (!sp_shmbuf_dec (self, buffer, prev_buf, client, &tag)) {
912 if (callback)
913 callback (tag, user_data);
914 goto again;
915 }
916 break;
917 }
918 }
919 prev_buf = buffer;
920 }
921
922 for (item = self->clients; item; item = item->next) {
923 if (item == client)
924 break;
925 prev_item = item;
926 }
927 assert (item);
928
929 if (prev_item)
930 prev_item->next = client->next;
931 else
932 self->clients = client->next;
933
934 self->num_clients--;
935
936 spalloc_free (ShmClient, client);
937 }
938
939 int
sp_get_fd(ShmPipe * self)940 sp_get_fd (ShmPipe * self)
941 {
942 return self->main_socket;
943 }
944
945 const char *
sp_get_shm_area_name(ShmPipe * self)946 sp_get_shm_area_name (ShmPipe * self)
947 {
948 if (self->shm_area)
949 return self->shm_area->shm_area_name;
950
951 return NULL;
952 }
953
954 int
sp_writer_get_client_fd(ShmClient * client)955 sp_writer_get_client_fd (ShmClient * client)
956 {
957 return client->fd;
958 }
959
960 int
sp_writer_pending_writes(ShmPipe * self)961 sp_writer_pending_writes (ShmPipe * self)
962 {
963 return (self->buffers != NULL);
964 }
965
966 const char *
sp_writer_get_path(ShmPipe * pipe)967 sp_writer_get_path (ShmPipe * pipe)
968 {
969 return pipe->socket_path;
970 }
971
972 ShmBuffer *
sp_writer_get_pending_buffers(ShmPipe * self)973 sp_writer_get_pending_buffers (ShmPipe * self)
974 {
975 return self->buffers;
976 }
977
978 ShmBuffer *
sp_writer_get_next_buffer(ShmBuffer * buffer)979 sp_writer_get_next_buffer (ShmBuffer * buffer)
980 {
981 return buffer->next;
982 }
983
984 void *
sp_writer_buf_get_tag(ShmBuffer * buffer)985 sp_writer_buf_get_tag (ShmBuffer * buffer)
986 {
987 return buffer->tag;
988 }
989
990 size_t
sp_writer_get_max_buf_size(ShmPipe * self)991 sp_writer_get_max_buf_size (ShmPipe * self)
992 {
993 if (self->shm_area == NULL)
994 return 0;
995
996 return self->shm_area->shm_area_len;
997 }
998