1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmmod.c
5  *
6  * standalone DLM module
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  */
26 
27 
28 #include <linux/module.h>
29 #include <linux/fs.h>
30 #include <linux/types.h>
31 #include <linux/slab.h>
32 #include <linux/highmem.h>
33 #include <linux/init.h>
34 #include <linux/sysctl.h>
35 #include <linux/random.h>
36 #include <linux/blkdev.h>
37 #include <linux/socket.h>
38 #include <linux/inet.h>
39 #include <linux/spinlock.h>
40 #include <linux/delay.h>
41 
42 
43 #include "cluster/heartbeat.h"
44 #include "cluster/nodemanager.h"
45 #include "cluster/tcp.h"
46 
47 #include "dlmapi.h"
48 #include "dlmcommon.h"
49 #include "dlmdomain.h"
50 #include "dlmdebug.h"
51 
52 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
53 #include "cluster/masklog.h"
54 
55 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
56 			      struct dlm_master_list_entry *mle,
57 			      struct o2nm_node *node,
58 			      int idx);
59 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
60 			    struct dlm_master_list_entry *mle,
61 			    struct o2nm_node *node,
62 			    int idx);
63 
64 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
65 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
66 				struct dlm_lock_resource *res,
67 				void *nodemap, u32 flags);
68 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
69 
70 static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
71 				struct dlm_master_list_entry *mle,
72 				const char *name,
73 				unsigned int namelen)
74 {
75 	if (dlm != mle->dlm)
76 		return 0;
77 
78 	if (namelen != mle->mnamelen ||
79 	    memcmp(name, mle->mname, namelen) != 0)
80 		return 0;
81 
82 	return 1;
83 }
84 
85 static struct kmem_cache *dlm_lockres_cache;
86 static struct kmem_cache *dlm_lockname_cache;
87 static struct kmem_cache *dlm_mle_cache;
88 
89 static void dlm_mle_release(struct kref *kref);
90 static void dlm_init_mle(struct dlm_master_list_entry *mle,
91 			enum dlm_mle_type type,
92 			struct dlm_ctxt *dlm,
93 			struct dlm_lock_resource *res,
94 			const char *name,
95 			unsigned int namelen);
96 static void dlm_put_mle(struct dlm_master_list_entry *mle);
97 static void __dlm_put_mle(struct dlm_master_list_entry *mle);
98 static int dlm_find_mle(struct dlm_ctxt *dlm,
99 			struct dlm_master_list_entry **mle,
100 			char *name, unsigned int namelen);
101 
102 static int dlm_do_master_request(struct dlm_lock_resource *res,
103 				 struct dlm_master_list_entry *mle, int to);
104 
105 
106 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
107 				     struct dlm_lock_resource *res,
108 				     struct dlm_master_list_entry *mle,
109 				     int *blocked);
110 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
111 				    struct dlm_lock_resource *res,
112 				    struct dlm_master_list_entry *mle,
113 				    int blocked);
114 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
115 				 struct dlm_lock_resource *res,
116 				 struct dlm_master_list_entry *mle,
117 				 struct dlm_master_list_entry **oldmle,
118 				 const char *name, unsigned int namelen,
119 				 u8 new_master, u8 master);
120 
121 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
122 				    struct dlm_lock_resource *res);
123 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
124 				      struct dlm_lock_resource *res);
125 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
126 				       struct dlm_lock_resource *res,
127 				       u8 target);
128 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
129 				       struct dlm_lock_resource *res);
130 
131 
132 int dlm_is_host_down(int errno)
133 {
134 	switch (errno) {
135 		case -EBADF:
136 		case -ECONNREFUSED:
137 		case -ENOTCONN:
138 		case -ECONNRESET:
139 		case -EPIPE:
140 		case -EHOSTDOWN:
141 		case -EHOSTUNREACH:
142 		case -ETIMEDOUT:
143 		case -ECONNABORTED:
144 		case -ENETDOWN:
145 		case -ENETUNREACH:
146 		case -ENETRESET:
147 		case -ESHUTDOWN:
148 		case -ENOPROTOOPT:
149 		case -EINVAL:   /* if returned from our tcp code,
150 				   this means there is no socket */
151 			return 1;
152 	}
153 	return 0;
154 }
155 
156 
157 /*
158  * MASTER LIST FUNCTIONS
159  */
160 
161 
162 /*
163  * regarding master list entries and heartbeat callbacks:
164  *
165  * in order to avoid sleeping and allocation that occurs in
166  * heartbeat, master list entries are simply attached to the
167  * dlm's established heartbeat callbacks.  the mle is attached
168  * when it is created, and since the dlm->spinlock is held at
169  * that time, any heartbeat event will be properly discovered
170  * by the mle.  the mle needs to be detached from the
171  * dlm->mle_hb_events list as soon as heartbeat events are no
172  * longer useful to the mle, and before the mle is freed.
173  *
174  * as a general rule, heartbeat events are no longer needed by
175  * the mle once an "answer" regarding the lock master has been
176  * received.
177  */
178 static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
179 					      struct dlm_master_list_entry *mle)
180 {
181 	assert_spin_locked(&dlm->spinlock);
182 
183 	list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
184 }
185 
186 
187 static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
188 					      struct dlm_master_list_entry *mle)
189 {
190 	if (!list_empty(&mle->hb_events))
191 		list_del_init(&mle->hb_events);
192 }
193 
194 
195 static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
196 					    struct dlm_master_list_entry *mle)
197 {
198 	spin_lock(&dlm->spinlock);
199 	__dlm_mle_detach_hb_events(dlm, mle);
200 	spin_unlock(&dlm->spinlock);
201 }
202 
203 static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
204 {
205 	struct dlm_ctxt *dlm;
206 	dlm = mle->dlm;
207 
208 	assert_spin_locked(&dlm->spinlock);
209 	assert_spin_locked(&dlm->master_lock);
210 	mle->inuse++;
211 	kref_get(&mle->mle_refs);
212 }
213 
214 static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
215 {
216 	struct dlm_ctxt *dlm;
217 	dlm = mle->dlm;
218 
219 	spin_lock(&dlm->spinlock);
220 	spin_lock(&dlm->master_lock);
221 	mle->inuse--;
222 	__dlm_put_mle(mle);
223 	spin_unlock(&dlm->master_lock);
224 	spin_unlock(&dlm->spinlock);
225 
226 }
227 
228 /* remove from list and free */
229 static void __dlm_put_mle(struct dlm_master_list_entry *mle)
230 {
231 	struct dlm_ctxt *dlm;
232 	dlm = mle->dlm;
233 
234 	assert_spin_locked(&dlm->spinlock);
235 	assert_spin_locked(&dlm->master_lock);
236 	if (!atomic_read(&mle->mle_refs.refcount)) {
237 		/* this may or may not crash, but who cares.
238 		 * it's a BUG. */
239 		mlog(ML_ERROR, "bad mle: %p\n", mle);
240 		dlm_print_one_mle(mle);
241 		BUG();
242 	} else
243 		kref_put(&mle->mle_refs, dlm_mle_release);
244 }
245 
246 
247 /* must not have any spinlocks coming in */
248 static void dlm_put_mle(struct dlm_master_list_entry *mle)
249 {
250 	struct dlm_ctxt *dlm;
251 	dlm = mle->dlm;
252 
253 	spin_lock(&dlm->spinlock);
254 	spin_lock(&dlm->master_lock);
255 	__dlm_put_mle(mle);
256 	spin_unlock(&dlm->master_lock);
257 	spin_unlock(&dlm->spinlock);
258 }
259 
260 static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
261 {
262 	kref_get(&mle->mle_refs);
263 }
264 
265 static void dlm_init_mle(struct dlm_master_list_entry *mle,
266 			enum dlm_mle_type type,
267 			struct dlm_ctxt *dlm,
268 			struct dlm_lock_resource *res,
269 			const char *name,
270 			unsigned int namelen)
271 {
272 	assert_spin_locked(&dlm->spinlock);
273 
274 	mle->dlm = dlm;
275 	mle->type = type;
276 	INIT_HLIST_NODE(&mle->master_hash_node);
277 	INIT_LIST_HEAD(&mle->hb_events);
278 	memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
279 	spin_lock_init(&mle->spinlock);
280 	init_waitqueue_head(&mle->wq);
281 	atomic_set(&mle->woken, 0);
282 	kref_init(&mle->mle_refs);
283 	memset(mle->response_map, 0, sizeof(mle->response_map));
284 	mle->master = O2NM_MAX_NODES;
285 	mle->new_master = O2NM_MAX_NODES;
286 	mle->inuse = 0;
287 
288 	BUG_ON(mle->type != DLM_MLE_BLOCK &&
289 	       mle->type != DLM_MLE_MASTER &&
290 	       mle->type != DLM_MLE_MIGRATION);
291 
292 	if (mle->type == DLM_MLE_MASTER) {
293 		BUG_ON(!res);
294 		mle->mleres = res;
295 		memcpy(mle->mname, res->lockname.name, res->lockname.len);
296 		mle->mnamelen = res->lockname.len;
297 		mle->mnamehash = res->lockname.hash;
298 	} else {
299 		BUG_ON(!name);
300 		mle->mleres = NULL;
301 		memcpy(mle->mname, name, namelen);
302 		mle->mnamelen = namelen;
303 		mle->mnamehash = dlm_lockid_hash(name, namelen);
304 	}
305 
306 	atomic_inc(&dlm->mle_tot_count[mle->type]);
307 	atomic_inc(&dlm->mle_cur_count[mle->type]);
308 
309 	/* copy off the node_map and register hb callbacks on our copy */
310 	memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
311 	memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
312 	clear_bit(dlm->node_num, mle->vote_map);
313 	clear_bit(dlm->node_num, mle->node_map);
314 
315 	/* attach the mle to the domain node up/down events */
316 	__dlm_mle_attach_hb_events(dlm, mle);
317 }
318 
319 void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
320 {
321 	assert_spin_locked(&dlm->spinlock);
322 	assert_spin_locked(&dlm->master_lock);
323 
324 	if (!hlist_unhashed(&mle->master_hash_node))
325 		hlist_del_init(&mle->master_hash_node);
326 }
327 
328 void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
329 {
330 	struct hlist_head *bucket;
331 
332 	assert_spin_locked(&dlm->master_lock);
333 
334 	bucket = dlm_master_hash(dlm, mle->mnamehash);
335 	hlist_add_head(&mle->master_hash_node, bucket);
336 }
337 
338 /* returns 1 if found, 0 if not */
339 static int dlm_find_mle(struct dlm_ctxt *dlm,
340 			struct dlm_master_list_entry **mle,
341 			char *name, unsigned int namelen)
342 {
343 	struct dlm_master_list_entry *tmpmle;
344 	struct hlist_head *bucket;
345 	unsigned int hash;
346 
347 	assert_spin_locked(&dlm->master_lock);
348 
349 	hash = dlm_lockid_hash(name, namelen);
350 	bucket = dlm_master_hash(dlm, hash);
351 	hlist_for_each_entry(tmpmle, bucket, master_hash_node) {
352 		if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
353 			continue;
354 		dlm_get_mle(tmpmle);
355 		*mle = tmpmle;
356 		return 1;
357 	}
358 	return 0;
359 }
360 
361 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
362 {
363 	struct dlm_master_list_entry *mle;
364 
365 	assert_spin_locked(&dlm->spinlock);
366 
367 	list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
368 		if (node_up)
369 			dlm_mle_node_up(dlm, mle, NULL, idx);
370 		else
371 			dlm_mle_node_down(dlm, mle, NULL, idx);
372 	}
373 }
374 
375 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
376 			      struct dlm_master_list_entry *mle,
377 			      struct o2nm_node *node, int idx)
378 {
379 	spin_lock(&mle->spinlock);
380 
381 	if (!test_bit(idx, mle->node_map))
382 		mlog(0, "node %u already removed from nodemap!\n", idx);
383 	else
384 		clear_bit(idx, mle->node_map);
385 
386 	spin_unlock(&mle->spinlock);
387 }
388 
389 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
390 			    struct dlm_master_list_entry *mle,
391 			    struct o2nm_node *node, int idx)
392 {
393 	spin_lock(&mle->spinlock);
394 
395 	if (test_bit(idx, mle->node_map))
396 		mlog(0, "node %u already in node map!\n", idx);
397 	else
398 		set_bit(idx, mle->node_map);
399 
400 	spin_unlock(&mle->spinlock);
401 }
402 
403 
404 int dlm_init_mle_cache(void)
405 {
406 	dlm_mle_cache = kmem_cache_create("o2dlm_mle",
407 					  sizeof(struct dlm_master_list_entry),
408 					  0, SLAB_HWCACHE_ALIGN,
409 					  NULL);
410 	if (dlm_mle_cache == NULL)
411 		return -ENOMEM;
412 	return 0;
413 }
414 
415 void dlm_destroy_mle_cache(void)
416 {
417 	if (dlm_mle_cache)
418 		kmem_cache_destroy(dlm_mle_cache);
419 }
420 
421 static void dlm_mle_release(struct kref *kref)
422 {
423 	struct dlm_master_list_entry *mle;
424 	struct dlm_ctxt *dlm;
425 
426 	mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
427 	dlm = mle->dlm;
428 
429 	assert_spin_locked(&dlm->spinlock);
430 	assert_spin_locked(&dlm->master_lock);
431 
432 	mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname,
433 	     mle->type);
434 
435 	/* remove from list if not already */
436 	__dlm_unlink_mle(dlm, mle);
437 
438 	/* detach the mle from the domain node up/down events */
439 	__dlm_mle_detach_hb_events(dlm, mle);
440 
441 	atomic_dec(&dlm->mle_cur_count[mle->type]);
442 
443 	/* NOTE: kfree under spinlock here.
444 	 * if this is bad, we can move this to a freelist. */
445 	kmem_cache_free(dlm_mle_cache, mle);
446 }
447 
448 
449 /*
450  * LOCK RESOURCE FUNCTIONS
451  */
452 
453 int dlm_init_master_caches(void)
454 {
455 	dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
456 					      sizeof(struct dlm_lock_resource),
457 					      0, SLAB_HWCACHE_ALIGN, NULL);
458 	if (!dlm_lockres_cache)
459 		goto bail;
460 
461 	dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
462 					       DLM_LOCKID_NAME_MAX, 0,
463 					       SLAB_HWCACHE_ALIGN, NULL);
464 	if (!dlm_lockname_cache)
465 		goto bail;
466 
467 	return 0;
468 bail:
469 	dlm_destroy_master_caches();
470 	return -ENOMEM;
471 }
472 
473 void dlm_destroy_master_caches(void)
474 {
475 	if (dlm_lockname_cache) {
476 		kmem_cache_destroy(dlm_lockname_cache);
477 		dlm_lockname_cache = NULL;
478 	}
479 
480 	if (dlm_lockres_cache) {
481 		kmem_cache_destroy(dlm_lockres_cache);
482 		dlm_lockres_cache = NULL;
483 	}
484 }
485 
486 static void dlm_lockres_release(struct kref *kref)
487 {
488 	struct dlm_lock_resource *res;
489 	struct dlm_ctxt *dlm;
490 
491 	res = container_of(kref, struct dlm_lock_resource, refs);
492 	dlm = res->dlm;
493 
494 	/* This should not happen -- all lockres' have a name
495 	 * associated with them at init time. */
496 	BUG_ON(!res->lockname.name);
497 
498 	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
499 	     res->lockname.name);
500 
501 	spin_lock(&dlm->track_lock);
502 	if (!list_empty(&res->tracking))
503 		list_del_init(&res->tracking);
504 	else {
505 		mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
506 		     res->lockname.len, res->lockname.name);
507 		dlm_print_one_lock_resource(res);
508 	}
509 	spin_unlock(&dlm->track_lock);
510 
511 	atomic_dec(&dlm->res_cur_count);
512 
513 	if (!hlist_unhashed(&res->hash_node) ||
514 	    !list_empty(&res->granted) ||
515 	    !list_empty(&res->converting) ||
516 	    !list_empty(&res->blocked) ||
517 	    !list_empty(&res->dirty) ||
518 	    !list_empty(&res->recovering) ||
519 	    !list_empty(&res->purge)) {
520 		mlog(ML_ERROR,
521 		     "Going to BUG for resource %.*s."
522 		     "  We're on a list! [%c%c%c%c%c%c%c]\n",
523 		     res->lockname.len, res->lockname.name,
524 		     !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
525 		     !list_empty(&res->granted) ? 'G' : ' ',
526 		     !list_empty(&res->converting) ? 'C' : ' ',
527 		     !list_empty(&res->blocked) ? 'B' : ' ',
528 		     !list_empty(&res->dirty) ? 'D' : ' ',
529 		     !list_empty(&res->recovering) ? 'R' : ' ',
530 		     !list_empty(&res->purge) ? 'P' : ' ');
531 
532 		dlm_print_one_lock_resource(res);
533 	}
534 
535 	/* By the time we're ready to blow this guy away, we shouldn't
536 	 * be on any lists. */
537 	BUG_ON(!hlist_unhashed(&res->hash_node));
538 	BUG_ON(!list_empty(&res->granted));
539 	BUG_ON(!list_empty(&res->converting));
540 	BUG_ON(!list_empty(&res->blocked));
541 	BUG_ON(!list_empty(&res->dirty));
542 	BUG_ON(!list_empty(&res->recovering));
543 	BUG_ON(!list_empty(&res->purge));
544 
545 	kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
546 
547 	kmem_cache_free(dlm_lockres_cache, res);
548 }
549 
550 void dlm_lockres_put(struct dlm_lock_resource *res)
551 {
552 	kref_put(&res->refs, dlm_lockres_release);
553 }
554 
555 static void dlm_init_lockres(struct dlm_ctxt *dlm,
556 			     struct dlm_lock_resource *res,
557 			     const char *name, unsigned int namelen)
558 {
559 	char *qname;
560 
561 	/* If we memset here, we lose our reference to the kmalloc'd
562 	 * res->lockname.name, so be sure to init every field
563 	 * correctly! */
564 
565 	qname = (char *) res->lockname.name;
566 	memcpy(qname, name, namelen);
567 
568 	res->lockname.len = namelen;
569 	res->lockname.hash = dlm_lockid_hash(name, namelen);
570 
571 	init_waitqueue_head(&res->wq);
572 	spin_lock_init(&res->spinlock);
573 	INIT_HLIST_NODE(&res->hash_node);
574 	INIT_LIST_HEAD(&res->granted);
575 	INIT_LIST_HEAD(&res->converting);
576 	INIT_LIST_HEAD(&res->blocked);
577 	INIT_LIST_HEAD(&res->dirty);
578 	INIT_LIST_HEAD(&res->recovering);
579 	INIT_LIST_HEAD(&res->purge);
580 	INIT_LIST_HEAD(&res->tracking);
581 	atomic_set(&res->asts_reserved, 0);
582 	res->migration_pending = 0;
583 	res->inflight_locks = 0;
584 	res->inflight_assert_workers = 0;
585 
586 	res->dlm = dlm;
587 
588 	kref_init(&res->refs);
589 
590 	atomic_inc(&dlm->res_tot_count);
591 	atomic_inc(&dlm->res_cur_count);
592 
593 	/* just for consistency */
594 	spin_lock(&res->spinlock);
595 	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
596 	spin_unlock(&res->spinlock);
597 
598 	res->state = DLM_LOCK_RES_IN_PROGRESS;
599 
600 	res->last_used = 0;
601 
602 	spin_lock(&dlm->spinlock);
603 	list_add_tail(&res->tracking, &dlm->tracking_list);
604 	spin_unlock(&dlm->spinlock);
605 
606 	memset(res->lvb, 0, DLM_LVB_LEN);
607 	memset(res->refmap, 0, sizeof(res->refmap));
608 }
609 
610 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
611 				   const char *name,
612 				   unsigned int namelen)
613 {
614 	struct dlm_lock_resource *res = NULL;
615 
616 	res = kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
617 	if (!res)
618 		goto error;
619 
620 	res->lockname.name = kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
621 	if (!res->lockname.name)
622 		goto error;
623 
624 	dlm_init_lockres(dlm, res, name, namelen);
625 	return res;
626 
627 error:
628 	if (res)
629 		kmem_cache_free(dlm_lockres_cache, res);
630 	return NULL;
631 }
632 
633 void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
634 				struct dlm_lock_resource *res, int bit)
635 {
636 	assert_spin_locked(&res->spinlock);
637 
638 	mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len,
639 	     res->lockname.name, bit, __builtin_return_address(0));
640 
641 	set_bit(bit, res->refmap);
642 }
643 
644 void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
645 				  struct dlm_lock_resource *res, int bit)
646 {
647 	assert_spin_locked(&res->spinlock);
648 
649 	mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len,
650 	     res->lockname.name, bit, __builtin_return_address(0));
651 
652 	clear_bit(bit, res->refmap);
653 }
654 
655 static void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
656 				   struct dlm_lock_resource *res)
657 {
658 	res->inflight_locks++;
659 
660 	mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
661 	     res->lockname.len, res->lockname.name, res->inflight_locks,
662 	     __builtin_return_address(0));
663 }
664 
665 void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
666 				   struct dlm_lock_resource *res)
667 {
668 	assert_spin_locked(&res->spinlock);
669 	__dlm_lockres_grab_inflight_ref(dlm, res);
670 }
671 
672 void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
673 				   struct dlm_lock_resource *res)
674 {
675 	assert_spin_locked(&res->spinlock);
676 
677 	BUG_ON(res->inflight_locks == 0);
678 
679 	res->inflight_locks--;
680 
681 	mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
682 	     res->lockname.len, res->lockname.name, res->inflight_locks,
683 	     __builtin_return_address(0));
684 
685 	wake_up(&res->wq);
686 }
687 
688 void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
689 		struct dlm_lock_resource *res)
690 {
691 	assert_spin_locked(&res->spinlock);
692 	res->inflight_assert_workers++;
693 	mlog(0, "%s:%.*s: inflight assert worker++: now %u\n",
694 			dlm->name, res->lockname.len, res->lockname.name,
695 			res->inflight_assert_workers);
696 }
697 
698 static void dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
699 		struct dlm_lock_resource *res)
700 {
701 	spin_lock(&res->spinlock);
702 	__dlm_lockres_grab_inflight_worker(dlm, res);
703 	spin_unlock(&res->spinlock);
704 }
705 
706 static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
707 		struct dlm_lock_resource *res)
708 {
709 	assert_spin_locked(&res->spinlock);
710 	BUG_ON(res->inflight_assert_workers == 0);
711 	res->inflight_assert_workers--;
712 	mlog(0, "%s:%.*s: inflight assert worker--: now %u\n",
713 			dlm->name, res->lockname.len, res->lockname.name,
714 			res->inflight_assert_workers);
715 }
716 
717 static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
718 		struct dlm_lock_resource *res)
719 {
720 	spin_lock(&res->spinlock);
721 	__dlm_lockres_drop_inflight_worker(dlm, res);
722 	spin_unlock(&res->spinlock);
723 }
724 
725 /*
726  * lookup a lock resource by name.
727  * may already exist in the hashtable.
728  * lockid is null terminated
729  *
730  * if not, allocate enough for the lockres and for
731  * the temporary structure used in doing the mastering.
732  *
733  * also, do a lookup in the dlm->master_list to see
734  * if another node has begun mastering the same lock.
735  * if so, there should be a block entry in there
736  * for this name, and we should *not* attempt to master
737  * the lock here.   need to wait around for that node
738  * to assert_master (or die).
739  *
740  */
741 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
742 					  const char *lockid,
743 					  int namelen,
744 					  int flags)
745 {
746 	struct dlm_lock_resource *tmpres=NULL, *res=NULL;
747 	struct dlm_master_list_entry *mle = NULL;
748 	struct dlm_master_list_entry *alloc_mle = NULL;
749 	int blocked = 0;
750 	int ret, nodenum;
751 	struct dlm_node_iter iter;
752 	unsigned int hash;
753 	int tries = 0;
754 	int bit, wait_on_recovery = 0;
755 
756 	BUG_ON(!lockid);
757 
758 	hash = dlm_lockid_hash(lockid, namelen);
759 
760 	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
761 
762 lookup:
763 	spin_lock(&dlm->spinlock);
764 	tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
765 	if (tmpres) {
766 		spin_unlock(&dlm->spinlock);
767 		spin_lock(&tmpres->spinlock);
768 
769 		/*
770 		 * Right after dlm spinlock was released, dlm_thread could have
771 		 * purged the lockres. Check if lockres got unhashed. If so
772 		 * start over.
773 		 */
774 		if (hlist_unhashed(&tmpres->hash_node)) {
775 			spin_unlock(&tmpres->spinlock);
776 			dlm_lockres_put(tmpres);
777 			tmpres = NULL;
778 			goto lookup;
779 		}
780 
781 		/* Wait on the thread that is mastering the resource */
782 		if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
783 			__dlm_wait_on_lockres(tmpres);
784 			BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
785 			spin_unlock(&tmpres->spinlock);
786 			dlm_lockres_put(tmpres);
787 			tmpres = NULL;
788 			goto lookup;
789 		}
790 
791 		/* Wait on the resource purge to complete before continuing */
792 		if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) {
793 			BUG_ON(tmpres->owner == dlm->node_num);
794 			__dlm_wait_on_lockres_flags(tmpres,
795 						    DLM_LOCK_RES_DROPPING_REF);
796 			spin_unlock(&tmpres->spinlock);
797 			dlm_lockres_put(tmpres);
798 			tmpres = NULL;
799 			goto lookup;
800 		}
801 
802 		/* Grab inflight ref to pin the resource */
803 		dlm_lockres_grab_inflight_ref(dlm, tmpres);
804 
805 		spin_unlock(&tmpres->spinlock);
806 		if (res)
807 			dlm_lockres_put(res);
808 		res = tmpres;
809 		goto leave;
810 	}
811 
812 	if (!res) {
813 		spin_unlock(&dlm->spinlock);
814 		mlog(0, "allocating a new resource\n");
815 		/* nothing found and we need to allocate one. */
816 		alloc_mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
817 		if (!alloc_mle)
818 			goto leave;
819 		res = dlm_new_lockres(dlm, lockid, namelen);
820 		if (!res)
821 			goto leave;
822 		goto lookup;
823 	}
824 
825 	mlog(0, "no lockres found, allocated our own: %p\n", res);
826 
827 	if (flags & LKM_LOCAL) {
828 		/* caller knows it's safe to assume it's not mastered elsewhere
829 		 * DONE!  return right away */
830 		spin_lock(&res->spinlock);
831 		dlm_change_lockres_owner(dlm, res, dlm->node_num);
832 		__dlm_insert_lockres(dlm, res);
833 		dlm_lockres_grab_inflight_ref(dlm, res);
834 		spin_unlock(&res->spinlock);
835 		spin_unlock(&dlm->spinlock);
836 		/* lockres still marked IN_PROGRESS */
837 		goto wake_waiters;
838 	}
839 
840 	/* check master list to see if another node has started mastering it */
841 	spin_lock(&dlm->master_lock);
842 
843 	/* if we found a block, wait for lock to be mastered by another node */
844 	blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
845 	if (blocked) {
846 		int mig;
847 		if (mle->type == DLM_MLE_MASTER) {
848 			mlog(ML_ERROR, "master entry for nonexistent lock!\n");
849 			BUG();
850 		}
851 		mig = (mle->type == DLM_MLE_MIGRATION);
852 		/* if there is a migration in progress, let the migration
853 		 * finish before continuing.  we can wait for the absence
854 		 * of the MIGRATION mle: either the migrate finished or
855 		 * one of the nodes died and the mle was cleaned up.
856 		 * if there is a BLOCK here, but it already has a master
857 		 * set, we are too late.  the master does not have a ref
858 		 * for us in the refmap.  detach the mle and drop it.
859 		 * either way, go back to the top and start over. */
860 		if (mig || mle->master != O2NM_MAX_NODES) {
861 			BUG_ON(mig && mle->master == dlm->node_num);
862 			/* we arrived too late.  the master does not
863 			 * have a ref for us. retry. */
864 			mlog(0, "%s:%.*s: late on %s\n",
865 			     dlm->name, namelen, lockid,
866 			     mig ?  "MIGRATION" : "BLOCK");
867 			spin_unlock(&dlm->master_lock);
868 			spin_unlock(&dlm->spinlock);
869 
870 			/* master is known, detach */
871 			if (!mig)
872 				dlm_mle_detach_hb_events(dlm, mle);
873 			dlm_put_mle(mle);
874 			mle = NULL;
875 			/* this is lame, but we can't wait on either
876 			 * the mle or lockres waitqueue here */
877 			if (mig)
878 				msleep(100);
879 			goto lookup;
880 		}
881 	} else {
882 		/* go ahead and try to master lock on this node */
883 		mle = alloc_mle;
884 		/* make sure this does not get freed below */
885 		alloc_mle = NULL;
886 		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
887 		set_bit(dlm->node_num, mle->maybe_map);
888 		__dlm_insert_mle(dlm, mle);
889 
890 		/* still holding the dlm spinlock, check the recovery map
891 		 * to see if there are any nodes that still need to be
892 		 * considered.  these will not appear in the mle nodemap
893 		 * but they might own this lockres.  wait on them. */
894 		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
895 		if (bit < O2NM_MAX_NODES) {
896 			mlog(0, "%s: res %.*s, At least one node (%d) "
897 			     "to recover before lock mastery can begin\n",
898 			     dlm->name, namelen, (char *)lockid, bit);
899 			wait_on_recovery = 1;
900 		}
901 	}
902 
903 	/* at this point there is either a DLM_MLE_BLOCK or a
904 	 * DLM_MLE_MASTER on the master list, so it's safe to add the
905 	 * lockres to the hashtable.  anyone who finds the lock will
906 	 * still have to wait on the IN_PROGRESS. */
907 
908 	/* finally add the lockres to its hash bucket */
909 	__dlm_insert_lockres(dlm, res);
910 
911 	/* since this lockres is new it doesn't require the spinlock */
912 	__dlm_lockres_grab_inflight_ref(dlm, res);
913 
914 	/* get an extra ref on the mle in case this is a BLOCK.
915 	 * if so, the creator of the BLOCK may try to put the last
916 	 * ref at this time in the assert master handler, so we
917 	 * need an extra one to keep from a bad ptr deref. */
918 	dlm_get_mle_inuse(mle);
919 	spin_unlock(&dlm->master_lock);
920 	spin_unlock(&dlm->spinlock);
921 
922 redo_request:
923 	while (wait_on_recovery) {
924 		/* any cluster changes that occurred after dropping the
925 	 * dlm spinlock would be detectable by a change on the mle,
926 		 * so we only need to clear out the recovery map once. */
927 		if (dlm_is_recovery_lock(lockid, namelen)) {
928 			mlog(0, "%s: Recovery map is not empty, but must "
929 			     "master $RECOVERY lock now\n", dlm->name);
930 			if (!dlm_pre_master_reco_lockres(dlm, res))
931 				wait_on_recovery = 0;
932 			else {
933 				mlog(0, "%s: waiting 500ms for heartbeat state "
934 				    "change\n", dlm->name);
935 				msleep(500);
936 			}
937 			continue;
938 		}
939 
940 		dlm_kick_recovery_thread(dlm);
941 		msleep(1000);
942 		dlm_wait_for_recovery(dlm);
943 
944 		spin_lock(&dlm->spinlock);
945 		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
946 		if (bit < O2NM_MAX_NODES) {
947 			mlog(0, "%s: res %.*s, At least one node (%d) "
948 			     "to recover before lock mastery can begin\n",
949 			     dlm->name, namelen, (char *)lockid, bit);
950 			wait_on_recovery = 1;
951 		} else
952 			wait_on_recovery = 0;
953 		spin_unlock(&dlm->spinlock);
954 
955 		if (wait_on_recovery)
956 			dlm_wait_for_node_recovery(dlm, bit, 10000);
957 	}
958 
959 	/* must wait for lock to be mastered elsewhere */
960 	if (blocked)
961 		goto wait;
962 
963 	ret = -EINVAL;
964 	dlm_node_iter_init(mle->vote_map, &iter);
965 	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
966 		ret = dlm_do_master_request(res, mle, nodenum);
967 		if (ret < 0)
968 			mlog_errno(ret);
969 		if (mle->master != O2NM_MAX_NODES) {
970 			/* found a master ! */
971 			if (mle->master <= nodenum)
972 				break;
973 			/* if our master request has not reached the master
974 			 * yet, keep going until it does.  this is how the
975 			 * master will know that asserts are needed back to
976 			 * the lower nodes. */
977 			mlog(0, "%s: res %.*s, Requests only up to %u but "
978 			     "master is %u, keep going\n", dlm->name, namelen,
979 			     lockid, nodenum, mle->master);
980 		}
981 	}
982 
983 wait:
984 	/* keep going until the response map includes all nodes */
985 	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
986 	if (ret < 0) {
987 		wait_on_recovery = 1;
988 		mlog(0, "%s: res %.*s, Node map changed, redo the master "
989 		     "request now, blocked=%d\n", dlm->name, res->lockname.len,
990 		     res->lockname.name, blocked);
991 		if (++tries > 20) {
992 			mlog(ML_ERROR, "%s: res %.*s, Spinning on "
993 			     "dlm_wait_for_lock_mastery, blocked = %d\n",
994 			     dlm->name, res->lockname.len,
995 			     res->lockname.name, blocked);
996 			dlm_print_one_lock_resource(res);
997 			dlm_print_one_mle(mle);
998 			tries = 0;
999 		}
1000 		goto redo_request;
1001 	}
1002 
1003 	mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
1004 	     res->lockname.name, res->owner);
1005 	/* make sure we never continue without this */
1006 	BUG_ON(res->owner == O2NM_MAX_NODES);
1007 
1008 	/* master is known, detach if not already detached */
1009 	dlm_mle_detach_hb_events(dlm, mle);
1010 	dlm_put_mle(mle);
1011 	/* put the extra ref */
1012 	dlm_put_mle_inuse(mle);
1013 
1014 wake_waiters:
1015 	spin_lock(&res->spinlock);
1016 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
1017 	spin_unlock(&res->spinlock);
1018 	wake_up(&res->wq);
1019 
1020 leave:
1021 	/* need to free the unused mle */
1022 	if (alloc_mle)
1023 		kmem_cache_free(dlm_mle_cache, alloc_mle);
1024 
1025 	return res;
1026 }
1027 
1028 
1029 #define DLM_MASTERY_TIMEOUT_MS   5000
1030 
1031 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
1032 				     struct dlm_lock_resource *res,
1033 				     struct dlm_master_list_entry *mle,
1034 				     int *blocked)
1035 {
1036 	u8 m;
1037 	int ret, bit;
1038 	int map_changed, voting_done;
1039 	int assert, sleep;
1040 
1041 recheck:
1042 	ret = 0;
1043 	assert = 0;
1044 
1045 	/* check if another node has already become the owner */
1046 	spin_lock(&res->spinlock);
1047 	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1048 		mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
1049 		     res->lockname.len, res->lockname.name, res->owner);
1050 		spin_unlock(&res->spinlock);
1051 		/* this will cause the master to re-assert across
1052 		 * the whole cluster, freeing up mles */
1053 		if (res->owner != dlm->node_num) {
1054 			ret = dlm_do_master_request(res, mle, res->owner);
1055 			if (ret < 0) {
1056 				/* give recovery a chance to run */
1057 				mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
1058 				msleep(500);
1059 				goto recheck;
1060 			}
1061 		}
1062 		ret = 0;
1063 		goto leave;
1064 	}
1065 	spin_unlock(&res->spinlock);
1066 
1067 	spin_lock(&mle->spinlock);
1068 	m = mle->master;
1069 	map_changed = (memcmp(mle->vote_map, mle->node_map,
1070 			      sizeof(mle->vote_map)) != 0);
1071 	voting_done = (memcmp(mle->vote_map, mle->response_map,
1072 			     sizeof(mle->vote_map)) == 0);
1073 
1074 	/* restart if we hit any errors */
1075 	if (map_changed) {
1076 		int b;
1077 		mlog(0, "%s: %.*s: node map changed, restarting\n",
1078 		     dlm->name, res->lockname.len, res->lockname.name);
1079 		ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
1080 		b = (mle->type == DLM_MLE_BLOCK);
1081 		if ((*blocked && !b) || (!*blocked && b)) {
1082 			mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
1083 			     dlm->name, res->lockname.len, res->lockname.name,
1084 			     *blocked, b);
1085 			*blocked = b;
1086 		}
1087 		spin_unlock(&mle->spinlock);
1088 		if (ret < 0) {
1089 			mlog_errno(ret);
1090 			goto leave;
1091 		}
1092 		mlog(0, "%s:%.*s: restart lock mastery succeeded, "
1093 		     "rechecking now\n", dlm->name, res->lockname.len,
1094 		     res->lockname.name);
1095 		goto recheck;
1096 	} else {
1097 		if (!voting_done) {
1098 			mlog(0, "map not changed and voting not done "
1099 			     "for %s:%.*s\n", dlm->name, res->lockname.len,
1100 			     res->lockname.name);
1101 		}
1102 	}
1103 
1104 	if (m != O2NM_MAX_NODES) {
1105 		/* another node has done an assert!
1106 		 * all done! */
1107 		sleep = 0;
1108 	} else {
1109 		sleep = 1;
1110 		/* have all nodes responded? */
1111 		if (voting_done && !*blocked) {
1112 			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1113 			if (dlm->node_num <= bit) {
1114 				/* my node number is lowest.
1115 			 	 * now tell other nodes that I am
1116 				 * mastering this. */
1117 				mle->master = dlm->node_num;
1118 				/* ref was grabbed in get_lock_resource
1119 				 * will be dropped in dlmlock_master */
1120 				assert = 1;
1121 				sleep = 0;
1122 			}
1123 			/* if voting is done, but we have not received
1124 			 * an assert master yet, we must sleep */
1125 		}
1126 	}
1127 
1128 	spin_unlock(&mle->spinlock);
1129 
1130 	/* sleep if we haven't finished voting yet */
1131 	if (sleep) {
1132 		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
1133 
1134 		/*
1135 		if (atomic_read(&mle->mle_refs.refcount) < 2)
1136 			mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
1137 			atomic_read(&mle->mle_refs.refcount),
1138 			res->lockname.len, res->lockname.name);
1139 		*/
1140 		atomic_set(&mle->woken, 0);
1141 		(void)wait_event_timeout(mle->wq,
1142 					 (atomic_read(&mle->woken) == 1),
1143 					 timeo);
1144 		if (res->owner == O2NM_MAX_NODES) {
1145 			mlog(0, "%s:%.*s: waiting again\n", dlm->name,
1146 			     res->lockname.len, res->lockname.name);
1147 			goto recheck;
1148 		}
1149 		mlog(0, "done waiting, master is %u\n", res->owner);
1150 		ret = 0;
1151 		goto leave;
1152 	}
1153 
1154 	ret = 0;   /* done */
1155 	if (assert) {
1156 		m = dlm->node_num;
1157 		mlog(0, "about to master %.*s here, this=%u\n",
1158 		     res->lockname.len, res->lockname.name, m);
1159 		ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
1160 		if (ret) {
1161 			/* This is a failure in the network path,
1162 			 * not in the response to the assert_master
1163 			 * (any nonzero response is a BUG on this node).
1164 			 * Most likely a socket just got disconnected
1165 			 * due to node death. */
1166 			mlog_errno(ret);
1167 		}
1168 		/* no longer need to restart lock mastery.
1169 		 * all living nodes have been contacted. */
1170 		ret = 0;
1171 	}
1172 
1173 	/* set the lockres owner */
1174 	spin_lock(&res->spinlock);
1175 	/* mastery reference obtained either during
1176 	 * assert_master_handler or in get_lock_resource */
1177 	dlm_change_lockres_owner(dlm, res, m);
1178 	spin_unlock(&res->spinlock);
1179 
1180 leave:
1181 	return ret;
1182 }
1183 
1184 struct dlm_bitmap_diff_iter
1185 {
1186 	int curnode;
1187 	unsigned long *orig_bm;
1188 	unsigned long *cur_bm;
1189 	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
1190 };
1191 
1192 enum dlm_node_state_change
1193 {
1194 	NODE_DOWN = -1,
1195 	NODE_NO_CHANGE = 0,
1196 	NODE_UP
1197 };
1198 
1199 static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
1200 				      unsigned long *orig_bm,
1201 				      unsigned long *cur_bm)
1202 {
1203 	unsigned long p1, p2;
1204 	int i;
1205 
1206 	iter->curnode = -1;
1207 	iter->orig_bm = orig_bm;
1208 	iter->cur_bm = cur_bm;
1209 
1210 	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
1211        		p1 = *(iter->orig_bm + i);
1212 	       	p2 = *(iter->cur_bm + i);
1213 		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
1214 	}
1215 }
1216 
1217 static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
1218 				     enum dlm_node_state_change *state)
1219 {
1220 	int bit;
1221 
1222 	if (iter->curnode >= O2NM_MAX_NODES)
1223 		return -ENOENT;
1224 
1225 	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
1226 			    iter->curnode+1);
1227 	if (bit >= O2NM_MAX_NODES) {
1228 		iter->curnode = O2NM_MAX_NODES;
1229 		return -ENOENT;
1230 	}
1231 
1232 	/* if it was there in the original then this node died */
1233 	if (test_bit(bit, iter->orig_bm))
1234 		*state = NODE_DOWN;
1235 	else
1236 		*state = NODE_UP;
1237 
1238 	iter->curnode = bit;
1239 	return bit;
1240 }
1241 
1242 
1243 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1244 				    struct dlm_lock_resource *res,
1245 				    struct dlm_master_list_entry *mle,
1246 				    int blocked)
1247 {
1248 	struct dlm_bitmap_diff_iter bdi;
1249 	enum dlm_node_state_change sc;
1250 	int node;
1251 	int ret = 0;
1252 
1253 	mlog(0, "something happened such that the "
1254 	     "master process may need to be restarted!\n");
1255 
1256 	assert_spin_locked(&mle->spinlock);
1257 
1258 	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
1259 	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1260 	while (node >= 0) {
1261 		if (sc == NODE_UP) {
1262 			/* a node came up.  clear any old vote from
1263 			 * the response map and set it in the vote map
1264 			 * then restart the mastery. */
1265 			mlog(ML_NOTICE, "node %d up while restarting\n", node);
1266 
1267 			/* redo the master request, but only for the new node */
1268 			mlog(0, "sending request to new node\n");
1269 			clear_bit(node, mle->response_map);
1270 			set_bit(node, mle->vote_map);
1271 		} else {
1272 			mlog(ML_ERROR, "node down! %d\n", node);
1273 			if (blocked) {
1274 				int lowest = find_next_bit(mle->maybe_map,
1275 						       O2NM_MAX_NODES, 0);
1276 
1277 				/* act like it was never there */
1278 				clear_bit(node, mle->maybe_map);
1279 
1280 			       	if (node == lowest) {
1281 					mlog(0, "expected master %u died"
1282 					    " while this node was blocked "
1283 					    "waiting on it!\n", node);
1284 					lowest = find_next_bit(mle->maybe_map,
1285 						       	O2NM_MAX_NODES,
1286 						       	lowest+1);
1287 					if (lowest < O2NM_MAX_NODES) {
1288 						mlog(0, "%s:%.*s:still "
1289 						     "blocked. waiting on %u "
1290 						     "now\n", dlm->name,
1291 						     res->lockname.len,
1292 						     res->lockname.name,
1293 						     lowest);
1294 					} else {
1295 						/* mle is an MLE_BLOCK, but
1296 						 * there is now nothing left to
1297 						 * block on.  we need to return
1298 						 * all the way back out and try
1299 						 * again with an MLE_MASTER.
1300 						 * dlm_do_local_recovery_cleanup
1301 						 * has already run, so the mle
1302 						 * refcount is ok */
1303 						mlog(0, "%s:%.*s: no "
1304 						     "longer blocking. try to "
1305 						     "master this here\n",
1306 						     dlm->name,
1307 						     res->lockname.len,
1308 						     res->lockname.name);
1309 						mle->type = DLM_MLE_MASTER;
1310 						mle->mleres = res;
1311 					}
1312 				}
1313 			}
1314 
1315 			/* now blank out everything, as if we had never
1316 			 * contacted anyone */
1317 			memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
1318 			memset(mle->response_map, 0, sizeof(mle->response_map));
1319 			/* reset the vote_map to the current node_map */
1320 			memcpy(mle->vote_map, mle->node_map,
1321 			       sizeof(mle->node_map));
1322 			/* put myself into the maybe map */
1323 			if (mle->type != DLM_MLE_BLOCK)
1324 				set_bit(dlm->node_num, mle->maybe_map);
1325 		}
1326 		ret = -EAGAIN;
1327 		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1328 	}
1329 	return ret;
1330 }
1331 
1332 
1333 /*
1334  * DLM_MASTER_REQUEST_MSG
1335  *
1336  * returns: 0 on success,
1337  *          -errno on a network error
1338  *
1339  * on error, the caller should assume the target node is "dead"
1340  *
1341  */
1342 
1343 static int dlm_do_master_request(struct dlm_lock_resource *res,
1344 				 struct dlm_master_list_entry *mle, int to)
1345 {
1346 	struct dlm_ctxt *dlm = mle->dlm;
1347 	struct dlm_master_request request;
1348 	int ret, response=0, resend;
1349 
1350 	memset(&request, 0, sizeof(request));
1351 	request.node_idx = dlm->node_num;
1352 
1353 	BUG_ON(mle->type == DLM_MLE_MIGRATION);
1354 
1355 	request.namelen = (u8)mle->mnamelen;
1356 	memcpy(request.name, mle->mname, request.namelen);
1357 
1358 again:
1359 	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1360 				 sizeof(request), to, &response);
1361 	if (ret < 0)  {
1362 		if (ret == -ESRCH) {
1363 			/* should never happen */
1364 			mlog(ML_ERROR, "TCP stack not ready!\n");
1365 			BUG();
1366 		} else if (ret == -EINVAL) {
1367 			mlog(ML_ERROR, "bad args passed to o2net!\n");
1368 			BUG();
1369 		} else if (ret == -ENOMEM) {
1370 			mlog(ML_ERROR, "out of memory while trying to send "
1371 			     "network message!  retrying\n");
1372 			/* this is totally crude */
1373 			msleep(50);
1374 			goto again;
1375 		} else if (!dlm_is_host_down(ret)) {
1376 			/* not a network error. bad. */
1377 			mlog_errno(ret);
1378 			mlog(ML_ERROR, "unhandled error!");
1379 			BUG();
1380 		}
1381 		/* all other errors should be network errors,
1382 		 * and likely indicate node death */
1383 		mlog(ML_ERROR, "link to %d went down!\n", to);
1384 		goto out;
1385 	}
1386 
1387 	ret = 0;
1388 	resend = 0;
1389 	spin_lock(&mle->spinlock);
1390 	switch (response) {
1391 		case DLM_MASTER_RESP_YES:
1392 			set_bit(to, mle->response_map);
1393 			mlog(0, "node %u is the master, response=YES\n", to);
1394 			mlog(0, "%s:%.*s: master node %u now knows I have a "
1395 			     "reference\n", dlm->name, res->lockname.len,
1396 			     res->lockname.name, to);
1397 			mle->master = to;
1398 			break;
1399 		case DLM_MASTER_RESP_NO:
1400 			mlog(0, "node %u not master, response=NO\n", to);
1401 			set_bit(to, mle->response_map);
1402 			break;
1403 		case DLM_MASTER_RESP_MAYBE:
1404 			mlog(0, "node %u not master, response=MAYBE\n", to);
1405 			set_bit(to, mle->response_map);
1406 			set_bit(to, mle->maybe_map);
1407 			break;
1408 		case DLM_MASTER_RESP_ERROR:
1409 			mlog(0, "node %u hit an error, resending\n", to);
1410 			resend = 1;
1411 			response = 0;
1412 			break;
1413 		default:
1414 			mlog(ML_ERROR, "bad response! %u\n", response);
1415 			BUG();
1416 	}
1417 	spin_unlock(&mle->spinlock);
1418 	if (resend) {
1419 		/* this is also totally crude */
1420 		msleep(50);
1421 		goto again;
1422 	}
1423 
1424 out:
1425 	return ret;
1426 }
1427 
1428 /*
1429  * locks that can be taken here:
1430  * dlm->spinlock
1431  * res->spinlock
1432  * mle->spinlock
1433  * dlm->master_list
1434  *
1435  * if possible, TRIM THIS DOWN!!!
1436  */
1437 int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
1438 			       void **ret_data)
1439 {
1440 	u8 response = DLM_MASTER_RESP_MAYBE;
1441 	struct dlm_ctxt *dlm = data;
1442 	struct dlm_lock_resource *res = NULL;
1443 	struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1444 	struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1445 	char *name;
1446 	unsigned int namelen, hash;
1447 	int found, ret;
1448 	int set_maybe;
1449 	int dispatch_assert = 0;
1450 	int dispatched = 0;
1451 
1452 	if (!dlm_grab(dlm))
1453 		return DLM_MASTER_RESP_NO;
1454 
1455 	if (!dlm_domain_fully_joined(dlm)) {
1456 		response = DLM_MASTER_RESP_NO;
1457 		goto send_response;
1458 	}
1459 
1460 	name = request->name;
1461 	namelen = request->namelen;
1462 	hash = dlm_lockid_hash(name, namelen);
1463 
1464 	if (namelen > DLM_LOCKID_NAME_MAX) {
1465 		response = DLM_IVBUFLEN;
1466 		goto send_response;
1467 	}
1468 
1469 way_up_top:
1470 	spin_lock(&dlm->spinlock);
1471 	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1472 	if (res) {
1473 		spin_unlock(&dlm->spinlock);
1474 
1475 		/* take care of the easy cases up front */
1476 		spin_lock(&res->spinlock);
1477 		if (res->state & (DLM_LOCK_RES_RECOVERING|
1478 				  DLM_LOCK_RES_MIGRATING)) {
1479 			spin_unlock(&res->spinlock);
1480 			mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1481 			     "being recovered/migrated\n");
1482 			response = DLM_MASTER_RESP_ERROR;
1483 			if (mle)
1484 				kmem_cache_free(dlm_mle_cache, mle);
1485 			goto send_response;
1486 		}
1487 
1488 		if (res->owner == dlm->node_num) {
1489 			dlm_lockres_set_refmap_bit(dlm, res, request->node_idx);
1490 			spin_unlock(&res->spinlock);
1491 			response = DLM_MASTER_RESP_YES;
1492 			if (mle)
1493 				kmem_cache_free(dlm_mle_cache, mle);
1494 
1495 			/* this node is the owner.
1496 			 * there is some extra work that needs to
1497 			 * happen now.  the requesting node has
1498 			 * caused all nodes up to this one to
1499 			 * create mles.  this node now needs to
1500 			 * go back and clean those up. */
1501 			dispatch_assert = 1;
1502 			goto send_response;
1503 		} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1504 			spin_unlock(&res->spinlock);
1505 			// mlog(0, "node %u is the master\n", res->owner);
1506 			response = DLM_MASTER_RESP_NO;
1507 			if (mle)
1508 				kmem_cache_free(dlm_mle_cache, mle);
1509 			goto send_response;
1510 		}
1511 
1512 		/* ok, there is no owner.  either this node is
1513 		 * being blocked, or it is actively trying to
1514 		 * master this lock. */
1515 		if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1516 			mlog(ML_ERROR, "lock with no owner should be "
1517 			     "in-progress!\n");
1518 			BUG();
1519 		}
1520 
1521 		// mlog(0, "lockres is in progress...\n");
1522 		spin_lock(&dlm->master_lock);
1523 		found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1524 		if (!found) {
1525 			mlog(ML_ERROR, "no mle found for this lock!\n");
1526 			BUG();
1527 		}
1528 		set_maybe = 1;
1529 		spin_lock(&tmpmle->spinlock);
1530 		if (tmpmle->type == DLM_MLE_BLOCK) {
1531 			// mlog(0, "this node is waiting for "
1532 			// "lockres to be mastered\n");
1533 			response = DLM_MASTER_RESP_NO;
1534 		} else if (tmpmle->type == DLM_MLE_MIGRATION) {
1535 			mlog(0, "node %u is master, but trying to migrate to "
1536 			     "node %u.\n", tmpmle->master, tmpmle->new_master);
1537 			if (tmpmle->master == dlm->node_num) {
1538 				mlog(ML_ERROR, "no owner on lockres, but this "
1539 				     "node is trying to migrate it to %u?!\n",
1540 				     tmpmle->new_master);
1541 				BUG();
1542 			} else {
1543 				/* the real master can respond on its own */
1544 				response = DLM_MASTER_RESP_NO;
1545 			}
1546 		} else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1547 			set_maybe = 0;
1548 			if (tmpmle->master == dlm->node_num) {
1549 				response = DLM_MASTER_RESP_YES;
1550 				/* this node will be the owner.
1551 				 * go back and clean the mles on any
1552 				 * other nodes */
1553 				dispatch_assert = 1;
1554 				dlm_lockres_set_refmap_bit(dlm, res,
1555 							   request->node_idx);
1556 			} else
1557 				response = DLM_MASTER_RESP_NO;
1558 		} else {
1559 			// mlog(0, "this node is attempting to "
1560 			// "master lockres\n");
1561 			response = DLM_MASTER_RESP_MAYBE;
1562 		}
1563 		if (set_maybe)
1564 			set_bit(request->node_idx, tmpmle->maybe_map);
1565 		spin_unlock(&tmpmle->spinlock);
1566 
1567 		spin_unlock(&dlm->master_lock);
1568 		spin_unlock(&res->spinlock);
1569 
1570 		/* keep the mle attached to heartbeat events */
1571 		dlm_put_mle(tmpmle);
1572 		if (mle)
1573 			kmem_cache_free(dlm_mle_cache, mle);
1574 		goto send_response;
1575 	}
1576 
1577 	/*
1578 	 * lockres doesn't exist on this node
1579 	 * if there is an MLE_BLOCK, return NO
1580 	 * if there is an MLE_MASTER, return MAYBE
1581 	 * otherwise, add an MLE_BLOCK, return NO
1582 	 */
1583 	spin_lock(&dlm->master_lock);
1584 	found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1585 	if (!found) {
1586 		/* this lockid has never been seen on this node yet */
1587 		// mlog(0, "no mle found\n");
1588 		if (!mle) {
1589 			spin_unlock(&dlm->master_lock);
1590 			spin_unlock(&dlm->spinlock);
1591 
1592 			mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
1593 			if (!mle) {
1594 				response = DLM_MASTER_RESP_ERROR;
1595 				mlog_errno(-ENOMEM);
1596 				goto send_response;
1597 			}
1598 			goto way_up_top;
1599 		}
1600 
1601 		// mlog(0, "this is second time thru, already allocated, "
1602 		// "add the block.\n");
1603 		dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1604 		set_bit(request->node_idx, mle->maybe_map);
1605 		__dlm_insert_mle(dlm, mle);
1606 		response = DLM_MASTER_RESP_NO;
1607 	} else {
1608 		// mlog(0, "mle was found\n");
1609 		set_maybe = 1;
1610 		spin_lock(&tmpmle->spinlock);
1611 		if (tmpmle->master == dlm->node_num) {
1612 			mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1613 			BUG();
1614 		}
1615 		if (tmpmle->type == DLM_MLE_BLOCK)
1616 			response = DLM_MASTER_RESP_NO;
1617 		else if (tmpmle->type == DLM_MLE_MIGRATION) {
1618 			mlog(0, "migration mle was found (%u->%u)\n",
1619 			     tmpmle->master, tmpmle->new_master);
1620 			/* real master can respond on its own */
1621 			response = DLM_MASTER_RESP_NO;
1622 		} else
1623 			response = DLM_MASTER_RESP_MAYBE;
1624 		if (set_maybe)
1625 			set_bit(request->node_idx, tmpmle->maybe_map);
1626 		spin_unlock(&tmpmle->spinlock);
1627 	}
1628 	spin_unlock(&dlm->master_lock);
1629 	spin_unlock(&dlm->spinlock);
1630 
1631 	if (found) {
1632 		/* keep the mle attached to heartbeat events */
1633 		dlm_put_mle(tmpmle);
1634 	}
1635 send_response:
1636 	/*
1637 	 * __dlm_lookup_lockres() grabbed a reference to this lockres.
1638 	 * The reference is released by dlm_assert_master_worker() under
1639 	 * the call to dlm_dispatch_assert_master().  If
1640 	 * dlm_assert_master_worker() isn't called, we drop it here.
1641 	 */
1642 	if (dispatch_assert) {
1643 		if (response != DLM_MASTER_RESP_YES)
1644 			mlog(ML_ERROR, "invalid response %d\n", response);
1645 		if (!res) {
1646 			mlog(ML_ERROR, "bad lockres while trying to assert!\n");
1647 			BUG();
1648 		}
1649 		mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1650 			     dlm->node_num, res->lockname.len, res->lockname.name);
1651 		ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1652 						 DLM_ASSERT_MASTER_MLE_CLEANUP);
1653 		if (ret < 0) {
1654 			mlog(ML_ERROR, "failed to dispatch assert master work\n");
1655 			response = DLM_MASTER_RESP_ERROR;
1656 			dlm_lockres_put(res);
1657 		} else {
1658 			dispatched = 1;
1659 			dlm_lockres_grab_inflight_worker(dlm, res);
1660 		}
1661 	} else {
1662 		if (res)
1663 			dlm_lockres_put(res);
1664 	}
1665 
1666 	if (!dispatched)
1667 		dlm_put(dlm);
1668 	return response;
1669 }
1670 
1671 /*
1672  * DLM_ASSERT_MASTER_MSG
1673  */
1674 
1675 
1676 /*
1677  * NOTE: this can be used for debugging
1678  * can periodically run all locks owned by this node
1679  * and re-assert across the cluster...
1680  */
1681 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1682 				struct dlm_lock_resource *res,
1683 				void *nodemap, u32 flags)
1684 {
1685 	struct dlm_assert_master assert;
1686 	int to, tmpret;
1687 	struct dlm_node_iter iter;
1688 	int ret = 0;
1689 	int reassert;
1690 	const char *lockname = res->lockname.name;
1691 	unsigned int namelen = res->lockname.len;
1692 
1693 	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1694 
1695 	spin_lock(&res->spinlock);
1696 	res->state |= DLM_LOCK_RES_SETREF_INPROG;
1697 	spin_unlock(&res->spinlock);
1698 
1699 again:
1700 	reassert = 0;
1701 
1702 	/* note that if this nodemap is empty, it returns 0 */
1703 	dlm_node_iter_init(nodemap, &iter);
1704 	while ((to = dlm_node_iter_next(&iter)) >= 0) {
1705 		int r = 0;
1706 		struct dlm_master_list_entry *mle = NULL;
1707 
1708 		mlog(0, "sending assert master to %d (%.*s)\n", to,
1709 		     namelen, lockname);
1710 		memset(&assert, 0, sizeof(assert));
1711 		assert.node_idx = dlm->node_num;
1712 		assert.namelen = namelen;
1713 		memcpy(assert.name, lockname, namelen);
1714 		assert.flags = cpu_to_be32(flags);
1715 
1716 		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1717 					    &assert, sizeof(assert), to, &r);
1718 		if (tmpret < 0) {
1719 			mlog(ML_ERROR, "Error %d when sending message %u (key "
1720 			     "0x%x) to node %u\n", tmpret,
1721 			     DLM_ASSERT_MASTER_MSG, dlm->key, to);
1722 			if (!dlm_is_host_down(tmpret)) {
1723 				mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
1724 				BUG();
1725 			}
1726 			/* a node died.  finish out the rest of the nodes. */
1727 			mlog(0, "link to %d went down!\n", to);
1728 			/* any nonzero status return will do */
1729 			ret = tmpret;
1730 			r = 0;
1731 		} else if (r < 0) {
1732 			/* ok, something is horribly messed up.  kill thyself. */
1733 			mlog(ML_ERROR,"during assert master of %.*s to %u, "
1734 			     "got %d.\n", namelen, lockname, to, r);
1735 			spin_lock(&dlm->spinlock);
1736 			spin_lock(&dlm->master_lock);
1737 			if (dlm_find_mle(dlm, &mle, (char *)lockname,
1738 					 namelen)) {
1739 				dlm_print_one_mle(mle);
1740 				__dlm_put_mle(mle);
1741 			}
1742 			spin_unlock(&dlm->master_lock);
1743 			spin_unlock(&dlm->spinlock);
1744 			BUG();
1745 		}
1746 
1747 		if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1748 		    !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
1749 				mlog(ML_ERROR, "%.*s: very strange, "
1750 				     "master MLE but no lockres on %u\n",
1751 				     namelen, lockname, to);
1752 		}
1753 
1754 		if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1755 			mlog(0, "%.*s: node %u created mles on other "
1756 			     "nodes and requests a re-assert\n",
1757 			     namelen, lockname, to);
1758 			reassert = 1;
1759 		}
1760 		if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1761 			mlog(0, "%.*s: node %u has a reference to this "
1762 			     "lockres, set the bit in the refmap\n",
1763 			     namelen, lockname, to);
1764 			spin_lock(&res->spinlock);
1765 			dlm_lockres_set_refmap_bit(dlm, res, to);
1766 			spin_unlock(&res->spinlock);
1767 		}
1768 	}
1769 
1770 	if (reassert)
1771 		goto again;
1772 
1773 	spin_lock(&res->spinlock);
1774 	res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
1775 	spin_unlock(&res->spinlock);
1776 	wake_up(&res->wq);
1777 
1778 	return ret;
1779 }
1780 
1781 /*
1782  * locks that can be taken here:
1783  * dlm->spinlock
1784  * res->spinlock
1785  * mle->spinlock
1786  * dlm->master_list
1787  *
1788  * if possible, TRIM THIS DOWN!!!
1789  */
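/*
 * Contract of this handler, as implemented below: the return value is 0,
 * optionally OR'd with DLM_ASSERT_RESPONSE_REASSERT (ask the asserting node
 * to assert again because this node had contacted others during mastery) and
 * DLM_ASSERT_RESPONSE_MASTERY_REF (this node keeps a reference to the
 * lockres).  -EINVAL is returned only for a fatally inconsistent assert,
 * which makes the asserting node BUG out (the "kill" path at the end).
 */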
1790 int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
1791 			      void **ret_data)
1792 {
1793 	struct dlm_ctxt *dlm = data;
1794 	struct dlm_master_list_entry *mle = NULL;
1795 	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1796 	struct dlm_lock_resource *res = NULL;
1797 	char *name;
1798 	unsigned int namelen, hash;
1799 	u32 flags;
1800 	int master_request = 0, have_lockres_ref = 0;
1801 	int ret = 0;
1802 
1803 	if (!dlm_grab(dlm))
1804 		return 0;
1805 
1806 	name = assert->name;
1807 	namelen = assert->namelen;
1808 	hash = dlm_lockid_hash(name, namelen);
1809 	flags = be32_to_cpu(assert->flags);
1810 
1811 	if (namelen > DLM_LOCKID_NAME_MAX) {
1812 		mlog(ML_ERROR, "Invalid name length!");
1813 		goto done;
1814 	}
1815 
1816 	spin_lock(&dlm->spinlock);
1817 
1818 	if (flags)
1819 		mlog(0, "assert_master with flags: %u\n", flags);
1820 
1821 	/* find the MLE */
1822 	spin_lock(&dlm->master_lock);
1823 	if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1824 		/* not an error, could be master just re-asserting */
1825 		mlog(0, "just got an assert_master from %u, but no "
1826 		     "MLE for it! (%.*s)\n", assert->node_idx,
1827 		     namelen, name);
1828 	} else {
1829 		int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0);
1830 		if (bit >= O2NM_MAX_NODES) {
1831 			/* not necessarily an error, though less likely.
1832 			 * could be master just re-asserting. */
1833 			mlog(0, "no bits set in the maybe_map, but %u "
1834 			     "is asserting! (%.*s)\n", assert->node_idx,
1835 			     namelen, name);
1836 		} else if (bit != assert->node_idx) {
1837 			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1838 				mlog(0, "master %u was found, %u should "
1839 				     "back off\n", assert->node_idx, bit);
1840 			} else {
1841 				/* with the fix for bug 569, a higher node
1842 				 * number winning the mastery will respond
1843 				 * YES to mastery requests, but this node
1844 				 * had no way of knowing.  let it pass. */
1845 				mlog(0, "%u is the lowest node, "
1846 				     "%u is asserting. (%.*s)  %u must "
1847 				     "have begun after %u won.\n", bit,
1848 				     assert->node_idx, namelen, name, bit,
1849 				     assert->node_idx);
1850 			}
1851 		}
1852 		if (mle->type == DLM_MLE_MIGRATION) {
1853 			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1854 				mlog(0, "%s:%.*s: got cleanup assert"
1855 				     " from %u for migration\n",
1856 				     dlm->name, namelen, name,
1857 				     assert->node_idx);
1858 			} else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1859 				mlog(0, "%s:%.*s: got unrelated assert"
1860 				     " from %u for migration, ignoring\n",
1861 				     dlm->name, namelen, name,
1862 				     assert->node_idx);
1863 				__dlm_put_mle(mle);
1864 				spin_unlock(&dlm->master_lock);
1865 				spin_unlock(&dlm->spinlock);
1866 				goto done;
1867 			}
1868 		}
1869 	}
1870 	spin_unlock(&dlm->master_lock);
1871 
1872 	/* ok everything checks out with the MLE
1873 	 * now check to see if there is a lockres */
1874 	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1875 	if (res) {
1876 		spin_lock(&res->spinlock);
1877 		if (res->state & DLM_LOCK_RES_RECOVERING)  {
1878 			mlog(ML_ERROR, "%u asserting but %.*s is "
1879 			     "RECOVERING!\n", assert->node_idx, namelen, name);
1880 			goto kill;
1881 		}
1882 		if (!mle) {
1883 			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1884 			    res->owner != assert->node_idx) {
1885 				mlog(ML_ERROR, "DIE! Mastery assert from %u, "
1886 				     "but current owner is %u! (%.*s)\n",
1887 				     assert->node_idx, res->owner, namelen,
1888 				     name);
1889 				__dlm_print_one_lock_resource(res);
1890 				BUG();
1891 			}
1892 		} else if (mle->type != DLM_MLE_MIGRATION) {
1893 			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1894 				/* owner is just re-asserting */
1895 				if (res->owner == assert->node_idx) {
1896 					mlog(0, "owner %u re-asserting on "
1897 					     "lock %.*s\n", assert->node_idx,
1898 					     namelen, name);
1899 					goto ok;
1900 				}
1901 				mlog(ML_ERROR, "got assert_master from "
1902 				     "node %u, but %u is the owner! "
1903 				     "(%.*s)\n", assert->node_idx,
1904 				     res->owner, namelen, name);
1905 				goto kill;
1906 			}
1907 			if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1908 				mlog(ML_ERROR, "got assert from %u, but lock "
1909 				     "with no owner should be "
1910 				     "in-progress! (%.*s)\n",
1911 				     assert->node_idx,
1912 				     namelen, name);
1913 				goto kill;
1914 			}
1915 		} else /* mle->type == DLM_MLE_MIGRATION */ {
1916 			/* should only be getting an assert from new master */
1917 			if (assert->node_idx != mle->new_master) {
1918 				mlog(ML_ERROR, "got assert from %u, but "
1919 				     "new master is %u, and old master "
1920 				     "was %u (%.*s)\n",
1921 				     assert->node_idx, mle->new_master,
1922 				     mle->master, namelen, name);
1923 				goto kill;
1924 			}
1925 
1926 		}
1927 ok:
1928 		spin_unlock(&res->spinlock);
1929 	}
1930 
1931 	// mlog(0, "woo!  got an assert_master from node %u!\n",
1932 	// 	     assert->node_idx);
1933 	if (mle) {
1934 		int extra_ref = 0;
1935 		int nn = -1;
1936 		int rr, err = 0;
1937 
1938 		spin_lock(&mle->spinlock);
1939 		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1940 			extra_ref = 1;
1941 		else {
1942 			/* MASTER mle: if any bits set in the response map
1943 			 * then the calling node needs to re-assert to clear
1944 			 * up nodes that this node contacted */
1945 			while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
1946 						    nn+1)) < O2NM_MAX_NODES) {
1947 				if (nn != dlm->node_num && nn != assert->node_idx) {
1948 					master_request = 1;
1949 					break;
1950 				}
1951 			}
1952 		}
1953 		mle->master = assert->node_idx;
1954 		atomic_set(&mle->woken, 1);
1955 		wake_up(&mle->wq);
1956 		spin_unlock(&mle->spinlock);
1957 
1958 		if (res) {
1959 			int wake = 0;
1960 			spin_lock(&res->spinlock);
1961 			if (mle->type == DLM_MLE_MIGRATION) {
1962 				mlog(0, "finishing off migration of lockres %.*s, "
1963 			     		"from %u to %u\n",
1964 			       		res->lockname.len, res->lockname.name,
1965 			       		dlm->node_num, mle->new_master);
1966 				res->state &= ~DLM_LOCK_RES_MIGRATING;
1967 				wake = 1;
1968 				dlm_change_lockres_owner(dlm, res, mle->new_master);
1969 				BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1970 			} else {
1971 				dlm_change_lockres_owner(dlm, res, mle->master);
1972 			}
1973 			spin_unlock(&res->spinlock);
1974 			have_lockres_ref = 1;
1975 			if (wake)
1976 				wake_up(&res->wq);
1977 		}
1978 
1979 		/* master is known, detach if not already detached.
1980 		 * ensures that only one assert_master call will happen
1981 		 * on this mle. */
1982 		spin_lock(&dlm->master_lock);
1983 
1984 		rr = atomic_read(&mle->mle_refs.refcount);
1985 		if (mle->inuse > 0) {
1986 			if (extra_ref && rr < 3)
1987 				err = 1;
1988 			else if (!extra_ref && rr < 2)
1989 				err = 1;
1990 		} else {
1991 			if (extra_ref && rr < 2)
1992 				err = 1;
1993 			else if (!extra_ref && rr < 1)
1994 				err = 1;
1995 		}
1996 		if (err) {
1997 			mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
1998 			     "that will mess up this node, refs=%d, extra=%d, "
1999 			     "inuse=%d\n", dlm->name, namelen, name,
2000 			     assert->node_idx, rr, extra_ref, mle->inuse);
2001 			dlm_print_one_mle(mle);
2002 		}
2003 		__dlm_unlink_mle(dlm, mle);
2004 		__dlm_mle_detach_hb_events(dlm, mle);
2005 		__dlm_put_mle(mle);
2006 		if (extra_ref) {
2007 			/* the assert master message now balances the extra
2008 			 * ref given by the master / migration request message.
2009 			 * if this is the last put, it will be removed
2010 			 * from the list. */
2011 			__dlm_put_mle(mle);
2012 		}
2013 		spin_unlock(&dlm->master_lock);
2014 	} else if (res) {
2015 		if (res->owner != assert->node_idx) {
2016 			mlog(0, "assert_master from %u, but current "
2017 			     "owner is %u (%.*s), no mle\n", assert->node_idx,
2018 			     res->owner, namelen, name);
2019 		}
2020 	}
2021 	spin_unlock(&dlm->spinlock);
2022 
2023 done:
2024 	ret = 0;
2025 	if (res) {
2026 		spin_lock(&res->spinlock);
2027 		res->state |= DLM_LOCK_RES_SETREF_INPROG;
2028 		spin_unlock(&res->spinlock);
2029 		*ret_data = (void *)res;
2030 	}
2031 	dlm_put(dlm);
2032 	if (master_request) {
2033 		mlog(0, "need to tell master to reassert\n");
2034 		/* positive. negative would shoot down the node. */
2035 		ret |= DLM_ASSERT_RESPONSE_REASSERT;
2036 		if (!have_lockres_ref) {
2037 			mlog(ML_ERROR, "strange, got assert from %u, MASTER "
2038 			     "mle present here for %s:%.*s, but no lockres!\n",
2039 			     assert->node_idx, dlm->name, namelen, name);
2040 		}
2041 	}
2042 	if (have_lockres_ref) {
2043 		/* let the master know we have a reference to the lockres */
2044 		ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
2045 		mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
2046 		     dlm->name, namelen, name, assert->node_idx);
2047 	}
2048 	return ret;
2049 
2050 kill:
2051 	/* kill the caller! */
2052 	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
2053 	     "and killing the other node now!  This node is OK and can continue.\n");
2054 	__dlm_print_one_lock_resource(res);
2055 	spin_unlock(&res->spinlock);
2056 	spin_lock(&dlm->master_lock);
2057 	if (mle)
2058 		__dlm_put_mle(mle);
2059 	spin_unlock(&dlm->master_lock);
2060 	spin_unlock(&dlm->spinlock);
2061 	*ret_data = (void *)res;
2062 	dlm_put(dlm);
2063 	return -EINVAL;
2064 }
2065 
2066 void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
2067 {
2068 	struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
2069 
2070 	if (ret_data) {
2071 		spin_lock(&res->spinlock);
2072 		res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
2073 		spin_unlock(&res->spinlock);
2074 		wake_up(&res->wq);
2075 		dlm_lockres_put(res);
2076 	}
2077 	return;
2078 }
2079 
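/*
 * dlm_dispatch_assert_master() queues dlm_assert_master_worker() on the
 * dlm->dlm_worker workqueue.  The lockres reference held by the caller is
 * handed to the work item and dropped by the worker; if dispatching fails,
 * the caller must drop it itself (see the dispatch_assert error path in
 * the master request handler above).
 */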
2080 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
2081 			       struct dlm_lock_resource *res,
2082 			       int ignore_higher, u8 request_from, u32 flags)
2083 {
2084 	struct dlm_work_item *item;
2085 	item = kzalloc(sizeof(*item), GFP_ATOMIC);
2086 	if (!item)
2087 		return -ENOMEM;
2088 
2089 
2090 	/* queue up work for dlm_assert_master_worker */
2091 	dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
2092 	item->u.am.lockres = res; /* already have a ref */
2093 	/* can optionally ignore node numbers higher than this node */
2094 	item->u.am.ignore_higher = ignore_higher;
2095 	item->u.am.request_from = request_from;
2096 	item->u.am.flags = flags;
2097 
2098 	if (ignore_higher)
2099 		mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
2100 		     res->lockname.name);
2101 
2102 	spin_lock(&dlm->work_lock);
2103 	list_add_tail(&item->list, &dlm->work_list);
2104 	spin_unlock(&dlm->work_lock);
2105 
2106 	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2107 	return 0;
2108 }
2109 
2110 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2111 {
2112 	struct dlm_ctxt *dlm = data;
2113 	int ret = 0;
2114 	struct dlm_lock_resource *res;
2115 	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
2116 	int ignore_higher;
2117 	int bit;
2118 	u8 request_from;
2119 	u32 flags;
2120 
2121 	dlm = item->dlm;
2122 	res = item->u.am.lockres;
2123 	ignore_higher = item->u.am.ignore_higher;
2124 	request_from = item->u.am.request_from;
2125 	flags = item->u.am.flags;
2126 
2127 	spin_lock(&dlm->spinlock);
2128 	memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
2129 	spin_unlock(&dlm->spinlock);
2130 
2131 	clear_bit(dlm->node_num, nodemap);
2132 	if (ignore_higher) {
2133 		/* if this is just to clear up mles for nodes below
2134 		 * this node, do not send the message to the original
2135 		 * caller or any node number higher than this */
2136 		clear_bit(request_from, nodemap);
2137 		bit = dlm->node_num;
2138 		while (1) {
2139 			bit = find_next_bit(nodemap, O2NM_MAX_NODES,
2140 					    bit+1);
2141 		       	if (bit >= O2NM_MAX_NODES)
2142 				break;
2143 			clear_bit(bit, nodemap);
2144 		}
2145 	}
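	/* at this point the nodemap holds every live node that should get
	 * the assert: all domain members except ourselves and, when
	 * ignore_higher is set, also minus the original requestor and
	 * every node numbered above us */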
2146 
2147 	/*
2148 	 * If we're migrating this lock to someone else, we are no
2149 	 * longer allowed to assert our own mastery.  OTOH, we need to
2150 	 * prevent migration from starting while we're still asserting
2151 	 * our dominance.  The reserved ast delays migration.
2152 	 */
2153 	spin_lock(&res->spinlock);
2154 	if (res->state & DLM_LOCK_RES_MIGRATING) {
2155 		mlog(0, "Someone asked us to assert mastery, but we're "
2156 		     "in the middle of migration.  Skipping assert, "
2157 		     "the new master will handle that.\n");
2158 		spin_unlock(&res->spinlock);
2159 		goto put;
2160 	} else
2161 		__dlm_lockres_reserve_ast(res);
2162 	spin_unlock(&res->spinlock);
2163 
2164 	/* this call now finishes out the nodemap
2165 	 * even if one or more nodes die */
2166 	mlog(0, "worker about to master %.*s here, this=%u\n",
2167 		     res->lockname.len, res->lockname.name, dlm->node_num);
2168 	ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2169 	if (ret < 0) {
2170 		/* no need to restart, we are done */
2171 		if (!dlm_is_host_down(ret))
2172 			mlog_errno(ret);
2173 	}
2174 
2175 	/* Ok, we've asserted ourselves.  Let's let migration start. */
2176 	dlm_lockres_release_ast(dlm, res);
2177 
2178 put:
2179 	dlm_lockres_drop_inflight_worker(dlm, res);
2180 
2181 	dlm_lockres_put(res);
2182 
2183 	mlog(0, "finished with dlm_assert_master_worker\n");
2184 }
2185 
2186 /* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
2187  * We cannot wait for node recovery to complete to begin mastering this
2188  * lockres because this lockres is used to kick off recovery! ;-)
2189  * So, do a pre-check on all living nodes to see if any of those nodes
2190  * think that $RECOVERY is currently mastered by a dead node.  If so,
2191  * we wait a short time to allow that node to get notified by its own
2192  * heartbeat stack, then check again.  All $RECOVERY lock resources
2193  * mastered by dead nodes are purged when the heartbeat callback is
2194  * fired, so we can know for sure that it is safe to continue once
2195  * the node returns a live node or no node.  */
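/*
 * A caller is expected to retry when this returns -EAGAIN.  A minimal sketch
 * of that loop (illustrative only; the real retry policy lives in the lock
 * mastery path):
 *
 *	while (dlm_pre_master_reco_lockres(dlm, res) == -EAGAIN)
 *		msleep(100);
 */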
2196 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2197 				       struct dlm_lock_resource *res)
2198 {
2199 	struct dlm_node_iter iter;
2200 	int nodenum;
2201 	int ret = 0;
2202 	u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
2203 
2204 	spin_lock(&dlm->spinlock);
2205 	dlm_node_iter_init(dlm->domain_map, &iter);
2206 	spin_unlock(&dlm->spinlock);
2207 
2208 	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2209 		/* do not send to self */
2210 		if (nodenum == dlm->node_num)
2211 			continue;
2212 		ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2213 		if (ret < 0) {
2214 			mlog_errno(ret);
2215 			if (!dlm_is_host_down(ret))
2216 				BUG();
2217 			/* host is down, so answer for that node would be
2218 			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
2219 			ret = 0;
2220 		}
2221 
2222 		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
2223 			/* check to see if this master is in the recovery map */
2224 			spin_lock(&dlm->spinlock);
2225 			if (test_bit(master, dlm->recovery_map)) {
2226 				mlog(ML_NOTICE, "%s: node %u has not seen "
2227 				     "node %u go down yet, and thinks the "
2228 				     "dead node is mastering the recovery "
2229 				     "lock.  must wait.\n", dlm->name,
2230 				     nodenum, master);
2231 				ret = -EAGAIN;
2232 			}
2233 			spin_unlock(&dlm->spinlock);
2234 			mlog(0, "%s: reco lock master is %u\n", dlm->name,
2235 			     master);
2236 			break;
2237 		}
2238 	}
2239 	return ret;
2240 }
2241 
2242 /*
2243  * DLM_DEREF_LOCKRES_MSG
2244  */
2245 
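/*
 * When this node drops its last interest in a lockres mastered elsewhere,
 * dlm_drop_lockres_ref() tells the owner via DLM_DEREF_LOCKRES_MSG so the
 * owner can clear this node's bit in the refmap.  On the owner side,
 * dlm_deref_lockres_handler() clears the bit directly unless an assert
 * master (SETREF) is still in progress, in which case the deref is pushed
 * to dlm_deref_lockres_worker() to run once the flag clears.
 */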
2246 int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2247 {
2248 	struct dlm_deref_lockres deref;
2249 	int ret = 0, r;
2250 	const char *lockname;
2251 	unsigned int namelen;
2252 
2253 	lockname = res->lockname.name;
2254 	namelen = res->lockname.len;
2255 	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2256 
2257 	memset(&deref, 0, sizeof(deref));
2258 	deref.node_idx = dlm->node_num;
2259 	deref.namelen = namelen;
2260 	memcpy(deref.name, lockname, namelen);
2261 
2262 	ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2263 				 &deref, sizeof(deref), res->owner, &r);
2264 	if (ret < 0)
2265 		mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n",
2266 		     dlm->name, namelen, lockname, ret, res->owner);
2267 	else if (r < 0) {
2268 		/* BAD.  other node says I did not have a ref. */
2269 		mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
2270 		     dlm->name, namelen, lockname, res->owner, r);
2271 		dlm_print_one_lock_resource(res);
2272 		BUG();
2273 	}
2274 	return ret;
2275 }
2276 
2277 int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2278 			      void **ret_data)
2279 {
2280 	struct dlm_ctxt *dlm = data;
2281 	struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2282 	struct dlm_lock_resource *res = NULL;
2283 	char *name;
2284 	unsigned int namelen;
2285 	int ret = -EINVAL;
2286 	u8 node;
2287 	unsigned int hash;
2288 	struct dlm_work_item *item;
2289 	int cleared = 0;
2290 	int dispatch = 0;
2291 
2292 	if (!dlm_grab(dlm))
2293 		return 0;
2294 
2295 	name = deref->name;
2296 	namelen = deref->namelen;
2297 	node = deref->node_idx;
2298 
2299 	if (namelen > DLM_LOCKID_NAME_MAX) {
2300 		mlog(ML_ERROR, "Invalid name length!");
2301 		goto done;
2302 	}
2303 	if (deref->node_idx >= O2NM_MAX_NODES) {
2304 		mlog(ML_ERROR, "Invalid node number: %u\n", node);
2305 		goto done;
2306 	}
2307 
2308 	hash = dlm_lockid_hash(name, namelen);
2309 
2310 	spin_lock(&dlm->spinlock);
2311 	res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2312 	if (!res) {
2313 		spin_unlock(&dlm->spinlock);
2314 		mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2315 		     dlm->name, namelen, name);
2316 		goto done;
2317 	}
2318 	spin_unlock(&dlm->spinlock);
2319 
2320 	spin_lock(&res->spinlock);
2321 	if (res->state & DLM_LOCK_RES_SETREF_INPROG)
2322 		dispatch = 1;
2323 	else {
2324 		BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2325 		if (test_bit(node, res->refmap)) {
2326 			dlm_lockres_clear_refmap_bit(dlm, res, node);
2327 			cleared = 1;
2328 		}
2329 	}
2330 	spin_unlock(&res->spinlock);
2331 
2332 	if (!dispatch) {
2333 		if (cleared)
2334 			dlm_lockres_calc_usage(dlm, res);
2335 		else {
2336 			mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2337 		     	"but it is already dropped!\n", dlm->name,
2338 		     	res->lockname.len, res->lockname.name, node);
2339 			dlm_print_one_lock_resource(res);
2340 		}
2341 		ret = 0;
2342 		goto done;
2343 	}
2344 
2345 	item = kzalloc(sizeof(*item), GFP_NOFS);
2346 	if (!item) {
2347 		ret = -ENOMEM;
2348 		mlog_errno(ret);
2349 		goto done;
2350 	}
2351 
2352 	dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2353 	item->u.dl.deref_res = res;
2354 	item->u.dl.deref_node = node;
2355 
2356 	spin_lock(&dlm->work_lock);
2357 	list_add_tail(&item->list, &dlm->work_list);
2358 	spin_unlock(&dlm->work_lock);
2359 
2360 	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2361 	return 0;
2362 
2363 done:
2364 	if (res)
2365 		dlm_lockres_put(res);
2366 	dlm_put(dlm);
2367 
2368 	return ret;
2369 }
2370 
2371 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2372 {
2373 	struct dlm_ctxt *dlm;
2374 	struct dlm_lock_resource *res;
2375 	u8 node;
2376 	u8 cleared = 0;
2377 
2378 	dlm = item->dlm;
2379 	res = item->u.dl.deref_res;
2380 	node = item->u.dl.deref_node;
2381 
2382 	spin_lock(&res->spinlock);
2383 	BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2384 	if (test_bit(node, res->refmap)) {
2385 		__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
2386 		dlm_lockres_clear_refmap_bit(dlm, res, node);
2387 		cleared = 1;
2388 	}
2389 	spin_unlock(&res->spinlock);
2390 
2391 	if (cleared) {
2392 		mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
2393 		     dlm->name, res->lockname.len, res->lockname.name, node);
2394 		dlm_lockres_calc_usage(dlm, res);
2395 	} else {
2396 		mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2397 		     "but it is already dropped!\n", dlm->name,
2398 		     res->lockname.len, res->lockname.name, node);
2399 		dlm_print_one_lock_resource(res);
2400 	}
2401 
2402 	dlm_lockres_put(res);
2403 }
2404 
2405 /*
2406  * A migrateable resource is one that is:
2407  * 1. locally mastered, and,
2408  * 2. zero local locks, and,
2409  * 3. one or more non-local locks, or, one or more references
2410  * Returns 1 if yes, 0 if not.
2411  */
2412 static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
2413 				      struct dlm_lock_resource *res)
2414 {
2415 	enum dlm_lockres_list idx;
2416 	int nonlocal = 0, node_ref;
2417 	struct list_head *queue;
2418 	struct dlm_lock *lock;
2419 	u64 cookie;
2420 
2421 	assert_spin_locked(&res->spinlock);
2422 
2423 	/* delay migration when the lockres is in MIGRATING state */
2424 	if (res->state & DLM_LOCK_RES_MIGRATING)
2425 		return 0;
2426 
2427 	/* delay migration when the lockres is in RECOVERING state */
2428 	if (res->state & DLM_LOCK_RES_RECOVERING)
2429 		return 0;
2430 
2431 	if (res->owner != dlm->node_num)
2432 		return 0;
2433 
2434 	for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2435 		queue = dlm_list_idx_to_ptr(res, idx);
2436 		list_for_each_entry(lock, queue, list) {
2437 			if (lock->ml.node != dlm->node_num) {
2438 				nonlocal++;
2439 				continue;
2440 			}
2441 			cookie = be64_to_cpu(lock->ml.cookie);
2442 			mlog(0, "%s: Not migrateable res %.*s, lock %u:%llu on "
2443 			     "%s list\n", dlm->name, res->lockname.len,
2444 			     res->lockname.name,
2445 			     dlm_get_lock_cookie_node(cookie),
2446 			     dlm_get_lock_cookie_seq(cookie),
2447 			     dlm_list_in_text(idx));
2448 			return 0;
2449 		}
2450 	}
2451 
2452 	if (!nonlocal) {
2453 		node_ref = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
2454 		if (node_ref >= O2NM_MAX_NODES)
2455 			return 0;
2456 	}
2457 
2458 	mlog(0, "%s: res %.*s, Migrateable\n", dlm->name, res->lockname.len,
2459 	     res->lockname.name);
2460 
2461 	return 1;
2462 }
2463 
2464 /*
2465  * DLM_MIGRATE_LOCKRES
2466  */
2467 
2468 
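/*
 * Outline of dlm_migrate_lockres() as implemented below:
 *
 *	1. preallocate the migratable lockres page and a migration mle
 *	2. clear any existing mle and insert the migration mle
 *	   (dlm_add_migration_mle)
 *	3. set DLM_LOCK_RES_MIGRATING and flush asts
 *	   (dlm_mark_lockres_migrating)
 *	4. send all lock state to the target (dlm_send_one_lockres with
 *	   DLM_MRES_MIGRATION)
 *	5. wait for the target's assert master, then set the new owner and
 *	   drop the nonlocal locks
 *
 * Any failure after step 3 must clear MIGRATING and wake waiters, which is
 * what the "wake" flag tracks.
 */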
2469 static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2470 			       struct dlm_lock_resource *res, u8 target)
2471 {
2472 	struct dlm_master_list_entry *mle = NULL;
2473 	struct dlm_master_list_entry *oldmle = NULL;
2474  	struct dlm_migratable_lockres *mres = NULL;
2475 	int ret = 0;
2476 	const char *name;
2477 	unsigned int namelen;
2478 	int mle_added = 0;
2479 	int wake = 0;
2480 
2481 	if (!dlm_grab(dlm))
2482 		return -EINVAL;
2483 
2484 	BUG_ON(target == O2NM_MAX_NODES);
2485 
2486 	name = res->lockname.name;
2487 	namelen = res->lockname.len;
2488 
2489 	mlog(0, "%s: Migrating %.*s to node %u\n", dlm->name, namelen, name,
2490 	     target);
2491 
2492 	/* preallocate up front. if this fails, abort */
2493 	ret = -ENOMEM;
2494 	mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
2495 	if (!mres) {
2496 		mlog_errno(ret);
2497 		goto leave;
2498 	}
2499 
2500 	mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
2501 	if (!mle) {
2502 		mlog_errno(ret);
2503 		goto leave;
2504 	}
2505 	ret = 0;
2506 
2507 	/*
2508 	 * clear any existing master requests and
2509 	 * add the migration mle to the list
2510 	 */
2511 	spin_lock(&dlm->spinlock);
2512 	spin_lock(&dlm->master_lock);
2513 	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2514 				    namelen, target, dlm->node_num);
2515 	/* get an extra reference on the mle.
2516 	 * otherwise the assert_master from the new
2517 	 * master will destroy this.
2518 	 */
2519 	dlm_get_mle_inuse(mle);
2520 	spin_unlock(&dlm->master_lock);
2521 	spin_unlock(&dlm->spinlock);
2522 
2523 	if (ret == -EEXIST) {
2524 		mlog(0, "another process is already migrating it\n");
2525 		goto fail;
2526 	}
2527 	mle_added = 1;
2528 
2529 	/*
2530 	 * set the MIGRATING flag and flush asts
2531 	 * if we fail after this we need to re-dirty the lockres
2532 	 */
2533 	if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2534 		mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2535 		     "the target went down.\n", res->lockname.len,
2536 		     res->lockname.name, target);
2537 		spin_lock(&res->spinlock);
2538 		res->state &= ~DLM_LOCK_RES_MIGRATING;
2539 		wake = 1;
2540 		spin_unlock(&res->spinlock);
2541 		ret = -EINVAL;
2542 	}
2543 
2544 fail:
2545 	if (oldmle) {
2546 		/* master is known, detach if not already detached */
2547 		dlm_mle_detach_hb_events(dlm, oldmle);
2548 		dlm_put_mle(oldmle);
2549 	}
2550 
2551 	if (ret < 0) {
2552 		if (mle_added) {
2553 			dlm_mle_detach_hb_events(dlm, mle);
2554 			dlm_put_mle(mle);
2555 			dlm_put_mle_inuse(mle);
2556 		} else if (mle) {
2557 			kmem_cache_free(dlm_mle_cache, mle);
2558 			mle = NULL;
2559 		}
2560 		goto leave;
2561 	}
2562 
2563 	/*
2564 	 * at this point, we have a migration target, an mle
2565 	 * in the master list, and the MIGRATING flag set on
2566 	 * the lockres
2567 	 */
2568 
2569 	/* now that remote nodes are spinning on the MIGRATING flag,
2570 	 * ensure that all assert_master work is flushed. */
2571 	flush_workqueue(dlm->dlm_worker);
2572 
2573 	/* notify new node and send all lock state */
2574 	/* call send_one_lockres with migration flag.
2575 	 * this serves as notice to the target node that a
2576 	 * migration is starting. */
2577 	ret = dlm_send_one_lockres(dlm, res, mres, target,
2578 				   DLM_MRES_MIGRATION);
2579 
2580 	if (ret < 0) {
2581 		mlog(0, "migration to node %u failed with %d\n",
2582 		     target, ret);
2583 		/* migration failed, detach and clean up mle */
2584 		dlm_mle_detach_hb_events(dlm, mle);
2585 		dlm_put_mle(mle);
2586 		dlm_put_mle_inuse(mle);
2587 		spin_lock(&res->spinlock);
2588 		res->state &= ~DLM_LOCK_RES_MIGRATING;
2589 		wake = 1;
2590 		spin_unlock(&res->spinlock);
2591 		if (dlm_is_host_down(ret))
2592 			dlm_wait_for_node_death(dlm, target,
2593 						DLM_NODE_DEATH_WAIT_MAX);
2594 		goto leave;
2595 	}
2596 
2597 	/* at this point, the target sends a message to all nodes,
2598 	 * (using dlm_do_migrate_request).  this node is skipped since
2599 	 * we had to put an mle in the list to begin the process.  this
2600 	 * node now waits for target to do an assert master.  this node
2601 	 * will be the last one notified, ensuring that the migration
2602 	 * is complete everywhere.  if the target dies while this is
2603 	 * going on, some nodes could potentially see the target as the
2604 	 * master, so it is important that my recovery finds the migration
2605 	 * mle and sets the master to UNKNOWN. */
2606 
2607 
2608 	/* wait for new node to assert master */
2609 	while (1) {
2610 		ret = wait_event_interruptible_timeout(mle->wq,
2611 					(atomic_read(&mle->woken) == 1),
2612 					msecs_to_jiffies(5000));
2613 
2614 		if (ret >= 0) {
2615 		       	if (atomic_read(&mle->woken) == 1 ||
2616 			    res->owner == target)
2617 				break;
2618 
2619 			mlog(0, "%s:%.*s: timed out during migration\n",
2620 			     dlm->name, res->lockname.len, res->lockname.name);
2621 			/* avoid hang during shutdown when migrating lockres
2622 			 * to a node which also goes down */
2623 			if (dlm_is_node_dead(dlm, target)) {
2624 				mlog(0, "%s:%.*s: expected migration "
2625 				     "target %u is no longer up, restarting\n",
2626 				     dlm->name, res->lockname.len,
2627 				     res->lockname.name, target);
2628 				ret = -EINVAL;
2629 				/* migration failed, detach and clean up mle */
2630 				dlm_mle_detach_hb_events(dlm, mle);
2631 				dlm_put_mle(mle);
2632 				dlm_put_mle_inuse(mle);
2633 				spin_lock(&res->spinlock);
2634 				res->state &= ~DLM_LOCK_RES_MIGRATING;
2635 				wake = 1;
2636 				spin_unlock(&res->spinlock);
2637 				goto leave;
2638 			}
2639 		} else
2640 			mlog(0, "%s:%.*s: caught signal during migration\n",
2641 			     dlm->name, res->lockname.len, res->lockname.name);
2642 	}
2643 
2644 	/* all done, set the owner, clear the flag */
2645 	spin_lock(&res->spinlock);
2646 	dlm_set_lockres_owner(dlm, res, target);
2647 	res->state &= ~DLM_LOCK_RES_MIGRATING;
2648 	dlm_remove_nonlocal_locks(dlm, res);
2649 	spin_unlock(&res->spinlock);
2650 	wake_up(&res->wq);
2651 
2652 	/* master is known, detach if not already detached */
2653 	dlm_mle_detach_hb_events(dlm, mle);
2654 	dlm_put_mle_inuse(mle);
2655 	ret = 0;
2656 
2657 	dlm_lockres_calc_usage(dlm, res);
2658 
2659 leave:
2660 	/* re-dirty the lockres if we failed */
2661 	if (ret < 0)
2662 		dlm_kick_thread(dlm, res);
2663 
2664 	/* wake up waiters if the MIGRATING flag got set
2665 	 * but migration failed */
2666 	if (wake)
2667 		wake_up(&res->wq);
2668 
2669 	if (mres)
2670 		free_page((unsigned long)mres);
2671 
2672 	dlm_put(dlm);
2673 
2674 	mlog(0, "%s: Migrating %.*s to %u, returns %d\n", dlm->name, namelen,
2675 	     name, target, ret);
2676 	return ret;
2677 }
2678 
2679 #define DLM_MIGRATION_RETRY_MS  100
2680 
2681 /*
2682  * Should be called only after beginning the domain leave process.
2683  * There should not be any remaining locks on nonlocal lock resources,
2684  * and there should be no local locks left on locally mastered resources.
2685  *
2686  * Called with the dlm spinlock held, may drop it to do migration, but
2687  * will re-acquire before exit.
2688  *
2689  * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped
2690  */
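/*
 * Sketch of the expected calling pattern (illustrative only; the real caller
 * is the lockres scan run while leaving the domain).  Because this function
 * may drop dlm->spinlock, the caller restarts its hash bucket walk whenever
 * 1 is returned:
 *
 *	spin_lock(&dlm->spinlock);
 *	...
 *	if (dlm_empty_lockres(dlm, res))
 *		goto redo_bucket;
 */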
2691 int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2692 {
2693 	int ret;
2694 	int lock_dropped = 0;
2695 	u8 target = O2NM_MAX_NODES;
2696 
2697 	assert_spin_locked(&dlm->spinlock);
2698 
2699 	spin_lock(&res->spinlock);
2700 	if (dlm_is_lockres_migrateable(dlm, res))
2701 		target = dlm_pick_migration_target(dlm, res);
2702 	spin_unlock(&res->spinlock);
2703 
2704 	if (target == O2NM_MAX_NODES)
2705 		goto leave;
2706 
2707 	/* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2708 	spin_unlock(&dlm->spinlock);
2709 	lock_dropped = 1;
2710 	ret = dlm_migrate_lockres(dlm, res, target);
2711 	if (ret)
2712 		mlog(0, "%s: res %.*s, Migrate to node %u failed with %d\n",
2713 		     dlm->name, res->lockname.len, res->lockname.name,
2714 		     target, ret);
2715 	spin_lock(&dlm->spinlock);
2716 leave:
2717 	return lock_dropped;
2718 }
2719 
2720 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2721 {
2722 	int ret;
2723 	spin_lock(&dlm->ast_lock);
2724 	spin_lock(&lock->spinlock);
2725 	ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2726 	spin_unlock(&lock->spinlock);
2727 	spin_unlock(&dlm->ast_lock);
2728 	return ret;
2729 }
2730 
2731 static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2732 				     struct dlm_lock_resource *res,
2733 				     u8 mig_target)
2734 {
2735 	int can_proceed;
2736 	spin_lock(&res->spinlock);
2737 	can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2738 	spin_unlock(&res->spinlock);
2739 
2740 	/* target has died, so make the caller break out of the
2741 	 * wait_event, but caller must recheck the domain_map */
2742 	spin_lock(&dlm->spinlock);
2743 	if (!test_bit(mig_target, dlm->domain_map))
2744 		can_proceed = 1;
2745 	spin_unlock(&dlm->spinlock);
2746 	return can_proceed;
2747 }
2748 
2749 static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2750 				struct dlm_lock_resource *res)
2751 {
2752 	int ret;
2753 	spin_lock(&res->spinlock);
2754 	ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2755 	spin_unlock(&res->spinlock);
2756 	return ret;
2757 }
2758 
2759 
2760 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2761 				       struct dlm_lock_resource *res,
2762 				       u8 target)
2763 {
2764 	int ret = 0;
2765 
2766 	mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2767 	       res->lockname.len, res->lockname.name, dlm->node_num,
2768 	       target);
2769 	/* need to set MIGRATING flag on lockres.  this is done by
2770 	 * ensuring that all asts have been flushed for this lockres. */
2771 	spin_lock(&res->spinlock);
2772 	BUG_ON(res->migration_pending);
2773 	res->migration_pending = 1;
2774 	/* strategy is to reserve an extra ast then release
2775 	 * it below, letting the release do all of the work */
2776 	__dlm_lockres_reserve_ast(res);
2777 	spin_unlock(&res->spinlock);
2778 
2779 	/* now flush all the pending asts */
2780 	dlm_kick_thread(dlm, res);
2781 	/* before waiting on DIRTY, block processes which may
2782 	 * try to dirty the lockres before MIGRATING is set */
2783 	spin_lock(&res->spinlock);
2784 	BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
2785 	res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
2786 	spin_unlock(&res->spinlock);
2787 	/* now wait on any pending asts and the DIRTY state */
2788 	wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2789 	dlm_lockres_release_ast(dlm, res);
2790 
2791 	mlog(0, "about to wait on migration_wq, dirty=%s\n",
2792 	       res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2793 	/* if the extra ref we just put was the final one, this
2794 	 * will pass thru immediately.  otherwise, we need to wait
2795 	 * for the last ast to finish. */
2796 again:
2797 	ret = wait_event_interruptible_timeout(dlm->migration_wq,
2798 		   dlm_migration_can_proceed(dlm, res, target),
2799 		   msecs_to_jiffies(1000));
2800 	if (ret < 0) {
2801 		mlog(0, "woken again: migrating? %s, dead? %s\n",
2802 		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2803 		       test_bit(target, dlm->domain_map) ? "no":"yes");
2804 	} else {
2805 		mlog(0, "all is well: migrating? %s, dead? %s\n",
2806 		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2807 		       test_bit(target, dlm->domain_map) ? "no":"yes");
2808 	}
2809 	if (!dlm_migration_can_proceed(dlm, res, target)) {
2810 		mlog(0, "trying again...\n");
2811 		goto again;
2812 	}
2813 
2814 	ret = 0;
2815 	/* did the target go down or die? */
2816 	spin_lock(&dlm->spinlock);
2817 	if (!test_bit(target, dlm->domain_map)) {
2818 		mlog(ML_ERROR, "aha. migration target %u just went down\n",
2819 		     target);
2820 		ret = -EHOSTDOWN;
2821 	}
2822 	spin_unlock(&dlm->spinlock);
2823 
2824 	/*
2825 	 * if target is down, we need to clear DLM_LOCK_RES_BLOCK_DIRTY for
2826 	 * another try; otherwise, we are sure the MIGRATING state is there,
2827 	 * drop the unneeded state which blocked threads trying to DIRTY
2828 	 */
2829 	spin_lock(&res->spinlock);
2830 	BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
2831 	res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
2832 	if (!ret)
2833 		BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
2834 	spin_unlock(&res->spinlock);
2835 
2836 	/*
2837 	 * at this point:
2838 	 *
2839 	 *   o the DLM_LOCK_RES_MIGRATING flag is set if target not down
2840 	 *   o there are no pending asts on this lockres
2841 	 *   o all processes trying to reserve an ast on this
2842 	 *     lockres must wait for the MIGRATING flag to clear
2843 	 */
2844 	return ret;
2845 }
2846 
2847 /* last step in the migration process.
2848  * original master calls this to free all of the dlm_lock
2849  * structures that used to be for other nodes. */
2850 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2851 				      struct dlm_lock_resource *res)
2852 {
2853 	struct list_head *queue = &res->granted;
2854 	int i, bit;
2855 	struct dlm_lock *lock, *next;
2856 
2857 	assert_spin_locked(&res->spinlock);
2858 
2859 	BUG_ON(res->owner == dlm->node_num);
2860 
2861 	for (i=0; i<3; i++) {
2862 		list_for_each_entry_safe(lock, next, queue, list) {
2863 			if (lock->ml.node != dlm->node_num) {
2864 				mlog(0, "putting lock for node %u\n",
2865 				     lock->ml.node);
2866 				/* be extra careful */
2867 				BUG_ON(!list_empty(&lock->ast_list));
2868 				BUG_ON(!list_empty(&lock->bast_list));
2869 				BUG_ON(lock->ast_pending);
2870 				BUG_ON(lock->bast_pending);
2871 				dlm_lockres_clear_refmap_bit(dlm, res,
2872 							     lock->ml.node);
2873 				list_del_init(&lock->list);
2874 				dlm_lock_put(lock);
2875 				/* In a normal unlock, we would have added a
2876 				 * DLM_UNLOCK_FREE_LOCK action. Force it. */
2877 				dlm_lock_put(lock);
2878 			}
2879 		}
2880 		queue++;
2881 	}
2882 	bit = 0;
2883 	while (1) {
2884 		bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2885 		if (bit >= O2NM_MAX_NODES)
2886 			break;
2887 		/* do not clear the local node reference, if there is a
2888 		 * process holding this, let it drop the ref itself */
2889 		if (bit != dlm->node_num) {
2890 			mlog(0, "%s:%.*s: node %u had a ref to this "
2891 			     "migrating lockres, clearing\n", dlm->name,
2892 			     res->lockname.len, res->lockname.name, bit);
2893 			dlm_lockres_clear_refmap_bit(dlm, res, bit);
2894 		}
2895 		bit++;
2896 	}
2897 }
2898 
2899 /*
2900  * Pick a node to migrate the lock resource to. This function selects a
2901  * potential target based first on the locks and then on refmap. It skips
2902  * nodes that are in the process of exiting the domain.
2903  */
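/*
 * Selection order, as implemented below: first any remote node holding a
 * lock on one of the three queues, then any remote node with a bit set in
 * the refmap; nodes in exit_domain_map are skipped in both passes.  If
 * nothing qualifies, O2NM_MAX_NODES is returned and the caller treats the
 * resource as non-migrateable.
 */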
2904 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2905 				    struct dlm_lock_resource *res)
2906 {
2907 	enum dlm_lockres_list idx;
2908 	struct list_head *queue = &res->granted;
2909 	struct dlm_lock *lock;
2910 	int noderef;
2911 	u8 nodenum = O2NM_MAX_NODES;
2912 
2913 	assert_spin_locked(&dlm->spinlock);
2914 	assert_spin_locked(&res->spinlock);
2915 
2916 	/* Go through all the locks */
2917 	for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2918 		queue = dlm_list_idx_to_ptr(res, idx);
2919 		list_for_each_entry(lock, queue, list) {
2920 			if (lock->ml.node == dlm->node_num)
2921 				continue;
2922 			if (test_bit(lock->ml.node, dlm->exit_domain_map))
2923 				continue;
2924 			nodenum = lock->ml.node;
2925 			goto bail;
2926 		}
2927 	}
2928 
2929 	/* Go thru the refmap */
2930 	noderef = -1;
2931 	while (1) {
2932 		noderef = find_next_bit(res->refmap, O2NM_MAX_NODES,
2933 					noderef + 1);
2934 		if (noderef >= O2NM_MAX_NODES)
2935 			break;
2936 		if (noderef == dlm->node_num)
2937 			continue;
2938 		if (test_bit(noderef, dlm->exit_domain_map))
2939 			continue;
2940 		nodenum = noderef;
2941 		goto bail;
2942 	}
2943 
2944 bail:
2945 	return nodenum;
2946 }
2947 
2948 /* this is called by the new master once all lockres
2949  * data has been received */
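/*
 * The migrate request fans out to every live node except the old and new
 * master.  A recipient that still had a MASTER mle for this lockres answers
 * with DLM_MIGRATE_RESPONSE_MASTERY_REF, and the new master records that by
 * setting the node's bit in the refmap (see the status check below).
 */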
2950 static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2951 				  struct dlm_lock_resource *res,
2952 				  u8 master, u8 new_master,
2953 				  struct dlm_node_iter *iter)
2954 {
2955 	struct dlm_migrate_request migrate;
2956 	int ret, skip, status = 0;
2957 	int nodenum;
2958 
2959 	memset(&migrate, 0, sizeof(migrate));
2960 	migrate.namelen = res->lockname.len;
2961 	memcpy(migrate.name, res->lockname.name, migrate.namelen);
2962 	migrate.new_master = new_master;
2963 	migrate.master = master;
2964 
2965 	ret = 0;
2966 
2967 	/* send message to all nodes, except the master and myself */
2968 	while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
2969 		if (nodenum == master ||
2970 		    nodenum == new_master)
2971 			continue;
2972 
2973 		/* We could race exit domain. If exited, skip. */
2974 		spin_lock(&dlm->spinlock);
2975 		skip = (!test_bit(nodenum, dlm->domain_map));
2976 		spin_unlock(&dlm->spinlock);
2977 		if (skip) {
2978 			clear_bit(nodenum, iter->node_map);
2979 			continue;
2980 		}
2981 
2982 		ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
2983 					 &migrate, sizeof(migrate), nodenum,
2984 					 &status);
2985 		if (ret < 0) {
2986 			mlog(ML_ERROR, "%s: res %.*s, Error %d send "
2987 			     "MIGRATE_REQUEST to node %u\n", dlm->name,
2988 			     migrate.namelen, migrate.name, ret, nodenum);
2989 			if (!dlm_is_host_down(ret)) {
2990 				mlog(ML_ERROR, "unhandled error=%d!\n", ret);
2991 				BUG();
2992 			}
2993 			clear_bit(nodenum, iter->node_map);
2994 			ret = 0;
2995 		} else if (status < 0) {
2996 			mlog(0, "migrate request (node %u) returned %d!\n",
2997 			     nodenum, status);
2998 			ret = status;
2999 		} else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
3000 			/* during the migration request we short-circuited
3001 			 * the mastery of the lockres.  make sure we have
3002 			 * a mastery ref for nodenum */
3003 			mlog(0, "%s:%.*s: need ref for node %u\n",
3004 			     dlm->name, res->lockname.len, res->lockname.name,
3005 			     nodenum);
3006 			spin_lock(&res->spinlock);
3007 			dlm_lockres_set_refmap_bit(dlm, res, nodenum);
3008 			spin_unlock(&res->spinlock);
3009 		}
3010 	}
3011 
3012 	if (ret < 0)
3013 		mlog_errno(ret);
3014 
3015 	mlog(0, "returning ret=%d\n", ret);
3016 	return ret;
3017 }
3018 
3019 
3020 /* if there is an existing mle for this lockres, we now know who the master is.
3021  * (the one who sent us *this* message) we can clear it up right away.
3022  * since the process that put the mle on the list still has a reference to it,
3023  * we can unhash it now, set the master and wake the process.  as a result,
3024  * we will have no mle in the list to start with.  now we can add an mle for
3025  * the migration and this should be the only one found for those scanning the
3026  * list.  */
3027 int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
3028 				void **ret_data)
3029 {
3030 	struct dlm_ctxt *dlm = data;
3031 	struct dlm_lock_resource *res = NULL;
3032 	struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
3033 	struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
3034 	const char *name;
3035 	unsigned int namelen, hash;
3036 	int ret = 0;
3037 
3038 	if (!dlm_grab(dlm))
3039 		return -EINVAL;
3040 
3041 	name = migrate->name;
3042 	namelen = migrate->namelen;
3043 	hash = dlm_lockid_hash(name, namelen);
3044 
3045 	/* preallocate.. if this fails, abort */
3046 	mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
3047 
3048 	if (!mle) {
3049 		ret = -ENOMEM;
3050 		goto leave;
3051 	}
3052 
3053 	/* check for pre-existing lock */
3054 	spin_lock(&dlm->spinlock);
3055 	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
3056 	if (res) {
3057 		spin_lock(&res->spinlock);
3058 		if (res->state & DLM_LOCK_RES_RECOVERING) {
3059 			/* if all is working ok, this can only mean that we got
3060 			 * a migrate request from a node that we now see as
3061 			 * dead.  what can we do here?  drop it to the floor? */
3062 			spin_unlock(&res->spinlock);
3063 			mlog(ML_ERROR, "Got a migrate request, but the "
3064 			     "lockres is marked as recovering!");
3065 			kmem_cache_free(dlm_mle_cache, mle);
3066 			ret = -EINVAL; /* need a better solution */
3067 			goto unlock;
3068 		}
3069 		res->state |= DLM_LOCK_RES_MIGRATING;
3070 		spin_unlock(&res->spinlock);
3071 	}
3072 
3073 	spin_lock(&dlm->master_lock);
3074 	/* ignore status.  only nonzero status would BUG. */
3075 	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
3076 				    name, namelen,
3077 				    migrate->new_master,
3078 				    migrate->master);
3079 
3080 	spin_unlock(&dlm->master_lock);
3081 unlock:
3082 	spin_unlock(&dlm->spinlock);
3083 
3084 	if (oldmle) {
3085 		/* master is known, detach if not already detached */
3086 		dlm_mle_detach_hb_events(dlm, oldmle);
3087 		dlm_put_mle(oldmle);
3088 	}
3089 
3090 	if (res)
3091 		dlm_lockres_put(res);
3092 leave:
3093 	dlm_put(dlm);
3094 	return ret;
3095 }
3096 
3097 /* must be holding dlm->spinlock and dlm->master_lock
3098  * when adding a migration mle, we can clear any other mles
3099  * in the master list because we know with certainty that
3100  * the master is "master".  so we remove any old mle from
3101  * the list after setting its master field, and then add
3102  * the new migration mle.  this way we can hold with the rule
3103  * of having only one mle for a given lock name at all times. */
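/*
 * Return values seen below: 0 on success, -EEXIST if another local process
 * already has a migration mle for this name (we lost the race), or
 * DLM_MIGRATE_RESPONSE_MASTERY_REF if an old MASTER mle was cleaned out and
 * the new master should keep a reference for this node.
 */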
3104 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
3105 				 struct dlm_lock_resource *res,
3106 				 struct dlm_master_list_entry *mle,
3107 				 struct dlm_master_list_entry **oldmle,
3108 				 const char *name, unsigned int namelen,
3109 				 u8 new_master, u8 master)
3110 {
3111 	int found;
3112 	int ret = 0;
3113 
3114 	*oldmle = NULL;
3115 
3116 	assert_spin_locked(&dlm->spinlock);
3117 	assert_spin_locked(&dlm->master_lock);
3118 
3119 	/* caller is responsible for any ref taken here on oldmle */
3120 	found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
3121 	if (found) {
3122 		struct dlm_master_list_entry *tmp = *oldmle;
3123 		spin_lock(&tmp->spinlock);
3124 		if (tmp->type == DLM_MLE_MIGRATION) {
3125 			if (master == dlm->node_num) {
3126 				/* ah another process raced me to it */
3127 				mlog(0, "tried to migrate %.*s, but some "
3128 				     "process beat me to it\n",
3129 				     namelen, name);
3130 				ret = -EEXIST;
3131 			} else {
3132 				/* bad.  2 NODES are trying to migrate! */
3133 				mlog(ML_ERROR, "migration error  mle: "
3134 				     "master=%u new_master=%u // request: "
3135 				     "master=%u new_master=%u // "
3136 				     "lockres=%.*s\n",
3137 				     tmp->master, tmp->new_master,
3138 				     master, new_master,
3139 				     namelen, name);
3140 				BUG();
3141 			}
3142 		} else {
3143 			/* this is essentially what assert_master does */
3144 			tmp->master = master;
3145 			atomic_set(&tmp->woken, 1);
3146 			wake_up(&tmp->wq);
3147 			/* remove it so that only one mle will be found */
3148 			__dlm_unlink_mle(dlm, tmp);
3149 			__dlm_mle_detach_hb_events(dlm, tmp);
3150 			if (tmp->type == DLM_MLE_MASTER) {
3151 				ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3152 				mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3153 						"telling master to get ref "
3154 						"for cleared out mle during "
3155 						"migration\n", dlm->name,
3156 						namelen, name, master,
3157 						new_master);
3158 			}
3159 		}
3160 		spin_unlock(&tmp->spinlock);
3161 	}
3162 
3163 	/* now add a migration mle to the tail of the list */
3164 	dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
3165 	mle->new_master = new_master;
3166 	/* the new master will be sending an assert master for this.
3167 	 * at that point we will get the refmap reference */
3168 	mle->master = master;
3169 	/* do this for consistency with other mle types */
3170 	set_bit(new_master, mle->maybe_map);
3171 	__dlm_insert_mle(dlm, mle);
3172 
3173 	return ret;
3174 }
3175 
3176 /*
3177  * Sets the owner of the lockres, associated to the mle, to UNKNOWN
3178  */
3179 static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
3180 					struct dlm_master_list_entry *mle)
3181 {
3182 	struct dlm_lock_resource *res;
3183 
3184 	/* Find the lockres associated to the mle and set its owner to UNK */
3185 	res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
3186 				   mle->mnamehash);
3187 	if (res) {
3188 		spin_unlock(&dlm->master_lock);
3189 
3190 		/* move lockres onto recovery list */
3191 		spin_lock(&res->spinlock);
3192 		dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
3193 		dlm_move_lockres_to_recovery_list(dlm, res);
3194 		spin_unlock(&res->spinlock);
3195 		dlm_lockres_put(res);
3196 
3197 		/* about to get rid of mle, detach from heartbeat */
3198 		__dlm_mle_detach_hb_events(dlm, mle);
3199 
3200 		/* dump the mle */
3201 		spin_lock(&dlm->master_lock);
3202 		__dlm_put_mle(mle);
3203 		spin_unlock(&dlm->master_lock);
3204 	}
3205 
3206 	return res;
3207 }
3208 
3209 static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
3210 				    struct dlm_master_list_entry *mle)
3211 {
3212 	__dlm_mle_detach_hb_events(dlm, mle);
3213 
3214 	spin_lock(&mle->spinlock);
3215 	__dlm_unlink_mle(dlm, mle);
3216 	atomic_set(&mle->woken, 1);
3217 	spin_unlock(&mle->spinlock);
3218 
3219 	wake_up(&mle->wq);
3220 }
3221 
3222 static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
3223 				struct dlm_master_list_entry *mle, u8 dead_node)
3224 {
3225 	int bit;
3226 
3227 	BUG_ON(mle->type != DLM_MLE_BLOCK);
3228 
3229 	spin_lock(&mle->spinlock);
3230 	bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
3231 	if (bit != dead_node) {
3232 		mlog(0, "mle found, but dead node %u would not have been "
3233 		     "master\n", dead_node);
3234 		spin_unlock(&mle->spinlock);
3235 	} else {
3236 		/* Must drop the refcount by one since the assert_master will
3237 		 * never arrive. This may result in the mle being unlinked and
3238 		 * freed, but there may still be a process waiting in the
3239 		 * dlmlock path which is fine. */
3240 		mlog(0, "node %u was expected master\n", dead_node);
3241 		atomic_set(&mle->woken, 1);
3242 		spin_unlock(&mle->spinlock);
3243 		wake_up(&mle->wq);
3244 
3245 		/* Do not need events any longer, so detach from heartbeat */
3246 		__dlm_mle_detach_hb_events(dlm, mle);
3247 		__dlm_put_mle(mle);
3248 	}
3249 }
3250 
3251 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
3252 {
3253 	struct dlm_master_list_entry *mle;
3254 	struct dlm_lock_resource *res;
3255 	struct hlist_head *bucket;
3256 	struct hlist_node *tmp;
3257 	unsigned int i;
3258 
3259 	mlog(0, "dlm=%s, dead node=%u\n", dlm->name, dead_node);
3260 top:
3261 	assert_spin_locked(&dlm->spinlock);
3262 
3263 	/* clean the master list */
3264 	spin_lock(&dlm->master_lock);
3265 	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3266 		bucket = dlm_master_hash(dlm, i);
3267 		hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3268 			BUG_ON(mle->type != DLM_MLE_BLOCK &&
3269 			       mle->type != DLM_MLE_MASTER &&
3270 			       mle->type != DLM_MLE_MIGRATION);
3271 
3272 			/* MASTER mles are initiated locally. The waiting
3273 			 * process will notice the node map change shortly.
3274 			 * Let that happen as normal. */
3275 			if (mle->type == DLM_MLE_MASTER)
3276 				continue;
3277 
3278 			/* BLOCK mles are initiated by other nodes. Need to
3279 			 * clean up if the dead node would have been the
3280 			 * master. */
3281 			if (mle->type == DLM_MLE_BLOCK) {
3282 				dlm_clean_block_mle(dlm, mle, dead_node);
3283 				continue;
3284 			}
3285 
3286 			/* Everything else is a MIGRATION mle */
3287 
3288 			/* The rule for MIGRATION mles is that the master
3289 			 * becomes UNKNOWN if *either* the original or the new
3290 			 * master dies. All UNKNOWN lockres' are sent to
3291 			 * whichever node becomes the recovery master. The new
3292 			 * master is responsible for determining if there is
3293 			 * still a master for this lockres, or if he needs to
3294 			 * take over mastery. Either way, this node should
3295 			 * expect another message to resolve this. */
3296 
3297 			if (mle->master != dead_node &&
3298 			    mle->new_master != dead_node)
3299 				continue;
3300 
3301 			if (mle->new_master == dead_node && mle->inuse) {
3302 				mlog(ML_NOTICE, "%s: target %u died during "
3303 						"migration from %u, the MLE is "
3304 						"still keep used, ignore it!\n",
3305 						dlm->name, dead_node,
3306 						mle->master);
3307 				continue;
3308 			}
3309 
3310 			/* If we have reached this point, this mle needs to be
3311 			 * removed from the list and freed. */
3312 			dlm_clean_migration_mle(dlm, mle);
3313 
3314 			mlog(0, "%s: node %u died during migration from "
3315 			     "%u to %u!\n", dlm->name, dead_node, mle->master,
3316 			     mle->new_master);
3317 
3318 			/* If we find a lockres associated with the mle, we've
3319 			 * hit this rare case that messes up our lock ordering.
3320 			 * If so, we need to drop the master lock so that we can
3321 			 * take the lockres lock, meaning that we will have to
3322 			 * restart from the head of list. */
3323 			res = dlm_reset_mleres_owner(dlm, mle);
3324 			if (res)
3325 				/* restart */
3326 				goto top;
3327 
3328 			/* This may be the last reference */
3329 			__dlm_put_mle(mle);
3330 		}
3331 	}
3332 	spin_unlock(&dlm->master_lock);
3333 }
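
/*
 * A sketch of the assumed call site: dlm_clean_master_list() expects
 * dlm->spinlock to already be held (see the assert above), so the node-down
 * cleanup path would look roughly like:
 *
 *	spin_lock(&dlm->spinlock);
 *	dlm_clean_master_list(dlm, dead_node);
 *	spin_unlock(&dlm->spinlock);
 *
 * The "goto top" restart exists because dlm_reset_mleres_owner() has to drop
 * dlm->master_lock before it can take the lockres spinlock, which
 * invalidates the hash-bucket walk in progress.
 */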
3334 
3335 int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
3336 			 u8 old_master)
3337 {
3338 	struct dlm_node_iter iter;
3339 	int ret = 0;
3340 
3341 	spin_lock(&dlm->spinlock);
3342 	dlm_node_iter_init(dlm->domain_map, &iter);
3343 	clear_bit(old_master, iter.node_map);
3344 	clear_bit(dlm->node_num, iter.node_map);
3345 	spin_unlock(&dlm->spinlock);
3346 
3347 	/* ownership of the lockres is changing.  account for the
3348 	 * mastery reference here since old_master will briefly have
3349 	 * a reference after the migration completes */
3350 	spin_lock(&res->spinlock);
3351 	dlm_lockres_set_refmap_bit(dlm, res, old_master);
3352 	spin_unlock(&res->spinlock);
3353 
3354 	mlog(0, "now time to do a migrate request to other nodes\n");
3355 	ret = dlm_do_migrate_request(dlm, res, old_master,
3356 				     dlm->node_num, &iter);
3357 	if (ret < 0) {
3358 		mlog_errno(ret);
3359 		goto leave;
3360 	}
3361 
3362 	mlog(0, "doing assert master of %.*s to all except the original node\n",
3363 	     res->lockname.len, res->lockname.name);
3364 	/* this call now finishes out the nodemap
3365 	 * even if one or more nodes die */
3366 	ret = dlm_do_assert_master(dlm, res, iter.node_map,
3367 				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
3368 	if (ret < 0) {
3369 		/* no longer need to retry.  all living nodes contacted. */
3370 		mlog_errno(ret);
3371 		ret = 0;
3372 	}
3373 
3374 	memset(iter.node_map, 0, sizeof(iter.node_map));
3375 	set_bit(old_master, iter.node_map);
3376 	mlog(0, "doing assert master of %.*s back to %u\n",
3377 	     res->lockname.len, res->lockname.name, old_master);
3378 	ret = dlm_do_assert_master(dlm, res, iter.node_map,
3379 				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
3380 	if (ret < 0) {
3381 		mlog(0, "assert master to original master failed "
3382 		     "with %d.\n", ret);
3383 		/* the only nonzero status here would be because of
3384 		 * a dead original node.  we're done. */
3385 		ret = 0;
3386 	}
3387 
3388 	/* all done, set the owner, clear the flag */
3389 	spin_lock(&res->spinlock);
3390 	dlm_set_lockres_owner(dlm, res, dlm->node_num);
3391 	res->state &= ~DLM_LOCK_RES_MIGRATING;
3392 	spin_unlock(&res->spinlock);
3393 	/* re-dirty it on the new master */
3394 	dlm_kick_thread(dlm, res);
3395 	wake_up(&res->wq);
3396 leave:
3397 	return ret;
3398 }
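
/*
 * A condensed view of the hand-off performed above, for orientation only
 * (error handling omitted; see the function body for the real flow):
 *
 *	dlm_do_migrate_request(dlm, res, old_master, dlm->node_num, &iter);
 *	dlm_do_assert_master(dlm, res, iter.node_map,	(all but old_master and us)
 *			     DLM_ASSERT_MASTER_FINISH_MIGRATION);
 *	dlm_do_assert_master(dlm, res, iter.node_map,	(old_master alone)
 *			     DLM_ASSERT_MASTER_FINISH_MIGRATION);
 *	dlm_set_lockres_owner(dlm, res, dlm->node_num);
 *
 * Only a failed migrate request is treated as fatal; assert failures are
 * swallowed because a dead peer will be handled by recovery instead.
 */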
3399 
3400 /*
3401  * LOCKRES AST REFCOUNT
3402  * this is integral to migration
3403  */
3404 
3405 /* for future intent to call an ast, reserve one ahead of time.
3406  * this should be called only after waiting on the lockres
3407  * with dlm_wait_on_lockres, and while still holding the
3408  * spinlock after the call. */
3409 void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
3410 {
3411 	assert_spin_locked(&res->spinlock);
3412 	if (res->state & DLM_LOCK_RES_MIGRATING) {
3413 		__dlm_print_one_lock_resource(res);
3414 	}
3415 	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3416 
3417 	atomic_inc(&res->asts_reserved);
3418 }
3419 
3420 /*
3421  * used to drop the reserved ast, either because it went unused,
3422  * or because the ast/bast was actually called.
3423  *
3424  * also, if there is a pending migration on this lockres,
3425  * and this was the last pending ast on the lockres,
3426  * atomically set the MIGRATING flag before we drop the lock.
3427  * this is how we ensure that migration can proceed with no
3428  * asts in progress.  note that it is ok if the state of the
3429  * queues is such that a lock should be granted in the future
3430  * or that a bast should be fired, because the new master will
3431  * shuffle the lists on this lockres as soon as it is migrated.
3432  */
3433 void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
3434 			     struct dlm_lock_resource *res)
3435 {
3436 	if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
3437 		return;
3438 
3439 	if (!res->migration_pending) {
3440 		spin_unlock(&res->spinlock);
3441 		return;
3442 	}
3443 
3444 	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3445 	res->migration_pending = 0;
3446 	res->state |= DLM_LOCK_RES_MIGRATING;
3447 	spin_unlock(&res->spinlock);
3448 	wake_up(&res->wq);
3449 	wake_up(&dlm->migration_wq);
3450 }
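
/*
 * A minimal usage sketch for the reserve/release pair, assuming a caller
 * that plans to deliver an ast or bast later (the caller shape is assumed,
 * not taken from this file):
 *
 *	spin_lock(&res->spinlock);
 *	__dlm_lockres_reserve_ast(res);
 *	spin_unlock(&res->spinlock);
 *	...queue and eventually deliver (or drop) the ast/bast...
 *	dlm_lockres_release_ast(dlm, res);
 *
 * atomic_dec_and_lock() means only the caller that drops the final reserved
 * ast attempts the DLM_LOCK_RES_MIGRATING transition.
 */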
3451 
3452 void dlm_force_free_mles(struct dlm_ctxt *dlm)
3453 {
3454 	int i;
3455 	struct hlist_head *bucket;
3456 	struct dlm_master_list_entry *mle;
3457 	struct hlist_node *tmp;
3458 
3459 	/*
3460 	 * We notified all other nodes that we are exiting the domain and
3461 	 * set the dlm state to DLM_CTXT_LEAVING. If any mles are still
3462 	 * around, we force free them and wake any processes that are waiting
3463 	 * on the mles.
3464 	 */
3465 	spin_lock(&dlm->spinlock);
3466 	spin_lock(&dlm->master_lock);
3467 
3468 	BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
3469 	BUG_ON((find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 0) < O2NM_MAX_NODES));
3470 
3471 	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3472 		bucket = dlm_master_hash(dlm, i);
3473 		hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3474 			if (mle->type != DLM_MLE_BLOCK) {
3475 				mlog(ML_ERROR, "bad mle: %p\n", mle);
3476 				dlm_print_one_mle(mle);
3477 			}
3478 			atomic_set(&mle->woken, 1);
3479 			wake_up(&mle->wq);
3480 
3481 			__dlm_unlink_mle(dlm, mle);
3482 			__dlm_mle_detach_hb_events(dlm, mle);
3483 			__dlm_put_mle(mle);
3484 		}
3485 	}
3486 	spin_unlock(&dlm->master_lock);
3487 	spin_unlock(&dlm->spinlock);
3488 }
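
/*
 * A sketch of the assumed teardown ordering that satisfies the BUG_ON
 * preconditions above (state already DLM_CTXT_LEAVING, empty domain map):
 *
 *	spin_lock(&dlm->spinlock);
 *	dlm->dlm_state = DLM_CTXT_LEAVING;
 *	spin_unlock(&dlm->spinlock);
 *	...tell the other nodes we are leaving and wait for them to drop us...
 *	dlm_force_free_mles(dlm);
 */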
3489