• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12 
13 #include "dlm_internal.h"
14 #include "lockspace.h"
15 #include "member.h"
16 #include "recoverd.h"
17 #include "recover.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "lowcomms.h"
21 
dlm_slots_version(struct dlm_header * h)22 int dlm_slots_version(struct dlm_header *h)
23 {
24 	if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
25 		return 0;
26 	return 1;
27 }
28 
dlm_slot_save(struct dlm_ls * ls,struct dlm_rcom * rc,struct dlm_member * memb)29 void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
30 		   struct dlm_member *memb)
31 {
32 	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
33 
34 	if (!dlm_slots_version(&rc->rc_header))
35 		return;
36 
37 	memb->slot = le16_to_cpu(rf->rf_our_slot);
38 	memb->generation = le32_to_cpu(rf->rf_generation);
39 }
40 
dlm_slots_copy_out(struct dlm_ls * ls,struct dlm_rcom * rc)41 void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
42 {
43 	struct dlm_slot *slot;
44 	struct rcom_slot *ro;
45 	int i;
46 
47 	ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
48 
49 	/* ls_slots array is sparse, but not rcom_slots */
50 
51 	for (i = 0; i < ls->ls_slots_size; i++) {
52 		slot = &ls->ls_slots[i];
53 		if (!slot->nodeid)
54 			continue;
55 		ro->ro_nodeid = cpu_to_le32(slot->nodeid);
56 		ro->ro_slot = cpu_to_le16(slot->slot);
57 		ro++;
58 	}
59 }
60 
61 #define SLOT_DEBUG_LINE 128
62 
log_slots(struct dlm_ls * ls,uint32_t gen,int num_slots,struct rcom_slot * ro0,struct dlm_slot * array,int array_size)63 static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
64 		      struct rcom_slot *ro0, struct dlm_slot *array,
65 		      int array_size)
66 {
67 	char line[SLOT_DEBUG_LINE];
68 	int len = SLOT_DEBUG_LINE - 1;
69 	int pos = 0;
70 	int ret, i;
71 
72 	memset(line, 0, sizeof(line));
73 
74 	if (array) {
75 		for (i = 0; i < array_size; i++) {
76 			if (!array[i].nodeid)
77 				continue;
78 
79 			ret = snprintf(line + pos, len - pos, " %d:%d",
80 				       array[i].slot, array[i].nodeid);
81 			if (ret >= len - pos)
82 				break;
83 			pos += ret;
84 		}
85 	} else if (ro0) {
86 		for (i = 0; i < num_slots; i++) {
87 			ret = snprintf(line + pos, len - pos, " %d:%d",
88 				       ro0[i].ro_slot, ro0[i].ro_nodeid);
89 			if (ret >= len - pos)
90 				break;
91 			pos += ret;
92 		}
93 	}
94 
95 	log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
96 }
97 
dlm_slots_copy_in(struct dlm_ls * ls)98 int dlm_slots_copy_in(struct dlm_ls *ls)
99 {
100 	struct dlm_member *memb;
101 	struct dlm_rcom *rc = ls->ls_recover_buf;
102 	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
103 	struct rcom_slot *ro0, *ro;
104 	int our_nodeid = dlm_our_nodeid();
105 	int i, num_slots;
106 	uint32_t gen;
107 
108 	if (!dlm_slots_version(&rc->rc_header))
109 		return -1;
110 
111 	gen = le32_to_cpu(rf->rf_generation);
112 	if (gen <= ls->ls_generation) {
113 		log_error(ls, "dlm_slots_copy_in gen %u old %u",
114 			  gen, ls->ls_generation);
115 	}
116 	ls->ls_generation = gen;
117 
118 	num_slots = le16_to_cpu(rf->rf_num_slots);
119 	if (!num_slots)
120 		return -1;
121 
122 	ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
123 
124 	for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
125 		ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
126 		ro->ro_slot = le16_to_cpu(ro->ro_slot);
127 	}
128 
129 	log_slots(ls, gen, num_slots, ro0, NULL, 0);
130 
131 	list_for_each_entry(memb, &ls->ls_nodes, list) {
132 		for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
133 			if (ro->ro_nodeid != memb->nodeid)
134 				continue;
135 			memb->slot = ro->ro_slot;
136 			memb->slot_prev = memb->slot;
137 			break;
138 		}
139 
140 		if (memb->nodeid == our_nodeid) {
141 			if (ls->ls_slot && ls->ls_slot != memb->slot) {
142 				log_error(ls, "dlm_slots_copy_in our slot "
143 					  "changed %d %d", ls->ls_slot,
144 					  memb->slot);
145 				return -1;
146 			}
147 
148 			if (!ls->ls_slot)
149 				ls->ls_slot = memb->slot;
150 		}
151 
152 		if (!memb->slot) {
153 			log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
154 				   memb->nodeid);
155 			return -1;
156 		}
157 	}
158 
159 	return 0;
160 }
161 
162 /* for any nodes that do not support slots, we will not have set memb->slot
163    in wait_status_all(), so memb->slot will remain -1, and we will not
164    assign slots or set ls_num_slots here */
165 
dlm_slots_assign(struct dlm_ls * ls,int * num_slots,int * slots_size,struct dlm_slot ** slots_out,uint32_t * gen_out)166 int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
167 		     struct dlm_slot **slots_out, uint32_t *gen_out)
168 {
169 	struct dlm_member *memb;
170 	struct dlm_slot *array;
171 	int our_nodeid = dlm_our_nodeid();
172 	int array_size, max_slots, i;
173 	int need = 0;
174 	int max = 0;
175 	int num = 0;
176 	uint32_t gen = 0;
177 
178 	/* our own memb struct will have slot -1 gen 0 */
179 
180 	list_for_each_entry(memb, &ls->ls_nodes, list) {
181 		if (memb->nodeid == our_nodeid) {
182 			memb->slot = ls->ls_slot;
183 			memb->generation = ls->ls_generation;
184 			break;
185 		}
186 	}
187 
188 	list_for_each_entry(memb, &ls->ls_nodes, list) {
189 		if (memb->generation > gen)
190 			gen = memb->generation;
191 
192 		/* node doesn't support slots */
193 
194 		if (memb->slot == -1)
195 			return -1;
196 
197 		/* node needs a slot assigned */
198 
199 		if (!memb->slot)
200 			need++;
201 
202 		/* node has a slot assigned */
203 
204 		num++;
205 
206 		if (!max || max < memb->slot)
207 			max = memb->slot;
208 
209 		/* sanity check, once slot is assigned it shouldn't change */
210 
211 		if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
212 			log_error(ls, "nodeid %d slot changed %d %d",
213 				  memb->nodeid, memb->slot_prev, memb->slot);
214 			return -1;
215 		}
216 		memb->slot_prev = memb->slot;
217 	}
218 
219 	array_size = max + need;
220 
221 	array = kzalloc(array_size * sizeof(struct dlm_slot), GFP_NOFS);
222 	if (!array)
223 		return -ENOMEM;
224 
225 	num = 0;
226 
227 	/* fill in slots (offsets) that are used */
228 
229 	list_for_each_entry(memb, &ls->ls_nodes, list) {
230 		if (!memb->slot)
231 			continue;
232 
233 		if (memb->slot > array_size) {
234 			log_error(ls, "invalid slot number %d", memb->slot);
235 			kfree(array);
236 			return -1;
237 		}
238 
239 		array[memb->slot - 1].nodeid = memb->nodeid;
240 		array[memb->slot - 1].slot = memb->slot;
241 		num++;
242 	}
243 
244 	/* assign new slots from unused offsets */
245 
246 	list_for_each_entry(memb, &ls->ls_nodes, list) {
247 		if (memb->slot)
248 			continue;
249 
250 		for (i = 0; i < array_size; i++) {
251 			if (array[i].nodeid)
252 				continue;
253 
254 			memb->slot = i + 1;
255 			memb->slot_prev = memb->slot;
256 			array[i].nodeid = memb->nodeid;
257 			array[i].slot = memb->slot;
258 			num++;
259 
260 			if (!ls->ls_slot && memb->nodeid == our_nodeid)
261 				ls->ls_slot = memb->slot;
262 			break;
263 		}
264 
265 		if (!memb->slot) {
266 			log_error(ls, "no free slot found");
267 			kfree(array);
268 			return -1;
269 		}
270 	}
271 
272 	gen++;
273 
274 	log_slots(ls, gen, num, NULL, array, array_size);
275 
276 	max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) -
277 		     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
278 
279 	if (num > max_slots) {
280 		log_error(ls, "num_slots %d exceeds max_slots %d",
281 			  num, max_slots);
282 		kfree(array);
283 		return -1;
284 	}
285 
286 	*gen_out = gen;
287 	*slots_out = array;
288 	*slots_size = array_size;
289 	*num_slots = num;
290 	return 0;
291 }
292 
add_ordered_member(struct dlm_ls * ls,struct dlm_member * new)293 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
294 {
295 	struct dlm_member *memb = NULL;
296 	struct list_head *tmp;
297 	struct list_head *newlist = &new->list;
298 	struct list_head *head = &ls->ls_nodes;
299 
300 	list_for_each(tmp, head) {
301 		memb = list_entry(tmp, struct dlm_member, list);
302 		if (new->nodeid < memb->nodeid)
303 			break;
304 	}
305 
306 	if (!memb)
307 		list_add_tail(newlist, head);
308 	else {
309 		/* FIXME: can use list macro here */
310 		newlist->prev = tmp->prev;
311 		newlist->next = tmp;
312 		tmp->prev->next = newlist;
313 		tmp->prev = newlist;
314 	}
315 }
316 
dlm_add_member(struct dlm_ls * ls,struct dlm_config_node * node)317 static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
318 {
319 	struct dlm_member *memb;
320 	int error;
321 
322 	memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
323 	if (!memb)
324 		return -ENOMEM;
325 
326 	error = dlm_lowcomms_connect_node(node->nodeid);
327 	if (error < 0) {
328 		kfree(memb);
329 		return error;
330 	}
331 
332 	memb->nodeid = node->nodeid;
333 	memb->weight = node->weight;
334 	memb->comm_seq = node->comm_seq;
335 	add_ordered_member(ls, memb);
336 	ls->ls_num_nodes++;
337 	return 0;
338 }
339 
find_memb(struct list_head * head,int nodeid)340 static struct dlm_member *find_memb(struct list_head *head, int nodeid)
341 {
342 	struct dlm_member *memb;
343 
344 	list_for_each_entry(memb, head, list) {
345 		if (memb->nodeid == nodeid)
346 			return memb;
347 	}
348 	return NULL;
349 }
350 
dlm_is_member(struct dlm_ls * ls,int nodeid)351 int dlm_is_member(struct dlm_ls *ls, int nodeid)
352 {
353 	if (find_memb(&ls->ls_nodes, nodeid))
354 		return 1;
355 	return 0;
356 }
357 
dlm_is_removed(struct dlm_ls * ls,int nodeid)358 int dlm_is_removed(struct dlm_ls *ls, int nodeid)
359 {
360 	if (find_memb(&ls->ls_nodes_gone, nodeid))
361 		return 1;
362 	return 0;
363 }
364 
clear_memb_list(struct list_head * head)365 static void clear_memb_list(struct list_head *head)
366 {
367 	struct dlm_member *memb;
368 
369 	while (!list_empty(head)) {
370 		memb = list_entry(head->next, struct dlm_member, list);
371 		list_del(&memb->list);
372 		kfree(memb);
373 	}
374 }
375 
dlm_clear_members(struct dlm_ls * ls)376 void dlm_clear_members(struct dlm_ls *ls)
377 {
378 	clear_memb_list(&ls->ls_nodes);
379 	ls->ls_num_nodes = 0;
380 }
381 
dlm_clear_members_gone(struct dlm_ls * ls)382 void dlm_clear_members_gone(struct dlm_ls *ls)
383 {
384 	clear_memb_list(&ls->ls_nodes_gone);
385 }
386 
make_member_array(struct dlm_ls * ls)387 static void make_member_array(struct dlm_ls *ls)
388 {
389 	struct dlm_member *memb;
390 	int i, w, x = 0, total = 0, all_zero = 0, *array;
391 
392 	kfree(ls->ls_node_array);
393 	ls->ls_node_array = NULL;
394 
395 	list_for_each_entry(memb, &ls->ls_nodes, list) {
396 		if (memb->weight)
397 			total += memb->weight;
398 	}
399 
400 	/* all nodes revert to weight of 1 if all have weight 0 */
401 
402 	if (!total) {
403 		total = ls->ls_num_nodes;
404 		all_zero = 1;
405 	}
406 
407 	ls->ls_total_weight = total;
408 
409 	array = kmalloc(sizeof(int) * total, GFP_NOFS);
410 	if (!array)
411 		return;
412 
413 	list_for_each_entry(memb, &ls->ls_nodes, list) {
414 		if (!all_zero && !memb->weight)
415 			continue;
416 
417 		if (all_zero)
418 			w = 1;
419 		else
420 			w = memb->weight;
421 
422 		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
423 
424 		for (i = 0; i < w; i++)
425 			array[x++] = memb->nodeid;
426 	}
427 
428 	ls->ls_node_array = array;
429 }
430 
431 /* send a status request to all members just to establish comms connections */
432 
ping_members(struct dlm_ls * ls)433 static int ping_members(struct dlm_ls *ls)
434 {
435 	struct dlm_member *memb;
436 	int error = 0;
437 
438 	list_for_each_entry(memb, &ls->ls_nodes, list) {
439 		error = dlm_recovery_stopped(ls);
440 		if (error)
441 			break;
442 		error = dlm_rcom_status(ls, memb->nodeid, 0);
443 		if (error)
444 			break;
445 	}
446 	if (error)
447 		log_rinfo(ls, "ping_members aborted %d last nodeid %d",
448 			  error, ls->ls_recover_nodeid);
449 	return error;
450 }
451 
dlm_lsop_recover_prep(struct dlm_ls * ls)452 static void dlm_lsop_recover_prep(struct dlm_ls *ls)
453 {
454 	if (!ls->ls_ops || !ls->ls_ops->recover_prep)
455 		return;
456 	ls->ls_ops->recover_prep(ls->ls_ops_arg);
457 }
458 
dlm_lsop_recover_slot(struct dlm_ls * ls,struct dlm_member * memb)459 static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
460 {
461 	struct dlm_slot slot;
462 	uint32_t seq;
463 	int error;
464 
465 	if (!ls->ls_ops || !ls->ls_ops->recover_slot)
466 		return;
467 
468 	/* if there is no comms connection with this node
469 	   or the present comms connection is newer
470 	   than the one when this member was added, then
471 	   we consider the node to have failed (versus
472 	   being removed due to dlm_release_lockspace) */
473 
474 	error = dlm_comm_seq(memb->nodeid, &seq);
475 
476 	if (!error && seq == memb->comm_seq)
477 		return;
478 
479 	slot.nodeid = memb->nodeid;
480 	slot.slot = memb->slot;
481 
482 	ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
483 }
484 
dlm_lsop_recover_done(struct dlm_ls * ls)485 void dlm_lsop_recover_done(struct dlm_ls *ls)
486 {
487 	struct dlm_member *memb;
488 	struct dlm_slot *slots;
489 	int i, num;
490 
491 	if (!ls->ls_ops || !ls->ls_ops->recover_done)
492 		return;
493 
494 	num = ls->ls_num_nodes;
495 
496 	slots = kzalloc(num * sizeof(struct dlm_slot), GFP_KERNEL);
497 	if (!slots)
498 		return;
499 
500 	i = 0;
501 	list_for_each_entry(memb, &ls->ls_nodes, list) {
502 		if (i == num) {
503 			log_error(ls, "dlm_lsop_recover_done bad num %d", num);
504 			goto out;
505 		}
506 		slots[i].nodeid = memb->nodeid;
507 		slots[i].slot = memb->slot;
508 		i++;
509 	}
510 
511 	ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
512 				 ls->ls_slot, ls->ls_generation);
513  out:
514 	kfree(slots);
515 }
516 
find_config_node(struct dlm_recover * rv,int nodeid)517 static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
518 						int nodeid)
519 {
520 	int i;
521 
522 	for (i = 0; i < rv->nodes_count; i++) {
523 		if (rv->nodes[i].nodeid == nodeid)
524 			return &rv->nodes[i];
525 	}
526 	return NULL;
527 }
528 
dlm_recover_members(struct dlm_ls * ls,struct dlm_recover * rv,int * neg_out)529 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
530 {
531 	struct dlm_member *memb, *safe;
532 	struct dlm_config_node *node;
533 	int i, error, neg = 0, low = -1;
534 
535 	/* previously removed members that we've not finished removing need to
536 	   count as a negative change so the "neg" recovery steps will happen */
537 
538 	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
539 		log_rinfo(ls, "prev removed member %d", memb->nodeid);
540 		neg++;
541 	}
542 
543 	/* move departed members from ls_nodes to ls_nodes_gone */
544 
545 	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
546 		node = find_config_node(rv, memb->nodeid);
547 		if (node && !node->new)
548 			continue;
549 
550 		if (!node) {
551 			log_rinfo(ls, "remove member %d", memb->nodeid);
552 		} else {
553 			/* removed and re-added */
554 			log_rinfo(ls, "remove member %d comm_seq %u %u",
555 				  memb->nodeid, memb->comm_seq, node->comm_seq);
556 		}
557 
558 		neg++;
559 		list_move(&memb->list, &ls->ls_nodes_gone);
560 		ls->ls_num_nodes--;
561 		dlm_lsop_recover_slot(ls, memb);
562 	}
563 
564 	/* add new members to ls_nodes */
565 
566 	for (i = 0; i < rv->nodes_count; i++) {
567 		node = &rv->nodes[i];
568 		if (dlm_is_member(ls, node->nodeid))
569 			continue;
570 		dlm_add_member(ls, node);
571 		log_rinfo(ls, "add member %d", node->nodeid);
572 	}
573 
574 	list_for_each_entry(memb, &ls->ls_nodes, list) {
575 		if (low == -1 || memb->nodeid < low)
576 			low = memb->nodeid;
577 	}
578 	ls->ls_low_nodeid = low;
579 
580 	make_member_array(ls);
581 	*neg_out = neg;
582 
583 	error = ping_members(ls);
584 	if (!error || error == -EPROTO) {
585 		/* new_lockspace() may be waiting to know if the config
586 		   is good or bad */
587 		ls->ls_members_result = error;
588 		complete(&ls->ls_members_done);
589 	}
590 
591 	log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
592 	return error;
593 }
594 
595 /* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
596    dlm_ls_start() is called on any of them to start the new recovery. */
597 
dlm_ls_stop(struct dlm_ls * ls)598 int dlm_ls_stop(struct dlm_ls *ls)
599 {
600 	int new;
601 
602 	/*
603 	 * Prevent dlm_recv from being in the middle of something when we do
604 	 * the stop.  This includes ensuring dlm_recv isn't processing a
605 	 * recovery message (rcom), while dlm_recoverd is aborting and
606 	 * resetting things from an in-progress recovery.  i.e. we want
607 	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
608 	 * processing an rcom at the same time.  Stopping dlm_recv also makes
609 	 * it easy for dlm_receive_message() to check locking stopped and add a
610 	 * message to the requestqueue without races.
611 	 */
612 
613 	down_write(&ls->ls_recv_active);
614 
615 	/*
616 	 * Abort any recovery that's in progress (see RECOVER_STOP,
617 	 * dlm_recovery_stopped()) and tell any other threads running in the
618 	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
619 	 */
620 
621 	spin_lock(&ls->ls_recover_lock);
622 	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
623 	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
624 	ls->ls_recover_seq++;
625 	spin_unlock(&ls->ls_recover_lock);
626 
627 	/*
628 	 * Let dlm_recv run again, now any normal messages will be saved on the
629 	 * requestqueue for later.
630 	 */
631 
632 	up_write(&ls->ls_recv_active);
633 
634 	/*
635 	 * This in_recovery lock does two things:
636 	 * 1) Keeps this function from returning until all threads are out
637 	 *    of locking routines and locking is truly stopped.
638 	 * 2) Keeps any new requests from being processed until it's unlocked
639 	 *    when recovery is complete.
640 	 */
641 
642 	if (new) {
643 		set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
644 		wake_up_process(ls->ls_recoverd_task);
645 		wait_event(ls->ls_recover_lock_wait,
646 			   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
647 	}
648 
649 	/*
650 	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
651 	 * running) has noticed RECOVER_STOP above and quit processing the
652 	 * previous recovery.
653 	 */
654 
655 	dlm_recoverd_suspend(ls);
656 
657 	spin_lock(&ls->ls_recover_lock);
658 	kfree(ls->ls_slots);
659 	ls->ls_slots = NULL;
660 	ls->ls_num_slots = 0;
661 	ls->ls_slots_size = 0;
662 	ls->ls_recover_status = 0;
663 	spin_unlock(&ls->ls_recover_lock);
664 
665 	dlm_recoverd_resume(ls);
666 
667 	if (!ls->ls_recover_begin)
668 		ls->ls_recover_begin = jiffies;
669 
670 	dlm_lsop_recover_prep(ls);
671 	return 0;
672 }
673 
dlm_ls_start(struct dlm_ls * ls)674 int dlm_ls_start(struct dlm_ls *ls)
675 {
676 	struct dlm_recover *rv = NULL, *rv_old;
677 	struct dlm_config_node *nodes;
678 	int error, count;
679 
680 	rv = kzalloc(sizeof(struct dlm_recover), GFP_NOFS);
681 	if (!rv)
682 		return -ENOMEM;
683 
684 	error = dlm_config_nodes(ls->ls_name, &nodes, &count);
685 	if (error < 0)
686 		goto fail;
687 
688 	spin_lock(&ls->ls_recover_lock);
689 
690 	/* the lockspace needs to be stopped before it can be started */
691 
692 	if (!dlm_locking_stopped(ls)) {
693 		spin_unlock(&ls->ls_recover_lock);
694 		log_error(ls, "start ignored: lockspace running");
695 		error = -EINVAL;
696 		goto fail;
697 	}
698 
699 	rv->nodes = nodes;
700 	rv->nodes_count = count;
701 	rv->seq = ++ls->ls_recover_seq;
702 	rv_old = ls->ls_recover_args;
703 	ls->ls_recover_args = rv;
704 	spin_unlock(&ls->ls_recover_lock);
705 
706 	if (rv_old) {
707 		log_error(ls, "unused recovery %llx %d",
708 			  (unsigned long long)rv_old->seq, rv_old->nodes_count);
709 		kfree(rv_old->nodes);
710 		kfree(rv_old);
711 	}
712 
713 	set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
714 	wake_up_process(ls->ls_recoverd_task);
715 	return 0;
716 
717  fail:
718 	kfree(rv);
719 	kfree(nodes);
720 	return error;
721 }
722 
723