// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

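/*
 * Module-wide state: ls_count counts active lockspaces (protected by
 * ls_lock), lslist links every lockspace (protected by lslist_lock),
 * and scand_task is the background scanning thread started alongside
 * the first lockspace.
 */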
static int			ls_count;
static struct mutex		ls_lock;
static struct list_head		lslist;
static spinlock_t		lslist_lock;
static struct task_struct *	scand_task;

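/* sysfs "control" store: dlm_controld writes 0 to stop the lockspace
   and 1 to (re)start it around cluster membership changes. */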
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n;
	int rc = kstrtoint(buf, 0, &n);

	if (rc)
		return rc;
	ls = dlm_find_lockspace_local(ls->ls_local_handle);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}

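/* sysfs "event_done" store: dlm_controld reports the result of a join
   or leave here, which wakes the waiter in do_uevent(). */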
static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

	if (rc)
		return rc;
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtouint(buf, 0, &ls->ls_global_id);

	if (rc)
		return rc;
	return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int val;
	int rc = kstrtoint(buf, 0, &val);

	if (rc)
		return rc;
	if (val == 1)
		set_bit(LSFL_NODIR, &ls->ls_flags);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

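/* Table of sysfs attributes, mapping each file under
   /sys/kernel/dlm/<lockspace>/ to the show/store handlers above. */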
struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_nodir_show,
	.store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
ATTRIBUTE_GROUPS(dlm);

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
	kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_groups = dlm_groups,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

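/* Tell dlm_controld we are joining (in = 1) or leaving (in = 0) the
   lockspace group, then wait for it to report back via "event_done". */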
static int do_uevent(struct dlm_ls *ls, int in)
{
	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	wait_event(ls->ls_uevent_wait,
		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

	return ls->ls_uevent_result;
}

static int dlm_uevent(struct kobject *kobj, struct kobj_uevent_env *env)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
	return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};

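/* Module init: set up the global state above and create the
   /sys/kernel/dlm kset that the lockspace kobjects hang off. */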
int __init dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: cannot create kset\n", __func__);
		return -ENOMEM;
	}
	return 0;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}

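/* The scan thread periodically runs dlm_scan_rsbs() and
   dlm_scan_timeout() on any lockspace whose ci_scan_secs interval has
   elapsed, retrying a second later if the lockspace is busy with
   recovery. */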
static struct dlm_ls *find_ls_to_scan(void)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (time_after_eq(jiffies, ls->ls_scan_time +
					    dlm_config.ci_scan_secs * HZ)) {
			spin_unlock(&lslist_lock);
			return ls;
		}
	}
	spin_unlock(&lslist_lock);
	return NULL;
}

static int dlm_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		ls = find_ls_to_scan();
		if (ls) {
			if (dlm_lock_recovery_try(ls)) {
				ls->ls_scan_time = jiffies;
				dlm_scan_rsbs(ls);
				dlm_scan_timeout(ls);
				dlm_unlock_recovery(ls);
			} else {
				ls->ls_scan_time += HZ;
			}
			continue;
		}
		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
	}
	return 0;
}

static int dlm_scand_start(void)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_scand, NULL, "dlm_scand");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		scand_task = p;
	return error;
}

static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}

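/* The lookups below take a reference on the lockspace; every
   successful dlm_find_lockspace_*() call must be paired with
   dlm_put_lockspace(). */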
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_local_handle == lockspace) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
	if (atomic_dec_and_test(&ls->ls_count))
		wake_up(&ls->ls_count_wait);
}

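/* Wait for all references to drain, then unlink the lockspace.  The
   count is rechecked under lslist_lock because a lookup may race in
   and take a new reference between the wait and the lock. */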
static void remove_lockspace(struct dlm_ls *ls)
{
retry:
	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);

	spin_lock(&lslist_lock);
	if (atomic_read(&ls->ls_count) != 0) {
		spin_unlock(&lslist_lock);
		goto retry;
	}

	WARN_ON(ls->ls_create_count != 0);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
}

static int threads_start(void)
{
	int error;

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_midcomms_start();
	if (error) {
		log_print("cannot start dlm midcomms %d", error);
		goto fail;
	}

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto midcomms_fail;
	}

	return 0;

 midcomms_fail:
	dlm_midcomms_stop();
 fail:
	return error;
}

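/*
 * Create and start a new lockspace: validate the arguments, reuse an
 * existing lockspace of the same name if one exists, otherwise
 * allocate and initialize the dlm_ls, register its kobject, notify
 * dlm_controld via uevent, and wait for the first recovery to finish.
 */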
static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int i, size, error;
	int do_unreg = 0;
	int namelen = strlen(name);

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (lvblen % 8)
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application-provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

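	/* An existing lockspace with the same name is reused (its create
	   count is bumped) unless DLM_LSFL_NEWEXCL demands an exclusive
	   creation. */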
	error = 0;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	atomic_set(&ls->ls_count, 0);
	init_waitqueue_head(&ls->ls_count_wait);
	ls->ls_flags = 0;
	ls->ls_scan_time = jiffies;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

#ifdef CONFIG_DLM_DEPRECATED_API
	if (flags & DLM_LSFL_TIMEWARN) {
		pr_warn_once("===============================================================\n"
			     "WARNING: the dlm DLM_LSFL_TIMEWARN flag is being deprecated and\n"
			     "         will be removed in v6.2!\n"
			     "         This includes the DLM_LSFL_TIMEWARN define in the UAPI header!\n"
			     "===============================================================\n");

		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
	}

	/* ls_exflags are forced to match among nodes, and we don't
	 * need to require all nodes to have some flags set
	 */
	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
				    DLM_LSFL_NEWEXCL));
#else
	/* ls_exflags are forced to match among nodes, and we don't
	 * need to require all nodes to have some flags set
	 */
	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
#endif

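	/* The resource table is a vmalloc'ed array of hash buckets, each
	   holding a "keep" and a "toss" rbtree of rsbs plus a lock. */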
	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		ls->ls_rsbtbl[i].keep.rb_node = NULL;
		ls->ls_rsbtbl[i].toss.rb_node = NULL;
		spin_lock_init(&ls->ls_rsbtbl[i].lock);
	}

	spin_lock_init(&ls->ls_remove_spin);
	init_waitqueue_head(&ls->ls_remove_wait);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
						 GFP_KERNEL);
		if (!ls->ls_remove_names[i])
			goto out_rsbtbl;
	}

	idr_init(&ls->ls_lkbidr);
	spin_lock_init(&ls->ls_lkbidr_spin);

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
#ifdef CONFIG_DLM_DEPRECATED_API
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);
#endif

	INIT_LIST_HEAD(&ls->ls_new_rsb);
	spin_lock_init(&ls->ls_new_rsb_spin);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_recovery_done);
	ls->ls_recovery_result = -1;

	mutex_init(&ls->ls_cb_mutex);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	init_rwsem(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	atomic_set(&ls->ls_requestqueue_cnt, 0);
	init_waitqueue_head(&ls->ls_requestqueue_wait);
	mutex_init(&ls->ls_requestqueue_mutex);
	spin_lock_init(&ls->ls_clear_proc_locks);

	/* For backwards compatibility with 3.1 we need to use the maximum
	 * possible dlm message size to be sure the message will fit and we
	 * will not have out of bounds issues.  However, the 3.2 sending
	 * side might send less.
	 */
	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
	if (!ls->ls_recover_buf)
		goto out_lkbidr;

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	idr_init(&ls->ls_recover_idr);
	spin_lock_init(&ls->ls_recover_idr_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	spin_lock(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	if (flags & DLM_LSFL_FS) {
		error = dlm_callback_start(ls);
		if (error) {
			log_error(ls, "can't start dlm_callback %d", error);
			goto out_delist;
		}
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	/* wait until recovery is successful or failed */
	wait_for_completion(&ls->ls_recovery_done);
	error = ls->ls_recovery_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

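	/* Error unwind: tear everything down in reverse order of setup. */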
 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);
 out_lkbidr:
	idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);
	vfree(ls->ls_rsbtbl);
 out_lsfree:
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}

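/* Serialize lockspace creation under ls_lock: the shared threads are
   started with the first lockspace and stopped again if creating that
   first lockspace fails. */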
static int __dlm_new_lockspace(const char *name, const char *cluster,
			       uint32_t flags, int lvblen,
			       const struct dlm_lockspace_ops *ops,
			       void *ops_arg, int *ops_result,
			       dlm_lockspace_t **lockspace)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);
	if (!error)
		ls_count++;
	if (error > 0)
		error = 0;
	if (!ls_count) {
		dlm_scand_stop();
		dlm_midcomms_shutdown();
		dlm_midcomms_stop();
	}
 out:
	mutex_unlock(&ls_lock);
	return error;
}

int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
		      int lvblen, const struct dlm_lockspace_ops *ops,
		      void *ops_arg, int *ops_result,
		      dlm_lockspace_t **lockspace)
{
	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
				   ops, ops_arg, ops_result, lockspace);
}

int dlm_new_user_lockspace(const char *name, const char *cluster,
			   uint32_t flags, int lvblen,
			   const struct dlm_lockspace_ops *ops,
			   void *ops_arg, int *ops_result,
			   dlm_lockspace_t **lockspace)
{
	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
				   ops_arg, ops_result, lockspace);
}

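/* idr_for_each() callbacks used by lockspace_busy() below to decide
   whether any (or any local) lkbs remain, and to free them all on
   final release. */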
static int lkb_idr_is_local(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
		dlm_free_lvb(lkb->lkb_lvbptr);

	dlm_free_lkb(lkb);
	return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
	int rv;

	spin_lock(&ls->ls_lkbidr_spin);
	if (force == 0) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
	} else if (force == 1) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
	} else {
		rv = 0;
	}
	spin_unlock(&ls->ls_lkbidr_spin);
	return rv;
}

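/* Drop one create reference; when the last one goes and the lockspace
   is not busy, stop its threads, leave the group, and free all lkbs,
   rsbs and other per-lockspace state. */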
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_rsb *rsb;
	struct rb_node *n;
	int i, busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	dlm_device_deregister(ls);

	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	if (ls_count == 1) {
		dlm_scand_stop();
		dlm_clear_members(ls);
		dlm_midcomms_shutdown();
	}

	dlm_callback_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);

	/*
	 * Free all lkbs in the idr
	 */

	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_destroy(&ls->ls_lkbidr);

	/*
	 * Free all rsbs on the rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].keep);
			dlm_free_rsb(rsb);
		}

		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].toss);
			dlm_free_rsb(rsb);
		}
	}

	vfree(ls->ls_rsbtbl);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);

	while (!list_empty(&ls->ls_new_rsb)) {
		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
				       res_hashchain);
		list_del(&rsb->res_hashchain);
		dlm_free_rsb(rsb);
	}

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_rinfo(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is released */

	module_put(THIS_MODULE);
	return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);

	mutex_lock(&ls_lock);
	error = release_lockspace(ls, force);
	if (!error)
		ls_count--;
	if (!ls_count)
		dlm_midcomms_stop();
	mutex_unlock(&ls_lock);

	return error;
}

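/*
 * A minimal caller-side sketch (illustrative only; the lockspace and
 * cluster names here are hypothetical, and error handling is trimmed):
 *
 *	dlm_lockspace_t *ls;
 *	int error;
 *
 *	error = dlm_new_lockspace("example", "mycluster", 0, 64,
 *				  NULL, NULL, NULL, &ls);
 *	if (!error) {
 *		... take and release locks with dlm_lock()/dlm_unlock() ...
 *		dlm_release_lockspace(ls, 0);
 *	}
 *
 * lvblen (64 above) must be a multiple of 8, and force 0 refuses to
 * destroy a lockspace that still holds any LKBs.
 */

/* Called when the userland control daemon goes away: every running
   lockspace is stopped, since it cannot recover without the daemon. */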
void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;
	int count;

 restart:
	count = 0;
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
			count++;
			continue;
		}
		spin_unlock(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock(&lslist_lock);

	if (count)
		log_print("dlm user daemon left %d lockspaces", count);
}
961