1 /******************************************************************************
2 *******************************************************************************
3 **
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2008 Red Hat, Inc. All rights reserved.
6 **
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26 #include "user.h"
27
/* Lockspace use count; tested and updated by dlm_new_lockspace() and
   dlm_release_lockspace() while holding ls_lock.  The shared threads are
   started when it is zero on create and stopped when it is zero after a
   release. */
static int ls_count;
/* Serializes dlm_new_lockspace() and dlm_release_lockspace(). */
static struct mutex ls_lock;
/* List of all lockspaces, linked through dlm_ls.ls_list. */
static struct list_head lslist;
/* Protects lslist and the ls_count / ls_create_count fields of its entries. */
static spinlock_t lslist_lock;
/* The dlm_scand kthread, set by dlm_scand_start(). */
static struct task_struct * scand_task;
33
34
dlm_control_store(struct dlm_ls * ls,const char * buf,size_t len)35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37 ssize_t ret = len;
38 int n = simple_strtol(buf, NULL, 0);
39
40 ls = dlm_find_lockspace_local(ls->ls_local_handle);
41 if (!ls)
42 return -EINVAL;
43
44 switch (n) {
45 case 0:
46 dlm_ls_stop(ls);
47 break;
48 case 1:
49 dlm_ls_start(ls);
50 break;
51 default:
52 ret = -EINVAL;
53 }
54 dlm_put_lockspace(ls);
55 return ret;
56 }
57
dlm_event_store(struct dlm_ls * ls,const char * buf,size_t len)58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60 ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62 wake_up(&ls->ls_uevent_wait);
63 return len;
64 }
65
dlm_id_show(struct dlm_ls * ls,char * buf)66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
69 }
70
dlm_id_store(struct dlm_ls * ls,const char * buf,size_t len)71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73 ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74 return len;
75 }
76
dlm_recover_status_show(struct dlm_ls * ls,char * buf)77 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
78 {
79 uint32_t status = dlm_recover_status(ls);
80 return snprintf(buf, PAGE_SIZE, "%x\n", status);
81 }
82
dlm_recover_nodeid_show(struct dlm_ls * ls,char * buf)83 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
84 {
85 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
86 }
87
/*
 * A lockspace sysfs attribute: the generic attribute plus optional per-
 * lockspace show/store callbacks.  dlm_attr_show() and dlm_attr_store()
 * recover this structure from the embedded attr and dispatch to the
 * callbacks; either callback may be left NULL.
 */
struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
93
/* "control": write-only; 0 stops and 1 starts the lockspace */
static struct dlm_attr dlm_attr_control = {
	.attr = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

/* "event_done": write-only; delivers the uevent result to do_uevent() */
static struct dlm_attr dlm_attr_event = {
	.attr = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

/* "id": readable by all, writable by owner; the global lockspace id */
static struct dlm_attr dlm_attr_id = {
	.attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show = dlm_id_show,
	.store = dlm_id_store
};

/* "recover_status": read-only; recovery status in hex */
static struct dlm_attr dlm_attr_recover_status = {
	.attr = {.name = "recover_status", .mode = S_IRUGO},
	.show = dlm_recover_status_show
};

/* "recover_nodeid": read-only; value of ls_recover_nodeid */
static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show = dlm_recover_nodeid_show
};

/* NULL-terminated default attribute list installed through dlm_ktype */
static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
128
dlm_attr_show(struct kobject * kobj,struct attribute * attr,char * buf)129 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
130 char *buf)
131 {
132 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
133 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
134 return a->show ? a->show(ls, buf) : 0;
135 }
136
dlm_attr_store(struct kobject * kobj,struct attribute * attr,const char * buf,size_t len)137 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
138 const char *buf, size_t len)
139 {
140 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
141 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
142 return a->store ? a->store(ls, buf, len) : len;
143 }
144
lockspace_kobj_release(struct kobject * k)145 static void lockspace_kobj_release(struct kobject *k)
146 {
147 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);
148 kfree(ls);
149 }
150
/* sysfs operations shared by all lockspace attributes */
static struct sysfs_ops dlm_attr_ops = {
	.show = dlm_attr_show,
	.store = dlm_attr_store,
};

/* kobject type of a lockspace: default attributes, their sysfs operations,
   and the release callback that frees the dlm_ls */
static struct kobj_type dlm_ktype = {
	.default_attrs = dlm_attrs,
	.sysfs_ops = &dlm_attr_ops,
	.release = lockspace_kobj_release,
};

/* the "dlm" kset created in dlm_lockspace_init(); every lockspace kobject
   is added to it in new_lockspace() */
static struct kset *dlm_kset;
163
do_uevent(struct dlm_ls * ls,int in)164 static int do_uevent(struct dlm_ls *ls, int in)
165 {
166 int error;
167
168 if (in)
169 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
170 else
171 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
172
173 log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
174
175 /* dlm_controld will see the uevent, do the necessary group management
176 and then write to sysfs to wake us */
177
178 error = wait_event_interruptible(ls->ls_uevent_wait,
179 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
180
181 log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
182
183 if (error)
184 goto out;
185
186 error = ls->ls_uevent_result;
187 out:
188 if (error)
189 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
190 error, ls->ls_uevent_result);
191 return error;
192 }
193
194
dlm_lockspace_init(void)195 int __init dlm_lockspace_init(void)
196 {
197 ls_count = 0;
198 mutex_init(&ls_lock);
199 INIT_LIST_HEAD(&lslist);
200 spin_lock_init(&lslist_lock);
201
202 dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
203 if (!dlm_kset) {
204 printk(KERN_WARNING "%s: can not create kset\n", __func__);
205 return -ENOMEM;
206 }
207 return 0;
208 }
209
/* Counterpart of dlm_lockspace_init(): unregisters the "dlm" kset. */
void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}
214
find_ls_to_scan(void)215 static struct dlm_ls *find_ls_to_scan(void)
216 {
217 struct dlm_ls *ls;
218
219 spin_lock(&lslist_lock);
220 list_for_each_entry(ls, &lslist, ls_list) {
221 if (time_after_eq(jiffies, ls->ls_scan_time +
222 dlm_config.ci_scan_secs * HZ)) {
223 spin_unlock(&lslist_lock);
224 return ls;
225 }
226 }
227 spin_unlock(&lslist_lock);
228 return NULL;
229 }
230
dlm_scand(void * data)231 static int dlm_scand(void *data)
232 {
233 struct dlm_ls *ls;
234 int timeout_jiffies = dlm_config.ci_scan_secs * HZ;
235
236 while (!kthread_should_stop()) {
237 ls = find_ls_to_scan();
238 if (ls) {
239 if (dlm_lock_recovery_try(ls)) {
240 ls->ls_scan_time = jiffies;
241 dlm_scan_rsbs(ls);
242 dlm_scan_timeout(ls);
243 dlm_unlock_recovery(ls);
244 } else {
245 ls->ls_scan_time += HZ;
246 }
247 } else {
248 schedule_timeout_interruptible(timeout_jiffies);
249 }
250 }
251 return 0;
252 }
253
dlm_scand_start(void)254 static int dlm_scand_start(void)
255 {
256 struct task_struct *p;
257 int error = 0;
258
259 p = kthread_run(dlm_scand, NULL, "dlm_scand");
260 if (IS_ERR(p))
261 error = PTR_ERR(p);
262 else
263 scand_task = p;
264 return error;
265 }
266
/* Stop the dlm_scand kthread recorded in scand_task by dlm_scand_start(). */
static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}
271
/*
 * Look up a lockspace by its global id.  On a match the lockspace's ls_count
 * reference is incremented and the lockspace is returned; the caller drops
 * the reference with dlm_put_lockspace().  Returns NULL if no lockspace
 * matches.
 */
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;
	struct dlm_ls *found = NULL;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id != id)
			continue;
		ls->ls_count++;
		found = ls;
		break;
	}
	spin_unlock(&lslist_lock);

	return found;
}
289
/*
 * Look up a lockspace by its local handle.  On a match the lockspace's
 * ls_count reference is incremented and the lockspace is returned; the
 * caller drops the reference with dlm_put_lockspace().  Returns NULL if no
 * lockspace matches.
 */
struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls;
	struct dlm_ls *found = NULL;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_local_handle != lockspace)
			continue;
		ls->ls_count++;
		found = ls;
		break;
	}
	spin_unlock(&lslist_lock);

	return found;
}
306
dlm_find_lockspace_device(int minor)307 struct dlm_ls *dlm_find_lockspace_device(int minor)
308 {
309 struct dlm_ls *ls;
310
311 spin_lock(&lslist_lock);
312 list_for_each_entry(ls, &lslist, ls_list) {
313 if (ls->ls_device.minor == minor) {
314 ls->ls_count++;
315 goto out;
316 }
317 }
318 ls = NULL;
319 out:
320 spin_unlock(&lslist_lock);
321 return ls;
322 }
323
/* Drop a reference taken by one of the dlm_find_lockspace_*() functions.
   remove_lockspace() waits for ls_count to reach zero before delisting. */
void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}
330
remove_lockspace(struct dlm_ls * ls)331 static void remove_lockspace(struct dlm_ls *ls)
332 {
333 for (;;) {
334 spin_lock(&lslist_lock);
335 if (ls->ls_count == 0) {
336 WARN_ON(ls->ls_create_count != 0);
337 list_del(&ls->ls_list);
338 spin_unlock(&lslist_lock);
339 return;
340 }
341 spin_unlock(&lslist_lock);
342 ssleep(1);
343 }
344 }
345
/*
 * Start the services shared by all lockspaces, in order: dlm_astd, dlm_scand
 * and lowcomms.  If a later one fails, those already started are stopped in
 * reverse order.
 *
 * Returns 0 on success or the error of the start call that failed.
 */
static int threads_start(void)
{
	int rv;

	/* thread which processes lock requests for all lockspaces */
	rv = dlm_astd_start();
	if (rv) {
		log_print("cannot start dlm_astd thread %d", rv);
		return rv;
	}

	rv = dlm_scand_start();
	if (rv) {
		log_print("cannot start dlm_scand thread %d", rv);
		goto stop_astd;
	}

	/* thread for sending/receiving messages for all lockspaces */
	rv = dlm_lowcomms_start();
	if (rv) {
		log_print("cannot start dlm lowcomms %d", rv);
		goto stop_scand;
	}

	return 0;

 stop_scand:
	dlm_scand_stop();
 stop_astd:
	dlm_astd_stop();
	return rv;
}
379
/* Stop the services started by threads_start(): dlm_scand first, then
   lowcomms, then dlm_astd. */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
	dlm_astd_stop();
}
386
/*
 * Create a lockspace, or take another create reference on an existing one
 * with the same name.
 *
 * @name, @namelen: lockspace name; namelen may not exceed DLM_LOCKSPACE_LEN
 * @lockspace:      on success, receives the lockspace handle
 * @flags:          DLM_LSFL_* flags; with DLM_LSFL_NEWEXCL an existing
 *                  lockspace of the same name is an error
 * @lvblen:         lock value block length; non-zero multiple of 8
 *
 * Returns 0 on success (new or existing lockspace), -EINVAL for bad
 * arguments or if the module reference cannot be taken, -EUNATCH if the
 * userland daemon is not available, -EEXIST for a DLM_LSFL_NEWEXCL
 * conflict, -ENOMEM on allocation failure, or the error from starting
 * recoverd, registering the kobject, or joining the lockspace group.
 *
 * Callers in this file invoke this with ls_lock held.
 */
static int new_lockspace(char *name, int namelen, void **lockspace,
			 uint32_t flags, int lvblen)
{
	struct dlm_ls *ls;
	int i, size, error;
	int do_unreg = 0;

	if (namelen > DLM_LOCKSPACE_LEN)
		return -EINVAL;

	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		module_put(THIS_MODULE);
		return -EUNATCH;
	}

	error = 0;

	/* reuse an existing lockspace of the same name if there is one */
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		/* the module reference is held once per lockspace, not once
		   per create */
		module_put(THIS_MODULE);
		error = 1; /* not an error, return 0 */
		break;
	}
	spin_unlock(&lslist_lock);

	if (error < 0)
		goto out;
	if (error)
		goto ret_zero;

	error = -ENOMEM;

	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	ls->ls_count = 0;
	ls->ls_flags = 0;
	ls->ls_scan_time = jiffies;

	if (flags & DLM_LSFL_TIMEWARN)
		set_bit(LSFL_TIMEWARN, &ls->ls_flags);

	if (flags & DLM_LSFL_FS)
		ls->ls_allocation = GFP_NOFS;
	else
		ls->ls_allocation = GFP_KERNEL;

	/* ls_exflags are forced to match among nodes, and we don't
	   need to require all nodes to have some flags set */
	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
				    DLM_LSFL_NEWEXCL));

	/* resource (rsb) hash table */
	size = dlm_config.ci_rsbtbl_size;
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
		spin_lock_init(&ls->ls_rsbtbl[i].lock);
	}

	/* lock (lkb) id table */
	size = dlm_config.ci_lkbtbl_size;
	ls->ls_lkbtbl_size = size;

	ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
	if (!ls->ls_lkbtbl)
		goto out_rsbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
		rwlock_init(&ls->ls_lkbtbl[i].lock);
		ls->ls_lkbtbl[i].counter = 1;
	}

	/* directory table */
	size = dlm_config.ci_dirtbl_size;
	ls->ls_dirtbl_size = size;

	ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
	if (!ls->ls_dirtbl)
		goto out_lkbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
		rwlock_init(&ls->ls_dirtbl[i].lock);
	}

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_members_done);
	ls->ls_members_result = -1;

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	init_rwsem(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);

	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
	if (!ls->ls_recover_buf)
		goto out_dirfree;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	/* the new lockspace starts out with ls_in_recovery held for write */
	down_write(&ls->ls_in_recovery);

	spin_lock(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	/* needs to find ls in lslist */
	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_delist;
	}

	/* From here on the kobject owns ls: once kobject_init_and_add() has
	   been called, even if it fails, ls must be released through
	   kobject_put() (which ends in lockspace_kobj_release()) so that the
	   kobject's own resources are freed too.  A plain kfree() would
	   leak them. */
	do_unreg = 1;

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_stop;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_stop;

	wait_for_completion(&ls->ls_members_done);
	error = ls->ls_members_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_debug(ls, "join complete");
 ret_zero:
	*lockspace = ls;
	return 0;

 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_stop:
	dlm_recoverd_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	kfree(ls->ls_recover_buf);
 out_dirfree:
	kfree(ls->ls_dirtbl);
 out_lkbfree:
	kfree(ls->ls_lkbtbl);
 out_rsbfree:
	kfree(ls->ls_rsbtbl);
 out_lsfree:
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}
616
/*
 * Public entry point for creating (or joining again) a lockspace.
 *
 * Starts the shared dlm threads when no lockspace is in use yet, then calls
 * new_lockspace().  If that fails and no lockspace is in use, the threads
 * are stopped again.  Parameters and return value are those of
 * new_lockspace(); a failure of threads_start() is returned as is.
 *
 * ls_count counts distinct lockspaces, so it is only incremented when
 * new_lockspace() actually created one.  A repeated join of an existing
 * lockspace merely bumps that lockspace's ls_create_count; the matching
 * non-final releases return nonzero from release_lockspace() and do not
 * decrement ls_count.  Counting every successful join would therefore keep
 * ls_count from ever returning to zero, and the shared threads would never
 * be stopped.
 */
int dlm_new_lockspace(char *name, int namelen, void **lockspace,
		      uint32_t flags, int lvblen)
{
	struct dlm_ls *ls;
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, namelen, lockspace, flags, lvblen);
	if (!error) {
		/* ls_create_count is only changed by new_lockspace() and
		   release_lockspace(), both called under ls_lock in this
		   file, so it is stable here: 1 means newly created, more
		   than 1 means an existing lockspace was joined again */
		ls = *lockspace;
		if (ls->ls_create_count == 1)
			ls_count++;
	} else if (!ls_count) {
		threads_stop();
	}
 out:
	mutex_unlock(&ls_lock);
	return error;
}
637
638 /* Return 1 if the lockspace still has active remote locks,
639 * 2 if the lockspace still has active local locks.
640 */
lockspace_busy(struct dlm_ls * ls)641 static int lockspace_busy(struct dlm_ls *ls)
642 {
643 int i, lkb_found = 0;
644 struct dlm_lkb *lkb;
645
646 /* NOTE: We check the lockidtbl here rather than the resource table.
647 This is because there may be LKBs queued as ASTs that have been
648 unlinked from their RSBs and are pending deletion once the AST has
649 been delivered */
650
651 for (i = 0; i < ls->ls_lkbtbl_size; i++) {
652 read_lock(&ls->ls_lkbtbl[i].lock);
653 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
654 lkb_found = 1;
655 list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
656 lkb_idtbl_list) {
657 if (!lkb->lkb_nodeid) {
658 read_unlock(&ls->ls_lkbtbl[i].lock);
659 return 2;
660 }
661 }
662 }
663 read_unlock(&ls->ls_lkbtbl[i].lock);
664 }
665 return lkb_found;
666 }
667
/*
 * Drop one create reference on a lockspace and, if it was the last one,
 * tear the lockspace down.
 *
 * @ls:    the lockspace
 * @force: 0-3, see the comment above dlm_release_lockspace(); compared
 *         against the result of lockspace_busy()
 *
 * Returns 0 when the lockspace was torn down, the remaining create count
 * (> 0) when other creators remain, -EBUSY when this is the last reference
 * but the lockspace is busier than @force allows, and -EINVAL when the
 * create count is not positive.
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *rsb;
	struct list_head *head;
	int i, busy, rv;

	busy = lockspace_busy(ls);

	/* decide under lslist_lock whether this call removes the lockspace */
	spin_lock(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy > force)
			rv = -EBUSY;
		else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	dlm_device_deregister(ls);

	/* send the "leave" uevent, except in a forced shutdown (force 3) or
	   when the userland daemon is not available */
	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	/* waits until no lookup reference (ls_count) remains, then delists */
	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	/* dlm_astd stays suspended until the lkbs have been freed below */
	dlm_astd_suspend();

	kfree(ls->ls_recover_buf);

	/*
	 * Free direntry structs.
	 */

	dlm_dir_clear(ls);
	kfree(ls->ls_dirtbl);

	/*
	 * Free all lkb's on lkbtbl[] lists.
	 */

	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
		head = &ls->ls_lkbtbl[i].list;
		while (!list_empty(head)) {
			lkb = list_entry(head->next, struct dlm_lkb,
					 lkb_idtbl_list);

			list_del(&lkb->lkb_idtbl_list);

			dlm_del_ast(lkb);

			/* the lvb is only freed here for lkbs flagged
			   DLM_IFL_MSTCPY */
			if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
				dlm_free_lvb(lkb->lkb_lvbptr);

			dlm_free_lkb(lkb);
		}
	}
	dlm_astd_resume();

	kfree(ls->ls_lkbtbl);

	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		head = &ls->ls_rsbtbl[i].list;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);

			list_del(&rsb->res_hashchain);
			dlm_free_rsb(rsb);
		}

		/* the bucket's toss list is emptied the same way */
		head = &ls->ls_rsbtbl[i].toss;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);
			list_del(&rsb->res_hashchain);
			dlm_free_rsb(rsb);
		}
	}

	kfree(ls->ls_rsbtbl);

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_free_entries(ls);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_debug(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with */

	/* drops the module reference taken in new_lockspace() */
	module_put(THIS_MODULE);
	return 0;
}
786
/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */
800
dlm_release_lockspace(void * lockspace,int force)801 int dlm_release_lockspace(void *lockspace, int force)
802 {
803 struct dlm_ls *ls;
804 int error;
805
806 ls = dlm_find_lockspace_local(lockspace);
807 if (!ls)
808 return -EINVAL;
809 dlm_put_lockspace(ls);
810
811 mutex_lock(&ls_lock);
812 error = release_lockspace(ls, force);
813 if (!error)
814 ls_count--;
815 if (!ls_count)
816 threads_stop();
817 mutex_unlock(&ls_lock);
818
819 return error;
820 }
821
dlm_stop_lockspaces(void)822 void dlm_stop_lockspaces(void)
823 {
824 struct dlm_ls *ls;
825
826 restart:
827 spin_lock(&lslist_lock);
828 list_for_each_entry(ls, &lslist, ls_list) {
829 if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
830 continue;
831 spin_unlock(&lslist_lock);
832 log_error(ls, "no userland control daemon, stopping lockspace");
833 dlm_ls_stop(ls);
834 goto restart;
835 }
836 spin_unlock(&lslist_lock);
837 }
838
839