// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

static int			ls_count;
static struct mutex		ls_lock;
static struct list_head		lslist;
static spinlock_t		lslist_lock;
static struct task_struct *	scand_task;

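/*
 * Per-lockspace sysfs attribute handlers.  dlm_controld writes 0 or 1 to
 * "control" to stop or start a lockspace, writes the result of a join or
 * leave to "event_done", and the global lockspace id is set via "id".
 */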
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n;
	int rc = kstrtoint(buf, 0, &n);

	if (rc)
		return rc;
	ls = dlm_find_lockspace_local(ls->ls_local_handle);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

	if (rc)
		return rc;
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtouint(buf, 0, &ls->ls_global_id);

	if (rc)
		return rc;
	return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int val;
	int rc = kstrtoint(buf, 0, &val);

	if (rc)
		return rc;
	if (val == 1)
		set_bit(LSFL_NODIR, &ls->ls_flags);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_nodir_show,
	.store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
ATTRIBUTE_GROUPS(dlm);

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
	kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_groups = dlm_groups,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

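/*
 * Notify userspace (dlm_controld) that a lockspace is joining or leaving
 * the group, then sleep until the daemon writes the result to the
 * "event_done" sysfs file (dlm_event_store above).
 */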
static int do_uevent(struct dlm_ls *ls, int in)
{
	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	wait_event(ls->ls_uevent_wait,
		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

	return ls->ls_uevent_result;
}

static int dlm_uevent(struct kset *kset, struct kobject *kobj,
		      struct kobj_uevent_env *env)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
	return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: can not create kset\n", __func__);
		return -ENOMEM;
	}
	return 0;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}

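/*
 * The dlm_scand kthread periodically visits each lockspace whose last scan
 * was at least ci_scan_secs ago, scanning its rsbs, lock timeouts and
 * waiters.  If recovery is in progress, the scan is retried a second
 * (HZ jiffies) later.
 */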
static struct dlm_ls *find_ls_to_scan(void)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (time_after_eq(jiffies, ls->ls_scan_time +
					    dlm_config.ci_scan_secs * HZ)) {
			spin_unlock(&lslist_lock);
			return ls;
		}
	}
	spin_unlock(&lslist_lock);
	return NULL;
}

static int dlm_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		ls = find_ls_to_scan();
		if (ls) {
			if (dlm_lock_recovery_try(ls)) {
				ls->ls_scan_time = jiffies;
				dlm_scan_rsbs(ls);
				dlm_scan_timeout(ls);
				dlm_scan_waiters(ls);
				dlm_unlock_recovery(ls);
			} else {
				ls->ls_scan_time += HZ;
			}
			continue;
		}
		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
	}
	return 0;
}

static int dlm_scand_start(void)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_scand, NULL, "dlm_scand");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		scand_task = p;
	return error;
}

static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}

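/*
 * The dlm_find_lockspace_* lookups below take a reference on the lockspace
 * (ls_count, protected by lslist_lock); callers must drop it again with
 * dlm_put_lockspace().
 */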
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_local_handle == lockspace) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}

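/* Wait (polling once per second) until the last reference is dropped,
   then unlink the lockspace from lslist. */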
static void remove_lockspace(struct dlm_ls *ls)
{
	for (;;) {
		spin_lock(&lslist_lock);
		if (ls->ls_count == 0) {
			WARN_ON(ls->ls_create_count != 0);
			list_del(&ls->ls_list);
			spin_unlock(&lslist_lock);
			return;
		}
		spin_unlock(&lslist_lock);
		ssleep(1);
	}
}

static int threads_start(void)
{
	int error;

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_midcomms_start();
	if (error) {
		log_print("cannot start dlm midcomms %d", error);
		goto fail;
	}

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto midcomms_fail;
	}

	return 0;

 midcomms_fail:
	dlm_midcomms_stop();
 fail:
	return error;
}

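/*
 * Create and initialize a new lockspace, or take another create reference
 * on an existing one with the same name.  On success the lockspace has
 * been added to lslist, registered in sysfs, announced to dlm_controld
 * via a uevent, and has completed its first recovery.  Returns 0 for a
 * new lockspace, 1 (to the caller, dlm_new_lockspace) if an existing one
 * was reused, or a negative errno.
 */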
static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int i, size, error;
	int do_unreg = 0;
	int namelen = strlen(name);

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	ls->ls_count = 0;
	ls->ls_flags = 0;
	ls->ls_scan_time = jiffies;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	if (flags & DLM_LSFL_TIMEWARN)
		set_bit(LSFL_TIMEWARN, &ls->ls_flags);

	/* ls_exflags are forced to match among nodes, and we don't
	   need to require all nodes to have some flags set */
	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
				    DLM_LSFL_NEWEXCL));

	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		ls->ls_rsbtbl[i].keep.rb_node = NULL;
		ls->ls_rsbtbl[i].toss.rb_node = NULL;
		spin_lock_init(&ls->ls_rsbtbl[i].lock);
	}

	spin_lock_init(&ls->ls_remove_spin);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
						 GFP_KERNEL);
		if (!ls->ls_remove_names[i])
			goto out_rsbtbl;
	}

	idr_init(&ls->ls_lkbidr);
	spin_lock_init(&ls->ls_lkbidr_spin);

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);

	INIT_LIST_HEAD(&ls->ls_new_rsb);
	spin_lock_init(&ls->ls_new_rsb_spin);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_members_done);
	ls->ls_members_result = -1;

	mutex_init(&ls->ls_cb_mutex);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	init_rwsem(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);
	/* Due to backwards compatibility with 3.1 we need to use the maximum
	 * possible dlm message size to be sure the message will fit and
	 * to avoid out of bounds issues.  However a 3.2 sending side
	 * might send less.
	 */
	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
	if (!ls->ls_recover_buf)
		goto out_lkbidr;

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	idr_init(&ls->ls_recover_idr);
	spin_lock_init(&ls->ls_recover_idr_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	spin_lock(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	if (flags & DLM_LSFL_FS) {
		error = dlm_callback_start(ls);
		if (error) {
			log_error(ls, "can't start dlm_callback %d", error);
			goto out_delist;
		}
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	wait_for_completion(&ls->ls_members_done);
	error = ls->ls_members_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);
 out_lkbidr:
	idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);
	vfree(ls->ls_rsbtbl);
 out_lsfree:
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}

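/*
 * Exported entry point for creating (or reusing) a lockspace.  ls_lock
 * serializes lockspace creation and removal; the midcomms and dlm_scand
 * threads shared by all lockspaces are started when the first lockspace
 * is created, and torn down again if that creation fails.
 */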
int dlm_new_lockspace(const char *name, const char *cluster,
		      uint32_t flags, int lvblen,
		      const struct dlm_lockspace_ops *ops, void *ops_arg,
		      int *ops_result, dlm_lockspace_t **lockspace)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);
	if (!error)
		ls_count++;
	if (error > 0)
		error = 0;
	if (!ls_count) {
		dlm_scand_stop();
		dlm_midcomms_shutdown();
		dlm_midcomms_stop();
	}
 out:
	mutex_unlock(&ls_lock);
	return error;
}

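/* idr iteration callbacks used by lockspace_busy() and release_lockspace()
   below to check for remaining lkbs and to free them */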
static int lkb_idr_is_local(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
		dlm_free_lvb(lkb->lkb_lvbptr);

	dlm_free_lkb(lkb);
	return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
	int rv;

	spin_lock(&ls->ls_lkbidr_spin);
	if (force == 0) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
	} else if (force == 1) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
	} else {
		rv = 0;
	}
	spin_unlock(&ls->ls_lkbidr_spin);
	return rv;
}

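/*
 * Drop a create reference on the lockspace; when the last one goes away,
 * notify userspace, stop the per-lockspace threads and free everything
 * the lockspace owns.  See dlm_release_lockspace() below for the meaning
 * of the force levels.
 */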
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_rsb *rsb;
	struct rb_node *n;
	int i, busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	dlm_device_deregister(ls);

	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	if (ls_count == 1) {
		dlm_scand_stop();
		dlm_clear_members(ls);
		dlm_midcomms_shutdown();
	}

	dlm_callback_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);

	/*
	 * Free all lkbs in the idr
	 */

	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_destroy(&ls->ls_lkbidr);

	/*
	 * Free all rsbs on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].keep);
			dlm_free_rsb(rsb);
		}

		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].toss);
			dlm_free_rsb(rsb);
		}
	}

	vfree(ls->ls_rsbtbl);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);

	while (!list_empty(&ls->ls_new_rsb)) {
		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
				       res_hashchain);
		list_del(&rsb->res_hashchain);
		dlm_free_rsb(rsb);
	}

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_rinfo(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with it */

	module_put(THIS_MODULE);
	return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);

	mutex_lock(&ls_lock);
	error = release_lockspace(ls, force);
	if (!error)
		ls_count--;
	if (!ls_count)
		dlm_midcomms_stop();
	mutex_unlock(&ls_lock);

	return error;
}

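/*
 * Called when no userland control daemon is available: any lockspace that
 * is still running is stopped, and the lockspaces that are already stopped
 * are counted and logged.
 */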
void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;
	int count;

 restart:
	count = 0;
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
			count++;
			continue;
		}
		spin_unlock(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock(&lslist_lock);

	if (count)
		log_print("dlm user daemon left %d lockspaces", count);
}