• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) <2009> Collabora Ltd
3  *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
4  * Copyright (C) <2009> Nokia Inc
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
24 
25 
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29 
30 #ifdef HAVE_OSX
31 #ifndef MSG_NOSIGNAL
32 #define MSG_NOSIGNAL SO_NOSIGPIPE
33 #endif
34 #endif
35 
36 #include "shmpipe.h"
37 
38 #include <sys/types.h>
39 #include <sys/socket.h>
40 #include <sys/un.h>
41 #include <string.h>
42 #include <stdio.h>
43 #include <errno.h>
44 #include <unistd.h>
45 #include <fcntl.h>
46 #include <limits.h>
47 #include <sys/mman.h>
48 #include <assert.h>
49 
50 #include "shmalloc.h"
51 
52 /*
53  * The protocol over the pipe is in packets
54  *
55  * The defined types are:
56  * type 1: new shm area
57  * Area length
58  * Size of path (followed by path)
59  *
60  * type 2: Close shm area:
61  * No payload
62  *
63  * type 3: shm buffer
64  * offset
65  * bufsize
66  *
67  * type 4: ack buffer
68  * offset
69  *
70  * Type 4 goes from the client to the server
71  * The rest are from the server to the client
72  * The client should never write in the SHM
73  */
74 
75 
/* Backlog handed to listen() on the writer's control socket. */
enum
{
  LISTEN_BACKLOG = 10
};
77 
/*
 * Packet types carried in CommandBuffer.type. The numeric values are what
 * travels over the socket, so they are spelled out explicitly.
 */
enum
{
  /* writer -> client: a new shm area exists (length + path follow) */
  COMMAND_NEW_SHM_AREA = 1,
  /* writer -> client: stop using the given shm area */
  COMMAND_CLOSE_SHM_AREA = 2,
  /* writer -> client: a buffer is available at offset/size in an area */
  COMMAND_NEW_BUFFER = 3,
  /* client -> writer: the client is done with the buffer at offset */
  COMMAND_ACK_BUFFER = 4
};
85 
86 typedef struct _ShmArea ShmArea;
87 
88 struct _ShmArea
89 {
90   int id;
91 
92   int use_count;
93   int is_writer;
94 
95   int shm_fd;
96 
97   char *shm_area_buf;
98   size_t shm_area_len;
99 
100   char *shm_area_name;
101 
102   ShmAllocSpace *allocspace;
103 
104   ShmArea *next;
105 };
106 
107 struct _ShmBuffer
108 {
109   int use_count;
110 
111   ShmArea *shm_area;
112   unsigned long offset;
113   size_t size;
114 
115   ShmAllocBlock *ablock;
116 
117   ShmBuffer *next;
118 
119   void *tag;
120 
121   int num_clients;
122   /* This must ALWAYS stay last in the struct */
123   int clients[0];
124 };
125 
126 
127 struct _ShmPipe
128 {
129   int main_socket;
130   char *socket_path;
131   int use_count;
132   void *data;
133 
134   ShmArea *shm_area;
135 
136   int next_area_id;
137 
138   ShmBuffer *buffers;
139 
140   int num_clients;
141   ShmClient *clients;
142 
143   mode_t perms;
144 };
145 
146 struct _ShmClient
147 {
148   int fd;
149 
150   ShmClient *next;
151 };
152 
153 struct _ShmBlock
154 {
155   ShmPipe *pipe;
156   ShmArea *area;
157   ShmAllocBlock *ablock;
158 };
159 
/*
 * Fixed-size packet sent over the socket for every command. It is sent and
 * received as raw bytes, so the member order and types must not change.
 */
struct CommandBuffer
{
  /* One of the COMMAND_* values */
  unsigned int type;
  /* Id of the shm area the command refers to */
  int area_id;

  union
  {
    /* COMMAND_NEW_SHM_AREA */
    struct
    {
      size_t size;
      unsigned int path_size;
      /* Followed by path */
    } new_shm_area;
    /* COMMAND_NEW_BUFFER */
    struct
    {
      unsigned long offset;
      unsigned long size;
    } buffer;
    /* COMMAND_ACK_BUFFER */
    struct
    {
      unsigned long offset;
    } ack_buffer;
  } payload;
};
184 
185 static ShmArea *sp_open_shm (char *path, int id, mode_t perms, size_t size);
186 static void sp_close_shm (ShmArea * area);
187 static int sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf,
188     ShmBuffer * prev_buf, ShmClient * client, void **tag);
189 static void sp_shm_area_dec (ShmPipe * self, ShmArea * area);
190 
191 
192 
193 #define RETURN_ERROR(format, ...) do {                  \
194   fprintf (stderr, format, __VA_ARGS__);                \
195   sp_writer_close (self, NULL, NULL);                   \
196   return NULL;                                          \
197   } while (0)
198 
199 ShmPipe *
sp_writer_create(const char * path,size_t size,mode_t perms)200 sp_writer_create (const char *path, size_t size, mode_t perms)
201 {
202   ShmPipe *self = spalloc_new (ShmPipe);
203   int flags;
204   struct sockaddr_un sock_un;
205   int i = 0;
206 
207   memset (self, 0, sizeof (ShmPipe));
208 
209   self->main_socket = socket (PF_UNIX, SOCK_STREAM, 0);
210   self->use_count = 1;
211 
212   if (self->main_socket < 0)
213     RETURN_ERROR ("Could not create socket (%d): %s\n", errno,
214         strerror (errno));
215 
216   flags = fcntl (self->main_socket, F_GETFL, 0);
217   if (flags < 0)
218     RETURN_ERROR ("fcntl(F_GETFL) failed (%d): %s\n", errno, strerror (errno));
219 
220   if (fcntl (self->main_socket, F_SETFL, flags | O_NONBLOCK | FD_CLOEXEC) < 0)
221     RETURN_ERROR ("fcntl(F_SETFL) failed (%d): %s\n", errno, strerror (errno));
222 
223   sock_un.sun_family = AF_UNIX;
224   strncpy (sock_un.sun_path, path, sizeof (sock_un.sun_path) - 1);
225 
226   while (bind (self->main_socket, (struct sockaddr *) &sock_un,
227           sizeof (struct sockaddr_un)) < 0) {
228     if (errno != EADDRINUSE)
229       RETURN_ERROR ("bind() failed (%d): %s\n", errno, strerror (errno));
230 
231     if (i > 256)
232       RETURN_ERROR ("Could not find a free socket name for %s", path);
233 
234     snprintf (sock_un.sun_path, sizeof (sock_un.sun_path), "%s.%d", path, i);
235     i++;
236   }
237 
238   self->socket_path = strdup (sock_un.sun_path);
239 
240   if (chmod (self->socket_path, perms) < 0)
241     RETURN_ERROR ("failed to set socket permissions (%d): %s\n", errno,
242         strerror (errno));
243 
244   if (listen (self->main_socket, LISTEN_BACKLOG) < 0)
245     RETURN_ERROR ("listen() failed (%d): %s\n", errno, strerror (errno));
246 
247   self->shm_area = sp_open_shm (NULL, ++self->next_area_id, perms, size);
248 
249   self->perms = perms;
250 
251   if (!self->shm_area)
252     RETURN_ERROR ("Could not open shm area (%d): %s", errno, strerror (errno));
253 
254   return self;
255 }
256 
257 #undef RETURN_ERROR
258 
259 #define RETURN_ERROR(format, ...)  do {                   \
260   fprintf (stderr, format, __VA_ARGS__);                  \
261   area->use_count--;                                      \
262   sp_close_shm (area);                                    \
263   return NULL;                                            \
264   } while (0)
265 
266 /**
267  * sp_open_shm:
268  * @path: Path of the shm area for a reader,
269  *  NULL if this is a writer (then it will allocate its own path)
270  *
271  * Opens a ShmArea
272  */
273 
274 static ShmArea *
sp_open_shm(char * path,int id,mode_t perms,size_t size)275 sp_open_shm (char *path, int id, mode_t perms, size_t size)
276 {
277   ShmArea *area = spalloc_new (ShmArea);
278   char tmppath[32];
279   int flags;
280   int prot;
281   int i = 0;
282 
283   memset (area, 0, sizeof (ShmArea));
284 
285   area->shm_area_buf = MAP_FAILED;
286   area->use_count = 1;
287 
288   area->shm_area_len = size;
289 
290   area->is_writer = (path == NULL);
291 
292 
293   if (path)
294     flags = O_RDONLY;
295   else
296 #ifdef HAVE_OSX
297     flags = O_RDWR | O_CREAT | O_EXCL;
298 #else
299     flags = O_RDWR | O_CREAT | O_TRUNC | O_EXCL;
300 #endif
301 
302   area->shm_fd = -1;
303 
304   if (path) {
305     area->shm_fd = shm_open (path, flags, perms);
306   } else {
307     do {
308       snprintf (tmppath, sizeof (tmppath), "/shmpipe.%5d.%5d", getpid (), i++);
309       area->shm_fd = shm_open (tmppath, flags, perms);
310     } while (area->shm_fd < 0 && errno == EEXIST);
311   }
312 
313   if (area->shm_fd < 0)
314     RETURN_ERROR ("shm_open failed on %s (%d): %s\n",
315         path ? path : tmppath, errno, strerror (errno));
316 
317   if (!path) {
318     area->shm_area_name = strdup (tmppath);
319 
320     if (ftruncate (area->shm_fd, size))
321       RETURN_ERROR ("Could not resize memory area to header size,"
322           " ftruncate failed (%d): %s\n", errno, strerror (errno));
323 
324     prot = PROT_READ | PROT_WRITE;
325   } else {
326     area->shm_area_name = strdup (path);
327     prot = PROT_READ;
328   }
329 
330   area->shm_area_buf = mmap (NULL, size, prot, MAP_SHARED, area->shm_fd, 0);
331 
332   if (area->shm_area_buf == MAP_FAILED)
333     RETURN_ERROR ("mmap failed (%d): %s\n", errno, strerror (errno));
334 
335   area->id = id;
336 
337   if (!path)
338     area->allocspace = shm_alloc_space_new (area->shm_area_len);
339 
340   return area;
341 }
342 
343 #undef RETURN_ERROR
344 
345 static void
sp_close_shm(ShmArea * area)346 sp_close_shm (ShmArea * area)
347 {
348   assert (area->use_count == 0);
349 
350   if (area->allocspace)
351     shm_alloc_space_free (area->allocspace);
352 
353   if (area->shm_area_buf != MAP_FAILED)
354     munmap (area->shm_area_buf, area->shm_area_len);
355 
356   if (area->shm_fd >= 0)
357     close (area->shm_fd);
358 
359   if (area->shm_area_name) {
360     if (area->is_writer)
361       shm_unlink (area->shm_area_name);
362     free (area->shm_area_name);
363   }
364 
365   spalloc_free (ShmArea, area);
366 }
367 
368 static void
sp_shm_area_inc(ShmArea * area)369 sp_shm_area_inc (ShmArea * area)
370 {
371   area->use_count++;
372 }
373 
374 static void
sp_shm_area_dec(ShmPipe * self,ShmArea * area)375 sp_shm_area_dec (ShmPipe * self, ShmArea * area)
376 {
377   assert (area->use_count > 0);
378   area->use_count--;
379 
380   if (area->use_count == 0) {
381     ShmArea *item = NULL;
382     ShmArea *prev_item = NULL;
383 
384     for (item = self->shm_area; item; item = item->next) {
385       if (item == area) {
386         if (prev_item)
387           prev_item->next = item->next;
388         else
389           self->shm_area = item->next;
390         break;
391       }
392       prev_item = item;
393     }
394     assert (item);
395 
396     sp_close_shm (area);
397   }
398 }
399 
400 void *
sp_get_data(ShmPipe * self)401 sp_get_data (ShmPipe * self)
402 {
403   return self->data;
404 }
405 
406 void
sp_set_data(ShmPipe * self,void * data)407 sp_set_data (ShmPipe * self, void *data)
408 {
409   self->data = data;
410 }
411 
412 static void
sp_inc(ShmPipe * self)413 sp_inc (ShmPipe * self)
414 {
415   self->use_count++;
416 }
417 
418 static void
sp_dec(ShmPipe * self)419 sp_dec (ShmPipe * self)
420 {
421   self->use_count--;
422 
423   if (self->use_count > 0)
424     return;
425 
426   while (self->shm_area)
427     sp_shm_area_dec (self, self->shm_area);
428 
429   spalloc_free (ShmPipe, self);
430 }
431 
432 void
sp_writer_close(ShmPipe * self,sp_buffer_free_callback callback,void * user_data)433 sp_writer_close (ShmPipe * self, sp_buffer_free_callback callback,
434     void *user_data)
435 {
436   if (self->main_socket >= 0) {
437     shutdown (self->main_socket, SHUT_RDWR);
438     close (self->main_socket);
439   }
440 
441   if (self->socket_path) {
442     unlink (self->socket_path);
443     free (self->socket_path);
444   }
445 
446   while (self->clients)
447     sp_writer_close_client (self, self->clients, callback, user_data);
448 
449   sp_dec (self);
450 }
451 
452 void
sp_client_close(ShmPipe * self)453 sp_client_close (ShmPipe * self)
454 {
455   sp_writer_close (self, NULL, NULL);
456 }
457 
458 
459 int
sp_writer_setperms_shm(ShmPipe * self,mode_t perms)460 sp_writer_setperms_shm (ShmPipe * self, mode_t perms)
461 {
462   int ret = 0;
463   ShmArea *area;
464 
465   self->perms = perms;
466   for (area = self->shm_area; area; area = area->next)
467     ret |= fchmod (area->shm_fd, perms);
468 
469   ret |= chmod (self->socket_path, perms);
470 
471   return ret;
472 }
473 
474 static int
send_command(int fd,struct CommandBuffer * cb,unsigned short int type,int area_id)475 send_command (int fd, struct CommandBuffer *cb, unsigned short int type,
476     int area_id)
477 {
478   cb->type = type;
479   cb->area_id = area_id;
480 
481   if (send (fd, cb, sizeof (struct CommandBuffer), MSG_NOSIGNAL) !=
482       sizeof (struct CommandBuffer))
483     return 0;
484 
485   return 1;
486 }
487 
488 int
sp_writer_resize(ShmPipe * self,size_t size)489 sp_writer_resize (ShmPipe * self, size_t size)
490 {
491   ShmArea *newarea;
492   ShmArea *old_current;
493   ShmClient *client;
494   int c = 0;
495   int pathlen;
496 
497   if (self->shm_area->shm_area_len == size)
498     return 0;
499 
500   newarea = sp_open_shm (NULL, ++self->next_area_id, self->perms, size);
501 
502   if (!newarea)
503     return -1;
504 
505   old_current = self->shm_area;
506   newarea->next = self->shm_area;
507   self->shm_area = newarea;
508 
509   pathlen = strlen (newarea->shm_area_name) + 1;
510 
511   for (client = self->clients; client; client = client->next) {
512     struct CommandBuffer cb = { 0 };
513 
514     if (!send_command (client->fd, &cb, COMMAND_CLOSE_SHM_AREA,
515             old_current->id))
516       continue;
517 
518     cb.payload.new_shm_area.size = newarea->shm_area_len;
519     cb.payload.new_shm_area.path_size = pathlen;
520     if (!send_command (client->fd, &cb, COMMAND_NEW_SHM_AREA, newarea->id))
521       continue;
522 
523     if (send (client->fd, newarea->shm_area_name, pathlen, MSG_NOSIGNAL) !=
524         pathlen)
525       continue;
526     c++;
527   }
528 
529   sp_shm_area_dec (self, old_current);
530 
531 
532   return c;
533 }
534 
535 ShmBlock *
sp_writer_alloc_block(ShmPipe * self,size_t size)536 sp_writer_alloc_block (ShmPipe * self, size_t size)
537 {
538   ShmBlock *block;
539   ShmAllocBlock *ablock =
540       shm_alloc_space_alloc_block (self->shm_area->allocspace, size);
541 
542   if (!ablock)
543     return NULL;
544 
545   block = spalloc_new (ShmBlock);
546   sp_shm_area_inc (self->shm_area);
547   block->pipe = self;
548   block->area = self->shm_area;
549   block->ablock = ablock;
550   sp_inc (self);
551   return block;
552 }
553 
554 char *
sp_writer_block_get_buf(ShmBlock * block)555 sp_writer_block_get_buf (ShmBlock * block)
556 {
557   return block->area->shm_area_buf +
558       shm_alloc_space_alloc_block_get_offset (block->ablock);
559 }
560 
561 ShmPipe *
sp_writer_block_get_pipe(ShmBlock * block)562 sp_writer_block_get_pipe (ShmBlock * block)
563 {
564   return block->pipe;
565 }
566 
567 void
sp_writer_free_block(ShmBlock * block)568 sp_writer_free_block (ShmBlock * block)
569 {
570   shm_alloc_space_block_dec (block->ablock);
571   sp_shm_area_dec (block->pipe, block->area);
572   sp_dec (block->pipe);
573   spalloc_free (ShmBlock, block);
574 }
575 
576 /* Returns the number of client this has successfully been sent to */
577 
578 int
sp_writer_send_buf(ShmPipe * self,char * buf,size_t size,void * tag)579 sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag)
580 {
581   ShmArea *area = NULL;
582   unsigned long offset = 0;
583   unsigned long bsize = size;
584   ShmBuffer *sb;
585   ShmClient *client = NULL;
586   ShmAllocBlock *ablock = NULL;
587   int i = 0;
588   int c = 0;
589 
590   if (self->num_clients == 0)
591     return 0;
592 
593   for (area = self->shm_area; area; area = area->next) {
594     if (buf >= area->shm_area_buf &&
595         buf < (area->shm_area_buf + area->shm_area_len)) {
596       offset = buf - area->shm_area_buf;
597       ablock = shm_alloc_space_block_get (area->allocspace, offset);
598       assert (ablock);
599       break;
600     }
601   }
602 
603   if (!ablock)
604     return -1;
605 
606   sb = spalloc_alloc (sizeof (ShmBuffer) + sizeof (int) * self->num_clients);
607   memset (sb, 0, sizeof (ShmBuffer));
608   memset (sb->clients, -1, sizeof (int) * self->num_clients);
609   sb->shm_area = area;
610   sb->offset = offset;
611   sb->size = size;
612   sb->num_clients = self->num_clients;
613   sb->ablock = ablock;
614   sb->tag = tag;
615 
616   for (client = self->clients; client; client = client->next) {
617     struct CommandBuffer cb = { 0 };
618     cb.payload.buffer.offset = offset;
619     cb.payload.buffer.size = bsize;
620     if (!send_command (client->fd, &cb, COMMAND_NEW_BUFFER, self->shm_area->id))
621       continue;
622     sb->clients[i++] = client->fd;
623     c++;
624   }
625 
626   if (c == 0) {
627     spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * sb->num_clients, sb);
628     return 0;
629   }
630 
631   sp_shm_area_inc (area);
632   shm_alloc_space_block_inc (ablock);
633 
634   sb->use_count = c;
635 
636   sb->next = self->buffers;
637   self->buffers = sb;
638 
639   return c;
640 }
641 
642 static int
recv_command(int fd,struct CommandBuffer * cb)643 recv_command (int fd, struct CommandBuffer *cb)
644 {
645   int retval;
646 
647   retval = recv (fd, cb, sizeof (struct CommandBuffer), MSG_DONTWAIT);
648   if (retval == sizeof (struct CommandBuffer)) {
649     return 1;
650   } else {
651     return 0;
652   }
653 }
654 
655 long int
sp_client_recv(ShmPipe * self,char ** buf)656 sp_client_recv (ShmPipe * self, char **buf)
657 {
658   char *area_name = NULL;
659   ShmArea *newarea;
660   ShmArea *area;
661   struct CommandBuffer cb;
662   int retval;
663 
664   if (!recv_command (self->main_socket, &cb))
665     return -1;
666 
667   switch (cb.type) {
668     case COMMAND_NEW_SHM_AREA:
669       assert (cb.payload.new_shm_area.path_size > 0);
670       assert (cb.payload.new_shm_area.size > 0);
671 
672       area_name = malloc (cb.payload.new_shm_area.path_size + 1);
673       retval = recv (self->main_socket, area_name,
674           cb.payload.new_shm_area.path_size, 0);
675       if (retval != cb.payload.new_shm_area.path_size) {
676         free (area_name);
677         return -3;
678       }
679       /* Ensure area_name is NULL terminated */
680       area_name[retval] = 0;
681 
682       newarea = sp_open_shm (area_name, cb.area_id, 0,
683           cb.payload.new_shm_area.size);
684       free (area_name);
685       if (!newarea)
686         return -4;
687 
688       newarea->next = self->shm_area;
689       self->shm_area = newarea;
690       break;
691 
692     case COMMAND_CLOSE_SHM_AREA:
693       for (area = self->shm_area; area; area = area->next) {
694         if (area->id == cb.area_id) {
695           sp_shm_area_dec (self, area);
696           break;
697         }
698       }
699       break;
700 
701     case COMMAND_NEW_BUFFER:
702       assert (buf);
703       for (area = self->shm_area; area; area = area->next) {
704         if (area->id == cb.area_id) {
705           *buf = area->shm_area_buf + cb.payload.buffer.offset;
706           sp_shm_area_inc (area);
707           return cb.payload.buffer.size;
708         }
709       }
710       return -23;
711 
712     default:
713       return -99;
714   }
715 
716   return 0;
717 }
718 
719 int
sp_writer_recv(ShmPipe * self,ShmClient * client,void ** tag)720 sp_writer_recv (ShmPipe * self, ShmClient * client, void **tag)
721 {
722   ShmBuffer *buf = NULL, *prev_buf = NULL;
723   struct CommandBuffer cb;
724 
725   if (!recv_command (client->fd, &cb))
726     return -1;
727 
728   switch (cb.type) {
729     case COMMAND_ACK_BUFFER:
730 
731       for (buf = self->buffers; buf; buf = buf->next) {
732         if (buf->shm_area->id == cb.area_id &&
733             buf->offset == cb.payload.ack_buffer.offset) {
734           return sp_shmbuf_dec (self, buf, prev_buf, client, tag);
735         }
736         prev_buf = buf;
737       }
738 
739       return -2;
740     default:
741       return -99;
742   }
743 
744   return 0;
745 }
746 
747 int
sp_client_recv_finish(ShmPipe * self,char * buf)748 sp_client_recv_finish (ShmPipe * self, char *buf)
749 {
750   ShmArea *shm_area = NULL;
751   unsigned long offset;
752   struct CommandBuffer cb = { 0 };
753 
754   for (shm_area = self->shm_area; shm_area; shm_area = shm_area->next) {
755     if (buf >= shm_area->shm_area_buf &&
756         buf < shm_area->shm_area_buf + shm_area->shm_area_len)
757       break;
758   }
759 
760   assert (shm_area);
761 
762   offset = buf - shm_area->shm_area_buf;
763 
764   sp_shm_area_dec (self, shm_area);
765 
766   cb.payload.ack_buffer.offset = offset;
767   return send_command (self->main_socket, &cb, COMMAND_ACK_BUFFER,
768       self->shm_area->id);
769 }
770 
771 ShmPipe *
sp_client_open(const char * path)772 sp_client_open (const char *path)
773 {
774   ShmPipe *self = spalloc_new (ShmPipe);
775   struct sockaddr_un sock_un;
776   int flags;
777 
778   memset (self, 0, sizeof (ShmPipe));
779 
780   self->main_socket = socket (PF_UNIX, SOCK_STREAM, 0);
781   self->use_count = 1;
782 
783   if (self->main_socket < 0)
784     goto error;
785 
786   flags = fcntl (self->main_socket, F_GETFL, 0);
787   if (flags < 0)
788     goto error;
789 
790   if (fcntl (self->main_socket, F_SETFL, flags | FD_CLOEXEC) < 0)
791     goto error;
792 
793   sock_un.sun_family = AF_UNIX;
794   strncpy (sock_un.sun_path, path, sizeof (sock_un.sun_path) - 1);
795 
796   if (connect (self->main_socket, (struct sockaddr *) &sock_un,
797           sizeof (struct sockaddr_un)) < 0)
798     goto error;
799 
800   return self;
801 
802 error:
803   sp_client_close (self);
804   return NULL;
805 }
806 
807 
808 ShmClient *
sp_writer_accept_client(ShmPipe * self)809 sp_writer_accept_client (ShmPipe * self)
810 {
811   ShmClient *client = NULL;
812   int fd;
813   struct CommandBuffer cb = { 0 };
814   int pathlen = strlen (self->shm_area->shm_area_name) + 1;
815 
816 
817   fd = accept (self->main_socket, NULL, NULL);
818 
819   if (fd < 0) {
820     fprintf (stderr, "Could not client connection");
821     return NULL;
822   }
823 
824   cb.payload.new_shm_area.size = self->shm_area->shm_area_len;
825   cb.payload.new_shm_area.path_size = pathlen;
826   if (!send_command (fd, &cb, COMMAND_NEW_SHM_AREA, self->shm_area->id)) {
827     fprintf (stderr, "Sending new shm area failed: %s", strerror (errno));
828     goto error;
829   }
830 
831   if (send (fd, self->shm_area->shm_area_name, pathlen, MSG_NOSIGNAL) !=
832       pathlen) {
833     fprintf (stderr, "Sending new shm area path failed: %s", strerror (errno));
834     goto error;
835   }
836 
837   client = spalloc_new (ShmClient);
838   client->fd = fd;
839 
840   /* Prepend ot linked list */
841   client->next = self->clients;
842   self->clients = client;
843   self->num_clients++;
844 
845   return client;
846 
847 error:
848   shutdown (fd, SHUT_RDWR);
849   close (fd);
850   return NULL;
851 }
852 
853 static int
sp_shmbuf_dec(ShmPipe * self,ShmBuffer * buf,ShmBuffer * prev_buf,ShmClient * client,void ** tag)854 sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, ShmBuffer * prev_buf,
855     ShmClient * client, void **tag)
856 {
857   int i;
858   int had_client = 0;
859 
860   /**
861    * Remove client from the list of buffer users. Here we make sure that
862    * if a client closes connection but already decremented the use count
863    * for this buffer, but other clients didn't have time to decrement
864    * buffer will not be freed too early in sp_writer_close_client.
865    */
866   for (i = 0; i < buf->num_clients; i++) {
867     if (buf->clients[i] == client->fd) {
868       buf->clients[i] = -1;
869       had_client = 1;
870       break;
871     }
872   }
873   assert (had_client);
874 
875   buf->use_count--;
876 
877   if (buf->use_count == 0) {
878     /* Remove from linked list */
879     if (prev_buf)
880       prev_buf->next = buf->next;
881     else
882       self->buffers = buf->next;
883 
884     if (tag)
885       *tag = buf->tag;
886     shm_alloc_space_block_dec (buf->ablock);
887     sp_shm_area_dec (self, buf->shm_area);
888     spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * buf->num_clients, buf);
889     return 0;
890   }
891   return 1;
892 }
893 
894 void
sp_writer_close_client(ShmPipe * self,ShmClient * client,sp_buffer_free_callback callback,void * user_data)895 sp_writer_close_client (ShmPipe * self, ShmClient * client,
896     sp_buffer_free_callback callback, void *user_data)
897 {
898   ShmBuffer *buffer = NULL, *prev_buf = NULL;
899   ShmClient *item = NULL, *prev_item = NULL;
900 
901   shutdown (client->fd, SHUT_RDWR);
902   close (client->fd);
903 
904 again:
905   for (buffer = self->buffers; buffer; buffer = buffer->next) {
906     int i;
907     void *tag = NULL;
908 
909     for (i = 0; i < buffer->num_clients; i++) {
910       if (buffer->clients[i] == client->fd) {
911         if (!sp_shmbuf_dec (self, buffer, prev_buf, client, &tag)) {
912           if (callback)
913             callback (tag, user_data);
914           goto again;
915         }
916         break;
917       }
918     }
919     prev_buf = buffer;
920   }
921 
922   for (item = self->clients; item; item = item->next) {
923     if (item == client)
924       break;
925     prev_item = item;
926   }
927   assert (item);
928 
929   if (prev_item)
930     prev_item->next = client->next;
931   else
932     self->clients = client->next;
933 
934   self->num_clients--;
935 
936   spalloc_free (ShmClient, client);
937 }
938 
939 int
sp_get_fd(ShmPipe * self)940 sp_get_fd (ShmPipe * self)
941 {
942   return self->main_socket;
943 }
944 
945 const char *
sp_get_shm_area_name(ShmPipe * self)946 sp_get_shm_area_name (ShmPipe * self)
947 {
948   if (self->shm_area)
949     return self->shm_area->shm_area_name;
950 
951   return NULL;
952 }
953 
954 int
sp_writer_get_client_fd(ShmClient * client)955 sp_writer_get_client_fd (ShmClient * client)
956 {
957   return client->fd;
958 }
959 
960 int
sp_writer_pending_writes(ShmPipe * self)961 sp_writer_pending_writes (ShmPipe * self)
962 {
963   return (self->buffers != NULL);
964 }
965 
966 const char *
sp_writer_get_path(ShmPipe * pipe)967 sp_writer_get_path (ShmPipe * pipe)
968 {
969   return pipe->socket_path;
970 }
971 
972 ShmBuffer *
sp_writer_get_pending_buffers(ShmPipe * self)973 sp_writer_get_pending_buffers (ShmPipe * self)
974 {
975   return self->buffers;
976 }
977 
978 ShmBuffer *
sp_writer_get_next_buffer(ShmBuffer * buffer)979 sp_writer_get_next_buffer (ShmBuffer * buffer)
980 {
981   return buffer->next;
982 }
983 
984 void *
sp_writer_buf_get_tag(ShmBuffer * buffer)985 sp_writer_buf_get_tag (ShmBuffer * buffer)
986 {
987   return buffer->tag;
988 }
989 
990 size_t
sp_writer_get_max_buf_size(ShmPipe * self)991 sp_writer_get_max_buf_size (ShmPipe * self)
992 {
993   if (self->shm_area == NULL)
994     return 0;
995 
996   return self->shm_area->shm_area_len;
997 }
998