• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) <2009> Collabora Ltd
3  *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
4  * Copyright (C) <2009> Nokia Inc
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
24 
25 
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29 
30 #ifdef HAVE_OSX
31 #ifndef MSG_NOSIGNAL
32 #define MSG_NOSIGNAL SO_NOSIGPIPE
33 #endif
34 #endif
35 
36 #include "shmpipe.h"
37 
38 #include <sys/types.h>
39 #include <sys/socket.h>
40 #include <sys/un.h>
41 #include <string.h>
42 #include <stdio.h>
43 #include <errno.h>
44 #include <unistd.h>
45 #include <fcntl.h>
46 #include <limits.h>
47 #include <sys/mman.h>
48 #include <assert.h>
49 
50 #include "shmalloc.h"
51 
52 /*
53  * The protocol over the pipe is in packets
54  *
55  * The defined types are:
56  * type 1: new shm area
57  * Area length
58  * Size of path (followed by path)
59  *
60  * type 2: Close shm area:
61  * No payload
62  *
63  * type 3: shm buffer
64  * offset
65  * bufsize
66  *
67  * type 4: ack buffer
68  * offset
69  *
70  * Type 4 goes from the client to the server
71  * The rest are from the server to the client
72  * The client should never write in the SHM
73  */
74 
75 
/* Backlog handed to listen() on the writer's socket */
#define LISTEN_BACKLOG 10

/* Packet types sent over the control socket. The numeric values travel on
 * the wire in CommandBuffer.type, so they must stay as they are. */
enum
{
  COMMAND_NEW_SHM_AREA = 1,
  COMMAND_CLOSE_SHM_AREA,       /* 2 */
  COMMAND_NEW_BUFFER,           /* 3 */
  COMMAND_ACK_BUFFER            /* 4 */
};
85 
86 typedef struct _ShmArea ShmArea;
87 
88 struct _ShmArea
89 {
90   int id;
91 
92   int use_count;
93   int is_writer;
94 
95   int shm_fd;
96 
97   char *shm_area_buf;
98   size_t shm_area_len;
99 
100   char *shm_area_name;
101 
102   ShmAllocSpace *allocspace;
103 
104   ShmArea *next;
105 };
106 
107 struct _ShmBuffer
108 {
109   int use_count;
110 
111   ShmArea *shm_area;
112   unsigned long offset;
113   size_t size;
114 
115   ShmAllocBlock *ablock;
116 
117   ShmBuffer *next;
118 
119   void *tag;
120 
121   int num_clients;
122   /* This must ALWAYS stay last in the struct */
123   int clients[0];
124 };
125 
126 
127 struct _ShmPipe
128 {
129   int main_socket;
130   char *socket_path;
131   int use_count;
132   void *data;
133 
134   ShmArea *shm_area;
135 
136   int next_area_id;
137 
138   ShmBuffer *buffers;
139 
140   int num_clients;
141   ShmClient *clients;
142 
143   mode_t perms;
144 };
145 
146 struct _ShmClient
147 {
148   int fd;
149 
150   ShmClient *next;
151 };
152 
153 struct _ShmBlock
154 {
155   ShmPipe *pipe;
156   ShmArea *area;
157   ShmAllocBlock *ablock;
158 };
159 
/* Fixed-size packet exchanged over the control socket. The struct is sent
 * and received as raw bytes, so the member order must not change. */
struct CommandBuffer {
  /* One of the COMMAND_* values */
  unsigned int type;
  /* Id of the shm area the command refers to */
  int area_id;

  /* Per-command payload; which member is valid depends on type */
  union {
    /* COMMAND_NEW_SHM_AREA */
    struct {
      size_t size;
      unsigned int path_size;
      /* Followed by path */
    } new_shm_area;

    /* COMMAND_NEW_BUFFER */
    struct {
      unsigned long offset;
      unsigned long size;
    } buffer;

    /* COMMAND_ACK_BUFFER */
    struct {
      unsigned long offset;
    } ack_buffer;
  } payload;
};
184 
185 static ShmArea *sp_open_shm (char *path, int id, mode_t perms, size_t size);
186 static void sp_close_shm (ShmArea * area);
187 static int sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf,
188     ShmBuffer * prev_buf, ShmClient * client, void **tag);
189 static void sp_shm_area_dec (ShmPipe * self, ShmArea * area);
190 
191 
192 
193 #define RETURN_ERROR(format, ...) do {                  \
194   fprintf (stderr, format, __VA_ARGS__);                \
195   sp_writer_close (self, NULL, NULL);                   \
196   return NULL;                                          \
197   } while (0)
198 
199 ShmPipe *
sp_writer_create(const char * path,size_t size,mode_t perms)200 sp_writer_create (const char *path, size_t size, mode_t perms)
201 {
202   ShmPipe *self = spalloc_new (ShmPipe);
203   int flags;
204   struct sockaddr_un sock_un;
205   int i = 0;
206 
207   memset (self, 0, sizeof (ShmPipe));
208 
209   self->main_socket = socket (PF_UNIX, SOCK_STREAM, 0);
210   self->use_count = 1;
211 
212   if (self->main_socket < 0)
213     RETURN_ERROR ("Could not create socket (%d): %s\n", errno,
214         strerror (errno));
215 
216   flags = fcntl (self->main_socket, F_GETFL, 0);
217   if (flags < 0)
218     RETURN_ERROR ("fcntl(F_GETFL) failed (%d): %s\n", errno, strerror (errno));
219 
220   if (fcntl (self->main_socket, F_SETFL, flags | O_NONBLOCK | FD_CLOEXEC) < 0)
221     RETURN_ERROR ("fcntl(F_SETFL) failed (%d): %s\n", errno, strerror (errno));
222 
223   sock_un.sun_family = AF_UNIX;
224   strncpy (sock_un.sun_path, path, sizeof (sock_un.sun_path) - 1);
225 
226   while (bind (self->main_socket, (struct sockaddr *) &sock_un,
227           sizeof (struct sockaddr_un)) < 0) {
228     if (errno != EADDRINUSE)
229       RETURN_ERROR ("bind() failed (%d): %s\n", errno, strerror (errno));
230 
231     if (i > 256)
232       RETURN_ERROR ("Could not find a free socket name for %s", path);
233 
234     snprintf (sock_un.sun_path, sizeof (sock_un.sun_path), "%s.%d", path, i);
235     i++;
236   }
237 
238   self->socket_path = strdup (sock_un.sun_path);
239 
240   if (chmod (self->socket_path, perms) < 0)
241     RETURN_ERROR ("failed to set socket permissions (%d): %s\n", errno,
242         strerror (errno));
243 
244   if (listen (self->main_socket, LISTEN_BACKLOG) < 0)
245     RETURN_ERROR ("listen() failed (%d): %s\n", errno, strerror (errno));
246 
247   self->shm_area = sp_open_shm (NULL, ++self->next_area_id, perms, size);
248 
249   self->perms = perms;
250 
251   if (!self->shm_area)
252     RETURN_ERROR ("Could not open shm area (%d): %s", errno, strerror (errno));
253 
254   return self;
255 }
256 
257 #undef RETURN_ERROR
258 
259 #define RETURN_ERROR(format, ...)  do {                   \
260   fprintf (stderr, format, __VA_ARGS__);                  \
261   area->use_count--;                                      \
262   sp_close_shm (area);                                    \
263   return NULL;                                            \
264   } while (0)
265 
266 /* sp_open_shm:
267  * @path: Path of the shm area for a reader,
268  *  NULL if this is a writer (then it will allocate its own path)
269  *
270  * Opens a ShmArea
271  */
272 
273 static ShmArea *
sp_open_shm(char * path,int id,mode_t perms,size_t size)274 sp_open_shm (char *path, int id, mode_t perms, size_t size)
275 {
276   ShmArea *area = spalloc_new (ShmArea);
277   char tmppath[32];
278   int flags;
279   int prot;
280   int i = 0;
281 
282   memset (area, 0, sizeof (ShmArea));
283 
284   area->shm_area_buf = MAP_FAILED;
285   area->use_count = 1;
286 
287   area->shm_area_len = size;
288 
289   area->is_writer = (path == NULL);
290 
291 
292   if (path)
293     flags = O_RDONLY;
294   else
295 #ifdef HAVE_OSX
296     flags = O_RDWR | O_CREAT | O_EXCL;
297 #else
298     flags = O_RDWR | O_CREAT | O_TRUNC | O_EXCL;
299 #endif
300 
301   area->shm_fd = -1;
302 
303   if (path) {
304     area->shm_fd = shm_open (path, flags, perms);
305   } else {
306     do {
307       snprintf (tmppath, sizeof (tmppath), "/shmpipe.%5d.%5d", getpid (), i++);
308       area->shm_fd = shm_open (tmppath, flags, perms);
309     } while (area->shm_fd < 0 && errno == EEXIST);
310   }
311 
312   if (area->shm_fd < 0)
313     RETURN_ERROR ("shm_open failed on %s (%d): %s\n",
314         path ? path : tmppath, errno, strerror (errno));
315 
316   if (!path) {
317     area->shm_area_name = strdup (tmppath);
318 
319     if (ftruncate (area->shm_fd, size))
320       RETURN_ERROR ("Could not resize memory area to header size,"
321           " ftruncate failed (%d): %s\n", errno, strerror (errno));
322 
323     prot = PROT_READ | PROT_WRITE;
324   } else {
325     area->shm_area_name = strdup (path);
326     prot = PROT_READ;
327   }
328 
329   area->shm_area_buf = mmap (NULL, size, prot, MAP_SHARED, area->shm_fd, 0);
330 
331   if (area->shm_area_buf == MAP_FAILED)
332     RETURN_ERROR ("mmap failed (%d): %s\n", errno, strerror (errno));
333 
334   area->id = id;
335 
336   if (!path)
337     area->allocspace = shm_alloc_space_new (area->shm_area_len);
338 
339   return area;
340 }
341 
342 #undef RETURN_ERROR
343 
344 static void
sp_close_shm(ShmArea * area)345 sp_close_shm (ShmArea * area)
346 {
347   assert (area->use_count == 0);
348 
349   if (area->allocspace)
350     shm_alloc_space_free (area->allocspace);
351 
352   if (area->shm_area_buf != MAP_FAILED)
353     munmap (area->shm_area_buf, area->shm_area_len);
354 
355   if (area->shm_fd >= 0)
356     close (area->shm_fd);
357 
358   if (area->shm_area_name) {
359     if (area->is_writer)
360       shm_unlink (area->shm_area_name);
361     free (area->shm_area_name);
362   }
363 
364   spalloc_free (ShmArea, area);
365 }
366 
367 static void
sp_shm_area_inc(ShmArea * area)368 sp_shm_area_inc (ShmArea * area)
369 {
370   area->use_count++;
371 }
372 
373 static void
sp_shm_area_dec(ShmPipe * self,ShmArea * area)374 sp_shm_area_dec (ShmPipe * self, ShmArea * area)
375 {
376   assert (area->use_count > 0);
377   area->use_count--;
378 
379   if (area->use_count == 0) {
380     ShmArea *item = NULL;
381     ShmArea *prev_item = NULL;
382 
383     for (item = self->shm_area; item; item = item->next) {
384       if (item == area) {
385         if (prev_item)
386           prev_item->next = item->next;
387         else
388           self->shm_area = item->next;
389         break;
390       }
391       prev_item = item;
392     }
393     assert (item);
394 
395     sp_close_shm (area);
396   }
397 }
398 
399 void *
sp_get_data(ShmPipe * self)400 sp_get_data (ShmPipe * self)
401 {
402   return self->data;
403 }
404 
405 void
sp_set_data(ShmPipe * self,void * data)406 sp_set_data (ShmPipe * self, void *data)
407 {
408   self->data = data;
409 }
410 
411 static void
sp_inc(ShmPipe * self)412 sp_inc (ShmPipe * self)
413 {
414   self->use_count++;
415 }
416 
417 static void
sp_dec(ShmPipe * self)418 sp_dec (ShmPipe * self)
419 {
420   self->use_count--;
421 
422   if (self->use_count > 0)
423     return;
424 
425   while (self->shm_area)
426     sp_shm_area_dec (self, self->shm_area);
427 
428   spalloc_free (ShmPipe, self);
429 }
430 
431 void
sp_writer_close(ShmPipe * self,sp_buffer_free_callback callback,void * user_data)432 sp_writer_close (ShmPipe * self, sp_buffer_free_callback callback,
433     void *user_data)
434 {
435   if (self->main_socket >= 0) {
436     shutdown (self->main_socket, SHUT_RDWR);
437     close (self->main_socket);
438   }
439 
440   if (self->socket_path) {
441     unlink (self->socket_path);
442     free (self->socket_path);
443   }
444 
445   while (self->clients)
446     sp_writer_close_client (self, self->clients, callback, user_data);
447 
448   sp_dec (self);
449 }
450 
451 void
sp_client_close(ShmPipe * self)452 sp_client_close (ShmPipe * self)
453 {
454   sp_writer_close (self, NULL, NULL);
455 }
456 
457 
458 int
sp_writer_setperms_shm(ShmPipe * self,mode_t perms)459 sp_writer_setperms_shm (ShmPipe * self, mode_t perms)
460 {
461   int ret = 0;
462   ShmArea *area;
463 
464   self->perms = perms;
465   for (area = self->shm_area; area; area = area->next)
466     ret |= fchmod (area->shm_fd, perms);
467 
468   ret |= chmod (self->socket_path, perms);
469 
470   return ret;
471 }
472 
473 static int
send_command(int fd,struct CommandBuffer * cb,unsigned short int type,int area_id)474 send_command (int fd, struct CommandBuffer *cb, unsigned short int type,
475     int area_id)
476 {
477   cb->type = type;
478   cb->area_id = area_id;
479 
480   if (send (fd, cb, sizeof (struct CommandBuffer), MSG_NOSIGNAL) !=
481       sizeof (struct CommandBuffer))
482     return 0;
483 
484   return 1;
485 }
486 
487 int
sp_writer_resize(ShmPipe * self,size_t size)488 sp_writer_resize (ShmPipe * self, size_t size)
489 {
490   ShmArea *newarea;
491   ShmArea *old_current;
492   ShmClient *client;
493   int c = 0;
494   int pathlen;
495 
496   if (self->shm_area->shm_area_len == size)
497     return 0;
498 
499   newarea = sp_open_shm (NULL, ++self->next_area_id, self->perms, size);
500 
501   if (!newarea)
502     return -1;
503 
504   old_current = self->shm_area;
505   newarea->next = self->shm_area;
506   self->shm_area = newarea;
507 
508   pathlen = strlen (newarea->shm_area_name) + 1;
509 
510   for (client = self->clients; client; client = client->next) {
511     struct CommandBuffer cb = { 0 };
512 
513     if (!send_command (client->fd, &cb, COMMAND_CLOSE_SHM_AREA,
514             old_current->id))
515       continue;
516 
517     cb.payload.new_shm_area.size = newarea->shm_area_len;
518     cb.payload.new_shm_area.path_size = pathlen;
519     if (!send_command (client->fd, &cb, COMMAND_NEW_SHM_AREA, newarea->id))
520       continue;
521 
522     if (send (client->fd, newarea->shm_area_name, pathlen, MSG_NOSIGNAL) !=
523         pathlen)
524       continue;
525     c++;
526   }
527 
528   sp_shm_area_dec (self, old_current);
529 
530 
531   return c;
532 }
533 
534 ShmBlock *
sp_writer_alloc_block(ShmPipe * self,size_t size)535 sp_writer_alloc_block (ShmPipe * self, size_t size)
536 {
537   ShmBlock *block;
538   ShmAllocBlock *ablock =
539       shm_alloc_space_alloc_block (self->shm_area->allocspace, size);
540 
541   if (!ablock)
542     return NULL;
543 
544   block = spalloc_new (ShmBlock);
545   sp_shm_area_inc (self->shm_area);
546   block->pipe = self;
547   block->area = self->shm_area;
548   block->ablock = ablock;
549   sp_inc (self);
550   return block;
551 }
552 
553 char *
sp_writer_block_get_buf(ShmBlock * block)554 sp_writer_block_get_buf (ShmBlock * block)
555 {
556   return block->area->shm_area_buf +
557       shm_alloc_space_alloc_block_get_offset (block->ablock);
558 }
559 
560 ShmPipe *
sp_writer_block_get_pipe(ShmBlock * block)561 sp_writer_block_get_pipe (ShmBlock * block)
562 {
563   return block->pipe;
564 }
565 
566 void
sp_writer_free_block(ShmBlock * block)567 sp_writer_free_block (ShmBlock * block)
568 {
569   shm_alloc_space_block_dec (block->ablock);
570   sp_shm_area_dec (block->pipe, block->area);
571   sp_dec (block->pipe);
572   spalloc_free (ShmBlock, block);
573 }
574 
575 /* Returns the number of client this has successfully been sent to */
576 
577 int
sp_writer_send_buf(ShmPipe * self,char * buf,size_t size,void * tag)578 sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag)
579 {
580   ShmArea *area = NULL;
581   unsigned long offset = 0;
582   unsigned long bsize = size;
583   ShmBuffer *sb;
584   ShmClient *client = NULL;
585   ShmAllocBlock *ablock = NULL;
586   int i = 0;
587   int c = 0;
588 
589   if (self->num_clients == 0)
590     return 0;
591 
592   for (area = self->shm_area; area; area = area->next) {
593     if (buf >= area->shm_area_buf &&
594         buf < (area->shm_area_buf + area->shm_area_len)) {
595       offset = buf - area->shm_area_buf;
596       ablock = shm_alloc_space_block_get (area->allocspace, offset);
597       assert (ablock);
598       break;
599     }
600   }
601 
602   if (!ablock)
603     return -1;
604 
605   sb = spalloc_alloc (sizeof (ShmBuffer) + sizeof (int) * self->num_clients);
606   memset (sb, 0, sizeof (ShmBuffer));
607   memset (sb->clients, -1, sizeof (int) * self->num_clients);
608   sb->shm_area = area;
609   sb->offset = offset;
610   sb->size = size;
611   sb->num_clients = self->num_clients;
612   sb->ablock = ablock;
613   sb->tag = tag;
614 
615   for (client = self->clients; client; client = client->next) {
616     struct CommandBuffer cb = { 0 };
617     cb.payload.buffer.offset = offset;
618     cb.payload.buffer.size = bsize;
619     if (!send_command (client->fd, &cb, COMMAND_NEW_BUFFER, self->shm_area->id))
620       continue;
621     sb->clients[i++] = client->fd;
622     c++;
623   }
624 
625   if (c == 0) {
626     spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * sb->num_clients, sb);
627     return 0;
628   }
629 
630   sp_shm_area_inc (area);
631   shm_alloc_space_block_inc (ablock);
632 
633   sb->use_count = c;
634 
635   sb->next = self->buffers;
636   self->buffers = sb;
637 
638   return c;
639 }
640 
641 static int
recv_command(int fd,struct CommandBuffer * cb)642 recv_command (int fd, struct CommandBuffer *cb)
643 {
644   int retval;
645 
646   retval = recv (fd, cb, sizeof (struct CommandBuffer), MSG_DONTWAIT);
647   if (retval == sizeof (struct CommandBuffer)) {
648     return 1;
649   } else {
650     return 0;
651   }
652 }
653 
654 long int
sp_client_recv(ShmPipe * self,char ** buf)655 sp_client_recv (ShmPipe * self, char **buf)
656 {
657   char *area_name = NULL;
658   ShmArea *newarea;
659   ShmArea *area;
660   struct CommandBuffer cb;
661   int retval;
662 
663   if (!recv_command (self->main_socket, &cb))
664     return -1;
665 
666   switch (cb.type) {
667     case COMMAND_NEW_SHM_AREA:
668       assert (cb.payload.new_shm_area.path_size > 0);
669       assert (cb.payload.new_shm_area.size > 0);
670 
671       area_name = malloc (cb.payload.new_shm_area.path_size + 1);
672       retval = recv (self->main_socket, area_name,
673           cb.payload.new_shm_area.path_size, 0);
674       if (retval != cb.payload.new_shm_area.path_size) {
675         free (area_name);
676         return -3;
677       }
678       /* Ensure area_name is NULL terminated */
679       area_name[retval] = 0;
680 
681       newarea = sp_open_shm (area_name, cb.area_id, 0,
682           cb.payload.new_shm_area.size);
683       free (area_name);
684       if (!newarea)
685         return -4;
686 
687       newarea->next = self->shm_area;
688       self->shm_area = newarea;
689       break;
690 
691     case COMMAND_CLOSE_SHM_AREA:
692       for (area = self->shm_area; area; area = area->next) {
693         if (area->id == cb.area_id) {
694           sp_shm_area_dec (self, area);
695           break;
696         }
697       }
698       break;
699 
700     case COMMAND_NEW_BUFFER:
701       assert (buf);
702       for (area = self->shm_area; area; area = area->next) {
703         if (area->id == cb.area_id) {
704           *buf = area->shm_area_buf + cb.payload.buffer.offset;
705           sp_shm_area_inc (area);
706           return cb.payload.buffer.size;
707         }
708       }
709       return -23;
710 
711     default:
712       return -99;
713   }
714 
715   return 0;
716 }
717 
718 int
sp_writer_recv(ShmPipe * self,ShmClient * client,void ** tag)719 sp_writer_recv (ShmPipe * self, ShmClient * client, void **tag)
720 {
721   ShmBuffer *buf = NULL, *prev_buf = NULL;
722   struct CommandBuffer cb;
723 
724   if (!recv_command (client->fd, &cb))
725     return -1;
726 
727   switch (cb.type) {
728     case COMMAND_ACK_BUFFER:
729 
730       for (buf = self->buffers; buf; buf = buf->next) {
731         if (buf->shm_area->id == cb.area_id &&
732             buf->offset == cb.payload.ack_buffer.offset) {
733           return sp_shmbuf_dec (self, buf, prev_buf, client, tag);
734         }
735         prev_buf = buf;
736       }
737 
738       return -2;
739     default:
740       return -99;
741   }
742 
743   return 0;
744 }
745 
746 int
sp_client_recv_finish(ShmPipe * self,char * buf)747 sp_client_recv_finish (ShmPipe * self, char *buf)
748 {
749   ShmArea *shm_area = NULL;
750   unsigned long offset;
751   struct CommandBuffer cb = { 0 };
752 
753   for (shm_area = self->shm_area; shm_area; shm_area = shm_area->next) {
754     if (buf >= shm_area->shm_area_buf &&
755         buf < shm_area->shm_area_buf + shm_area->shm_area_len)
756       break;
757   }
758 
759   assert (shm_area);
760 
761   offset = buf - shm_area->shm_area_buf;
762 
763   sp_shm_area_dec (self, shm_area);
764 
765   cb.payload.ack_buffer.offset = offset;
766   return send_command (self->main_socket, &cb, COMMAND_ACK_BUFFER,
767       self->shm_area->id);
768 }
769 
770 ShmPipe *
sp_client_open(const char * path)771 sp_client_open (const char *path)
772 {
773   ShmPipe *self = spalloc_new (ShmPipe);
774   struct sockaddr_un sock_un;
775   int flags;
776 
777   memset (self, 0, sizeof (ShmPipe));
778 
779   self->main_socket = socket (PF_UNIX, SOCK_STREAM, 0);
780   self->use_count = 1;
781 
782   if (self->main_socket < 0)
783     goto error;
784 
785   flags = fcntl (self->main_socket, F_GETFL, 0);
786   if (flags < 0)
787     goto error;
788 
789   if (fcntl (self->main_socket, F_SETFL, flags | FD_CLOEXEC) < 0)
790     goto error;
791 
792   sock_un.sun_family = AF_UNIX;
793   strncpy (sock_un.sun_path, path, sizeof (sock_un.sun_path) - 1);
794 
795   if (connect (self->main_socket, (struct sockaddr *) &sock_un,
796           sizeof (struct sockaddr_un)) < 0)
797     goto error;
798 
799   return self;
800 
801 error:
802   sp_client_close (self);
803   return NULL;
804 }
805 
806 
807 ShmClient *
sp_writer_accept_client(ShmPipe * self)808 sp_writer_accept_client (ShmPipe * self)
809 {
810   ShmClient *client = NULL;
811   int fd;
812   struct CommandBuffer cb = { 0 };
813   int pathlen = strlen (self->shm_area->shm_area_name) + 1;
814 
815 
816   fd = accept (self->main_socket, NULL, NULL);
817 
818   if (fd < 0) {
819     fprintf (stderr, "Could not client connection");
820     return NULL;
821   }
822 
823   cb.payload.new_shm_area.size = self->shm_area->shm_area_len;
824   cb.payload.new_shm_area.path_size = pathlen;
825   if (!send_command (fd, &cb, COMMAND_NEW_SHM_AREA, self->shm_area->id)) {
826     fprintf (stderr, "Sending new shm area failed: %s", strerror (errno));
827     goto error;
828   }
829 
830   if (send (fd, self->shm_area->shm_area_name, pathlen, MSG_NOSIGNAL) !=
831       pathlen) {
832     fprintf (stderr, "Sending new shm area path failed: %s", strerror (errno));
833     goto error;
834   }
835 
836   client = spalloc_new (ShmClient);
837   client->fd = fd;
838 
839   /* Prepend ot linked list */
840   client->next = self->clients;
841   self->clients = client;
842   self->num_clients++;
843 
844   return client;
845 
846 error:
847   shutdown (fd, SHUT_RDWR);
848   close (fd);
849   return NULL;
850 }
851 
852 static int
sp_shmbuf_dec(ShmPipe * self,ShmBuffer * buf,ShmBuffer * prev_buf,ShmClient * client,void ** tag)853 sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, ShmBuffer * prev_buf,
854     ShmClient * client, void **tag)
855 {
856   int i;
857   int had_client = 0;
858 
859   /* Remove client from the list of buffer users. Here we make sure that
860    * if a client closes connection but already decremented the use count
861    * for this buffer, but other clients didn't have time to decrement
862    * buffer will not be freed too early in sp_writer_close_client.
863    */
864   for (i = 0; i < buf->num_clients; i++) {
865     if (buf->clients[i] == client->fd) {
866       buf->clients[i] = -1;
867       had_client = 1;
868       break;
869     }
870   }
871   assert (had_client);
872 
873   buf->use_count--;
874 
875   if (buf->use_count == 0) {
876     /* Remove from linked list */
877     if (prev_buf)
878       prev_buf->next = buf->next;
879     else
880       self->buffers = buf->next;
881 
882     if (tag)
883       *tag = buf->tag;
884     shm_alloc_space_block_dec (buf->ablock);
885     sp_shm_area_dec (self, buf->shm_area);
886     spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * buf->num_clients, buf);
887     return 0;
888   }
889   return 1;
890 }
891 
892 void
sp_writer_close_client(ShmPipe * self,ShmClient * client,sp_buffer_free_callback callback,void * user_data)893 sp_writer_close_client (ShmPipe * self, ShmClient * client,
894     sp_buffer_free_callback callback, void *user_data)
895 {
896   ShmBuffer *buffer = NULL, *prev_buf = NULL;
897   ShmClient *item = NULL, *prev_item = NULL;
898 
899   shutdown (client->fd, SHUT_RDWR);
900   close (client->fd);
901 
902 again:
903   for (buffer = self->buffers; buffer; buffer = buffer->next) {
904     int i;
905     void *tag = NULL;
906 
907     for (i = 0; i < buffer->num_clients; i++) {
908       if (buffer->clients[i] == client->fd) {
909         if (!sp_shmbuf_dec (self, buffer, prev_buf, client, &tag)) {
910           if (callback)
911             callback (tag, user_data);
912           goto again;
913         }
914         break;
915       }
916     }
917     prev_buf = buffer;
918   }
919 
920   for (item = self->clients; item; item = item->next) {
921     if (item == client)
922       break;
923     prev_item = item;
924   }
925   assert (item);
926 
927   if (prev_item)
928     prev_item->next = client->next;
929   else
930     self->clients = client->next;
931 
932   self->num_clients--;
933 
934   spalloc_free (ShmClient, client);
935 }
936 
937 int
sp_get_fd(ShmPipe * self)938 sp_get_fd (ShmPipe * self)
939 {
940   return self->main_socket;
941 }
942 
943 const char *
sp_get_shm_area_name(ShmPipe * self)944 sp_get_shm_area_name (ShmPipe * self)
945 {
946   if (self->shm_area)
947     return self->shm_area->shm_area_name;
948 
949   return NULL;
950 }
951 
952 int
sp_writer_get_client_fd(ShmClient * client)953 sp_writer_get_client_fd (ShmClient * client)
954 {
955   return client->fd;
956 }
957 
958 int
sp_writer_pending_writes(ShmPipe * self)959 sp_writer_pending_writes (ShmPipe * self)
960 {
961   return (self->buffers != NULL);
962 }
963 
964 const char *
sp_writer_get_path(ShmPipe * pipe)965 sp_writer_get_path (ShmPipe * pipe)
966 {
967   return pipe->socket_path;
968 }
969 
970 ShmBuffer *
sp_writer_get_pending_buffers(ShmPipe * self)971 sp_writer_get_pending_buffers (ShmPipe * self)
972 {
973   return self->buffers;
974 }
975 
976 ShmBuffer *
sp_writer_get_next_buffer(ShmBuffer * buffer)977 sp_writer_get_next_buffer (ShmBuffer * buffer)
978 {
979   return buffer->next;
980 }
981 
982 void *
sp_writer_buf_get_tag(ShmBuffer * buffer)983 sp_writer_buf_get_tag (ShmBuffer * buffer)
984 {
985   return buffer->tag;
986 }
987 
988 size_t
sp_writer_get_max_buf_size(ShmPipe * self)989 sp_writer_get_max_buf_size (ShmPipe * self)
990 {
991   if (self->shm_area == NULL)
992     return 0;
993 
994   return self->shm_area->shm_area_len;
995 }
996