1 /* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * dlmmod.c
5 *
6 * standalone DLM module
7 *
8 * Copyright (C) 2004 Oracle. All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24 *
25 */
26
27
28 #include <linux/module.h>
29 #include <linux/fs.h>
30 #include <linux/types.h>
31 #include <linux/slab.h>
32 #include <linux/highmem.h>
33 #include <linux/init.h>
34 #include <linux/sysctl.h>
35 #include <linux/random.h>
36 #include <linux/blkdev.h>
37 #include <linux/socket.h>
38 #include <linux/inet.h>
39 #include <linux/spinlock.h>
40 #include <linux/delay.h>
41
42
43 #include "cluster/heartbeat.h"
44 #include "cluster/nodemanager.h"
45 #include "cluster/tcp.h"
46
47 #include "dlmapi.h"
48 #include "dlmcommon.h"
49 #include "dlmdomain.h"
50 #include "dlmdebug.h"
51
52 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
53 #include "cluster/masklog.h"
54
55 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
56 struct dlm_master_list_entry *mle,
57 struct o2nm_node *node,
58 int idx);
59 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
60 struct dlm_master_list_entry *mle,
61 struct o2nm_node *node,
62 int idx);
63
64 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
65 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
66 struct dlm_lock_resource *res,
67 void *nodemap, u32 flags);
68 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
69
70 static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
71 struct dlm_master_list_entry *mle,
72 const char *name,
73 unsigned int namelen)
74 {
75 if (dlm != mle->dlm)
76 return 0;
77
78 if (namelen != mle->mnamelen ||
79 memcmp(name, mle->mname, namelen) != 0)
80 return 0;
81
82 return 1;
83 }
84
85 static struct kmem_cache *dlm_lockres_cache;
86 static struct kmem_cache *dlm_lockname_cache;
87 static struct kmem_cache *dlm_mle_cache;
88
89 static void dlm_mle_release(struct kref *kref);
90 static void dlm_init_mle(struct dlm_master_list_entry *mle,
91 enum dlm_mle_type type,
92 struct dlm_ctxt *dlm,
93 struct dlm_lock_resource *res,
94 const char *name,
95 unsigned int namelen);
96 static void dlm_put_mle(struct dlm_master_list_entry *mle);
97 static void __dlm_put_mle(struct dlm_master_list_entry *mle);
98 static int dlm_find_mle(struct dlm_ctxt *dlm,
99 struct dlm_master_list_entry **mle,
100 char *name, unsigned int namelen);
101
102 static int dlm_do_master_request(struct dlm_lock_resource *res,
103 struct dlm_master_list_entry *mle, int to);
104
105
106 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
107 struct dlm_lock_resource *res,
108 struct dlm_master_list_entry *mle,
109 int *blocked);
110 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
111 struct dlm_lock_resource *res,
112 struct dlm_master_list_entry *mle,
113 int blocked);
114 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
115 struct dlm_lock_resource *res,
116 struct dlm_master_list_entry *mle,
117 struct dlm_master_list_entry **oldmle,
118 const char *name, unsigned int namelen,
119 u8 new_master, u8 master);
120
121 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
122 struct dlm_lock_resource *res);
123 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
124 struct dlm_lock_resource *res);
125 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
126 struct dlm_lock_resource *res,
127 u8 target);
128 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
129 struct dlm_lock_resource *res);
130
131
132 int dlm_is_host_down(int errno)
133 {
134 switch (errno) {
135 case -EBADF:
136 case -ECONNREFUSED:
137 case -ENOTCONN:
138 case -ECONNRESET:
139 case -EPIPE:
140 case -EHOSTDOWN:
141 case -EHOSTUNREACH:
142 case -ETIMEDOUT:
143 case -ECONNABORTED:
144 case -ENETDOWN:
145 case -ENETUNREACH:
146 case -ENETRESET:
147 case -ESHUTDOWN:
148 case -ENOPROTOOPT:
149 case -EINVAL: /* if returned from our tcp code,
150 this means there is no socket */
151 return 1;
152 }
153 return 0;
154 }
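
/*
 * illustrative caller pattern (a sketch, not code from this file): the
 * message-sending paths below, e.g. dlm_do_master_request() and
 * dlm_do_assert_master(), treat any of the errnos above as "the target
 * node is dead" and anything else as a local bug:
 *
 *	ret = o2net_send_message(type, key, &req, sizeof(req), to, &resp);
 *	if (ret < 0 && !dlm_is_host_down(ret))
 *		BUG();	(not a network error)
 *	else if (ret < 0)
 *		(the target died: finish up and let recovery handle it)
 */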
155
156
157 /*
158 * MASTER LIST FUNCTIONS
159 */
160
161
162 /*
163 * regarding master list entries and heartbeat callbacks:
164 *
165 * in order to avoid sleeping and allocation that occurs in
166 * heartbeat, master list entries are simply attached to the
167 * dlm's established heartbeat callbacks. the mle is attached
168 * when it is created, and since the dlm->spinlock is held at
169 * that time, any heartbeat event will be properly discovered
170 * by the mle. the mle needs to be detached from the
171 * dlm->mle_hb_events list as soon as heartbeat events are no
172 * longer useful to the mle, and before the mle is freed.
173 *
174 * as a general rule, heartbeat events are no longer needed by
175 * the mle once an "answer" regarding the lock master has been
176 * received.
177 */
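
/*
 * a rough sketch of that lifecycle (illustrative only; the attach half
 * is done by dlm_init_mle() below, which asserts dlm->spinlock):
 *
 *	spin_lock(&dlm->spinlock);
 *	dlm_init_mle(mle, ...);			(attaches to dlm->mle_hb_events)
 *	spin_unlock(&dlm->spinlock);
 *	...wait for an answer about the lock master...
 *	dlm_mle_detach_hb_events(dlm, mle);	(hb events no longer needed)
 *	dlm_put_mle(mle);			(drop the reference last)
 */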
178 static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
179 struct dlm_master_list_entry *mle)
180 {
181 assert_spin_locked(&dlm->spinlock);
182
183 list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
184 }
185
186
187 static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
188 struct dlm_master_list_entry *mle)
189 {
190 if (!list_empty(&mle->hb_events))
191 list_del_init(&mle->hb_events);
192 }
193
194
195 static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
196 struct dlm_master_list_entry *mle)
197 {
198 spin_lock(&dlm->spinlock);
199 __dlm_mle_detach_hb_events(dlm, mle);
200 spin_unlock(&dlm->spinlock);
201 }
202
203 static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
204 {
205 struct dlm_ctxt *dlm;
206 dlm = mle->dlm;
207
208 assert_spin_locked(&dlm->spinlock);
209 assert_spin_locked(&dlm->master_lock);
210 mle->inuse++;
211 kref_get(&mle->mle_refs);
212 }
213
214 static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
215 {
216 struct dlm_ctxt *dlm;
217 dlm = mle->dlm;
218
219 spin_lock(&dlm->spinlock);
220 spin_lock(&dlm->master_lock);
221 mle->inuse--;
222 __dlm_put_mle(mle);
223 spin_unlock(&dlm->master_lock);
224 spin_unlock(&dlm->spinlock);
225
226 }
227
228 /* remove from list and free */
229 static void __dlm_put_mle(struct dlm_master_list_entry *mle)
230 {
231 struct dlm_ctxt *dlm;
232 dlm = mle->dlm;
233
234 assert_spin_locked(&dlm->spinlock);
235 assert_spin_locked(&dlm->master_lock);
236 if (!atomic_read(&mle->mle_refs.refcount)) {
237 /* this may or may not crash, but who cares.
238 * it's a BUG. */
239 mlog(ML_ERROR, "bad mle: %p\n", mle);
240 dlm_print_one_mle(mle);
241 BUG();
242 } else
243 kref_put(&mle->mle_refs, dlm_mle_release);
244 }
245
246
247 /* must not have any spinlocks coming in */
248 static void dlm_put_mle(struct dlm_master_list_entry *mle)
249 {
250 struct dlm_ctxt *dlm;
251 dlm = mle->dlm;
252
253 spin_lock(&dlm->spinlock);
254 spin_lock(&dlm->master_lock);
255 __dlm_put_mle(mle);
256 spin_unlock(&dlm->master_lock);
257 spin_unlock(&dlm->spinlock);
258 }
259
260 static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
261 {
262 kref_get(&mle->mle_refs);
263 }
264
265 static void dlm_init_mle(struct dlm_master_list_entry *mle,
266 enum dlm_mle_type type,
267 struct dlm_ctxt *dlm,
268 struct dlm_lock_resource *res,
269 const char *name,
270 unsigned int namelen)
271 {
272 assert_spin_locked(&dlm->spinlock);
273
274 mle->dlm = dlm;
275 mle->type = type;
276 INIT_HLIST_NODE(&mle->master_hash_node);
277 INIT_LIST_HEAD(&mle->hb_events);
278 memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
279 spin_lock_init(&mle->spinlock);
280 init_waitqueue_head(&mle->wq);
281 atomic_set(&mle->woken, 0);
282 kref_init(&mle->mle_refs);
283 memset(mle->response_map, 0, sizeof(mle->response_map));
284 mle->master = O2NM_MAX_NODES;
285 mle->new_master = O2NM_MAX_NODES;
286 mle->inuse = 0;
287
288 BUG_ON(mle->type != DLM_MLE_BLOCK &&
289 mle->type != DLM_MLE_MASTER &&
290 mle->type != DLM_MLE_MIGRATION);
291
292 if (mle->type == DLM_MLE_MASTER) {
293 BUG_ON(!res);
294 mle->mleres = res;
295 memcpy(mle->mname, res->lockname.name, res->lockname.len);
296 mle->mnamelen = res->lockname.len;
297 mle->mnamehash = res->lockname.hash;
298 } else {
299 BUG_ON(!name);
300 mle->mleres = NULL;
301 memcpy(mle->mname, name, namelen);
302 mle->mnamelen = namelen;
303 mle->mnamehash = dlm_lockid_hash(name, namelen);
304 }
305
306 atomic_inc(&dlm->mle_tot_count[mle->type]);
307 atomic_inc(&dlm->mle_cur_count[mle->type]);
308
309 /* copy off the node_map and register hb callbacks on our copy */
310 memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
311 memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
312 clear_bit(dlm->node_num, mle->vote_map);
313 clear_bit(dlm->node_num, mle->node_map);
314
315 /* attach the mle to the domain node up/down events */
316 __dlm_mle_attach_hb_events(dlm, mle);
317 }
318
319 void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
320 {
321 assert_spin_locked(&dlm->spinlock);
322 assert_spin_locked(&dlm->master_lock);
323
324 if (!hlist_unhashed(&mle->master_hash_node))
325 hlist_del_init(&mle->master_hash_node);
326 }
327
328 void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
329 {
330 struct hlist_head *bucket;
331
332 assert_spin_locked(&dlm->master_lock);
333
334 bucket = dlm_master_hash(dlm, mle->mnamehash);
335 hlist_add_head(&mle->master_hash_node, bucket);
336 }
337
338 /* returns 1 if found, 0 if not */
339 static int dlm_find_mle(struct dlm_ctxt *dlm,
340 struct dlm_master_list_entry **mle,
341 char *name, unsigned int namelen)
342 {
343 struct dlm_master_list_entry *tmpmle;
344 struct hlist_head *bucket;
345 unsigned int hash;
346
347 assert_spin_locked(&dlm->master_lock);
348
349 hash = dlm_lockid_hash(name, namelen);
350 bucket = dlm_master_hash(dlm, hash);
351 hlist_for_each_entry(tmpmle, bucket, master_hash_node) {
352 if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
353 continue;
354 dlm_get_mle(tmpmle);
355 *mle = tmpmle;
356 return 1;
357 }
358 return 0;
359 }
360
361 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
362 {
363 struct dlm_master_list_entry *mle;
364
365 assert_spin_locked(&dlm->spinlock);
366
367 list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
368 if (node_up)
369 dlm_mle_node_up(dlm, mle, NULL, idx);
370 else
371 dlm_mle_node_down(dlm, mle, NULL, idx);
372 }
373 }
374
375 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
376 struct dlm_master_list_entry *mle,
377 struct o2nm_node *node, int idx)
378 {
379 spin_lock(&mle->spinlock);
380
381 if (!test_bit(idx, mle->node_map))
382 mlog(0, "node %u already removed from nodemap!\n", idx);
383 else
384 clear_bit(idx, mle->node_map);
385
386 spin_unlock(&mle->spinlock);
387 }
388
389 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
390 struct dlm_master_list_entry *mle,
391 struct o2nm_node *node, int idx)
392 {
393 spin_lock(&mle->spinlock);
394
395 if (test_bit(idx, mle->node_map))
396 mlog(0, "node %u already in node map!\n", idx);
397 else
398 set_bit(idx, mle->node_map);
399
400 spin_unlock(&mle->spinlock);
401 }
402
403
404 int dlm_init_mle_cache(void)
405 {
406 dlm_mle_cache = kmem_cache_create("o2dlm_mle",
407 sizeof(struct dlm_master_list_entry),
408 0, SLAB_HWCACHE_ALIGN,
409 NULL);
410 if (dlm_mle_cache == NULL)
411 return -ENOMEM;
412 return 0;
413 }
414
415 void dlm_destroy_mle_cache(void)
416 {
417 if (dlm_mle_cache)
418 kmem_cache_destroy(dlm_mle_cache);
419 }
420
421 static void dlm_mle_release(struct kref *kref)
422 {
423 struct dlm_master_list_entry *mle;
424 struct dlm_ctxt *dlm;
425
426 mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
427 dlm = mle->dlm;
428
429 assert_spin_locked(&dlm->spinlock);
430 assert_spin_locked(&dlm->master_lock);
431
432 mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname,
433 mle->type);
434
435 /* remove from list if not already */
436 __dlm_unlink_mle(dlm, mle);
437
438 /* detach the mle from the domain node up/down events */
439 __dlm_mle_detach_hb_events(dlm, mle);
440
441 atomic_dec(&dlm->mle_cur_count[mle->type]);
442
443 /* NOTE: kfree under spinlock here.
444 * if this is bad, we can move this to a freelist. */
445 kmem_cache_free(dlm_mle_cache, mle);
446 }
447
448
449 /*
450 * LOCK RESOURCE FUNCTIONS
451 */
452
453 int dlm_init_master_caches(void)
454 {
455 dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
456 sizeof(struct dlm_lock_resource),
457 0, SLAB_HWCACHE_ALIGN, NULL);
458 if (!dlm_lockres_cache)
459 goto bail;
460
461 dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
462 DLM_LOCKID_NAME_MAX, 0,
463 SLAB_HWCACHE_ALIGN, NULL);
464 if (!dlm_lockname_cache)
465 goto bail;
466
467 return 0;
468 bail:
469 dlm_destroy_master_caches();
470 return -ENOMEM;
471 }
472
473 void dlm_destroy_master_caches(void)
474 {
475 if (dlm_lockname_cache) {
476 kmem_cache_destroy(dlm_lockname_cache);
477 dlm_lockname_cache = NULL;
478 }
479
480 if (dlm_lockres_cache) {
481 kmem_cache_destroy(dlm_lockres_cache);
482 dlm_lockres_cache = NULL;
483 }
484 }
485
486 static void dlm_lockres_release(struct kref *kref)
487 {
488 struct dlm_lock_resource *res;
489 struct dlm_ctxt *dlm;
490
491 res = container_of(kref, struct dlm_lock_resource, refs);
492 dlm = res->dlm;
493
494 /* This should not happen -- all lockres' have a name
495 * associated with them at init time. */
496 BUG_ON(!res->lockname.name);
497
498 mlog(0, "destroying lockres %.*s\n", res->lockname.len,
499 res->lockname.name);
500
501 spin_lock(&dlm->track_lock);
502 if (!list_empty(&res->tracking))
503 list_del_init(&res->tracking);
504 else {
505 mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
506 res->lockname.len, res->lockname.name);
507 dlm_print_one_lock_resource(res);
508 }
509 spin_unlock(&dlm->track_lock);
510
511 atomic_dec(&dlm->res_cur_count);
512
513 if (!hlist_unhashed(&res->hash_node) ||
514 !list_empty(&res->granted) ||
515 !list_empty(&res->converting) ||
516 !list_empty(&res->blocked) ||
517 !list_empty(&res->dirty) ||
518 !list_empty(&res->recovering) ||
519 !list_empty(&res->purge)) {
520 mlog(ML_ERROR,
521 "Going to BUG for resource %.*s."
522 " We're on a list! [%c%c%c%c%c%c%c]\n",
523 res->lockname.len, res->lockname.name,
524 !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
525 !list_empty(&res->granted) ? 'G' : ' ',
526 !list_empty(&res->converting) ? 'C' : ' ',
527 !list_empty(&res->blocked) ? 'B' : ' ',
528 !list_empty(&res->dirty) ? 'D' : ' ',
529 !list_empty(&res->recovering) ? 'R' : ' ',
530 !list_empty(&res->purge) ? 'P' : ' ');
531
532 dlm_print_one_lock_resource(res);
533 }
534
535 /* By the time we're ready to blow this guy away, we shouldn't
536 * be on any lists. */
537 BUG_ON(!hlist_unhashed(&res->hash_node));
538 BUG_ON(!list_empty(&res->granted));
539 BUG_ON(!list_empty(&res->converting));
540 BUG_ON(!list_empty(&res->blocked));
541 BUG_ON(!list_empty(&res->dirty));
542 BUG_ON(!list_empty(&res->recovering));
543 BUG_ON(!list_empty(&res->purge));
544
545 kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
546
547 kmem_cache_free(dlm_lockres_cache, res);
548 }
549
550 void dlm_lockres_put(struct dlm_lock_resource *res)
551 {
552 kref_put(&res->refs, dlm_lockres_release);
553 }
554
555 static void dlm_init_lockres(struct dlm_ctxt *dlm,
556 struct dlm_lock_resource *res,
557 const char *name, unsigned int namelen)
558 {
559 char *qname;
560
561 /* If we memset here, we lose our reference to the kmalloc'd
562 * res->lockname.name, so be sure to init every field
563 * correctly! */
564
565 qname = (char *) res->lockname.name;
566 memcpy(qname, name, namelen);
567
568 res->lockname.len = namelen;
569 res->lockname.hash = dlm_lockid_hash(name, namelen);
570
571 init_waitqueue_head(&res->wq);
572 spin_lock_init(&res->spinlock);
573 INIT_HLIST_NODE(&res->hash_node);
574 INIT_LIST_HEAD(&res->granted);
575 INIT_LIST_HEAD(&res->converting);
576 INIT_LIST_HEAD(&res->blocked);
577 INIT_LIST_HEAD(&res->dirty);
578 INIT_LIST_HEAD(&res->recovering);
579 INIT_LIST_HEAD(&res->purge);
580 INIT_LIST_HEAD(&res->tracking);
581 atomic_set(&res->asts_reserved, 0);
582 res->migration_pending = 0;
583 res->inflight_locks = 0;
584 res->inflight_assert_workers = 0;
585
586 res->dlm = dlm;
587
588 kref_init(&res->refs);
589
590 atomic_inc(&dlm->res_tot_count);
591 atomic_inc(&dlm->res_cur_count);
592
593 /* just for consistency */
594 spin_lock(&res->spinlock);
595 dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
596 spin_unlock(&res->spinlock);
597
598 res->state = DLM_LOCK_RES_IN_PROGRESS;
599
600 res->last_used = 0;
601
602 spin_lock(&dlm->spinlock);
603 list_add_tail(&res->tracking, &dlm->tracking_list);
604 spin_unlock(&dlm->spinlock);
605
606 memset(res->lvb, 0, DLM_LVB_LEN);
607 memset(res->refmap, 0, sizeof(res->refmap));
608 }
609
610 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
611 const char *name,
612 unsigned int namelen)
613 {
614 struct dlm_lock_resource *res = NULL;
615
616 res = kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
617 if (!res)
618 goto error;
619
620 res->lockname.name = kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
621 if (!res->lockname.name)
622 goto error;
623
624 dlm_init_lockres(dlm, res, name, namelen);
625 return res;
626
627 error:
628 if (res)
629 kmem_cache_free(dlm_lockres_cache, res);
630 return NULL;
631 }
632
633 void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
634 struct dlm_lock_resource *res, int bit)
635 {
636 assert_spin_locked(&res->spinlock);
637
638 mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len,
639 res->lockname.name, bit, __builtin_return_address(0));
640
641 set_bit(bit, res->refmap);
642 }
643
644 void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
645 struct dlm_lock_resource *res, int bit)
646 {
647 assert_spin_locked(&res->spinlock);
648
649 mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len,
650 res->lockname.name, bit, __builtin_return_address(0));
651
652 clear_bit(bit, res->refmap);
653 }
654
655 static void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
656 struct dlm_lock_resource *res)
657 {
658 res->inflight_locks++;
659
660 mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
661 res->lockname.len, res->lockname.name, res->inflight_locks,
662 __builtin_return_address(0));
663 }
664
665 void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
666 struct dlm_lock_resource *res)
667 {
668 assert_spin_locked(&res->spinlock);
669 __dlm_lockres_grab_inflight_ref(dlm, res);
670 }
671
672 void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
673 struct dlm_lock_resource *res)
674 {
675 assert_spin_locked(&res->spinlock);
676
677 BUG_ON(res->inflight_locks == 0);
678
679 res->inflight_locks--;
680
681 mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
682 res->lockname.len, res->lockname.name, res->inflight_locks,
683 __builtin_return_address(0));
684
685 wake_up(&res->wq);
686 }
687
688 void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
689 struct dlm_lock_resource *res)
690 {
691 assert_spin_locked(&res->spinlock);
692 res->inflight_assert_workers++;
693 mlog(0, "%s:%.*s: inflight assert worker++: now %u\n",
694 dlm->name, res->lockname.len, res->lockname.name,
695 res->inflight_assert_workers);
696 }
697
698 static void dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
699 struct dlm_lock_resource *res)
700 {
701 spin_lock(&res->spinlock);
702 __dlm_lockres_grab_inflight_worker(dlm, res);
703 spin_unlock(&res->spinlock);
704 }
705
706 static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
707 struct dlm_lock_resource *res)
708 {
709 assert_spin_locked(&res->spinlock);
710 BUG_ON(res->inflight_assert_workers == 0);
711 res->inflight_assert_workers--;
712 mlog(0, "%s:%.*s: inflight assert worker--: now %u\n",
713 dlm->name, res->lockname.len, res->lockname.name,
714 res->inflight_assert_workers);
715 }
716
717 static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
718 struct dlm_lock_resource *res)
719 {
720 spin_lock(&res->spinlock);
721 __dlm_lockres_drop_inflight_worker(dlm, res);
722 spin_unlock(&res->spinlock);
723 }
724
725 /*
726 * lookup a lock resource by name.
727 * may already exist in the hashtable.
728 * lockid is null terminated
729 *
730 * if not, allocate enough for the lockres and for
731 * the temporary structure used in doing the mastering.
732 *
733 * also, do a lookup in the dlm->master_list to see
734 * if another node has begun mastering the same lock.
735 * if so, there should be a block entry in there
736 * for this name, and we should *not* attempt to master
737 * the lock here. need to wait around for that node
738 * to assert_master (or die).
739 *
740 */
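
/*
 * sketch of a typical call (an assumption about the caller, e.g. the
 * dlmlock path; not copied from this file):
 *
 *	res = dlm_get_lock_resource(dlm, lockid, namelen, flags);
 *	if (!res)
 *		(handle the allocation failure)
 *	(res is returned with a reference and an inflight ref held; the
 *	 caller drops them once its lock call has completed)
 */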
741 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
742 const char *lockid,
743 int namelen,
744 int flags)
745 {
746 struct dlm_lock_resource *tmpres=NULL, *res=NULL;
747 struct dlm_master_list_entry *mle = NULL;
748 struct dlm_master_list_entry *alloc_mle = NULL;
749 int blocked = 0;
750 int ret, nodenum;
751 struct dlm_node_iter iter;
752 unsigned int hash;
753 int tries = 0;
754 int bit, wait_on_recovery = 0;
755
756 BUG_ON(!lockid);
757
758 hash = dlm_lockid_hash(lockid, namelen);
759
760 mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
761
762 lookup:
763 spin_lock(&dlm->spinlock);
764 tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
765 if (tmpres) {
766 spin_unlock(&dlm->spinlock);
767 spin_lock(&tmpres->spinlock);
768
769 /*
770 * Right after dlm spinlock was released, dlm_thread could have
771 * purged the lockres. Check if lockres got unhashed. If so
772 * start over.
773 */
774 if (hlist_unhashed(&tmpres->hash_node)) {
775 spin_unlock(&tmpres->spinlock);
776 dlm_lockres_put(tmpres);
777 tmpres = NULL;
778 goto lookup;
779 }
780
781 /* Wait on the thread that is mastering the resource */
782 if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
783 __dlm_wait_on_lockres(tmpres);
784 BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
785 spin_unlock(&tmpres->spinlock);
786 dlm_lockres_put(tmpres);
787 tmpres = NULL;
788 goto lookup;
789 }
790
791 /* Wait on the resource purge to complete before continuing */
792 if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) {
793 BUG_ON(tmpres->owner == dlm->node_num);
794 __dlm_wait_on_lockres_flags(tmpres,
795 DLM_LOCK_RES_DROPPING_REF);
796 spin_unlock(&tmpres->spinlock);
797 dlm_lockres_put(tmpres);
798 tmpres = NULL;
799 goto lookup;
800 }
801
802 /* Grab inflight ref to pin the resource */
803 dlm_lockres_grab_inflight_ref(dlm, tmpres);
804
805 spin_unlock(&tmpres->spinlock);
806 if (res)
807 dlm_lockres_put(res);
808 res = tmpres;
809 goto leave;
810 }
811
812 if (!res) {
813 spin_unlock(&dlm->spinlock);
814 mlog(0, "allocating a new resource\n");
815 /* nothing found and we need to allocate one. */
816 alloc_mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
817 if (!alloc_mle)
818 goto leave;
819 res = dlm_new_lockres(dlm, lockid, namelen);
820 if (!res)
821 goto leave;
822 goto lookup;
823 }
824
825 mlog(0, "no lockres found, allocated our own: %p\n", res);
826
827 if (flags & LKM_LOCAL) {
828 /* caller knows it's safe to assume it's not mastered elsewhere
829 * DONE! return right away */
830 spin_lock(&res->spinlock);
831 dlm_change_lockres_owner(dlm, res, dlm->node_num);
832 __dlm_insert_lockres(dlm, res);
833 dlm_lockres_grab_inflight_ref(dlm, res);
834 spin_unlock(&res->spinlock);
835 spin_unlock(&dlm->spinlock);
836 /* lockres still marked IN_PROGRESS */
837 goto wake_waiters;
838 }
839
840 /* check master list to see if another node has started mastering it */
841 spin_lock(&dlm->master_lock);
842
843 /* if we found a block, wait for lock to be mastered by another node */
844 blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
845 if (blocked) {
846 int mig;
847 if (mle->type == DLM_MLE_MASTER) {
848 mlog(ML_ERROR, "master entry for nonexistent lock!\n");
849 BUG();
850 }
851 mig = (mle->type == DLM_MLE_MIGRATION);
852 /* if there is a migration in progress, let the migration
853 * finish before continuing. we can wait for the absence
854 * of the MIGRATION mle: either the migrate finished or
855 * one of the nodes died and the mle was cleaned up.
856 * if there is a BLOCK here, but it already has a master
857 * set, we are too late. the master does not have a ref
858 * for us in the refmap. detach the mle and drop it.
859 * either way, go back to the top and start over. */
860 if (mig || mle->master != O2NM_MAX_NODES) {
861 BUG_ON(mig && mle->master == dlm->node_num);
862 /* we arrived too late. the master does not
863 * have a ref for us. retry. */
864 mlog(0, "%s:%.*s: late on %s\n",
865 dlm->name, namelen, lockid,
866 mig ? "MIGRATION" : "BLOCK");
867 spin_unlock(&dlm->master_lock);
868 spin_unlock(&dlm->spinlock);
869
870 /* master is known, detach */
871 if (!mig)
872 dlm_mle_detach_hb_events(dlm, mle);
873 dlm_put_mle(mle);
874 mle = NULL;
875 /* this is lame, but we can't wait on either
876 * the mle or lockres waitqueue here */
877 if (mig)
878 msleep(100);
879 goto lookup;
880 }
881 } else {
882 /* go ahead and try to master lock on this node */
883 mle = alloc_mle;
884 /* make sure this does not get freed below */
885 alloc_mle = NULL;
886 dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
887 set_bit(dlm->node_num, mle->maybe_map);
888 __dlm_insert_mle(dlm, mle);
889
890 /* still holding the dlm spinlock, check the recovery map
891 * to see if there are any nodes that still need to be
892 * considered. these will not appear in the mle nodemap
893 * but they might own this lockres. wait on them. */
894 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
895 if (bit < O2NM_MAX_NODES) {
896 mlog(0, "%s: res %.*s, At least one node (%d) "
897 "to recover before lock mastery can begin\n",
898 dlm->name, namelen, (char *)lockid, bit);
899 wait_on_recovery = 1;
900 }
901 }
902
903 /* at this point there is either a DLM_MLE_BLOCK or a
904 * DLM_MLE_MASTER on the master list, so it's safe to add the
905 * lockres to the hashtable. anyone who finds the lock will
906 * still have to wait on the IN_PROGRESS. */
907
908 /* finally add the lockres to its hash bucket */
909 __dlm_insert_lockres(dlm, res);
910
911 	/* since this lockres is new it does not require the spinlock */
912 __dlm_lockres_grab_inflight_ref(dlm, res);
913
914 /* get an extra ref on the mle in case this is a BLOCK
915 * if so, the creator of the BLOCK may try to put the last
916 * ref at this time in the assert master handler, so we
917 * need an extra one to keep from a bad ptr deref. */
918 dlm_get_mle_inuse(mle);
919 spin_unlock(&dlm->master_lock);
920 spin_unlock(&dlm->spinlock);
921
922 redo_request:
923 while (wait_on_recovery) {
924 /* any cluster changes that occurred after dropping the
925 		 * dlm spinlock would be detectable by a change on the mle,
926 * so we only need to clear out the recovery map once. */
927 if (dlm_is_recovery_lock(lockid, namelen)) {
928 mlog(0, "%s: Recovery map is not empty, but must "
929 "master $RECOVERY lock now\n", dlm->name);
930 if (!dlm_pre_master_reco_lockres(dlm, res))
931 wait_on_recovery = 0;
932 else {
933 mlog(0, "%s: waiting 500ms for heartbeat state "
934 "change\n", dlm->name);
935 msleep(500);
936 }
937 continue;
938 }
939
940 dlm_kick_recovery_thread(dlm);
941 msleep(1000);
942 dlm_wait_for_recovery(dlm);
943
944 spin_lock(&dlm->spinlock);
945 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
946 if (bit < O2NM_MAX_NODES) {
947 mlog(0, "%s: res %.*s, At least one node (%d) "
948 "to recover before lock mastery can begin\n",
949 dlm->name, namelen, (char *)lockid, bit);
950 wait_on_recovery = 1;
951 } else
952 wait_on_recovery = 0;
953 spin_unlock(&dlm->spinlock);
954
955 if (wait_on_recovery)
956 dlm_wait_for_node_recovery(dlm, bit, 10000);
957 }
958
959 /* must wait for lock to be mastered elsewhere */
960 if (blocked)
961 goto wait;
962
963 ret = -EINVAL;
964 dlm_node_iter_init(mle->vote_map, &iter);
965 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
966 ret = dlm_do_master_request(res, mle, nodenum);
967 if (ret < 0)
968 mlog_errno(ret);
969 if (mle->master != O2NM_MAX_NODES) {
970 /* found a master ! */
971 if (mle->master <= nodenum)
972 break;
973 /* if our master request has not reached the master
974 * yet, keep going until it does. this is how the
975 * master will know that asserts are needed back to
976 * the lower nodes. */
977 mlog(0, "%s: res %.*s, Requests only up to %u but "
978 "master is %u, keep going\n", dlm->name, namelen,
979 lockid, nodenum, mle->master);
980 }
981 }
982
983 wait:
984 /* keep going until the response map includes all nodes */
985 ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
986 if (ret < 0) {
987 wait_on_recovery = 1;
988 mlog(0, "%s: res %.*s, Node map changed, redo the master "
989 "request now, blocked=%d\n", dlm->name, res->lockname.len,
990 res->lockname.name, blocked);
991 if (++tries > 20) {
992 mlog(ML_ERROR, "%s: res %.*s, Spinning on "
993 "dlm_wait_for_lock_mastery, blocked = %d\n",
994 dlm->name, res->lockname.len,
995 res->lockname.name, blocked);
996 dlm_print_one_lock_resource(res);
997 dlm_print_one_mle(mle);
998 tries = 0;
999 }
1000 goto redo_request;
1001 }
1002
1003 mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
1004 res->lockname.name, res->owner);
1005 /* make sure we never continue without this */
1006 BUG_ON(res->owner == O2NM_MAX_NODES);
1007
1008 /* master is known, detach if not already detached */
1009 dlm_mle_detach_hb_events(dlm, mle);
1010 dlm_put_mle(mle);
1011 /* put the extra ref */
1012 dlm_put_mle_inuse(mle);
1013
1014 wake_waiters:
1015 spin_lock(&res->spinlock);
1016 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
1017 spin_unlock(&res->spinlock);
1018 wake_up(&res->wq);
1019
1020 leave:
1021 /* need to free the unused mle */
1022 if (alloc_mle)
1023 kmem_cache_free(dlm_mle_cache, alloc_mle);
1024
1025 return res;
1026 }
1027
1028
1029 #define DLM_MASTERY_TIMEOUT_MS 5000
1030
1031 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
1032 struct dlm_lock_resource *res,
1033 struct dlm_master_list_entry *mle,
1034 int *blocked)
1035 {
1036 u8 m;
1037 int ret, bit;
1038 int map_changed, voting_done;
1039 int assert, sleep;
1040
1041 recheck:
1042 ret = 0;
1043 assert = 0;
1044
1045 /* check if another node has already become the owner */
1046 spin_lock(&res->spinlock);
1047 if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1048 mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
1049 res->lockname.len, res->lockname.name, res->owner);
1050 spin_unlock(&res->spinlock);
1051 /* this will cause the master to re-assert across
1052 * the whole cluster, freeing up mles */
1053 if (res->owner != dlm->node_num) {
1054 ret = dlm_do_master_request(res, mle, res->owner);
1055 if (ret < 0) {
1056 /* give recovery a chance to run */
1057 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
1058 msleep(500);
1059 goto recheck;
1060 }
1061 }
1062 ret = 0;
1063 goto leave;
1064 }
1065 spin_unlock(&res->spinlock);
1066
1067 spin_lock(&mle->spinlock);
1068 m = mle->master;
1069 map_changed = (memcmp(mle->vote_map, mle->node_map,
1070 sizeof(mle->vote_map)) != 0);
1071 voting_done = (memcmp(mle->vote_map, mle->response_map,
1072 sizeof(mle->vote_map)) == 0);
1073
1074 /* restart if we hit any errors */
1075 if (map_changed) {
1076 int b;
1077 mlog(0, "%s: %.*s: node map changed, restarting\n",
1078 dlm->name, res->lockname.len, res->lockname.name);
1079 ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
1080 b = (mle->type == DLM_MLE_BLOCK);
1081 if ((*blocked && !b) || (!*blocked && b)) {
1082 mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
1083 dlm->name, res->lockname.len, res->lockname.name,
1084 *blocked, b);
1085 *blocked = b;
1086 }
1087 spin_unlock(&mle->spinlock);
1088 if (ret < 0) {
1089 mlog_errno(ret);
1090 goto leave;
1091 }
1092 mlog(0, "%s:%.*s: restart lock mastery succeeded, "
1093 "rechecking now\n", dlm->name, res->lockname.len,
1094 res->lockname.name);
1095 goto recheck;
1096 } else {
1097 if (!voting_done) {
1098 mlog(0, "map not changed and voting not done "
1099 "for %s:%.*s\n", dlm->name, res->lockname.len,
1100 res->lockname.name);
1101 }
1102 }
1103
1104 if (m != O2NM_MAX_NODES) {
1105 /* another node has done an assert!
1106 * all done! */
1107 sleep = 0;
1108 } else {
1109 sleep = 1;
1110 /* have all nodes responded? */
1111 if (voting_done && !*blocked) {
1112 bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1113 if (dlm->node_num <= bit) {
1114 /* my node number is lowest.
1115 * now tell other nodes that I am
1116 * mastering this. */
1117 mle->master = dlm->node_num;
1118 /* ref was grabbed in get_lock_resource
1119 * will be dropped in dlmlock_master */
1120 assert = 1;
1121 sleep = 0;
1122 }
1123 /* if voting is done, but we have not received
1124 * an assert master yet, we must sleep */
1125 }
1126 }
1127
1128 spin_unlock(&mle->spinlock);
1129
1130 /* sleep if we haven't finished voting yet */
1131 if (sleep) {
1132 unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
1133
1134 /*
1135 if (atomic_read(&mle->mle_refs.refcount) < 2)
1136 mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
1137 atomic_read(&mle->mle_refs.refcount),
1138 res->lockname.len, res->lockname.name);
1139 */
1140 atomic_set(&mle->woken, 0);
1141 (void)wait_event_timeout(mle->wq,
1142 (atomic_read(&mle->woken) == 1),
1143 timeo);
1144 if (res->owner == O2NM_MAX_NODES) {
1145 mlog(0, "%s:%.*s: waiting again\n", dlm->name,
1146 res->lockname.len, res->lockname.name);
1147 goto recheck;
1148 }
1149 mlog(0, "done waiting, master is %u\n", res->owner);
1150 ret = 0;
1151 goto leave;
1152 }
1153
1154 ret = 0; /* done */
1155 if (assert) {
1156 m = dlm->node_num;
1157 mlog(0, "about to master %.*s here, this=%u\n",
1158 res->lockname.len, res->lockname.name, m);
1159 ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
1160 if (ret) {
1161 /* This is a failure in the network path,
1162 * not in the response to the assert_master
1163 * (any nonzero response is a BUG on this node).
1164 * Most likely a socket just got disconnected
1165 * due to node death. */
1166 mlog_errno(ret);
1167 }
1168 /* no longer need to restart lock mastery.
1169 * all living nodes have been contacted. */
1170 ret = 0;
1171 }
1172
1173 /* set the lockres owner */
1174 spin_lock(&res->spinlock);
1175 /* mastery reference obtained either during
1176 * assert_master_handler or in get_lock_resource */
1177 dlm_change_lockres_owner(dlm, res, m);
1178 spin_unlock(&res->spinlock);
1179
1180 leave:
1181 return ret;
1182 }
1183
1184 struct dlm_bitmap_diff_iter
1185 {
1186 int curnode;
1187 unsigned long *orig_bm;
1188 unsigned long *cur_bm;
1189 unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
1190 };
1191
1192 enum dlm_node_state_change
1193 {
1194 NODE_DOWN = -1,
1195 NODE_NO_CHANGE = 0,
1196 NODE_UP
1197 };
1198
1199 static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
1200 unsigned long *orig_bm,
1201 unsigned long *cur_bm)
1202 {
1203 unsigned long p1, p2;
1204 int i;
1205
1206 iter->curnode = -1;
1207 iter->orig_bm = orig_bm;
1208 iter->cur_bm = cur_bm;
1209
1210 for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
1211 p1 = *(iter->orig_bm + i);
1212 p2 = *(iter->cur_bm + i);
1213 iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
1214 }
1215 }
1216
1217 static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
1218 enum dlm_node_state_change *state)
1219 {
1220 int bit;
1221
1222 if (iter->curnode >= O2NM_MAX_NODES)
1223 return -ENOENT;
1224
1225 bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
1226 iter->curnode+1);
1227 if (bit >= O2NM_MAX_NODES) {
1228 iter->curnode = O2NM_MAX_NODES;
1229 return -ENOENT;
1230 }
1231
1232 /* if it was there in the original then this node died */
1233 if (test_bit(bit, iter->orig_bm))
1234 *state = NODE_DOWN;
1235 else
1236 *state = NODE_UP;
1237
1238 iter->curnode = bit;
1239 return bit;
1240 }
1241
1242
1243 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1244 struct dlm_lock_resource *res,
1245 struct dlm_master_list_entry *mle,
1246 int blocked)
1247 {
1248 struct dlm_bitmap_diff_iter bdi;
1249 enum dlm_node_state_change sc;
1250 int node;
1251 int ret = 0;
1252
1253 mlog(0, "something happened such that the "
1254 "master process may need to be restarted!\n");
1255
1256 assert_spin_locked(&mle->spinlock);
1257
1258 dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
1259 node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1260 while (node >= 0) {
1261 if (sc == NODE_UP) {
1262 /* a node came up. clear any old vote from
1263 * the response map and set it in the vote map
1264 * then restart the mastery. */
1265 mlog(ML_NOTICE, "node %d up while restarting\n", node);
1266
1267 /* redo the master request, but only for the new node */
1268 mlog(0, "sending request to new node\n");
1269 clear_bit(node, mle->response_map);
1270 set_bit(node, mle->vote_map);
1271 } else {
1272 mlog(ML_ERROR, "node down! %d\n", node);
1273 if (blocked) {
1274 int lowest = find_next_bit(mle->maybe_map,
1275 O2NM_MAX_NODES, 0);
1276
1277 /* act like it was never there */
1278 clear_bit(node, mle->maybe_map);
1279
1280 if (node == lowest) {
1281 mlog(0, "expected master %u died"
1282 " while this node was blocked "
1283 "waiting on it!\n", node);
1284 lowest = find_next_bit(mle->maybe_map,
1285 O2NM_MAX_NODES,
1286 lowest+1);
1287 if (lowest < O2NM_MAX_NODES) {
1288 mlog(0, "%s:%.*s:still "
1289 "blocked. waiting on %u "
1290 "now\n", dlm->name,
1291 res->lockname.len,
1292 res->lockname.name,
1293 lowest);
1294 } else {
1295 /* mle is an MLE_BLOCK, but
1296 * there is now nothing left to
1297 * block on. we need to return
1298 * all the way back out and try
1299 * again with an MLE_MASTER.
1300 * dlm_do_local_recovery_cleanup
1301 * has already run, so the mle
1302 * refcount is ok */
1303 mlog(0, "%s:%.*s: no "
1304 "longer blocking. try to "
1305 "master this here\n",
1306 dlm->name,
1307 res->lockname.len,
1308 res->lockname.name);
1309 mle->type = DLM_MLE_MASTER;
1310 mle->mleres = res;
1311 }
1312 }
1313 }
1314
1315 /* now blank out everything, as if we had never
1316 * contacted anyone */
1317 memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
1318 memset(mle->response_map, 0, sizeof(mle->response_map));
1319 /* reset the vote_map to the current node_map */
1320 memcpy(mle->vote_map, mle->node_map,
1321 sizeof(mle->node_map));
1322 /* put myself into the maybe map */
1323 if (mle->type != DLM_MLE_BLOCK)
1324 set_bit(dlm->node_num, mle->maybe_map);
1325 }
1326 ret = -EAGAIN;
1327 node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1328 }
1329 return ret;
1330 }
1331
1332
1333 /*
1334 * DLM_MASTER_REQUEST_MSG
1335 *
1336 * returns: 0 on success,
1337 * -errno on a network error
1338 *
1339 * on error, the caller should assume the target node is "dead"
1340 *
1341 */
1342
1343 static int dlm_do_master_request(struct dlm_lock_resource *res,
1344 struct dlm_master_list_entry *mle, int to)
1345 {
1346 struct dlm_ctxt *dlm = mle->dlm;
1347 struct dlm_master_request request;
1348 int ret, response=0, resend;
1349
1350 memset(&request, 0, sizeof(request));
1351 request.node_idx = dlm->node_num;
1352
1353 BUG_ON(mle->type == DLM_MLE_MIGRATION);
1354
1355 request.namelen = (u8)mle->mnamelen;
1356 memcpy(request.name, mle->mname, request.namelen);
1357
1358 again:
1359 ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1360 sizeof(request), to, &response);
1361 if (ret < 0) {
1362 if (ret == -ESRCH) {
1363 /* should never happen */
1364 mlog(ML_ERROR, "TCP stack not ready!\n");
1365 BUG();
1366 } else if (ret == -EINVAL) {
1367 mlog(ML_ERROR, "bad args passed to o2net!\n");
1368 BUG();
1369 } else if (ret == -ENOMEM) {
1370 mlog(ML_ERROR, "out of memory while trying to send "
1371 "network message! retrying\n");
1372 /* this is totally crude */
1373 msleep(50);
1374 goto again;
1375 } else if (!dlm_is_host_down(ret)) {
1376 /* not a network error. bad. */
1377 mlog_errno(ret);
1378 mlog(ML_ERROR, "unhandled error!");
1379 BUG();
1380 }
1381 /* all other errors should be network errors,
1382 * and likely indicate node death */
1383 mlog(ML_ERROR, "link to %d went down!\n", to);
1384 goto out;
1385 }
1386
1387 ret = 0;
1388 resend = 0;
1389 spin_lock(&mle->spinlock);
1390 switch (response) {
1391 case DLM_MASTER_RESP_YES:
1392 set_bit(to, mle->response_map);
1393 mlog(0, "node %u is the master, response=YES\n", to);
1394 mlog(0, "%s:%.*s: master node %u now knows I have a "
1395 "reference\n", dlm->name, res->lockname.len,
1396 res->lockname.name, to);
1397 mle->master = to;
1398 break;
1399 case DLM_MASTER_RESP_NO:
1400 mlog(0, "node %u not master, response=NO\n", to);
1401 set_bit(to, mle->response_map);
1402 break;
1403 case DLM_MASTER_RESP_MAYBE:
1404 mlog(0, "node %u not master, response=MAYBE\n", to);
1405 set_bit(to, mle->response_map);
1406 set_bit(to, mle->maybe_map);
1407 break;
1408 case DLM_MASTER_RESP_ERROR:
1409 mlog(0, "node %u hit an error, resending\n", to);
1410 resend = 1;
1411 response = 0;
1412 break;
1413 default:
1414 mlog(ML_ERROR, "bad response! %u\n", response);
1415 BUG();
1416 }
1417 spin_unlock(&mle->spinlock);
1418 if (resend) {
1419 /* this is also totally crude */
1420 msleep(50);
1421 goto again;
1422 }
1423
1424 out:
1425 return ret;
1426 }
1427
1428 /*
1429 * locks that can be taken here:
1430 * dlm->spinlock
1431 * res->spinlock
1432 * mle->spinlock
1433  * dlm->master_lock
1434 *
1435 * if possible, TRIM THIS DOWN!!!
1436 */
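
/*
 * nesting actually used below: dlm->spinlock is taken for the lookup;
 * if a lockres is found, dlm->spinlock is dropped and res->spinlock is
 * taken instead; dlm->master_lock and then the mle's spinlock nest
 * inside whichever of those is still held.
 */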
1437 int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
1438 void **ret_data)
1439 {
1440 u8 response = DLM_MASTER_RESP_MAYBE;
1441 struct dlm_ctxt *dlm = data;
1442 struct dlm_lock_resource *res = NULL;
1443 struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1444 struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1445 char *name;
1446 unsigned int namelen, hash;
1447 int found, ret;
1448 int set_maybe;
1449 int dispatch_assert = 0;
1450 int dispatched = 0;
1451
1452 if (!dlm_grab(dlm))
1453 return DLM_MASTER_RESP_NO;
1454
1455 if (!dlm_domain_fully_joined(dlm)) {
1456 response = DLM_MASTER_RESP_NO;
1457 goto send_response;
1458 }
1459
1460 name = request->name;
1461 namelen = request->namelen;
1462 hash = dlm_lockid_hash(name, namelen);
1463
1464 if (namelen > DLM_LOCKID_NAME_MAX) {
1465 response = DLM_IVBUFLEN;
1466 goto send_response;
1467 }
1468
1469 way_up_top:
1470 spin_lock(&dlm->spinlock);
1471 res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1472 if (res) {
1473 spin_unlock(&dlm->spinlock);
1474
1475 /* take care of the easy cases up front */
1476 spin_lock(&res->spinlock);
1477 if (res->state & (DLM_LOCK_RES_RECOVERING|
1478 DLM_LOCK_RES_MIGRATING)) {
1479 spin_unlock(&res->spinlock);
1480 mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1481 "being recovered/migrated\n");
1482 response = DLM_MASTER_RESP_ERROR;
1483 if (mle)
1484 kmem_cache_free(dlm_mle_cache, mle);
1485 goto send_response;
1486 }
1487
1488 if (res->owner == dlm->node_num) {
1489 dlm_lockres_set_refmap_bit(dlm, res, request->node_idx);
1490 spin_unlock(&res->spinlock);
1491 response = DLM_MASTER_RESP_YES;
1492 if (mle)
1493 kmem_cache_free(dlm_mle_cache, mle);
1494
1495 /* this node is the owner.
1496 * there is some extra work that needs to
1497 * happen now. the requesting node has
1498 * caused all nodes up to this one to
1499 * create mles. this node now needs to
1500 * go back and clean those up. */
1501 dispatch_assert = 1;
1502 goto send_response;
1503 } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1504 spin_unlock(&res->spinlock);
1505 // mlog(0, "node %u is the master\n", res->owner);
1506 response = DLM_MASTER_RESP_NO;
1507 if (mle)
1508 kmem_cache_free(dlm_mle_cache, mle);
1509 goto send_response;
1510 }
1511
1512 /* ok, there is no owner. either this node is
1513 * being blocked, or it is actively trying to
1514 * master this lock. */
1515 if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1516 mlog(ML_ERROR, "lock with no owner should be "
1517 "in-progress!\n");
1518 BUG();
1519 }
1520
1521 // mlog(0, "lockres is in progress...\n");
1522 spin_lock(&dlm->master_lock);
1523 found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1524 if (!found) {
1525 mlog(ML_ERROR, "no mle found for this lock!\n");
1526 BUG();
1527 }
1528 set_maybe = 1;
1529 spin_lock(&tmpmle->spinlock);
1530 if (tmpmle->type == DLM_MLE_BLOCK) {
1531 // mlog(0, "this node is waiting for "
1532 // "lockres to be mastered\n");
1533 response = DLM_MASTER_RESP_NO;
1534 } else if (tmpmle->type == DLM_MLE_MIGRATION) {
1535 mlog(0, "node %u is master, but trying to migrate to "
1536 "node %u.\n", tmpmle->master, tmpmle->new_master);
1537 if (tmpmle->master == dlm->node_num) {
1538 mlog(ML_ERROR, "no owner on lockres, but this "
1539 "node is trying to migrate it to %u?!\n",
1540 tmpmle->new_master);
1541 BUG();
1542 } else {
1543 /* the real master can respond on its own */
1544 response = DLM_MASTER_RESP_NO;
1545 }
1546 } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1547 set_maybe = 0;
1548 if (tmpmle->master == dlm->node_num) {
1549 response = DLM_MASTER_RESP_YES;
1550 /* this node will be the owner.
1551 * go back and clean the mles on any
1552 * other nodes */
1553 dispatch_assert = 1;
1554 dlm_lockres_set_refmap_bit(dlm, res,
1555 request->node_idx);
1556 } else
1557 response = DLM_MASTER_RESP_NO;
1558 } else {
1559 // mlog(0, "this node is attempting to "
1560 // "master lockres\n");
1561 response = DLM_MASTER_RESP_MAYBE;
1562 }
1563 if (set_maybe)
1564 set_bit(request->node_idx, tmpmle->maybe_map);
1565 spin_unlock(&tmpmle->spinlock);
1566
1567 spin_unlock(&dlm->master_lock);
1568 spin_unlock(&res->spinlock);
1569
1570 /* keep the mle attached to heartbeat events */
1571 dlm_put_mle(tmpmle);
1572 if (mle)
1573 kmem_cache_free(dlm_mle_cache, mle);
1574 goto send_response;
1575 }
1576
1577 /*
1578 * lockres doesn't exist on this node
1579 * if there is an MLE_BLOCK, return NO
1580 * if there is an MLE_MASTER, return MAYBE
1581 * otherwise, add an MLE_BLOCK, return NO
1582 */
1583 spin_lock(&dlm->master_lock);
1584 found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1585 if (!found) {
1586 /* this lockid has never been seen on this node yet */
1587 // mlog(0, "no mle found\n");
1588 if (!mle) {
1589 spin_unlock(&dlm->master_lock);
1590 spin_unlock(&dlm->spinlock);
1591
1592 mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
1593 if (!mle) {
1594 response = DLM_MASTER_RESP_ERROR;
1595 mlog_errno(-ENOMEM);
1596 goto send_response;
1597 }
1598 goto way_up_top;
1599 }
1600
1601 // mlog(0, "this is second time thru, already allocated, "
1602 // "add the block.\n");
1603 dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1604 set_bit(request->node_idx, mle->maybe_map);
1605 __dlm_insert_mle(dlm, mle);
1606 response = DLM_MASTER_RESP_NO;
1607 } else {
1608 // mlog(0, "mle was found\n");
1609 set_maybe = 1;
1610 spin_lock(&tmpmle->spinlock);
1611 if (tmpmle->master == dlm->node_num) {
1612 mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1613 BUG();
1614 }
1615 if (tmpmle->type == DLM_MLE_BLOCK)
1616 response = DLM_MASTER_RESP_NO;
1617 else if (tmpmle->type == DLM_MLE_MIGRATION) {
1618 mlog(0, "migration mle was found (%u->%u)\n",
1619 tmpmle->master, tmpmle->new_master);
1620 /* real master can respond on its own */
1621 response = DLM_MASTER_RESP_NO;
1622 } else
1623 response = DLM_MASTER_RESP_MAYBE;
1624 if (set_maybe)
1625 set_bit(request->node_idx, tmpmle->maybe_map);
1626 spin_unlock(&tmpmle->spinlock);
1627 }
1628 spin_unlock(&dlm->master_lock);
1629 spin_unlock(&dlm->spinlock);
1630
1631 if (found) {
1632 /* keep the mle attached to heartbeat events */
1633 dlm_put_mle(tmpmle);
1634 }
1635 send_response:
1636 /*
1637 * __dlm_lookup_lockres() grabbed a reference to this lockres.
1638 * The reference is released by dlm_assert_master_worker() under
1639 * the call to dlm_dispatch_assert_master(). If
1640 * dlm_assert_master_worker() isn't called, we drop it here.
1641 */
1642 if (dispatch_assert) {
1643 if (response != DLM_MASTER_RESP_YES)
1644 mlog(ML_ERROR, "invalid response %d\n", response);
1645 if (!res) {
1646 mlog(ML_ERROR, "bad lockres while trying to assert!\n");
1647 BUG();
1648 }
1649 mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1650 dlm->node_num, res->lockname.len, res->lockname.name);
1651 ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1652 DLM_ASSERT_MASTER_MLE_CLEANUP);
1653 if (ret < 0) {
1654 mlog(ML_ERROR, "failed to dispatch assert master work\n");
1655 response = DLM_MASTER_RESP_ERROR;
1656 dlm_lockres_put(res);
1657 } else {
1658 dispatched = 1;
1659 dlm_lockres_grab_inflight_worker(dlm, res);
1660 }
1661 } else {
1662 if (res)
1663 dlm_lockres_put(res);
1664 }
1665
1666 if (!dispatched)
1667 dlm_put(dlm);
1668 return response;
1669 }
1670
1671 /*
1672 * DLM_ASSERT_MASTER_MSG
1673 */
1674
1675
1676 /*
1677 * NOTE: this can be used for debugging
1678 * can periodically run all locks owned by this node
1679 * and re-assert across the cluster...
1680 */
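
/*
 * a rough sketch of such a debugging pass (illustrative only; assumes
 * the lockres hash helpers from dlmcommon.h, elides locking, and
 * "nodemap" stands in for a map of live nodes minus the local node):
 *
 *	for (i = 0; i < DLM_HASH_BUCKETS; i++)
 *		hlist_for_each_entry(res, dlm_lockres_hash(dlm, i), hash_node)
 *			if (res->owner == dlm->node_num)
 *				dlm_do_assert_master(dlm, res, nodemap, 0);
 */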
1681 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1682 struct dlm_lock_resource *res,
1683 void *nodemap, u32 flags)
1684 {
1685 struct dlm_assert_master assert;
1686 int to, tmpret;
1687 struct dlm_node_iter iter;
1688 int ret = 0;
1689 int reassert;
1690 const char *lockname = res->lockname.name;
1691 unsigned int namelen = res->lockname.len;
1692
1693 BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1694
1695 spin_lock(&res->spinlock);
1696 res->state |= DLM_LOCK_RES_SETREF_INPROG;
1697 spin_unlock(&res->spinlock);
1698
1699 again:
1700 reassert = 0;
1701
1702 /* note that if this nodemap is empty, it returns 0 */
1703 dlm_node_iter_init(nodemap, &iter);
1704 while ((to = dlm_node_iter_next(&iter)) >= 0) {
1705 int r = 0;
1706 struct dlm_master_list_entry *mle = NULL;
1707
1708 mlog(0, "sending assert master to %d (%.*s)\n", to,
1709 namelen, lockname);
1710 memset(&assert, 0, sizeof(assert));
1711 assert.node_idx = dlm->node_num;
1712 assert.namelen = namelen;
1713 memcpy(assert.name, lockname, namelen);
1714 assert.flags = cpu_to_be32(flags);
1715
1716 tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1717 &assert, sizeof(assert), to, &r);
1718 if (tmpret < 0) {
1719 mlog(ML_ERROR, "Error %d when sending message %u (key "
1720 "0x%x) to node %u\n", tmpret,
1721 DLM_ASSERT_MASTER_MSG, dlm->key, to);
1722 if (!dlm_is_host_down(tmpret)) {
1723 mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
1724 BUG();
1725 }
1726 /* a node died. finish out the rest of the nodes. */
1727 mlog(0, "link to %d went down!\n", to);
1728 /* any nonzero status return will do */
1729 ret = tmpret;
1730 r = 0;
1731 } else if (r < 0) {
1732 /* ok, something horribly messed. kill thyself. */
1733 mlog(ML_ERROR,"during assert master of %.*s to %u, "
1734 "got %d.\n", namelen, lockname, to, r);
1735 spin_lock(&dlm->spinlock);
1736 spin_lock(&dlm->master_lock);
1737 if (dlm_find_mle(dlm, &mle, (char *)lockname,
1738 namelen)) {
1739 dlm_print_one_mle(mle);
1740 __dlm_put_mle(mle);
1741 }
1742 spin_unlock(&dlm->master_lock);
1743 spin_unlock(&dlm->spinlock);
1744 BUG();
1745 }
1746
1747 if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1748 !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
1749 mlog(ML_ERROR, "%.*s: very strange, "
1750 "master MLE but no lockres on %u\n",
1751 namelen, lockname, to);
1752 }
1753
1754 if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1755 			mlog(0, "%.*s: node %u created mles on other "
1756 "nodes and requests a re-assert\n",
1757 namelen, lockname, to);
1758 reassert = 1;
1759 }
1760 if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1761 mlog(0, "%.*s: node %u has a reference to this "
1762 "lockres, set the bit in the refmap\n",
1763 namelen, lockname, to);
1764 spin_lock(&res->spinlock);
1765 dlm_lockres_set_refmap_bit(dlm, res, to);
1766 spin_unlock(&res->spinlock);
1767 }
1768 }
1769
1770 if (reassert)
1771 goto again;
1772
1773 spin_lock(&res->spinlock);
1774 res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
1775 spin_unlock(&res->spinlock);
1776 wake_up(&res->wq);
1777
1778 return ret;
1779 }
1780
1781 /*
1782 * locks that can be taken here:
1783 * dlm->spinlock
1784 * res->spinlock
1785 * mle->spinlock
1786  * dlm->master_lock
1787 *
1788 * if possible, TRIM THIS DOWN!!!
1789 */
1790 int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
1791 void **ret_data)
1792 {
1793 struct dlm_ctxt *dlm = data;
1794 struct dlm_master_list_entry *mle = NULL;
1795 struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1796 struct dlm_lock_resource *res = NULL;
1797 char *name;
1798 unsigned int namelen, hash;
1799 u32 flags;
1800 int master_request = 0, have_lockres_ref = 0;
1801 int ret = 0;
1802
1803 if (!dlm_grab(dlm))
1804 return 0;
1805
1806 name = assert->name;
1807 namelen = assert->namelen;
1808 hash = dlm_lockid_hash(name, namelen);
1809 flags = be32_to_cpu(assert->flags);
1810
1811 if (namelen > DLM_LOCKID_NAME_MAX) {
1812 mlog(ML_ERROR, "Invalid name length!");
1813 goto done;
1814 }
1815
1816 spin_lock(&dlm->spinlock);
1817
1818 if (flags)
1819 mlog(0, "assert_master with flags: %u\n", flags);
1820
1821 /* find the MLE */
1822 spin_lock(&dlm->master_lock);
1823 if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1824 /* not an error, could be master just re-asserting */
1825 mlog(0, "just got an assert_master from %u, but no "
1826 "MLE for it! (%.*s)\n", assert->node_idx,
1827 namelen, name);
1828 } else {
1829 int bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1830 if (bit >= O2NM_MAX_NODES) {
1831 /* not necessarily an error, though less likely.
1832 * could be master just re-asserting. */
1833 mlog(0, "no bits set in the maybe_map, but %u "
1834 "is asserting! (%.*s)\n", assert->node_idx,
1835 namelen, name);
1836 } else if (bit != assert->node_idx) {
1837 if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1838 mlog(0, "master %u was found, %u should "
1839 "back off\n", assert->node_idx, bit);
1840 } else {
1841 /* with the fix for bug 569, a higher node
1842 * number winning the mastery will respond
1843 * YES to mastery requests, but this node
1844 * had no way of knowing. let it pass. */
1845 mlog(0, "%u is the lowest node, "
1846 "%u is asserting. (%.*s) %u must "
1847 "have begun after %u won.\n", bit,
1848 assert->node_idx, namelen, name, bit,
1849 assert->node_idx);
1850 }
1851 }
1852 if (mle->type == DLM_MLE_MIGRATION) {
1853 if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1854 mlog(0, "%s:%.*s: got cleanup assert"
1855 " from %u for migration\n",
1856 dlm->name, namelen, name,
1857 assert->node_idx);
1858 } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1859 mlog(0, "%s:%.*s: got unrelated assert"
1860 " from %u for migration, ignoring\n",
1861 dlm->name, namelen, name,
1862 assert->node_idx);
1863 __dlm_put_mle(mle);
1864 spin_unlock(&dlm->master_lock);
1865 spin_unlock(&dlm->spinlock);
1866 goto done;
1867 }
1868 }
1869 }
1870 spin_unlock(&dlm->master_lock);
1871
1872 /* ok everything checks out with the MLE
1873 * now check to see if there is a lockres */
1874 res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1875 if (res) {
1876 spin_lock(&res->spinlock);
1877 if (res->state & DLM_LOCK_RES_RECOVERING) {
1878 mlog(ML_ERROR, "%u asserting but %.*s is "
1879 "RECOVERING!\n", assert->node_idx, namelen, name);
1880 goto kill;
1881 }
1882 if (!mle) {
1883 if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1884 res->owner != assert->node_idx) {
1885 mlog(ML_ERROR, "DIE! Mastery assert from %u, "
1886 "but current owner is %u! (%.*s)\n",
1887 assert->node_idx, res->owner, namelen,
1888 name);
1889 __dlm_print_one_lock_resource(res);
1890 BUG();
1891 }
1892 } else if (mle->type != DLM_MLE_MIGRATION) {
1893 if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1894 /* owner is just re-asserting */
1895 if (res->owner == assert->node_idx) {
1896 mlog(0, "owner %u re-asserting on "
1897 "lock %.*s\n", assert->node_idx,
1898 namelen, name);
1899 goto ok;
1900 }
1901 mlog(ML_ERROR, "got assert_master from "
1902 "node %u, but %u is the owner! "
1903 "(%.*s)\n", assert->node_idx,
1904 res->owner, namelen, name);
1905 goto kill;
1906 }
1907 if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1908 mlog(ML_ERROR, "got assert from %u, but lock "
1909 "with no owner should be "
1910 "in-progress! (%.*s)\n",
1911 assert->node_idx,
1912 namelen, name);
1913 goto kill;
1914 }
1915 } else /* mle->type == DLM_MLE_MIGRATION */ {
1916 /* should only be getting an assert from new master */
1917 if (assert->node_idx != mle->new_master) {
1918 mlog(ML_ERROR, "got assert from %u, but "
1919 "new master is %u, and old master "
1920 "was %u (%.*s)\n",
1921 assert->node_idx, mle->new_master,
1922 mle->master, namelen, name);
1923 goto kill;
1924 }
1925
1926 }
1927 ok:
1928 spin_unlock(&res->spinlock);
1929 }
1930
1931 // mlog(0, "woo! got an assert_master from node %u!\n",
1932 // assert->node_idx);
1933 if (mle) {
1934 int extra_ref = 0;
1935 int nn = -1;
1936 int rr, err = 0;
1937
1938 spin_lock(&mle->spinlock);
1939 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1940 extra_ref = 1;
1941 else {
1942 /* MASTER mle: if any bits set in the response map
1943 * then the calling node needs to re-assert to clear
1944 * up nodes that this node contacted */
1945 while ((nn = find_next_bit(mle->response_map, O2NM_MAX_NODES,
1946 nn+1)) < O2NM_MAX_NODES) {
1947 if (nn != dlm->node_num && nn != assert->node_idx) {
1948 master_request = 1;
1949 break;
1950 }
1951 }
1952 }
1953 mle->master = assert->node_idx;
1954 atomic_set(&mle->woken, 1);
1955 wake_up(&mle->wq);
1956 spin_unlock(&mle->spinlock);
1957
1958 if (res) {
1959 int wake = 0;
1960 spin_lock(&res->spinlock);
1961 if (mle->type == DLM_MLE_MIGRATION) {
1962 mlog(0, "finishing off migration of lockres %.*s, "
1963 "from %u to %u\n",
1964 res->lockname.len, res->lockname.name,
1965 dlm->node_num, mle->new_master);
1966 res->state &= ~DLM_LOCK_RES_MIGRATING;
1967 wake = 1;
1968 dlm_change_lockres_owner(dlm, res, mle->new_master);
1969 BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1970 } else {
1971 dlm_change_lockres_owner(dlm, res, mle->master);
1972 }
1973 spin_unlock(&res->spinlock);
1974 have_lockres_ref = 1;
1975 if (wake)
1976 wake_up(&res->wq);
1977 }
1978
1979 /* master is known, detach if not already detached.
1980 * ensures that only one assert_master call will happen
1981 * on this mle. */
1982 spin_lock(&dlm->master_lock);
1983
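/* sanity check the refcount: the mle must still hold its hash list
 * reference, one more if it is marked inuse, and another for BLOCK
 * or MIGRATION mles (extra_ref); anything lower means a reference
 * was dropped too early */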
1984 rr = atomic_read(&mle->mle_refs.refcount);
1985 if (mle->inuse > 0) {
1986 if (extra_ref && rr < 3)
1987 err = 1;
1988 else if (!extra_ref && rr < 2)
1989 err = 1;
1990 } else {
1991 if (extra_ref && rr < 2)
1992 err = 1;
1993 else if (!extra_ref && rr < 1)
1994 err = 1;
1995 }
1996 if (err) {
1997 mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
1998 "that will mess up this node, refs=%d, extra=%d, "
1999 "inuse=%d\n", dlm->name, namelen, name,
2000 assert->node_idx, rr, extra_ref, mle->inuse);
2001 dlm_print_one_mle(mle);
2002 }
2003 __dlm_unlink_mle(dlm, mle);
2004 __dlm_mle_detach_hb_events(dlm, mle);
2005 __dlm_put_mle(mle);
2006 if (extra_ref) {
2007 /* the assert master message now balances the extra
2008 * ref given by the master / migration request message.
2009 * if this is the last put, it will be removed
2010 * from the list. */
2011 __dlm_put_mle(mle);
2012 }
2013 spin_unlock(&dlm->master_lock);
2014 } else if (res) {
2015 if (res->owner != assert->node_idx) {
2016 mlog(0, "assert_master from %u, but current "
2017 "owner is %u (%.*s), no mle\n", assert->node_idx,
2018 res->owner, namelen, name);
2019 }
2020 }
2021 spin_unlock(&dlm->spinlock);
2022
2023 done:
2024 ret = 0;
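/* keep DLM_LOCK_RES_SETREF_INPROG set until the reply has been sent;
 * dlm_assert_master_post_handler() clears it and drops the lookup ref */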
2025 if (res) {
2026 spin_lock(&res->spinlock);
2027 res->state |= DLM_LOCK_RES_SETREF_INPROG;
2028 spin_unlock(&res->spinlock);
2029 *ret_data = (void *)res;
2030 }
2031 dlm_put(dlm);
2032 if (master_request) {
2033 mlog(0, "need to tell master to reassert\n");
2034 /* positive. negative would shoot down the node. */
2035 ret |= DLM_ASSERT_RESPONSE_REASSERT;
2036 if (!have_lockres_ref) {
2037 mlog(ML_ERROR, "strange, got assert from %u, MASTER "
2038 "mle present here for %s:%.*s, but no lockres!\n",
2039 assert->node_idx, dlm->name, namelen, name);
2040 }
2041 }
2042 if (have_lockres_ref) {
2043 /* let the master know we have a reference to the lockres */
2044 ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
2045 mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
2046 dlm->name, namelen, name, assert->node_idx);
2047 }
2048 return ret;
2049
2050 kill:
2051 /* kill the caller! */
2052 mlog(ML_ERROR, "Bad message received from another node. Dumping state "
2053 "and killing the other node now! This node is OK and can continue.\n");
2054 __dlm_print_one_lock_resource(res);
2055 spin_unlock(&res->spinlock);
2056 spin_lock(&dlm->master_lock);
2057 if (mle)
2058 __dlm_put_mle(mle);
2059 spin_unlock(&dlm->master_lock);
2060 spin_unlock(&dlm->spinlock);
2061 *ret_data = (void *)res;
2062 dlm_put(dlm);
2063 return -EINVAL;
2064 }
2065
2066 void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
2067 {
2068 struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
2069
2070 if (ret_data) {
2071 spin_lock(&res->spinlock);
2072 res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
2073 spin_unlock(&res->spinlock);
2074 wake_up(&res->wq);
2075 dlm_lockres_put(res);
2076 }
2077 return;
2078 }
2079
2080 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
2081 struct dlm_lock_resource *res,
2082 int ignore_higher, u8 request_from, u32 flags)
2083 {
2084 struct dlm_work_item *item;
2085 item = kzalloc(sizeof(*item), GFP_ATOMIC);
2086 if (!item)
2087 return -ENOMEM;
2088
2089
2090 /* queue up work for dlm_assert_master_worker */
2091 dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
2092 item->u.am.lockres = res; /* already have a ref */
2093 /* can optionally ignore node numbers higher than this node */
2094 item->u.am.ignore_higher = ignore_higher;
2095 item->u.am.request_from = request_from;
2096 item->u.am.flags = flags;
2097
2098 if (ignore_higher)
2099 mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
2100 res->lockname.name);
2101
2102 spin_lock(&dlm->work_lock);
2103 list_add_tail(&item->list, &dlm->work_list);
2104 spin_unlock(&dlm->work_lock);
2105
2106 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2107 return 0;
2108 }
2109
2110 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2111 {
2112 struct dlm_ctxt *dlm = data;
2113 int ret = 0;
2114 struct dlm_lock_resource *res;
2115 unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
2116 int ignore_higher;
2117 int bit;
2118 u8 request_from;
2119 u32 flags;
2120
2121 dlm = item->dlm;
2122 res = item->u.am.lockres;
2123 ignore_higher = item->u.am.ignore_higher;
2124 request_from = item->u.am.request_from;
2125 flags = item->u.am.flags;
2126
2127 spin_lock(&dlm->spinlock);
2128 memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
2129 spin_unlock(&dlm->spinlock);
2130
2131 clear_bit(dlm->node_num, nodemap);
2132 if (ignore_higher) {
2133 /* if this is just to clear up mles for nodes below
2134 * this node, do not send the message to the original
2135 * caller or any node number higher than this */
2136 clear_bit(request_from, nodemap);
2137 bit = dlm->node_num;
2138 while (1) {
2139 bit = find_next_bit(nodemap, O2NM_MAX_NODES,
2140 bit+1);
2141 if (bit >= O2NM_MAX_NODES)
2142 break;
2143 clear_bit(bit, nodemap);
2144 }
2145 }
2146
2147 /*
2148 * If we're migrating this lock to someone else, we are no
2149 * longer allowed to assert our own mastery. OTOH, we need to
2150 * prevent migration from starting while we're still asserting
2151 * our dominance. The reserved ast delays migration.
2152 */
2153 spin_lock(&res->spinlock);
2154 if (res->state & DLM_LOCK_RES_MIGRATING) {
2155 mlog(0, "Someone asked us to assert mastery, but we're "
2156 "in the middle of migration. Skipping assert, "
2157 "the new master will handle that.\n");
2158 spin_unlock(&res->spinlock);
2159 goto put;
2160 } else
2161 __dlm_lockres_reserve_ast(res);
2162 spin_unlock(&res->spinlock);
2163
2164 /* this call now finishes out the nodemap
2165 * even if one or more nodes die */
2166 mlog(0, "worker about to master %.*s here, this=%u\n",
2167 res->lockname.len, res->lockname.name, dlm->node_num);
2168 ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2169 if (ret < 0) {
2170 /* no need to restart, we are done */
2171 if (!dlm_is_host_down(ret))
2172 mlog_errno(ret);
2173 }
2174
2175 /* Ok, we've asserted ourselves. Let's let migration start. */
2176 dlm_lockres_release_ast(dlm, res);
2177
2178 put:
2179 dlm_lockres_drop_inflight_worker(dlm, res);
2180
2181 dlm_lockres_put(res);
2182
2183 mlog(0, "finished with dlm_assert_master_worker\n");
2184 }
2185
2186 /* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
2187 * We cannot wait for node recovery to complete to begin mastering this
2188 * lockres because this lockres is used to kick off recovery! ;-)
2189 * So, do a pre-check on all living nodes to see if any of those nodes
2190 * think that $RECOVERY is currently mastered by a dead node. If so,
2191 * we wait a short time to allow that node to get notified by its own
2192 * heartbeat stack, then check again. All $RECOVERY lock resources
2193 * mastered by dead nodes are purged when the heartbeat callback is
2194 * fired, so we can know for sure that it is safe to continue once
2195 * the node returns a live node or no node. */
2196 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2197 struct dlm_lock_resource *res)
2198 {
2199 struct dlm_node_iter iter;
2200 int nodenum;
2201 int ret = 0;
2202 u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
2203
2204 spin_lock(&dlm->spinlock);
2205 dlm_node_iter_init(dlm->domain_map, &iter);
2206 spin_unlock(&dlm->spinlock);
2207
2208 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2209 /* do not send to self */
2210 if (nodenum == dlm->node_num)
2211 continue;
2212 ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2213 if (ret < 0) {
2214 mlog_errno(ret);
2215 if (!dlm_is_host_down(ret))
2216 BUG();
2217 /* host is down, so answer for that node would be
2218 * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */
2219 ret = 0;
2220 }
2221
2222 if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
2223 /* check to see if this master is in the recovery map */
2224 spin_lock(&dlm->spinlock);
2225 if (test_bit(master, dlm->recovery_map)) {
2226 mlog(ML_NOTICE, "%s: node %u has not seen "
2227 "node %u go down yet, and thinks the "
2228 "dead node is mastering the recovery "
2229 "lock. must wait.\n", dlm->name,
2230 nodenum, master);
2231 ret = -EAGAIN;
2232 }
2233 spin_unlock(&dlm->spinlock);
2234 mlog(0, "%s: reco lock master is %u\n", dlm->name,
2235 master);
2236 break;
2237 }
2238 }
2239 return ret;
2240 }
2241
2242 /*
2243 * DLM_DEREF_LOCKRES_MSG
2244 */
2245
2246 int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2247 {
2248 struct dlm_deref_lockres deref;
2249 int ret = 0, r;
2250 const char *lockname;
2251 unsigned int namelen;
2252
2253 lockname = res->lockname.name;
2254 namelen = res->lockname.len;
2255 BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2256
2257 memset(&deref, 0, sizeof(deref));
2258 deref.node_idx = dlm->node_num;
2259 deref.namelen = namelen;
2260 memcpy(deref.name, lockname, namelen);
2261
2262 ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2263 &deref, sizeof(deref), res->owner, &r);
2264 if (ret < 0)
2265 mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n",
2266 dlm->name, namelen, lockname, ret, res->owner);
2267 else if (r < 0) {
2268 /* BAD. other node says I did not have a ref. */
2269 mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
2270 dlm->name, namelen, lockname, res->owner, r);
2271 dlm_print_one_lock_resource(res);
2272 BUG();
2273 }
2274 return ret;
2275 }
2276
2277 int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2278 void **ret_data)
2279 {
2280 struct dlm_ctxt *dlm = data;
2281 struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2282 struct dlm_lock_resource *res = NULL;
2283 char *name;
2284 unsigned int namelen;
2285 int ret = -EINVAL;
2286 u8 node;
2287 unsigned int hash;
2288 struct dlm_work_item *item;
2289 int cleared = 0;
2290 int dispatch = 0;
2291
2292 if (!dlm_grab(dlm))
2293 return 0;
2294
2295 name = deref->name;
2296 namelen = deref->namelen;
2297 node = deref->node_idx;
2298
2299 if (namelen > DLM_LOCKID_NAME_MAX) {
2300 mlog(ML_ERROR, "Invalid name length!");
2301 goto done;
2302 }
2303 if (deref->node_idx >= O2NM_MAX_NODES) {
2304 mlog(ML_ERROR, "Invalid node number: %u\n", node);
2305 goto done;
2306 }
2307
2308 hash = dlm_lockid_hash(name, namelen);
2309
2310 spin_lock(&dlm->spinlock);
2311 res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2312 if (!res) {
2313 spin_unlock(&dlm->spinlock);
2314 mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2315 dlm->name, namelen, name);
2316 goto done;
2317 }
2318 spin_unlock(&dlm->spinlock);
2319
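/* if an assert_master is still establishing the reference
 * (SETREF_INPROG), defer this deref to the work queue; otherwise
 * clear the sender's refmap bit right here */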
2320 spin_lock(&res->spinlock);
2321 if (res->state & DLM_LOCK_RES_SETREF_INPROG)
2322 dispatch = 1;
2323 else {
2324 BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2325 if (test_bit(node, res->refmap)) {
2326 dlm_lockres_clear_refmap_bit(dlm, res, node);
2327 cleared = 1;
2328 }
2329 }
2330 spin_unlock(&res->spinlock);
2331
2332 if (!dispatch) {
2333 if (cleared)
2334 dlm_lockres_calc_usage(dlm, res);
2335 else {
2336 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2337 "but it is already dropped!\n", dlm->name,
2338 res->lockname.len, res->lockname.name, node);
2339 dlm_print_one_lock_resource(res);
2340 }
2341 ret = 0;
2342 goto done;
2343 }
2344
2345 item = kzalloc(sizeof(*item), GFP_NOFS);
2346 if (!item) {
2347 ret = -ENOMEM;
2348 mlog_errno(ret);
2349 goto done;
2350 }
2351
2352 dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2353 item->u.dl.deref_res = res;
2354 item->u.dl.deref_node = node;
2355
2356 spin_lock(&dlm->work_lock);
2357 list_add_tail(&item->list, &dlm->work_list);
2358 spin_unlock(&dlm->work_lock);
2359
2360 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2361 return 0;
2362
2363 done:
2364 if (res)
2365 dlm_lockres_put(res);
2366 dlm_put(dlm);
2367
2368 return ret;
2369 }
2370
2371 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2372 {
2373 struct dlm_ctxt *dlm;
2374 struct dlm_lock_resource *res;
2375 u8 node;
2376 u8 cleared = 0;
2377
2378 dlm = item->dlm;
2379 res = item->u.dl.deref_res;
2380 node = item->u.dl.deref_node;
2381
2382 spin_lock(&res->spinlock);
2383 BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2384 if (test_bit(node, res->refmap)) {
2385 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
2386 dlm_lockres_clear_refmap_bit(dlm, res, node);
2387 cleared = 1;
2388 }
2389 spin_unlock(&res->spinlock);
2390
2391 if (cleared) {
2392 mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
2393 dlm->name, res->lockname.len, res->lockname.name, node);
2394 dlm_lockres_calc_usage(dlm, res);
2395 } else {
2396 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2397 "but it is already dropped!\n", dlm->name,
2398 res->lockname.len, res->lockname.name, node);
2399 dlm_print_one_lock_resource(res);
2400 }
2401
2402 dlm_lockres_put(res);
2403 }
2404
2405 /*
2406 * A migrateable resource is one that is:
2407 * 1. locally mastered, and,
2408 * 2. zero local locks, and,
2409 * 3. one or more non-local locks, or, one or more references
2410 * Returns 1 if yes, 0 if not.
2411 */
2412 static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
2413 struct dlm_lock_resource *res)
2414 {
2415 enum dlm_lockres_list idx;
2416 int nonlocal = 0, node_ref;
2417 struct list_head *queue;
2418 struct dlm_lock *lock;
2419 u64 cookie;
2420
2421 assert_spin_locked(&res->spinlock);
2422
2423 /* delay migration when the lockres is in MIGRATING state */
2424 if (res->state & DLM_LOCK_RES_MIGRATING)
2425 return 0;
2426
2427 /* delay migration when the lockres is in RECOVERING state */
2428 if (res->state & DLM_LOCK_RES_RECOVERING)
2429 return 0;
2430
2431 if (res->owner != dlm->node_num)
2432 return 0;
2433
2434 for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2435 queue = dlm_list_idx_to_ptr(res, idx);
2436 list_for_each_entry(lock, queue, list) {
2437 if (lock->ml.node != dlm->node_num) {
2438 nonlocal++;
2439 continue;
2440 }
2441 cookie = be64_to_cpu(lock->ml.cookie);
2442 mlog(0, "%s: Not migrateable res %.*s, lock %u:%llu on "
2443 "%s list\n", dlm->name, res->lockname.len,
2444 res->lockname.name,
2445 dlm_get_lock_cookie_node(cookie),
2446 dlm_get_lock_cookie_seq(cookie),
2447 dlm_list_in_text(idx));
2448 return 0;
2449 }
2450 }
2451
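/* no non-local locks were found: only migrateable if some other
 * node still holds a reference in the refmap */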
2452 if (!nonlocal) {
2453 node_ref = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
2454 if (node_ref >= O2NM_MAX_NODES)
2455 return 0;
2456 }
2457
2458 mlog(0, "%s: res %.*s, Migrateable\n", dlm->name, res->lockname.len,
2459 res->lockname.name);
2460
2461 return 1;
2462 }
2463
2464 /*
2465 * DLM_MIGRATE_LOCKRES
2466 */
2467
2468
2469 static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2470 struct dlm_lock_resource *res, u8 target)
2471 {
2472 struct dlm_master_list_entry *mle = NULL;
2473 struct dlm_master_list_entry *oldmle = NULL;
2474 struct dlm_migratable_lockres *mres = NULL;
2475 int ret = 0;
2476 const char *name;
2477 unsigned int namelen;
2478 int mle_added = 0;
2479 int wake = 0;
2480
2481 if (!dlm_grab(dlm))
2482 return -EINVAL;
2483
2484 BUG_ON(target == O2NM_MAX_NODES);
2485
2486 name = res->lockname.name;
2487 namelen = res->lockname.len;
2488
2489 mlog(0, "%s: Migrating %.*s to node %u\n", dlm->name, namelen, name,
2490 target);
2491
2492 /* preallocate up front. if this fails, abort */
2493 ret = -ENOMEM;
2494 mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
2495 if (!mres) {
2496 mlog_errno(ret);
2497 goto leave;
2498 }
2499
2500 mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
2501 if (!mle) {
2502 mlog_errno(ret);
2503 goto leave;
2504 }
2505 ret = 0;
2506
2507 /*
2508 * clear any existing master requests and
2509 * add the migration mle to the list
2510 */
2511 spin_lock(&dlm->spinlock);
2512 spin_lock(&dlm->master_lock);
2513 ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2514 namelen, target, dlm->node_num);
2515 /* get an extra reference on the mle.
2516 * otherwise the assert_master from the new
2517 * master will destroy this.
2518 */
2519 dlm_get_mle_inuse(mle);
2520 spin_unlock(&dlm->master_lock);
2521 spin_unlock(&dlm->spinlock);
2522
2523 if (ret == -EEXIST) {
2524 mlog(0, "another process is already migrating it\n");
2525 goto fail;
2526 }
2527 mle_added = 1;
2528
2529 /*
2530 * set the MIGRATING flag and flush asts
2531 * if we fail after this we need to re-dirty the lockres
2532 */
2533 if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2534 mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2535 "the target went down.\n", res->lockname.len,
2536 res->lockname.name, target);
2537 spin_lock(&res->spinlock);
2538 res->state &= ~DLM_LOCK_RES_MIGRATING;
2539 wake = 1;
2540 spin_unlock(&res->spinlock);
2541 ret = -EINVAL;
2542 }
2543
2544 fail:
2545 if (oldmle) {
2546 /* master is known, detach if not already detached */
2547 dlm_mle_detach_hb_events(dlm, oldmle);
2548 dlm_put_mle(oldmle);
2549 }
2550
2551 if (ret < 0) {
2552 if (mle_added) {
2553 dlm_mle_detach_hb_events(dlm, mle);
2554 dlm_put_mle(mle);
2555 dlm_put_mle_inuse(mle);
2556 } else if (mle) {
2557 kmem_cache_free(dlm_mle_cache, mle);
2558 mle = NULL;
2559 }
2560 goto leave;
2561 }
2562
2563 /*
2564 * at this point, we have a migration target, an mle
2565 * in the master list, and the MIGRATING flag set on
2566 * the lockres
2567 */
2568
2569 /* now that remote nodes are spinning on the MIGRATING flag,
2570 * ensure that all assert_master work is flushed. */
2571 flush_workqueue(dlm->dlm_worker);
2572
2573 /* notify new node and send all lock state */
2574 /* call send_one_lockres with migration flag.
2575 * this serves as notice to the target node that a
2576 * migration is starting. */
2577 ret = dlm_send_one_lockres(dlm, res, mres, target,
2578 DLM_MRES_MIGRATION);
2579
2580 if (ret < 0) {
2581 mlog(0, "migration to node %u failed with %d\n",
2582 target, ret);
2583 /* migration failed, detach and clean up mle */
2584 dlm_mle_detach_hb_events(dlm, mle);
2585 dlm_put_mle(mle);
2586 dlm_put_mle_inuse(mle);
2587 spin_lock(&res->spinlock);
2588 res->state &= ~DLM_LOCK_RES_MIGRATING;
2589 wake = 1;
2590 spin_unlock(&res->spinlock);
2591 if (dlm_is_host_down(ret))
2592 dlm_wait_for_node_death(dlm, target,
2593 DLM_NODE_DEATH_WAIT_MAX);
2594 goto leave;
2595 }
2596
2597 /* at this point, the target sends a message to all nodes,
2598 * (using dlm_do_migrate_request). this node is skipped since
2599 * we had to put an mle in the list to begin the process. this
2600 * node now waits for target to do an assert master. this node
2601 * will be the last one notified, ensuring that the migration
2602 * is complete everywhere. if the target dies while this is
2603 * going on, some nodes could potentially see the target as the
2604 * master, so it is important that my recovery finds the migration
2605 * mle and sets the master to UNKNOWN. */
2606
2607
2608 /* wait for new node to assert master */
2609 while (1) {
2610 ret = wait_event_interruptible_timeout(mle->wq,
2611 (atomic_read(&mle->woken) == 1),
2612 msecs_to_jiffies(5000));
2613
2614 if (ret >= 0) {
2615 if (atomic_read(&mle->woken) == 1 ||
2616 res->owner == target)
2617 break;
2618
2619 mlog(0, "%s:%.*s: timed out during migration\n",
2620 dlm->name, res->lockname.len, res->lockname.name);
2621 /* avoid hang during shutdown when migrating lockres
2622 * to a node which also goes down */
2623 if (dlm_is_node_dead(dlm, target)) {
2624 mlog(0, "%s:%.*s: expected migration "
2625 "target %u is no longer up, restarting\n",
2626 dlm->name, res->lockname.len,
2627 res->lockname.name, target);
2628 ret = -EINVAL;
2629 /* migration failed, detach and clean up mle */
2630 dlm_mle_detach_hb_events(dlm, mle);
2631 dlm_put_mle(mle);
2632 dlm_put_mle_inuse(mle);
2633 spin_lock(&res->spinlock);
2634 res->state &= ~DLM_LOCK_RES_MIGRATING;
2635 wake = 1;
2636 spin_unlock(&res->spinlock);
2637 goto leave;
2638 }
2639 } else
2640 mlog(0, "%s:%.*s: caught signal during migration\n",
2641 dlm->name, res->lockname.len, res->lockname.name);
2642 }
2643
2644 /* all done, set the owner, clear the flag */
2645 spin_lock(&res->spinlock);
2646 dlm_set_lockres_owner(dlm, res, target);
2647 res->state &= ~DLM_LOCK_RES_MIGRATING;
2648 dlm_remove_nonlocal_locks(dlm, res);
2649 spin_unlock(&res->spinlock);
2650 wake_up(&res->wq);
2651
2652 /* master is known, detach if not already detached */
2653 dlm_mle_detach_hb_events(dlm, mle);
2654 dlm_put_mle_inuse(mle);
2655 ret = 0;
2656
2657 dlm_lockres_calc_usage(dlm, res);
2658
2659 leave:
2660 /* re-dirty the lockres if we failed */
2661 if (ret < 0)
2662 dlm_kick_thread(dlm, res);
2663
2664 /* wake up waiters if the MIGRATING flag got set
2665 * but migration failed */
2666 if (wake)
2667 wake_up(&res->wq);
2668
2669 if (mres)
2670 free_page((unsigned long)mres);
2671
2672 dlm_put(dlm);
2673
2674 mlog(0, "%s: Migrating %.*s to %u, returns %d\n", dlm->name, namelen,
2675 name, target, ret);
2676 return ret;
2677 }
2678
2679 #define DLM_MIGRATION_RETRY_MS 100
2680
2681 /*
2682 * Should be called only after beginning the domain leave process.
2683 * There should not be any remaining locks on nonlocal lock resources,
2684 * and there should be no local locks left on locally mastered resources.
2685 *
2686 * Called with the dlm spinlock held, may drop it to do migration, but
2687 * will re-acquire before exit.
2688 *
2689 * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped
2690 */
2691 int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2692 {
2693 int ret;
2694 int lock_dropped = 0;
2695 u8 target = O2NM_MAX_NODES;
2696
2697 assert_spin_locked(&dlm->spinlock);
2698
2699 spin_lock(&res->spinlock);
2700 if (dlm_is_lockres_migrateable(dlm, res))
2701 target = dlm_pick_migration_target(dlm, res);
2702 spin_unlock(&res->spinlock);
2703
2704 if (target == O2NM_MAX_NODES)
2705 goto leave;
2706
2707 /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2708 spin_unlock(&dlm->spinlock);
2709 lock_dropped = 1;
2710 ret = dlm_migrate_lockres(dlm, res, target);
2711 if (ret)
2712 mlog(0, "%s: res %.*s, Migrate to node %u failed with %d\n",
2713 dlm->name, res->lockname.len, res->lockname.name,
2714 target, ret);
2715 spin_lock(&dlm->spinlock);
2716 leave:
2717 return lock_dropped;
2718 }
2719
2720 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2721 {
2722 int ret;
2723 spin_lock(&dlm->ast_lock);
2724 spin_lock(&lock->spinlock);
2725 ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2726 spin_unlock(&lock->spinlock);
2727 spin_unlock(&dlm->ast_lock);
2728 return ret;
2729 }
2730
2731 static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2732 struct dlm_lock_resource *res,
2733 u8 mig_target)
2734 {
2735 int can_proceed;
2736 spin_lock(&res->spinlock);
2737 can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2738 spin_unlock(&res->spinlock);
2739
2740 /* target has died, so make the caller break out of the
2741 * wait_event, but caller must recheck the domain_map */
2742 spin_lock(&dlm->spinlock);
2743 if (!test_bit(mig_target, dlm->domain_map))
2744 can_proceed = 1;
2745 spin_unlock(&dlm->spinlock);
2746 return can_proceed;
2747 }
2748
2749 static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2750 struct dlm_lock_resource *res)
2751 {
2752 int ret;
2753 spin_lock(&res->spinlock);
2754 ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2755 spin_unlock(&res->spinlock);
2756 return ret;
2757 }
2758
2759
2760 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2761 struct dlm_lock_resource *res,
2762 u8 target)
2763 {
2764 int ret = 0;
2765
2766 mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2767 res->lockname.len, res->lockname.name, dlm->node_num,
2768 target);
2769 /* need to set MIGRATING flag on lockres. this is done by
2770 * ensuring that all asts have been flushed for this lockres. */
2771 spin_lock(&res->spinlock);
2772 BUG_ON(res->migration_pending);
2773 res->migration_pending = 1;
2774 /* strategy is to reserve an extra ast then release
2775 * it below, letting the release do all of the work */
2776 __dlm_lockres_reserve_ast(res);
2777 spin_unlock(&res->spinlock);
2778
2779 /* now flush all the pending asts */
2780 dlm_kick_thread(dlm, res);
2781 /* before waiting on DIRTY, block processes which may
2782 * try to dirty the lockres before MIGRATING is set */
2783 spin_lock(&res->spinlock);
2784 BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
2785 res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
2786 spin_unlock(&res->spinlock);
2787 /* now wait on any pending asts and the DIRTY state */
2788 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2789 dlm_lockres_release_ast(dlm, res);
2790
2791 mlog(0, "about to wait on migration_wq, dirty=%s\n",
2792 res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2793 /* if the extra ref we just put was the final one, this
2794 * will pass thru immediately. otherwise, we need to wait
2795 * for the last ast to finish. */
2796 again:
2797 ret = wait_event_interruptible_timeout(dlm->migration_wq,
2798 dlm_migration_can_proceed(dlm, res, target),
2799 msecs_to_jiffies(1000));
2800 if (ret < 0) {
2801 mlog(0, "woken again: migrating? %s, dead? %s\n",
2802 res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2803 test_bit(target, dlm->domain_map) ? "no":"yes");
2804 } else {
2805 mlog(0, "all is well: migrating? %s, dead? %s\n",
2806 res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2807 test_bit(target, dlm->domain_map) ? "no":"yes");
2808 }
2809 if (!dlm_migration_can_proceed(dlm, res, target)) {
2810 mlog(0, "trying again...\n");
2811 goto again;
2812 }
2813
2814 ret = 0;
2815 /* did the target go down or die? */
2816 spin_lock(&dlm->spinlock);
2817 if (!test_bit(target, dlm->domain_map)) {
2818 mlog(ML_ERROR, "aha. migration target %u just went down\n",
2819 target);
2820 ret = -EHOSTDOWN;
2821 }
2822 spin_unlock(&dlm->spinlock);
2823
2824 /*
2825 * if target is down, we need to clear DLM_LOCK_RES_BLOCK_DIRTY for
2826 * another try; otherwise, we are sure the MIGRATING state is there,
2827 * drop the unneeded state which blocked threads trying to DIRTY
2828 */
2829 spin_lock(&res->spinlock);
2830 BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
2831 res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
2832 if (!ret)
2833 BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
2834 spin_unlock(&res->spinlock);
2835
2836 /*
2837 * at this point:
2838 *
2839 * o the DLM_LOCK_RES_MIGRATING flag is set if target not down
2840 * o there are no pending asts on this lockres
2841 * o all processes trying to reserve an ast on this
2842 * lockres must wait for the MIGRATING flag to clear
2843 */
2844 return ret;
2845 }
2846
2847 /* last step in the migration process.
2848 * original master calls this to free all of the dlm_lock
2849 * structures that used to be for other nodes. */
2850 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2851 struct dlm_lock_resource *res)
2852 {
2853 struct list_head *queue = &res->granted;
2854 int i, bit;
2855 struct dlm_lock *lock, *next;
2856
2857 assert_spin_locked(&res->spinlock);
2858
2859 BUG_ON(res->owner == dlm->node_num);
2860
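/* walk the granted, converting and blocked queues in turn; the
 * queue++ below relies on their adjacency in dlm_lock_resource */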
2861 for (i=0; i<3; i++) {
2862 list_for_each_entry_safe(lock, next, queue, list) {
2863 if (lock->ml.node != dlm->node_num) {
2864 mlog(0, "putting lock for node %u\n",
2865 lock->ml.node);
2866 /* be extra careful */
2867 BUG_ON(!list_empty(&lock->ast_list));
2868 BUG_ON(!list_empty(&lock->bast_list));
2869 BUG_ON(lock->ast_pending);
2870 BUG_ON(lock->bast_pending);
2871 dlm_lockres_clear_refmap_bit(dlm, res,
2872 lock->ml.node);
2873 list_del_init(&lock->list);
2874 dlm_lock_put(lock);
2875 /* In a normal unlock, we would have added a
2876 * DLM_UNLOCK_FREE_LOCK action. Force it. */
2877 dlm_lock_put(lock);
2878 }
2879 }
2880 queue++;
2881 }
2882 bit = 0;
2883 while (1) {
2884 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2885 if (bit >= O2NM_MAX_NODES)
2886 break;
2887 /* do not clear the local node reference, if there is a
2888 * process holding this, let it drop the ref itself */
2889 if (bit != dlm->node_num) {
2890 mlog(0, "%s:%.*s: node %u had a ref to this "
2891 "migrating lockres, clearing\n", dlm->name,
2892 res->lockname.len, res->lockname.name, bit);
2893 dlm_lockres_clear_refmap_bit(dlm, res, bit);
2894 }
2895 bit++;
2896 }
2897 }
2898
2899 /*
2900 * Pick a node to migrate the lock resource to. This function selects a
2901 * potential target based first on the locks and then on refmap. It skips
2902 * nodes that are in the process of exiting the domain.
2903 */
2904 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2905 struct dlm_lock_resource *res)
2906 {
2907 enum dlm_lockres_list idx;
2908 struct list_head *queue = &res->granted;
2909 struct dlm_lock *lock;
2910 int noderef;
2911 u8 nodenum = O2NM_MAX_NODES;
2912
2913 assert_spin_locked(&dlm->spinlock);
2914 assert_spin_locked(&res->spinlock);
2915
2916 /* Go through all the locks */
2917 for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2918 queue = dlm_list_idx_to_ptr(res, idx);
2919 list_for_each_entry(lock, queue, list) {
2920 if (lock->ml.node == dlm->node_num)
2921 continue;
2922 if (test_bit(lock->ml.node, dlm->exit_domain_map))
2923 continue;
2924 nodenum = lock->ml.node;
2925 goto bail;
2926 }
2927 }
2928
2929 /* Go thru the refmap */
2930 noderef = -1;
2931 while (1) {
2932 noderef = find_next_bit(res->refmap, O2NM_MAX_NODES,
2933 noderef + 1);
2934 if (noderef >= O2NM_MAX_NODES)
2935 break;
2936 if (noderef == dlm->node_num)
2937 continue;
2938 if (test_bit(noderef, dlm->exit_domain_map))
2939 continue;
2940 nodenum = noderef;
2941 goto bail;
2942 }
2943
2944 bail:
2945 return nodenum;
2946 }
2947
2948 /* this is called by the new master once all lockres
2949 * data has been received */
2950 static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2951 struct dlm_lock_resource *res,
2952 u8 master, u8 new_master,
2953 struct dlm_node_iter *iter)
2954 {
2955 struct dlm_migrate_request migrate;
2956 int ret, skip, status = 0;
2957 int nodenum;
2958
2959 memset(&migrate, 0, sizeof(migrate));
2960 migrate.namelen = res->lockname.len;
2961 memcpy(migrate.name, res->lockname.name, migrate.namelen);
2962 migrate.new_master = new_master;
2963 migrate.master = master;
2964
2965 ret = 0;
2966
2967 /* send message to all nodes, except the master and myself */
2968 while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
2969 if (nodenum == master ||
2970 nodenum == new_master)
2971 continue;
2972
2973 /* We could race exit domain. If exited, skip. */
2974 spin_lock(&dlm->spinlock);
2975 skip = (!test_bit(nodenum, dlm->domain_map));
2976 spin_unlock(&dlm->spinlock);
2977 if (skip) {
2978 clear_bit(nodenum, iter->node_map);
2979 continue;
2980 }
2981
2982 ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
2983 &migrate, sizeof(migrate), nodenum,
2984 &status);
2985 if (ret < 0) {
2986 mlog(ML_ERROR, "%s: res %.*s, Error %d send "
2987 "MIGRATE_REQUEST to node %u\n", dlm->name,
2988 migrate.namelen, migrate.name, ret, nodenum);
2989 if (!dlm_is_host_down(ret)) {
2990 mlog(ML_ERROR, "unhandled error=%d!\n", ret);
2991 BUG();
2992 }
2993 clear_bit(nodenum, iter->node_map);
2994 ret = 0;
2995 } else if (status < 0) {
2996 mlog(0, "migrate request (node %u) returned %d!\n",
2997 nodenum, status);
2998 ret = status;
2999 } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
3000 /* during the migration request we short-circuited
3001 * the mastery of the lockres. make sure we have
3002 * a mastery ref for nodenum */
3003 mlog(0, "%s:%.*s: need ref for node %u\n",
3004 dlm->name, res->lockname.len, res->lockname.name,
3005 nodenum);
3006 spin_lock(&res->spinlock);
3007 dlm_lockres_set_refmap_bit(dlm, res, nodenum);
3008 spin_unlock(&res->spinlock);
3009 }
3010 }
3011
3012 if (ret < 0)
3013 mlog_errno(ret);
3014
3015 mlog(0, "returning ret=%d\n", ret);
3016 return ret;
3017 }
3018
3019
3020 /* if there is an existing mle for this lockres, we now know who the master is.
3021 * (the one who sent us *this* message) we can clear it up right away.
3022 * since the process that put the mle on the list still has a reference to it,
3023 * we can unhash it now, set the master and wake the process. as a result,
3024 * we will have no mle in the list to start with. now we can add an mle for
3025 * the migration and this should be the only one found for those scanning the
3026 * list. */
3027 int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
3028 void **ret_data)
3029 {
3030 struct dlm_ctxt *dlm = data;
3031 struct dlm_lock_resource *res = NULL;
3032 struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
3033 struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
3034 const char *name;
3035 unsigned int namelen, hash;
3036 int ret = 0;
3037
3038 if (!dlm_grab(dlm))
3039 return -EINVAL;
3040
3041 name = migrate->name;
3042 namelen = migrate->namelen;
3043 hash = dlm_lockid_hash(name, namelen);
3044
3045 /* preallocate.. if this fails, abort */
3046 mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
3047
3048 if (!mle) {
3049 ret = -ENOMEM;
3050 goto leave;
3051 }
3052
3053 /* check for pre-existing lock */
3054 spin_lock(&dlm->spinlock);
3055 res = __dlm_lookup_lockres(dlm, name, namelen, hash);
3056 if (res) {
3057 spin_lock(&res->spinlock);
3058 if (res->state & DLM_LOCK_RES_RECOVERING) {
3059 /* if all is working ok, this can only mean that we got
3060 * a migrate request from a node that we now see as
3061 * dead. what can we do here? drop it to the floor? */
3062 spin_unlock(&res->spinlock);
3063 mlog(ML_ERROR, "Got a migrate request, but the "
3064 "lockres is marked as recovering!");
3065 kmem_cache_free(dlm_mle_cache, mle);
3066 ret = -EINVAL; /* need a better solution */
3067 goto unlock;
3068 }
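/* mark the lockres MIGRATING so that local lock activity waits
 * until the new master has asserted ownership */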
3069 res->state |= DLM_LOCK_RES_MIGRATING;
3070 spin_unlock(&res->spinlock);
3071 }
3072
3073 spin_lock(&dlm->master_lock);
3074 /* ignore status. only nonzero status would BUG. */
3075 ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
3076 name, namelen,
3077 migrate->new_master,
3078 migrate->master);
3079
3080 spin_unlock(&dlm->master_lock);
3081 unlock:
3082 spin_unlock(&dlm->spinlock);
3083
3084 if (oldmle) {
3085 /* master is known, detach if not already detached */
3086 dlm_mle_detach_hb_events(dlm, oldmle);
3087 dlm_put_mle(oldmle);
3088 }
3089
3090 if (res)
3091 dlm_lockres_put(res);
3092 leave:
3093 dlm_put(dlm);
3094 return ret;
3095 }
3096
3097 /* must be holding dlm->spinlock and dlm->master_lock
3098 * when adding a migration mle, we can clear any other mles
3099 * in the master list because we know with certainty that
3100 * the master is "master". so we remove any old mle from
3101 * the list after setting its master field, and then add
3102 * the new migration mle. this way we can hold with the rule
3103 * of having only one mle for a given lock name at all times. */
3104 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
3105 struct dlm_lock_resource *res,
3106 struct dlm_master_list_entry *mle,
3107 struct dlm_master_list_entry **oldmle,
3108 const char *name, unsigned int namelen,
3109 u8 new_master, u8 master)
3110 {
3111 int found;
3112 int ret = 0;
3113
3114 *oldmle = NULL;
3115
3116 assert_spin_locked(&dlm->spinlock);
3117 assert_spin_locked(&dlm->master_lock);
3118
3119 /* caller is responsible for any ref taken here on oldmle */
3120 found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
3121 if (found) {
3122 struct dlm_master_list_entry *tmp = *oldmle;
3123 spin_lock(&tmp->spinlock);
3124 if (tmp->type == DLM_MLE_MIGRATION) {
3125 if (master == dlm->node_num) {
3126 /* ah another process raced me to it */
3127 mlog(0, "tried to migrate %.*s, but some "
3128 "process beat me to it\n",
3129 namelen, name);
3130 ret = -EEXIST;
3131 } else {
3132 /* bad. 2 NODES are trying to migrate! */
3133 mlog(ML_ERROR, "migration error mle: "
3134 "master=%u new_master=%u // request: "
3135 "master=%u new_master=%u // "
3136 "lockres=%.*s\n",
3137 tmp->master, tmp->new_master,
3138 master, new_master,
3139 namelen, name);
3140 BUG();
3141 }
3142 } else {
3143 /* this is essentially what assert_master does */
3144 tmp->master = master;
3145 atomic_set(&tmp->woken, 1);
3146 wake_up(&tmp->wq);
3147 /* remove it so that only one mle will be found */
3148 __dlm_unlink_mle(dlm, tmp);
3149 __dlm_mle_detach_hb_events(dlm, tmp);
3150 if (tmp->type == DLM_MLE_MASTER) {
3151 ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3152 mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3153 "telling master to get ref "
3154 "for cleared out mle during "
3155 "migration\n", dlm->name,
3156 namelen, name, master,
3157 new_master);
3158 }
3159 }
3160 spin_unlock(&tmp->spinlock);
3161 }
3162
3163 /* now add a migration mle to the tail of the list */
3164 dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
3165 mle->new_master = new_master;
3166 /* the new master will be sending an assert master for this.
3167 * at that point we will get the refmap reference */
3168 mle->master = master;
3169 /* do this for consistency with other mle types */
3170 set_bit(new_master, mle->maybe_map);
3171 __dlm_insert_mle(dlm, mle);
3172
3173 return ret;
3174 }
3175
3176 /*
3177 * Sets the owner of the lockres, associated to the mle, to UNKNOWN
3178 */
3179 static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
3180 struct dlm_master_list_entry *mle)
3181 {
3182 struct dlm_lock_resource *res;
3183
3184 /* Find the lockres associated to the mle and set its owner to UNK */
3185 res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
3186 mle->mnamehash);
3187 if (res) {
3188 spin_unlock(&dlm->master_lock);
3189
3190 /* move lockres onto recovery list */
3191 spin_lock(&res->spinlock);
3192 dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
3193 dlm_move_lockres_to_recovery_list(dlm, res);
3194 spin_unlock(&res->spinlock);
3195 dlm_lockres_put(res);
3196
3197 /* about to get rid of mle, detach from heartbeat */
3198 __dlm_mle_detach_hb_events(dlm, mle);
3199
3200 /* dump the mle */
3201 spin_lock(&dlm->master_lock);
3202 __dlm_put_mle(mle);
3203 spin_unlock(&dlm->master_lock);
3204 }
3205
3206 return res;
3207 }
3208
3209 static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
3210 struct dlm_master_list_entry *mle)
3211 {
3212 __dlm_mle_detach_hb_events(dlm, mle);
3213
3214 spin_lock(&mle->spinlock);
3215 __dlm_unlink_mle(dlm, mle);
3216 atomic_set(&mle->woken, 1);
3217 spin_unlock(&mle->spinlock);
3218
3219 wake_up(&mle->wq);
3220 }
3221
3222 static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
3223 struct dlm_master_list_entry *mle, u8 dead_node)
3224 {
3225 int bit;
3226
3227 BUG_ON(mle->type != DLM_MLE_BLOCK);
3228
3229 spin_lock(&mle->spinlock);
3230 bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
3231 if (bit != dead_node) {
3232 mlog(0, "mle found, but dead node %u would not have been "
3233 "master\n", dead_node);
3234 spin_unlock(&mle->spinlock);
3235 } else {
3236 /* Must drop the refcount by one since the assert_master will
3237 * never arrive. This may result in the mle being unlinked and
3238 * freed, but there may still be a process waiting in the
3239 * dlmlock path which is fine. */
3240 mlog(0, "node %u was expected master\n", dead_node);
3241 atomic_set(&mle->woken, 1);
3242 spin_unlock(&mle->spinlock);
3243 wake_up(&mle->wq);
3244
3245 /* Do not need events any longer, so detach from heartbeat */
3246 __dlm_mle_detach_hb_events(dlm, mle);
3247 __dlm_put_mle(mle);
3248 }
3249 }
3250
3251 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
3252 {
3253 struct dlm_master_list_entry *mle;
3254 struct dlm_lock_resource *res;
3255 struct hlist_head *bucket;
3256 struct hlist_node *tmp;
3257 unsigned int i;
3258
3259 mlog(0, "dlm=%s, dead node=%u\n", dlm->name, dead_node);
3260 top:
3261 assert_spin_locked(&dlm->spinlock);
3262
3263 /* clean the master list */
3264 spin_lock(&dlm->master_lock);
3265 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3266 bucket = dlm_master_hash(dlm, i);
3267 hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3268 BUG_ON(mle->type != DLM_MLE_BLOCK &&
3269 mle->type != DLM_MLE_MASTER &&
3270 mle->type != DLM_MLE_MIGRATION);
3271
3272 /* MASTER mles are initiated locally. The waiting
3273 * process will notice the node map change shortly.
3274 * Let that happen as normal. */
3275 if (mle->type == DLM_MLE_MASTER)
3276 continue;
3277
3278 /* BLOCK mles are initiated by other nodes. Need to
3279 * clean up if the dead node would have been the
3280 * master. */
3281 if (mle->type == DLM_MLE_BLOCK) {
3282 dlm_clean_block_mle(dlm, mle, dead_node);
3283 continue;
3284 }
3285
3286 /* Everything else is a MIGRATION mle */
3287
3288 /* The rule for MIGRATION mles is that the master
3289 * becomes UNKNOWN if *either* the original or the new
3290 * master dies. All UNKNOWN lockres' are sent to
3291 * whichever node becomes the recovery master. The new
3292 * master is responsible for determining if there is
3293 * still a master for this lockres, or if he needs to
3294 * take over mastery. Either way, this node should
3295 * expect another message to resolve this. */
3296
3297 if (mle->master != dead_node &&
3298 mle->new_master != dead_node)
3299 continue;
3300
3301 if (mle->new_master == dead_node && mle->inuse) {
3302 mlog(ML_NOTICE, "%s: target %u died during "
3303 "migration from %u, the MLE is "
3304 "still keep used, ignore it!\n",
3305 dlm->name, dead_node,
3306 mle->master);
3307 continue;
3308 }
3309
3310 /* If we have reached this point, this mle needs to be
3311 * removed from the list and freed. */
3312 dlm_clean_migration_mle(dlm, mle);
3313
3314 mlog(0, "%s: node %u died during migration from "
3315 "%u to %u!\n", dlm->name, dead_node, mle->master,
3316 mle->new_master);
3317
3318 /* If we find a lockres associated with the mle, we've
3319 * hit this rare case that messes up our lock ordering.
3320 * If so, we need to drop the master lock so that we can
3321 * take the lockres lock, meaning that we will have to
3322 * restart from the head of list. */
3323 res = dlm_reset_mleres_owner(dlm, mle);
3324 if (res)
3325 /* restart */
3326 goto top;
3327
3328 /* This may be the last reference */
3329 __dlm_put_mle(mle);
3330 }
3331 }
3332 spin_unlock(&dlm->master_lock);
3333 }
3334
3335 int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
3336 u8 old_master)
3337 {
3338 struct dlm_node_iter iter;
3339 int ret = 0;
3340
3341 spin_lock(&dlm->spinlock);
3342 dlm_node_iter_init(dlm->domain_map, &iter);
3343 clear_bit(old_master, iter.node_map);
3344 clear_bit(dlm->node_num, iter.node_map);
3345 spin_unlock(&dlm->spinlock);
3346
3347 /* ownership of the lockres is changing. account for the
3348 * mastery reference here since old_master will briefly have
3349 * a reference after the migration completes */
3350 spin_lock(&res->spinlock);
3351 dlm_lockres_set_refmap_bit(dlm, res, old_master);
3352 spin_unlock(&res->spinlock);
3353
3354 mlog(0, "now time to do a migrate request to other nodes\n");
3355 ret = dlm_do_migrate_request(dlm, res, old_master,
3356 dlm->node_num, &iter);
3357 if (ret < 0) {
3358 mlog_errno(ret);
3359 goto leave;
3360 }
3361
3362 mlog(0, "doing assert master of %.*s to all except the original node\n",
3363 res->lockname.len, res->lockname.name);
3364 /* this call now finishes out the nodemap
3365 * even if one or more nodes die */
3366 ret = dlm_do_assert_master(dlm, res, iter.node_map,
3367 DLM_ASSERT_MASTER_FINISH_MIGRATION);
3368 if (ret < 0) {
3369 /* no longer need to retry. all living nodes contacted. */
3370 mlog_errno(ret);
3371 ret = 0;
3372 }
3373
3374 memset(iter.node_map, 0, sizeof(iter.node_map));
3375 set_bit(old_master, iter.node_map);
3376 mlog(0, "doing assert master of %.*s back to %u\n",
3377 res->lockname.len, res->lockname.name, old_master);
3378 ret = dlm_do_assert_master(dlm, res, iter.node_map,
3379 DLM_ASSERT_MASTER_FINISH_MIGRATION);
3380 if (ret < 0) {
3381 mlog(0, "assert master to original master failed "
3382 "with %d.\n", ret);
3383 /* the only nonzero status here would be because of
3384 * a dead original node. we're done. */
3385 ret = 0;
3386 }
3387
3388 /* all done, set the owner, clear the flag */
3389 spin_lock(&res->spinlock);
3390 dlm_set_lockres_owner(dlm, res, dlm->node_num);
3391 res->state &= ~DLM_LOCK_RES_MIGRATING;
3392 spin_unlock(&res->spinlock);
3393 /* re-dirty it on the new master */
3394 dlm_kick_thread(dlm, res);
3395 wake_up(&res->wq);
3396 leave:
3397 return ret;
3398 }
3399
3400 /*
3401 * LOCKRES AST REFCOUNT
3402 * this is integral to migration
3403 */
3404
3405 /* for future intent to call an ast, reserve one ahead of time.
3406 * this should be called only after waiting on the lockres
3407 * with dlm_wait_on_lockres, and while still holding the
3408 * spinlock after the call. */
3409 void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
3410 {
3411 assert_spin_locked(&res->spinlock);
3412 if (res->state & DLM_LOCK_RES_MIGRATING) {
3413 __dlm_print_one_lock_resource(res);
3414 }
3415 BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3416
3417 atomic_inc(&res->asts_reserved);
3418 }
3419
3420 /*
3421 * used to drop the reserved ast, either because it went unused,
3422 * or because the ast/bast was actually called.
3423 *
3424 * also, if there is a pending migration on this lockres,
3425 * and this was the last pending ast on the lockres,
3426 * atomically set the MIGRATING flag before we drop the lock.
3427 * this is how we ensure that migration can proceed with no
3428 * asts in progress. note that it is ok if the state of the
3429 * queues is such that a lock should be granted in the future
3430 * or that a bast should be fired, because the new master will
3431 * shuffle the lists on this lockres as soon as it is migrated.
3432 */
3433 void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
3434 struct dlm_lock_resource *res)
3435 {
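/* drop one reserved ast; only take the spinlock if this was the
 * last reservation, since that is when migration may proceed */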
3436 if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
3437 return;
3438
3439 if (!res->migration_pending) {
3440 spin_unlock(&res->spinlock);
3441 return;
3442 }
3443
3444 BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3445 res->migration_pending = 0;
3446 res->state |= DLM_LOCK_RES_MIGRATING;
3447 spin_unlock(&res->spinlock);
3448 wake_up(&res->wq);
3449 wake_up(&dlm->migration_wq);
3450 }
3451
3452 void dlm_force_free_mles(struct dlm_ctxt *dlm)
3453 {
3454 int i;
3455 struct hlist_head *bucket;
3456 struct dlm_master_list_entry *mle;
3457 struct hlist_node *tmp;
3458
3459 /*
3460 * We notified all other nodes that we are exiting the domain and
3461 * marked the dlm state to DLM_CTXT_LEAVING. If any mles are still
3462 * around we force free them and wake any processes that are waiting
3463 * on the mles
3464 */
3465 spin_lock(&dlm->spinlock);
3466 spin_lock(&dlm->master_lock);
3467
3468 BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
3469 BUG_ON((find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 0) < O2NM_MAX_NODES));
3470
3471 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3472 bucket = dlm_master_hash(dlm, i);
3473 hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3474 if (mle->type != DLM_MLE_BLOCK) {
3475 mlog(ML_ERROR, "bad mle: %p\n", mle);
3476 dlm_print_one_mle(mle);
3477 }
3478 atomic_set(&mle->woken, 1);
3479 wake_up(&mle->wq);
3480
3481 __dlm_unlink_mle(dlm, mle);
3482 __dlm_mle_detach_hb_events(dlm, mle);
3483 __dlm_put_mle(mle);
3484 }
3485 }
3486 spin_unlock(&dlm->master_lock);
3487 spin_unlock(&dlm->spinlock);
3488 }
3489