• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10 
11 /* Central locking logic has four stages:
12 
13    dlm_lock()
14    dlm_unlock()
15 
16    request_lock(ls, lkb)
17    convert_lock(ls, lkb)
18    unlock_lock(ls, lkb)
19    cancel_lock(ls, lkb)
20 
21    _request_lock(r, lkb)
22    _convert_lock(r, lkb)
23    _unlock_lock(r, lkb)
24    _cancel_lock(r, lkb)
25 
26    do_request(r, lkb)
27    do_convert(r, lkb)
28    do_unlock(r, lkb)
29    do_cancel(r, lkb)
30 
31    Stage 1 (lock, unlock) is mainly about checking input args and
32    splitting into one of the four main operations:
33 
34        dlm_lock          = request_lock
35        dlm_lock+CONVERT  = convert_lock
36        dlm_unlock        = unlock_lock
37        dlm_unlock+CANCEL = cancel_lock
38 
39    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40    provided to the next stage.
41 
42    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43    When remote, it calls send_xxxx(), when local it calls do_xxxx().
44 
45    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46    given rsb and lkb and queues callbacks.
47 
48    For remote operations, send_xxxx() results in the corresponding do_xxxx()
49    function being executed on the remote node.  The connecting send/receive
50    calls on local (L) and remote (R) nodes:
51 
52    L: send_xxxx()              ->  R: receive_xxxx()
53                                    R: do_xxxx()
54    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55 */
56 #include <trace/events/dlm.h>
57 
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77 
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89 				    const struct dlm_message *ms, bool local);
90 static int receive_extralen(const struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void deactivate_rsb(struct kref *kref);
93 
94 /*
95  * Lock compatibilty matrix - thanks Steve
96  * UN = Unlocked state. Not really a state, used as a flag
97  * PD = Padding. Used to make the matrix a nice power of two in size
98  * Other states are the same as the VMS DLM.
99  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100  */
101 
102 static const int __dlm_compat_matrix[8][8] = {
103       /* UN NL CR CW PR PW EX PD */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
106         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
107         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
108         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
109         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
110         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
111         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
112 };
113 
114 /*
115  * This defines the direction of transfer of LVB data.
116  * Granted mode is the row; requested mode is the column.
117  * Usage: matrix[grmode+1][rqmode+1]
118  * 1 = LVB is returned to the caller
119  * 0 = LVB is written to the resource
120  * -1 = nothing happens to the LVB
121  */
122 
123 const int dlm_lvb_operations[8][8] = {
124         /* UN   NL  CR  CW  PR  PW  EX  PD*/
125         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
126         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
127         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
128         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
129         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
130         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
133 };
134 
135 #define modes_compat(gr, rq) \
136 	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137 
dlm_modes_compat(int mode1,int mode2)138 int dlm_modes_compat(int mode1, int mode2)
139 {
140 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
142 
143 /*
144  * Compatibility matrix for conversions with QUECVT set.
145  * Granted mode is the row; requested mode is the column.
146  * Usage: matrix[grmode+1][rqmode+1]
147  */
148 
149 static const int __quecvt_compat_matrix[8][8] = {
150       /* UN NL CR CW PR PW EX PD */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
152         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
153         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
154         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
155         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
156         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
157         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
158         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
159 };
160 
dlm_print_lkb(struct dlm_lkb * lkb)161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 	       dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167 	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 	       (unsigned long long)lkb->lkb_recover_seq);
169 }
170 
dlm_print_rsb(struct dlm_rsb * r)171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173 	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174 	       "rlc %d name %s\n",
175 	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176 	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177 	       r->res_name);
178 }
179 
dlm_dump_rsb(struct dlm_rsb * r)180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182 	struct dlm_lkb *lkb;
183 
184 	dlm_print_rsb(r);
185 
186 	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187 	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188 	printk(KERN_ERR "rsb lookup list\n");
189 	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190 		dlm_print_lkb(lkb);
191 	printk(KERN_ERR "rsb grant queue:\n");
192 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193 		dlm_print_lkb(lkb);
194 	printk(KERN_ERR "rsb convert queue:\n");
195 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196 		dlm_print_lkb(lkb);
197 	printk(KERN_ERR "rsb wait queue:\n");
198 	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199 		dlm_print_lkb(lkb);
200 }
201 
202 /* Threads cannot use the lockspace while it's being recovered */
203 
dlm_lock_recovery(struct dlm_ls * ls)204 void dlm_lock_recovery(struct dlm_ls *ls)
205 {
206 	down_read(&ls->ls_in_recovery);
207 }
208 
dlm_unlock_recovery(struct dlm_ls * ls)209 void dlm_unlock_recovery(struct dlm_ls *ls)
210 {
211 	up_read(&ls->ls_in_recovery);
212 }
213 
dlm_lock_recovery_try(struct dlm_ls * ls)214 int dlm_lock_recovery_try(struct dlm_ls *ls)
215 {
216 	return down_read_trylock(&ls->ls_in_recovery);
217 }
218 
can_be_queued(struct dlm_lkb * lkb)219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223 
force_blocking_asts(struct dlm_lkb * lkb)224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 {
226 	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227 }
228 
is_demoted(struct dlm_lkb * lkb)229 static inline int is_demoted(struct dlm_lkb *lkb)
230 {
231 	return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232 }
233 
is_altmode(struct dlm_lkb * lkb)234 static inline int is_altmode(struct dlm_lkb *lkb)
235 {
236 	return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237 }
238 
is_granted(struct dlm_lkb * lkb)239 static inline int is_granted(struct dlm_lkb *lkb)
240 {
241 	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242 }
243 
is_remote(struct dlm_rsb * r)244 static inline int is_remote(struct dlm_rsb *r)
245 {
246 	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247 	return !!r->res_nodeid;
248 }
249 
is_process_copy(struct dlm_lkb * lkb)250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252 	return lkb->lkb_nodeid &&
253 	       !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255 
is_master_copy(struct dlm_lkb * lkb)256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258 	return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259 }
260 
middle_conversion(struct dlm_lkb * lkb)261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265 		return 1;
266 	return 0;
267 }
268 
down_conversion(struct dlm_lkb * lkb)269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273 
is_overlap_unlock(struct dlm_lkb * lkb)274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276 	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277 }
278 
is_overlap_cancel(struct dlm_lkb * lkb)279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281 	return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282 }
283 
is_overlap(struct dlm_lkb * lkb)284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286 	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287 	       test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288 }
289 
queue_cast(struct dlm_rsb * r,struct dlm_lkb * lkb,int rv)290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292 	if (is_master_copy(lkb))
293 		return;
294 
295 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296 
297 	if (rv == -DLM_ECANCEL &&
298 	    test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299 		rv = -EDEADLK;
300 
301 	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303 
queue_cast_overlap(struct dlm_rsb * r,struct dlm_lkb * lkb)304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306 	queue_cast(r, lkb,
307 		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309 
queue_bast(struct dlm_rsb * r,struct dlm_lkb * lkb,int rqmode)310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312 	if (is_master_copy(lkb)) {
313 		send_bast(r, lkb, rqmode);
314 	} else {
315 		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316 	}
317 }
318 
319 /*
320  * Basic operations on rsb's and lkb's
321  */
322 
rsb_toss_jiffies(void)323 static inline unsigned long rsb_toss_jiffies(void)
324 {
325 	return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
326 }
327 
328 /* This is only called to add a reference when the code already holds
329    a valid reference to the rsb, so there's no need for locking. */
330 
hold_rsb(struct dlm_rsb * r)331 static inline void hold_rsb(struct dlm_rsb *r)
332 {
333 	/* inactive rsbs are not ref counted */
334 	WARN_ON(rsb_flag(r, RSB_INACTIVE));
335 	kref_get(&r->res_ref);
336 }
337 
dlm_hold_rsb(struct dlm_rsb * r)338 void dlm_hold_rsb(struct dlm_rsb *r)
339 {
340 	hold_rsb(r);
341 }
342 
343 /* TODO move this to lib/refcount.c */
344 static __must_check bool
dlm_refcount_dec_and_write_lock_bh(refcount_t * r,rwlock_t * lock)345 dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
346 __cond_acquires(lock)
347 {
348 	if (refcount_dec_not_one(r))
349 		return false;
350 
351 	write_lock_bh(lock);
352 	if (!refcount_dec_and_test(r)) {
353 		write_unlock_bh(lock);
354 		return false;
355 	}
356 
357 	return true;
358 }
359 
360 /* TODO move this to include/linux/kref.h */
dlm_kref_put_write_lock_bh(struct kref * kref,void (* release)(struct kref * kref),rwlock_t * lock)361 static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
362 					     void (*release)(struct kref *kref),
363 					     rwlock_t *lock)
364 {
365 	if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
366 		release(kref);
367 		return 1;
368 	}
369 
370 	return 0;
371 }
372 
put_rsb(struct dlm_rsb * r)373 static void put_rsb(struct dlm_rsb *r)
374 {
375 	struct dlm_ls *ls = r->res_ls;
376 	int rv;
377 
378 	rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
379 					&ls->ls_rsbtbl_lock);
380 	if (rv)
381 		write_unlock_bh(&ls->ls_rsbtbl_lock);
382 }
383 
dlm_put_rsb(struct dlm_rsb * r)384 void dlm_put_rsb(struct dlm_rsb *r)
385 {
386 	put_rsb(r);
387 }
388 
389 /* connected with timer_delete_sync() in dlm_ls_stop() to stop
390  * new timers when recovery is triggered and don't run them
391  * again until a resume_scan_timer() tries it again.
392  */
enable_scan_timer(struct dlm_ls * ls,unsigned long jiffies)393 static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies)
394 {
395 	if (!dlm_locking_stopped(ls))
396 		mod_timer(&ls->ls_scan_timer, jiffies);
397 }
398 
399 /* This function tries to resume the timer callback if a rsb
400  * is on the scan list and no timer is pending. It might that
401  * the first entry is on currently executed as timer callback
402  * but we don't care if a timer queued up again and does
403  * nothing. Should be a rare case.
404  */
resume_scan_timer(struct dlm_ls * ls)405 void resume_scan_timer(struct dlm_ls *ls)
406 {
407 	struct dlm_rsb *r;
408 
409 	spin_lock_bh(&ls->ls_scan_lock);
410 	r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
411 				     res_scan_list);
412 	if (r && !timer_pending(&ls->ls_scan_timer))
413 		enable_scan_timer(ls, r->res_toss_time);
414 	spin_unlock_bh(&ls->ls_scan_lock);
415 }
416 
417 /* ls_rsbtbl_lock must be held */
418 
del_scan(struct dlm_ls * ls,struct dlm_rsb * r)419 static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
420 {
421 	struct dlm_rsb *first;
422 
423 	/* active rsbs should never be on the scan list */
424 	WARN_ON(!rsb_flag(r, RSB_INACTIVE));
425 
426 	spin_lock_bh(&ls->ls_scan_lock);
427 	r->res_toss_time = 0;
428 
429 	/* if the rsb is not queued do nothing */
430 	if (list_empty(&r->res_scan_list))
431 		goto out;
432 
433 	/* get the first element before delete */
434 	first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
435 				 res_scan_list);
436 	list_del_init(&r->res_scan_list);
437 	/* check if the first element was the rsb we deleted */
438 	if (first == r) {
439 		/* try to get the new first element, if the list
440 		 * is empty now try to delete the timer, if we are
441 		 * too late we don't care.
442 		 *
443 		 * if the list isn't empty and a new first element got
444 		 * in place, set the new timer expire time.
445 		 */
446 		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
447 						 res_scan_list);
448 		if (!first)
449 			timer_delete(&ls->ls_scan_timer);
450 		else
451 			enable_scan_timer(ls, first->res_toss_time);
452 	}
453 
454 out:
455 	spin_unlock_bh(&ls->ls_scan_lock);
456 }
457 
add_scan(struct dlm_ls * ls,struct dlm_rsb * r)458 static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
459 {
460 	int our_nodeid = dlm_our_nodeid();
461 	struct dlm_rsb *first;
462 
463 	/* A dir record for a remote master rsb should never be on the scan list. */
464 	WARN_ON(!dlm_no_directory(ls) &&
465 		(r->res_master_nodeid != our_nodeid) &&
466 		(dlm_dir_nodeid(r) == our_nodeid));
467 
468 	/* An active rsb should never be on the scan list. */
469 	WARN_ON(!rsb_flag(r, RSB_INACTIVE));
470 
471 	/* An rsb should not already be on the scan list. */
472 	WARN_ON(!list_empty(&r->res_scan_list));
473 
474 	spin_lock_bh(&ls->ls_scan_lock);
475 	/* set the new rsb absolute expire time in the rsb */
476 	r->res_toss_time = rsb_toss_jiffies();
477 	if (list_empty(&ls->ls_scan_list)) {
478 		/* if the queue is empty add the element and it's
479 		 * our new expire time
480 		 */
481 		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
482 		enable_scan_timer(ls, r->res_toss_time);
483 	} else {
484 		/* try to get the maybe new first element and then add
485 		 * to this rsb with the oldest expire time to the end
486 		 * of the queue. If the list was empty before this
487 		 * rsb expire time is our next expiration if it wasn't
488 		 * the now new first elemet is our new expiration time
489 		 */
490 		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
491 						 res_scan_list);
492 		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
493 		if (!first)
494 			enable_scan_timer(ls, r->res_toss_time);
495 		else
496 			enable_scan_timer(ls, first->res_toss_time);
497 	}
498 	spin_unlock_bh(&ls->ls_scan_lock);
499 }
500 
501 /* if we hit contention we do in 250 ms a retry to trylock.
502  * if there is any other mod_timer in between we don't care
503  * about that it expires earlier again this is only for the
504  * unlikely case nothing happened in this time.
505  */
506 #define DLM_TOSS_TIMER_RETRY	(jiffies + msecs_to_jiffies(250))
507 
508 /* Called by lockspace scan_timer to free unused rsb's. */
509 
dlm_rsb_scan(struct timer_list * timer)510 void dlm_rsb_scan(struct timer_list *timer)
511 {
512 	struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer);
513 	int our_nodeid = dlm_our_nodeid();
514 	struct dlm_rsb *r;
515 	int rv;
516 
517 	while (1) {
518 		/* interrupting point to leave iteration when
519 		 * recovery waits for timer_delete_sync(), recovery
520 		 * will take care to delete everything in scan list.
521 		 */
522 		if (dlm_locking_stopped(ls))
523 			break;
524 
525 		rv = spin_trylock(&ls->ls_scan_lock);
526 		if (!rv) {
527 			/* rearm again try timer */
528 			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
529 			break;
530 		}
531 
532 		r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
533 					     res_scan_list);
534 		if (!r) {
535 			/* the next add_scan will enable the timer again */
536 			spin_unlock(&ls->ls_scan_lock);
537 			break;
538 		}
539 
540 		/*
541 		 * If the first rsb is not yet expired, then stop because the
542 		 * list is sorted with nearest expiration first.
543 		 */
544 		if (time_before(jiffies, r->res_toss_time)) {
545 			/* rearm with the next rsb to expire in the future */
546 			enable_scan_timer(ls, r->res_toss_time);
547 			spin_unlock(&ls->ls_scan_lock);
548 			break;
549 		}
550 
551 		/* in find_rsb_dir/nodir there is a reverse order of this
552 		 * lock, however this is only a trylock if we hit some
553 		 * possible contention we try it again.
554 		 */
555 		rv = write_trylock(&ls->ls_rsbtbl_lock);
556 		if (!rv) {
557 			spin_unlock(&ls->ls_scan_lock);
558 			/* rearm again try timer */
559 			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
560 			break;
561 		}
562 
563 		list_del(&r->res_slow_list);
564 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
565 				       dlm_rhash_rsb_params);
566 		rsb_clear_flag(r, RSB_HASHED);
567 
568 		/* ls_rsbtbl_lock is not needed when calling send_remove() */
569 		write_unlock(&ls->ls_rsbtbl_lock);
570 
571 		list_del_init(&r->res_scan_list);
572 		spin_unlock(&ls->ls_scan_lock);
573 
574 		/* An rsb that is a dir record for a remote master rsb
575 		 * cannot be removed, and should not have a timer enabled.
576 		 */
577 		WARN_ON(!dlm_no_directory(ls) &&
578 			(r->res_master_nodeid != our_nodeid) &&
579 			(dlm_dir_nodeid(r) == our_nodeid));
580 
581 		/* We're the master of this rsb but we're not
582 		 * the directory record, so we need to tell the
583 		 * dir node to remove the dir record
584 		 */
585 		if (!dlm_no_directory(ls) &&
586 		    (r->res_master_nodeid == our_nodeid) &&
587 		    (dlm_dir_nodeid(r) != our_nodeid))
588 			send_remove(r);
589 
590 		free_inactive_rsb(r);
591 	}
592 }
593 
594 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
595    unlock any spinlocks, go back and call pre_rsb_struct again.
596    Otherwise, take an rsb off the list and return it. */
597 
get_rsb_struct(struct dlm_ls * ls,const void * name,int len,struct dlm_rsb ** r_ret)598 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
599 			  struct dlm_rsb **r_ret)
600 {
601 	struct dlm_rsb *r;
602 
603 	r = dlm_allocate_rsb();
604 	if (!r)
605 		return -ENOMEM;
606 
607 	r->res_ls = ls;
608 	r->res_length = len;
609 	memcpy(r->res_name, name, len);
610 	spin_lock_init(&r->res_lock);
611 
612 	INIT_LIST_HEAD(&r->res_lookup);
613 	INIT_LIST_HEAD(&r->res_grantqueue);
614 	INIT_LIST_HEAD(&r->res_convertqueue);
615 	INIT_LIST_HEAD(&r->res_waitqueue);
616 	INIT_LIST_HEAD(&r->res_root_list);
617 	INIT_LIST_HEAD(&r->res_scan_list);
618 	INIT_LIST_HEAD(&r->res_recover_list);
619 	INIT_LIST_HEAD(&r->res_masters_list);
620 
621 	*r_ret = r;
622 	return 0;
623 }
624 
dlm_search_rsb_tree(struct rhashtable * rhash,const void * name,int len,struct dlm_rsb ** r_ret)625 int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
626 			struct dlm_rsb **r_ret)
627 {
628 	char key[DLM_RESNAME_MAXLEN] = {};
629 
630 	memcpy(key, name, len);
631 	*r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
632 	if (*r_ret)
633 		return 0;
634 
635 	return -EBADR;
636 }
637 
rsb_insert(struct dlm_rsb * rsb,struct rhashtable * rhash)638 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
639 {
640 	int rv;
641 
642 	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
643 				    dlm_rhash_rsb_params);
644 	if (!rv)
645 		rsb_set_flag(rsb, RSB_HASHED);
646 
647 	return rv;
648 }
649 
650 /*
651  * Find rsb in rsbtbl and potentially create/add one
652  *
653  * Delaying the release of rsb's has a similar benefit to applications keeping
654  * NL locks on an rsb, but without the guarantee that the cached master value
655  * will still be valid when the rsb is reused.  Apps aren't always smart enough
656  * to keep NL locks on an rsb that they may lock again shortly; this can lead
657  * to excessive master lookups and removals if we don't delay the release.
658  *
659  * Searching for an rsb means looking through both the normal list and toss
660  * list.  When found on the toss list the rsb is moved to the normal list with
661  * ref count of 1; when found on normal list the ref count is incremented.
662  *
663  * rsb's on the keep list are being used locally and refcounted.
664  * rsb's on the toss list are not being used locally, and are not refcounted.
665  *
666  * The toss list rsb's were either
667  * - previously used locally but not any more (were on keep list, then
668  *   moved to toss list when last refcount dropped)
669  * - created and put on toss list as a directory record for a lookup
670  *   (we are the dir node for the res, but are not using the res right now,
671  *   but some other node is)
672  *
673  * The purpose of find_rsb() is to return a refcounted rsb for local use.
674  * So, if the given rsb is on the toss list, it is moved to the keep list
675  * before being returned.
676  *
677  * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
678  * more refcounts exist, so the rsb is moved from the keep list to the
679  * toss list.
680  *
681  * rsb's on both keep and toss lists are used for doing a name to master
682  * lookups.  rsb's that are in use locally (and being refcounted) are on
683  * the keep list, rsb's that are not in use locally (not refcounted) and
684  * only exist for name/master lookups are on the toss list.
685  *
686  * rsb's on the toss list who's dir_nodeid is not local can have stale
687  * name/master mappings.  So, remote requests on such rsb's can potentially
688  * return with an error, which means the mapping is stale and needs to
689  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
690  * first_lkid is to keep only a single outstanding request on an rsb
691  * while that rsb has a potentially stale master.)
692  */
693 
find_rsb_dir(struct dlm_ls * ls,const void * name,int len,uint32_t hash,int dir_nodeid,int from_nodeid,unsigned int flags,struct dlm_rsb ** r_ret)694 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
695 			uint32_t hash, int dir_nodeid, int from_nodeid,
696 			unsigned int flags, struct dlm_rsb **r_ret)
697 {
698 	struct dlm_rsb *r = NULL;
699 	int our_nodeid = dlm_our_nodeid();
700 	int from_local = 0;
701 	int from_other = 0;
702 	int from_dir = 0;
703 	int create = 0;
704 	int error;
705 
706 	if (flags & R_RECEIVE_REQUEST) {
707 		if (from_nodeid == dir_nodeid)
708 			from_dir = 1;
709 		else
710 			from_other = 1;
711 	} else if (flags & R_REQUEST) {
712 		from_local = 1;
713 	}
714 
715 	/*
716 	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
717 	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
718 	 * we're the new master.  Our local recovery may not have set
719 	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
720 	 * create the rsb; dlm_recover_process_copy() will handle EBADR
721 	 * by resending.
722 	 *
723 	 * If someone sends us a request, we are the dir node, and we do
724 	 * not find the rsb anywhere, then recreate it.  This happens if
725 	 * someone sends us a request after we have removed/freed an rsb.
726 	 * (They sent a request instead of lookup because they are using
727 	 * an rsb taken from their scan list.)
728 	 */
729 
730 	if (from_local || from_dir ||
731 	    (from_other && (dir_nodeid == our_nodeid))) {
732 		create = 1;
733 	}
734 
735  retry:
736 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
737 	if (error)
738 		goto do_new;
739 
740 	/* check if the rsb is active under read lock - likely path */
741 	read_lock_bh(&ls->ls_rsbtbl_lock);
742 	if (!rsb_flag(r, RSB_HASHED)) {
743 		read_unlock_bh(&ls->ls_rsbtbl_lock);
744 		error = -EBADR;
745 		goto do_new;
746 	}
747 
748 	/*
749 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
750 	 */
751 
752 	if (rsb_flag(r, RSB_INACTIVE)) {
753 		read_unlock_bh(&ls->ls_rsbtbl_lock);
754 		goto do_inactive;
755 	}
756 
757 	kref_get(&r->res_ref);
758 	read_unlock_bh(&ls->ls_rsbtbl_lock);
759 	goto out;
760 
761 
762  do_inactive:
763 	write_lock_bh(&ls->ls_rsbtbl_lock);
764 
765 	/*
766 	 * The expectation here is that the rsb will have HASHED and
767 	 * INACTIVE flags set, and that the rsb can be moved from
768 	 * inactive back to active again.  However, between releasing
769 	 * the read lock and acquiring the write lock, this rsb could
770 	 * have been removed from rsbtbl, and had HASHED cleared, to
771 	 * be freed.  To deal with this case, we would normally need
772 	 * to repeat dlm_search_rsb_tree while holding the write lock,
773 	 * but rcu allows us to simply check the HASHED flag, because
774 	 * the rcu read lock means the rsb will not be freed yet.
775 	 * If the HASHED flag is not set, then the rsb is being freed,
776 	 * so we add a new rsb struct.  If the HASHED flag is set,
777 	 * and INACTIVE is not set, it means another thread has
778 	 * made the rsb active, as we're expecting to do here, and
779 	 * we just repeat the lookup (this will be very unlikely.)
780 	 */
781 	if (rsb_flag(r, RSB_HASHED)) {
782 		if (!rsb_flag(r, RSB_INACTIVE)) {
783 			write_unlock_bh(&ls->ls_rsbtbl_lock);
784 			goto retry;
785 		}
786 	} else {
787 		write_unlock_bh(&ls->ls_rsbtbl_lock);
788 		error = -EBADR;
789 		goto do_new;
790 	}
791 
792 	/*
793 	 * rsb found inactive (master_nodeid may be out of date unless
794 	 * we are the dir_nodeid or were the master)  No other thread
795 	 * is using this rsb because it's inactive, so we can
796 	 * look at or update res_master_nodeid without lock_rsb.
797 	 */
798 
799 	if ((r->res_master_nodeid != our_nodeid) && from_other) {
800 		/* our rsb was not master, and another node (not the dir node)
801 		   has sent us a request */
802 		log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
803 			  from_nodeid, r->res_master_nodeid, dir_nodeid,
804 			  r->res_name);
805 		write_unlock_bh(&ls->ls_rsbtbl_lock);
806 		error = -ENOTBLK;
807 		goto out;
808 	}
809 
810 	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
811 		/* don't think this should ever happen */
812 		log_error(ls, "find_rsb inactive from_dir %d master %d",
813 			  from_nodeid, r->res_master_nodeid);
814 		dlm_print_rsb(r);
815 		/* fix it and go on */
816 		r->res_master_nodeid = our_nodeid;
817 		r->res_nodeid = 0;
818 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
819 		r->res_first_lkid = 0;
820 	}
821 
822 	if (from_local && (r->res_master_nodeid != our_nodeid)) {
823 		/* Because we have held no locks on this rsb,
824 		   res_master_nodeid could have become stale. */
825 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
826 		r->res_first_lkid = 0;
827 	}
828 
829 	/* we always deactivate scan timer for the rsb, when
830 	 * we move it out of the inactive state as rsb state
831 	 * can be changed and scan timers are only for inactive
832 	 * rsbs.
833 	 */
834 	del_scan(ls, r);
835 	list_move(&r->res_slow_list, &ls->ls_slow_active);
836 	rsb_clear_flag(r, RSB_INACTIVE);
837 	kref_init(&r->res_ref); /* ref is now used in active state */
838 	write_unlock_bh(&ls->ls_rsbtbl_lock);
839 
840 	goto out;
841 
842 
843  do_new:
844 	/*
845 	 * rsb not found
846 	 */
847 
848 	if (error == -EBADR && !create)
849 		goto out;
850 
851 	error = get_rsb_struct(ls, name, len, &r);
852 	if (WARN_ON_ONCE(error))
853 		goto out;
854 
855 	r->res_hash = hash;
856 	r->res_dir_nodeid = dir_nodeid;
857 	kref_init(&r->res_ref);
858 
859 	if (from_dir) {
860 		/* want to see how often this happens */
861 		log_debug(ls, "find_rsb new from_dir %d recreate %s",
862 			  from_nodeid, r->res_name);
863 		r->res_master_nodeid = our_nodeid;
864 		r->res_nodeid = 0;
865 		goto out_add;
866 	}
867 
868 	if (from_other && (dir_nodeid != our_nodeid)) {
869 		/* should never happen */
870 		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
871 			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
872 		dlm_free_rsb(r);
873 		r = NULL;
874 		error = -ENOTBLK;
875 		goto out;
876 	}
877 
878 	if (from_other) {
879 		log_debug(ls, "find_rsb new from_other %d dir %d %s",
880 			  from_nodeid, dir_nodeid, r->res_name);
881 	}
882 
883 	if (dir_nodeid == our_nodeid) {
884 		/* When we are the dir nodeid, we can set the master
885 		   node immediately */
886 		r->res_master_nodeid = our_nodeid;
887 		r->res_nodeid = 0;
888 	} else {
889 		/* set_master will send_lookup to dir_nodeid */
890 		r->res_master_nodeid = 0;
891 		r->res_nodeid = -1;
892 	}
893 
894  out_add:
895 
896 	write_lock_bh(&ls->ls_rsbtbl_lock);
897 	error = rsb_insert(r, &ls->ls_rsbtbl);
898 	if (error == -EEXIST) {
899 		/* somebody else was faster and it seems the
900 		 * rsb exists now, we do a whole relookup
901 		 */
902 		write_unlock_bh(&ls->ls_rsbtbl_lock);
903 		dlm_free_rsb(r);
904 		goto retry;
905 	} else if (!error) {
906 		list_add(&r->res_slow_list, &ls->ls_slow_active);
907 	}
908 	write_unlock_bh(&ls->ls_rsbtbl_lock);
909  out:
910 	*r_ret = r;
911 	return error;
912 }
913 
914 /* During recovery, other nodes can send us new MSTCPY locks (from
915    dlm_recover_locks) before we've made ourself master (in
916    dlm_recover_masters). */
917 
find_rsb_nodir(struct dlm_ls * ls,const void * name,int len,uint32_t hash,int dir_nodeid,int from_nodeid,unsigned int flags,struct dlm_rsb ** r_ret)918 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
919 			  uint32_t hash, int dir_nodeid, int from_nodeid,
920 			  unsigned int flags, struct dlm_rsb **r_ret)
921 {
922 	struct dlm_rsb *r = NULL;
923 	int our_nodeid = dlm_our_nodeid();
924 	int recover = (flags & R_RECEIVE_RECOVER);
925 	int error;
926 
927  retry:
928 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
929 	if (error)
930 		goto do_new;
931 
932 	/* check if the rsb is in active state under read lock - likely path */
933 	read_lock_bh(&ls->ls_rsbtbl_lock);
934 	if (!rsb_flag(r, RSB_HASHED)) {
935 		read_unlock_bh(&ls->ls_rsbtbl_lock);
936 		goto do_new;
937 	}
938 
939 	if (rsb_flag(r, RSB_INACTIVE)) {
940 		read_unlock_bh(&ls->ls_rsbtbl_lock);
941 		goto do_inactive;
942 	}
943 
944 	/*
945 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
946 	 */
947 
948 	kref_get(&r->res_ref);
949 	read_unlock_bh(&ls->ls_rsbtbl_lock);
950 
951 	goto out;
952 
953 
954  do_inactive:
955 	write_lock_bh(&ls->ls_rsbtbl_lock);
956 
957 	/* See comment in find_rsb_dir. */
958 	if (rsb_flag(r, RSB_HASHED)) {
959 		if (!rsb_flag(r, RSB_INACTIVE)) {
960 			write_unlock_bh(&ls->ls_rsbtbl_lock);
961 			goto retry;
962 		}
963 	} else {
964 		write_unlock_bh(&ls->ls_rsbtbl_lock);
965 		goto do_new;
966 	}
967 
968 
969 	/*
970 	 * rsb found inactive. No other thread is using this rsb because
971 	 * it's inactive, so we can look at or update res_master_nodeid
972 	 * without lock_rsb.
973 	 */
974 
975 	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
976 		/* our rsb is not master, and another node has sent us a
977 		   request; this should never happen */
978 		log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
979 			  from_nodeid, r->res_master_nodeid, dir_nodeid);
980 		dlm_print_rsb(r);
981 		write_unlock_bh(&ls->ls_rsbtbl_lock);
982 		error = -ENOTBLK;
983 		goto out;
984 	}
985 
986 	if (!recover && (r->res_master_nodeid != our_nodeid) &&
987 	    (dir_nodeid == our_nodeid)) {
988 		/* our rsb is not master, and we are dir; may as well fix it;
989 		   this should never happen */
990 		log_error(ls, "find_rsb inactive our %d master %d dir %d",
991 			  our_nodeid, r->res_master_nodeid, dir_nodeid);
992 		dlm_print_rsb(r);
993 		r->res_master_nodeid = our_nodeid;
994 		r->res_nodeid = 0;
995 	}
996 
997 	del_scan(ls, r);
998 	list_move(&r->res_slow_list, &ls->ls_slow_active);
999 	rsb_clear_flag(r, RSB_INACTIVE);
1000 	kref_init(&r->res_ref);
1001 	write_unlock_bh(&ls->ls_rsbtbl_lock);
1002 
1003 	goto out;
1004 
1005 
1006  do_new:
1007 	/*
1008 	 * rsb not found
1009 	 */
1010 
1011 	error = get_rsb_struct(ls, name, len, &r);
1012 	if (WARN_ON_ONCE(error))
1013 		goto out;
1014 
1015 	r->res_hash = hash;
1016 	r->res_dir_nodeid = dir_nodeid;
1017 	r->res_master_nodeid = dir_nodeid;
1018 	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
1019 	kref_init(&r->res_ref);
1020 
1021 	write_lock_bh(&ls->ls_rsbtbl_lock);
1022 	error = rsb_insert(r, &ls->ls_rsbtbl);
1023 	if (error == -EEXIST) {
1024 		/* somebody else was faster and it seems the
1025 		 * rsb exists now, we do a whole relookup
1026 		 */
1027 		write_unlock_bh(&ls->ls_rsbtbl_lock);
1028 		dlm_free_rsb(r);
1029 		goto retry;
1030 	} else if (!error) {
1031 		list_add(&r->res_slow_list, &ls->ls_slow_active);
1032 	}
1033 	write_unlock_bh(&ls->ls_rsbtbl_lock);
1034 
1035  out:
1036 	*r_ret = r;
1037 	return error;
1038 }
1039 
1040 /*
1041  * rsb rcu usage
1042  *
1043  * While rcu read lock is held, the rsb cannot be freed,
1044  * which allows a lookup optimization.
1045  *
1046  * Two threads are accessing the same rsb concurrently,
1047  * the first (A) is trying to use the rsb, the second (B)
1048  * is trying to free the rsb.
1049  *
1050  * thread A                 thread B
1051  * (trying to use rsb)      (trying to free rsb)
1052  *
1053  * A1. rcu read lock
1054  * A2. rsbtbl read lock
1055  * A3. look up rsb in rsbtbl
1056  * A4. rsbtbl read unlock
1057  *                          B1. rsbtbl write lock
1058  *                          B2. look up rsb in rsbtbl
1059  *                          B3. remove rsb from rsbtbl
1060  *                          B4. clear rsb HASHED flag
1061  *                          B5. rsbtbl write unlock
1062  *                          B6. begin freeing rsb using rcu...
1063  *
1064  * (rsb is inactive, so try to make it active again)
1065  * A5. read rsb HASHED flag (safe because rsb is not freed yet)
1066  * A6. the rsb HASHED flag is not set, which it means the rsb
1067  *     is being removed from rsbtbl and freed, so don't use it.
1068  * A7. rcu read unlock
1069  *
1070  *                          B7. ...finish freeing rsb using rcu
1071  * A8. create a new rsb
1072  *
1073  * Without the rcu optimization, steps A5-8 would need to do
1074  * an extra rsbtbl lookup:
1075  * A5. rsbtbl write lock
1076  * A6. look up rsb in rsbtbl, not found
1077  * A7. rsbtbl write unlock
1078  * A8. create a new rsb
1079  */
1080 
find_rsb(struct dlm_ls * ls,const void * name,int len,int from_nodeid,unsigned int flags,struct dlm_rsb ** r_ret)1081 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
1082 		    int from_nodeid, unsigned int flags,
1083 		    struct dlm_rsb **r_ret)
1084 {
1085 	int dir_nodeid;
1086 	uint32_t hash;
1087 	int rv;
1088 
1089 	if (len > DLM_RESNAME_MAXLEN)
1090 		return -EINVAL;
1091 
1092 	hash = jhash(name, len, 0);
1093 	dir_nodeid = dlm_hash2nodeid(ls, hash);
1094 
1095 	rcu_read_lock();
1096 	if (dlm_no_directory(ls))
1097 		rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
1098 				      from_nodeid, flags, r_ret);
1099 	else
1100 		rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
1101 				    from_nodeid, flags, r_ret);
1102 	rcu_read_unlock();
1103 	return rv;
1104 }
1105 
1106 /* we have received a request and found that res_master_nodeid != our_nodeid,
1107    so we need to return an error or make ourself the master */
1108 
validate_master_nodeid(struct dlm_ls * ls,struct dlm_rsb * r,int from_nodeid)1109 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
1110 				  int from_nodeid)
1111 {
1112 	if (dlm_no_directory(ls)) {
1113 		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
1114 			  from_nodeid, r->res_master_nodeid,
1115 			  r->res_dir_nodeid);
1116 		dlm_print_rsb(r);
1117 		return -ENOTBLK;
1118 	}
1119 
1120 	if (from_nodeid != r->res_dir_nodeid) {
1121 		/* our rsb is not master, and another node (not the dir node)
1122 	   	   has sent us a request.  this is much more common when our
1123 	   	   master_nodeid is zero, so limit debug to non-zero.  */
1124 
1125 		if (r->res_master_nodeid) {
1126 			log_debug(ls, "validate master from_other %d master %d "
1127 				  "dir %d first %x %s", from_nodeid,
1128 				  r->res_master_nodeid, r->res_dir_nodeid,
1129 				  r->res_first_lkid, r->res_name);
1130 		}
1131 		return -ENOTBLK;
1132 	} else {
1133 		/* our rsb is not master, but the dir nodeid has sent us a
1134 	   	   request; this could happen with master 0 / res_nodeid -1 */
1135 
1136 		if (r->res_master_nodeid) {
1137 			log_error(ls, "validate master from_dir %d master %d "
1138 				  "first %x %s",
1139 				  from_nodeid, r->res_master_nodeid,
1140 				  r->res_first_lkid, r->res_name);
1141 		}
1142 
1143 		r->res_master_nodeid = dlm_our_nodeid();
1144 		r->res_nodeid = 0;
1145 		return 0;
1146 	}
1147 }
1148 
__dlm_master_lookup(struct dlm_ls * ls,struct dlm_rsb * r,int our_nodeid,int from_nodeid,bool is_inactive,unsigned int flags,int * r_nodeid,int * result)1149 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
1150 				int from_nodeid, bool is_inactive, unsigned int flags,
1151 				int *r_nodeid, int *result)
1152 {
1153 	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
1154 	int from_master = (flags & DLM_LU_RECOVER_DIR);
1155 
1156 	if (r->res_dir_nodeid != our_nodeid) {
1157 		/* should not happen, but may as well fix it and carry on */
1158 		log_error(ls, "%s res_dir %d our %d %s", __func__,
1159 			  r->res_dir_nodeid, our_nodeid, r->res_name);
1160 		r->res_dir_nodeid = our_nodeid;
1161 	}
1162 
1163 	if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) {
1164 		/* Recovery uses this function to set a new master when
1165 		 * the previous master failed.  Setting NEW_MASTER will
1166 		 * force dlm_recover_masters to call recover_master on this
1167 		 * rsb even though the res_nodeid is no longer removed.
1168 		 */
1169 
1170 		r->res_master_nodeid = from_nodeid;
1171 		r->res_nodeid = from_nodeid;
1172 		rsb_set_flag(r, RSB_NEW_MASTER);
1173 
1174 		if (is_inactive) {
1175 			/* I don't think we should ever find it inactive. */
1176 			log_error(ls, "%s fix_master inactive", __func__);
1177 			dlm_dump_rsb(r);
1178 		}
1179 	}
1180 
1181 	if (from_master && (r->res_master_nodeid != from_nodeid)) {
1182 		/* this will happen if from_nodeid became master during
1183 		 * a previous recovery cycle, and we aborted the previous
1184 		 * cycle before recovering this master value
1185 		 */
1186 
1187 		log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
1188 			  __func__, from_nodeid, r->res_master_nodeid,
1189 			  r->res_nodeid, r->res_first_lkid, r->res_name);
1190 
1191 		if (r->res_master_nodeid == our_nodeid) {
1192 			log_error(ls, "from_master %d our_master", from_nodeid);
1193 			dlm_dump_rsb(r);
1194 			goto ret_assign;
1195 		}
1196 
1197 		r->res_master_nodeid = from_nodeid;
1198 		r->res_nodeid = from_nodeid;
1199 		rsb_set_flag(r, RSB_NEW_MASTER);
1200 	}
1201 
1202 	if (!r->res_master_nodeid) {
1203 		/* this will happen if recovery happens while we're looking
1204 		 * up the master for this rsb
1205 		 */
1206 
1207 		log_debug(ls, "%s master 0 to %d first %x %s", __func__,
1208 			  from_nodeid, r->res_first_lkid, r->res_name);
1209 		r->res_master_nodeid = from_nodeid;
1210 		r->res_nodeid = from_nodeid;
1211 	}
1212 
1213 	if (!from_master && !fix_master &&
1214 	    (r->res_master_nodeid == from_nodeid)) {
1215 		/* this can happen when the master sends remove, the dir node
1216 		 * finds the rsb on the active list and ignores the remove,
1217 		 * and the former master sends a lookup
1218 		 */
1219 
1220 		log_limit(ls, "%s from master %d flags %x first %x %s",
1221 			  __func__, from_nodeid, flags, r->res_first_lkid,
1222 			  r->res_name);
1223 	}
1224 
1225  ret_assign:
1226 	*r_nodeid = r->res_master_nodeid;
1227 	if (result)
1228 		*result = DLM_LU_MATCH;
1229 }
1230 
1231 /*
1232  * We're the dir node for this res and another node wants to know the
1233  * master nodeid.  During normal operation (non recovery) this is only
1234  * called from receive_lookup(); master lookups when the local node is
1235  * the dir node are done by find_rsb().
1236  *
1237  * normal operation, we are the dir node for a resource
1238  * . _request_lock
1239  * . set_master
1240  * . send_lookup
1241  * . receive_lookup
1242  * . dlm_master_lookup flags 0
1243  *
1244  * recover directory, we are rebuilding dir for all resources
1245  * . dlm_recover_directory
1246  * . dlm_rcom_names
1247  *   remote node sends back the rsb names it is master of and we are dir of
1248  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
1249  *   we either create new rsb setting remote node as master, or find existing
1250  *   rsb and set master to be the remote node.
1251  *
1252  * recover masters, we are finding the new master for resources
1253  * . dlm_recover_masters
1254  * . recover_master
1255  * . dlm_send_rcom_lookup
1256  * . receive_rcom_lookup
1257  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
1258  */
1259 
_dlm_master_lookup(struct dlm_ls * ls,int from_nodeid,const char * name,int len,unsigned int flags,int * r_nodeid,int * result)1260 static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1261 			      int len, unsigned int flags, int *r_nodeid, int *result)
1262 {
1263 	struct dlm_rsb *r = NULL;
1264 	uint32_t hash;
1265 	int our_nodeid = dlm_our_nodeid();
1266 	int dir_nodeid, error;
1267 
1268 	if (len > DLM_RESNAME_MAXLEN)
1269 		return -EINVAL;
1270 
1271 	if (from_nodeid == our_nodeid) {
1272 		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1273 			  our_nodeid, flags);
1274 		return -EINVAL;
1275 	}
1276 
1277 	hash = jhash(name, len, 0);
1278 	dir_nodeid = dlm_hash2nodeid(ls, hash);
1279 	if (dir_nodeid != our_nodeid) {
1280 		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1281 			  from_nodeid, dir_nodeid, our_nodeid, hash,
1282 			  ls->ls_num_nodes);
1283 		*r_nodeid = -1;
1284 		return -EINVAL;
1285 	}
1286 
1287  retry:
1288 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1289 	if (error)
1290 		goto not_found;
1291 
1292 	/* check if the rsb is active under read lock - likely path */
1293 	read_lock_bh(&ls->ls_rsbtbl_lock);
1294 	if (!rsb_flag(r, RSB_HASHED)) {
1295 		read_unlock_bh(&ls->ls_rsbtbl_lock);
1296 		goto not_found;
1297 	}
1298 
1299 	if (rsb_flag(r, RSB_INACTIVE)) {
1300 		read_unlock_bh(&ls->ls_rsbtbl_lock);
1301 		goto do_inactive;
1302 	}
1303 
1304 	/* because the rsb is active, we need to lock_rsb before
1305 	 * checking/changing re_master_nodeid
1306 	 */
1307 
1308 	hold_rsb(r);
1309 	read_unlock_bh(&ls->ls_rsbtbl_lock);
1310 	lock_rsb(r);
1311 
1312 	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1313 			    flags, r_nodeid, result);
1314 
1315 	/* the rsb was active */
1316 	unlock_rsb(r);
1317 	put_rsb(r);
1318 
1319 	return 0;
1320 
1321  do_inactive:
1322 	/* unlikely path - check if still part of ls_rsbtbl */
1323 	write_lock_bh(&ls->ls_rsbtbl_lock);
1324 
1325 	/* see comment in find_rsb_dir */
1326 	if (rsb_flag(r, RSB_HASHED)) {
1327 		if (!rsb_flag(r, RSB_INACTIVE)) {
1328 			write_unlock_bh(&ls->ls_rsbtbl_lock);
1329 			/* something as changed, very unlikely but
1330 			 * try again
1331 			 */
1332 			goto retry;
1333 		}
1334 	} else {
1335 		write_unlock_bh(&ls->ls_rsbtbl_lock);
1336 		goto not_found;
1337 	}
1338 
1339 	/* because the rsb is inactive, it's not refcounted and lock_rsb
1340 	   is not used, but is protected by the rsbtbl lock */
1341 
1342 	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1343 			    r_nodeid, result);
1344 
1345 	/* A dir record rsb should never be on scan list.
1346 	 * Except when we are the dir and master node.
1347 	 * This function should only be called by the dir
1348 	 * node.
1349 	 */
1350 	WARN_ON(!list_empty(&r->res_scan_list) &&
1351 		r->res_master_nodeid != our_nodeid);
1352 
1353 	write_unlock_bh(&ls->ls_rsbtbl_lock);
1354 
1355 	return 0;
1356 
1357  not_found:
1358 	error = get_rsb_struct(ls, name, len, &r);
1359 	if (WARN_ON_ONCE(error))
1360 		goto out;
1361 
1362 	r->res_hash = hash;
1363 	r->res_dir_nodeid = our_nodeid;
1364 	r->res_master_nodeid = from_nodeid;
1365 	r->res_nodeid = from_nodeid;
1366 	rsb_set_flag(r, RSB_INACTIVE);
1367 
1368 	write_lock_bh(&ls->ls_rsbtbl_lock);
1369 	error = rsb_insert(r, &ls->ls_rsbtbl);
1370 	if (error == -EEXIST) {
1371 		/* somebody else was faster and it seems the
1372 		 * rsb exists now, we do a whole relookup
1373 		 */
1374 		write_unlock_bh(&ls->ls_rsbtbl_lock);
1375 		dlm_free_rsb(r);
1376 		goto retry;
1377 	} else if (error) {
1378 		write_unlock_bh(&ls->ls_rsbtbl_lock);
1379 		/* should never happen */
1380 		dlm_free_rsb(r);
1381 		goto retry;
1382 	}
1383 
1384 	list_add(&r->res_slow_list, &ls->ls_slow_inactive);
1385 	write_unlock_bh(&ls->ls_rsbtbl_lock);
1386 
1387 	if (result)
1388 		*result = DLM_LU_ADD;
1389 	*r_nodeid = from_nodeid;
1390  out:
1391 	return error;
1392 }
1393 
dlm_master_lookup(struct dlm_ls * ls,int from_nodeid,const char * name,int len,unsigned int flags,int * r_nodeid,int * result)1394 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1395 		      int len, unsigned int flags, int *r_nodeid, int *result)
1396 {
1397 	int rv;
1398 	rcu_read_lock();
1399 	rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
1400 	rcu_read_unlock();
1401 	return rv;
1402 }
1403 
dlm_dump_rsb_hash(struct dlm_ls * ls,uint32_t hash)1404 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1405 {
1406 	struct dlm_rsb *r;
1407 
1408 	read_lock_bh(&ls->ls_rsbtbl_lock);
1409 	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
1410 		if (r->res_hash == hash)
1411 			dlm_dump_rsb(r);
1412 	}
1413 	read_unlock_bh(&ls->ls_rsbtbl_lock);
1414 }
1415 
dlm_dump_rsb_name(struct dlm_ls * ls,const char * name,int len)1416 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1417 {
1418 	struct dlm_rsb *r = NULL;
1419 	int error;
1420 
1421 	rcu_read_lock();
1422 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1423 	if (!error)
1424 		goto out;
1425 
1426 	dlm_dump_rsb(r);
1427  out:
1428 	rcu_read_unlock();
1429 }
1430 
deactivate_rsb(struct kref * kref)1431 static void deactivate_rsb(struct kref *kref)
1432 {
1433 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1434 	struct dlm_ls *ls = r->res_ls;
1435 	int our_nodeid = dlm_our_nodeid();
1436 
1437 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1438 	rsb_set_flag(r, RSB_INACTIVE);
1439 	list_move(&r->res_slow_list, &ls->ls_slow_inactive);
1440 
1441 	/*
1442 	 * When the rsb becomes unused, there are two possibilities:
1443 	 * 1. Leave the inactive rsb in place (don't remove it).
1444 	 * 2. Add it to the scan list to be removed.
1445 	 *
1446 	 * 1 is done when the rsb is acting as the dir record
1447 	 * for a remotely mastered rsb.  The rsb must be left
1448 	 * in place as an inactive rsb to act as the dir record.
1449 	 *
1450 	 * 2 is done when a) the rsb is not the master and not the
1451 	 * dir record, b) when the rsb is both the master and the
1452 	 * dir record, c) when the rsb is master but not dir record.
1453 	 *
1454 	 * (If no directory is used, the rsb can always be removed.)
1455 	 */
1456 	if (dlm_no_directory(ls) ||
1457 	    (r->res_master_nodeid == our_nodeid ||
1458 	     dlm_dir_nodeid(r) != our_nodeid))
1459 		add_scan(ls, r);
1460 
1461 	if (r->res_lvbptr) {
1462 		dlm_free_lvb(r->res_lvbptr);
1463 		r->res_lvbptr = NULL;
1464 	}
1465 }
1466 
free_inactive_rsb(struct dlm_rsb * r)1467 void free_inactive_rsb(struct dlm_rsb *r)
1468 {
1469 	WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));
1470 
1471 	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1472 	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1473 	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1474 	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1475 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1476 	DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
1477 	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1478 	DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
1479 
1480 	dlm_free_rsb(r);
1481 }
1482 
1483 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1484    The rsb must exist as long as any lkb's for it do. */
1485 
attach_lkb(struct dlm_rsb * r,struct dlm_lkb * lkb)1486 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1487 {
1488 	hold_rsb(r);
1489 	lkb->lkb_resource = r;
1490 }
1491 
detach_lkb(struct dlm_lkb * lkb)1492 static void detach_lkb(struct dlm_lkb *lkb)
1493 {
1494 	if (lkb->lkb_resource) {
1495 		put_rsb(lkb->lkb_resource);
1496 		lkb->lkb_resource = NULL;
1497 	}
1498 }
1499 
_create_lkb(struct dlm_ls * ls,struct dlm_lkb ** lkb_ret,unsigned long start,unsigned long end)1500 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1501 		       unsigned long start, unsigned long end)
1502 {
1503 	struct xa_limit limit;
1504 	struct dlm_lkb *lkb;
1505 	int rv;
1506 
1507 	limit.max = end;
1508 	limit.min = start;
1509 
1510 	lkb = dlm_allocate_lkb();
1511 	if (!lkb)
1512 		return -ENOMEM;
1513 
1514 	lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
1515 	lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
1516 	lkb->lkb_last_cb_mode = DLM_LOCK_IV;
1517 	lkb->lkb_nodeid = -1;
1518 	lkb->lkb_grmode = DLM_LOCK_IV;
1519 	kref_init(&lkb->lkb_ref);
1520 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1521 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1522 
1523 	write_lock_bh(&ls->ls_lkbxa_lock);
1524 	rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
1525 	write_unlock_bh(&ls->ls_lkbxa_lock);
1526 
1527 	if (rv < 0) {
1528 		log_error(ls, "create_lkb xa error %d", rv);
1529 		dlm_free_lkb(lkb);
1530 		return rv;
1531 	}
1532 
1533 	*lkb_ret = lkb;
1534 	return 0;
1535 }
1536 
create_lkb(struct dlm_ls * ls,struct dlm_lkb ** lkb_ret)1537 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1538 {
1539 	return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
1540 }
1541 
find_lkb(struct dlm_ls * ls,uint32_t lkid,struct dlm_lkb ** lkb_ret)1542 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1543 {
1544 	struct dlm_lkb *lkb;
1545 
1546 	rcu_read_lock();
1547 	lkb = xa_load(&ls->ls_lkbxa, lkid);
1548 	if (lkb) {
1549 		/* check if lkb is still part of lkbxa under lkbxa_lock as
1550 		 * the lkb_ref is tight to the lkbxa data structure, see
1551 		 * __put_lkb().
1552 		 */
1553 		read_lock_bh(&ls->ls_lkbxa_lock);
1554 		if (kref_read(&lkb->lkb_ref))
1555 			kref_get(&lkb->lkb_ref);
1556 		else
1557 			lkb = NULL;
1558 		read_unlock_bh(&ls->ls_lkbxa_lock);
1559 	}
1560 	rcu_read_unlock();
1561 
1562 	*lkb_ret = lkb;
1563 	return lkb ? 0 : -ENOENT;
1564 }
1565 
kill_lkb(struct kref * kref)1566 static void kill_lkb(struct kref *kref)
1567 {
1568 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1569 
1570 	/* All work is done after the return from kref_put() so we
1571 	   can release the write_lock before the detach_lkb */
1572 
1573 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1574 }
1575 
1576 /* __put_lkb() is used when an lkb may not have an rsb attached to
1577    it so we need to provide the lockspace explicitly */
1578 
__put_lkb(struct dlm_ls * ls,struct dlm_lkb * lkb)1579 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1580 {
1581 	uint32_t lkid = lkb->lkb_id;
1582 	int rv;
1583 
1584 	rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
1585 					&ls->ls_lkbxa_lock);
1586 	if (rv) {
1587 		xa_erase(&ls->ls_lkbxa, lkid);
1588 		write_unlock_bh(&ls->ls_lkbxa_lock);
1589 
1590 		detach_lkb(lkb);
1591 
1592 		/* for local/process lkbs, lvbptr points to caller's lksb */
1593 		if (lkb->lkb_lvbptr && is_master_copy(lkb))
1594 			dlm_free_lvb(lkb->lkb_lvbptr);
1595 		dlm_free_lkb(lkb);
1596 	}
1597 
1598 	return rv;
1599 }
1600 
dlm_put_lkb(struct dlm_lkb * lkb)1601 int dlm_put_lkb(struct dlm_lkb *lkb)
1602 {
1603 	struct dlm_ls *ls;
1604 
1605 	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1606 	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1607 
1608 	ls = lkb->lkb_resource->res_ls;
1609 	return __put_lkb(ls, lkb);
1610 }
1611 
1612 /* This is only called to add a reference when the code already holds
1613    a valid reference to the lkb, so there's no need for locking. */
1614 
hold_lkb(struct dlm_lkb * lkb)1615 static inline void hold_lkb(struct dlm_lkb *lkb)
1616 {
1617 	kref_get(&lkb->lkb_ref);
1618 }
1619 
unhold_lkb_assert(struct kref * kref)1620 static void unhold_lkb_assert(struct kref *kref)
1621 {
1622 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1623 
1624 	DLM_ASSERT(false, dlm_print_lkb(lkb););
1625 }
1626 
1627 /* This is called when we need to remove a reference and are certain
1628    it's not the last ref.  e.g. del_lkb is always called between a
1629    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1630    put_lkb would work fine, but would involve unnecessary locking */
1631 
unhold_lkb(struct dlm_lkb * lkb)1632 static inline void unhold_lkb(struct dlm_lkb *lkb)
1633 {
1634 	kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1635 }
1636 
lkb_add_ordered(struct list_head * new,struct list_head * head,int mode)1637 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1638 			    int mode)
1639 {
1640 	struct dlm_lkb *lkb = NULL, *iter;
1641 
1642 	list_for_each_entry(iter, head, lkb_statequeue)
1643 		if (iter->lkb_rqmode < mode) {
1644 			lkb = iter;
1645 			list_add_tail(new, &iter->lkb_statequeue);
1646 			break;
1647 		}
1648 
1649 	if (!lkb)
1650 		list_add_tail(new, head);
1651 }
1652 
1653 /* add/remove lkb to rsb's grant/convert/wait queue */
1654 
add_lkb(struct dlm_rsb * r,struct dlm_lkb * lkb,int status)1655 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1656 {
1657 	kref_get(&lkb->lkb_ref);
1658 
1659 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1660 
1661 	lkb->lkb_timestamp = ktime_get();
1662 
1663 	lkb->lkb_status = status;
1664 
1665 	switch (status) {
1666 	case DLM_LKSTS_WAITING:
1667 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1668 			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1669 		else
1670 			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1671 		break;
1672 	case DLM_LKSTS_GRANTED:
1673 		/* convention says granted locks kept in order of grmode */
1674 		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1675 				lkb->lkb_grmode);
1676 		break;
1677 	case DLM_LKSTS_CONVERT:
1678 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1679 			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1680 		else
1681 			list_add_tail(&lkb->lkb_statequeue,
1682 				      &r->res_convertqueue);
1683 		break;
1684 	default:
1685 		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1686 	}
1687 }
1688 
del_lkb(struct dlm_rsb * r,struct dlm_lkb * lkb)1689 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1690 {
1691 	lkb->lkb_status = 0;
1692 	list_del(&lkb->lkb_statequeue);
1693 	unhold_lkb(lkb);
1694 }
1695 
move_lkb(struct dlm_rsb * r,struct dlm_lkb * lkb,int sts)1696 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1697 {
1698 	del_lkb(r, lkb);
1699 	add_lkb(r, lkb, sts);
1700 }
1701 
msg_reply_type(int mstype)1702 static int msg_reply_type(int mstype)
1703 {
1704 	switch (mstype) {
1705 	case DLM_MSG_REQUEST:
1706 		return DLM_MSG_REQUEST_REPLY;
1707 	case DLM_MSG_CONVERT:
1708 		return DLM_MSG_CONVERT_REPLY;
1709 	case DLM_MSG_UNLOCK:
1710 		return DLM_MSG_UNLOCK_REPLY;
1711 	case DLM_MSG_CANCEL:
1712 		return DLM_MSG_CANCEL_REPLY;
1713 	case DLM_MSG_LOOKUP:
1714 		return DLM_MSG_LOOKUP_REPLY;
1715 	}
1716 	return -1;
1717 }
1718 
1719 /* add/remove lkb from global waiters list of lkb's waiting for
1720    a reply from a remote node */
1721 
add_to_waiters(struct dlm_lkb * lkb,int mstype,int to_nodeid)1722 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1723 {
1724 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1725 	int error = 0;
1726 
1727 	spin_lock_bh(&ls->ls_waiters_lock);
1728 
1729 	if (is_overlap_unlock(lkb) ||
1730 	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1731 		error = -EINVAL;
1732 		goto out;
1733 	}
1734 
1735 	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1736 		switch (mstype) {
1737 		case DLM_MSG_UNLOCK:
1738 			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1739 			break;
1740 		case DLM_MSG_CANCEL:
1741 			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1742 			break;
1743 		default:
1744 			error = -EBUSY;
1745 			goto out;
1746 		}
1747 		lkb->lkb_wait_count++;
1748 		hold_lkb(lkb);
1749 
1750 		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1751 			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
1752 			  lkb->lkb_wait_count, dlm_iflags_val(lkb));
1753 		goto out;
1754 	}
1755 
1756 	DLM_ASSERT(!lkb->lkb_wait_count,
1757 		   dlm_print_lkb(lkb);
1758 		   printk("wait_count %d\n", lkb->lkb_wait_count););
1759 
1760 	lkb->lkb_wait_count++;
1761 	lkb->lkb_wait_type = mstype;
1762 	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1763 	hold_lkb(lkb);
1764 	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1765  out:
1766 	if (error)
1767 		log_error(ls, "addwait error %x %d flags %x %d %d %s",
1768 			  lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
1769 			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1770 	spin_unlock_bh(&ls->ls_waiters_lock);
1771 	return error;
1772 }
1773 
1774 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1775    list as part of process_requestqueue (e.g. a lookup that has an optimized
1776    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1777    set RESEND and dlm_recover_waiters_post() */
1778 
_remove_from_waiters(struct dlm_lkb * lkb,int mstype,const struct dlm_message * ms)1779 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1780 				const struct dlm_message *ms)
1781 {
1782 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1783 	int overlap_done = 0;
1784 
1785 	if (mstype == DLM_MSG_UNLOCK_REPLY &&
1786 	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1787 		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1788 		overlap_done = 1;
1789 		goto out_del;
1790 	}
1791 
1792 	if (mstype == DLM_MSG_CANCEL_REPLY &&
1793 	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1794 		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1795 		overlap_done = 1;
1796 		goto out_del;
1797 	}
1798 
1799 	/* Cancel state was preemptively cleared by a successful convert,
1800 	   see next comment, nothing to do. */
1801 
1802 	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1803 	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1804 		log_debug(ls, "remwait %x cancel_reply wait_type %d",
1805 			  lkb->lkb_id, lkb->lkb_wait_type);
1806 		return -1;
1807 	}
1808 
1809 	/* Remove for the convert reply, and premptively remove for the
1810 	   cancel reply.  A convert has been granted while there's still
1811 	   an outstanding cancel on it (the cancel is moot and the result
1812 	   in the cancel reply should be 0).  We preempt the cancel reply
1813 	   because the app gets the convert result and then can follow up
1814 	   with another op, like convert.  This subsequent op would see the
1815 	   lingering state of the cancel and fail with -EBUSY. */
1816 
1817 	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1818 	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1819 	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1820 		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1821 			  lkb->lkb_id);
1822 		lkb->lkb_wait_type = 0;
1823 		lkb->lkb_wait_count--;
1824 		unhold_lkb(lkb);
1825 		goto out_del;
1826 	}
1827 
1828 	/* N.B. type of reply may not always correspond to type of original
1829 	   msg due to lookup->request optimization, verify others? */
1830 
1831 	if (lkb->lkb_wait_type) {
1832 		lkb->lkb_wait_type = 0;
1833 		goto out_del;
1834 	}
1835 
1836 	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1837 		  lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1838 		  lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1839 	return -1;
1840 
1841  out_del:
1842 	/* the force-unlock/cancel has completed and we haven't recvd a reply
1843 	   to the op that was in progress prior to the unlock/cancel; we
1844 	   give up on any reply to the earlier op.  FIXME: not sure when/how
1845 	   this would happen */
1846 
1847 	if (overlap_done && lkb->lkb_wait_type) {
1848 		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1849 			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
1850 		lkb->lkb_wait_count--;
1851 		unhold_lkb(lkb);
1852 		lkb->lkb_wait_type = 0;
1853 	}
1854 
1855 	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1856 
1857 	clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1858 	lkb->lkb_wait_count--;
1859 	if (!lkb->lkb_wait_count)
1860 		list_del_init(&lkb->lkb_wait_reply);
1861 	unhold_lkb(lkb);
1862 	return 0;
1863 }
1864 
remove_from_waiters(struct dlm_lkb * lkb,int mstype)1865 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1866 {
1867 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1868 	int error;
1869 
1870 	spin_lock_bh(&ls->ls_waiters_lock);
1871 	error = _remove_from_waiters(lkb, mstype, NULL);
1872 	spin_unlock_bh(&ls->ls_waiters_lock);
1873 	return error;
1874 }
1875 
1876 /* Handles situations where we might be processing a "fake" or "local" reply in
1877  * the recovery context which stops any locking activity. Only debugfs might
1878  * change the lockspace waiters but they will held the recovery lock to ensure
1879  * remove_from_waiters_ms() in local case will be the only user manipulating the
1880  * lockspace waiters in recovery context.
1881  */
1882 
remove_from_waiters_ms(struct dlm_lkb * lkb,const struct dlm_message * ms,bool local)1883 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1884 				  const struct dlm_message *ms, bool local)
1885 {
1886 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1887 	int error;
1888 
1889 	if (!local)
1890 		spin_lock_bh(&ls->ls_waiters_lock);
1891 	else
1892 		WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
1893 			     !dlm_locking_stopped(ls));
1894 	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1895 	if (!local)
1896 		spin_unlock_bh(&ls->ls_waiters_lock);
1897 	return error;
1898 }
1899 
1900 /* lkb is master or local copy */
1901 
set_lvb_lock(struct dlm_rsb * r,struct dlm_lkb * lkb)1902 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1903 {
1904 	int b, len = r->res_ls->ls_lvblen;
1905 
1906 	/* b=1 lvb returned to caller
1907 	   b=0 lvb written to rsb or invalidated
1908 	   b=-1 do nothing */
1909 
1910 	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1911 
1912 	if (b == 1) {
1913 		if (!lkb->lkb_lvbptr)
1914 			return;
1915 
1916 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1917 			return;
1918 
1919 		if (!r->res_lvbptr)
1920 			return;
1921 
1922 		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1923 		lkb->lkb_lvbseq = r->res_lvbseq;
1924 
1925 	} else if (b == 0) {
1926 		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1927 			rsb_set_flag(r, RSB_VALNOTVALID);
1928 			return;
1929 		}
1930 
1931 		if (!lkb->lkb_lvbptr)
1932 			return;
1933 
1934 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1935 			return;
1936 
1937 		if (!r->res_lvbptr)
1938 			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1939 
1940 		if (!r->res_lvbptr)
1941 			return;
1942 
1943 		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1944 		r->res_lvbseq++;
1945 		lkb->lkb_lvbseq = r->res_lvbseq;
1946 		rsb_clear_flag(r, RSB_VALNOTVALID);
1947 	}
1948 
1949 	if (rsb_flag(r, RSB_VALNOTVALID))
1950 		set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1951 }
1952 
set_lvb_unlock(struct dlm_rsb * r,struct dlm_lkb * lkb)1953 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1954 {
1955 	if (lkb->lkb_grmode < DLM_LOCK_PW)
1956 		return;
1957 
1958 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1959 		rsb_set_flag(r, RSB_VALNOTVALID);
1960 		return;
1961 	}
1962 
1963 	if (!lkb->lkb_lvbptr)
1964 		return;
1965 
1966 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1967 		return;
1968 
1969 	if (!r->res_lvbptr)
1970 		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1971 
1972 	if (!r->res_lvbptr)
1973 		return;
1974 
1975 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1976 	r->res_lvbseq++;
1977 	rsb_clear_flag(r, RSB_VALNOTVALID);
1978 }
1979 
1980 /* lkb is process copy (pc) */
1981 
set_lvb_lock_pc(struct dlm_rsb * r,struct dlm_lkb * lkb,const struct dlm_message * ms)1982 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1983 			    const struct dlm_message *ms)
1984 {
1985 	int b;
1986 
1987 	if (!lkb->lkb_lvbptr)
1988 		return;
1989 
1990 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1991 		return;
1992 
1993 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1994 	if (b == 1) {
1995 		int len = receive_extralen(ms);
1996 		if (len > r->res_ls->ls_lvblen)
1997 			len = r->res_ls->ls_lvblen;
1998 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1999 		lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
2000 	}
2001 }
2002 
2003 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2004    remove_lock -- used for unlock, removes lkb from granted
2005    revert_lock -- used for cancel, moves lkb from convert to granted
2006    grant_lock  -- used for request and convert, adds lkb to granted or
2007                   moves lkb from convert or waiting to granted
2008 
2009    Each of these is used for master or local copy lkb's.  There is
2010    also a _pc() variation used to make the corresponding change on
2011    a process copy (pc) lkb. */
2012 
_remove_lock(struct dlm_rsb * r,struct dlm_lkb * lkb)2013 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2014 {
2015 	del_lkb(r, lkb);
2016 	lkb->lkb_grmode = DLM_LOCK_IV;
2017 	/* this unhold undoes the original ref from create_lkb()
2018 	   so this leads to the lkb being freed */
2019 	unhold_lkb(lkb);
2020 }
2021 
remove_lock(struct dlm_rsb * r,struct dlm_lkb * lkb)2022 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2023 {
2024 	set_lvb_unlock(r, lkb);
2025 	_remove_lock(r, lkb);
2026 }
2027 
remove_lock_pc(struct dlm_rsb * r,struct dlm_lkb * lkb)2028 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2029 {
2030 	_remove_lock(r, lkb);
2031 }
2032 
2033 /* returns: 0 did nothing
2034 	    1 moved lock to granted
2035 	   -1 removed lock */
2036 
revert_lock(struct dlm_rsb * r,struct dlm_lkb * lkb)2037 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2038 {
2039 	int rv = 0;
2040 
2041 	lkb->lkb_rqmode = DLM_LOCK_IV;
2042 
2043 	switch (lkb->lkb_status) {
2044 	case DLM_LKSTS_GRANTED:
2045 		break;
2046 	case DLM_LKSTS_CONVERT:
2047 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2048 		rv = 1;
2049 		break;
2050 	case DLM_LKSTS_WAITING:
2051 		del_lkb(r, lkb);
2052 		lkb->lkb_grmode = DLM_LOCK_IV;
2053 		/* this unhold undoes the original ref from create_lkb()
2054 		   so this leads to the lkb being freed */
2055 		unhold_lkb(lkb);
2056 		rv = -1;
2057 		break;
2058 	default:
2059 		log_print("invalid status for revert %d", lkb->lkb_status);
2060 	}
2061 	return rv;
2062 }
2063 
revert_lock_pc(struct dlm_rsb * r,struct dlm_lkb * lkb)2064 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2065 {
2066 	return revert_lock(r, lkb);
2067 }
2068 
_grant_lock(struct dlm_rsb * r,struct dlm_lkb * lkb)2069 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2070 {
2071 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2072 		lkb->lkb_grmode = lkb->lkb_rqmode;
2073 		if (lkb->lkb_status)
2074 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2075 		else
2076 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2077 	}
2078 
2079 	lkb->lkb_rqmode = DLM_LOCK_IV;
2080 	lkb->lkb_highbast = 0;
2081 }
2082 
grant_lock(struct dlm_rsb * r,struct dlm_lkb * lkb)2083 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2084 {
2085 	set_lvb_lock(r, lkb);
2086 	_grant_lock(r, lkb);
2087 }
2088 
grant_lock_pc(struct dlm_rsb * r,struct dlm_lkb * lkb,const struct dlm_message * ms)2089 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2090 			  const struct dlm_message *ms)
2091 {
2092 	set_lvb_lock_pc(r, lkb, ms);
2093 	_grant_lock(r, lkb);
2094 }
2095 
2096 /* called by grant_pending_locks() which means an async grant message must
2097    be sent to the requesting node in addition to granting the lock if the
2098    lkb belongs to a remote node. */
2099 
grant_lock_pending(struct dlm_rsb * r,struct dlm_lkb * lkb)2100 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2101 {
2102 	grant_lock(r, lkb);
2103 	if (is_master_copy(lkb))
2104 		send_grant(r, lkb);
2105 	else
2106 		queue_cast(r, lkb, 0);
2107 }
2108 
2109 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2110    change the granted/requested modes.  We're munging things accordingly in
2111    the process copy.
2112    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2113    conversion deadlock
2114    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2115    compatible with other granted locks */
2116 
munge_demoted(struct dlm_lkb * lkb)2117 static void munge_demoted(struct dlm_lkb *lkb)
2118 {
2119 	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2120 		log_print("munge_demoted %x invalid modes gr %d rq %d",
2121 			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2122 		return;
2123 	}
2124 
2125 	lkb->lkb_grmode = DLM_LOCK_NL;
2126 }
2127 
munge_altmode(struct dlm_lkb * lkb,const struct dlm_message * ms)2128 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
2129 {
2130 	if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2131 	    ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2132 		log_print("munge_altmode %x invalid reply type %d",
2133 			  lkb->lkb_id, le32_to_cpu(ms->m_type));
2134 		return;
2135 	}
2136 
2137 	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2138 		lkb->lkb_rqmode = DLM_LOCK_PR;
2139 	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2140 		lkb->lkb_rqmode = DLM_LOCK_CW;
2141 	else {
2142 		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2143 		dlm_print_lkb(lkb);
2144 	}
2145 }
2146 
first_in_list(struct dlm_lkb * lkb,struct list_head * head)2147 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2148 {
2149 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2150 					   lkb_statequeue);
2151 	if (lkb->lkb_id == first->lkb_id)
2152 		return 1;
2153 
2154 	return 0;
2155 }
2156 
2157 /* Check if the given lkb conflicts with another lkb on the queue. */
2158 
queue_conflict(struct list_head * head,struct dlm_lkb * lkb)2159 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2160 {
2161 	struct dlm_lkb *this;
2162 
2163 	list_for_each_entry(this, head, lkb_statequeue) {
2164 		if (this == lkb)
2165 			continue;
2166 		if (!modes_compat(this, lkb))
2167 			return 1;
2168 	}
2169 	return 0;
2170 }
2171 
2172 /*
2173  * "A conversion deadlock arises with a pair of lock requests in the converting
2174  * queue for one resource.  The granted mode of each lock blocks the requested
2175  * mode of the other lock."
2176  *
2177  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2178  * convert queue from being granted, then deadlk/demote lkb.
2179  *
2180  * Example:
2181  * Granted Queue: empty
2182  * Convert Queue: NL->EX (first lock)
2183  *                PR->EX (second lock)
2184  *
2185  * The first lock can't be granted because of the granted mode of the second
2186  * lock and the second lock can't be granted because it's not first in the
2187  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2188  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2189  * flag set and return DEMOTED in the lksb flags.
2190  *
2191  * Originally, this function detected conv-deadlk in a more limited scope:
2192  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2193  * - if lkb1 was the first entry in the queue (not just earlier), and was
2194  *   blocked by the granted mode of lkb2, and there was nothing on the
2195  *   granted queue preventing lkb1 from being granted immediately, i.e.
2196  *   lkb2 was the only thing preventing lkb1 from being granted.
2197  *
2198  * That second condition meant we'd only say there was conv-deadlk if
2199  * resolving it (by demotion) would lead to the first lock on the convert
2200  * queue being granted right away.  It allowed conversion deadlocks to exist
2201  * between locks on the convert queue while they couldn't be granted anyway.
2202  *
2203  * Now, we detect and take action on conversion deadlocks immediately when
2204  * they're created, even if they may not be immediately consequential.  If
2205  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2206  * mode that would prevent lkb1's conversion from being granted, we do a
2207  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2208  * I think this means that the lkb_is_ahead condition below should always
2209  * be zero, i.e. there will never be conv-deadlk between two locks that are
2210  * both already on the convert queue.
2211  */
2212 
conversion_deadlock_detect(struct dlm_rsb * r,struct dlm_lkb * lkb2)2213 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2214 {
2215 	struct dlm_lkb *lkb1;
2216 	int lkb_is_ahead = 0;
2217 
2218 	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2219 		if (lkb1 == lkb2) {
2220 			lkb_is_ahead = 1;
2221 			continue;
2222 		}
2223 
2224 		if (!lkb_is_ahead) {
2225 			if (!modes_compat(lkb2, lkb1))
2226 				return 1;
2227 		} else {
2228 			if (!modes_compat(lkb2, lkb1) &&
2229 			    !modes_compat(lkb1, lkb2))
2230 				return 1;
2231 		}
2232 	}
2233 	return 0;
2234 }
2235 
2236 /*
2237  * Return 1 if the lock can be granted, 0 otherwise.
2238  * Also detect and resolve conversion deadlocks.
2239  *
2240  * lkb is the lock to be granted
2241  *
2242  * now is 1 if the function is being called in the context of the
2243  * immediate request, it is 0 if called later, after the lock has been
2244  * queued.
2245  *
2246  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2247  * after recovery.
2248  *
2249  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2250  */
2251 
_can_be_granted(struct dlm_rsb * r,struct dlm_lkb * lkb,int now,int recover)2252 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2253 			   int recover)
2254 {
2255 	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2256 
2257 	/*
2258 	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2259 	 * a new request for a NL mode lock being blocked.
2260 	 *
2261 	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2262 	 * request, then it would be granted.  In essence, the use of this flag
2263 	 * tells the Lock Manager to expedite theis request by not considering
2264 	 * what may be in the CONVERTING or WAITING queues...  As of this
2265 	 * writing, the EXPEDITE flag can be used only with new requests for NL
2266 	 * mode locks.  This flag is not valid for conversion requests.
2267 	 *
2268 	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2269 	 * conversion or used with a non-NL requested mode.  We also know an
2270 	 * EXPEDITE request is always granted immediately, so now must always
2271 	 * be 1.  The full condition to grant an expedite request: (now &&
2272 	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2273 	 * therefore be shortened to just checking the flag.
2274 	 */
2275 
2276 	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2277 		return 1;
2278 
2279 	/*
2280 	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2281 	 * added to the remaining conditions.
2282 	 */
2283 
2284 	if (queue_conflict(&r->res_grantqueue, lkb))
2285 		return 0;
2286 
2287 	/*
2288 	 * 6-3: By default, a conversion request is immediately granted if the
2289 	 * requested mode is compatible with the modes of all other granted
2290 	 * locks
2291 	 */
2292 
2293 	if (queue_conflict(&r->res_convertqueue, lkb))
2294 		return 0;
2295 
2296 	/*
2297 	 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2298 	 * locks for a recovered rsb, on which lkb's have been rebuilt.
2299 	 * The lkb's may have been rebuilt on the queues in a different
2300 	 * order than they were in on the previous master.  So, granting
2301 	 * queued conversions in order after recovery doesn't make sense
2302 	 * since the order hasn't been preserved anyway.  The new order
2303 	 * could also have created a new "in place" conversion deadlock.
2304 	 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2305 	 * After recovery, there would be no granted locks, and possibly
2306 	 * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2307 	 * recovery, grant conversions without considering order.
2308 	 */
2309 
2310 	if (conv && recover)
2311 		return 1;
2312 
2313 	/*
2314 	 * 6-5: But the default algorithm for deciding whether to grant or
2315 	 * queue conversion requests does not by itself guarantee that such
2316 	 * requests are serviced on a "first come first serve" basis.  This, in
2317 	 * turn, can lead to a phenomenon known as "indefinate postponement".
2318 	 *
2319 	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2320 	 * the system service employed to request a lock conversion.  This flag
2321 	 * forces certain conversion requests to be queued, even if they are
2322 	 * compatible with the granted modes of other locks on the same
2323 	 * resource.  Thus, the use of this flag results in conversion requests
2324 	 * being ordered on a "first come first servce" basis.
2325 	 *
2326 	 * DCT: This condition is all about new conversions being able to occur
2327 	 * "in place" while the lock remains on the granted queue (assuming
2328 	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2329 	 * doesn't _have_ to go onto the convert queue where it's processed in
2330 	 * order.  The "now" variable is necessary to distinguish converts
2331 	 * being received and processed for the first time now, because once a
2332 	 * convert is moved to the conversion queue the condition below applies
2333 	 * requiring fifo granting.
2334 	 */
2335 
2336 	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2337 		return 1;
2338 
2339 	/*
2340 	 * Even if the convert is compat with all granted locks,
2341 	 * QUECVT forces it behind other locks on the convert queue.
2342 	 */
2343 
2344 	if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2345 		if (list_empty(&r->res_convertqueue))
2346 			return 1;
2347 		else
2348 			return 0;
2349 	}
2350 
2351 	/*
2352 	 * The NOORDER flag is set to avoid the standard vms rules on grant
2353 	 * order.
2354 	 */
2355 
2356 	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2357 		return 1;
2358 
2359 	/*
2360 	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2361 	 * granted until all other conversion requests ahead of it are granted
2362 	 * and/or canceled.
2363 	 */
2364 
2365 	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2366 		return 1;
2367 
2368 	/*
2369 	 * 6-4: By default, a new request is immediately granted only if all
2370 	 * three of the following conditions are satisfied when the request is
2371 	 * issued:
2372 	 * - The queue of ungranted conversion requests for the resource is
2373 	 *   empty.
2374 	 * - The queue of ungranted new requests for the resource is empty.
2375 	 * - The mode of the new request is compatible with the most
2376 	 *   restrictive mode of all granted locks on the resource.
2377 	 */
2378 
2379 	if (now && !conv && list_empty(&r->res_convertqueue) &&
2380 	    list_empty(&r->res_waitqueue))
2381 		return 1;
2382 
2383 	/*
2384 	 * 6-4: Once a lock request is in the queue of ungranted new requests,
2385 	 * it cannot be granted until the queue of ungranted conversion
2386 	 * requests is empty, all ungranted new requests ahead of it are
2387 	 * granted and/or canceled, and it is compatible with the granted mode
2388 	 * of the most restrictive lock granted on the resource.
2389 	 */
2390 
2391 	if (!now && !conv && list_empty(&r->res_convertqueue) &&
2392 	    first_in_list(lkb, &r->res_waitqueue))
2393 		return 1;
2394 
2395 	return 0;
2396 }
2397 
can_be_granted(struct dlm_rsb * r,struct dlm_lkb * lkb,int now,int recover,int * err)2398 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2399 			  int recover, int *err)
2400 {
2401 	int rv;
2402 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2403 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2404 
2405 	if (err)
2406 		*err = 0;
2407 
2408 	rv = _can_be_granted(r, lkb, now, recover);
2409 	if (rv)
2410 		goto out;
2411 
2412 	/*
2413 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2414 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2415 	 * cancels one of the locks.
2416 	 */
2417 
2418 	if (is_convert && can_be_queued(lkb) &&
2419 	    conversion_deadlock_detect(r, lkb)) {
2420 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2421 			lkb->lkb_grmode = DLM_LOCK_NL;
2422 			set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2423 		} else if (err) {
2424 			*err = -EDEADLK;
2425 		} else {
2426 			log_print("can_be_granted deadlock %x now %d",
2427 				  lkb->lkb_id, now);
2428 			dlm_dump_rsb(r);
2429 		}
2430 		goto out;
2431 	}
2432 
2433 	/*
2434 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2435 	 * to grant a request in a mode other than the normal rqmode.  It's a
2436 	 * simple way to provide a big optimization to applications that can
2437 	 * use them.
2438 	 */
2439 
2440 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2441 		alt = DLM_LOCK_PR;
2442 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2443 		alt = DLM_LOCK_CW;
2444 
2445 	if (alt) {
2446 		lkb->lkb_rqmode = alt;
2447 		rv = _can_be_granted(r, lkb, now, 0);
2448 		if (rv)
2449 			set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2450 		else
2451 			lkb->lkb_rqmode = rqmode;
2452 	}
2453  out:
2454 	return rv;
2455 }
2456 
2457 /* Returns the highest requested mode of all blocked conversions; sets
2458    cw if there's a blocked conversion to DLM_LOCK_CW. */
2459 
grant_pending_convert(struct dlm_rsb * r,int high,int * cw,unsigned int * count)2460 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2461 				 unsigned int *count)
2462 {
2463 	struct dlm_lkb *lkb, *s;
2464 	int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2465 	int hi, demoted, quit, grant_restart, demote_restart;
2466 	int deadlk;
2467 
2468 	quit = 0;
2469  restart:
2470 	grant_restart = 0;
2471 	demote_restart = 0;
2472 	hi = DLM_LOCK_IV;
2473 
2474 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2475 		demoted = is_demoted(lkb);
2476 		deadlk = 0;
2477 
2478 		if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2479 			grant_lock_pending(r, lkb);
2480 			grant_restart = 1;
2481 			if (count)
2482 				(*count)++;
2483 			continue;
2484 		}
2485 
2486 		if (!demoted && is_demoted(lkb)) {
2487 			log_print("WARN: pending demoted %x node %d %s",
2488 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2489 			demote_restart = 1;
2490 			continue;
2491 		}
2492 
2493 		if (deadlk) {
2494 			/*
2495 			 * If DLM_LKB_NODLKWT flag is set and conversion
2496 			 * deadlock is detected, we request blocking AST and
2497 			 * down (or cancel) conversion.
2498 			 */
2499 			if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2500 				if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2501 					queue_bast(r, lkb, lkb->lkb_rqmode);
2502 					lkb->lkb_highbast = lkb->lkb_rqmode;
2503 				}
2504 			} else {
2505 				log_print("WARN: pending deadlock %x node %d %s",
2506 					  lkb->lkb_id, lkb->lkb_nodeid,
2507 					  r->res_name);
2508 				dlm_dump_rsb(r);
2509 			}
2510 			continue;
2511 		}
2512 
2513 		hi = max_t(int, lkb->lkb_rqmode, hi);
2514 
2515 		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2516 			*cw = 1;
2517 	}
2518 
2519 	if (grant_restart)
2520 		goto restart;
2521 	if (demote_restart && !quit) {
2522 		quit = 1;
2523 		goto restart;
2524 	}
2525 
2526 	return max_t(int, high, hi);
2527 }
2528 
grant_pending_wait(struct dlm_rsb * r,int high,int * cw,unsigned int * count)2529 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2530 			      unsigned int *count)
2531 {
2532 	struct dlm_lkb *lkb, *s;
2533 
2534 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2535 		if (can_be_granted(r, lkb, 0, 0, NULL)) {
2536 			grant_lock_pending(r, lkb);
2537 			if (count)
2538 				(*count)++;
2539 		} else {
2540 			high = max_t(int, lkb->lkb_rqmode, high);
2541 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
2542 				*cw = 1;
2543 		}
2544 	}
2545 
2546 	return high;
2547 }
2548 
2549 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2550    on either the convert or waiting queue.
2551    high is the largest rqmode of all locks blocked on the convert or
2552    waiting queue. */
2553 
lock_requires_bast(struct dlm_lkb * gr,int high,int cw)2554 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2555 {
2556 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2557 		if (gr->lkb_highbast < DLM_LOCK_EX)
2558 			return 1;
2559 		return 0;
2560 	}
2561 
2562 	if (gr->lkb_highbast < high &&
2563 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2564 		return 1;
2565 	return 0;
2566 }
2567 
grant_pending_locks(struct dlm_rsb * r,unsigned int * count)2568 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2569 {
2570 	struct dlm_lkb *lkb, *s;
2571 	int high = DLM_LOCK_IV;
2572 	int cw = 0;
2573 
2574 	if (!is_master(r)) {
2575 		log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2576 		dlm_dump_rsb(r);
2577 		return;
2578 	}
2579 
2580 	high = grant_pending_convert(r, high, &cw, count);
2581 	high = grant_pending_wait(r, high, &cw, count);
2582 
2583 	if (high == DLM_LOCK_IV)
2584 		return;
2585 
2586 	/*
2587 	 * If there are locks left on the wait/convert queue then send blocking
2588 	 * ASTs to granted locks based on the largest requested mode (high)
2589 	 * found above.
2590 	 */
2591 
2592 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2593 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2594 			if (cw && high == DLM_LOCK_PR &&
2595 			    lkb->lkb_grmode == DLM_LOCK_PR)
2596 				queue_bast(r, lkb, DLM_LOCK_CW);
2597 			else
2598 				queue_bast(r, lkb, high);
2599 			lkb->lkb_highbast = high;
2600 		}
2601 	}
2602 }
2603 
modes_require_bast(struct dlm_lkb * gr,struct dlm_lkb * rq)2604 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2605 {
2606 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2607 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2608 		if (gr->lkb_highbast < DLM_LOCK_EX)
2609 			return 1;
2610 		return 0;
2611 	}
2612 
2613 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2614 		return 1;
2615 	return 0;
2616 }
2617 
send_bast_queue(struct dlm_rsb * r,struct list_head * head,struct dlm_lkb * lkb)2618 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2619 			    struct dlm_lkb *lkb)
2620 {
2621 	struct dlm_lkb *gr;
2622 
2623 	list_for_each_entry(gr, head, lkb_statequeue) {
2624 		/* skip self when sending basts to convertqueue */
2625 		if (gr == lkb)
2626 			continue;
2627 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2628 			queue_bast(r, gr, lkb->lkb_rqmode);
2629 			gr->lkb_highbast = lkb->lkb_rqmode;
2630 		}
2631 	}
2632 }
2633 
send_blocking_asts(struct dlm_rsb * r,struct dlm_lkb * lkb)2634 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2635 {
2636 	send_bast_queue(r, &r->res_grantqueue, lkb);
2637 }
2638 
send_blocking_asts_all(struct dlm_rsb * r,struct dlm_lkb * lkb)2639 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2640 {
2641 	send_bast_queue(r, &r->res_grantqueue, lkb);
2642 	send_bast_queue(r, &r->res_convertqueue, lkb);
2643 }
2644 
2645 /* set_master(r, lkb) -- set the master nodeid of a resource
2646 
2647    The purpose of this function is to set the nodeid field in the given
2648    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2649    known, it can just be copied to the lkb and the function will return
2650    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2651    before it can be copied to the lkb.
2652 
2653    When the rsb nodeid is being looked up remotely, the initial lkb
2654    causing the lookup is kept on the ls_waiters list waiting for the
2655    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2656    on the rsb's res_lookup list until the master is verified.
2657 
2658    Return values:
2659    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2660    1: the rsb master is not available and the lkb has been placed on
2661       a wait queue
2662 */
2663 
set_master(struct dlm_rsb * r,struct dlm_lkb * lkb)2664 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2665 {
2666 	int our_nodeid = dlm_our_nodeid();
2667 
2668 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2669 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2670 		r->res_first_lkid = lkb->lkb_id;
2671 		lkb->lkb_nodeid = r->res_nodeid;
2672 		return 0;
2673 	}
2674 
2675 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2676 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2677 		return 1;
2678 	}
2679 
2680 	if (r->res_master_nodeid == our_nodeid) {
2681 		lkb->lkb_nodeid = 0;
2682 		return 0;
2683 	}
2684 
2685 	if (r->res_master_nodeid) {
2686 		lkb->lkb_nodeid = r->res_master_nodeid;
2687 		return 0;
2688 	}
2689 
2690 	if (dlm_dir_nodeid(r) == our_nodeid) {
2691 		/* This is a somewhat unusual case; find_rsb will usually
2692 		   have set res_master_nodeid when dir nodeid is local, but
2693 		   there are cases where we become the dir node after we've
2694 		   past find_rsb and go through _request_lock again.
2695 		   confirm_master() or process_lookup_list() needs to be
2696 		   called after this. */
2697 		log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2698 			  lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2699 			  r->res_name);
2700 		r->res_master_nodeid = our_nodeid;
2701 		r->res_nodeid = 0;
2702 		lkb->lkb_nodeid = 0;
2703 		return 0;
2704 	}
2705 
2706 	r->res_first_lkid = lkb->lkb_id;
2707 	send_lookup(r, lkb);
2708 	return 1;
2709 }
2710 
process_lookup_list(struct dlm_rsb * r)2711 static void process_lookup_list(struct dlm_rsb *r)
2712 {
2713 	struct dlm_lkb *lkb, *safe;
2714 
2715 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2716 		list_del_init(&lkb->lkb_rsb_lookup);
2717 		_request_lock(r, lkb);
2718 	}
2719 }
2720 
2721 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2722 
confirm_master(struct dlm_rsb * r,int error)2723 static void confirm_master(struct dlm_rsb *r, int error)
2724 {
2725 	struct dlm_lkb *lkb;
2726 
2727 	if (!r->res_first_lkid)
2728 		return;
2729 
2730 	switch (error) {
2731 	case 0:
2732 	case -EINPROGRESS:
2733 		r->res_first_lkid = 0;
2734 		process_lookup_list(r);
2735 		break;
2736 
2737 	case -EAGAIN:
2738 	case -EBADR:
2739 	case -ENOTBLK:
2740 		/* the remote request failed and won't be retried (it was
2741 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
2742 		   lkb the first_lkid */
2743 
2744 		r->res_first_lkid = 0;
2745 
2746 		if (!list_empty(&r->res_lookup)) {
2747 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2748 					 lkb_rsb_lookup);
2749 			list_del_init(&lkb->lkb_rsb_lookup);
2750 			r->res_first_lkid = lkb->lkb_id;
2751 			_request_lock(r, lkb);
2752 		}
2753 		break;
2754 
2755 	default:
2756 		log_error(r->res_ls, "confirm_master unknown error %d", error);
2757 	}
2758 }
2759 
set_lock_args(int mode,struct dlm_lksb * lksb,uint32_t flags,int namelen,void (* ast)(void * astparam),void * astparam,void (* bast)(void * astparam,int mode),struct dlm_args * args)2760 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2761 			 int namelen, void (*ast)(void *astparam),
2762 			 void *astparam,
2763 			 void (*bast)(void *astparam, int mode),
2764 			 struct dlm_args *args)
2765 {
2766 	int rv = -EINVAL;
2767 
2768 	/* check for invalid arg usage */
2769 
2770 	if (mode < 0 || mode > DLM_LOCK_EX)
2771 		goto out;
2772 
2773 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2774 		goto out;
2775 
2776 	if (flags & DLM_LKF_CANCEL)
2777 		goto out;
2778 
2779 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2780 		goto out;
2781 
2782 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2783 		goto out;
2784 
2785 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2786 		goto out;
2787 
2788 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2789 		goto out;
2790 
2791 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2792 		goto out;
2793 
2794 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2795 		goto out;
2796 
2797 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2798 		goto out;
2799 
2800 	if (!ast || !lksb)
2801 		goto out;
2802 
2803 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2804 		goto out;
2805 
2806 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2807 		goto out;
2808 
2809 	/* these args will be copied to the lkb in validate_lock_args,
2810 	   it cannot be done now because when converting locks, fields in
2811 	   an active lkb cannot be modified before locking the rsb */
2812 
2813 	args->flags = flags;
2814 	args->astfn = ast;
2815 	args->astparam = astparam;
2816 	args->bastfn = bast;
2817 	args->mode = mode;
2818 	args->lksb = lksb;
2819 	rv = 0;
2820  out:
2821 	return rv;
2822 }
2823 
set_unlock_args(uint32_t flags,void * astarg,struct dlm_args * args)2824 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2825 {
2826 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2827  		      DLM_LKF_FORCEUNLOCK))
2828 		return -EINVAL;
2829 
2830 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2831 		return -EINVAL;
2832 
2833 	args->flags = flags;
2834 	args->astparam = astarg;
2835 	return 0;
2836 }
2837 
validate_lock_args(struct dlm_ls * ls,struct dlm_lkb * lkb,struct dlm_args * args)2838 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2839 			      struct dlm_args *args)
2840 {
2841 	int rv = -EBUSY;
2842 
2843 	if (args->flags & DLM_LKF_CONVERT) {
2844 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2845 			goto out;
2846 
2847 		/* lock not allowed if there's any op in progress */
2848 		if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2849 			goto out;
2850 
2851 		if (is_overlap(lkb))
2852 			goto out;
2853 
2854 		rv = -EINVAL;
2855 		if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2856 			goto out;
2857 
2858 		if (args->flags & DLM_LKF_QUECVT &&
2859 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2860 			goto out;
2861 	}
2862 
2863 	lkb->lkb_exflags = args->flags;
2864 	dlm_set_sbflags_val(lkb, 0);
2865 	lkb->lkb_astfn = args->astfn;
2866 	lkb->lkb_astparam = args->astparam;
2867 	lkb->lkb_bastfn = args->bastfn;
2868 	lkb->lkb_rqmode = args->mode;
2869 	lkb->lkb_lksb = args->lksb;
2870 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2871 	lkb->lkb_ownpid = (int) current->pid;
2872 	rv = 0;
2873  out:
2874 	switch (rv) {
2875 	case 0:
2876 		break;
2877 	case -EINVAL:
2878 		/* annoy the user because dlm usage is wrong */
2879 		WARN_ON(1);
2880 		log_error(ls, "%s %d %x %x %x %d %d", __func__,
2881 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2882 			  lkb->lkb_status, lkb->lkb_wait_type);
2883 		break;
2884 	default:
2885 		log_debug(ls, "%s %d %x %x %x %d %d", __func__,
2886 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2887 			  lkb->lkb_status, lkb->lkb_wait_type);
2888 		break;
2889 	}
2890 
2891 	return rv;
2892 }
2893 
2894 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2895    for success */
2896 
2897 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2898    because there may be a lookup in progress and it's valid to do
2899    cancel/unlockf on it */
2900 
validate_unlock_args(struct dlm_lkb * lkb,struct dlm_args * args)2901 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2902 {
2903 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2904 	int rv = -EBUSY;
2905 
2906 	/* normal unlock not allowed if there's any op in progress */
2907 	if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2908 	    (lkb->lkb_wait_type || lkb->lkb_wait_count))
2909 		goto out;
2910 
2911 	/* an lkb may be waiting for an rsb lookup to complete where the
2912 	   lookup was initiated by another lock */
2913 
2914 	if (!list_empty(&lkb->lkb_rsb_lookup)) {
2915 		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2916 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2917 			list_del_init(&lkb->lkb_rsb_lookup);
2918 			queue_cast(lkb->lkb_resource, lkb,
2919 				   args->flags & DLM_LKF_CANCEL ?
2920 				   -DLM_ECANCEL : -DLM_EUNLOCK);
2921 			unhold_lkb(lkb); /* undoes create_lkb() */
2922 		}
2923 		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2924 		goto out;
2925 	}
2926 
2927 	rv = -EINVAL;
2928 	if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2929 		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2930 		dlm_print_lkb(lkb);
2931 		goto out;
2932 	}
2933 
2934 	/* an lkb may still exist even though the lock is EOL'ed due to a
2935 	 * cancel, unlock or failed noqueue request; an app can't use these
2936 	 * locks; return same error as if the lkid had not been found at all
2937 	 */
2938 
2939 	if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2940 		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2941 		rv = -ENOENT;
2942 		goto out;
2943 	}
2944 
2945 	/* cancel not allowed with another cancel/unlock in progress */
2946 
2947 	if (args->flags & DLM_LKF_CANCEL) {
2948 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2949 			goto out;
2950 
2951 		if (is_overlap(lkb))
2952 			goto out;
2953 
2954 		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2955 			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2956 			rv = -EBUSY;
2957 			goto out;
2958 		}
2959 
2960 		/* there's nothing to cancel */
2961 		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2962 		    !lkb->lkb_wait_type) {
2963 			rv = -EBUSY;
2964 			goto out;
2965 		}
2966 
2967 		switch (lkb->lkb_wait_type) {
2968 		case DLM_MSG_LOOKUP:
2969 		case DLM_MSG_REQUEST:
2970 			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2971 			rv = -EBUSY;
2972 			goto out;
2973 		case DLM_MSG_UNLOCK:
2974 		case DLM_MSG_CANCEL:
2975 			goto out;
2976 		}
2977 		/* add_to_waiters() will set OVERLAP_CANCEL */
2978 		goto out_ok;
2979 	}
2980 
2981 	/* do we need to allow a force-unlock if there's a normal unlock
2982 	   already in progress?  in what conditions could the normal unlock
2983 	   fail such that we'd want to send a force-unlock to be sure? */
2984 
2985 	if (args->flags & DLM_LKF_FORCEUNLOCK) {
2986 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2987 			goto out;
2988 
2989 		if (is_overlap_unlock(lkb))
2990 			goto out;
2991 
2992 		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2993 			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2994 			rv = -EBUSY;
2995 			goto out;
2996 		}
2997 
2998 		switch (lkb->lkb_wait_type) {
2999 		case DLM_MSG_LOOKUP:
3000 		case DLM_MSG_REQUEST:
3001 			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
3002 			rv = -EBUSY;
3003 			goto out;
3004 		case DLM_MSG_UNLOCK:
3005 			goto out;
3006 		}
3007 		/* add_to_waiters() will set OVERLAP_UNLOCK */
3008 	}
3009 
3010  out_ok:
3011 	/* an overlapping op shouldn't blow away exflags from other op */
3012 	lkb->lkb_exflags |= args->flags;
3013 	dlm_set_sbflags_val(lkb, 0);
3014 	lkb->lkb_astparam = args->astparam;
3015 	rv = 0;
3016  out:
3017 	switch (rv) {
3018 	case 0:
3019 		break;
3020 	case -EINVAL:
3021 		/* annoy the user because dlm usage is wrong */
3022 		WARN_ON(1);
3023 		log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3024 			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3025 			  args->flags, lkb->lkb_wait_type,
3026 			  lkb->lkb_resource->res_name);
3027 		break;
3028 	default:
3029 		log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3030 			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3031 			  args->flags, lkb->lkb_wait_type,
3032 			  lkb->lkb_resource->res_name);
3033 		break;
3034 	}
3035 
3036 	return rv;
3037 }
3038 
3039 /*
3040  * Four stage 4 varieties:
3041  * do_request(), do_convert(), do_unlock(), do_cancel()
3042  * These are called on the master node for the given lock and
3043  * from the central locking logic.
3044  */
3045 
do_request(struct dlm_rsb * r,struct dlm_lkb * lkb)3046 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3047 {
3048 	int error = 0;
3049 
3050 	if (can_be_granted(r, lkb, 1, 0, NULL)) {
3051 		grant_lock(r, lkb);
3052 		queue_cast(r, lkb, 0);
3053 		goto out;
3054 	}
3055 
3056 	if (can_be_queued(lkb)) {
3057 		error = -EINPROGRESS;
3058 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
3059 		goto out;
3060 	}
3061 
3062 	error = -EAGAIN;
3063 	queue_cast(r, lkb, -EAGAIN);
3064  out:
3065 	return error;
3066 }
3067 
do_request_effects(struct dlm_rsb * r,struct dlm_lkb * lkb,int error)3068 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3069 			       int error)
3070 {
3071 	switch (error) {
3072 	case -EAGAIN:
3073 		if (force_blocking_asts(lkb))
3074 			send_blocking_asts_all(r, lkb);
3075 		break;
3076 	case -EINPROGRESS:
3077 		send_blocking_asts(r, lkb);
3078 		break;
3079 	}
3080 }
3081 
do_convert(struct dlm_rsb * r,struct dlm_lkb * lkb)3082 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3083 {
3084 	int error = 0;
3085 	int deadlk = 0;
3086 
3087 	/* changing an existing lock may allow others to be granted */
3088 
3089 	if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3090 		grant_lock(r, lkb);
3091 		queue_cast(r, lkb, 0);
3092 		goto out;
3093 	}
3094 
3095 	/* can_be_granted() detected that this lock would block in a conversion
3096 	   deadlock, so we leave it on the granted queue and return EDEADLK in
3097 	   the ast for the convert. */
3098 
3099 	if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3100 		/* it's left on the granted queue */
3101 		revert_lock(r, lkb);
3102 		queue_cast(r, lkb, -EDEADLK);
3103 		error = -EDEADLK;
3104 		goto out;
3105 	}
3106 
3107 	/* is_demoted() means the can_be_granted() above set the grmode
3108 	   to NL, and left us on the granted queue.  This auto-demotion
3109 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
3110 	   now grantable.  We have to try to grant other converting locks
3111 	   before we try again to grant this one. */
3112 
3113 	if (is_demoted(lkb)) {
3114 		grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3115 		if (_can_be_granted(r, lkb, 1, 0)) {
3116 			grant_lock(r, lkb);
3117 			queue_cast(r, lkb, 0);
3118 			goto out;
3119 		}
3120 		/* else fall through and move to convert queue */
3121 	}
3122 
3123 	if (can_be_queued(lkb)) {
3124 		error = -EINPROGRESS;
3125 		del_lkb(r, lkb);
3126 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3127 		goto out;
3128 	}
3129 
3130 	error = -EAGAIN;
3131 	queue_cast(r, lkb, -EAGAIN);
3132  out:
3133 	return error;
3134 }
3135 
do_convert_effects(struct dlm_rsb * r,struct dlm_lkb * lkb,int error)3136 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3137 			       int error)
3138 {
3139 	switch (error) {
3140 	case 0:
3141 		grant_pending_locks(r, NULL);
3142 		/* grant_pending_locks also sends basts */
3143 		break;
3144 	case -EAGAIN:
3145 		if (force_blocking_asts(lkb))
3146 			send_blocking_asts_all(r, lkb);
3147 		break;
3148 	case -EINPROGRESS:
3149 		send_blocking_asts(r, lkb);
3150 		break;
3151 	}
3152 }
3153 
do_unlock(struct dlm_rsb * r,struct dlm_lkb * lkb)3154 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3155 {
3156 	remove_lock(r, lkb);
3157 	queue_cast(r, lkb, -DLM_EUNLOCK);
3158 	return -DLM_EUNLOCK;
3159 }
3160 
do_unlock_effects(struct dlm_rsb * r,struct dlm_lkb * lkb,int error)3161 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3162 			      int error)
3163 {
3164 	grant_pending_locks(r, NULL);
3165 }
3166 
3167 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3168 
do_cancel(struct dlm_rsb * r,struct dlm_lkb * lkb)3169 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3170 {
3171 	int error;
3172 
3173 	error = revert_lock(r, lkb);
3174 	if (error) {
3175 		queue_cast(r, lkb, -DLM_ECANCEL);
3176 		return -DLM_ECANCEL;
3177 	}
3178 	return 0;
3179 }
3180 
do_cancel_effects(struct dlm_rsb * r,struct dlm_lkb * lkb,int error)3181 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3182 			      int error)
3183 {
3184 	if (error)
3185 		grant_pending_locks(r, NULL);
3186 }
3187 
3188 /*
3189  * Four stage 3 varieties:
3190  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3191  */
3192 
3193 /* add a new lkb to a possibly new rsb, called by requesting process */
3194 
_request_lock(struct dlm_rsb * r,struct dlm_lkb * lkb)3195 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3196 {
3197 	int error;
3198 
3199 	/* set_master: sets lkb nodeid from r */
3200 
3201 	error = set_master(r, lkb);
3202 	if (error < 0)
3203 		goto out;
3204 	if (error) {
3205 		error = 0;
3206 		goto out;
3207 	}
3208 
3209 	if (is_remote(r)) {
3210 		/* receive_request() calls do_request() on remote node */
3211 		error = send_request(r, lkb);
3212 	} else {
3213 		error = do_request(r, lkb);
3214 		/* for remote locks the request_reply is sent
3215 		   between do_request and do_request_effects */
3216 		do_request_effects(r, lkb, error);
3217 	}
3218  out:
3219 	return error;
3220 }
3221 
3222 /* change some property of an existing lkb, e.g. mode */
3223 
_convert_lock(struct dlm_rsb * r,struct dlm_lkb * lkb)3224 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3225 {
3226 	int error;
3227 
3228 	if (is_remote(r)) {
3229 		/* receive_convert() calls do_convert() on remote node */
3230 		error = send_convert(r, lkb);
3231 	} else {
3232 		error = do_convert(r, lkb);
3233 		/* for remote locks the convert_reply is sent
3234 		   between do_convert and do_convert_effects */
3235 		do_convert_effects(r, lkb, error);
3236 	}
3237 
3238 	return error;
3239 }
3240 
3241 /* remove an existing lkb from the granted queue */
3242 
_unlock_lock(struct dlm_rsb * r,struct dlm_lkb * lkb)3243 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3244 {
3245 	int error;
3246 
3247 	if (is_remote(r)) {
3248 		/* receive_unlock() calls do_unlock() on remote node */
3249 		error = send_unlock(r, lkb);
3250 	} else {
3251 		error = do_unlock(r, lkb);
3252 		/* for remote locks the unlock_reply is sent
3253 		   between do_unlock and do_unlock_effects */
3254 		do_unlock_effects(r, lkb, error);
3255 	}
3256 
3257 	return error;
3258 }
3259 
3260 /* remove an existing lkb from the convert or wait queue */
3261 
_cancel_lock(struct dlm_rsb * r,struct dlm_lkb * lkb)3262 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3263 {
3264 	int error;
3265 
3266 	if (is_remote(r)) {
3267 		/* receive_cancel() calls do_cancel() on remote node */
3268 		error = send_cancel(r, lkb);
3269 	} else {
3270 		error = do_cancel(r, lkb);
3271 		/* for remote locks the cancel_reply is sent
3272 		   between do_cancel and do_cancel_effects */
3273 		do_cancel_effects(r, lkb, error);
3274 	}
3275 
3276 	return error;
3277 }
3278 
3279 /*
3280  * Four stage 2 varieties:
3281  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3282  */
3283 
request_lock(struct dlm_ls * ls,struct dlm_lkb * lkb,const void * name,int len,struct dlm_args * args)3284 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3285 			const void *name, int len,
3286 			struct dlm_args *args)
3287 {
3288 	struct dlm_rsb *r;
3289 	int error;
3290 
3291 	error = validate_lock_args(ls, lkb, args);
3292 	if (error)
3293 		return error;
3294 
3295 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3296 	if (error)
3297 		return error;
3298 
3299 	lock_rsb(r);
3300 
3301 	attach_lkb(r, lkb);
3302 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3303 
3304 	error = _request_lock(r, lkb);
3305 
3306 	unlock_rsb(r);
3307 	put_rsb(r);
3308 	return error;
3309 }
3310 
convert_lock(struct dlm_ls * ls,struct dlm_lkb * lkb,struct dlm_args * args)3311 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3312 			struct dlm_args *args)
3313 {
3314 	struct dlm_rsb *r;
3315 	int error;
3316 
3317 	r = lkb->lkb_resource;
3318 
3319 	hold_rsb(r);
3320 	lock_rsb(r);
3321 
3322 	error = validate_lock_args(ls, lkb, args);
3323 	if (error)
3324 		goto out;
3325 
3326 	error = _convert_lock(r, lkb);
3327  out:
3328 	unlock_rsb(r);
3329 	put_rsb(r);
3330 	return error;
3331 }
3332 
unlock_lock(struct dlm_ls * ls,struct dlm_lkb * lkb,struct dlm_args * args)3333 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3334 		       struct dlm_args *args)
3335 {
3336 	struct dlm_rsb *r;
3337 	int error;
3338 
3339 	r = lkb->lkb_resource;
3340 
3341 	hold_rsb(r);
3342 	lock_rsb(r);
3343 
3344 	error = validate_unlock_args(lkb, args);
3345 	if (error)
3346 		goto out;
3347 
3348 	error = _unlock_lock(r, lkb);
3349  out:
3350 	unlock_rsb(r);
3351 	put_rsb(r);
3352 	return error;
3353 }
3354 
cancel_lock(struct dlm_ls * ls,struct dlm_lkb * lkb,struct dlm_args * args)3355 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3356 		       struct dlm_args *args)
3357 {
3358 	struct dlm_rsb *r;
3359 	int error;
3360 
3361 	r = lkb->lkb_resource;
3362 
3363 	hold_rsb(r);
3364 	lock_rsb(r);
3365 
3366 	error = validate_unlock_args(lkb, args);
3367 	if (error)
3368 		goto out;
3369 
3370 	error = _cancel_lock(r, lkb);
3371  out:
3372 	unlock_rsb(r);
3373 	put_rsb(r);
3374 	return error;
3375 }
3376 
3377 /*
3378  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3379  */
3380 
dlm_lock(dlm_lockspace_t * lockspace,int mode,struct dlm_lksb * lksb,uint32_t flags,const void * name,unsigned int namelen,uint32_t parent_lkid,void (* ast)(void * astarg),void * astarg,void (* bast)(void * astarg,int mode))3381 int dlm_lock(dlm_lockspace_t *lockspace,
3382 	     int mode,
3383 	     struct dlm_lksb *lksb,
3384 	     uint32_t flags,
3385 	     const void *name,
3386 	     unsigned int namelen,
3387 	     uint32_t parent_lkid,
3388 	     void (*ast) (void *astarg),
3389 	     void *astarg,
3390 	     void (*bast) (void *astarg, int mode))
3391 {
3392 	struct dlm_ls *ls;
3393 	struct dlm_lkb *lkb;
3394 	struct dlm_args args;
3395 	int error, convert = flags & DLM_LKF_CONVERT;
3396 
3397 	ls = dlm_find_lockspace_local(lockspace);
3398 	if (!ls)
3399 		return -EINVAL;
3400 
3401 	dlm_lock_recovery(ls);
3402 
3403 	if (convert)
3404 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
3405 	else
3406 		error = create_lkb(ls, &lkb);
3407 
3408 	if (error)
3409 		goto out;
3410 
3411 	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3412 
3413 	error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3414 			      &args);
3415 	if (error)
3416 		goto out_put;
3417 
3418 	if (convert)
3419 		error = convert_lock(ls, lkb, &args);
3420 	else
3421 		error = request_lock(ls, lkb, name, namelen, &args);
3422 
3423 	if (error == -EINPROGRESS)
3424 		error = 0;
3425  out_put:
3426 	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3427 
3428 	if (convert || error)
3429 		__put_lkb(ls, lkb);
3430 	if (error == -EAGAIN || error == -EDEADLK)
3431 		error = 0;
3432  out:
3433 	dlm_unlock_recovery(ls);
3434 	dlm_put_lockspace(ls);
3435 	return error;
3436 }
3437 
dlm_unlock(dlm_lockspace_t * lockspace,uint32_t lkid,uint32_t flags,struct dlm_lksb * lksb,void * astarg)3438 int dlm_unlock(dlm_lockspace_t *lockspace,
3439 	       uint32_t lkid,
3440 	       uint32_t flags,
3441 	       struct dlm_lksb *lksb,
3442 	       void *astarg)
3443 {
3444 	struct dlm_ls *ls;
3445 	struct dlm_lkb *lkb;
3446 	struct dlm_args args;
3447 	int error;
3448 
3449 	ls = dlm_find_lockspace_local(lockspace);
3450 	if (!ls)
3451 		return -EINVAL;
3452 
3453 	dlm_lock_recovery(ls);
3454 
3455 	error = find_lkb(ls, lkid, &lkb);
3456 	if (error)
3457 		goto out;
3458 
3459 	trace_dlm_unlock_start(ls, lkb, flags);
3460 
3461 	error = set_unlock_args(flags, astarg, &args);
3462 	if (error)
3463 		goto out_put;
3464 
3465 	if (flags & DLM_LKF_CANCEL)
3466 		error = cancel_lock(ls, lkb, &args);
3467 	else
3468 		error = unlock_lock(ls, lkb, &args);
3469 
3470 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3471 		error = 0;
3472 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3473 		error = 0;
3474  out_put:
3475 	trace_dlm_unlock_end(ls, lkb, flags, error);
3476 
3477 	dlm_put_lkb(lkb);
3478  out:
3479 	dlm_unlock_recovery(ls);
3480 	dlm_put_lockspace(ls);
3481 	return error;
3482 }
3483 
3484 /*
3485  * send/receive routines for remote operations and replies
3486  *
3487  * send_args
3488  * send_common
3489  * send_request			receive_request
3490  * send_convert			receive_convert
3491  * send_unlock			receive_unlock
3492  * send_cancel			receive_cancel
3493  * send_grant			receive_grant
3494  * send_bast			receive_bast
3495  * send_lookup			receive_lookup
3496  * send_remove			receive_remove
3497  *
3498  * 				send_common_reply
3499  * receive_request_reply	send_request_reply
3500  * receive_convert_reply	send_convert_reply
3501  * receive_unlock_reply		send_unlock_reply
3502  * receive_cancel_reply		send_cancel_reply
3503  * receive_lookup_reply		send_lookup_reply
3504  */
3505 
_create_message(struct dlm_ls * ls,int mb_len,int to_nodeid,int mstype,struct dlm_message ** ms_ret,struct dlm_mhandle ** mh_ret)3506 static int _create_message(struct dlm_ls *ls, int mb_len,
3507 			   int to_nodeid, int mstype,
3508 			   struct dlm_message **ms_ret,
3509 			   struct dlm_mhandle **mh_ret)
3510 {
3511 	struct dlm_message *ms;
3512 	struct dlm_mhandle *mh;
3513 	char *mb;
3514 
3515 	/* get_buffer gives us a message handle (mh) that we need to
3516 	   pass into midcomms_commit and a message buffer (mb) that we
3517 	   write our data into */
3518 
3519 	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
3520 	if (!mh)
3521 		return -ENOBUFS;
3522 
3523 	ms = (struct dlm_message *) mb;
3524 
3525 	ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3526 	ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3527 	ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3528 	ms->m_header.h_length = cpu_to_le16(mb_len);
3529 	ms->m_header.h_cmd = DLM_MSG;
3530 
3531 	ms->m_type = cpu_to_le32(mstype);
3532 
3533 	*mh_ret = mh;
3534 	*ms_ret = ms;
3535 	return 0;
3536 }
3537 
create_message(struct dlm_rsb * r,struct dlm_lkb * lkb,int to_nodeid,int mstype,struct dlm_message ** ms_ret,struct dlm_mhandle ** mh_ret)3538 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3539 			  int to_nodeid, int mstype,
3540 			  struct dlm_message **ms_ret,
3541 			  struct dlm_mhandle **mh_ret)
3542 {
3543 	int mb_len = sizeof(struct dlm_message);
3544 
3545 	switch (mstype) {
3546 	case DLM_MSG_REQUEST:
3547 	case DLM_MSG_LOOKUP:
3548 	case DLM_MSG_REMOVE:
3549 		mb_len += r->res_length;
3550 		break;
3551 	case DLM_MSG_CONVERT:
3552 	case DLM_MSG_UNLOCK:
3553 	case DLM_MSG_REQUEST_REPLY:
3554 	case DLM_MSG_CONVERT_REPLY:
3555 	case DLM_MSG_GRANT:
3556 		if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3557 			mb_len += r->res_ls->ls_lvblen;
3558 		break;
3559 	}
3560 
3561 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3562 			       ms_ret, mh_ret);
3563 }
3564 
3565 /* further lowcomms enhancements or alternate implementations may make
3566    the return value from this function useful at some point */
3567 
send_message(struct dlm_mhandle * mh,struct dlm_message * ms,const void * name,int namelen)3568 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3569 			const void *name, int namelen)
3570 {
3571 	dlm_midcomms_commit_mhandle(mh, name, namelen);
3572 	return 0;
3573 }
3574 
send_args(struct dlm_rsb * r,struct dlm_lkb * lkb,struct dlm_message * ms)3575 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3576 		      struct dlm_message *ms)
3577 {
3578 	ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3579 	ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3580 	ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3581 	ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3582 	ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3583 	ms->m_sbflags  = cpu_to_le32(dlm_sbflags_val(lkb));
3584 	ms->m_flags    = cpu_to_le32(dlm_dflags_val(lkb));
3585 	ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3586 	ms->m_status   = cpu_to_le32(lkb->lkb_status);
3587 	ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3588 	ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3589 	ms->m_hash     = cpu_to_le32(r->res_hash);
3590 
3591 	/* m_result and m_bastmode are set from function args,
3592 	   not from lkb fields */
3593 
3594 	if (lkb->lkb_bastfn)
3595 		ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3596 	if (lkb->lkb_astfn)
3597 		ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3598 
3599 	/* compare with switch in create_message; send_remove() doesn't
3600 	   use send_args() */
3601 
3602 	switch (ms->m_type) {
3603 	case cpu_to_le32(DLM_MSG_REQUEST):
3604 	case cpu_to_le32(DLM_MSG_LOOKUP):
3605 		memcpy(ms->m_extra, r->res_name, r->res_length);
3606 		break;
3607 	case cpu_to_le32(DLM_MSG_CONVERT):
3608 	case cpu_to_le32(DLM_MSG_UNLOCK):
3609 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3610 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3611 	case cpu_to_le32(DLM_MSG_GRANT):
3612 		if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3613 			break;
3614 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3615 		break;
3616 	}
3617 }
3618 
send_common(struct dlm_rsb * r,struct dlm_lkb * lkb,int mstype)3619 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3620 {
3621 	struct dlm_message *ms;
3622 	struct dlm_mhandle *mh;
3623 	int to_nodeid, error;
3624 
3625 	to_nodeid = r->res_nodeid;
3626 
3627 	error = add_to_waiters(lkb, mstype, to_nodeid);
3628 	if (error)
3629 		return error;
3630 
3631 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3632 	if (error)
3633 		goto fail;
3634 
3635 	send_args(r, lkb, ms);
3636 
3637 	error = send_message(mh, ms, r->res_name, r->res_length);
3638 	if (error)
3639 		goto fail;
3640 	return 0;
3641 
3642  fail:
3643 	remove_from_waiters(lkb, msg_reply_type(mstype));
3644 	return error;
3645 }
3646 
send_request(struct dlm_rsb * r,struct dlm_lkb * lkb)3647 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3648 {
3649 	return send_common(r, lkb, DLM_MSG_REQUEST);
3650 }
3651 
send_convert(struct dlm_rsb * r,struct dlm_lkb * lkb)3652 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3653 {
3654 	int error;
3655 
3656 	error = send_common(r, lkb, DLM_MSG_CONVERT);
3657 
3658 	/* down conversions go without a reply from the master */
3659 	if (!error && down_conversion(lkb)) {
3660 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3661 		r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3662 		r->res_ls->ls_local_ms.m_result = 0;
3663 		__receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3664 	}
3665 
3666 	return error;
3667 }
3668 
3669 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3670    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3671    that the master is still correct. */
3672 
send_unlock(struct dlm_rsb * r,struct dlm_lkb * lkb)3673 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3674 {
3675 	return send_common(r, lkb, DLM_MSG_UNLOCK);
3676 }
3677 
send_cancel(struct dlm_rsb * r,struct dlm_lkb * lkb)3678 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3679 {
3680 	return send_common(r, lkb, DLM_MSG_CANCEL);
3681 }
3682 
send_grant(struct dlm_rsb * r,struct dlm_lkb * lkb)3683 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3684 {
3685 	struct dlm_message *ms;
3686 	struct dlm_mhandle *mh;
3687 	int to_nodeid, error;
3688 
3689 	to_nodeid = lkb->lkb_nodeid;
3690 
3691 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3692 	if (error)
3693 		goto out;
3694 
3695 	send_args(r, lkb, ms);
3696 
3697 	ms->m_result = 0;
3698 
3699 	error = send_message(mh, ms, r->res_name, r->res_length);
3700  out:
3701 	return error;
3702 }
3703 
send_bast(struct dlm_rsb * r,struct dlm_lkb * lkb,int mode)3704 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3705 {
3706 	struct dlm_message *ms;
3707 	struct dlm_mhandle *mh;
3708 	int to_nodeid, error;
3709 
3710 	to_nodeid = lkb->lkb_nodeid;
3711 
3712 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3713 	if (error)
3714 		goto out;
3715 
3716 	send_args(r, lkb, ms);
3717 
3718 	ms->m_bastmode = cpu_to_le32(mode);
3719 
3720 	error = send_message(mh, ms, r->res_name, r->res_length);
3721  out:
3722 	return error;
3723 }
3724 
send_lookup(struct dlm_rsb * r,struct dlm_lkb * lkb)3725 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3726 {
3727 	struct dlm_message *ms;
3728 	struct dlm_mhandle *mh;
3729 	int to_nodeid, error;
3730 
3731 	to_nodeid = dlm_dir_nodeid(r);
3732 
3733 	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3734 	if (error)
3735 		return error;
3736 
3737 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3738 	if (error)
3739 		goto fail;
3740 
3741 	send_args(r, lkb, ms);
3742 
3743 	error = send_message(mh, ms, r->res_name, r->res_length);
3744 	if (error)
3745 		goto fail;
3746 	return 0;
3747 
3748  fail:
3749 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3750 	return error;
3751 }
3752 
send_remove(struct dlm_rsb * r)3753 static int send_remove(struct dlm_rsb *r)
3754 {
3755 	struct dlm_message *ms;
3756 	struct dlm_mhandle *mh;
3757 	int to_nodeid, error;
3758 
3759 	to_nodeid = dlm_dir_nodeid(r);
3760 
3761 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3762 	if (error)
3763 		goto out;
3764 
3765 	memcpy(ms->m_extra, r->res_name, r->res_length);
3766 	ms->m_hash = cpu_to_le32(r->res_hash);
3767 
3768 	error = send_message(mh, ms, r->res_name, r->res_length);
3769  out:
3770 	return error;
3771 }
3772 
send_common_reply(struct dlm_rsb * r,struct dlm_lkb * lkb,int mstype,int rv)3773 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3774 			     int mstype, int rv)
3775 {
3776 	struct dlm_message *ms;
3777 	struct dlm_mhandle *mh;
3778 	int to_nodeid, error;
3779 
3780 	to_nodeid = lkb->lkb_nodeid;
3781 
3782 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3783 	if (error)
3784 		goto out;
3785 
3786 	send_args(r, lkb, ms);
3787 
3788 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3789 
3790 	error = send_message(mh, ms, r->res_name, r->res_length);
3791  out:
3792 	return error;
3793 }
3794 
send_request_reply(struct dlm_rsb * r,struct dlm_lkb * lkb,int rv)3795 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3796 {
3797 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3798 }
3799 
send_convert_reply(struct dlm_rsb * r,struct dlm_lkb * lkb,int rv)3800 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3801 {
3802 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3803 }
3804 
send_unlock_reply(struct dlm_rsb * r,struct dlm_lkb * lkb,int rv)3805 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3806 {
3807 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3808 }
3809 
send_cancel_reply(struct dlm_rsb * r,struct dlm_lkb * lkb,int rv)3810 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3811 {
3812 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3813 }
3814 
send_lookup_reply(struct dlm_ls * ls,const struct dlm_message * ms_in,int ret_nodeid,int rv)3815 static int send_lookup_reply(struct dlm_ls *ls,
3816 			     const struct dlm_message *ms_in, int ret_nodeid,
3817 			     int rv)
3818 {
3819 	struct dlm_rsb *r = &ls->ls_local_rsb;
3820 	struct dlm_message *ms;
3821 	struct dlm_mhandle *mh;
3822 	int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3823 
3824 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3825 	if (error)
3826 		goto out;
3827 
3828 	ms->m_lkid = ms_in->m_lkid;
3829 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3830 	ms->m_nodeid = cpu_to_le32(ret_nodeid);
3831 
3832 	error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3833  out:
3834 	return error;
3835 }
3836 
3837 /* which args we save from a received message depends heavily on the type
3838    of message, unlike the send side where we can safely send everything about
3839    the lkb for any type of message */
3840 
receive_flags(struct dlm_lkb * lkb,const struct dlm_message * ms)3841 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3842 {
3843 	lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3844 	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3845 	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3846 }
3847 
receive_flags_reply(struct dlm_lkb * lkb,const struct dlm_message * ms,bool local)3848 static void receive_flags_reply(struct dlm_lkb *lkb,
3849 				const struct dlm_message *ms,
3850 				bool local)
3851 {
3852 	if (local)
3853 		return;
3854 
3855 	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3856 	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3857 }
3858 
receive_extralen(const struct dlm_message * ms)3859 static int receive_extralen(const struct dlm_message *ms)
3860 {
3861 	return (le16_to_cpu(ms->m_header.h_length) -
3862 		sizeof(struct dlm_message));
3863 }
3864 
receive_lvb(struct dlm_ls * ls,struct dlm_lkb * lkb,const struct dlm_message * ms)3865 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3866 		       const struct dlm_message *ms)
3867 {
3868 	int len;
3869 
3870 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3871 		if (!lkb->lkb_lvbptr)
3872 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3873 		if (!lkb->lkb_lvbptr)
3874 			return -ENOMEM;
3875 		len = receive_extralen(ms);
3876 		if (len > ls->ls_lvblen)
3877 			len = ls->ls_lvblen;
3878 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3879 	}
3880 	return 0;
3881 }
3882 
fake_bastfn(void * astparam,int mode)3883 static void fake_bastfn(void *astparam, int mode)
3884 {
3885 	log_print("fake_bastfn should not be called");
3886 }
3887 
fake_astfn(void * astparam)3888 static void fake_astfn(void *astparam)
3889 {
3890 	log_print("fake_astfn should not be called");
3891 }
3892 
receive_request_args(struct dlm_ls * ls,struct dlm_lkb * lkb,const struct dlm_message * ms)3893 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3894 				const struct dlm_message *ms)
3895 {
3896 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3897 	lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3898 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3899 	lkb->lkb_grmode = DLM_LOCK_IV;
3900 	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3901 
3902 	lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3903 	lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3904 
3905 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3906 		/* lkb was just created so there won't be an lvb yet */
3907 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3908 		if (!lkb->lkb_lvbptr)
3909 			return -ENOMEM;
3910 	}
3911 
3912 	return 0;
3913 }
3914 
receive_convert_args(struct dlm_ls * ls,struct dlm_lkb * lkb,const struct dlm_message * ms)3915 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3916 				const struct dlm_message *ms)
3917 {
3918 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3919 		return -EBUSY;
3920 
3921 	if (receive_lvb(ls, lkb, ms))
3922 		return -ENOMEM;
3923 
3924 	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3925 	lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3926 
3927 	return 0;
3928 }
3929 
receive_unlock_args(struct dlm_ls * ls,struct dlm_lkb * lkb,const struct dlm_message * ms)3930 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3931 			       const struct dlm_message *ms)
3932 {
3933 	if (receive_lvb(ls, lkb, ms))
3934 		return -ENOMEM;
3935 	return 0;
3936 }
3937 
3938 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3939    uses to send a reply and that the remote end uses to process the reply. */
3940 
setup_local_lkb(struct dlm_ls * ls,const struct dlm_message * ms)3941 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3942 {
3943 	struct dlm_lkb *lkb = &ls->ls_local_lkb;
3944 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3945 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3946 }
3947 
3948 /* This is called after the rsb is locked so that we can safely inspect
3949    fields in the lkb. */
3950 
validate_message(struct dlm_lkb * lkb,const struct dlm_message * ms)3951 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3952 {
3953 	int from = le32_to_cpu(ms->m_header.h_nodeid);
3954 	int error = 0;
3955 
3956 	/* currently mixing of user/kernel locks are not supported */
3957 	if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3958 	    !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3959 		log_error(lkb->lkb_resource->res_ls,
3960 			  "got user dlm message for a kernel lock");
3961 		error = -EINVAL;
3962 		goto out;
3963 	}
3964 
3965 	switch (ms->m_type) {
3966 	case cpu_to_le32(DLM_MSG_CONVERT):
3967 	case cpu_to_le32(DLM_MSG_UNLOCK):
3968 	case cpu_to_le32(DLM_MSG_CANCEL):
3969 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3970 			error = -EINVAL;
3971 		break;
3972 
3973 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3974 	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3975 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3976 	case cpu_to_le32(DLM_MSG_GRANT):
3977 	case cpu_to_le32(DLM_MSG_BAST):
3978 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3979 			error = -EINVAL;
3980 		break;
3981 
3982 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3983 		if (!is_process_copy(lkb))
3984 			error = -EINVAL;
3985 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3986 			error = -EINVAL;
3987 		break;
3988 
3989 	default:
3990 		error = -EINVAL;
3991 	}
3992 
3993 out:
3994 	if (error)
3995 		log_error(lkb->lkb_resource->res_ls,
3996 			  "ignore invalid message %d from %d %x %x %x %d",
3997 			  le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3998 			  lkb->lkb_remid, dlm_iflags_val(lkb),
3999 			  lkb->lkb_nodeid);
4000 	return error;
4001 }
4002 
receive_request(struct dlm_ls * ls,const struct dlm_message * ms)4003 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
4004 {
4005 	struct dlm_lkb *lkb;
4006 	struct dlm_rsb *r;
4007 	int from_nodeid;
4008 	int error, namelen = 0;
4009 
4010 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4011 
4012 	error = create_lkb(ls, &lkb);
4013 	if (error)
4014 		goto fail;
4015 
4016 	receive_flags(lkb, ms);
4017 	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
4018 	error = receive_request_args(ls, lkb, ms);
4019 	if (error) {
4020 		__put_lkb(ls, lkb);
4021 		goto fail;
4022 	}
4023 
4024 	/* The dir node is the authority on whether we are the master
4025 	   for this rsb or not, so if the master sends us a request, we should
4026 	   recreate the rsb if we've destroyed it.   This race happens when we
4027 	   send a remove message to the dir node at the same time that the dir
4028 	   node sends us a request for the rsb. */
4029 
4030 	namelen = receive_extralen(ms);
4031 
4032 	error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4033 			 R_RECEIVE_REQUEST, &r);
4034 	if (error) {
4035 		__put_lkb(ls, lkb);
4036 		goto fail;
4037 	}
4038 
4039 	lock_rsb(r);
4040 
4041 	if (r->res_master_nodeid != dlm_our_nodeid()) {
4042 		error = validate_master_nodeid(ls, r, from_nodeid);
4043 		if (error) {
4044 			unlock_rsb(r);
4045 			put_rsb(r);
4046 			__put_lkb(ls, lkb);
4047 			goto fail;
4048 		}
4049 	}
4050 
4051 	attach_lkb(r, lkb);
4052 	error = do_request(r, lkb);
4053 	send_request_reply(r, lkb, error);
4054 	do_request_effects(r, lkb, error);
4055 
4056 	unlock_rsb(r);
4057 	put_rsb(r);
4058 
4059 	if (error == -EINPROGRESS)
4060 		error = 0;
4061 	if (error)
4062 		dlm_put_lkb(lkb);
4063 	return 0;
4064 
4065  fail:
4066 	/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4067 	   and do this receive_request again from process_lookup_list once
4068 	   we get the lookup reply.  This would avoid a many repeated
4069 	   ENOTBLK request failures when the lookup reply designating us
4070 	   as master is delayed. */
4071 
4072 	if (error != -ENOTBLK) {
4073 		log_limit(ls, "receive_request %x from %d %d",
4074 			  le32_to_cpu(ms->m_lkid), from_nodeid, error);
4075 	}
4076 
4077 	setup_local_lkb(ls, ms);
4078 	send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4079 	return error;
4080 }
4081 
receive_convert(struct dlm_ls * ls,const struct dlm_message * ms)4082 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
4083 {
4084 	struct dlm_lkb *lkb;
4085 	struct dlm_rsb *r;
4086 	int error, reply = 1;
4087 
4088 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4089 	if (error)
4090 		goto fail;
4091 
4092 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4093 		log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4094 			  "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4095 			  (unsigned long long)lkb->lkb_recover_seq,
4096 			  le32_to_cpu(ms->m_header.h_nodeid),
4097 			  le32_to_cpu(ms->m_lkid));
4098 		error = -ENOENT;
4099 		dlm_put_lkb(lkb);
4100 		goto fail;
4101 	}
4102 
4103 	r = lkb->lkb_resource;
4104 
4105 	hold_rsb(r);
4106 	lock_rsb(r);
4107 
4108 	error = validate_message(lkb, ms);
4109 	if (error)
4110 		goto out;
4111 
4112 	receive_flags(lkb, ms);
4113 
4114 	error = receive_convert_args(ls, lkb, ms);
4115 	if (error) {
4116 		send_convert_reply(r, lkb, error);
4117 		goto out;
4118 	}
4119 
4120 	reply = !down_conversion(lkb);
4121 
4122 	error = do_convert(r, lkb);
4123 	if (reply)
4124 		send_convert_reply(r, lkb, error);
4125 	do_convert_effects(r, lkb, error);
4126  out:
4127 	unlock_rsb(r);
4128 	put_rsb(r);
4129 	dlm_put_lkb(lkb);
4130 	return 0;
4131 
4132  fail:
4133 	setup_local_lkb(ls, ms);
4134 	send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4135 	return error;
4136 }
4137 
receive_unlock(struct dlm_ls * ls,const struct dlm_message * ms)4138 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
4139 {
4140 	struct dlm_lkb *lkb;
4141 	struct dlm_rsb *r;
4142 	int error;
4143 
4144 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4145 	if (error)
4146 		goto fail;
4147 
4148 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4149 		log_error(ls, "receive_unlock %x remid %x remote %d %x",
4150 			  lkb->lkb_id, lkb->lkb_remid,
4151 			  le32_to_cpu(ms->m_header.h_nodeid),
4152 			  le32_to_cpu(ms->m_lkid));
4153 		error = -ENOENT;
4154 		dlm_put_lkb(lkb);
4155 		goto fail;
4156 	}
4157 
4158 	r = lkb->lkb_resource;
4159 
4160 	hold_rsb(r);
4161 	lock_rsb(r);
4162 
4163 	error = validate_message(lkb, ms);
4164 	if (error)
4165 		goto out;
4166 
4167 	receive_flags(lkb, ms);
4168 
4169 	error = receive_unlock_args(ls, lkb, ms);
4170 	if (error) {
4171 		send_unlock_reply(r, lkb, error);
4172 		goto out;
4173 	}
4174 
4175 	error = do_unlock(r, lkb);
4176 	send_unlock_reply(r, lkb, error);
4177 	do_unlock_effects(r, lkb, error);
4178  out:
4179 	unlock_rsb(r);
4180 	put_rsb(r);
4181 	dlm_put_lkb(lkb);
4182 	return 0;
4183 
4184  fail:
4185 	setup_local_lkb(ls, ms);
4186 	send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4187 	return error;
4188 }
4189 
receive_cancel(struct dlm_ls * ls,const struct dlm_message * ms)4190 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4191 {
4192 	struct dlm_lkb *lkb;
4193 	struct dlm_rsb *r;
4194 	int error;
4195 
4196 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4197 	if (error)
4198 		goto fail;
4199 
4200 	receive_flags(lkb, ms);
4201 
4202 	r = lkb->lkb_resource;
4203 
4204 	hold_rsb(r);
4205 	lock_rsb(r);
4206 
4207 	error = validate_message(lkb, ms);
4208 	if (error)
4209 		goto out;
4210 
4211 	error = do_cancel(r, lkb);
4212 	send_cancel_reply(r, lkb, error);
4213 	do_cancel_effects(r, lkb, error);
4214  out:
4215 	unlock_rsb(r);
4216 	put_rsb(r);
4217 	dlm_put_lkb(lkb);
4218 	return 0;
4219 
4220  fail:
4221 	setup_local_lkb(ls, ms);
4222 	send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4223 	return error;
4224 }
4225 
receive_grant(struct dlm_ls * ls,const struct dlm_message * ms)4226 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4227 {
4228 	struct dlm_lkb *lkb;
4229 	struct dlm_rsb *r;
4230 	int error;
4231 
4232 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4233 	if (error)
4234 		return error;
4235 
4236 	r = lkb->lkb_resource;
4237 
4238 	hold_rsb(r);
4239 	lock_rsb(r);
4240 
4241 	error = validate_message(lkb, ms);
4242 	if (error)
4243 		goto out;
4244 
4245 	receive_flags_reply(lkb, ms, false);
4246 	if (is_altmode(lkb))
4247 		munge_altmode(lkb, ms);
4248 	grant_lock_pc(r, lkb, ms);
4249 	queue_cast(r, lkb, 0);
4250  out:
4251 	unlock_rsb(r);
4252 	put_rsb(r);
4253 	dlm_put_lkb(lkb);
4254 	return 0;
4255 }
4256 
receive_bast(struct dlm_ls * ls,const struct dlm_message * ms)4257 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4258 {
4259 	struct dlm_lkb *lkb;
4260 	struct dlm_rsb *r;
4261 	int error;
4262 
4263 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4264 	if (error)
4265 		return error;
4266 
4267 	r = lkb->lkb_resource;
4268 
4269 	hold_rsb(r);
4270 	lock_rsb(r);
4271 
4272 	error = validate_message(lkb, ms);
4273 	if (error)
4274 		goto out;
4275 
4276 	queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4277 	lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4278  out:
4279 	unlock_rsb(r);
4280 	put_rsb(r);
4281 	dlm_put_lkb(lkb);
4282 	return 0;
4283 }
4284 
receive_lookup(struct dlm_ls * ls,const struct dlm_message * ms)4285 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4286 {
4287 	int len, error, ret_nodeid, from_nodeid, our_nodeid;
4288 
4289 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4290 	our_nodeid = dlm_our_nodeid();
4291 
4292 	len = receive_extralen(ms);
4293 
4294 	error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4295 				  &ret_nodeid, NULL);
4296 
4297 	/* Optimization: we're master so treat lookup as a request */
4298 	if (!error && ret_nodeid == our_nodeid) {
4299 		receive_request(ls, ms);
4300 		return;
4301 	}
4302 	send_lookup_reply(ls, ms, ret_nodeid, error);
4303 }
4304 
receive_remove(struct dlm_ls * ls,const struct dlm_message * ms)4305 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4306 {
4307 	char name[DLM_RESNAME_MAXLEN+1];
4308 	struct dlm_rsb *r;
4309 	int rv, len, dir_nodeid, from_nodeid;
4310 
4311 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4312 
4313 	len = receive_extralen(ms);
4314 
4315 	if (len > DLM_RESNAME_MAXLEN) {
4316 		log_error(ls, "receive_remove from %d bad len %d",
4317 			  from_nodeid, len);
4318 		return;
4319 	}
4320 
4321 	dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4322 	if (dir_nodeid != dlm_our_nodeid()) {
4323 		log_error(ls, "receive_remove from %d bad nodeid %d",
4324 			  from_nodeid, dir_nodeid);
4325 		return;
4326 	}
4327 
4328 	/*
4329 	 * Look for inactive rsb, if it's there, free it.
4330 	 * If the rsb is active, it's being used, and we should ignore this
4331 	 * message.  This is an expected race between the dir node sending a
4332 	 * request to the master node at the same time as the master node sends
4333 	 * a remove to the dir node.  The resolution to that race is for the
4334 	 * dir node to ignore the remove message, and the master node to
4335 	 * recreate the master rsb when it gets a request from the dir node for
4336 	 * an rsb it doesn't have.
4337 	 */
4338 
4339 	memset(name, 0, sizeof(name));
4340 	memcpy(name, ms->m_extra, len);
4341 
4342 	rcu_read_lock();
4343 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
4344 	if (rv) {
4345 		rcu_read_unlock();
4346 		/* should not happen */
4347 		log_error(ls, "%s from %d not found %s", __func__,
4348 			  from_nodeid, name);
4349 		return;
4350 	}
4351 
4352 	write_lock_bh(&ls->ls_rsbtbl_lock);
4353 	if (!rsb_flag(r, RSB_HASHED)) {
4354 		rcu_read_unlock();
4355 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4356 		/* should not happen */
4357 		log_error(ls, "%s from %d got removed during removal %s",
4358 			  __func__, from_nodeid, name);
4359 		return;
4360 	}
4361 	/* at this stage the rsb can only being freed here */
4362 	rcu_read_unlock();
4363 
4364 	if (!rsb_flag(r, RSB_INACTIVE)) {
4365 		if (r->res_master_nodeid != from_nodeid) {
4366 			/* should not happen */
4367 			log_error(ls, "receive_remove on active rsb from %d master %d",
4368 				  from_nodeid, r->res_master_nodeid);
4369 			dlm_print_rsb(r);
4370 			write_unlock_bh(&ls->ls_rsbtbl_lock);
4371 			return;
4372 		}
4373 
4374 		/* Ignore the remove message, see race comment above. */
4375 
4376 		log_debug(ls, "receive_remove from %d master %d first %x %s",
4377 			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4378 			  name);
4379 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4380 		return;
4381 	}
4382 
4383 	if (r->res_master_nodeid != from_nodeid) {
4384 		log_error(ls, "receive_remove inactive from %d master %d",
4385 			  from_nodeid, r->res_master_nodeid);
4386 		dlm_print_rsb(r);
4387 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4388 		return;
4389 	}
4390 
4391 	list_del(&r->res_slow_list);
4392 	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
4393 			       dlm_rhash_rsb_params);
4394 	rsb_clear_flag(r, RSB_HASHED);
4395 	write_unlock_bh(&ls->ls_rsbtbl_lock);
4396 
4397 	free_inactive_rsb(r);
4398 }
4399 
receive_purge(struct dlm_ls * ls,const struct dlm_message * ms)4400 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4401 {
4402 	do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4403 }
4404 
receive_request_reply(struct dlm_ls * ls,const struct dlm_message * ms)4405 static int receive_request_reply(struct dlm_ls *ls,
4406 				 const struct dlm_message *ms)
4407 {
4408 	struct dlm_lkb *lkb;
4409 	struct dlm_rsb *r;
4410 	int error, mstype, result;
4411 	int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4412 
4413 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4414 	if (error)
4415 		return error;
4416 
4417 	r = lkb->lkb_resource;
4418 	hold_rsb(r);
4419 	lock_rsb(r);
4420 
4421 	error = validate_message(lkb, ms);
4422 	if (error)
4423 		goto out;
4424 
4425 	mstype = lkb->lkb_wait_type;
4426 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4427 	if (error) {
4428 		log_error(ls, "receive_request_reply %x remote %d %x result %d",
4429 			  lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4430 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4431 		dlm_dump_rsb(r);
4432 		goto out;
4433 	}
4434 
4435 	/* Optimization: the dir node was also the master, so it took our
4436 	   lookup as a request and sent request reply instead of lookup reply */
4437 	if (mstype == DLM_MSG_LOOKUP) {
4438 		r->res_master_nodeid = from_nodeid;
4439 		r->res_nodeid = from_nodeid;
4440 		lkb->lkb_nodeid = from_nodeid;
4441 	}
4442 
4443 	/* this is the value returned from do_request() on the master */
4444 	result = from_dlm_errno(le32_to_cpu(ms->m_result));
4445 
4446 	switch (result) {
4447 	case -EAGAIN:
4448 		/* request would block (be queued) on remote master */
4449 		queue_cast(r, lkb, -EAGAIN);
4450 		confirm_master(r, -EAGAIN);
4451 		unhold_lkb(lkb); /* undoes create_lkb() */
4452 		break;
4453 
4454 	case -EINPROGRESS:
4455 	case 0:
4456 		/* request was queued or granted on remote master */
4457 		receive_flags_reply(lkb, ms, false);
4458 		lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4459 		if (is_altmode(lkb))
4460 			munge_altmode(lkb, ms);
4461 		if (result) {
4462 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
4463 		} else {
4464 			grant_lock_pc(r, lkb, ms);
4465 			queue_cast(r, lkb, 0);
4466 		}
4467 		confirm_master(r, result);
4468 		break;
4469 
4470 	case -EBADR:
4471 	case -ENOTBLK:
4472 		/* find_rsb failed to find rsb or rsb wasn't master */
4473 		log_limit(ls, "receive_request_reply %x from %d %d "
4474 			  "master %d dir %d first %x %s", lkb->lkb_id,
4475 			  from_nodeid, result, r->res_master_nodeid,
4476 			  r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4477 
4478 		if (r->res_dir_nodeid != dlm_our_nodeid() &&
4479 		    r->res_master_nodeid != dlm_our_nodeid()) {
4480 			/* cause _request_lock->set_master->send_lookup */
4481 			r->res_master_nodeid = 0;
4482 			r->res_nodeid = -1;
4483 			lkb->lkb_nodeid = -1;
4484 		}
4485 
4486 		if (is_overlap(lkb)) {
4487 			/* we'll ignore error in cancel/unlock reply */
4488 			queue_cast_overlap(r, lkb);
4489 			confirm_master(r, result);
4490 			unhold_lkb(lkb); /* undoes create_lkb() */
4491 		} else {
4492 			_request_lock(r, lkb);
4493 
4494 			if (r->res_master_nodeid == dlm_our_nodeid())
4495 				confirm_master(r, 0);
4496 		}
4497 		break;
4498 
4499 	default:
4500 		log_error(ls, "receive_request_reply %x error %d",
4501 			  lkb->lkb_id, result);
4502 	}
4503 
4504 	if ((result == 0 || result == -EINPROGRESS) &&
4505 	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4506 		log_debug(ls, "receive_request_reply %x result %d unlock",
4507 			  lkb->lkb_id, result);
4508 		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4509 		send_unlock(r, lkb);
4510 	} else if ((result == -EINPROGRESS) &&
4511 		   test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4512 				      &lkb->lkb_iflags)) {
4513 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4514 		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4515 		send_cancel(r, lkb);
4516 	} else {
4517 		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4518 		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4519 	}
4520  out:
4521 	unlock_rsb(r);
4522 	put_rsb(r);
4523 	dlm_put_lkb(lkb);
4524 	return 0;
4525 }
4526 
__receive_convert_reply(struct dlm_rsb * r,struct dlm_lkb * lkb,const struct dlm_message * ms,bool local)4527 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4528 				    const struct dlm_message *ms, bool local)
4529 {
4530 	/* this is the value returned from do_convert() on the master */
4531 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4532 	case -EAGAIN:
4533 		/* convert would block (be queued) on remote master */
4534 		queue_cast(r, lkb, -EAGAIN);
4535 		break;
4536 
4537 	case -EDEADLK:
4538 		receive_flags_reply(lkb, ms, local);
4539 		revert_lock_pc(r, lkb);
4540 		queue_cast(r, lkb, -EDEADLK);
4541 		break;
4542 
4543 	case -EINPROGRESS:
4544 		/* convert was queued on remote master */
4545 		receive_flags_reply(lkb, ms, local);
4546 		if (is_demoted(lkb))
4547 			munge_demoted(lkb);
4548 		del_lkb(r, lkb);
4549 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4550 		break;
4551 
4552 	case 0:
4553 		/* convert was granted on remote master */
4554 		receive_flags_reply(lkb, ms, local);
4555 		if (is_demoted(lkb))
4556 			munge_demoted(lkb);
4557 		grant_lock_pc(r, lkb, ms);
4558 		queue_cast(r, lkb, 0);
4559 		break;
4560 
4561 	default:
4562 		log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4563 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4564 			  le32_to_cpu(ms->m_lkid),
4565 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4566 		dlm_print_rsb(r);
4567 		dlm_print_lkb(lkb);
4568 	}
4569 }
4570 
_receive_convert_reply(struct dlm_lkb * lkb,const struct dlm_message * ms,bool local)4571 static void _receive_convert_reply(struct dlm_lkb *lkb,
4572 				   const struct dlm_message *ms, bool local)
4573 {
4574 	struct dlm_rsb *r = lkb->lkb_resource;
4575 	int error;
4576 
4577 	hold_rsb(r);
4578 	lock_rsb(r);
4579 
4580 	error = validate_message(lkb, ms);
4581 	if (error)
4582 		goto out;
4583 
4584 	error = remove_from_waiters_ms(lkb, ms, local);
4585 	if (error)
4586 		goto out;
4587 
4588 	__receive_convert_reply(r, lkb, ms, local);
4589  out:
4590 	unlock_rsb(r);
4591 	put_rsb(r);
4592 }
4593 
receive_convert_reply(struct dlm_ls * ls,const struct dlm_message * ms)4594 static int receive_convert_reply(struct dlm_ls *ls,
4595 				 const struct dlm_message *ms)
4596 {
4597 	struct dlm_lkb *lkb;
4598 	int error;
4599 
4600 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4601 	if (error)
4602 		return error;
4603 
4604 	_receive_convert_reply(lkb, ms, false);
4605 	dlm_put_lkb(lkb);
4606 	return 0;
4607 }
4608 
_receive_unlock_reply(struct dlm_lkb * lkb,const struct dlm_message * ms,bool local)4609 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4610 				  const struct dlm_message *ms, bool local)
4611 {
4612 	struct dlm_rsb *r = lkb->lkb_resource;
4613 	int error;
4614 
4615 	hold_rsb(r);
4616 	lock_rsb(r);
4617 
4618 	error = validate_message(lkb, ms);
4619 	if (error)
4620 		goto out;
4621 
4622 	error = remove_from_waiters_ms(lkb, ms, local);
4623 	if (error)
4624 		goto out;
4625 
4626 	/* this is the value returned from do_unlock() on the master */
4627 
4628 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4629 	case -DLM_EUNLOCK:
4630 		receive_flags_reply(lkb, ms, local);
4631 		remove_lock_pc(r, lkb);
4632 		queue_cast(r, lkb, -DLM_EUNLOCK);
4633 		break;
4634 	case -ENOENT:
4635 		break;
4636 	default:
4637 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
4638 			  lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4639 	}
4640  out:
4641 	unlock_rsb(r);
4642 	put_rsb(r);
4643 }
4644 
receive_unlock_reply(struct dlm_ls * ls,const struct dlm_message * ms)4645 static int receive_unlock_reply(struct dlm_ls *ls,
4646 				const struct dlm_message *ms)
4647 {
4648 	struct dlm_lkb *lkb;
4649 	int error;
4650 
4651 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4652 	if (error)
4653 		return error;
4654 
4655 	_receive_unlock_reply(lkb, ms, false);
4656 	dlm_put_lkb(lkb);
4657 	return 0;
4658 }
4659 
_receive_cancel_reply(struct dlm_lkb * lkb,const struct dlm_message * ms,bool local)4660 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4661 				  const struct dlm_message *ms, bool local)
4662 {
4663 	struct dlm_rsb *r = lkb->lkb_resource;
4664 	int error;
4665 
4666 	hold_rsb(r);
4667 	lock_rsb(r);
4668 
4669 	error = validate_message(lkb, ms);
4670 	if (error)
4671 		goto out;
4672 
4673 	error = remove_from_waiters_ms(lkb, ms, local);
4674 	if (error)
4675 		goto out;
4676 
4677 	/* this is the value returned from do_cancel() on the master */
4678 
4679 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4680 	case -DLM_ECANCEL:
4681 		receive_flags_reply(lkb, ms, local);
4682 		revert_lock_pc(r, lkb);
4683 		queue_cast(r, lkb, -DLM_ECANCEL);
4684 		break;
4685 	case 0:
4686 		break;
4687 	default:
4688 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
4689 			  lkb->lkb_id,
4690 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4691 	}
4692  out:
4693 	unlock_rsb(r);
4694 	put_rsb(r);
4695 }
4696 
receive_cancel_reply(struct dlm_ls * ls,const struct dlm_message * ms)4697 static int receive_cancel_reply(struct dlm_ls *ls,
4698 				const struct dlm_message *ms)
4699 {
4700 	struct dlm_lkb *lkb;
4701 	int error;
4702 
4703 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4704 	if (error)
4705 		return error;
4706 
4707 	_receive_cancel_reply(lkb, ms, false);
4708 	dlm_put_lkb(lkb);
4709 	return 0;
4710 }
4711 
receive_lookup_reply(struct dlm_ls * ls,const struct dlm_message * ms)4712 static void receive_lookup_reply(struct dlm_ls *ls,
4713 				 const struct dlm_message *ms)
4714 {
4715 	struct dlm_lkb *lkb;
4716 	struct dlm_rsb *r;
4717 	int error, ret_nodeid;
4718 	int do_lookup_list = 0;
4719 
4720 	error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4721 	if (error) {
4722 		log_error(ls, "%s no lkid %x", __func__,
4723 			  le32_to_cpu(ms->m_lkid));
4724 		return;
4725 	}
4726 
4727 	/* ms->m_result is the value returned by dlm_master_lookup on dir node
4728 	   FIXME: will a non-zero error ever be returned? */
4729 
4730 	r = lkb->lkb_resource;
4731 	hold_rsb(r);
4732 	lock_rsb(r);
4733 
4734 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4735 	if (error)
4736 		goto out;
4737 
4738 	ret_nodeid = le32_to_cpu(ms->m_nodeid);
4739 
4740 	/* We sometimes receive a request from the dir node for this
4741 	   rsb before we've received the dir node's loookup_reply for it.
4742 	   The request from the dir node implies we're the master, so we set
4743 	   ourself as master in receive_request_reply, and verify here that
4744 	   we are indeed the master. */
4745 
4746 	if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4747 		/* This should never happen */
4748 		log_error(ls, "receive_lookup_reply %x from %d ret %d "
4749 			  "master %d dir %d our %d first %x %s",
4750 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4751 			  ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4752 			  dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4753 	}
4754 
4755 	if (ret_nodeid == dlm_our_nodeid()) {
4756 		r->res_master_nodeid = ret_nodeid;
4757 		r->res_nodeid = 0;
4758 		do_lookup_list = 1;
4759 		r->res_first_lkid = 0;
4760 	} else if (ret_nodeid == -1) {
4761 		/* the remote node doesn't believe it's the dir node */
4762 		log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4763 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4764 		r->res_master_nodeid = 0;
4765 		r->res_nodeid = -1;
4766 		lkb->lkb_nodeid = -1;
4767 	} else {
4768 		/* set_master() will set lkb_nodeid from r */
4769 		r->res_master_nodeid = ret_nodeid;
4770 		r->res_nodeid = ret_nodeid;
4771 	}
4772 
4773 	if (is_overlap(lkb)) {
4774 		log_debug(ls, "receive_lookup_reply %x unlock %x",
4775 			  lkb->lkb_id, dlm_iflags_val(lkb));
4776 		queue_cast_overlap(r, lkb);
4777 		unhold_lkb(lkb); /* undoes create_lkb() */
4778 		goto out_list;
4779 	}
4780 
4781 	_request_lock(r, lkb);
4782 
4783  out_list:
4784 	if (do_lookup_list)
4785 		process_lookup_list(r);
4786  out:
4787 	unlock_rsb(r);
4788 	put_rsb(r);
4789 	dlm_put_lkb(lkb);
4790 }
4791 
_receive_message(struct dlm_ls * ls,const struct dlm_message * ms,uint32_t saved_seq)4792 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4793 			     uint32_t saved_seq)
4794 {
4795 	int error = 0, noent = 0;
4796 
4797 	if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4798 		log_limit(ls, "receive %d from non-member %d %x %x %d",
4799 			  le32_to_cpu(ms->m_type),
4800 			  le32_to_cpu(ms->m_header.h_nodeid),
4801 			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4802 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4803 		return;
4804 	}
4805 
4806 	switch (ms->m_type) {
4807 
4808 	/* messages sent to a master node */
4809 
4810 	case cpu_to_le32(DLM_MSG_REQUEST):
4811 		error = receive_request(ls, ms);
4812 		break;
4813 
4814 	case cpu_to_le32(DLM_MSG_CONVERT):
4815 		error = receive_convert(ls, ms);
4816 		break;
4817 
4818 	case cpu_to_le32(DLM_MSG_UNLOCK):
4819 		error = receive_unlock(ls, ms);
4820 		break;
4821 
4822 	case cpu_to_le32(DLM_MSG_CANCEL):
4823 		noent = 1;
4824 		error = receive_cancel(ls, ms);
4825 		break;
4826 
4827 	/* messages sent from a master node (replies to above) */
4828 
4829 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4830 		error = receive_request_reply(ls, ms);
4831 		break;
4832 
4833 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4834 		error = receive_convert_reply(ls, ms);
4835 		break;
4836 
4837 	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4838 		error = receive_unlock_reply(ls, ms);
4839 		break;
4840 
4841 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4842 		error = receive_cancel_reply(ls, ms);
4843 		break;
4844 
4845 	/* messages sent from a master node (only two types of async msg) */
4846 
4847 	case cpu_to_le32(DLM_MSG_GRANT):
4848 		noent = 1;
4849 		error = receive_grant(ls, ms);
4850 		break;
4851 
4852 	case cpu_to_le32(DLM_MSG_BAST):
4853 		noent = 1;
4854 		error = receive_bast(ls, ms);
4855 		break;
4856 
4857 	/* messages sent to a dir node */
4858 
4859 	case cpu_to_le32(DLM_MSG_LOOKUP):
4860 		receive_lookup(ls, ms);
4861 		break;
4862 
4863 	case cpu_to_le32(DLM_MSG_REMOVE):
4864 		receive_remove(ls, ms);
4865 		break;
4866 
4867 	/* messages sent from a dir node (remove has no reply) */
4868 
4869 	case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4870 		receive_lookup_reply(ls, ms);
4871 		break;
4872 
4873 	/* other messages */
4874 
4875 	case cpu_to_le32(DLM_MSG_PURGE):
4876 		receive_purge(ls, ms);
4877 		break;
4878 
4879 	default:
4880 		log_error(ls, "unknown message type %d",
4881 			  le32_to_cpu(ms->m_type));
4882 	}
4883 
4884 	/*
4885 	 * When checking for ENOENT, we're checking the result of
4886 	 * find_lkb(m_remid):
4887 	 *
4888 	 * The lock id referenced in the message wasn't found.  This may
4889 	 * happen in normal usage for the async messages and cancel, so
4890 	 * only use log_debug for them.
4891 	 *
4892 	 * Some errors are expected and normal.
4893 	 */
4894 
4895 	if (error == -ENOENT && noent) {
4896 		log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4897 			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4898 			  le32_to_cpu(ms->m_header.h_nodeid),
4899 			  le32_to_cpu(ms->m_lkid), saved_seq);
4900 	} else if (error == -ENOENT) {
4901 		log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4902 			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4903 			  le32_to_cpu(ms->m_header.h_nodeid),
4904 			  le32_to_cpu(ms->m_lkid), saved_seq);
4905 
4906 		if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4907 			dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4908 	}
4909 
4910 	if (error == -EINVAL) {
4911 		log_error(ls, "receive %d inval from %d lkid %x remid %x "
4912 			  "saved_seq %u",
4913 			  le32_to_cpu(ms->m_type),
4914 			  le32_to_cpu(ms->m_header.h_nodeid),
4915 			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4916 			  saved_seq);
4917 	}
4918 }
4919 
4920 /* If the lockspace is in recovery mode (locking stopped), then normal
4921    messages are saved on the requestqueue for processing after recovery is
4922    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4923    messages off the requestqueue before we process new ones. This occurs right
4924    after recovery completes when we transition from saving all messages on
4925    requestqueue, to processing all the saved messages, to processing new
4926    messages as they arrive. */
4927 
dlm_receive_message(struct dlm_ls * ls,const struct dlm_message * ms,int nodeid)4928 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4929 				int nodeid)
4930 {
4931 try_again:
4932 	read_lock_bh(&ls->ls_requestqueue_lock);
4933 	if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4934 		/* If we were a member of this lockspace, left, and rejoined,
4935 		   other nodes may still be sending us messages from the
4936 		   lockspace generation before we left. */
4937 		if (WARN_ON_ONCE(!ls->ls_generation)) {
4938 			read_unlock_bh(&ls->ls_requestqueue_lock);
4939 			log_limit(ls, "receive %d from %d ignore old gen",
4940 				  le32_to_cpu(ms->m_type), nodeid);
4941 			return;
4942 		}
4943 
4944 		read_unlock_bh(&ls->ls_requestqueue_lock);
4945 		write_lock_bh(&ls->ls_requestqueue_lock);
4946 		/* recheck because we hold writelock now */
4947 		if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4948 			write_unlock_bh(&ls->ls_requestqueue_lock);
4949 			goto try_again;
4950 		}
4951 
4952 		dlm_add_requestqueue(ls, nodeid, ms);
4953 		write_unlock_bh(&ls->ls_requestqueue_lock);
4954 	} else {
4955 		_receive_message(ls, ms, 0);
4956 		read_unlock_bh(&ls->ls_requestqueue_lock);
4957 	}
4958 }
4959 
4960 /* This is called by dlm_recoverd to process messages that were saved on
4961    the requestqueue. */
4962 
dlm_receive_message_saved(struct dlm_ls * ls,const struct dlm_message * ms,uint32_t saved_seq)4963 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4964 			       uint32_t saved_seq)
4965 {
4966 	_receive_message(ls, ms, saved_seq);
4967 }
4968 
4969 /* This is called by the midcomms layer when something is received for
4970    the lockspace.  It could be either a MSG (normal message sent as part of
4971    standard locking activity) or an RCOM (recovery message sent as part of
4972    lockspace recovery). */
4973 
dlm_receive_buffer(const union dlm_packet * p,int nodeid)4974 void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4975 {
4976 	const struct dlm_header *hd = &p->header;
4977 	struct dlm_ls *ls;
4978 	int type = 0;
4979 
4980 	switch (hd->h_cmd) {
4981 	case DLM_MSG:
4982 		type = le32_to_cpu(p->message.m_type);
4983 		break;
4984 	case DLM_RCOM:
4985 		type = le32_to_cpu(p->rcom.rc_type);
4986 		break;
4987 	default:
4988 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4989 		return;
4990 	}
4991 
4992 	if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4993 		log_print("invalid h_nodeid %d from %d lockspace %x",
4994 			  le32_to_cpu(hd->h_nodeid), nodeid,
4995 			  le32_to_cpu(hd->u.h_lockspace));
4996 		return;
4997 	}
4998 
4999 	ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
5000 	if (!ls) {
5001 		if (dlm_config.ci_log_debug) {
5002 			printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
5003 				"%u from %d cmd %d type %d\n",
5004 				le32_to_cpu(hd->u.h_lockspace), nodeid,
5005 				hd->h_cmd, type);
5006 		}
5007 
5008 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
5009 			dlm_send_ls_not_ready(nodeid, &p->rcom);
5010 		return;
5011 	}
5012 
5013 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
5014 	   be inactive (in this ls) before transitioning to recovery mode */
5015 
5016 	read_lock_bh(&ls->ls_recv_active);
5017 	if (hd->h_cmd == DLM_MSG)
5018 		dlm_receive_message(ls, &p->message, nodeid);
5019 	else if (hd->h_cmd == DLM_RCOM)
5020 		dlm_receive_rcom(ls, &p->rcom, nodeid);
5021 	else
5022 		log_error(ls, "invalid h_cmd %d from %d lockspace %x",
5023 			  hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
5024 	read_unlock_bh(&ls->ls_recv_active);
5025 
5026 	dlm_put_lockspace(ls);
5027 }
5028 
recover_convert_waiter(struct dlm_ls * ls,struct dlm_lkb * lkb,struct dlm_message * ms_local)5029 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5030 				   struct dlm_message *ms_local)
5031 {
5032 	if (middle_conversion(lkb)) {
5033 		hold_lkb(lkb);
5034 		memset(ms_local, 0, sizeof(struct dlm_message));
5035 		ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5036 		ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5037 		ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5038 		_receive_convert_reply(lkb, ms_local, true);
5039 
5040 		/* Same special case as in receive_rcom_lock_args() */
5041 		lkb->lkb_grmode = DLM_LOCK_IV;
5042 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5043 		unhold_lkb(lkb);
5044 
5045 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5046 		set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5047 	}
5048 
5049 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5050 	   conversions are async; there's no reply from the remote master */
5051 }
5052 
5053 /* A waiting lkb needs recovery if the master node has failed, or
5054    the master node is changing (only when no directory is used) */
5055 
waiter_needs_recovery(struct dlm_ls * ls,struct dlm_lkb * lkb,int dir_nodeid)5056 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5057 				 int dir_nodeid)
5058 {
5059 	if (dlm_no_directory(ls))
5060 		return 1;
5061 
5062 	if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5063 		return 1;
5064 
5065 	return 0;
5066 }
5067 
5068 /* Recovery for locks that are waiting for replies from nodes that are now
5069    gone.  We can just complete unlocks and cancels by faking a reply from the
5070    dead node.  Requests and up-conversions we flag to be resent after
5071    recovery.  Down-conversions can just be completed with a fake reply like
5072    unlocks.  Conversions between PR and CW need special attention. */
5073 
dlm_recover_waiters_pre(struct dlm_ls * ls)5074 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5075 {
5076 	struct dlm_lkb *lkb, *safe;
5077 	struct dlm_message *ms_local;
5078 	int wait_type, local_unlock_result, local_cancel_result;
5079 	int dir_nodeid;
5080 
5081 	ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
5082 	if (!ms_local)
5083 		return;
5084 
5085 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5086 
5087 		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5088 
5089 		/* exclude debug messages about unlocks because there can be so
5090 		   many and they aren't very interesting */
5091 
5092 		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5093 			log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5094 				  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5095 				  lkb->lkb_id,
5096 				  lkb->lkb_remid,
5097 				  lkb->lkb_wait_type,
5098 				  lkb->lkb_resource->res_nodeid,
5099 				  lkb->lkb_nodeid,
5100 				  lkb->lkb_wait_nodeid,
5101 				  dir_nodeid);
5102 		}
5103 
5104 		/* all outstanding lookups, regardless of destination  will be
5105 		   resent after recovery is done */
5106 
5107 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5108 			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5109 			continue;
5110 		}
5111 
5112 		if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5113 			continue;
5114 
5115 		wait_type = lkb->lkb_wait_type;
5116 		local_unlock_result = -DLM_EUNLOCK;
5117 		local_cancel_result = -DLM_ECANCEL;
5118 
5119 		/* Main reply may have been received leaving a zero wait_type,
5120 		   but a reply for the overlapping op may not have been
5121 		   received.  In that case we need to fake the appropriate
5122 		   reply for the overlap op. */
5123 
5124 		if (!wait_type) {
5125 			if (is_overlap_cancel(lkb)) {
5126 				wait_type = DLM_MSG_CANCEL;
5127 				if (lkb->lkb_grmode == DLM_LOCK_IV)
5128 					local_cancel_result = 0;
5129 			}
5130 			if (is_overlap_unlock(lkb)) {
5131 				wait_type = DLM_MSG_UNLOCK;
5132 				if (lkb->lkb_grmode == DLM_LOCK_IV)
5133 					local_unlock_result = -ENOENT;
5134 			}
5135 
5136 			log_debug(ls, "rwpre overlap %x %x %d %d %d",
5137 				  lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
5138 				  local_cancel_result, local_unlock_result);
5139 		}
5140 
5141 		switch (wait_type) {
5142 
5143 		case DLM_MSG_REQUEST:
5144 			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5145 			break;
5146 
5147 		case DLM_MSG_CONVERT:
5148 			recover_convert_waiter(ls, lkb, ms_local);
5149 			break;
5150 
5151 		case DLM_MSG_UNLOCK:
5152 			hold_lkb(lkb);
5153 			memset(ms_local, 0, sizeof(struct dlm_message));
5154 			ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5155 			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
5156 			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5157 			_receive_unlock_reply(lkb, ms_local, true);
5158 			dlm_put_lkb(lkb);
5159 			break;
5160 
5161 		case DLM_MSG_CANCEL:
5162 			hold_lkb(lkb);
5163 			memset(ms_local, 0, sizeof(struct dlm_message));
5164 			ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5165 			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
5166 			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5167 			_receive_cancel_reply(lkb, ms_local, true);
5168 			dlm_put_lkb(lkb);
5169 			break;
5170 
5171 		default:
5172 			log_error(ls, "invalid lkb wait_type %d %d",
5173 				  lkb->lkb_wait_type, wait_type);
5174 		}
5175 		schedule();
5176 	}
5177 	kfree(ms_local);
5178 }
5179 
find_resend_waiter(struct dlm_ls * ls)5180 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5181 {
5182 	struct dlm_lkb *lkb = NULL, *iter;
5183 
5184 	spin_lock_bh(&ls->ls_waiters_lock);
5185 	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5186 		if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5187 			hold_lkb(iter);
5188 			lkb = iter;
5189 			break;
5190 		}
5191 	}
5192 	spin_unlock_bh(&ls->ls_waiters_lock);
5193 
5194 	return lkb;
5195 }
5196 
5197 /*
5198  * Forced state reset for locks that were in the middle of remote operations
5199  * when recovery happened (i.e. lkbs that were on the waiters list, waiting
5200  * for a reply from a remote operation.)  The lkbs remaining on the waiters
5201  * list need to be reevaluated; some may need resending to a different node
5202  * than previously, and some may now need local handling rather than remote.
5203  *
5204  * First, the lkb state for the voided remote operation is forcibly reset,
5205  * equivalent to what remove_from_waiters() would normally do:
5206  * . lkb removed from ls_waiters list
5207  * . lkb wait_type cleared
5208  * . lkb waiters_count cleared
5209  * . lkb ref count decremented for each waiters_count (almost always 1,
5210  *   but possibly 2 in case of cancel/unlock overlapping, which means
5211  *   two remote replies were being expected for the lkb.)
5212  *
5213  * Second, the lkb is reprocessed like an original operation would be,
5214  * by passing it to _request_lock or _convert_lock, which will either
5215  * process the lkb operation locally, or send it to a remote node again
5216  * and put the lkb back onto the waiters list.
5217  *
5218  * When reprocessing the lkb, we may find that it's flagged for an overlapping
5219  * force-unlock or cancel, either from before recovery began, or after recovery
5220  * finished.  If this is the case, the unlock/cancel is done directly, and the
5221  * original operation is not initiated again (no _request_lock/_convert_lock.)
5222  */
5223 
dlm_recover_waiters_post(struct dlm_ls * ls)5224 int dlm_recover_waiters_post(struct dlm_ls *ls)
5225 {
5226 	struct dlm_lkb *lkb;
5227 	struct dlm_rsb *r;
5228 	int error = 0, mstype, err, oc, ou;
5229 
5230 	while (1) {
5231 		if (dlm_locking_stopped(ls)) {
5232 			log_debug(ls, "recover_waiters_post aborted");
5233 			error = -EINTR;
5234 			break;
5235 		}
5236 
5237 		/*
5238 		 * Find an lkb from the waiters list that's been affected by
5239 		 * recovery node changes, and needs to be reprocessed.  Does
5240 		 * hold_lkb(), adding a refcount.
5241 		 */
5242 		lkb = find_resend_waiter(ls);
5243 		if (!lkb)
5244 			break;
5245 
5246 		r = lkb->lkb_resource;
5247 		hold_rsb(r);
5248 		lock_rsb(r);
5249 
5250 		/*
5251 		 * If the lkb has been flagged for a force unlock or cancel,
5252 		 * then the reprocessing below will be replaced by just doing
5253 		 * the unlock/cancel directly.
5254 		 */
5255 		mstype = lkb->lkb_wait_type;
5256 		oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5257 					&lkb->lkb_iflags);
5258 		ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5259 					&lkb->lkb_iflags);
5260 		err = 0;
5261 
5262 		log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5263 			  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5264 			  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5265 			  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5266 			  dlm_dir_nodeid(r), oc, ou);
5267 
5268 		/*
5269 		 * No reply to the pre-recovery operation will now be received,
5270 		 * so a forced equivalent of remove_from_waiters() is needed to
5271 		 * reset the waiters state that was in place before recovery.
5272 		 */
5273 
5274 		clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5275 
5276 		/* Forcibly clear wait_type */
5277 		lkb->lkb_wait_type = 0;
5278 
5279 		/*
5280 		 * Forcibly reset wait_count and associated refcount.  The
5281 		 * wait_count will almost always be 1, but in case of an
5282 		 * overlapping unlock/cancel it could be 2: see where
5283 		 * add_to_waiters() finds the lkb is already on the waiters
5284 		 * list and does lkb_wait_count++; hold_lkb().
5285 		 */
5286 		while (lkb->lkb_wait_count) {
5287 			lkb->lkb_wait_count--;
5288 			unhold_lkb(lkb);
5289 		}
5290 
5291 		/* Forcibly remove from waiters list */
5292 		spin_lock_bh(&ls->ls_waiters_lock);
5293 		list_del_init(&lkb->lkb_wait_reply);
5294 		spin_unlock_bh(&ls->ls_waiters_lock);
5295 
5296 		/*
5297 		 * The lkb is now clear of all prior waiters state and can be
5298 		 * processed locally, or sent to remote node again, or directly
5299 		 * cancelled/unlocked.
5300 		 */
5301 
5302 		if (oc || ou) {
5303 			/* do an unlock or cancel instead of resending */
5304 			switch (mstype) {
5305 			case DLM_MSG_LOOKUP:
5306 			case DLM_MSG_REQUEST:
5307 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5308 							-DLM_ECANCEL);
5309 				unhold_lkb(lkb); /* undoes create_lkb() */
5310 				break;
5311 			case DLM_MSG_CONVERT:
5312 				if (oc) {
5313 					queue_cast(r, lkb, -DLM_ECANCEL);
5314 				} else {
5315 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5316 					_unlock_lock(r, lkb);
5317 				}
5318 				break;
5319 			default:
5320 				err = 1;
5321 			}
5322 		} else {
5323 			switch (mstype) {
5324 			case DLM_MSG_LOOKUP:
5325 			case DLM_MSG_REQUEST:
5326 				_request_lock(r, lkb);
5327 				if (r->res_nodeid != -1 && is_master(r))
5328 					confirm_master(r, 0);
5329 				break;
5330 			case DLM_MSG_CONVERT:
5331 				_convert_lock(r, lkb);
5332 				break;
5333 			default:
5334 				err = 1;
5335 			}
5336 		}
5337 
5338 		if (err) {
5339 			log_error(ls, "waiter %x msg %d r_nodeid %d "
5340 				  "dir_nodeid %d overlap %d %d",
5341 				  lkb->lkb_id, mstype, r->res_nodeid,
5342 				  dlm_dir_nodeid(r), oc, ou);
5343 		}
5344 		unlock_rsb(r);
5345 		put_rsb(r);
5346 		dlm_put_lkb(lkb);
5347 	}
5348 
5349 	return error;
5350 }
5351 
purge_mstcpy_list(struct dlm_ls * ls,struct dlm_rsb * r,struct list_head * list)5352 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5353 			      struct list_head *list)
5354 {
5355 	struct dlm_lkb *lkb, *safe;
5356 
5357 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5358 		if (!is_master_copy(lkb))
5359 			continue;
5360 
5361 		/* don't purge lkbs we've added in recover_master_copy for
5362 		   the current recovery seq */
5363 
5364 		if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5365 			continue;
5366 
5367 		del_lkb(r, lkb);
5368 
5369 		/* this put should free the lkb */
5370 		if (!dlm_put_lkb(lkb))
5371 			log_error(ls, "purged mstcpy lkb not released");
5372 	}
5373 }
5374 
dlm_purge_mstcpy_locks(struct dlm_rsb * r)5375 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5376 {
5377 	struct dlm_ls *ls = r->res_ls;
5378 
5379 	purge_mstcpy_list(ls, r, &r->res_grantqueue);
5380 	purge_mstcpy_list(ls, r, &r->res_convertqueue);
5381 	purge_mstcpy_list(ls, r, &r->res_waitqueue);
5382 }
5383 
purge_dead_list(struct dlm_ls * ls,struct dlm_rsb * r,struct list_head * list,int nodeid_gone,unsigned int * count)5384 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5385 			    struct list_head *list,
5386 			    int nodeid_gone, unsigned int *count)
5387 {
5388 	struct dlm_lkb *lkb, *safe;
5389 
5390 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5391 		if (!is_master_copy(lkb))
5392 			continue;
5393 
5394 		if ((lkb->lkb_nodeid == nodeid_gone) ||
5395 		    dlm_is_removed(ls, lkb->lkb_nodeid)) {
5396 
5397 			/* tell recover_lvb to invalidate the lvb
5398 			   because a node holding EX/PW failed */
5399 			if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5400 			    (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5401 				rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5402 			}
5403 
5404 			del_lkb(r, lkb);
5405 
5406 			/* this put should free the lkb */
5407 			if (!dlm_put_lkb(lkb))
5408 				log_error(ls, "purged dead lkb not released");
5409 
5410 			rsb_set_flag(r, RSB_RECOVER_GRANT);
5411 
5412 			(*count)++;
5413 		}
5414 	}
5415 }
5416 
5417 /* Get rid of locks held by nodes that are gone. */
5418 
dlm_recover_purge(struct dlm_ls * ls,const struct list_head * root_list)5419 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
5420 {
5421 	struct dlm_rsb *r;
5422 	struct dlm_member *memb;
5423 	int nodes_count = 0;
5424 	int nodeid_gone = 0;
5425 	unsigned int lkb_count = 0;
5426 
5427 	/* cache one removed nodeid to optimize the common
5428 	   case of a single node removed */
5429 
5430 	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5431 		nodes_count++;
5432 		nodeid_gone = memb->nodeid;
5433 	}
5434 
5435 	if (!nodes_count)
5436 		return;
5437 
5438 	list_for_each_entry(r, root_list, res_root_list) {
5439 		lock_rsb(r);
5440 		if (r->res_nodeid != -1 && is_master(r)) {
5441 			purge_dead_list(ls, r, &r->res_grantqueue,
5442 					nodeid_gone, &lkb_count);
5443 			purge_dead_list(ls, r, &r->res_convertqueue,
5444 					nodeid_gone, &lkb_count);
5445 			purge_dead_list(ls, r, &r->res_waitqueue,
5446 					nodeid_gone, &lkb_count);
5447 		}
5448 		unlock_rsb(r);
5449 
5450 		cond_resched();
5451 	}
5452 
5453 	if (lkb_count)
5454 		log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5455 			  lkb_count, nodes_count);
5456 }
5457 
find_grant_rsb(struct dlm_ls * ls)5458 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
5459 {
5460 	struct dlm_rsb *r;
5461 
5462 	read_lock_bh(&ls->ls_rsbtbl_lock);
5463 	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
5464 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
5465 			continue;
5466 		if (!is_master(r)) {
5467 			rsb_clear_flag(r, RSB_RECOVER_GRANT);
5468 			continue;
5469 		}
5470 		hold_rsb(r);
5471 		read_unlock_bh(&ls->ls_rsbtbl_lock);
5472 		return r;
5473 	}
5474 	read_unlock_bh(&ls->ls_rsbtbl_lock);
5475 	return NULL;
5476 }
5477 
5478 /*
5479  * Attempt to grant locks on resources that we are the master of.
5480  * Locks may have become grantable during recovery because locks
5481  * from departed nodes have been purged (or not rebuilt), allowing
5482  * previously blocked locks to now be granted.  The subset of rsb's
5483  * we are interested in are those with lkb's on either the convert or
5484  * waiting queues.
5485  *
5486  * Simplest would be to go through each master rsb and check for non-empty
5487  * convert or waiting queues, and attempt to grant on those rsbs.
5488  * Checking the queues requires lock_rsb, though, for which we'd need
5489  * to release the rsbtbl lock.  This would make iterating through all
5490  * rsb's very inefficient.  So, we rely on earlier recovery routines
5491  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5492  * locks for.
5493  */
5494 
dlm_recover_grant(struct dlm_ls * ls)5495 void dlm_recover_grant(struct dlm_ls *ls)
5496 {
5497 	struct dlm_rsb *r;
5498 	unsigned int count = 0;
5499 	unsigned int rsb_count = 0;
5500 	unsigned int lkb_count = 0;
5501 
5502 	while (1) {
5503 		r = find_grant_rsb(ls);
5504 		if (!r)
5505 			break;
5506 
5507 		rsb_count++;
5508 		count = 0;
5509 		lock_rsb(r);
5510 		/* the RECOVER_GRANT flag is checked in the grant path */
5511 		grant_pending_locks(r, &count);
5512 		rsb_clear_flag(r, RSB_RECOVER_GRANT);
5513 		lkb_count += count;
5514 		confirm_master(r, 0);
5515 		unlock_rsb(r);
5516 		put_rsb(r);
5517 		cond_resched();
5518 	}
5519 
5520 	if (lkb_count)
5521 		log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5522 			  lkb_count, rsb_count);
5523 }
5524 
search_remid_list(struct list_head * head,int nodeid,uint32_t remid)5525 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5526 					 uint32_t remid)
5527 {
5528 	struct dlm_lkb *lkb;
5529 
5530 	list_for_each_entry(lkb, head, lkb_statequeue) {
5531 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5532 			return lkb;
5533 	}
5534 	return NULL;
5535 }
5536 
search_remid(struct dlm_rsb * r,int nodeid,uint32_t remid)5537 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5538 				    uint32_t remid)
5539 {
5540 	struct dlm_lkb *lkb;
5541 
5542 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5543 	if (lkb)
5544 		return lkb;
5545 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5546 	if (lkb)
5547 		return lkb;
5548 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5549 	if (lkb)
5550 		return lkb;
5551 	return NULL;
5552 }
5553 
5554 /* needs at least dlm_rcom + rcom_lock */
receive_rcom_lock_args(struct dlm_ls * ls,struct dlm_lkb * lkb,struct dlm_rsb * r,const struct dlm_rcom * rc)5555 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5556 				  struct dlm_rsb *r, const struct dlm_rcom *rc)
5557 {
5558 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5559 
5560 	lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5561 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5562 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5563 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5564 	dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5565 	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5566 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5567 	lkb->lkb_rqmode = rl->rl_rqmode;
5568 	lkb->lkb_grmode = rl->rl_grmode;
5569 	/* don't set lkb_status because add_lkb wants to itself */
5570 
5571 	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5572 	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5573 
5574 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5575 		int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5576 			sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5577 		if (lvblen > ls->ls_lvblen)
5578 			return -EINVAL;
5579 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5580 		if (!lkb->lkb_lvbptr)
5581 			return -ENOMEM;
5582 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5583 	}
5584 
5585 	/* Conversions between PR and CW (middle modes) need special handling.
5586 	   The real granted mode of these converting locks cannot be determined
5587 	   until all locks have been rebuilt on the rsb (recover_conversion) */
5588 
5589 	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5590 	    middle_conversion(lkb)) {
5591 		rl->rl_status = DLM_LKSTS_CONVERT;
5592 		lkb->lkb_grmode = DLM_LOCK_IV;
5593 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
5594 	}
5595 
5596 	return 0;
5597 }
5598 
5599 /* This lkb may have been recovered in a previous aborted recovery so we need
5600    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5601    If so we just send back a standard reply.  If not, we create a new lkb with
5602    the given values and send back our lkid.  We send back our lkid by sending
5603    back the rcom_lock struct we got but with the remid field filled in. */
5604 
5605 /* needs at least dlm_rcom + rcom_lock */
dlm_recover_master_copy(struct dlm_ls * ls,const struct dlm_rcom * rc,__le32 * rl_remid,__le32 * rl_result)5606 int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5607 			    __le32 *rl_remid, __le32 *rl_result)
5608 {
5609 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5610 	struct dlm_rsb *r;
5611 	struct dlm_lkb *lkb;
5612 	uint32_t remid = 0;
5613 	int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5614 	int error;
5615 
5616 	/* init rl_remid with rcom lock rl_remid */
5617 	*rl_remid = rl->rl_remid;
5618 
5619 	if (rl->rl_parent_lkid) {
5620 		error = -EOPNOTSUPP;
5621 		goto out;
5622 	}
5623 
5624 	remid = le32_to_cpu(rl->rl_lkid);
5625 
5626 	/* In general we expect the rsb returned to be R_MASTER, but we don't
5627 	   have to require it.  Recovery of masters on one node can overlap
5628 	   recovery of locks on another node, so one node can send us MSTCPY
5629 	   locks before we've made ourselves master of this rsb.  We can still
5630 	   add new MSTCPY locks that we receive here without any harm; when
5631 	   we make ourselves master, dlm_recover_masters() won't touch the
5632 	   MSTCPY locks we've received early. */
5633 
5634 	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5635 			 from_nodeid, R_RECEIVE_RECOVER, &r);
5636 	if (error)
5637 		goto out;
5638 
5639 	lock_rsb(r);
5640 
5641 	if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5642 		log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5643 			  from_nodeid, remid);
5644 		error = -EBADR;
5645 		goto out_unlock;
5646 	}
5647 
5648 	lkb = search_remid(r, from_nodeid, remid);
5649 	if (lkb) {
5650 		error = -EEXIST;
5651 		goto out_remid;
5652 	}
5653 
5654 	error = create_lkb(ls, &lkb);
5655 	if (error)
5656 		goto out_unlock;
5657 
5658 	error = receive_rcom_lock_args(ls, lkb, r, rc);
5659 	if (error) {
5660 		__put_lkb(ls, lkb);
5661 		goto out_unlock;
5662 	}
5663 
5664 	attach_lkb(r, lkb);
5665 	add_lkb(r, lkb, rl->rl_status);
5666 	ls->ls_recover_locks_in++;
5667 
5668 	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5669 		rsb_set_flag(r, RSB_RECOVER_GRANT);
5670 
5671  out_remid:
5672 	/* this is the new value returned to the lock holder for
5673 	   saving in its process-copy lkb */
5674 	*rl_remid = cpu_to_le32(lkb->lkb_id);
5675 
5676 	lkb->lkb_recover_seq = ls->ls_recover_seq;
5677 
5678  out_unlock:
5679 	unlock_rsb(r);
5680 	put_rsb(r);
5681  out:
5682 	if (error && error != -EEXIST)
5683 		log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5684 			  from_nodeid, remid, error);
5685 	*rl_result = cpu_to_le32(error);
5686 	return error;
5687 }
5688 
5689 /* needs at least dlm_rcom + rcom_lock */
dlm_recover_process_copy(struct dlm_ls * ls,const struct dlm_rcom * rc,uint64_t seq)5690 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5691 			     uint64_t seq)
5692 {
5693 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5694 	struct dlm_rsb *r;
5695 	struct dlm_lkb *lkb;
5696 	uint32_t lkid, remid;
5697 	int error, result;
5698 
5699 	lkid = le32_to_cpu(rl->rl_lkid);
5700 	remid = le32_to_cpu(rl->rl_remid);
5701 	result = le32_to_cpu(rl->rl_result);
5702 
5703 	error = find_lkb(ls, lkid, &lkb);
5704 	if (error) {
5705 		log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5706 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5707 			  result);
5708 		return error;
5709 	}
5710 
5711 	r = lkb->lkb_resource;
5712 	hold_rsb(r);
5713 	lock_rsb(r);
5714 
5715 	if (!is_process_copy(lkb)) {
5716 		log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5717 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5718 			  result);
5719 		dlm_dump_rsb(r);
5720 		unlock_rsb(r);
5721 		put_rsb(r);
5722 		dlm_put_lkb(lkb);
5723 		return -EINVAL;
5724 	}
5725 
5726 	switch (result) {
5727 	case -EBADR:
5728 		/* There's a chance the new master received our lock before
5729 		   dlm_recover_master_reply(), this wouldn't happen if we did
5730 		   a barrier between recover_masters and recover_locks. */
5731 
5732 		log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5733 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5734 			  result);
5735 
5736 		dlm_send_rcom_lock(r, lkb, seq);
5737 		goto out;
5738 	case -EEXIST:
5739 	case 0:
5740 		lkb->lkb_remid = remid;
5741 		break;
5742 	default:
5743 		log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5744 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5745 			  result);
5746 	}
5747 
5748 	/* an ack for dlm_recover_locks() which waits for replies from
5749 	   all the locks it sends to new masters */
5750 	dlm_recovered_lock(r);
5751  out:
5752 	unlock_rsb(r);
5753 	put_rsb(r);
5754 	dlm_put_lkb(lkb);
5755 
5756 	return 0;
5757 }
5758 
dlm_user_request(struct dlm_ls * ls,struct dlm_user_args * ua,int mode,uint32_t flags,void * name,unsigned int namelen)5759 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5760 		     int mode, uint32_t flags, void *name, unsigned int namelen)
5761 {
5762 	struct dlm_lkb *lkb;
5763 	struct dlm_args args;
5764 	bool do_put = true;
5765 	int error;
5766 
5767 	dlm_lock_recovery(ls);
5768 
5769 	error = create_lkb(ls, &lkb);
5770 	if (error) {
5771 		kfree(ua);
5772 		goto out;
5773 	}
5774 
5775 	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5776 
5777 	if (flags & DLM_LKF_VALBLK) {
5778 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5779 		if (!ua->lksb.sb_lvbptr) {
5780 			kfree(ua);
5781 			error = -ENOMEM;
5782 			goto out_put;
5783 		}
5784 	}
5785 	error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5786 			      fake_bastfn, &args);
5787 	if (error) {
5788 		kfree(ua->lksb.sb_lvbptr);
5789 		ua->lksb.sb_lvbptr = NULL;
5790 		kfree(ua);
5791 		goto out_put;
5792 	}
5793 
5794 	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
5795 	   When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5796 	   lock and that lkb_astparam is the dlm_user_args structure. */
5797 	set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5798 	error = request_lock(ls, lkb, name, namelen, &args);
5799 
5800 	switch (error) {
5801 	case 0:
5802 		break;
5803 	case -EINPROGRESS:
5804 		error = 0;
5805 		break;
5806 	case -EAGAIN:
5807 		error = 0;
5808 		fallthrough;
5809 	default:
5810 		goto out_put;
5811 	}
5812 
5813 	/* add this new lkb to the per-process list of locks */
5814 	spin_lock_bh(&ua->proc->locks_spin);
5815 	hold_lkb(lkb);
5816 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5817 	spin_unlock_bh(&ua->proc->locks_spin);
5818 	do_put = false;
5819  out_put:
5820 	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5821 	if (do_put)
5822 		__put_lkb(ls, lkb);
5823  out:
5824 	dlm_unlock_recovery(ls);
5825 	return error;
5826 }
5827 
dlm_user_convert(struct dlm_ls * ls,struct dlm_user_args * ua_tmp,int mode,uint32_t flags,uint32_t lkid,char * lvb_in)5828 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5829 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5830 {
5831 	struct dlm_lkb *lkb;
5832 	struct dlm_args args;
5833 	struct dlm_user_args *ua;
5834 	int error;
5835 
5836 	dlm_lock_recovery(ls);
5837 
5838 	error = find_lkb(ls, lkid, &lkb);
5839 	if (error)
5840 		goto out;
5841 
5842 	trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5843 
5844 	/* user can change the params on its lock when it converts it, or
5845 	   add an lvb that didn't exist before */
5846 
5847 	ua = lkb->lkb_ua;
5848 
5849 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5850 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5851 		if (!ua->lksb.sb_lvbptr) {
5852 			error = -ENOMEM;
5853 			goto out_put;
5854 		}
5855 	}
5856 	if (lvb_in && ua->lksb.sb_lvbptr)
5857 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5858 
5859 	ua->xid = ua_tmp->xid;
5860 	ua->castparam = ua_tmp->castparam;
5861 	ua->castaddr = ua_tmp->castaddr;
5862 	ua->bastparam = ua_tmp->bastparam;
5863 	ua->bastaddr = ua_tmp->bastaddr;
5864 	ua->user_lksb = ua_tmp->user_lksb;
5865 
5866 	error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5867 			      fake_bastfn, &args);
5868 	if (error)
5869 		goto out_put;
5870 
5871 	error = convert_lock(ls, lkb, &args);
5872 
5873 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5874 		error = 0;
5875  out_put:
5876 	trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5877 	dlm_put_lkb(lkb);
5878  out:
5879 	dlm_unlock_recovery(ls);
5880 	kfree(ua_tmp);
5881 	return error;
5882 }
5883 
5884 /*
5885  * The caller asks for an orphan lock on a given resource with a given mode.
5886  * If a matching lock exists, it's moved to the owner's list of locks and
5887  * the lkid is returned.
5888  */
5889 
dlm_user_adopt_orphan(struct dlm_ls * ls,struct dlm_user_args * ua_tmp,int mode,uint32_t flags,void * name,unsigned int namelen,uint32_t * lkid)5890 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5891 		     int mode, uint32_t flags, void *name, unsigned int namelen,
5892 		     uint32_t *lkid)
5893 {
5894 	struct dlm_lkb *lkb = NULL, *iter;
5895 	struct dlm_user_args *ua;
5896 	int found_other_mode = 0;
5897 	int rv = 0;
5898 
5899 	spin_lock_bh(&ls->ls_orphans_lock);
5900 	list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5901 		if (iter->lkb_resource->res_length != namelen)
5902 			continue;
5903 		if (memcmp(iter->lkb_resource->res_name, name, namelen))
5904 			continue;
5905 		if (iter->lkb_grmode != mode) {
5906 			found_other_mode = 1;
5907 			continue;
5908 		}
5909 
5910 		lkb = iter;
5911 		list_del_init(&iter->lkb_ownqueue);
5912 		clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5913 		*lkid = iter->lkb_id;
5914 		break;
5915 	}
5916 	spin_unlock_bh(&ls->ls_orphans_lock);
5917 
5918 	if (!lkb && found_other_mode) {
5919 		rv = -EAGAIN;
5920 		goto out;
5921 	}
5922 
5923 	if (!lkb) {
5924 		rv = -ENOENT;
5925 		goto out;
5926 	}
5927 
5928 	lkb->lkb_exflags = flags;
5929 	lkb->lkb_ownpid = (int) current->pid;
5930 
5931 	ua = lkb->lkb_ua;
5932 
5933 	ua->proc = ua_tmp->proc;
5934 	ua->xid = ua_tmp->xid;
5935 	ua->castparam = ua_tmp->castparam;
5936 	ua->castaddr = ua_tmp->castaddr;
5937 	ua->bastparam = ua_tmp->bastparam;
5938 	ua->bastaddr = ua_tmp->bastaddr;
5939 	ua->user_lksb = ua_tmp->user_lksb;
5940 
5941 	/*
5942 	 * The lkb reference from the ls_orphans list was not
5943 	 * removed above, and is now considered the reference
5944 	 * for the proc locks list.
5945 	 */
5946 
5947 	spin_lock_bh(&ua->proc->locks_spin);
5948 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5949 	spin_unlock_bh(&ua->proc->locks_spin);
5950  out:
5951 	kfree(ua_tmp);
5952 	return rv;
5953 }
5954 
dlm_user_unlock(struct dlm_ls * ls,struct dlm_user_args * ua_tmp,uint32_t flags,uint32_t lkid,char * lvb_in)5955 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5956 		    uint32_t flags, uint32_t lkid, char *lvb_in)
5957 {
5958 	struct dlm_lkb *lkb;
5959 	struct dlm_args args;
5960 	struct dlm_user_args *ua;
5961 	int error;
5962 
5963 	dlm_lock_recovery(ls);
5964 
5965 	error = find_lkb(ls, lkid, &lkb);
5966 	if (error)
5967 		goto out;
5968 
5969 	trace_dlm_unlock_start(ls, lkb, flags);
5970 
5971 	ua = lkb->lkb_ua;
5972 
5973 	if (lvb_in && ua->lksb.sb_lvbptr)
5974 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5975 	if (ua_tmp->castparam)
5976 		ua->castparam = ua_tmp->castparam;
5977 	ua->user_lksb = ua_tmp->user_lksb;
5978 
5979 	error = set_unlock_args(flags, ua, &args);
5980 	if (error)
5981 		goto out_put;
5982 
5983 	error = unlock_lock(ls, lkb, &args);
5984 
5985 	if (error == -DLM_EUNLOCK)
5986 		error = 0;
5987 	/* from validate_unlock_args() */
5988 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5989 		error = 0;
5990 	if (error)
5991 		goto out_put;
5992 
5993 	spin_lock_bh(&ua->proc->locks_spin);
5994 	/* dlm_user_add_cb() may have already taken lkb off the proc list */
5995 	if (!list_empty(&lkb->lkb_ownqueue))
5996 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5997 	spin_unlock_bh(&ua->proc->locks_spin);
5998  out_put:
5999 	trace_dlm_unlock_end(ls, lkb, flags, error);
6000 	dlm_put_lkb(lkb);
6001  out:
6002 	dlm_unlock_recovery(ls);
6003 	kfree(ua_tmp);
6004 	return error;
6005 }
6006 
dlm_user_cancel(struct dlm_ls * ls,struct dlm_user_args * ua_tmp,uint32_t flags,uint32_t lkid)6007 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
6008 		    uint32_t flags, uint32_t lkid)
6009 {
6010 	struct dlm_lkb *lkb;
6011 	struct dlm_args args;
6012 	struct dlm_user_args *ua;
6013 	int error;
6014 
6015 	dlm_lock_recovery(ls);
6016 
6017 	error = find_lkb(ls, lkid, &lkb);
6018 	if (error)
6019 		goto out;
6020 
6021 	trace_dlm_unlock_start(ls, lkb, flags);
6022 
6023 	ua = lkb->lkb_ua;
6024 	if (ua_tmp->castparam)
6025 		ua->castparam = ua_tmp->castparam;
6026 	ua->user_lksb = ua_tmp->user_lksb;
6027 
6028 	error = set_unlock_args(flags, ua, &args);
6029 	if (error)
6030 		goto out_put;
6031 
6032 	error = cancel_lock(ls, lkb, &args);
6033 
6034 	if (error == -DLM_ECANCEL)
6035 		error = 0;
6036 	/* from validate_unlock_args() */
6037 	if (error == -EBUSY)
6038 		error = 0;
6039  out_put:
6040 	trace_dlm_unlock_end(ls, lkb, flags, error);
6041 	dlm_put_lkb(lkb);
6042  out:
6043 	dlm_unlock_recovery(ls);
6044 	kfree(ua_tmp);
6045 	return error;
6046 }
6047 
dlm_user_deadlock(struct dlm_ls * ls,uint32_t flags,uint32_t lkid)6048 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6049 {
6050 	struct dlm_lkb *lkb;
6051 	struct dlm_args args;
6052 	struct dlm_user_args *ua;
6053 	struct dlm_rsb *r;
6054 	int error;
6055 
6056 	dlm_lock_recovery(ls);
6057 
6058 	error = find_lkb(ls, lkid, &lkb);
6059 	if (error)
6060 		goto out;
6061 
6062 	trace_dlm_unlock_start(ls, lkb, flags);
6063 
6064 	ua = lkb->lkb_ua;
6065 
6066 	error = set_unlock_args(flags, ua, &args);
6067 	if (error)
6068 		goto out_put;
6069 
6070 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6071 
6072 	r = lkb->lkb_resource;
6073 	hold_rsb(r);
6074 	lock_rsb(r);
6075 
6076 	error = validate_unlock_args(lkb, &args);
6077 	if (error)
6078 		goto out_r;
6079 	set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
6080 
6081 	error = _cancel_lock(r, lkb);
6082  out_r:
6083 	unlock_rsb(r);
6084 	put_rsb(r);
6085 
6086 	if (error == -DLM_ECANCEL)
6087 		error = 0;
6088 	/* from validate_unlock_args() */
6089 	if (error == -EBUSY)
6090 		error = 0;
6091  out_put:
6092 	trace_dlm_unlock_end(ls, lkb, flags, error);
6093 	dlm_put_lkb(lkb);
6094  out:
6095 	dlm_unlock_recovery(ls);
6096 	return error;
6097 }
6098 
6099 /* lkb's that are removed from the waiters list by revert are just left on the
6100    orphans list with the granted orphan locks, to be freed by purge */
6101 
orphan_proc_lock(struct dlm_ls * ls,struct dlm_lkb * lkb)6102 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6103 {
6104 	struct dlm_args args;
6105 	int error;
6106 
6107 	hold_lkb(lkb); /* reference for the ls_orphans list */
6108 	spin_lock_bh(&ls->ls_orphans_lock);
6109 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6110 	spin_unlock_bh(&ls->ls_orphans_lock);
6111 
6112 	set_unlock_args(0, lkb->lkb_ua, &args);
6113 
6114 	error = cancel_lock(ls, lkb, &args);
6115 	if (error == -DLM_ECANCEL)
6116 		error = 0;
6117 	return error;
6118 }
6119 
6120 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6121    granted.  Regardless of what rsb queue the lock is on, it's removed and
6122    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6123    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6124 
unlock_proc_lock(struct dlm_ls * ls,struct dlm_lkb * lkb)6125 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6126 {
6127 	struct dlm_args args;
6128 	int error;
6129 
6130 	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6131 			lkb->lkb_ua, &args);
6132 
6133 	error = unlock_lock(ls, lkb, &args);
6134 	if (error == -DLM_EUNLOCK)
6135 		error = 0;
6136 	return error;
6137 }
6138 
6139 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6140    (which does lock_rsb) due to deadlock with receiving a message that does
6141    lock_rsb followed by dlm_user_add_cb() */
6142 
del_proc_lock(struct dlm_ls * ls,struct dlm_user_proc * proc)6143 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6144 				     struct dlm_user_proc *proc)
6145 {
6146 	struct dlm_lkb *lkb = NULL;
6147 
6148 	spin_lock_bh(&ls->ls_clear_proc_locks);
6149 	if (list_empty(&proc->locks))
6150 		goto out;
6151 
6152 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6153 	list_del_init(&lkb->lkb_ownqueue);
6154 
6155 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6156 		set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
6157 	else
6158 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6159  out:
6160 	spin_unlock_bh(&ls->ls_clear_proc_locks);
6161 	return lkb;
6162 }
6163 
6164 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6165    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6166    which we clear here. */
6167 
6168 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6169    list, and no more device_writes should add lkb's to proc->locks list; so we
6170    shouldn't need to take asts_spin or locks_spin here.  this assumes that
6171    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6172    them ourself. */
6173 
dlm_clear_proc_locks(struct dlm_ls * ls,struct dlm_user_proc * proc)6174 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6175 {
6176 	struct dlm_callback *cb, *cb_safe;
6177 	struct dlm_lkb *lkb, *safe;
6178 
6179 	dlm_lock_recovery(ls);
6180 
6181 	while (1) {
6182 		lkb = del_proc_lock(ls, proc);
6183 		if (!lkb)
6184 			break;
6185 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6186 			orphan_proc_lock(ls, lkb);
6187 		else
6188 			unlock_proc_lock(ls, lkb);
6189 
6190 		/* this removes the reference for the proc->locks list
6191 		   added by dlm_user_request, it may result in the lkb
6192 		   being freed */
6193 
6194 		dlm_put_lkb(lkb);
6195 	}
6196 
6197 	spin_lock_bh(&ls->ls_clear_proc_locks);
6198 
6199 	/* in-progress unlocks */
6200 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6201 		list_del_init(&lkb->lkb_ownqueue);
6202 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6203 		dlm_put_lkb(lkb);
6204 	}
6205 
6206 	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6207 		list_del(&cb->list);
6208 		dlm_free_cb(cb);
6209 	}
6210 
6211 	spin_unlock_bh(&ls->ls_clear_proc_locks);
6212 	dlm_unlock_recovery(ls);
6213 }
6214 
purge_proc_locks(struct dlm_ls * ls,struct dlm_user_proc * proc)6215 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6216 {
6217 	struct dlm_callback *cb, *cb_safe;
6218 	struct dlm_lkb *lkb, *safe;
6219 
6220 	while (1) {
6221 		lkb = NULL;
6222 		spin_lock_bh(&proc->locks_spin);
6223 		if (!list_empty(&proc->locks)) {
6224 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
6225 					 lkb_ownqueue);
6226 			list_del_init(&lkb->lkb_ownqueue);
6227 		}
6228 		spin_unlock_bh(&proc->locks_spin);
6229 
6230 		if (!lkb)
6231 			break;
6232 
6233 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6234 		unlock_proc_lock(ls, lkb);
6235 		dlm_put_lkb(lkb); /* ref from proc->locks list */
6236 	}
6237 
6238 	spin_lock_bh(&proc->locks_spin);
6239 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6240 		list_del_init(&lkb->lkb_ownqueue);
6241 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6242 		dlm_put_lkb(lkb);
6243 	}
6244 	spin_unlock_bh(&proc->locks_spin);
6245 
6246 	spin_lock_bh(&proc->asts_spin);
6247 	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6248 		list_del(&cb->list);
6249 		dlm_free_cb(cb);
6250 	}
6251 	spin_unlock_bh(&proc->asts_spin);
6252 }
6253 
6254 /* pid of 0 means purge all orphans */
6255 
do_purge(struct dlm_ls * ls,int nodeid,int pid)6256 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6257 {
6258 	struct dlm_lkb *lkb, *safe;
6259 
6260 	spin_lock_bh(&ls->ls_orphans_lock);
6261 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6262 		if (pid && lkb->lkb_ownpid != pid)
6263 			continue;
6264 		unlock_proc_lock(ls, lkb);
6265 		list_del_init(&lkb->lkb_ownqueue);
6266 		dlm_put_lkb(lkb);
6267 	}
6268 	spin_unlock_bh(&ls->ls_orphans_lock);
6269 }
6270 
send_purge(struct dlm_ls * ls,int nodeid,int pid)6271 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6272 {
6273 	struct dlm_message *ms;
6274 	struct dlm_mhandle *mh;
6275 	int error;
6276 
6277 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6278 				DLM_MSG_PURGE, &ms, &mh);
6279 	if (error)
6280 		return error;
6281 	ms->m_nodeid = cpu_to_le32(nodeid);
6282 	ms->m_pid = cpu_to_le32(pid);
6283 
6284 	return send_message(mh, ms, NULL, 0);
6285 }
6286 
dlm_user_purge(struct dlm_ls * ls,struct dlm_user_proc * proc,int nodeid,int pid)6287 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6288 		   int nodeid, int pid)
6289 {
6290 	int error = 0;
6291 
6292 	if (nodeid && (nodeid != dlm_our_nodeid())) {
6293 		error = send_purge(ls, nodeid, pid);
6294 	} else {
6295 		dlm_lock_recovery(ls);
6296 		if (pid == current->pid)
6297 			purge_proc_locks(ls, proc);
6298 		else
6299 			do_purge(ls, nodeid, pid);
6300 		dlm_unlock_recovery(ls);
6301 	}
6302 	return error;
6303 }
6304 
6305 /* debug functionality */
dlm_debug_add_lkb(struct dlm_ls * ls,uint32_t lkb_id,char * name,int len,int lkb_nodeid,unsigned int lkb_dflags,int lkb_status)6306 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6307 		      int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6308 {
6309 	struct dlm_lksb *lksb;
6310 	struct dlm_lkb *lkb;
6311 	struct dlm_rsb *r;
6312 	int error;
6313 
6314 	/* we currently can't set a valid user lock */
6315 	if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6316 		return -EOPNOTSUPP;
6317 
6318 	lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6319 	if (!lksb)
6320 		return -ENOMEM;
6321 
6322 	error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6323 	if (error) {
6324 		kfree(lksb);
6325 		return error;
6326 	}
6327 
6328 	dlm_set_dflags_val(lkb, lkb_dflags);
6329 	lkb->lkb_nodeid = lkb_nodeid;
6330 	lkb->lkb_lksb = lksb;
6331 	/* user specific pointer, just don't have it NULL for kernel locks */
6332 	if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6333 		lkb->lkb_astparam = (void *)0xDEADBEEF;
6334 
6335 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6336 	if (error) {
6337 		kfree(lksb);
6338 		__put_lkb(ls, lkb);
6339 		return error;
6340 	}
6341 
6342 	lock_rsb(r);
6343 	attach_lkb(r, lkb);
6344 	add_lkb(r, lkb, lkb_status);
6345 	unlock_rsb(r);
6346 	put_rsb(r);
6347 
6348 	return 0;
6349 }
6350 
dlm_debug_add_lkb_to_waiters(struct dlm_ls * ls,uint32_t lkb_id,int mstype,int to_nodeid)6351 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6352 				 int mstype, int to_nodeid)
6353 {
6354 	struct dlm_lkb *lkb;
6355 	int error;
6356 
6357 	error = find_lkb(ls, lkb_id, &lkb);
6358 	if (error)
6359 		return error;
6360 
6361 	error = add_to_waiters(lkb, mstype, to_nodeid);
6362 	dlm_put_lkb(lkb);
6363 	return error;
6364 }
6365 
6366