1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6 ** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11
12 #include <linux/module.h>
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27
/* Number of lockspaces counted by dlm_new_lockspace() and
   dlm_release_lockspace(); those functions change it with ls_lock held. */
static int ls_count;
/* Serializes lockspace creation/release and the start/stop of the shared
   threads. */
static struct mutex ls_lock;
/* All lockspaces, linked through dlm_ls.ls_list. */
static struct list_head lslist;
/* Taken around every walk or modification of lslist and around updates of
   the per-lockspace ls_count / ls_create_count counters. */
static spinlock_t lslist_lock;
/* Task returned by kthread_run() for dlm_scand. */
static struct task_struct * scand_task;
33
34
dlm_control_store(struct dlm_ls * ls,const char * buf,size_t len)35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37 ssize_t ret = len;
38 int n;
39 int rc = kstrtoint(buf, 0, &n);
40
41 if (rc)
42 return rc;
43 ls = dlm_find_lockspace_local(ls->ls_local_handle);
44 if (!ls)
45 return -EINVAL;
46
47 switch (n) {
48 case 0:
49 dlm_ls_stop(ls);
50 break;
51 case 1:
52 dlm_ls_start(ls);
53 break;
54 default:
55 ret = -EINVAL;
56 }
57 dlm_put_lockspace(ls);
58 return ret;
59 }
60
dlm_event_store(struct dlm_ls * ls,const char * buf,size_t len)61 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
62 {
63 int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
64
65 if (rc)
66 return rc;
67 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
68 wake_up(&ls->ls_uevent_wait);
69 return len;
70 }
71
dlm_id_show(struct dlm_ls * ls,char * buf)72 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
73 {
74 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
75 }
76
dlm_id_store(struct dlm_ls * ls,const char * buf,size_t len)77 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
78 {
79 int rc = kstrtouint(buf, 0, &ls->ls_global_id);
80
81 if (rc)
82 return rc;
83 return len;
84 }
85
dlm_nodir_show(struct dlm_ls * ls,char * buf)86 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
87 {
88 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
89 }
90
dlm_nodir_store(struct dlm_ls * ls,const char * buf,size_t len)91 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
92 {
93 int val;
94 int rc = kstrtoint(buf, 0, &val);
95
96 if (rc)
97 return rc;
98 if (val == 1)
99 set_bit(LSFL_NODIR, &ls->ls_flags);
100 return len;
101 }
102
dlm_recover_status_show(struct dlm_ls * ls,char * buf)103 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104 {
105 uint32_t status = dlm_recover_status(ls);
106 return snprintf(buf, PAGE_SIZE, "%x\n", status);
107 }
108
dlm_recover_nodeid_show(struct dlm_ls * ls,char * buf)109 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110 {
111 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
112 }
113
114 struct dlm_attr {
115 struct attribute attr;
116 ssize_t (*show)(struct dlm_ls *, char *);
117 ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118 };
119
120 static struct dlm_attr dlm_attr_control = {
121 .attr = {.name = "control", .mode = S_IWUSR},
122 .store = dlm_control_store
123 };
124
125 static struct dlm_attr dlm_attr_event = {
126 .attr = {.name = "event_done", .mode = S_IWUSR},
127 .store = dlm_event_store
128 };
129
130 static struct dlm_attr dlm_attr_id = {
131 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132 .show = dlm_id_show,
133 .store = dlm_id_store
134 };
135
136 static struct dlm_attr dlm_attr_nodir = {
137 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138 .show = dlm_nodir_show,
139 .store = dlm_nodir_store
140 };
141
142 static struct dlm_attr dlm_attr_recover_status = {
143 .attr = {.name = "recover_status", .mode = S_IRUGO},
144 .show = dlm_recover_status_show
145 };
146
147 static struct dlm_attr dlm_attr_recover_nodeid = {
148 .attr = {.name = "recover_nodeid", .mode = S_IRUGO},
149 .show = dlm_recover_nodeid_show
150 };
151
152 static struct attribute *dlm_attrs[] = {
153 &dlm_attr_control.attr,
154 &dlm_attr_event.attr,
155 &dlm_attr_id.attr,
156 &dlm_attr_nodir.attr,
157 &dlm_attr_recover_status.attr,
158 &dlm_attr_recover_nodeid.attr,
159 NULL,
160 };
161 ATTRIBUTE_GROUPS(dlm);
162
dlm_attr_show(struct kobject * kobj,struct attribute * attr,char * buf)163 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
164 char *buf)
165 {
166 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
167 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
168 return a->show ? a->show(ls, buf) : 0;
169 }
170
dlm_attr_store(struct kobject * kobj,struct attribute * attr,const char * buf,size_t len)171 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
172 const char *buf, size_t len)
173 {
174 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
175 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
176 return a->store ? a->store(ls, buf, len) : len;
177 }
178
lockspace_kobj_release(struct kobject * k)179 static void lockspace_kobj_release(struct kobject *k)
180 {
181 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);
182 kfree(ls);
183 }
184
185 static const struct sysfs_ops dlm_attr_ops = {
186 .show = dlm_attr_show,
187 .store = dlm_attr_store,
188 };
189
190 static struct kobj_type dlm_ktype = {
191 .default_groups = dlm_groups,
192 .sysfs_ops = &dlm_attr_ops,
193 .release = lockspace_kobj_release,
194 };
195
196 static struct kset *dlm_kset;
197
do_uevent(struct dlm_ls * ls,int in)198 static int do_uevent(struct dlm_ls *ls, int in)
199 {
200 if (in)
201 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
202 else
203 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
204
205 log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
206
207 /* dlm_controld will see the uevent, do the necessary group management
208 and then write to sysfs to wake us */
209
210 wait_event(ls->ls_uevent_wait,
211 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
212
213 log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
214
215 return ls->ls_uevent_result;
216 }
217
dlm_uevent(struct kset * kset,struct kobject * kobj,struct kobj_uevent_env * env)218 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
219 struct kobj_uevent_env *env)
220 {
221 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
222
223 add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
224 return 0;
225 }
226
227 static const struct kset_uevent_ops dlm_uevent_ops = {
228 .uevent = dlm_uevent,
229 };
230
dlm_lockspace_init(void)231 int __init dlm_lockspace_init(void)
232 {
233 ls_count = 0;
234 mutex_init(&ls_lock);
235 INIT_LIST_HEAD(&lslist);
236 spin_lock_init(&lslist_lock);
237
238 dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
239 if (!dlm_kset) {
240 printk(KERN_WARNING "%s: can not create kset\n", __func__);
241 return -ENOMEM;
242 }
243 return 0;
244 }
245
/* Unregister the kset that dlm_lockspace_init() created. */
void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}
250
find_ls_to_scan(void)251 static struct dlm_ls *find_ls_to_scan(void)
252 {
253 struct dlm_ls *ls;
254
255 spin_lock(&lslist_lock);
256 list_for_each_entry(ls, &lslist, ls_list) {
257 if (time_after_eq(jiffies, ls->ls_scan_time +
258 dlm_config.ci_scan_secs * HZ)) {
259 spin_unlock(&lslist_lock);
260 return ls;
261 }
262 }
263 spin_unlock(&lslist_lock);
264 return NULL;
265 }
266
dlm_scand(void * data)267 static int dlm_scand(void *data)
268 {
269 struct dlm_ls *ls;
270
271 while (!kthread_should_stop()) {
272 ls = find_ls_to_scan();
273 if (ls) {
274 if (dlm_lock_recovery_try(ls)) {
275 ls->ls_scan_time = jiffies;
276 dlm_scan_rsbs(ls);
277 dlm_scan_timeout(ls);
278 dlm_scan_waiters(ls);
279 dlm_unlock_recovery(ls);
280 } else {
281 ls->ls_scan_time += HZ;
282 }
283 continue;
284 }
285 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
286 }
287 return 0;
288 }
289
dlm_scand_start(void)290 static int dlm_scand_start(void)
291 {
292 struct task_struct *p;
293 int error = 0;
294
295 p = kthread_run(dlm_scand, NULL, "dlm_scand");
296 if (IS_ERR(p))
297 error = PTR_ERR(p);
298 else
299 scand_task = p;
300 return error;
301 }
302
/* Stop the thread recorded in scand_task by dlm_scand_start(). */
static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}
307
/*
 * Look up a lockspace by its global id.  On a match ls_count is
 * incremented under lslist_lock and the lockspace is returned; the caller
 * drops the count with dlm_put_lockspace().  Returns NULL if not found.
 */
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *pos;
	struct dlm_ls *match = NULL;

	spin_lock(&lslist_lock);
	list_for_each_entry(pos, &lslist, ls_list) {
		if (pos->ls_global_id != id)
			continue;
		pos->ls_count++;
		match = pos;
		break;
	}
	spin_unlock(&lslist_lock);

	return match;
}
325
/*
 * Look up a lockspace by its local handle.  On a match ls_count is
 * incremented under lslist_lock and the lockspace is returned; the caller
 * drops the count with dlm_put_lockspace().  Returns NULL if not found.
 */
struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *pos;
	struct dlm_ls *match = NULL;

	spin_lock(&lslist_lock);
	list_for_each_entry(pos, &lslist, ls_list) {
		if (pos->ls_local_handle != lockspace)
			continue;
		pos->ls_count++;
		match = pos;
		break;
	}
	spin_unlock(&lslist_lock);

	return match;
}
342
dlm_find_lockspace_device(int minor)343 struct dlm_ls *dlm_find_lockspace_device(int minor)
344 {
345 struct dlm_ls *ls;
346
347 spin_lock(&lslist_lock);
348 list_for_each_entry(ls, &lslist, ls_list) {
349 if (ls->ls_device.minor == minor) {
350 ls->ls_count++;
351 goto out;
352 }
353 }
354 ls = NULL;
355 out:
356 spin_unlock(&lslist_lock);
357 return ls;
358 }
359
/* Drop a count taken by one of the dlm_find_lockspace_*() lookups.
   remove_lockspace() waits for ls_count to reach zero. */
void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}
366
remove_lockspace(struct dlm_ls * ls)367 static void remove_lockspace(struct dlm_ls *ls)
368 {
369 for (;;) {
370 spin_lock(&lslist_lock);
371 if (ls->ls_count == 0) {
372 WARN_ON(ls->ls_create_count != 0);
373 list_del(&ls->ls_list);
374 spin_unlock(&lslist_lock);
375 return;
376 }
377 spin_unlock(&lslist_lock);
378 ssleep(1);
379 }
380 }
381
/*
 * Start the services shared by all lockspaces: the dlm_scand thread and
 * lowcomms.  If lowcomms fails to start, dlm_scand is stopped again.
 * Returns 0 or the error of the step that failed.
 */
static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		return error;
	}

	/* Thread for sending/receiving messages for all lockspace's */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		dlm_scand_stop();
		return error;
	}

	return 0;
}
406
/* Stop what threads_start() started: dlm_scand first, then lowcomms. */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
}
412
/*
 * Create and join a new lockspace called "name", or take another create
 * reference on an existing lockspace with that name.
 *
 * Returns 0 when a new lockspace was created and joined, 1 when an
 * existing lockspace was reused (its ls_create_count is incremented), or a
 * negative errno.  *lockspace is set on the 0 and 1 returns.
 *
 * -EINVAL:  bad name length, lvblen zero or not a multiple of 8, or the
 *           module reference could not be taken
 * -EUNATCH: dlm user daemon not available
 * -EBADR:   "cluster" does not match the configured cluster name
 * -EEXIST:  name already exists and DLM_LSFL_NEWEXCL was passed
 * -ENOMEM:  allocation failure
 * Errors from dlm_callback_start(), dlm_recoverd_start(),
 * kobject_init_and_add(), the join uevent result and ls_members_result
 * are passed through.
 *
 * The module reference taken here is kept only on the 0 return; the reuse
 * and failure paths that reach "out" drop it.
 */
static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int i, size, error;
	int do_unreg = 0;
	int namelen = strlen(name);

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	/* tell the caller whether its recovery callbacks will be used */
	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

	/* look for an existing lockspace with the same name */
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		/* positive result: an existing lockspace was reused */
		error = 1;
		break;
	}
	spin_unlock(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	/* NOTE(review): the "+ namelen" presumably relies on ls_name being
	   declared with room for the terminating NUL - confirm in
	   dlm_internal.h */
	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	ls->ls_count = 0;
	ls->ls_flags = 0;
	ls->ls_scan_time = jiffies;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	if (flags & DLM_LSFL_TIMEWARN)
		set_bit(LSFL_TIMEWARN, &ls->ls_flags);

	/* ls_exflags are forced to match among nodes, and we don't
	   need to require all nodes to have some flags set */
	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
				    DLM_LSFL_NEWEXCL));

	size = dlm_config.ci_rsbtbl_size;
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		ls->ls_rsbtbl[i].keep.rb_node = NULL;
		ls->ls_rsbtbl[i].toss.rb_node = NULL;
		spin_lock_init(&ls->ls_rsbtbl[i].lock);
	}

	spin_lock_init(&ls->ls_remove_spin);

	/* entries not yet allocated stay NULL (ls was kzalloc'ed), so the
	   out_rsbtbl unwind can kfree() the whole array */
	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
						 GFP_KERNEL);
		if (!ls->ls_remove_names[i])
			goto out_rsbtbl;
	}

	idr_init(&ls->ls_lkbidr);
	spin_lock_init(&ls->ls_lkbidr_spin);

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);

	INIT_LIST_HEAD(&ls->ls_new_rsb);
	spin_lock_init(&ls->ls_new_rsb_spin);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_members_done);
	ls->ls_members_result = -1;

	mutex_init(&ls->ls_cb_mutex);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	init_rwsem(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);

	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
	if (!ls->ls_recover_buf)
		goto out_lkbidr;

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	idr_init(&ls->ls_recover_idr);
	spin_lock_init(&ls->ls_recover_idr_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	/* from here on the lockspace is visible to the lslist lookups */
	spin_lock(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	if (flags & DLM_LSFL_FS) {
		error = dlm_callback_start(ls);
		if (error) {
			log_error(ls, "can't start dlm_callback %d", error);
			goto out_delist;
		}
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	wait_for_completion(&ls->ls_members_done);
	error = ls->ls_members_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

	/* error unwinding: each label undoes the steps completed before the
	   gotos that target it, then falls through to the earlier ones */
 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);
 out_lkbidr:
	idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);
	vfree(ls->ls_rsbtbl);
 out_lsfree:
	/* once do_unreg is set, ls is freed by the kobject release hook */
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}
685
/*
 * Public entry point for creating or re-opening a lockspace.  Under
 * ls_lock it starts the shared threads if no lockspace exists yet, calls
 * new_lockspace() and counts the lockspace when a new one was created.
 * A positive result from new_lockspace() (existing lockspace reused) is
 * reported as 0.  If no lockspace exists afterwards the shared threads
 * are stopped again.  Returns 0 or a negative errno.
 */
int dlm_new_lockspace(const char *name, const char *cluster,
		      uint32_t flags, int lvblen,
		      const struct dlm_lockspace_ops *ops, void *ops_arg,
		      int *ops_result, dlm_lockspace_t **lockspace)
{
	int error = 0;

	mutex_lock(&ls_lock);

	if (!ls_count) {
		error = threads_start();
		if (error)
			goto out;
	}

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);
	if (error > 0)
		error = 0;
	else if (error == 0)
		ls_count++;

	if (!ls_count)
		threads_stop();
 out:
	mutex_unlock(&ls_lock);
	return error;
}
711
lkb_idr_is_local(int id,void * p,void * data)712 static int lkb_idr_is_local(int id, void *p, void *data)
713 {
714 struct dlm_lkb *lkb = p;
715
716 return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
717 }
718
/* idr_for_each() callback that returns 1 for every entry, so the walk in
   lockspace_busy() yields nonzero as soon as the idr holds any lkb. */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}
723
lkb_idr_free(int id,void * p,void * data)724 static int lkb_idr_free(int id, void *p, void *data)
725 {
726 struct dlm_lkb *lkb = p;
727
728 if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
729 dlm_free_lvb(lkb->lkb_lvbptr);
730
731 dlm_free_lkb(lkb);
732 return 0;
733 }
734
735 /* NOTE: We check the lkbidr here rather than the resource table.
736 This is because there may be LKBs queued as ASTs that have been unlinked
737 from their RSBs and are pending deletion once the AST has been delivered */
738
lockspace_busy(struct dlm_ls * ls,int force)739 static int lockspace_busy(struct dlm_ls *ls, int force)
740 {
741 int rv;
742
743 spin_lock(&ls->ls_lkbidr_spin);
744 if (force == 0) {
745 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
746 } else if (force == 1) {
747 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
748 } else {
749 rv = 0;
750 }
751 spin_unlock(&ls->ls_lkbidr_spin);
752 return rv;
753 }
754
/*
 * Drop one create reference on the lockspace and tear it down when that
 * was the last one.
 *
 * Returns 0 after a complete teardown, the remaining ls_create_count
 * (positive) when other creators still hold the lockspace, -EBUSY when it
 * was the last reference but lockspace_busy() reported lkbs for this force
 * level, or -EINVAL when ls_create_count was not positive.  Nothing is
 * torn down on a nonzero return.
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_rsb *rsb;
	struct rb_node *n;
	int i, busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	dlm_device_deregister(ls);

	/* the leave uevent is skipped for force 3 and when no user daemon
	   is available */
	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	dlm_callback_stop(ls);

	/* waits until ls_count drops to zero, then unlinks ls from lslist */
	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);

	/*
	 * Free all lkb's in idr
	 */

	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_destroy(&ls->ls_lkbidr);

	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].keep);
			dlm_free_rsb(rsb);
		}

		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].toss);
			dlm_free_rsb(rsb);
		}
	}

	vfree(ls->ls_rsbtbl);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);

	while (!list_empty(&ls->ls_new_rsb)) {
		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
				       res_hashchain);
		list_del(&rsb->res_hashchain);
		dlm_free_rsb(rsb);
	}

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_rinfo(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with */

	/* drops the module reference kept by new_lockspace() */
	module_put(THIS_MODULE);
	return 0;
}
853
/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */
867
dlm_release_lockspace(void * lockspace,int force)868 int dlm_release_lockspace(void *lockspace, int force)
869 {
870 struct dlm_ls *ls;
871 int error;
872
873 ls = dlm_find_lockspace_local(lockspace);
874 if (!ls)
875 return -EINVAL;
876 dlm_put_lockspace(ls);
877
878 mutex_lock(&ls_lock);
879 error = release_lockspace(ls, force);
880 if (!error)
881 ls_count--;
882 if (!ls_count)
883 threads_stop();
884 mutex_unlock(&ls_lock);
885
886 return error;
887 }
888
dlm_stop_lockspaces(void)889 void dlm_stop_lockspaces(void)
890 {
891 struct dlm_ls *ls;
892 int count;
893
894 restart:
895 count = 0;
896 spin_lock(&lslist_lock);
897 list_for_each_entry(ls, &lslist, ls_list) {
898 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
899 count++;
900 continue;
901 }
902 spin_unlock(&lslist_lock);
903 log_error(ls, "no userland control daemon, stopping lockspace");
904 dlm_ls_stop(ls);
905 goto restart;
906 }
907 spin_unlock(&lslist_lock);
908
909 if (count)
910 log_print("dlm user daemon left %d lockspaces", count);
911 }
912
913