1 /*
2 FUSE: Filesystem in Userspace
3 Copyright (C) 2001-2007 Miklos Szeredi <miklos@szeredi.hu>
4
5 Implementation of the multi-threaded FUSE session loop.
6
7 This program can be distributed under the terms of the GNU LGPLv2.
8 See the file COPYING.LIB.
9 */
10
11 #include "config.h"
12 #include "fuse_lowlevel.h"
13 #include "fuse_misc.h"
14 #include "fuse_kernel.h"
15 #include "fuse_i.h"
16
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <string.h>
20 #include <unistd.h>
21 #include <signal.h>
22 #include <semaphore.h>
23 #include <errno.h>
24 #include <sys/time.h>
25 #include <sys/ioctl.h>
26 #include <assert.h>
27
28 /* Environment var controlling the thread stack size */
29 #define ENVNAME_THREAD_STACK "FUSE_THREAD_STACK"
30
31 struct fuse_worker {
32 struct fuse_worker *prev;
33 struct fuse_worker *next;
34 pthread_t thread_id;
35 size_t bufsize;
36
37 // We need to include fuse_buf so that we can properly free
38 // it when a thread is terminated by pthread_cancel().
39 struct fuse_buf fbuf;
40 struct fuse_chan *ch;
41 struct fuse_mt *mt;
42 };
43
44 struct fuse_mt {
45 pthread_mutex_t lock;
46 int numworker;
47 int numavail;
48 struct fuse_session *se;
49 struct fuse_worker main;
50 sem_t finish;
51 int exit;
52 int error;
53 int clone_fd;
54 int max_idle;
55 };
56
fuse_chan_new(int fd)57 static struct fuse_chan *fuse_chan_new(int fd)
58 {
59 struct fuse_chan *ch = (struct fuse_chan *) malloc(sizeof(*ch));
60 if (ch == NULL) {
61 fuse_log(FUSE_LOG_ERR, "fuse: failed to allocate channel\n");
62 return NULL;
63 }
64
65 memset(ch, 0, sizeof(*ch));
66 ch->fd = fd;
67 ch->ctr = 1;
68 fuse_mutex_init(&ch->lock);
69
70 return ch;
71 }
72
fuse_chan_get(struct fuse_chan * ch)73 struct fuse_chan *fuse_chan_get(struct fuse_chan *ch)
74 {
75 assert(ch->ctr > 0);
76 pthread_mutex_lock(&ch->lock);
77 ch->ctr++;
78 pthread_mutex_unlock(&ch->lock);
79
80 return ch;
81 }
82
fuse_chan_put(struct fuse_chan * ch)83 void fuse_chan_put(struct fuse_chan *ch)
84 {
85 if (ch == NULL)
86 return;
87 pthread_mutex_lock(&ch->lock);
88 ch->ctr--;
89 if (!ch->ctr) {
90 pthread_mutex_unlock(&ch->lock);
91 close(ch->fd);
92 pthread_mutex_destroy(&ch->lock);
93 free(ch);
94 } else
95 pthread_mutex_unlock(&ch->lock);
96 }
97
list_add_worker(struct fuse_worker * w,struct fuse_worker * next)98 static void list_add_worker(struct fuse_worker *w, struct fuse_worker *next)
99 {
100 struct fuse_worker *prev = next->prev;
101 w->next = next;
102 w->prev = prev;
103 prev->next = w;
104 next->prev = w;
105 }
106
list_del_worker(struct fuse_worker * w)107 static void list_del_worker(struct fuse_worker *w)
108 {
109 struct fuse_worker *prev = w->prev;
110 struct fuse_worker *next = w->next;
111 prev->next = next;
112 next->prev = prev;
113 }
114
115 static int fuse_loop_start_thread(struct fuse_mt *mt);
116
fuse_do_work(void * data)117 static void *fuse_do_work(void *data)
118 {
119 struct fuse_worker *w = (struct fuse_worker *) data;
120 struct fuse_mt *mt = w->mt;
121
122 while (!fuse_session_exited(mt->se)) {
123 int isforget = 0;
124 int res;
125
126 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
127 res = fuse_session_receive_buf_int(mt->se, &w->fbuf, w->ch);
128 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
129 if (res == -EINTR)
130 continue;
131 if (res <= 0) {
132 if (res < 0) {
133 fuse_session_exit(mt->se);
134 mt->error = res;
135 }
136 break;
137 }
138
139 pthread_mutex_lock(&mt->lock);
140 if (mt->exit) {
141 pthread_mutex_unlock(&mt->lock);
142 return NULL;
143 }
144
145 /*
146 * This disgusting hack is needed so that zillions of threads
147 * are not created on a burst of FORGET messages
148 */
149 if (!(w->fbuf.flags & FUSE_BUF_IS_FD)) {
150 struct fuse_in_header *in = w->fbuf.mem;
151
152 if (in->opcode == FUSE_FORGET ||
153 in->opcode == FUSE_BATCH_FORGET)
154 isforget = 1;
155 }
156
157 if (!isforget)
158 mt->numavail--;
159 if (mt->numavail == 0)
160 fuse_loop_start_thread(mt);
161 pthread_mutex_unlock(&mt->lock);
162
163 fuse_session_process_buf_int(mt->se, &w->fbuf, w->ch);
164
165 pthread_mutex_lock(&mt->lock);
166 if (!isforget)
167 mt->numavail++;
168 if (mt->numavail > mt->max_idle) {
169 if (mt->exit) {
170 pthread_mutex_unlock(&mt->lock);
171 return NULL;
172 }
173 list_del_worker(w);
174 mt->numavail--;
175 mt->numworker--;
176 pthread_mutex_unlock(&mt->lock);
177
178 pthread_detach(w->thread_id);
179 free(w->fbuf.mem);
180 fuse_chan_put(w->ch);
181 free(w);
182 return NULL;
183 }
184 pthread_mutex_unlock(&mt->lock);
185 }
186
187 sem_post(&mt->finish);
188
189 return NULL;
190 }
191
fuse_start_thread(pthread_t * thread_id,void * (* func)(void *),void * arg)192 int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg)
193 {
194 sigset_t oldset;
195 sigset_t newset;
196 int res;
197 pthread_attr_t attr;
198 char *stack_size;
199
200 /* Override default stack size */
201 pthread_attr_init(&attr);
202 stack_size = getenv(ENVNAME_THREAD_STACK);
203 if (stack_size && pthread_attr_setstacksize(&attr, atoi(stack_size)))
204 fuse_log(FUSE_LOG_ERR, "fuse: invalid stack size: %s\n", stack_size);
205
206 /* Disallow signal reception in worker threads */
207 sigemptyset(&newset);
208 sigaddset(&newset, SIGTERM);
209 sigaddset(&newset, SIGINT);
210 sigaddset(&newset, SIGHUP);
211 sigaddset(&newset, SIGQUIT);
212 pthread_sigmask(SIG_BLOCK, &newset, &oldset);
213 res = pthread_create(thread_id, &attr, func, arg);
214 pthread_sigmask(SIG_SETMASK, &oldset, NULL);
215 pthread_attr_destroy(&attr);
216 if (res != 0) {
217 fuse_log(FUSE_LOG_ERR, "fuse: error creating thread: %s\n",
218 strerror(res));
219 return -1;
220 }
221
222 return 0;
223 }
224
fuse_clone_chan(struct fuse_mt * mt)225 static struct fuse_chan *fuse_clone_chan(struct fuse_mt *mt)
226 {
227 int res;
228 int clonefd;
229 uint32_t masterfd;
230 struct fuse_chan *newch;
231 const char *devname = "/dev/fuse";
232
233 #ifndef O_CLOEXEC
234 #define O_CLOEXEC 0
235 #endif
236 clonefd = open(devname, O_RDWR | O_CLOEXEC);
237 if (clonefd == -1) {
238 fuse_log(FUSE_LOG_ERR, "fuse: failed to open %s: %s\n", devname,
239 strerror(errno));
240 return NULL;
241 }
242 fcntl(clonefd, F_SETFD, FD_CLOEXEC);
243
244 masterfd = mt->se->fd;
245 res = ioctl(clonefd, FUSE_DEV_IOC_CLONE, &masterfd);
246 if (res == -1) {
247 fuse_log(FUSE_LOG_ERR, "fuse: failed to clone device fd: %s\n",
248 strerror(errno));
249 close(clonefd);
250 return NULL;
251 }
252 newch = fuse_chan_new(clonefd);
253 if (newch == NULL)
254 close(clonefd);
255
256 return newch;
257 }
258
fuse_loop_start_thread(struct fuse_mt * mt)259 static int fuse_loop_start_thread(struct fuse_mt *mt)
260 {
261 int res;
262
263 struct fuse_worker *w = malloc(sizeof(struct fuse_worker));
264 if (!w) {
265 fuse_log(FUSE_LOG_ERR, "fuse: failed to allocate worker structure\n");
266 return -1;
267 }
268 memset(w, 0, sizeof(struct fuse_worker));
269 w->fbuf.mem = NULL;
270 w->mt = mt;
271
272 w->ch = NULL;
273 if (mt->clone_fd) {
274 w->ch = fuse_clone_chan(mt);
275 if(!w->ch) {
276 /* Don't attempt this again */
277 fuse_log(FUSE_LOG_ERR, "fuse: trying to continue "
278 "without -o clone_fd.\n");
279 mt->clone_fd = 0;
280 }
281 }
282
283 res = fuse_start_thread(&w->thread_id, fuse_do_work, w);
284 if (res == -1) {
285 fuse_chan_put(w->ch);
286 free(w);
287 return -1;
288 }
289 list_add_worker(w, &mt->main);
290 mt->numavail ++;
291 mt->numworker ++;
292
293 return 0;
294 }
295
fuse_join_worker(struct fuse_mt * mt,struct fuse_worker * w)296 static void fuse_join_worker(struct fuse_mt *mt, struct fuse_worker *w)
297 {
298 pthread_join(w->thread_id, NULL);
299 pthread_mutex_lock(&mt->lock);
300 list_del_worker(w);
301 pthread_mutex_unlock(&mt->lock);
302 free(w->fbuf.mem);
303 fuse_chan_put(w->ch);
304 free(w);
305 }
306
307 FUSE_SYMVER(".symver fuse_session_loop_mt_32,fuse_session_loop_mt@@FUSE_3.2");
fuse_session_loop_mt_32(struct fuse_session * se,struct fuse_loop_config * config)308 int fuse_session_loop_mt_32(struct fuse_session *se, struct fuse_loop_config *config)
309 {
310 int err;
311 struct fuse_mt mt;
312 struct fuse_worker *w;
313
314 memset(&mt, 0, sizeof(struct fuse_mt));
315 mt.se = se;
316 mt.clone_fd = config->clone_fd;
317 mt.error = 0;
318 mt.numworker = 0;
319 mt.numavail = 0;
320 mt.max_idle = config->max_idle_threads;
321 mt.main.thread_id = pthread_self();
322 mt.main.prev = mt.main.next = &mt.main;
323 sem_init(&mt.finish, 0, 0);
324 fuse_mutex_init(&mt.lock);
325
326 pthread_mutex_lock(&mt.lock);
327 err = fuse_loop_start_thread(&mt);
328 pthread_mutex_unlock(&mt.lock);
329 if (!err) {
330 /* sem_wait() is interruptible */
331 while (!fuse_session_exited(se))
332 sem_wait(&mt.finish);
333
334 pthread_mutex_lock(&mt.lock);
335 for (w = mt.main.next; w != &mt.main; w = w->next)
336 pthread_cancel(w->thread_id);
337 mt.exit = 1;
338 pthread_mutex_unlock(&mt.lock);
339
340 while (mt.main.next != &mt.main)
341 fuse_join_worker(&mt, mt.main.next);
342
343 err = mt.error;
344 }
345
346 pthread_mutex_destroy(&mt.lock);
347 sem_destroy(&mt.finish);
348 if(se->error != 0)
349 err = se->error;
350 fuse_session_reset(se);
351 return err;
352 }
353
354 int fuse_session_loop_mt_31(struct fuse_session *se, int clone_fd);
355 FUSE_SYMVER(".symver fuse_session_loop_mt_31,fuse_session_loop_mt@FUSE_3.0");
fuse_session_loop_mt_31(struct fuse_session * se,int clone_fd)356 int fuse_session_loop_mt_31(struct fuse_session *se, int clone_fd)
357 {
358 struct fuse_loop_config config;
359 config.clone_fd = clone_fd;
360 config.max_idle_threads = 10;
361 return fuse_session_loop_mt_32(se, &config);
362 }
363