• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12 
13 /* Central locking logic has four stages:
14 
15    dlm_lock()
16    dlm_unlock()
17 
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22 
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27 
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32 
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35 
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40 
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43 
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46 
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49 
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53 
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75 
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87 				    struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91 
92 /*
93  * Lock compatibility matrix - thanks Steve
94  * UN = Unlocked state. Not really a state, used as a flag
95  * PD = Padding. Used to make the matrix a nice power of two in size
96  * Other states are the same as the VMS DLM.
97  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
98  */
99 
/* Rows are the granted mode + 1, columns the requested mode + 1.
   A 1 marks a compatible pair; the PD row and column are padding. */
static const int __dlm_compat_matrix[8][8] = {
	/*         UN NL CR CW PR PW EX PD */
	[0] = {    1, 1, 1, 1, 1, 1, 1, 0 },	/* UN */
	[1] = {    1, 1, 1, 1, 1, 1, 1, 0 },	/* NL */
	[2] = {    1, 1, 1, 1, 1, 1, 0, 0 },	/* CR */
	[3] = {    1, 1, 1, 1, 0, 0, 0, 0 },	/* CW */
	[4] = {    1, 1, 1, 0, 1, 0, 0, 0 },	/* PR */
	[5] = {    1, 1, 1, 0, 0, 0, 0, 0 },	/* PW */
	[6] = {    1, 1, 0, 0, 0, 0, 0, 0 },	/* EX */
	[7] = {    0, 0, 0, 0, 0, 0, 0, 0 },	/* PD */
};
111 
112 /*
113  * This defines the direction of transfer of LVB data.
114  * Granted mode is the row; requested mode is the column.
115  * Usage: matrix[grmode+1][rqmode+1]
116  * 1 = LVB is returned to the caller
117  * 0 = LVB is written to the resource
118  * -1 = nothing happens to the LVB
119  */
120 
121 const int dlm_lvb_operations[8][8] = {
122         /* UN   NL  CR  CW  PR  PW  EX  PD*/
123         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
124         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
125         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
126         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
127         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
128         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
129         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
131 };
132 
/* Compatibility of the mode granted to lkb 'gr' with the mode requested
   by lkb 'rq', looked up in __dlm_compat_matrix. */
#define modes_compat(gr, rq) \
	(__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1])
135 
dlm_modes_compat(int mode1,int mode2)136 int dlm_modes_compat(int mode1, int mode2)
137 {
138 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
139 }
140 
141 /*
142  * Compatibility matrix for conversions with QUECVT set.
143  * Granted mode is the row; requested mode is the column.
144  * Usage: matrix[grmode+1][rqmode+1]
145  */
146 
/* QUECVT conversion table, indexed [grmode + 1][rqmode + 1]. */
static const int __quecvt_compat_matrix[8][8] = {
	/*         UN NL CR CW PR PW EX PD */
	[0] = {    0, 0, 0, 0, 0, 0, 0, 0 },	/* UN */
	[1] = {    0, 0, 1, 1, 1, 1, 1, 0 },	/* NL */
	[2] = {    0, 0, 0, 1, 1, 1, 1, 0 },	/* CR */
	[3] = {    0, 0, 0, 0, 1, 1, 1, 0 },	/* CW */
	[4] = {    0, 0, 0, 1, 0, 1, 1, 0 },	/* PR */
	[5] = {    0, 0, 0, 0, 0, 0, 1, 0 },	/* PW */
	[6] = {    0, 0, 0, 0, 0, 0, 0, 0 },	/* EX */
	[7] = {    0, 0, 0, 0, 0, 0, 0, 0 },	/* PD */
};
158 
dlm_print_lkb(struct dlm_lkb * lkb)159 void dlm_print_lkb(struct dlm_lkb *lkb)
160 {
161 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
162 	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
163 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
164 	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
165 	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
166 }
167 
dlm_print_rsb(struct dlm_rsb * r)168 static void dlm_print_rsb(struct dlm_rsb *r)
169 {
170 	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
171 	       r->res_nodeid, r->res_flags, r->res_first_lkid,
172 	       r->res_recover_locks_count, r->res_name);
173 }
174 
dlm_dump_rsb(struct dlm_rsb * r)175 void dlm_dump_rsb(struct dlm_rsb *r)
176 {
177 	struct dlm_lkb *lkb;
178 
179 	dlm_print_rsb(r);
180 
181 	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
182 	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
183 	printk(KERN_ERR "rsb lookup list\n");
184 	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
185 		dlm_print_lkb(lkb);
186 	printk(KERN_ERR "rsb grant queue:\n");
187 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
188 		dlm_print_lkb(lkb);
189 	printk(KERN_ERR "rsb convert queue:\n");
190 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
191 		dlm_print_lkb(lkb);
192 	printk(KERN_ERR "rsb wait queue:\n");
193 	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
194 		dlm_print_lkb(lkb);
195 }
196 
197 /* Threads cannot use the lockspace while it's being recovered */
198 
dlm_lock_recovery(struct dlm_ls * ls)199 static inline void dlm_lock_recovery(struct dlm_ls *ls)
200 {
201 	down_read(&ls->ls_in_recovery);
202 }
203 
dlm_unlock_recovery(struct dlm_ls * ls)204 void dlm_unlock_recovery(struct dlm_ls *ls)
205 {
206 	up_read(&ls->ls_in_recovery);
207 }
208 
dlm_lock_recovery_try(struct dlm_ls * ls)209 int dlm_lock_recovery_try(struct dlm_ls *ls)
210 {
211 	return down_read_trylock(&ls->ls_in_recovery);
212 }
213 
can_be_queued(struct dlm_lkb * lkb)214 static inline int can_be_queued(struct dlm_lkb *lkb)
215 {
216 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 }
218 
force_blocking_asts(struct dlm_lkb * lkb)219 static inline int force_blocking_asts(struct dlm_lkb *lkb)
220 {
221 	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
222 }
223 
is_demoted(struct dlm_lkb * lkb)224 static inline int is_demoted(struct dlm_lkb *lkb)
225 {
226 	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
227 }
228 
is_altmode(struct dlm_lkb * lkb)229 static inline int is_altmode(struct dlm_lkb *lkb)
230 {
231 	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
232 }
233 
is_granted(struct dlm_lkb * lkb)234 static inline int is_granted(struct dlm_lkb *lkb)
235 {
236 	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
237 }
238 
is_remote(struct dlm_rsb * r)239 static inline int is_remote(struct dlm_rsb *r)
240 {
241 	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
242 	return !!r->res_nodeid;
243 }
244 
is_process_copy(struct dlm_lkb * lkb)245 static inline int is_process_copy(struct dlm_lkb *lkb)
246 {
247 	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 }
249 
is_master_copy(struct dlm_lkb * lkb)250 static inline int is_master_copy(struct dlm_lkb *lkb)
251 {
252 	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
253 		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
254 	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
255 }
256 
middle_conversion(struct dlm_lkb * lkb)257 static inline int middle_conversion(struct dlm_lkb *lkb)
258 {
259 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
260 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
261 		return 1;
262 	return 0;
263 }
264 
down_conversion(struct dlm_lkb * lkb)265 static inline int down_conversion(struct dlm_lkb *lkb)
266 {
267 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 }
269 
is_overlap_unlock(struct dlm_lkb * lkb)270 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
271 {
272 	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
273 }
274 
is_overlap_cancel(struct dlm_lkb * lkb)275 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
276 {
277 	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
278 }
279 
is_overlap(struct dlm_lkb * lkb)280 static inline int is_overlap(struct dlm_lkb *lkb)
281 {
282 	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
283 				  DLM_IFL_OVERLAP_CANCEL));
284 }
285 
queue_cast(struct dlm_rsb * r,struct dlm_lkb * lkb,int rv)286 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
287 {
288 	if (is_master_copy(lkb))
289 		return;
290 
291 	del_timeout(lkb);
292 
293 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
294 
295 	/* if the operation was a cancel, then return -DLM_ECANCEL, if a
296 	   timeout caused the cancel then return -ETIMEDOUT */
297 	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
298 		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
299 		rv = -ETIMEDOUT;
300 	}
301 
302 	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
303 		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
304 		rv = -EDEADLK;
305 	}
306 
307 	lkb->lkb_lksb->sb_status = rv;
308 	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
309 
310 	dlm_add_ast(lkb, AST_COMP, 0);
311 }
312 
queue_cast_overlap(struct dlm_rsb * r,struct dlm_lkb * lkb)313 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
314 {
315 	queue_cast(r, lkb,
316 		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
317 }
318 
queue_bast(struct dlm_rsb * r,struct dlm_lkb * lkb,int rqmode)319 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
320 {
321 	lkb->lkb_time_bast = ktime_get();
322 
323 	if (is_master_copy(lkb))
324 		send_bast(r, lkb, rqmode);
325 	else
326 		dlm_add_ast(lkb, AST_BAST, rqmode);
327 }
328 
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332 
create_rsb(struct dlm_ls * ls,char * name,int len)333 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
334 {
335 	struct dlm_rsb *r;
336 
337 	r = dlm_allocate_rsb(ls, len);
338 	if (!r)
339 		return NULL;
340 
341 	r->res_ls = ls;
342 	r->res_length = len;
343 	memcpy(r->res_name, name, len);
344 	mutex_init(&r->res_mutex);
345 
346 	INIT_LIST_HEAD(&r->res_lookup);
347 	INIT_LIST_HEAD(&r->res_grantqueue);
348 	INIT_LIST_HEAD(&r->res_convertqueue);
349 	INIT_LIST_HEAD(&r->res_waitqueue);
350 	INIT_LIST_HEAD(&r->res_root_list);
351 	INIT_LIST_HEAD(&r->res_recover_list);
352 
353 	return r;
354 }
355 
search_rsb_list(struct list_head * head,char * name,int len,unsigned int flags,struct dlm_rsb ** r_ret)356 static int search_rsb_list(struct list_head *head, char *name, int len,
357 			   unsigned int flags, struct dlm_rsb **r_ret)
358 {
359 	struct dlm_rsb *r;
360 	int error = 0;
361 
362 	list_for_each_entry(r, head, res_hashchain) {
363 		if (len == r->res_length && !memcmp(name, r->res_name, len))
364 			goto found;
365 	}
366 	*r_ret = NULL;
367 	return -EBADR;
368 
369  found:
370 	if (r->res_nodeid && (flags & R_MASTER))
371 		error = -ENOTBLK;
372 	*r_ret = r;
373 	return error;
374 }
375 
_search_rsb(struct dlm_ls * ls,char * name,int len,int b,unsigned int flags,struct dlm_rsb ** r_ret)376 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
377 		       unsigned int flags, struct dlm_rsb **r_ret)
378 {
379 	struct dlm_rsb *r;
380 	int error;
381 
382 	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
383 	if (!error) {
384 		kref_get(&r->res_ref);
385 		goto out;
386 	}
387 	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
388 	if (error)
389 		goto out;
390 
391 	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
392 
393 	if (dlm_no_directory(ls))
394 		goto out;
395 
396 	if (r->res_nodeid == -1) {
397 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
398 		r->res_first_lkid = 0;
399 	} else if (r->res_nodeid > 0) {
400 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
401 		r->res_first_lkid = 0;
402 	} else {
403 		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
404 		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
405 	}
406  out:
407 	*r_ret = r;
408 	return error;
409 }
410 
search_rsb(struct dlm_ls * ls,char * name,int len,int b,unsigned int flags,struct dlm_rsb ** r_ret)411 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
412 		      unsigned int flags, struct dlm_rsb **r_ret)
413 {
414 	int error;
415 	spin_lock(&ls->ls_rsbtbl[b].lock);
416 	error = _search_rsb(ls, name, len, b, flags, r_ret);
417 	spin_unlock(&ls->ls_rsbtbl[b].lock);
418 	return error;
419 }
420 
421 /*
422  * Find rsb in rsbtbl and potentially create/add one
423  *
424  * Delaying the release of rsb's has a similar benefit to applications keeping
425  * NL locks on an rsb, but without the guarantee that the cached master value
426  * will still be valid when the rsb is reused.  Apps aren't always smart enough
427  * to keep NL locks on an rsb that they may lock again shortly; this can lead
428  * to excessive master lookups and removals if we don't delay the release.
429  *
430  * Searching for an rsb means looking through both the normal list and toss
431  * list.  When found on the toss list the rsb is moved to the normal list with
432  * ref count of 1; when found on normal list the ref count is incremented.
433  */
434 
find_rsb(struct dlm_ls * ls,char * name,int namelen,unsigned int flags,struct dlm_rsb ** r_ret)435 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
436 		    unsigned int flags, struct dlm_rsb **r_ret)
437 {
438 	struct dlm_rsb *r, *tmp;
439 	uint32_t hash, bucket;
440 	int error = -EINVAL;
441 
442 	if (namelen > DLM_RESNAME_MAXLEN)
443 		goto out;
444 
445 	if (dlm_no_directory(ls))
446 		flags |= R_CREATE;
447 
448 	error = 0;
449 	hash = jhash(name, namelen, 0);
450 	bucket = hash & (ls->ls_rsbtbl_size - 1);
451 
452 	error = search_rsb(ls, name, namelen, bucket, flags, &r);
453 	if (!error)
454 		goto out;
455 
456 	if (error == -EBADR && !(flags & R_CREATE))
457 		goto out;
458 
459 	/* the rsb was found but wasn't a master copy */
460 	if (error == -ENOTBLK)
461 		goto out;
462 
463 	error = -ENOMEM;
464 	r = create_rsb(ls, name, namelen);
465 	if (!r)
466 		goto out;
467 
468 	r->res_hash = hash;
469 	r->res_bucket = bucket;
470 	r->res_nodeid = -1;
471 	kref_init(&r->res_ref);
472 
473 	/* With no directory, the master can be set immediately */
474 	if (dlm_no_directory(ls)) {
475 		int nodeid = dlm_dir_nodeid(r);
476 		if (nodeid == dlm_our_nodeid())
477 			nodeid = 0;
478 		r->res_nodeid = nodeid;
479 	}
480 
481 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
482 	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
483 	if (!error) {
484 		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
485 		dlm_free_rsb(r);
486 		r = tmp;
487 		goto out;
488 	}
489 	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
490 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
491 	error = 0;
492  out:
493 	*r_ret = r;
494 	return error;
495 }
496 
497 /* This is only called to add a reference when the code already holds
498    a valid reference to the rsb, so there's no need for locking. */
499 
hold_rsb(struct dlm_rsb * r)500 static inline void hold_rsb(struct dlm_rsb *r)
501 {
502 	kref_get(&r->res_ref);
503 }
504 
dlm_hold_rsb(struct dlm_rsb * r)505 void dlm_hold_rsb(struct dlm_rsb *r)
506 {
507 	hold_rsb(r);
508 }
509 
toss_rsb(struct kref * kref)510 static void toss_rsb(struct kref *kref)
511 {
512 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
513 	struct dlm_ls *ls = r->res_ls;
514 
515 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
516 	kref_init(&r->res_ref);
517 	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
518 	r->res_toss_time = jiffies;
519 	if (r->res_lvbptr) {
520 		dlm_free_lvb(r->res_lvbptr);
521 		r->res_lvbptr = NULL;
522 	}
523 }
524 
525 /* When all references to the rsb are gone it's transferred to
526    the tossed list for later disposal. */
527 
put_rsb(struct dlm_rsb * r)528 static void put_rsb(struct dlm_rsb *r)
529 {
530 	struct dlm_ls *ls = r->res_ls;
531 	uint32_t bucket = r->res_bucket;
532 
533 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
534 	kref_put(&r->res_ref, toss_rsb);
535 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
536 }
537 
/* Non-static entry point that forwards to put_rsb(). */
void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
	return;
}
542 
543 /* See comment for unhold_lkb */
544 
unhold_rsb(struct dlm_rsb * r)545 static void unhold_rsb(struct dlm_rsb *r)
546 {
547 	int rv;
548 	rv = kref_put(&r->res_ref, toss_rsb);
549 	DLM_ASSERT(!rv, dlm_dump_rsb(r););
550 }
551 
kill_rsb(struct kref * kref)552 static void kill_rsb(struct kref *kref)
553 {
554 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
555 
556 	/* All work is done after the return from kref_put() so we
557 	   can release the write_lock before the remove and free. */
558 
559 	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
560 	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
561 	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
562 	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
563 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
564 	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
565 }
566 
567 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
568    The rsb must exist as long as any lkb's for it do. */
569 
attach_lkb(struct dlm_rsb * r,struct dlm_lkb * lkb)570 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
571 {
572 	hold_rsb(r);
573 	lkb->lkb_resource = r;
574 }
575 
detach_lkb(struct dlm_lkb * lkb)576 static void detach_lkb(struct dlm_lkb *lkb)
577 {
578 	if (lkb->lkb_resource) {
579 		put_rsb(lkb->lkb_resource);
580 		lkb->lkb_resource = NULL;
581 	}
582 }
583 
create_lkb(struct dlm_ls * ls,struct dlm_lkb ** lkb_ret)584 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
585 {
586 	struct dlm_lkb *lkb, *tmp;
587 	uint32_t lkid = 0;
588 	uint16_t bucket;
589 
590 	lkb = dlm_allocate_lkb(ls);
591 	if (!lkb)
592 		return -ENOMEM;
593 
594 	lkb->lkb_nodeid = -1;
595 	lkb->lkb_grmode = DLM_LOCK_IV;
596 	kref_init(&lkb->lkb_ref);
597 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
598 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
599 	INIT_LIST_HEAD(&lkb->lkb_time_list);
600 
601 	get_random_bytes(&bucket, sizeof(bucket));
602 	bucket &= (ls->ls_lkbtbl_size - 1);
603 
604 	write_lock(&ls->ls_lkbtbl[bucket].lock);
605 
606 	/* counter can roll over so we must verify lkid is not in use */
607 
608 	while (lkid == 0) {
609 		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
610 
611 		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
612 				    lkb_idtbl_list) {
613 			if (tmp->lkb_id != lkid)
614 				continue;
615 			lkid = 0;
616 			break;
617 		}
618 	}
619 
620 	lkb->lkb_id = lkid;
621 	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
622 	write_unlock(&ls->ls_lkbtbl[bucket].lock);
623 
624 	*lkb_ret = lkb;
625 	return 0;
626 }
627 
/* Walk the id-table bucket selected by the upper 16 bits of lkid and
   return the lkb with that id, or NULL.  Takes no lock and does not
   range-check the bucket; find_lkb() does both before calling. */
static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	uint16_t bucket = (lkid >> 16);
	struct dlm_lkb *cur;

	list_for_each_entry(cur, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (cur->lkb_id != lkid)
			continue;
		return cur;
	}

	return NULL;
}
639 
/* Find an lkb by id and take a reference on it.
 *
 * Returns 0 with *lkb_ret set, -ENOENT with *lkb_ret = NULL when the
 * id is unknown, or -EBADSLT (leaving *lkb_ret untouched) when the
 * id's bucket number is beyond the table.
 */
static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	uint16_t bucket = (lkid >> 16);
	struct dlm_lkb *lkb;

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb != NULL)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	if (lkb == NULL)
		return -ENOENT;
	return 0;
}
657 
kill_lkb(struct kref * kref)658 static void kill_lkb(struct kref *kref)
659 {
660 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
661 
662 	/* All work is done after the return from kref_put() so we
663 	   can release the write_lock before the detach_lkb */
664 
665 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
666 }
667 
668 /* __put_lkb() is used when an lkb may not have an rsb attached to
669    it so we need to provide the lockspace explicitly */
670 
__put_lkb(struct dlm_ls * ls,struct dlm_lkb * lkb)671 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
672 {
673 	uint16_t bucket = (lkb->lkb_id >> 16);
674 
675 	write_lock(&ls->ls_lkbtbl[bucket].lock);
676 	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
677 		list_del(&lkb->lkb_idtbl_list);
678 		write_unlock(&ls->ls_lkbtbl[bucket].lock);
679 
680 		detach_lkb(lkb);
681 
682 		/* for local/process lkbs, lvbptr points to caller's lksb */
683 		if (lkb->lkb_lvbptr && is_master_copy(lkb))
684 			dlm_free_lvb(lkb->lkb_lvbptr);
685 		dlm_free_lkb(lkb);
686 		return 1;
687 	} else {
688 		write_unlock(&ls->ls_lkbtbl[bucket].lock);
689 		return 0;
690 	}
691 }
692 
dlm_put_lkb(struct dlm_lkb * lkb)693 int dlm_put_lkb(struct dlm_lkb *lkb)
694 {
695 	struct dlm_ls *ls;
696 
697 	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
698 	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
699 
700 	ls = lkb->lkb_resource->res_ls;
701 	return __put_lkb(ls, lkb);
702 }
703 
704 /* This is only called to add a reference when the code already holds
705    a valid reference to the lkb, so there's no need for locking. */
706 
hold_lkb(struct dlm_lkb * lkb)707 static inline void hold_lkb(struct dlm_lkb *lkb)
708 {
709 	kref_get(&lkb->lkb_ref);
710 }
711 
712 /* This is called when we need to remove a reference and are certain
713    it's not the last ref.  e.g. del_lkb is always called between a
714    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
715    put_lkb would work fine, but would involve unnecessary locking */
716 
unhold_lkb(struct dlm_lkb * lkb)717 static inline void unhold_lkb(struct dlm_lkb *lkb)
718 {
719 	int rv;
720 	rv = kref_put(&lkb->lkb_ref, kill_lkb);
721 	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
722 }
723 
lkb_add_ordered(struct list_head * new,struct list_head * head,int mode)724 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
725 			    int mode)
726 {
727 	struct dlm_lkb *lkb = NULL;
728 
729 	list_for_each_entry(lkb, head, lkb_statequeue)
730 		if (lkb->lkb_rqmode < mode)
731 			break;
732 
733 	if (!lkb)
734 		list_add_tail(new, head);
735 	else
736 		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
737 }
738 
739 /* add/remove lkb to rsb's grant/convert/wait queue */
740 
add_lkb(struct dlm_rsb * r,struct dlm_lkb * lkb,int status)741 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
742 {
743 	kref_get(&lkb->lkb_ref);
744 
745 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
746 
747 	lkb->lkb_timestamp = ktime_get();
748 
749 	lkb->lkb_status = status;
750 
751 	switch (status) {
752 	case DLM_LKSTS_WAITING:
753 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
754 			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
755 		else
756 			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
757 		break;
758 	case DLM_LKSTS_GRANTED:
759 		/* convention says granted locks kept in order of grmode */
760 		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
761 				lkb->lkb_grmode);
762 		break;
763 	case DLM_LKSTS_CONVERT:
764 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
765 			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
766 		else
767 			list_add_tail(&lkb->lkb_statequeue,
768 				      &r->res_convertqueue);
769 		break;
770 	default:
771 		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
772 	}
773 }
774 
del_lkb(struct dlm_rsb * r,struct dlm_lkb * lkb)775 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
776 {
777 	lkb->lkb_status = 0;
778 	list_del(&lkb->lkb_statequeue);
779 	unhold_lkb(lkb);
780 }
781 
/* Move an lkb to the queue for status 'sts'.  An extra reference is
   held across the delete/add pair and dropped afterwards. */
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);

	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);

	unhold_lkb(lkb);
}
789 
msg_reply_type(int mstype)790 static int msg_reply_type(int mstype)
791 {
792 	switch (mstype) {
793 	case DLM_MSG_REQUEST:
794 		return DLM_MSG_REQUEST_REPLY;
795 	case DLM_MSG_CONVERT:
796 		return DLM_MSG_CONVERT_REPLY;
797 	case DLM_MSG_UNLOCK:
798 		return DLM_MSG_UNLOCK_REPLY;
799 	case DLM_MSG_CANCEL:
800 		return DLM_MSG_CANCEL_REPLY;
801 	case DLM_MSG_LOOKUP:
802 		return DLM_MSG_LOOKUP_REPLY;
803 	}
804 	return -1;
805 }
806 
807 /* add/remove lkb from global waiters list of lkb's waiting for
808    a reply from a remote node */
809 
add_to_waiters(struct dlm_lkb * lkb,int mstype)810 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
811 {
812 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
813 	int error = 0;
814 
815 	mutex_lock(&ls->ls_waiters_mutex);
816 
817 	if (is_overlap_unlock(lkb) ||
818 	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
819 		error = -EINVAL;
820 		goto out;
821 	}
822 
823 	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
824 		switch (mstype) {
825 		case DLM_MSG_UNLOCK:
826 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
827 			break;
828 		case DLM_MSG_CANCEL:
829 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
830 			break;
831 		default:
832 			error = -EBUSY;
833 			goto out;
834 		}
835 		lkb->lkb_wait_count++;
836 		hold_lkb(lkb);
837 
838 		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
839 			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
840 			  lkb->lkb_wait_count, lkb->lkb_flags);
841 		goto out;
842 	}
843 
844 	DLM_ASSERT(!lkb->lkb_wait_count,
845 		   dlm_print_lkb(lkb);
846 		   printk("wait_count %d\n", lkb->lkb_wait_count););
847 
848 	lkb->lkb_wait_count++;
849 	lkb->lkb_wait_type = mstype;
850 	hold_lkb(lkb);
851 	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
852  out:
853 	if (error)
854 		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
855 			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
856 			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
857 	mutex_unlock(&ls->ls_waiters_mutex);
858 	return error;
859 }
860 
861 /* We clear the RESEND flag because we might be taking an lkb off the waiters
862    list as part of process_requestqueue (e.g. a lookup that has an optimized
863    request reply on the requestqueue) between dlm_recover_waiters_pre() which
864    set RESEND and dlm_recover_waiters_post() */
865 
_remove_from_waiters(struct dlm_lkb * lkb,int mstype)866 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
867 {
868 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
869 	int overlap_done = 0;
870 
871 	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
872 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
873 		overlap_done = 1;
874 		goto out_del;
875 	}
876 
877 	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
878 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
879 		overlap_done = 1;
880 		goto out_del;
881 	}
882 
883 	/* N.B. type of reply may not always correspond to type of original
884 	   msg due to lookup->request optimization, verify others? */
885 
886 	if (lkb->lkb_wait_type) {
887 		lkb->lkb_wait_type = 0;
888 		goto out_del;
889 	}
890 
891 	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
892 		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
893 	return -1;
894 
895  out_del:
896 	/* the force-unlock/cancel has completed and we haven't recvd a reply
897 	   to the op that was in progress prior to the unlock/cancel; we
898 	   give up on any reply to the earlier op.  FIXME: not sure when/how
899 	   this would happen */
900 
901 	if (overlap_done && lkb->lkb_wait_type) {
902 		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
903 			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
904 		lkb->lkb_wait_count--;
905 		lkb->lkb_wait_type = 0;
906 	}
907 
908 	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
909 
910 	lkb->lkb_flags &= ~DLM_IFL_RESEND;
911 	lkb->lkb_wait_count--;
912 	if (!lkb->lkb_wait_count)
913 		list_del_init(&lkb->lkb_wait_reply);
914 	unhold_lkb(lkb);
915 	return 0;
916 }
917 
remove_from_waiters(struct dlm_lkb * lkb,int mstype)918 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
919 {
920 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
921 	int error;
922 
923 	mutex_lock(&ls->ls_waiters_mutex);
924 	error = _remove_from_waiters(lkb, mstype);
925 	mutex_unlock(&ls->ls_waiters_mutex);
926 	return error;
927 }
928 
929 /* Handles situations where we might be processing a "fake" or "stub" reply in
930    which we can't try to take waiters_mutex again. */
931 
remove_from_waiters_ms(struct dlm_lkb * lkb,struct dlm_message * ms)932 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
933 {
934 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
935 	int error;
936 
937 	if (ms != &ls->ls_stub_ms)
938 		mutex_lock(&ls->ls_waiters_mutex);
939 	error = _remove_from_waiters(lkb, ms->m_type);
940 	if (ms != &ls->ls_stub_ms)
941 		mutex_unlock(&ls->ls_waiters_mutex);
942 	return error;
943 }
944 
dir_remove(struct dlm_rsb * r)945 static void dir_remove(struct dlm_rsb *r)
946 {
947 	int to_nodeid;
948 
949 	if (dlm_no_directory(r->res_ls))
950 		return;
951 
952 	to_nodeid = dlm_dir_nodeid(r);
953 	if (to_nodeid != dlm_our_nodeid())
954 		send_remove(r);
955 	else
956 		dlm_dir_remove_entry(r->res_ls, to_nodeid,
957 				     r->res_name, r->res_length);
958 }
959 
/* shrink_bucket -- free expired rsbs from the toss list of hash bucket b.

   An rsb is due once jiffies has reached res_toss_time plus
   dlm_config.ci_toss_secs seconds.  Each pass of the outer loop takes the
   bucket lock, picks one due rsb (scanning the toss list in reverse),
   and drops the lock again before doing the directory removal and free.
   The loop ends when a pass finds no due rsb.

   Returns the number of rsbs freed.

   FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		spin_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			/* skip entries whose toss period hasn't elapsed */
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		/* kill_rsb is the kref release callback; a nonzero return
		   means the last reference was dropped here */
		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			spin_unlock(&ls->ls_rsbtbl[b].lock);

			/* only the master removes the directory entry */
			if (is_master(r))
				dir_remove(r);
			dlm_free_rsb(r);
			count++;
		} else {
			/* NOTE(review): the rsb stays on the toss list here, so
			   the next pass looks likely to select it again (and
			   drop another reference) -- confirm this is intended.
			   r is also read after the bucket lock is released. */
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}
1001 
dlm_scan_rsbs(struct dlm_ls * ls)1002 void dlm_scan_rsbs(struct dlm_ls *ls)
1003 {
1004 	int i;
1005 
1006 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1007 		shrink_bucket(ls, i);
1008 		if (dlm_locking_stopped(ls))
1009 			break;
1010 		cond_resched();
1011 	}
1012 }
1013 
add_timeout(struct dlm_lkb * lkb)1014 static void add_timeout(struct dlm_lkb *lkb)
1015 {
1016 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1017 
1018 	if (is_master_copy(lkb))
1019 		return;
1020 
1021 	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1022 	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1023 		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1024 		goto add_it;
1025 	}
1026 	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1027 		goto add_it;
1028 	return;
1029 
1030  add_it:
1031 	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1032 	mutex_lock(&ls->ls_timeout_mutex);
1033 	hold_lkb(lkb);
1034 	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1035 	mutex_unlock(&ls->ls_timeout_mutex);
1036 }
1037 
del_timeout(struct dlm_lkb * lkb)1038 static void del_timeout(struct dlm_lkb *lkb)
1039 {
1040 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1041 
1042 	mutex_lock(&ls->ls_timeout_mutex);
1043 	if (!list_empty(&lkb->lkb_time_list)) {
1044 		list_del_init(&lkb->lkb_time_list);
1045 		unhold_lkb(lkb);
1046 	}
1047 	mutex_unlock(&ls->ls_timeout_mutex);
1048 }
1049 
/* dlm_scan_timeout -- act on lkbs in ls_timeout that have waited too long.

   Each pass of the outer loop picks the first lkb on the timeout list that
   is due for a warning and/or a cancel, then handles it under the rsb lock
   after the timeout mutex has been dropped.  The loop ends when locking is
   stopped or a pass finds nothing due.

   FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
   and then lock rsb because of lock ordering in add_timeout.  We may need
   to specify some special timeout-related bits in the lkb that are just to
   be accessed under the timeout_mutex. */

void dlm_scan_timeout(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int do_cancel, do_warn;
	s64 wait_us;

	for (;;) {
		if (dlm_locking_stopped(ls))
			break;

		do_cancel = 0;
		do_warn = 0;
		mutex_lock(&ls->ls_timeout_mutex);
		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

			/* time since lkb_timestamp, in microseconds */
			wait_us = ktime_to_us(ktime_sub(ktime_get(),
					      		lkb->lkb_timestamp));

			/* the *_cs values are scaled by 10000 to compare with
			   microseconds (centiseconds, going by the names) */
			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
			    wait_us >= (lkb->lkb_timeout_cs * 10000))
				do_cancel = 1;

			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
			    wait_us >= dlm_config.ci_timewarn_cs * 10000)
				do_warn = 1;

			if (!do_cancel && !do_warn)
				continue;
			/* this reference is dropped by dlm_put_lkb() at the
			   bottom of the outer loop */
			hold_lkb(lkb);
			break;
		}
		mutex_unlock(&ls->ls_timeout_mutex);

		/* nothing on the list is due */
		if (!do_cancel && !do_warn)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		if (do_warn) {
			/* clear flag so we only warn once */
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			/* without a TIMEOUT request the lkb needs no further
			   watching, so take it off the list */
			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
				del_timeout(lkb);
			dlm_timeout_warn(lkb);
		}

		if (do_cancel) {
			log_debug(ls, "timeout cancel %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			/* mark the cancel as caused by the timeout */
			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
			del_timeout(lkb);
			_cancel_lock(r, lkb);
		}

		unlock_rsb(r);
		unhold_rsb(r);
		dlm_put_lkb(lkb);
	}
}
1119 
1120 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1121    dlm_recoverd before checking/setting ls_recover_begin. */
1122 
dlm_adjust_timeouts(struct dlm_ls * ls)1123 void dlm_adjust_timeouts(struct dlm_ls *ls)
1124 {
1125 	struct dlm_lkb *lkb;
1126 	u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1127 
1128 	ls->ls_recover_begin = 0;
1129 	mutex_lock(&ls->ls_timeout_mutex);
1130 	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1131 		lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1132 	mutex_unlock(&ls->ls_timeout_mutex);
1133 }
1134 
1135 /* lkb is master or local copy */
1136 
set_lvb_lock(struct dlm_rsb * r,struct dlm_lkb * lkb)1137 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1138 {
1139 	int b, len = r->res_ls->ls_lvblen;
1140 
1141 	/* b=1 lvb returned to caller
1142 	   b=0 lvb written to rsb or invalidated
1143 	   b=-1 do nothing */
1144 
1145 	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1146 
1147 	if (b == 1) {
1148 		if (!lkb->lkb_lvbptr)
1149 			return;
1150 
1151 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1152 			return;
1153 
1154 		if (!r->res_lvbptr)
1155 			return;
1156 
1157 		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1158 		lkb->lkb_lvbseq = r->res_lvbseq;
1159 
1160 	} else if (b == 0) {
1161 		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1162 			rsb_set_flag(r, RSB_VALNOTVALID);
1163 			return;
1164 		}
1165 
1166 		if (!lkb->lkb_lvbptr)
1167 			return;
1168 
1169 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1170 			return;
1171 
1172 		if (!r->res_lvbptr)
1173 			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1174 
1175 		if (!r->res_lvbptr)
1176 			return;
1177 
1178 		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1179 		r->res_lvbseq++;
1180 		lkb->lkb_lvbseq = r->res_lvbseq;
1181 		rsb_clear_flag(r, RSB_VALNOTVALID);
1182 	}
1183 
1184 	if (rsb_flag(r, RSB_VALNOTVALID))
1185 		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1186 }
1187 
set_lvb_unlock(struct dlm_rsb * r,struct dlm_lkb * lkb)1188 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1189 {
1190 	if (lkb->lkb_grmode < DLM_LOCK_PW)
1191 		return;
1192 
1193 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1194 		rsb_set_flag(r, RSB_VALNOTVALID);
1195 		return;
1196 	}
1197 
1198 	if (!lkb->lkb_lvbptr)
1199 		return;
1200 
1201 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1202 		return;
1203 
1204 	if (!r->res_lvbptr)
1205 		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1206 
1207 	if (!r->res_lvbptr)
1208 		return;
1209 
1210 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1211 	r->res_lvbseq++;
1212 	rsb_clear_flag(r, RSB_VALNOTVALID);
1213 }
1214 
1215 /* lkb is process copy (pc) */
1216 
set_lvb_lock_pc(struct dlm_rsb * r,struct dlm_lkb * lkb,struct dlm_message * ms)1217 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1218 			    struct dlm_message *ms)
1219 {
1220 	int b;
1221 
1222 	if (!lkb->lkb_lvbptr)
1223 		return;
1224 
1225 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1226 		return;
1227 
1228 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1229 	if (b == 1) {
1230 		int len = receive_extralen(ms);
1231 		if (len > DLM_RESNAME_MAXLEN)
1232 			len = DLM_RESNAME_MAXLEN;
1233 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1234 		lkb->lkb_lvbseq = ms->m_lvbseq;
1235 	}
1236 }
1237 
1238 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1239    remove_lock -- used for unlock, removes lkb from granted
1240    revert_lock -- used for cancel, moves lkb from convert to granted
1241    grant_lock  -- used for request and convert, adds lkb to granted or
1242                   moves lkb from convert or waiting to granted
1243 
1244    Each of these is used for master or local copy lkb's.  There is
1245    also a _pc() variation used to make the corresponding change on
1246    a process copy (pc) lkb. */
1247 
_remove_lock(struct dlm_rsb * r,struct dlm_lkb * lkb)1248 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1249 {
1250 	del_lkb(r, lkb);
1251 	lkb->lkb_grmode = DLM_LOCK_IV;
1252 	/* this unhold undoes the original ref from create_lkb()
1253 	   so this leads to the lkb being freed */
1254 	unhold_lkb(lkb);
1255 }
1256 
/* Remove a master or local copy lkb: the lvb is written back to the rsb
   first (see set_lvb_unlock), then the lkb is removed. */

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* lvb handling must see the lkb before it is torn down */
	set_lvb_unlock(r, lkb);

	_remove_lock(r, lkb);
}
1262 
/* Remove a process copy lkb; no lvb write-back is done for this case. */

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
1267 
1268 /* returns: 0 did nothing
1269 	    1 moved lock to granted
1270 	   -1 removed lock */
1271 
revert_lock(struct dlm_rsb * r,struct dlm_lkb * lkb)1272 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1273 {
1274 	int rv = 0;
1275 
1276 	lkb->lkb_rqmode = DLM_LOCK_IV;
1277 
1278 	switch (lkb->lkb_status) {
1279 	case DLM_LKSTS_GRANTED:
1280 		break;
1281 	case DLM_LKSTS_CONVERT:
1282 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1283 		rv = 1;
1284 		break;
1285 	case DLM_LKSTS_WAITING:
1286 		del_lkb(r, lkb);
1287 		lkb->lkb_grmode = DLM_LOCK_IV;
1288 		/* this unhold undoes the original ref from create_lkb()
1289 		   so this leads to the lkb being freed */
1290 		unhold_lkb(lkb);
1291 		rv = -1;
1292 		break;
1293 	default:
1294 		log_print("invalid status for revert %d", lkb->lkb_status);
1295 	}
1296 	return rv;
1297 }
1298 
/* Process copy variant of revert_lock(); the handling is identical, so
   the call and its return value are passed straight through. */

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = revert_lock(r, lkb);

	return rv;
}
1303 
_grant_lock(struct dlm_rsb * r,struct dlm_lkb * lkb)1304 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1305 {
1306 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1307 		lkb->lkb_grmode = lkb->lkb_rqmode;
1308 		if (lkb->lkb_status)
1309 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1310 		else
1311 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1312 	}
1313 
1314 	lkb->lkb_rqmode = DLM_LOCK_IV;
1315 }
1316 
grant_lock(struct dlm_rsb * r,struct dlm_lkb * lkb)1317 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1318 {
1319 	set_lvb_lock(r, lkb);
1320 	_grant_lock(r, lkb);
1321 	lkb->lkb_highbast = 0;
1322 }
1323 
/* Grant a process copy lkb: the lvb, if any, comes from message ms. */

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	/* must run before _grant_lock() overwrites grmode/rqmode */
	set_lvb_lock_pc(r, lkb, ms);

	_grant_lock(r, lkb);
}
1330 
/* Grant a lock that had been queued.  Called by grant_pending_locks(),
   so the owner has to be told asynchronously: a master copy lkb belongs
   to a remote node and gets a grant message, any other lkb gets a
   completion callback queued locally. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);

	if (!is_master_copy(lkb)) {
		queue_cast(r, lkb, 0);
		return;
	}

	send_grant(r, lkb);
}
1343 
1344 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1345    change the granted/requested modes.  We're munging things accordingly in
1346    the process copy.
1347    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1348    conversion deadlock
1349    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1350    compatible with other granted locks */
1351 
munge_demoted(struct dlm_lkb * lkb,struct dlm_message * ms)1352 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1353 {
1354 	if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1355 		log_print("munge_demoted %x invalid reply type %d",
1356 			  lkb->lkb_id, ms->m_type);
1357 		return;
1358 	}
1359 
1360 	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1361 		log_print("munge_demoted %x invalid modes gr %d rq %d",
1362 			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1363 		return;
1364 	}
1365 
1366 	lkb->lkb_grmode = DLM_LOCK_NL;
1367 }
1368 
munge_altmode(struct dlm_lkb * lkb,struct dlm_message * ms)1369 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1370 {
1371 	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1372 	    ms->m_type != DLM_MSG_GRANT) {
1373 		log_print("munge_altmode %x invalid reply type %d",
1374 			  lkb->lkb_id, ms->m_type);
1375 		return;
1376 	}
1377 
1378 	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1379 		lkb->lkb_rqmode = DLM_LOCK_PR;
1380 	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1381 		lkb->lkb_rqmode = DLM_LOCK_CW;
1382 	else {
1383 		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1384 		dlm_print_lkb(lkb);
1385 	}
1386 }
1387 
first_in_list(struct dlm_lkb * lkb,struct list_head * head)1388 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1389 {
1390 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1391 					   lkb_statequeue);
1392 	if (lkb->lkb_id == first->lkb_id)
1393 		return 1;
1394 
1395 	return 0;
1396 }
1397 
1398 /* Check if the given lkb conflicts with another lkb on the queue. */
1399 
queue_conflict(struct list_head * head,struct dlm_lkb * lkb)1400 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1401 {
1402 	struct dlm_lkb *this;
1403 
1404 	list_for_each_entry(this, head, lkb_statequeue) {
1405 		if (this == lkb)
1406 			continue;
1407 		if (!modes_compat(this, lkb))
1408 			return 1;
1409 	}
1410 	return 0;
1411 }
1412 
1413 /*
1414  * "A conversion deadlock arises with a pair of lock requests in the converting
1415  * queue for one resource.  The granted mode of each lock blocks the requested
1416  * mode of the other lock."
1417  *
1418  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1419  * convert queue from being granted, then deadlk/demote lkb.
1420  *
1421  * Example:
1422  * Granted Queue: empty
1423  * Convert Queue: NL->EX (first lock)
1424  *                PR->EX (second lock)
1425  *
1426  * The first lock can't be granted because of the granted mode of the second
1427  * lock and the second lock can't be granted because it's not first in the
1428  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1429  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1430  * flag set and return DEMOTED in the lksb flags.
1431  *
1432  * Originally, this function detected conv-deadlk in a more limited scope:
1433  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1434  * - if lkb1 was the first entry in the queue (not just earlier), and was
1435  *   blocked by the granted mode of lkb2, and there was nothing on the
1436  *   granted queue preventing lkb1 from being granted immediately, i.e.
1437  *   lkb2 was the only thing preventing lkb1 from being granted.
1438  *
1439  * That second condition meant we'd only say there was conv-deadlk if
1440  * resolving it (by demotion) would lead to the first lock on the convert
1441  * queue being granted right away.  It allowed conversion deadlocks to exist
1442  * between locks on the convert queue while they couldn't be granted anyway.
1443  *
1444  * Now, we detect and take action on conversion deadlocks immediately when
1445  * they're created, even if they may not be immediately consequential.  If
1446  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1447  * mode that would prevent lkb1's conversion from being granted, we do a
1448  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1449  * I think this means that the lkb_is_ahead condition below should always
1450  * be zero, i.e. there will never be conv-deadlk between two locks that are
1451  * both already on the convert queue.
1452  */
1453 
conversion_deadlock_detect(struct dlm_rsb * r,struct dlm_lkb * lkb2)1454 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1455 {
1456 	struct dlm_lkb *lkb1;
1457 	int lkb_is_ahead = 0;
1458 
1459 	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1460 		if (lkb1 == lkb2) {
1461 			lkb_is_ahead = 1;
1462 			continue;
1463 		}
1464 
1465 		if (!lkb_is_ahead) {
1466 			if (!modes_compat(lkb2, lkb1))
1467 				return 1;
1468 		} else {
1469 			if (!modes_compat(lkb2, lkb1) &&
1470 			    !modes_compat(lkb1, lkb2))
1471 				return 1;
1472 		}
1473 	}
1474 	return 0;
1475 }
1476 
1477 /*
1478  * Return 1 if the lock can be granted, 0 otherwise.
1479  * Also detect and resolve conversion deadlocks.
1480  *
1481  * lkb is the lock to be granted
1482  *
1483  * now is 1 if the function is being called in the context of the
1484  * immediate request, it is 0 if called later, after the lock has been
1485  * queued.
1486  *
1487  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1488  */
1489 
_can_be_granted(struct dlm_rsb * r,struct dlm_lkb * lkb,int now)1490 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1491 {
1492 	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1493 
1494 	/*
1495 	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1496 	 * a new request for a NL mode lock being blocked.
1497 	 *
1498 	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1499 	 * request, then it would be granted.  In essence, the use of this flag
1500 	 * tells the Lock Manager to expedite theis request by not considering
1501 	 * what may be in the CONVERTING or WAITING queues...  As of this
1502 	 * writing, the EXPEDITE flag can be used only with new requests for NL
1503 	 * mode locks.  This flag is not valid for conversion requests.
1504 	 *
1505 	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1506 	 * conversion or used with a non-NL requested mode.  We also know an
1507 	 * EXPEDITE request is always granted immediately, so now must always
1508 	 * be 1.  The full condition to grant an expedite request: (now &&
1509 	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1510 	 * therefore be shortened to just checking the flag.
1511 	 */
1512 
1513 	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1514 		return 1;
1515 
1516 	/*
1517 	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1518 	 * added to the remaining conditions.
1519 	 */
1520 
1521 	if (queue_conflict(&r->res_grantqueue, lkb))
1522 		goto out;
1523 
1524 	/*
1525 	 * 6-3: By default, a conversion request is immediately granted if the
1526 	 * requested mode is compatible with the modes of all other granted
1527 	 * locks
1528 	 */
1529 
1530 	if (queue_conflict(&r->res_convertqueue, lkb))
1531 		goto out;
1532 
1533 	/*
1534 	 * 6-5: But the default algorithm for deciding whether to grant or
1535 	 * queue conversion requests does not by itself guarantee that such
1536 	 * requests are serviced on a "first come first serve" basis.  This, in
1537 	 * turn, can lead to a phenomenon known as "indefinate postponement".
1538 	 *
1539 	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1540 	 * the system service employed to request a lock conversion.  This flag
1541 	 * forces certain conversion requests to be queued, even if they are
1542 	 * compatible with the granted modes of other locks on the same
1543 	 * resource.  Thus, the use of this flag results in conversion requests
1544 	 * being ordered on a "first come first servce" basis.
1545 	 *
1546 	 * DCT: This condition is all about new conversions being able to occur
1547 	 * "in place" while the lock remains on the granted queue (assuming
1548 	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1549 	 * doesn't _have_ to go onto the convert queue where it's processed in
1550 	 * order.  The "now" variable is necessary to distinguish converts
1551 	 * being received and processed for the first time now, because once a
1552 	 * convert is moved to the conversion queue the condition below applies
1553 	 * requiring fifo granting.
1554 	 */
1555 
1556 	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1557 		return 1;
1558 
1559 	/*
1560 	 * The NOORDER flag is set to avoid the standard vms rules on grant
1561 	 * order.
1562 	 */
1563 
1564 	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1565 		return 1;
1566 
1567 	/*
1568 	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1569 	 * granted until all other conversion requests ahead of it are granted
1570 	 * and/or canceled.
1571 	 */
1572 
1573 	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1574 		return 1;
1575 
1576 	/*
1577 	 * 6-4: By default, a new request is immediately granted only if all
1578 	 * three of the following conditions are satisfied when the request is
1579 	 * issued:
1580 	 * - The queue of ungranted conversion requests for the resource is
1581 	 *   empty.
1582 	 * - The queue of ungranted new requests for the resource is empty.
1583 	 * - The mode of the new request is compatible with the most
1584 	 *   restrictive mode of all granted locks on the resource.
1585 	 */
1586 
1587 	if (now && !conv && list_empty(&r->res_convertqueue) &&
1588 	    list_empty(&r->res_waitqueue))
1589 		return 1;
1590 
1591 	/*
1592 	 * 6-4: Once a lock request is in the queue of ungranted new requests,
1593 	 * it cannot be granted until the queue of ungranted conversion
1594 	 * requests is empty, all ungranted new requests ahead of it are
1595 	 * granted and/or canceled, and it is compatible with the granted mode
1596 	 * of the most restrictive lock granted on the resource.
1597 	 */
1598 
1599 	if (!now && !conv && list_empty(&r->res_convertqueue) &&
1600 	    first_in_list(lkb, &r->res_waitqueue))
1601 		return 1;
1602  out:
1603 	return 0;
1604 }
1605 
can_be_granted(struct dlm_rsb * r,struct dlm_lkb * lkb,int now,int * err)1606 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1607 			  int *err)
1608 {
1609 	int rv;
1610 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1611 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1612 
1613 	if (err)
1614 		*err = 0;
1615 
1616 	rv = _can_be_granted(r, lkb, now);
1617 	if (rv)
1618 		goto out;
1619 
1620 	/*
1621 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1622 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1623 	 * cancels one of the locks.
1624 	 */
1625 
1626 	if (is_convert && can_be_queued(lkb) &&
1627 	    conversion_deadlock_detect(r, lkb)) {
1628 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1629 			lkb->lkb_grmode = DLM_LOCK_NL;
1630 			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1631 		} else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1632 			if (err)
1633 				*err = -EDEADLK;
1634 			else {
1635 				log_print("can_be_granted deadlock %x now %d",
1636 					  lkb->lkb_id, now);
1637 				dlm_dump_rsb(r);
1638 			}
1639 		}
1640 		goto out;
1641 	}
1642 
1643 	/*
1644 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1645 	 * to grant a request in a mode other than the normal rqmode.  It's a
1646 	 * simple way to provide a big optimization to applications that can
1647 	 * use them.
1648 	 */
1649 
1650 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1651 		alt = DLM_LOCK_PR;
1652 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1653 		alt = DLM_LOCK_CW;
1654 
1655 	if (alt) {
1656 		lkb->lkb_rqmode = alt;
1657 		rv = _can_be_granted(r, lkb, now);
1658 		if (rv)
1659 			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1660 		else
1661 			lkb->lkb_rqmode = rqmode;
1662 	}
1663  out:
1664 	return rv;
1665 }
1666 
/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
   for locks pending on the convert list.  Once verified (watch for these
   log_prints), we should be able to just call _can_be_granted() and not
   bother with the demote/deadlk cases here (and there's no easy way to deal
   with a deadlk here, we'd have to generate something like grant_lock with
   the deadlk error.) */

/* grant_pending_convert -- grant whatever can be granted on the convert
   queue of r.

   The queue is rescanned from the start after any pass that granted a
   lock.  A pass in which a lock was demoted triggers one extra rescan
   only (guarded by quit).

   Returns the larger of high and the highest requested mode of all
   conversions still blocked after the final pass; sets *cw (when cw is
   not NULL) if there's a blocked conversion to DLM_LOCK_CW. */

static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;
	int deadlk;

	quit = 0;
 restart:
	/* per-pass state; hi only reflects the pass that completes last */
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		/* remember the demoted state so a demotion done inside
		   can_be_granted() can be detected below */
		demoted = is_demoted(lkb);
		deadlk = 0;

		if (can_be_granted(r, lkb, 0, &deadlk)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
			continue;
		}

		/* can_be_granted() demoted this lock just now */
		if (!demoted && is_demoted(lkb)) {
			log_print("WARN: pending demoted %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			demote_restart = 1;
			continue;
		}

		/* deadlock reported; only logged here, see FIXME above */
		if (deadlk) {
			log_print("WARN: pending deadlock %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			dlm_dump_rsb(r);
			continue;
		}

		/* lock stays blocked: account for its requested mode */
		hi = max_t(int, lkb->lkb_rqmode, hi);

		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
			*cw = 1;
	}

	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}
1728 
grant_pending_wait(struct dlm_rsb * r,int high,int * cw)1729 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1730 {
1731 	struct dlm_lkb *lkb, *s;
1732 
1733 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1734 		if (can_be_granted(r, lkb, 0, NULL))
1735 			grant_lock_pending(r, lkb);
1736                 else {
1737 			high = max_t(int, lkb->lkb_rqmode, high);
1738 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
1739 				*cw = 1;
1740 		}
1741 	}
1742 
1743 	return high;
1744 }
1745 
1746 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1747    on either the convert or waiting queue.
1748    high is the largest rqmode of all locks blocked on the convert or
1749    waiting queue. */
1750 
lock_requires_bast(struct dlm_lkb * gr,int high,int cw)1751 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1752 {
1753 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1754 		if (gr->lkb_highbast < DLM_LOCK_EX)
1755 			return 1;
1756 		return 0;
1757 	}
1758 
1759 	if (gr->lkb_highbast < high &&
1760 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1761 		return 1;
1762 	return 0;
1763 }
1764 
grant_pending_locks(struct dlm_rsb * r)1765 static void grant_pending_locks(struct dlm_rsb *r)
1766 {
1767 	struct dlm_lkb *lkb, *s;
1768 	int high = DLM_LOCK_IV;
1769 	int cw = 0;
1770 
1771 	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1772 
1773 	high = grant_pending_convert(r, high, &cw);
1774 	high = grant_pending_wait(r, high, &cw);
1775 
1776 	if (high == DLM_LOCK_IV)
1777 		return;
1778 
1779 	/*
1780 	 * If there are locks left on the wait/convert queue then send blocking
1781 	 * ASTs to granted locks based on the largest requested mode (high)
1782 	 * found above.
1783 	 */
1784 
1785 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1786 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1787 			if (cw && high == DLM_LOCK_PR &&
1788 			    lkb->lkb_grmode == DLM_LOCK_PR)
1789 				queue_bast(r, lkb, DLM_LOCK_CW);
1790 			else
1791 				queue_bast(r, lkb, high);
1792 			lkb->lkb_highbast = high;
1793 		}
1794 	}
1795 }
1796 
modes_require_bast(struct dlm_lkb * gr,struct dlm_lkb * rq)1797 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1798 {
1799 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1800 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1801 		if (gr->lkb_highbast < DLM_LOCK_EX)
1802 			return 1;
1803 		return 0;
1804 	}
1805 
1806 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1807 		return 1;
1808 	return 0;
1809 }
1810 
send_bast_queue(struct dlm_rsb * r,struct list_head * head,struct dlm_lkb * lkb)1811 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1812 			    struct dlm_lkb *lkb)
1813 {
1814 	struct dlm_lkb *gr;
1815 
1816 	list_for_each_entry(gr, head, lkb_statequeue) {
1817 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1818 			queue_bast(r, gr, lkb->lkb_rqmode);
1819 			gr->lkb_highbast = lkb->lkb_rqmode;
1820 		}
1821 	}
1822 }
1823 
send_blocking_asts(struct dlm_rsb * r,struct dlm_lkb * lkb)1824 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1825 {
1826 	send_bast_queue(r, &r->res_grantqueue, lkb);
1827 }
1828 
send_blocking_asts_all(struct dlm_rsb * r,struct dlm_lkb * lkb)1829 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1830 {
1831 	send_bast_queue(r, &r->res_grantqueue, lkb);
1832 	send_bast_queue(r, &r->res_convertqueue, lkb);
1833 }
1834 
/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
   <0: the local directory lookup failed with an error other than
      -EEXIST; the dlm_dir_lookup() error is returned as is
*/

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	/* the rsb's nodeid is used but not yet trusted: this lkb becomes
	   the first_lkid, so later lkbs wait on res_lookup (see below) */
	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	/* another lkb is already the first_lkid; queue behind it */
	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	/* res_nodeid 0: lkb_nodeid is set to 0 as well (0 is also what is
	   stored below when the lookup returns our own nodeid) */
	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	/* res_nodeid > 0: the master is a known node, copy its id */
	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	/* only -1 (master unknown) is left */
	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	/* the directory is remote: send a lookup and wait for the reply */
	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	/* we are the directory node: look the master up locally, with one
	   retry */
	for (i = 0; i < 2; i++) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}
	/* NOTE(review): with a final error of -EEXIST execution continues
	   and ret_nodeid is used below; whether dlm_dir_lookup() sets it in
	   that case is not visible here -- confirm */
	if (error && error != -EEXIST)
		return error;

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		/* remote master: this lkb becomes the first_lkid, so later
		   lkbs are queued on res_lookup until it is cleared */
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
1919 
process_lookup_list(struct dlm_rsb * r)1920 static void process_lookup_list(struct dlm_rsb *r)
1921 {
1922 	struct dlm_lkb *lkb, *safe;
1923 
1924 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1925 		list_del_init(&lkb->lkb_rsb_lookup);
1926 		_request_lock(r, lkb);
1927 		schedule();
1928 	}
1929 }
1930 
1931 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1932 
confirm_master(struct dlm_rsb * r,int error)1933 static void confirm_master(struct dlm_rsb *r, int error)
1934 {
1935 	struct dlm_lkb *lkb;
1936 
1937 	if (!r->res_first_lkid)
1938 		return;
1939 
1940 	switch (error) {
1941 	case 0:
1942 	case -EINPROGRESS:
1943 		r->res_first_lkid = 0;
1944 		process_lookup_list(r);
1945 		break;
1946 
1947 	case -EAGAIN:
1948 	case -EBADR:
1949 	case -ENOTBLK:
1950 		/* the remote request failed and won't be retried (it was
1951 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
1952 		   lkb the first_lkid */
1953 
1954 		r->res_first_lkid = 0;
1955 
1956 		if (!list_empty(&r->res_lookup)) {
1957 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1958 					 lkb_rsb_lookup);
1959 			list_del_init(&lkb->lkb_rsb_lookup);
1960 			r->res_first_lkid = lkb->lkb_id;
1961 			_request_lock(r, lkb);
1962 		}
1963 		break;
1964 
1965 	default:
1966 		log_error(r->res_ls, "confirm_master unknown error %d", error);
1967 	}
1968 }
1969 
/* set_lock_args -- check the arguments of a lock/convert call for invalid
   combinations and, if they are acceptable, stash them in args.

   Returns 0 with args filled in, or -EINVAL with args untouched. */

static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
			 int namelen, unsigned long timeout_cs,
			 void (*ast) (void *astparam),
			 void *astparam,
			 void (*bast) (void *astparam, int mode),
			 struct dlm_args *args)
{
	int rv = -EINVAL;

	/* check for invalid arg usage */

	if (mode < 0 || mode > DLM_LOCK_EX)
		goto out;

	/* The resource name is only used for new requests.  namelen is a
	   signed int here but callers may hand in an unsigned value, so a
	   very large length shows up as negative and must be rejected
	   along with anything over the maximum. */
	if (!(flags & DLM_LKF_CONVERT) &&
	    (namelen < 0 || namelen > DLM_RESNAME_MAXLEN))
		goto out;

	/* CANCEL belongs to the unlock path */
	if (flags & DLM_LKF_CANCEL)
		goto out;

	/* QUECVT and CONVDEADLK only make sense on a conversion */
	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
		goto out;

	/* EXPEDITE is limited to new, queueable NL requests */
	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
		goto out;

	/* a completion ast and an lksb are mandatory; lksb must be checked
	   before the tests below dereference it */
	if (!ast || !lksb)
		goto out;

	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
		goto out;

	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
		goto out;

	/* these args will be copied to the lkb in validate_lock_args,
	   it cannot be done now because when converting locks, fields in
	   an active lkb cannot be modified before locking the rsb */

	args->flags = flags;
	args->astfn = ast;
	args->astparam = astparam;
	args->bastfn = bast;
	args->timeout = timeout_cs;
	args->mode = mode;
	args->lksb = lksb;
	rv = 0;
 out:
	return rv;
}
2035 
/* set_unlock_args -- check the flags of an unlock/cancel call and stash
   them, with the ast argument, in args.  Returns 0 or -EINVAL. */

static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
{
	const uint32_t allowed = DLM_LKF_CANCEL | DLM_LKF_VALBLK |
				 DLM_LKF_IVVALBLK | DLM_LKF_FORCEUNLOCK;

	/* any flag outside the unlock set is an error */
	if ((flags & ~allowed) != 0)
		return -EINVAL;

	/* CANCEL and FORCEUNLOCK cannot be combined */
	if ((flags & DLM_LKF_CANCEL) && (flags & DLM_LKF_FORCEUNLOCK))
		return -EINVAL;

	args->astparam = astarg;
	args->flags = flags;
	return 0;
}
2049 
validate_lock_args(struct dlm_ls * ls,struct dlm_lkb * lkb,struct dlm_args * args)2050 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2051 			      struct dlm_args *args)
2052 {
2053 	int rv = -EINVAL;
2054 
2055 	if (args->flags & DLM_LKF_CONVERT) {
2056 		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2057 			goto out;
2058 
2059 		if (args->flags & DLM_LKF_QUECVT &&
2060 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2061 			goto out;
2062 
2063 		rv = -EBUSY;
2064 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2065 			goto out;
2066 
2067 		if (lkb->lkb_wait_type)
2068 			goto out;
2069 
2070 		if (is_overlap(lkb))
2071 			goto out;
2072 	}
2073 
2074 	lkb->lkb_exflags = args->flags;
2075 	lkb->lkb_sbflags = 0;
2076 	lkb->lkb_astfn = args->astfn;
2077 	lkb->lkb_astparam = args->astparam;
2078 	lkb->lkb_bastfn = args->bastfn;
2079 	lkb->lkb_rqmode = args->mode;
2080 	lkb->lkb_lksb = args->lksb;
2081 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2082 	lkb->lkb_ownpid = (int) current->pid;
2083 	lkb->lkb_timeout_cs = args->timeout;
2084 	rv = 0;
2085  out:
2086 	return rv;
2087 }
2088 
2089 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2090    for success */
2091 
2092 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2093    because there may be a lookup in progress and it's valid to do
2094    cancel/unlockf on it */
2095 
/* validate_unlock_args -- decide whether an unlock, force-unlock or cancel
   may proceed on lkb, and if so merge the saved args into the lkb.

   Returns:
   0       the operation may be sent/performed; exflags, sbflags and
           astparam in the lkb have been updated
   -EINVAL the lkb is a master copy, or the same kind of operation is
           already in progress
   -ENOENT the lkb has been marked end-of-life
   -EBUSY  another operation is in progress; for CANCEL/FORCEUNLOCK the
           request has either been recorded as an overlap flag or
           completed directly here

   Any non-zero result is logged with log_debug() before returning. */

static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int rv = -EINVAL;

	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
		dlm_print_lkb(lkb);
		goto out;
	}

	/* an lkb may still exist even though the lock is EOL'ed due to a
	   cancel, unlock or failed noqueue request; an app can't use these
	   locks; return same error as if the lkid had not been found at all */

	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
		rv = -ENOENT;
		goto out;
	}

	/* an lkb may be waiting for an rsb lookup to complete where the
	   lookup was initiated by another lock */

	if (!list_empty(&lkb->lkb_rsb_lookup)) {
		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
			/* nothing has been sent for this lkb yet, so it is
			   taken off the lookup list and completed right
			   here with the matching cancel/unlock status */
			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
			list_del_init(&lkb->lkb_rsb_lookup);
			queue_cast(lkb->lkb_resource, lkb,
				   args->flags & DLM_LKF_CANCEL ?
				   -DLM_ECANCEL : -DLM_EUNLOCK);
			unhold_lkb(lkb); /* undoes create_lkb() */
		}
		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
		rv = -EBUSY;
		goto out;
	}

	/* cancel not allowed with another cancel/unlock in progress */

	if (args->flags & DLM_LKF_CANCEL) {
		/* rv is still -EINVAL for the two rejections below */
		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
			goto out;

		if (is_overlap(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		/* the lkb is flagged for resend: only record the cancel as
		   an overlap flag, nothing is sent from here */
		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			/* a lookup/request is outstanding: record the
			   cancel as an overlap flag */
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
		case DLM_MSG_CANCEL:
			/* an unlock or cancel is already outstanding:
			   reject with -EINVAL */
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_CANCEL */
		goto out_ok;
	}

	/* do we need to allow a force-unlock if there's a normal unlock
	   already in progress?  in what conditions could the normal unlock
	   fail such that we'd want to send a force-unlock to be sure? */

	if (args->flags & DLM_LKF_FORCEUNLOCK) {
		/* rv is still -EINVAL for the two rejections below */
		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
			goto out;

		if (is_overlap_unlock(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		/* same handling as the cancel case above, but recording
		   an overlapping unlock */
		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
			/* an unlock is already outstanding: -EINVAL */
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_UNLOCK */
		goto out_ok;
	}

	/* normal unlock not allowed if there's any op in progress */
	rv = -EBUSY;
	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
		goto out;

 out_ok:
	/* an overlapping op shouldn't blow away exflags from other op */
	lkb->lkb_exflags |= args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astparam = args->astparam;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}
2218 
2219 /*
2220  * Four stage 4 varieties:
2221  * do_request(), do_convert(), do_unlock(), do_cancel()
2222  * These are called on the master node for the given lock and
2223  * from the central locking logic.
2224  */
2225 
do_request(struct dlm_rsb * r,struct dlm_lkb * lkb)2226 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2227 {
2228 	int error = 0;
2229 
2230 	if (can_be_granted(r, lkb, 1, NULL)) {
2231 		grant_lock(r, lkb);
2232 		queue_cast(r, lkb, 0);
2233 		goto out;
2234 	}
2235 
2236 	if (can_be_queued(lkb)) {
2237 		error = -EINPROGRESS;
2238 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
2239 		send_blocking_asts(r, lkb);
2240 		add_timeout(lkb);
2241 		goto out;
2242 	}
2243 
2244 	error = -EAGAIN;
2245 	if (force_blocking_asts(lkb))
2246 		send_blocking_asts_all(r, lkb);
2247 	queue_cast(r, lkb, -EAGAIN);
2248 
2249  out:
2250 	return error;
2251 }
2252 
do_convert(struct dlm_rsb * r,struct dlm_lkb * lkb)2253 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2254 {
2255 	int error = 0;
2256 	int deadlk = 0;
2257 
2258 	/* changing an existing lock may allow others to be granted */
2259 
2260 	if (can_be_granted(r, lkb, 1, &deadlk)) {
2261 		grant_lock(r, lkb);
2262 		queue_cast(r, lkb, 0);
2263 		grant_pending_locks(r);
2264 		goto out;
2265 	}
2266 
2267 	/* can_be_granted() detected that this lock would block in a conversion
2268 	   deadlock, so we leave it on the granted queue and return EDEADLK in
2269 	   the ast for the convert. */
2270 
2271 	if (deadlk) {
2272 		/* it's left on the granted queue */
2273 		log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2274 			  lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2275 			  lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2276 		revert_lock(r, lkb);
2277 		queue_cast(r, lkb, -EDEADLK);
2278 		error = -EDEADLK;
2279 		goto out;
2280 	}
2281 
2282 	/* is_demoted() means the can_be_granted() above set the grmode
2283 	   to NL, and left us on the granted queue.  This auto-demotion
2284 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
2285 	   now grantable.  We have to try to grant other converting locks
2286 	   before we try again to grant this one. */
2287 
2288 	if (is_demoted(lkb)) {
2289 		grant_pending_convert(r, DLM_LOCK_IV, NULL);
2290 		if (_can_be_granted(r, lkb, 1)) {
2291 			grant_lock(r, lkb);
2292 			queue_cast(r, lkb, 0);
2293 			grant_pending_locks(r);
2294 			goto out;
2295 		}
2296 		/* else fall through and move to convert queue */
2297 	}
2298 
2299 	if (can_be_queued(lkb)) {
2300 		error = -EINPROGRESS;
2301 		del_lkb(r, lkb);
2302 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2303 		send_blocking_asts(r, lkb);
2304 		add_timeout(lkb);
2305 		goto out;
2306 	}
2307 
2308 	error = -EAGAIN;
2309 	if (force_blocking_asts(lkb))
2310 		send_blocking_asts_all(r, lkb);
2311 	queue_cast(r, lkb, -EAGAIN);
2312 
2313  out:
2314 	return error;
2315 }
2316 
do_unlock(struct dlm_rsb * r,struct dlm_lkb * lkb)2317 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2318 {
2319 	remove_lock(r, lkb);
2320 	queue_cast(r, lkb, -DLM_EUNLOCK);
2321 	grant_pending_locks(r);
2322 	return -DLM_EUNLOCK;
2323 }
2324 
2325 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2326 
do_cancel(struct dlm_rsb * r,struct dlm_lkb * lkb)2327 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2328 {
2329 	int error;
2330 
2331 	error = revert_lock(r, lkb);
2332 	if (error) {
2333 		queue_cast(r, lkb, -DLM_ECANCEL);
2334 		grant_pending_locks(r);
2335 		return -DLM_ECANCEL;
2336 	}
2337 	return 0;
2338 }
2339 
2340 /*
2341  * Four stage 3 varieties:
2342  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2343  */
2344 
2345 /* add a new lkb to a possibly new rsb, called by requesting process */
2346 
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* set_master: sets lkb nodeid from r */
	rv = set_master(r, lkb);
	if (rv < 0)
		return rv;

	/* positive: master not available yet, the lkb has been left
	   waiting and there is nothing more to do now */
	if (rv > 0)
		return 0;

	/* when remote, receive_request() calls do_request() on the
	   remote node */
	return is_remote(r) ? send_request(r, lkb) : do_request(r, lkb);
}
2369 
2370 /* change some property of an existing lkb, e.g. mode */
2371 
static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* when remote, receive_convert() calls do_convert() on the
	   remote node */
	return is_remote(r) ? send_convert(r, lkb) : do_convert(r, lkb);
}
2384 
2385 /* remove an existing lkb from the granted queue */
2386 
static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* when remote, receive_unlock() calls do_unlock() on the
	   remote node */
	return is_remote(r) ? send_unlock(r, lkb) : do_unlock(r, lkb);
}
2399 
2400 /* remove an existing lkb from the convert or wait queue */
2401 
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* when remote, receive_cancel() calls do_cancel() on the
	   remote node */
	return is_remote(r) ? send_cancel(r, lkb) : do_cancel(r, lkb);
}
2414 
2415 /*
2416  * Four stage 2 varieties:
2417  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2418  */
2419 
request_lock(struct dlm_ls * ls,struct dlm_lkb * lkb,char * name,int len,struct dlm_args * args)2420 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2421 			int len, struct dlm_args *args)
2422 {
2423 	struct dlm_rsb *r;
2424 	int error;
2425 
2426 	error = validate_lock_args(ls, lkb, args);
2427 	if (error)
2428 		goto out;
2429 
2430 	error = find_rsb(ls, name, len, R_CREATE, &r);
2431 	if (error)
2432 		goto out;
2433 
2434 	lock_rsb(r);
2435 
2436 	attach_lkb(r, lkb);
2437 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2438 
2439 	error = _request_lock(r, lkb);
2440 
2441 	unlock_rsb(r);
2442 	put_rsb(r);
2443 
2444  out:
2445 	return error;
2446 }
2447 
convert_lock(struct dlm_ls * ls,struct dlm_lkb * lkb,struct dlm_args * args)2448 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2449 			struct dlm_args *args)
2450 {
2451 	struct dlm_rsb *r;
2452 	int error;
2453 
2454 	r = lkb->lkb_resource;
2455 
2456 	hold_rsb(r);
2457 	lock_rsb(r);
2458 
2459 	error = validate_lock_args(ls, lkb, args);
2460 	if (error)
2461 		goto out;
2462 
2463 	error = _convert_lock(r, lkb);
2464  out:
2465 	unlock_rsb(r);
2466 	put_rsb(r);
2467 	return error;
2468 }
2469 
unlock_lock(struct dlm_ls * ls,struct dlm_lkb * lkb,struct dlm_args * args)2470 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2471 		       struct dlm_args *args)
2472 {
2473 	struct dlm_rsb *r;
2474 	int error;
2475 
2476 	r = lkb->lkb_resource;
2477 
2478 	hold_rsb(r);
2479 	lock_rsb(r);
2480 
2481 	error = validate_unlock_args(lkb, args);
2482 	if (error)
2483 		goto out;
2484 
2485 	error = _unlock_lock(r, lkb);
2486  out:
2487 	unlock_rsb(r);
2488 	put_rsb(r);
2489 	return error;
2490 }
2491 
cancel_lock(struct dlm_ls * ls,struct dlm_lkb * lkb,struct dlm_args * args)2492 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2493 		       struct dlm_args *args)
2494 {
2495 	struct dlm_rsb *r;
2496 	int error;
2497 
2498 	r = lkb->lkb_resource;
2499 
2500 	hold_rsb(r);
2501 	lock_rsb(r);
2502 
2503 	error = validate_unlock_args(lkb, args);
2504 	if (error)
2505 		goto out;
2506 
2507 	error = _cancel_lock(r, lkb);
2508  out:
2509 	unlock_rsb(r);
2510 	put_rsb(r);
2511 	return error;
2512 }
2513 
2514 /*
2515  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2516  */
2517 
/* dlm_lock -- request a new lock, or with DLM_LKF_CONVERT convert the
   existing lock identified by lksb->sb_lkid.

   Returns 0 when the operation was accepted; the outcome is then delivered
   through the completion ast.  -EINPROGRESS, -EAGAIN and -EDEADLK from the
   lower stages are reported as 0 for that reason.  Returns -EINVAL for an
   unknown lockspace or invalid arguments, or another negative error from
   the lower stages.

   parent_lkid is accepted but not used by this function. */

int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	dlm_lock_recovery(ls);

	if (convert) {
		/* the lock id comes out of the lksb, so a missing lksb has
		   to be rejected here; set_lock_args() only checks it
		   after this lookup */
		if (!lksb) {
			error = -EINVAL;
			goto out;
		}
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	} else {
		error = create_lkb(ls, &lkb);
	}

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	/* a convert always drops the reference taken by find_lkb(); a new
	   request only drops its lkb when the request failed */
	if (convert || error)
		__put_lkb(ls, lkb);
	/* these results are delivered through the ast, not the return
	   value */
	if (error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2570 
/* dlm_unlock -- unlock the lock identified by lkid, or with DLM_LKF_CANCEL
   cancel an operation in progress on it.

   Returns 0 when the operation was accepted or completed; -DLM_EUNLOCK and
   -DLM_ECANCEL from the lower stages are reported as 0, as is -EBUSY when
   CANCEL or FORCEUNLOCK was requested.  Returns -EINVAL for an unknown
   lockspace, or another negative error from the lower stages.

   lksb is accepted but not used by this function. */

int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_args args;
	struct dlm_lkb *lkb;
	struct dlm_ls *ls;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (ls == NULL)
		return -EINVAL;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (!error) {
		error = (flags & DLM_LKF_CANCEL) ?
			cancel_lock(ls, lkb, &args) :
			unlock_lock(ls, lkb, &args);

		if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
			error = 0;
		if (error == -EBUSY &&
		    (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
			error = 0;
	}

	/* drop the reference taken by find_lkb() */
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2612 
2613 /*
2614  * send/receive routines for remote operations and replies
2615  *
2616  * send_args
2617  * send_common
2618  * send_request			receive_request
2619  * send_convert			receive_convert
2620  * send_unlock			receive_unlock
2621  * send_cancel			receive_cancel
2622  * send_grant			receive_grant
2623  * send_bast			receive_bast
2624  * send_lookup			receive_lookup
2625  * send_remove			receive_remove
2626  *
2627  * 				send_common_reply
2628  * receive_request_reply	send_request_reply
2629  * receive_convert_reply	send_convert_reply
2630  * receive_unlock_reply		send_unlock_reply
2631  * receive_cancel_reply		send_cancel_reply
2632  * receive_lookup_reply		send_lookup_reply
2633  */
2634 
_create_message(struct dlm_ls * ls,int mb_len,int to_nodeid,int mstype,struct dlm_message ** ms_ret,struct dlm_mhandle ** mh_ret)2635 static int _create_message(struct dlm_ls *ls, int mb_len,
2636 			   int to_nodeid, int mstype,
2637 			   struct dlm_message **ms_ret,
2638 			   struct dlm_mhandle **mh_ret)
2639 {
2640 	struct dlm_message *ms;
2641 	struct dlm_mhandle *mh;
2642 	char *mb;
2643 
2644 	/* get_buffer gives us a message handle (mh) that we need to
2645 	   pass into lowcomms_commit and a message buffer (mb) that we
2646 	   write our data into */
2647 
2648 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
2649 	if (!mh)
2650 		return -ENOBUFS;
2651 
2652 	memset(mb, 0, mb_len);
2653 
2654 	ms = (struct dlm_message *) mb;
2655 
2656 	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2657 	ms->m_header.h_lockspace = ls->ls_global_id;
2658 	ms->m_header.h_nodeid = dlm_our_nodeid();
2659 	ms->m_header.h_length = mb_len;
2660 	ms->m_header.h_cmd = DLM_MSG;
2661 
2662 	ms->m_type = mstype;
2663 
2664 	*mh_ret = mh;
2665 	*ms_ret = ms;
2666 	return 0;
2667 }
2668 
create_message(struct dlm_rsb * r,struct dlm_lkb * lkb,int to_nodeid,int mstype,struct dlm_message ** ms_ret,struct dlm_mhandle ** mh_ret)2669 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2670 			  int to_nodeid, int mstype,
2671 			  struct dlm_message **ms_ret,
2672 			  struct dlm_mhandle **mh_ret)
2673 {
2674 	int mb_len = sizeof(struct dlm_message);
2675 
2676 	switch (mstype) {
2677 	case DLM_MSG_REQUEST:
2678 	case DLM_MSG_LOOKUP:
2679 	case DLM_MSG_REMOVE:
2680 		mb_len += r->res_length;
2681 		break;
2682 	case DLM_MSG_CONVERT:
2683 	case DLM_MSG_UNLOCK:
2684 	case DLM_MSG_REQUEST_REPLY:
2685 	case DLM_MSG_CONVERT_REPLY:
2686 	case DLM_MSG_GRANT:
2687 		if (lkb && lkb->lkb_lvbptr)
2688 			mb_len += r->res_ls->ls_lvblen;
2689 		break;
2690 	}
2691 
2692 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2693 			       ms_ret, mh_ret);
2694 }
2695 
2696 /* further lowcomms enhancements or alternate implementations may make
2697    the return value from this function useful at some point */
2698 
static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	/* run dlm_message_out() on the message, then hand the buffer to
	   lowcomms; neither step reports a result, so this always
	   returns 0 */
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);

	return 0;
}
2705 
send_args(struct dlm_rsb * r,struct dlm_lkb * lkb,struct dlm_message * ms)2706 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2707 		      struct dlm_message *ms)
2708 {
2709 	ms->m_nodeid   = lkb->lkb_nodeid;
2710 	ms->m_pid      = lkb->lkb_ownpid;
2711 	ms->m_lkid     = lkb->lkb_id;
2712 	ms->m_remid    = lkb->lkb_remid;
2713 	ms->m_exflags  = lkb->lkb_exflags;
2714 	ms->m_sbflags  = lkb->lkb_sbflags;
2715 	ms->m_flags    = lkb->lkb_flags;
2716 	ms->m_lvbseq   = lkb->lkb_lvbseq;
2717 	ms->m_status   = lkb->lkb_status;
2718 	ms->m_grmode   = lkb->lkb_grmode;
2719 	ms->m_rqmode   = lkb->lkb_rqmode;
2720 	ms->m_hash     = r->res_hash;
2721 
2722 	/* m_result and m_bastmode are set from function args,
2723 	   not from lkb fields */
2724 
2725 	if (lkb->lkb_bastfn)
2726 		ms->m_asts |= AST_BAST;
2727 	if (lkb->lkb_astfn)
2728 		ms->m_asts |= AST_COMP;
2729 
2730 	/* compare with switch in create_message; send_remove() doesn't
2731 	   use send_args() */
2732 
2733 	switch (ms->m_type) {
2734 	case DLM_MSG_REQUEST:
2735 	case DLM_MSG_LOOKUP:
2736 		memcpy(ms->m_extra, r->res_name, r->res_length);
2737 		break;
2738 	case DLM_MSG_CONVERT:
2739 	case DLM_MSG_UNLOCK:
2740 	case DLM_MSG_REQUEST_REPLY:
2741 	case DLM_MSG_CONVERT_REPLY:
2742 	case DLM_MSG_GRANT:
2743 		if (!lkb->lkb_lvbptr)
2744 			break;
2745 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2746 		break;
2747 	}
2748 }
2749 
send_common(struct dlm_rsb * r,struct dlm_lkb * lkb,int mstype)2750 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2751 {
2752 	struct dlm_message *ms;
2753 	struct dlm_mhandle *mh;
2754 	int to_nodeid, error;
2755 
2756 	error = add_to_waiters(lkb, mstype);
2757 	if (error)
2758 		return error;
2759 
2760 	to_nodeid = r->res_nodeid;
2761 
2762 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2763 	if (error)
2764 		goto fail;
2765 
2766 	send_args(r, lkb, ms);
2767 
2768 	error = send_message(mh, ms);
2769 	if (error)
2770 		goto fail;
2771 	return 0;
2772 
2773  fail:
2774 	remove_from_waiters(lkb, msg_reply_type(mstype));
2775 	return error;
2776 }
2777 
send_request(struct dlm_rsb * r,struct dlm_lkb * lkb)2778 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2779 {
2780 	return send_common(r, lkb, DLM_MSG_REQUEST);
2781 }
2782 
send_convert(struct dlm_rsb * r,struct dlm_lkb * lkb)2783 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2784 {
2785 	int error;
2786 
2787 	error = send_common(r, lkb, DLM_MSG_CONVERT);
2788 
2789 	/* down conversions go without a reply from the master */
2790 	if (!error && down_conversion(lkb)) {
2791 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2792 		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2793 		r->res_ls->ls_stub_ms.m_result = 0;
2794 		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2795 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2796 	}
2797 
2798 	return error;
2799 }
2800 
2801 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2802    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2803    that the master is still correct. */
2804 
send_unlock(struct dlm_rsb * r,struct dlm_lkb * lkb)2805 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2806 {
2807 	return send_common(r, lkb, DLM_MSG_UNLOCK);
2808 }
2809 
send_cancel(struct dlm_rsb * r,struct dlm_lkb * lkb)2810 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2811 {
2812 	return send_common(r, lkb, DLM_MSG_CANCEL);
2813 }
2814 
send_grant(struct dlm_rsb * r,struct dlm_lkb * lkb)2815 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2816 {
2817 	struct dlm_message *ms;
2818 	struct dlm_mhandle *mh;
2819 	int to_nodeid, error;
2820 
2821 	to_nodeid = lkb->lkb_nodeid;
2822 
2823 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2824 	if (error)
2825 		goto out;
2826 
2827 	send_args(r, lkb, ms);
2828 
2829 	ms->m_result = 0;
2830 
2831 	error = send_message(mh, ms);
2832  out:
2833 	return error;
2834 }
2835 
send_bast(struct dlm_rsb * r,struct dlm_lkb * lkb,int mode)2836 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2837 {
2838 	struct dlm_message *ms;
2839 	struct dlm_mhandle *mh;
2840 	int to_nodeid, error;
2841 
2842 	to_nodeid = lkb->lkb_nodeid;
2843 
2844 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2845 	if (error)
2846 		goto out;
2847 
2848 	send_args(r, lkb, ms);
2849 
2850 	ms->m_bastmode = mode;
2851 
2852 	error = send_message(mh, ms);
2853  out:
2854 	return error;
2855 }
2856 
send_lookup(struct dlm_rsb * r,struct dlm_lkb * lkb)2857 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2858 {
2859 	struct dlm_message *ms;
2860 	struct dlm_mhandle *mh;
2861 	int to_nodeid, error;
2862 
2863 	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2864 	if (error)
2865 		return error;
2866 
2867 	to_nodeid = dlm_dir_nodeid(r);
2868 
2869 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2870 	if (error)
2871 		goto fail;
2872 
2873 	send_args(r, lkb, ms);
2874 
2875 	error = send_message(mh, ms);
2876 	if (error)
2877 		goto fail;
2878 	return 0;
2879 
2880  fail:
2881 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2882 	return error;
2883 }
2884 
send_remove(struct dlm_rsb * r)2885 static int send_remove(struct dlm_rsb *r)
2886 {
2887 	struct dlm_message *ms;
2888 	struct dlm_mhandle *mh;
2889 	int to_nodeid, error;
2890 
2891 	to_nodeid = dlm_dir_nodeid(r);
2892 
2893 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2894 	if (error)
2895 		goto out;
2896 
2897 	memcpy(ms->m_extra, r->res_name, r->res_length);
2898 	ms->m_hash = r->res_hash;
2899 
2900 	error = send_message(mh, ms);
2901  out:
2902 	return error;
2903 }
2904 
send_common_reply(struct dlm_rsb * r,struct dlm_lkb * lkb,int mstype,int rv)2905 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2906 			     int mstype, int rv)
2907 {
2908 	struct dlm_message *ms;
2909 	struct dlm_mhandle *mh;
2910 	int to_nodeid, error;
2911 
2912 	to_nodeid = lkb->lkb_nodeid;
2913 
2914 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2915 	if (error)
2916 		goto out;
2917 
2918 	send_args(r, lkb, ms);
2919 
2920 	ms->m_result = rv;
2921 
2922 	error = send_message(mh, ms);
2923  out:
2924 	return error;
2925 }
2926 
send_request_reply(struct dlm_rsb * r,struct dlm_lkb * lkb,int rv)2927 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2928 {
2929 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2930 }
2931 
send_convert_reply(struct dlm_rsb * r,struct dlm_lkb * lkb,int rv)2932 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2933 {
2934 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2935 }
2936 
send_unlock_reply(struct dlm_rsb * r,struct dlm_lkb * lkb,int rv)2937 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2938 {
2939 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2940 }
2941 
send_cancel_reply(struct dlm_rsb * r,struct dlm_lkb * lkb,int rv)2942 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2943 {
2944 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2945 }
2946 
send_lookup_reply(struct dlm_ls * ls,struct dlm_message * ms_in,int ret_nodeid,int rv)2947 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2948 			     int ret_nodeid, int rv)
2949 {
2950 	struct dlm_rsb *r = &ls->ls_stub_rsb;
2951 	struct dlm_message *ms;
2952 	struct dlm_mhandle *mh;
2953 	int error, nodeid = ms_in->m_header.h_nodeid;
2954 
2955 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2956 	if (error)
2957 		goto out;
2958 
2959 	ms->m_lkid = ms_in->m_lkid;
2960 	ms->m_result = rv;
2961 	ms->m_nodeid = ret_nodeid;
2962 
2963 	error = send_message(mh, ms);
2964  out:
2965 	return error;
2966 }
2967 
2968 /* which args we save from a received message depends heavily on the type
2969    of message, unlike the send side where we can safely send everything about
2970    the lkb for any type of message */
2971 
receive_flags(struct dlm_lkb * lkb,struct dlm_message * ms)2972 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2973 {
2974 	lkb->lkb_exflags = ms->m_exflags;
2975 	lkb->lkb_sbflags = ms->m_sbflags;
2976 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2977 		         (ms->m_flags & 0x0000FFFF);
2978 }
2979 
receive_flags_reply(struct dlm_lkb * lkb,struct dlm_message * ms)2980 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2981 {
2982 	lkb->lkb_sbflags = ms->m_sbflags;
2983 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2984 		         (ms->m_flags & 0x0000FFFF);
2985 }
2986 
receive_extralen(struct dlm_message * ms)2987 static int receive_extralen(struct dlm_message *ms)
2988 {
2989 	return (ms->m_header.h_length - sizeof(struct dlm_message));
2990 }
2991 
receive_lvb(struct dlm_ls * ls,struct dlm_lkb * lkb,struct dlm_message * ms)2992 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2993 		       struct dlm_message *ms)
2994 {
2995 	int len;
2996 
2997 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2998 		if (!lkb->lkb_lvbptr)
2999 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3000 		if (!lkb->lkb_lvbptr)
3001 			return -ENOMEM;
3002 		len = receive_extralen(ms);
3003 		if (len > DLM_RESNAME_MAXLEN)
3004 			len = DLM_RESNAME_MAXLEN;
3005 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3006 	}
3007 	return 0;
3008 }
3009 
/* Placeholder blocking-ast function installed by receive_request_args();
   it only logs that it was called.  Both arguments are ignored. */

static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
}
3014 
/* Placeholder completion-ast function installed by receive_request_args();
   it only logs that it was called.  The argument is ignored. */

static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
}
3019 
receive_request_args(struct dlm_ls * ls,struct dlm_lkb * lkb,struct dlm_message * ms)3020 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3021 				struct dlm_message *ms)
3022 {
3023 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3024 	lkb->lkb_ownpid = ms->m_pid;
3025 	lkb->lkb_remid = ms->m_lkid;
3026 	lkb->lkb_grmode = DLM_LOCK_IV;
3027 	lkb->lkb_rqmode = ms->m_rqmode;
3028 
3029 	lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3030 	lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3031 
3032 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3033 		/* lkb was just created so there won't be an lvb yet */
3034 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3035 		if (!lkb->lkb_lvbptr)
3036 			return -ENOMEM;
3037 	}
3038 
3039 	return 0;
3040 }
3041 
receive_convert_args(struct dlm_ls * ls,struct dlm_lkb * lkb,struct dlm_message * ms)3042 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3043 				struct dlm_message *ms)
3044 {
3045 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3046 		return -EBUSY;
3047 
3048 	if (receive_lvb(ls, lkb, ms))
3049 		return -ENOMEM;
3050 
3051 	lkb->lkb_rqmode = ms->m_rqmode;
3052 	lkb->lkb_lvbseq = ms->m_lvbseq;
3053 
3054 	return 0;
3055 }
3056 
/* The only argument taken from an unlock message is the lvb.  Any failure
   from receive_lvb() is reported as -ENOMEM; otherwise 0. */

static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       struct dlm_message *ms)
{
	return receive_lvb(ls, lkb, ms) ? -ENOMEM : 0;
}
3064 
3065 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3066    uses to send a reply and that the remote end uses to process the reply. */
3067 
setup_stub_lkb(struct dlm_ls * ls,struct dlm_message * ms)3068 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3069 {
3070 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3071 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3072 	lkb->lkb_remid = ms->m_lkid;
3073 }
3074 
3075 /* This is called after the rsb is locked so that we can safely inspect
3076    fields in the lkb. */
3077 
validate_message(struct dlm_lkb * lkb,struct dlm_message * ms)3078 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3079 {
3080 	int from = ms->m_header.h_nodeid;
3081 	int error = 0;
3082 
3083 	switch (ms->m_type) {
3084 	case DLM_MSG_CONVERT:
3085 	case DLM_MSG_UNLOCK:
3086 	case DLM_MSG_CANCEL:
3087 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3088 			error = -EINVAL;
3089 		break;
3090 
3091 	case DLM_MSG_CONVERT_REPLY:
3092 	case DLM_MSG_UNLOCK_REPLY:
3093 	case DLM_MSG_CANCEL_REPLY:
3094 	case DLM_MSG_GRANT:
3095 	case DLM_MSG_BAST:
3096 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3097 			error = -EINVAL;
3098 		break;
3099 
3100 	case DLM_MSG_REQUEST_REPLY:
3101 		if (!is_process_copy(lkb))
3102 			error = -EINVAL;
3103 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3104 			error = -EINVAL;
3105 		break;
3106 
3107 	default:
3108 		error = -EINVAL;
3109 	}
3110 
3111 	if (error)
3112 		log_error(lkb->lkb_resource->res_ls,
3113 			  "ignore invalid message %d from %d %x %x %x %d",
3114 			  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3115 			  lkb->lkb_flags, lkb->lkb_nodeid);
3116 	return error;
3117 }
3118 
receive_request(struct dlm_ls * ls,struct dlm_message * ms)3119 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3120 {
3121 	struct dlm_lkb *lkb;
3122 	struct dlm_rsb *r;
3123 	int error, namelen;
3124 
3125 	error = create_lkb(ls, &lkb);
3126 	if (error)
3127 		goto fail;
3128 
3129 	receive_flags(lkb, ms);
3130 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
3131 	error = receive_request_args(ls, lkb, ms);
3132 	if (error) {
3133 		__put_lkb(ls, lkb);
3134 		goto fail;
3135 	}
3136 
3137 	namelen = receive_extralen(ms);
3138 
3139 	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3140 	if (error) {
3141 		__put_lkb(ls, lkb);
3142 		goto fail;
3143 	}
3144 
3145 	lock_rsb(r);
3146 
3147 	attach_lkb(r, lkb);
3148 	error = do_request(r, lkb);
3149 	send_request_reply(r, lkb, error);
3150 
3151 	unlock_rsb(r);
3152 	put_rsb(r);
3153 
3154 	if (error == -EINPROGRESS)
3155 		error = 0;
3156 	if (error)
3157 		dlm_put_lkb(lkb);
3158 	return;
3159 
3160  fail:
3161 	setup_stub_lkb(ls, ms);
3162 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3163 }
3164 
receive_convert(struct dlm_ls * ls,struct dlm_message * ms)3165 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3166 {
3167 	struct dlm_lkb *lkb;
3168 	struct dlm_rsb *r;
3169 	int error, reply = 1;
3170 
3171 	error = find_lkb(ls, ms->m_remid, &lkb);
3172 	if (error)
3173 		goto fail;
3174 
3175 	r = lkb->lkb_resource;
3176 
3177 	hold_rsb(r);
3178 	lock_rsb(r);
3179 
3180 	error = validate_message(lkb, ms);
3181 	if (error)
3182 		goto out;
3183 
3184 	receive_flags(lkb, ms);
3185 	error = receive_convert_args(ls, lkb, ms);
3186 	if (error)
3187 		goto out_reply;
3188 	reply = !down_conversion(lkb);
3189 
3190 	error = do_convert(r, lkb);
3191  out_reply:
3192 	if (reply)
3193 		send_convert_reply(r, lkb, error);
3194  out:
3195 	unlock_rsb(r);
3196 	put_rsb(r);
3197 	dlm_put_lkb(lkb);
3198 	return;
3199 
3200  fail:
3201 	setup_stub_lkb(ls, ms);
3202 	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3203 }
3204 
receive_unlock(struct dlm_ls * ls,struct dlm_message * ms)3205 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3206 {
3207 	struct dlm_lkb *lkb;
3208 	struct dlm_rsb *r;
3209 	int error;
3210 
3211 	error = find_lkb(ls, ms->m_remid, &lkb);
3212 	if (error)
3213 		goto fail;
3214 
3215 	r = lkb->lkb_resource;
3216 
3217 	hold_rsb(r);
3218 	lock_rsb(r);
3219 
3220 	error = validate_message(lkb, ms);
3221 	if (error)
3222 		goto out;
3223 
3224 	receive_flags(lkb, ms);
3225 	error = receive_unlock_args(ls, lkb, ms);
3226 	if (error)
3227 		goto out_reply;
3228 
3229 	error = do_unlock(r, lkb);
3230  out_reply:
3231 	send_unlock_reply(r, lkb, error);
3232  out:
3233 	unlock_rsb(r);
3234 	put_rsb(r);
3235 	dlm_put_lkb(lkb);
3236 	return;
3237 
3238  fail:
3239 	setup_stub_lkb(ls, ms);
3240 	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3241 }
3242 
receive_cancel(struct dlm_ls * ls,struct dlm_message * ms)3243 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3244 {
3245 	struct dlm_lkb *lkb;
3246 	struct dlm_rsb *r;
3247 	int error;
3248 
3249 	error = find_lkb(ls, ms->m_remid, &lkb);
3250 	if (error)
3251 		goto fail;
3252 
3253 	receive_flags(lkb, ms);
3254 
3255 	r = lkb->lkb_resource;
3256 
3257 	hold_rsb(r);
3258 	lock_rsb(r);
3259 
3260 	error = validate_message(lkb, ms);
3261 	if (error)
3262 		goto out;
3263 
3264 	error = do_cancel(r, lkb);
3265 	send_cancel_reply(r, lkb, error);
3266  out:
3267 	unlock_rsb(r);
3268 	put_rsb(r);
3269 	dlm_put_lkb(lkb);
3270 	return;
3271 
3272  fail:
3273 	setup_stub_lkb(ls, ms);
3274 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3275 }
3276 
receive_grant(struct dlm_ls * ls,struct dlm_message * ms)3277 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3278 {
3279 	struct dlm_lkb *lkb;
3280 	struct dlm_rsb *r;
3281 	int error;
3282 
3283 	error = find_lkb(ls, ms->m_remid, &lkb);
3284 	if (error) {
3285 		log_debug(ls, "receive_grant from %d no lkb %x",
3286 			  ms->m_header.h_nodeid, ms->m_remid);
3287 		return;
3288 	}
3289 
3290 	r = lkb->lkb_resource;
3291 
3292 	hold_rsb(r);
3293 	lock_rsb(r);
3294 
3295 	error = validate_message(lkb, ms);
3296 	if (error)
3297 		goto out;
3298 
3299 	receive_flags_reply(lkb, ms);
3300 	if (is_altmode(lkb))
3301 		munge_altmode(lkb, ms);
3302 	grant_lock_pc(r, lkb, ms);
3303 	queue_cast(r, lkb, 0);
3304  out:
3305 	unlock_rsb(r);
3306 	put_rsb(r);
3307 	dlm_put_lkb(lkb);
3308 }
3309 
receive_bast(struct dlm_ls * ls,struct dlm_message * ms)3310 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3311 {
3312 	struct dlm_lkb *lkb;
3313 	struct dlm_rsb *r;
3314 	int error;
3315 
3316 	error = find_lkb(ls, ms->m_remid, &lkb);
3317 	if (error) {
3318 		log_debug(ls, "receive_bast from %d no lkb %x",
3319 			  ms->m_header.h_nodeid, ms->m_remid);
3320 		return;
3321 	}
3322 
3323 	r = lkb->lkb_resource;
3324 
3325 	hold_rsb(r);
3326 	lock_rsb(r);
3327 
3328 	error = validate_message(lkb, ms);
3329 	if (error)
3330 		goto out;
3331 
3332 	queue_bast(r, lkb, ms->m_bastmode);
3333  out:
3334 	unlock_rsb(r);
3335 	put_rsb(r);
3336 	dlm_put_lkb(lkb);
3337 }
3338 
receive_lookup(struct dlm_ls * ls,struct dlm_message * ms)3339 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3340 {
3341 	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3342 
3343 	from_nodeid = ms->m_header.h_nodeid;
3344 	our_nodeid = dlm_our_nodeid();
3345 
3346 	len = receive_extralen(ms);
3347 
3348 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3349 	if (dir_nodeid != our_nodeid) {
3350 		log_error(ls, "lookup dir_nodeid %d from %d",
3351 			  dir_nodeid, from_nodeid);
3352 		error = -EINVAL;
3353 		ret_nodeid = -1;
3354 		goto out;
3355 	}
3356 
3357 	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3358 
3359 	/* Optimization: we're master so treat lookup as a request */
3360 	if (!error && ret_nodeid == our_nodeid) {
3361 		receive_request(ls, ms);
3362 		return;
3363 	}
3364  out:
3365 	send_lookup_reply(ls, ms, ret_nodeid, error);
3366 }
3367 
receive_remove(struct dlm_ls * ls,struct dlm_message * ms)3368 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3369 {
3370 	int len, dir_nodeid, from_nodeid;
3371 
3372 	from_nodeid = ms->m_header.h_nodeid;
3373 
3374 	len = receive_extralen(ms);
3375 
3376 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3377 	if (dir_nodeid != dlm_our_nodeid()) {
3378 		log_error(ls, "remove dir entry dir_nodeid %d from %d",
3379 			  dir_nodeid, from_nodeid);
3380 		return;
3381 	}
3382 
3383 	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3384 }
3385 
receive_purge(struct dlm_ls * ls,struct dlm_message * ms)3386 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3387 {
3388 	do_purge(ls, ms->m_nodeid, ms->m_pid);
3389 }
3390 
/* Handle a request reply for the lkb identified by ms->m_remid.  An unknown
   lkb is only logged.  With the rsb locked, the message is validated and the
   lkb is taken off the waiters list; if either step fails nothing else is
   done.  The rest of the function acts on ms->m_result and then resolves
   any overlapping unlock/cancel recorded in the lkb flags.  The rsb and lkb
   references taken here are dropped before returning. */

receive_request_reply(struct dlm_ls * ls,struct dlm_message * ms)3391 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3392 {
3393 	struct dlm_lkb *lkb;
3394 	struct dlm_rsb *r;
3395 	int error, mstype, result;
3396 
3397 	error = find_lkb(ls, ms->m_remid, &lkb);
3398 	if (error) {
3399 		log_debug(ls, "receive_request_reply from %d no lkb %x",
3400 			  ms->m_header.h_nodeid, ms->m_remid);
3401 		return;
3402 	}
3403 
3404 	r = lkb->lkb_resource;
3405 	hold_rsb(r);
3406 	lock_rsb(r);
3407 
3408 	error = validate_message(lkb, ms);
3409 	if (error)
3410 		goto out;
3411 
	/* wait_type is sampled before remove_from_waiters(); it is tested
	   below to detect a lookup that was answered with a request reply
	   (presumably remove_from_waiters() resets it -- confirm) */
3412 	mstype = lkb->lkb_wait_type;
3413 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3414 	if (error)
3415 		goto out;
3416 
3417 	/* Optimization: the dir node was also the master, so it took our
3418 	   lookup as a request and sent request reply instead of lookup reply */
3419 	if (mstype == DLM_MSG_LOOKUP) {
3420 		r->res_nodeid = ms->m_header.h_nodeid;
3421 		lkb->lkb_nodeid = r->res_nodeid;
3422 	}
3423 
3424 	/* this is the value returned from do_request() on the master */
3425 	result = ms->m_result;
3426 
3427 	switch (result) {
3428 	case -EAGAIN:
3429 		/* request would block (be queued) on remote master */
3430 		queue_cast(r, lkb, -EAGAIN);
3431 		confirm_master(r, -EAGAIN);
3432 		unhold_lkb(lkb); /* undoes create_lkb() */
3433 		break;
3434 
3435 	case -EINPROGRESS:
3436 	case 0:
3437 		/* request was queued or granted on remote master */
3438 		receive_flags_reply(lkb, ms);
3439 		lkb->lkb_remid = ms->m_lkid;
3440 		if (is_altmode(lkb))
3441 			munge_altmode(lkb, ms);
		/* result is nonzero only for -EINPROGRESS here: the lkb goes
		   on the waiting queue; for 0 it is granted */
3442 		if (result) {
3443 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
3444 			add_timeout(lkb);
3445 		} else {
3446 			grant_lock_pc(r, lkb, ms);
3447 			queue_cast(r, lkb, 0);
3448 		}
3449 		confirm_master(r, result);
3450 		break;
3451 
3452 	case -EBADR:
3453 	case -ENOTBLK:
3454 		/* find_rsb failed to find rsb or rsb wasn't master */
3455 		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3456 			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
		/* forget the master recorded in both the rsb and the lkb
		   before the request is either abandoned or reissued */
3457 		r->res_nodeid = -1;
3458 		lkb->lkb_nodeid = -1;
3459 
3460 		if (is_overlap(lkb)) {
3461 			/* we'll ignore error in cancel/unlock reply */
3462 			queue_cast_overlap(r, lkb);
3463 			confirm_master(r, result);
3464 			unhold_lkb(lkb); /* undoes create_lkb() */
3465 		} else
3466 			_request_lock(r, lkb);
3467 		break;
3468 
3469 	default:
3470 		log_error(ls, "receive_request_reply %x error %d",
3471 			  lkb->lkb_id, result);
3472 	}
3473 
	/* Both overlap flags are cleared on every path below.  An unlock is
	   sent only when an unlock overlapped and the request was granted or
	   queued; a cancel is sent only when a cancel overlapped and the
	   request was queued. */
3474 	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3475 		log_debug(ls, "receive_request_reply %x result %d unlock",
3476 			  lkb->lkb_id, result);
3477 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3478 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3479 		send_unlock(r, lkb);
3480 	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3481 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3482 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3483 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3484 		send_cancel(r, lkb);
3485 	} else {
3486 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3487 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3488 	}
3489  out:
3490 	unlock_rsb(r);
3491 	put_rsb(r);
3492 	dlm_put_lkb(lkb);
3493 }
3494 
__receive_convert_reply(struct dlm_rsb * r,struct dlm_lkb * lkb,struct dlm_message * ms)3495 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3496 				    struct dlm_message *ms)
3497 {
3498 	/* this is the value returned from do_convert() on the master */
3499 	switch (ms->m_result) {
3500 	case -EAGAIN:
3501 		/* convert would block (be queued) on remote master */
3502 		queue_cast(r, lkb, -EAGAIN);
3503 		break;
3504 
3505 	case -EDEADLK:
3506 		receive_flags_reply(lkb, ms);
3507 		revert_lock_pc(r, lkb);
3508 		queue_cast(r, lkb, -EDEADLK);
3509 		break;
3510 
3511 	case -EINPROGRESS:
3512 		/* convert was queued on remote master */
3513 		receive_flags_reply(lkb, ms);
3514 		if (is_demoted(lkb))
3515 			munge_demoted(lkb, ms);
3516 		del_lkb(r, lkb);
3517 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3518 		add_timeout(lkb);
3519 		break;
3520 
3521 	case 0:
3522 		/* convert was granted on remote master */
3523 		receive_flags_reply(lkb, ms);
3524 		if (is_demoted(lkb))
3525 			munge_demoted(lkb, ms);
3526 		grant_lock_pc(r, lkb, ms);
3527 		queue_cast(r, lkb, 0);
3528 		break;
3529 
3530 	default:
3531 		log_error(r->res_ls, "receive_convert_reply %x error %d",
3532 			  lkb->lkb_id, ms->m_result);
3533 	}
3534 }
3535 
_receive_convert_reply(struct dlm_lkb * lkb,struct dlm_message * ms)3536 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3537 {
3538 	struct dlm_rsb *r = lkb->lkb_resource;
3539 	int error;
3540 
3541 	hold_rsb(r);
3542 	lock_rsb(r);
3543 
3544 	error = validate_message(lkb, ms);
3545 	if (error)
3546 		goto out;
3547 
3548 	/* stub reply can happen with waiters_mutex held */
3549 	error = remove_from_waiters_ms(lkb, ms);
3550 	if (error)
3551 		goto out;
3552 
3553 	__receive_convert_reply(r, lkb, ms);
3554  out:
3555 	unlock_rsb(r);
3556 	put_rsb(r);
3557 }
3558 
receive_convert_reply(struct dlm_ls * ls,struct dlm_message * ms)3559 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3560 {
3561 	struct dlm_lkb *lkb;
3562 	int error;
3563 
3564 	error = find_lkb(ls, ms->m_remid, &lkb);
3565 	if (error) {
3566 		log_debug(ls, "receive_convert_reply from %d no lkb %x",
3567 			  ms->m_header.h_nodeid, ms->m_remid);
3568 		return;
3569 	}
3570 
3571 	_receive_convert_reply(lkb, ms);
3572 	dlm_put_lkb(lkb);
3573 }
3574 
_receive_unlock_reply(struct dlm_lkb * lkb,struct dlm_message * ms)3575 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3576 {
3577 	struct dlm_rsb *r = lkb->lkb_resource;
3578 	int error;
3579 
3580 	hold_rsb(r);
3581 	lock_rsb(r);
3582 
3583 	error = validate_message(lkb, ms);
3584 	if (error)
3585 		goto out;
3586 
3587 	/* stub reply can happen with waiters_mutex held */
3588 	error = remove_from_waiters_ms(lkb, ms);
3589 	if (error)
3590 		goto out;
3591 
3592 	/* this is the value returned from do_unlock() on the master */
3593 
3594 	switch (ms->m_result) {
3595 	case -DLM_EUNLOCK:
3596 		receive_flags_reply(lkb, ms);
3597 		remove_lock_pc(r, lkb);
3598 		queue_cast(r, lkb, -DLM_EUNLOCK);
3599 		break;
3600 	case -ENOENT:
3601 		break;
3602 	default:
3603 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
3604 			  lkb->lkb_id, ms->m_result);
3605 	}
3606  out:
3607 	unlock_rsb(r);
3608 	put_rsb(r);
3609 }
3610 
receive_unlock_reply(struct dlm_ls * ls,struct dlm_message * ms)3611 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3612 {
3613 	struct dlm_lkb *lkb;
3614 	int error;
3615 
3616 	error = find_lkb(ls, ms->m_remid, &lkb);
3617 	if (error) {
3618 		log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3619 			  ms->m_header.h_nodeid, ms->m_remid);
3620 		return;
3621 	}
3622 
3623 	_receive_unlock_reply(lkb, ms);
3624 	dlm_put_lkb(lkb);
3625 }
3626 
_receive_cancel_reply(struct dlm_lkb * lkb,struct dlm_message * ms)3627 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3628 {
3629 	struct dlm_rsb *r = lkb->lkb_resource;
3630 	int error;
3631 
3632 	hold_rsb(r);
3633 	lock_rsb(r);
3634 
3635 	error = validate_message(lkb, ms);
3636 	if (error)
3637 		goto out;
3638 
3639 	/* stub reply can happen with waiters_mutex held */
3640 	error = remove_from_waiters_ms(lkb, ms);
3641 	if (error)
3642 		goto out;
3643 
3644 	/* this is the value returned from do_cancel() on the master */
3645 
3646 	switch (ms->m_result) {
3647 	case -DLM_ECANCEL:
3648 		receive_flags_reply(lkb, ms);
3649 		revert_lock_pc(r, lkb);
3650 		queue_cast(r, lkb, -DLM_ECANCEL);
3651 		break;
3652 	case 0:
3653 		break;
3654 	default:
3655 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
3656 			  lkb->lkb_id, ms->m_result);
3657 	}
3658  out:
3659 	unlock_rsb(r);
3660 	put_rsb(r);
3661 }
3662 
receive_cancel_reply(struct dlm_ls * ls,struct dlm_message * ms)3663 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3664 {
3665 	struct dlm_lkb *lkb;
3666 	int error;
3667 
3668 	error = find_lkb(ls, ms->m_remid, &lkb);
3669 	if (error) {
3670 		log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3671 			  ms->m_header.h_nodeid, ms->m_remid);
3672 		return;
3673 	}
3674 
3675 	_receive_cancel_reply(lkb, ms);
3676 	dlm_put_lkb(lkb);
3677 }
3678 
receive_lookup_reply(struct dlm_ls * ls,struct dlm_message * ms)3679 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3680 {
3681 	struct dlm_lkb *lkb;
3682 	struct dlm_rsb *r;
3683 	int error, ret_nodeid;
3684 
3685 	error = find_lkb(ls, ms->m_lkid, &lkb);
3686 	if (error) {
3687 		log_error(ls, "receive_lookup_reply no lkb");
3688 		return;
3689 	}
3690 
3691 	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
3692 	   FIXME: will a non-zero error ever be returned? */
3693 
3694 	r = lkb->lkb_resource;
3695 	hold_rsb(r);
3696 	lock_rsb(r);
3697 
3698 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3699 	if (error)
3700 		goto out;
3701 
3702 	ret_nodeid = ms->m_nodeid;
3703 	if (ret_nodeid == dlm_our_nodeid()) {
3704 		r->res_nodeid = 0;
3705 		ret_nodeid = 0;
3706 		r->res_first_lkid = 0;
3707 	} else {
3708 		/* set_master() will copy res_nodeid to lkb_nodeid */
3709 		r->res_nodeid = ret_nodeid;
3710 	}
3711 
3712 	if (is_overlap(lkb)) {
3713 		log_debug(ls, "receive_lookup_reply %x unlock %x",
3714 			  lkb->lkb_id, lkb->lkb_flags);
3715 		queue_cast_overlap(r, lkb);
3716 		unhold_lkb(lkb); /* undoes create_lkb() */
3717 		goto out_list;
3718 	}
3719 
3720 	_request_lock(r, lkb);
3721 
3722  out_list:
3723 	if (!ret_nodeid)
3724 		process_lookup_list(r);
3725  out:
3726 	unlock_rsb(r);
3727 	put_rsb(r);
3728 	dlm_put_lkb(lkb);
3729 }
3730 
_receive_message(struct dlm_ls * ls,struct dlm_message * ms)3731 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3732 {
3733 	if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3734 		log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3735 			  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3736 			  ms->m_remid, ms->m_result);
3737 		return;
3738 	}
3739 
3740 	switch (ms->m_type) {
3741 
3742 	/* messages sent to a master node */
3743 
3744 	case DLM_MSG_REQUEST:
3745 		receive_request(ls, ms);
3746 		break;
3747 
3748 	case DLM_MSG_CONVERT:
3749 		receive_convert(ls, ms);
3750 		break;
3751 
3752 	case DLM_MSG_UNLOCK:
3753 		receive_unlock(ls, ms);
3754 		break;
3755 
3756 	case DLM_MSG_CANCEL:
3757 		receive_cancel(ls, ms);
3758 		break;
3759 
3760 	/* messages sent from a master node (replies to above) */
3761 
3762 	case DLM_MSG_REQUEST_REPLY:
3763 		receive_request_reply(ls, ms);
3764 		break;
3765 
3766 	case DLM_MSG_CONVERT_REPLY:
3767 		receive_convert_reply(ls, ms);
3768 		break;
3769 
3770 	case DLM_MSG_UNLOCK_REPLY:
3771 		receive_unlock_reply(ls, ms);
3772 		break;
3773 
3774 	case DLM_MSG_CANCEL_REPLY:
3775 		receive_cancel_reply(ls, ms);
3776 		break;
3777 
3778 	/* messages sent from a master node (only two types of async msg) */
3779 
3780 	case DLM_MSG_GRANT:
3781 		receive_grant(ls, ms);
3782 		break;
3783 
3784 	case DLM_MSG_BAST:
3785 		receive_bast(ls, ms);
3786 		break;
3787 
3788 	/* messages sent to a dir node */
3789 
3790 	case DLM_MSG_LOOKUP:
3791 		receive_lookup(ls, ms);
3792 		break;
3793 
3794 	case DLM_MSG_REMOVE:
3795 		receive_remove(ls, ms);
3796 		break;
3797 
3798 	/* messages sent from a dir node (remove has no reply) */
3799 
3800 	case DLM_MSG_LOOKUP_REPLY:
3801 		receive_lookup_reply(ls, ms);
3802 		break;
3803 
3804 	/* other messages */
3805 
3806 	case DLM_MSG_PURGE:
3807 		receive_purge(ls, ms);
3808 		break;
3809 
3810 	default:
3811 		log_error(ls, "unknown message type %d", ms->m_type);
3812 	}
3813 
3814 	dlm_astd_wake();
3815 }
3816 
/* Entry point for a newly arrived locking message.

   While the lockspace is in recovery mode (locking stopped), normal
   messages are saved on the requestqueue to be processed after recovery is
   done.  When not in recovery mode, we first wait for dlm_recoverd to drain
   the saved messages off the requestqueue and only then process the new
   one.  That wait matters right after recovery completes, when we move from
   saving all messages, to processing the saved ones, to processing new
   messages as they arrive. */

static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
				int nodeid)
{
	if (dlm_locking_stopped(ls)) {
		dlm_add_requestqueue(ls, nodeid, ms);
		return;
	}

	dlm_wait_requestqueue(ls);
	_receive_message(ls, ms);
}
3835 
/* Called by dlm_recoverd for each message that was saved on the
   requestqueue; the message is dispatched directly, without the
   locking-stopped check that newly arrived messages go through. */

void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
{
	_receive_message(ls, ms);
}
3843 
3844 /* This is called by the midcomms layer when something is received for
3845    the lockspace.  It could be either a MSG (normal message sent as part of
3846    standard locking activity) or an RCOM (recovery message sent as part of
3847    lockspace recovery). */
3848 
dlm_receive_buffer(union dlm_packet * p,int nodeid)3849 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3850 {
3851 	struct dlm_header *hd = &p->header;
3852 	struct dlm_ls *ls;
3853 	int type = 0;
3854 
3855 	switch (hd->h_cmd) {
3856 	case DLM_MSG:
3857 		dlm_message_in(&p->message);
3858 		type = p->message.m_type;
3859 		break;
3860 	case DLM_RCOM:
3861 		dlm_rcom_in(&p->rcom);
3862 		type = p->rcom.rc_type;
3863 		break;
3864 	default:
3865 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3866 		return;
3867 	}
3868 
3869 	if (hd->h_nodeid != nodeid) {
3870 		log_print("invalid h_nodeid %d from %d lockspace %x",
3871 			  hd->h_nodeid, nodeid, hd->h_lockspace);
3872 		return;
3873 	}
3874 
3875 	ls = dlm_find_lockspace_global(hd->h_lockspace);
3876 	if (!ls) {
3877 		if (dlm_config.ci_log_debug)
3878 			log_print("invalid lockspace %x from %d cmd %d type %d",
3879 				  hd->h_lockspace, nodeid, hd->h_cmd, type);
3880 
3881 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3882 			dlm_send_ls_not_ready(nodeid, &p->rcom);
3883 		return;
3884 	}
3885 
3886 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3887 	   be inactive (in this ls) before transitioning to recovery mode */
3888 
3889 	down_read(&ls->ls_recv_active);
3890 	if (hd->h_cmd == DLM_MSG)
3891 		dlm_receive_message(ls, &p->message, nodeid);
3892 	else
3893 		dlm_receive_rcom(ls, &p->rcom, nodeid);
3894 	up_read(&ls->ls_recv_active);
3895 
3896 	dlm_put_lockspace(ls);
3897 }
3898 
recover_convert_waiter(struct dlm_ls * ls,struct dlm_lkb * lkb)3899 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3900 {
3901 	if (middle_conversion(lkb)) {
3902 		hold_lkb(lkb);
3903 		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3904 		ls->ls_stub_ms.m_result = -EINPROGRESS;
3905 		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3906 		ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3907 		_receive_convert_reply(lkb, &ls->ls_stub_ms);
3908 
3909 		/* Same special case as in receive_rcom_lock_args() */
3910 		lkb->lkb_grmode = DLM_LOCK_IV;
3911 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3912 		unhold_lkb(lkb);
3913 
3914 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3915 		lkb->lkb_flags |= DLM_IFL_RESEND;
3916 	}
3917 
3918 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3919 	   conversions are async; there's no reply from the remote master */
3920 }
3921 
3922 /* A waiting lkb needs recovery if the master node has failed, or
3923    the master node is changing (only when no directory is used) */
3924 
waiter_needs_recovery(struct dlm_ls * ls,struct dlm_lkb * lkb)3925 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3926 {
3927 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
3928 		return 1;
3929 
3930 	if (!dlm_no_directory(ls))
3931 		return 0;
3932 
3933 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3934 		return 1;
3935 
3936 	return 0;
3937 }
3938 
3939 /* Recovery for locks that are waiting for replies from nodes that are now
3940    gone.  We can just complete unlocks and cancels by faking a reply from the
3941    dead node.  Requests and up-conversions we flag to be resent after
3942    recovery.  Down-conversions can just be completed with a fake reply like
3943    unlocks.  Conversions between PR and CW need special attention. */
3944 
/* Walks every lkb on ls->ls_waiters with ls_waiters_mutex held for the whole
   walk.  Lookups are always flagged for resend; other waiters are handled
   only if waiter_needs_recovery() says so.  Fake replies are built in the
   lockspace's single stub message (ls_stub_ms), which is overwritten for
   each lkb. */

dlm_recover_waiters_pre(struct dlm_ls * ls)3945 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3946 {
3947 	struct dlm_lkb *lkb, *safe;
3948 	int wait_type, stub_unlock_result, stub_cancel_result;
3949 
3950 	mutex_lock(&ls->ls_waiters_mutex);
3951 
	/* _safe iteration: presumably the faked replies below can take lkb
	   off the waiters list while we walk it -- confirm */
3952 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3953 		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3954 			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3955 
3956 		/* all outstanding lookups, regardless of destination  will be
3957 		   resent after recovery is done */
3958 
3959 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3960 			lkb->lkb_flags |= DLM_IFL_RESEND;
3961 			continue;
3962 		}
3963 
3964 		if (!waiter_needs_recovery(ls, lkb))
3965 			continue;
3966 
		/* defaults for the faked replies; overridden below when the
		   lkb has no granted mode */
3967 		wait_type = lkb->lkb_wait_type;
3968 		stub_unlock_result = -DLM_EUNLOCK;
3969 		stub_cancel_result = -DLM_ECANCEL;
3970 
3971 		/* Main reply may have been received leaving a zero wait_type,
3972 		   but a reply for the overlapping op may not have been
3973 		   received.  In that case we need to fake the appropriate
3974 		   reply for the overlap op. */
3975 
		/* if both overlap flags are set, the unlock check runs last
		   and so decides wait_type */
3976 		if (!wait_type) {
3977 			if (is_overlap_cancel(lkb)) {
3978 				wait_type = DLM_MSG_CANCEL;
3979 				if (lkb->lkb_grmode == DLM_LOCK_IV)
3980 					stub_cancel_result = 0;
3981 			}
3982 			if (is_overlap_unlock(lkb)) {
3983 				wait_type = DLM_MSG_UNLOCK;
3984 				if (lkb->lkb_grmode == DLM_LOCK_IV)
3985 					stub_unlock_result = -ENOENT;
3986 			}
3987 
3988 			log_debug(ls, "rwpre overlap %x %x %d %d %d",
3989 				  lkb->lkb_id, lkb->lkb_flags, wait_type,
3990 				  stub_cancel_result, stub_unlock_result);
3991 		}
3992 
3993 		switch (wait_type) {
3994 
3995 		case DLM_MSG_REQUEST:
3996 			lkb->lkb_flags |= DLM_IFL_RESEND;
3997 			break;
3998 
3999 		case DLM_MSG_CONVERT:
4000 			recover_convert_waiter(ls, lkb);
4001 			break;
4002 
		/* unlock and cancel are completed here by handing a faked
		   reply to the normal reply handler; an lkb reference is
		   held across the call and dropped afterwards */
4003 		case DLM_MSG_UNLOCK:
4004 			hold_lkb(lkb);
4005 			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4006 			ls->ls_stub_ms.m_result = stub_unlock_result;
4007 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4008 			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4009 			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
4010 			dlm_put_lkb(lkb);
4011 			break;
4012 
4013 		case DLM_MSG_CANCEL:
4014 			hold_lkb(lkb);
4015 			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4016 			ls->ls_stub_ms.m_result = stub_cancel_result;
4017 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4018 			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4019 			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
4020 			dlm_put_lkb(lkb);
4021 			break;
4022 
4023 		default:
4024 			log_error(ls, "invalid lkb wait_type %d %d",
4025 				  lkb->lkb_wait_type, wait_type);
4026 		}
		/* let other tasks run after each recovered waiter */
4027 		schedule();
4028 	}
4029 	mutex_unlock(&ls->ls_waiters_mutex);
4030 }
4031 
find_resend_waiter(struct dlm_ls * ls)4032 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4033 {
4034 	struct dlm_lkb *lkb;
4035 	int found = 0;
4036 
4037 	mutex_lock(&ls->ls_waiters_mutex);
4038 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4039 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
4040 			hold_lkb(lkb);
4041 			found = 1;
4042 			break;
4043 		}
4044 	}
4045 	mutex_unlock(&ls->ls_waiters_mutex);
4046 
4047 	if (!found)
4048 		lkb = NULL;
4049 	return lkb;
4050 }
4051 
4052 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4053    master or dir-node for r.  Processing the lkb may result in it being placed
4054    back on waiters. */
4055 
4056 /* We do this after normal locking has been enabled and any saved messages
4057    (in requestqueue) have been processed.  We should be confident that at
4058    this point we won't get or process a reply to any of these waiting
4059    operations.  But, new ops may be coming in on the rsbs/locks here from
4060    userspace or remotely. */
4061 
4062 /* there may have been an overlap unlock/cancel prior to recovery or after
4063    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4064    overlap flag would just have been set and nothing new sent.  we can be
4065    confident here that any replies to either the initial op or overlap ops
4066    prior to recovery have been received. */
4067 
dlm_recover_waiters_post(struct dlm_ls * ls)4068 int dlm_recover_waiters_post(struct dlm_ls *ls)
4069 {
4070 	struct dlm_lkb *lkb;
4071 	struct dlm_rsb *r;
4072 	int error = 0, mstype, err, oc, ou;
4073 
4074 	while (1) {
4075 		if (dlm_locking_stopped(ls)) {
4076 			log_debug(ls, "recover_waiters_post aborted");
4077 			error = -EINTR;
4078 			break;
4079 		}
4080 
4081 		lkb = find_resend_waiter(ls);
4082 		if (!lkb)
4083 			break;
4084 
4085 		r = lkb->lkb_resource;
4086 		hold_rsb(r);
4087 		lock_rsb(r);
4088 
4089 		mstype = lkb->lkb_wait_type;
4090 		oc = is_overlap_cancel(lkb);
4091 		ou = is_overlap_unlock(lkb);
4092 		err = 0;
4093 
4094 		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4095 			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4096 
4097 		/* At this point we assume that we won't get a reply to any
4098 		   previous op or overlap op on this lock.  First, do a big
4099 		   remove_from_waiters() for all previous ops. */
4100 
4101 		lkb->lkb_flags &= ~DLM_IFL_RESEND;
4102 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4103 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4104 		lkb->lkb_wait_type = 0;
4105 		lkb->lkb_wait_count = 0;
4106 		mutex_lock(&ls->ls_waiters_mutex);
4107 		list_del_init(&lkb->lkb_wait_reply);
4108 		mutex_unlock(&ls->ls_waiters_mutex);
4109 		unhold_lkb(lkb); /* for waiters list */
4110 
4111 		if (oc || ou) {
4112 			/* do an unlock or cancel instead of resending */
4113 			switch (mstype) {
4114 			case DLM_MSG_LOOKUP:
4115 			case DLM_MSG_REQUEST:
4116 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4117 							-DLM_ECANCEL);
4118 				unhold_lkb(lkb); /* undoes create_lkb() */
4119 				break;
4120 			case DLM_MSG_CONVERT:
4121 				if (oc) {
4122 					queue_cast(r, lkb, -DLM_ECANCEL);
4123 				} else {
4124 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4125 					_unlock_lock(r, lkb);
4126 				}
4127 				break;
4128 			default:
4129 				err = 1;
4130 			}
4131 		} else {
4132 			switch (mstype) {
4133 			case DLM_MSG_LOOKUP:
4134 			case DLM_MSG_REQUEST:
4135 				_request_lock(r, lkb);
4136 				if (is_master(r))
4137 					confirm_master(r, 0);
4138 				break;
4139 			case DLM_MSG_CONVERT:
4140 				_convert_lock(r, lkb);
4141 				break;
4142 			default:
4143 				err = 1;
4144 			}
4145 		}
4146 
4147 		if (err)
4148 			log_error(ls, "recover_waiters_post %x %d %x %d %d",
4149 			  	  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4150 		unlock_rsb(r);
4151 		put_rsb(r);
4152 		dlm_put_lkb(lkb);
4153 	}
4154 
4155 	return error;
4156 }
4157 
purge_queue(struct dlm_rsb * r,struct list_head * queue,int (* test)(struct dlm_ls * ls,struct dlm_lkb * lkb))4158 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4159 			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4160 {
4161 	struct dlm_ls *ls = r->res_ls;
4162 	struct dlm_lkb *lkb, *safe;
4163 
4164 	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4165 		if (test(ls, lkb)) {
4166 			rsb_set_flag(r, RSB_LOCKS_PURGED);
4167 			del_lkb(r, lkb);
4168 			/* this put should free the lkb */
4169 			if (!dlm_put_lkb(lkb))
4170 				log_error(ls, "purged lkb not released");
4171 		}
4172 	}
4173 }
4174 
purge_dead_test(struct dlm_ls * ls,struct dlm_lkb * lkb)4175 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4176 {
4177 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4178 }
4179 
/* purge_queue() predicate: selects every master-copy lkb.  ls is unused
   and only present to match the predicate signature. */

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	int mstcpy = is_master_copy(lkb);

	return mstcpy;
}
4184 
purge_dead_locks(struct dlm_rsb * r)4185 static void purge_dead_locks(struct dlm_rsb *r)
4186 {
4187 	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4188 	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4189 	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4190 }
4191 
dlm_purge_mstcpy_locks(struct dlm_rsb * r)4192 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4193 {
4194 	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4195 	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4196 	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4197 }
4198 
4199 /* Get rid of locks held by nodes that are gone. */
4200 
dlm_purge_locks(struct dlm_ls * ls)4201 int dlm_purge_locks(struct dlm_ls *ls)
4202 {
4203 	struct dlm_rsb *r;
4204 
4205 	log_debug(ls, "dlm_purge_locks");
4206 
4207 	down_write(&ls->ls_root_sem);
4208 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4209 		hold_rsb(r);
4210 		lock_rsb(r);
4211 		if (is_master(r))
4212 			purge_dead_locks(r);
4213 		unlock_rsb(r);
4214 		unhold_rsb(r);
4215 
4216 		schedule();
4217 	}
4218 	up_write(&ls->ls_root_sem);
4219 
4220 	return 0;
4221 }
4222 
find_purged_rsb(struct dlm_ls * ls,int bucket)4223 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4224 {
4225 	struct dlm_rsb *r, *r_ret = NULL;
4226 
4227 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
4228 	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4229 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
4230 			continue;
4231 		hold_rsb(r);
4232 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
4233 		r_ret = r;
4234 		break;
4235 	}
4236 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4237 	return r_ret;
4238 }
4239 
dlm_grant_after_purge(struct dlm_ls * ls)4240 void dlm_grant_after_purge(struct dlm_ls *ls)
4241 {
4242 	struct dlm_rsb *r;
4243 	int bucket = 0;
4244 
4245 	while (1) {
4246 		r = find_purged_rsb(ls, bucket);
4247 		if (!r) {
4248 			if (bucket == ls->ls_rsbtbl_size - 1)
4249 				break;
4250 			bucket++;
4251 			continue;
4252 		}
4253 		lock_rsb(r);
4254 		if (is_master(r)) {
4255 			grant_pending_locks(r);
4256 			confirm_master(r, 0);
4257 		}
4258 		unlock_rsb(r);
4259 		put_rsb(r);
4260 		schedule();
4261 	}
4262 }
4263 
search_remid_list(struct list_head * head,int nodeid,uint32_t remid)4264 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4265 					 uint32_t remid)
4266 {
4267 	struct dlm_lkb *lkb;
4268 
4269 	list_for_each_entry(lkb, head, lkb_statequeue) {
4270 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4271 			return lkb;
4272 	}
4273 	return NULL;
4274 }
4275 
search_remid(struct dlm_rsb * r,int nodeid,uint32_t remid)4276 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4277 				    uint32_t remid)
4278 {
4279 	struct dlm_lkb *lkb;
4280 
4281 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4282 	if (lkb)
4283 		return lkb;
4284 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4285 	if (lkb)
4286 		return lkb;
4287 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4288 	if (lkb)
4289 		return lkb;
4290 	return NULL;
4291 }
4292 
4293 /* needs at least dlm_rcom + rcom_lock */
receive_rcom_lock_args(struct dlm_ls * ls,struct dlm_lkb * lkb,struct dlm_rsb * r,struct dlm_rcom * rc)4294 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4295 				  struct dlm_rsb *r, struct dlm_rcom *rc)
4296 {
4297 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4298 
4299 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4300 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4301 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4302 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4303 	lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4304 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
4305 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4306 	lkb->lkb_rqmode = rl->rl_rqmode;
4307 	lkb->lkb_grmode = rl->rl_grmode;
4308 	/* don't set lkb_status because add_lkb wants to itself */
4309 
4310 	lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4311 	lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4312 
4313 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4314 		int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4315 			 sizeof(struct rcom_lock);
4316 		if (lvblen > ls->ls_lvblen)
4317 			return -EINVAL;
4318 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4319 		if (!lkb->lkb_lvbptr)
4320 			return -ENOMEM;
4321 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4322 	}
4323 
4324 	/* Conversions between PR and CW (middle modes) need special handling.
4325 	   The real granted mode of these converting locks cannot be determined
4326 	   until all locks have been rebuilt on the rsb (recover_conversion) */
4327 
4328 	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4329 	    middle_conversion(lkb)) {
4330 		rl->rl_status = DLM_LKSTS_CONVERT;
4331 		lkb->lkb_grmode = DLM_LOCK_IV;
4332 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
4333 	}
4334 
4335 	return 0;
4336 }
4337 
4338 /* This lkb may have been recovered in a previous aborted recovery so we need
4339    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4340    If so we just send back a standard reply.  If not, we create a new lkb with
4341    the given values and send back our lkid.  We send back our lkid by sending
4342    back the rcom_lock struct we got but with the remid field filled in. */
4343 
4344 /* needs at least dlm_rcom + rcom_lock */
dlm_recover_master_copy(struct dlm_ls * ls,struct dlm_rcom * rc)4345 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4346 {
4347 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4348 	struct dlm_rsb *r;
4349 	struct dlm_lkb *lkb;
4350 	int error;
4351 
4352 	if (rl->rl_parent_lkid) {
4353 		error = -EOPNOTSUPP;
4354 		goto out;
4355 	}
4356 
4357 	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4358 			 R_MASTER, &r);
4359 	if (error)
4360 		goto out;
4361 
4362 	lock_rsb(r);
4363 
4364 	lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4365 	if (lkb) {
4366 		error = -EEXIST;
4367 		goto out_remid;
4368 	}
4369 
4370 	error = create_lkb(ls, &lkb);
4371 	if (error)
4372 		goto out_unlock;
4373 
4374 	error = receive_rcom_lock_args(ls, lkb, r, rc);
4375 	if (error) {
4376 		__put_lkb(ls, lkb);
4377 		goto out_unlock;
4378 	}
4379 
4380 	attach_lkb(r, lkb);
4381 	add_lkb(r, lkb, rl->rl_status);
4382 	error = 0;
4383 
4384  out_remid:
4385 	/* this is the new value returned to the lock holder for
4386 	   saving in its process-copy lkb */
4387 	rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4388 
4389  out_unlock:
4390 	unlock_rsb(r);
4391 	put_rsb(r);
4392  out:
4393 	if (error)
4394 		log_debug(ls, "recover_master_copy %d %x", error,
4395 			  le32_to_cpu(rl->rl_lkid));
4396 	rl->rl_result = cpu_to_le32(error);
4397 	return error;
4398 }
4399 
4400 /* needs at least dlm_rcom + rcom_lock */
dlm_recover_process_copy(struct dlm_ls * ls,struct dlm_rcom * rc)4401 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4402 {
4403 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4404 	struct dlm_rsb *r;
4405 	struct dlm_lkb *lkb;
4406 	int error;
4407 
4408 	error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4409 	if (error) {
4410 		log_error(ls, "recover_process_copy no lkid %x",
4411 				le32_to_cpu(rl->rl_lkid));
4412 		return error;
4413 	}
4414 
4415 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4416 
4417 	error = le32_to_cpu(rl->rl_result);
4418 
4419 	r = lkb->lkb_resource;
4420 	hold_rsb(r);
4421 	lock_rsb(r);
4422 
4423 	switch (error) {
4424 	case -EBADR:
4425 		/* There's a chance the new master received our lock before
4426 		   dlm_recover_master_reply(), this wouldn't happen if we did
4427 		   a barrier between recover_masters and recover_locks. */
4428 		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4429 			  (unsigned long)r, r->res_name);
4430 		dlm_send_rcom_lock(r, lkb);
4431 		goto out;
4432 	case -EEXIST:
4433 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
4434 		/* fall through */
4435 	case 0:
4436 		lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4437 		break;
4438 	default:
4439 		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4440 			  error, lkb->lkb_id);
4441 	}
4442 
4443 	/* an ack for dlm_recover_locks() which waits for replies from
4444 	   all the locks it sends to new masters */
4445 	dlm_recovered_lock(r);
4446  out:
4447 	unlock_rsb(r);
4448 	put_rsb(r);
4449 	dlm_put_lkb(lkb);
4450 
4451 	return 0;
4452 }
4453 
dlm_user_request(struct dlm_ls * ls,struct dlm_user_args * ua,int mode,uint32_t flags,void * name,unsigned int namelen,unsigned long timeout_cs)4454 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4455 		     int mode, uint32_t flags, void *name, unsigned int namelen,
4456 		     unsigned long timeout_cs)
4457 {
4458 	struct dlm_lkb *lkb;
4459 	struct dlm_args args;
4460 	int error;
4461 
4462 	dlm_lock_recovery(ls);
4463 
4464 	error = create_lkb(ls, &lkb);
4465 	if (error) {
4466 		kfree(ua);
4467 		goto out;
4468 	}
4469 
4470 	if (flags & DLM_LKF_VALBLK) {
4471 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4472 		if (!ua->lksb.sb_lvbptr) {
4473 			kfree(ua);
4474 			__put_lkb(ls, lkb);
4475 			error = -ENOMEM;
4476 			goto out;
4477 		}
4478 	}
4479 
4480 	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
4481 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
4482 	   lock and that lkb_astparam is the dlm_user_args structure. */
4483 
4484 	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4485 			      fake_astfn, ua, fake_bastfn, &args);
4486 	lkb->lkb_flags |= DLM_IFL_USER;
4487 	ua->old_mode = DLM_LOCK_IV;
4488 
4489 	if (error) {
4490 		__put_lkb(ls, lkb);
4491 		goto out;
4492 	}
4493 
4494 	error = request_lock(ls, lkb, name, namelen, &args);
4495 
4496 	switch (error) {
4497 	case 0:
4498 		break;
4499 	case -EINPROGRESS:
4500 		error = 0;
4501 		break;
4502 	case -EAGAIN:
4503 		error = 0;
4504 		/* fall through */
4505 	default:
4506 		__put_lkb(ls, lkb);
4507 		goto out;
4508 	}
4509 
4510 	/* add this new lkb to the per-process list of locks */
4511 	spin_lock(&ua->proc->locks_spin);
4512 	hold_lkb(lkb);
4513 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4514 	spin_unlock(&ua->proc->locks_spin);
4515  out:
4516 	dlm_unlock_recovery(ls);
4517 	return error;
4518 }
4519 
dlm_user_convert(struct dlm_ls * ls,struct dlm_user_args * ua_tmp,int mode,uint32_t flags,uint32_t lkid,char * lvb_in,unsigned long timeout_cs)4520 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4521 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4522 		     unsigned long timeout_cs)
4523 {
4524 	struct dlm_lkb *lkb;
4525 	struct dlm_args args;
4526 	struct dlm_user_args *ua;
4527 	int error;
4528 
4529 	dlm_lock_recovery(ls);
4530 
4531 	error = find_lkb(ls, lkid, &lkb);
4532 	if (error)
4533 		goto out;
4534 
4535 	/* user can change the params on its lock when it converts it, or
4536 	   add an lvb that didn't exist before */
4537 
4538 	ua = lkb->lkb_ua;
4539 
4540 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4541 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4542 		if (!ua->lksb.sb_lvbptr) {
4543 			error = -ENOMEM;
4544 			goto out_put;
4545 		}
4546 	}
4547 	if (lvb_in && ua->lksb.sb_lvbptr)
4548 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4549 
4550 	ua->xid = ua_tmp->xid;
4551 	ua->castparam = ua_tmp->castparam;
4552 	ua->castaddr = ua_tmp->castaddr;
4553 	ua->bastparam = ua_tmp->bastparam;
4554 	ua->bastaddr = ua_tmp->bastaddr;
4555 	ua->user_lksb = ua_tmp->user_lksb;
4556 	ua->old_mode = lkb->lkb_grmode;
4557 
4558 	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4559 			      fake_astfn, ua, fake_bastfn, &args);
4560 	if (error)
4561 		goto out_put;
4562 
4563 	error = convert_lock(ls, lkb, &args);
4564 
4565 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4566 		error = 0;
4567  out_put:
4568 	dlm_put_lkb(lkb);
4569  out:
4570 	dlm_unlock_recovery(ls);
4571 	kfree(ua_tmp);
4572 	return error;
4573 }
4574 
dlm_user_unlock(struct dlm_ls * ls,struct dlm_user_args * ua_tmp,uint32_t flags,uint32_t lkid,char * lvb_in)4575 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4576 		    uint32_t flags, uint32_t lkid, char *lvb_in)
4577 {
4578 	struct dlm_lkb *lkb;
4579 	struct dlm_args args;
4580 	struct dlm_user_args *ua;
4581 	int error;
4582 
4583 	dlm_lock_recovery(ls);
4584 
4585 	error = find_lkb(ls, lkid, &lkb);
4586 	if (error)
4587 		goto out;
4588 
4589 	ua = lkb->lkb_ua;
4590 
4591 	if (lvb_in && ua->lksb.sb_lvbptr)
4592 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4593 	if (ua_tmp->castparam)
4594 		ua->castparam = ua_tmp->castparam;
4595 	ua->user_lksb = ua_tmp->user_lksb;
4596 
4597 	error = set_unlock_args(flags, ua, &args);
4598 	if (error)
4599 		goto out_put;
4600 
4601 	error = unlock_lock(ls, lkb, &args);
4602 
4603 	if (error == -DLM_EUNLOCK)
4604 		error = 0;
4605 	/* from validate_unlock_args() */
4606 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4607 		error = 0;
4608 	if (error)
4609 		goto out_put;
4610 
4611 	spin_lock(&ua->proc->locks_spin);
4612 	/* dlm_user_add_ast() may have already taken lkb off the proc list */
4613 	if (!list_empty(&lkb->lkb_ownqueue))
4614 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4615 	spin_unlock(&ua->proc->locks_spin);
4616  out_put:
4617 	dlm_put_lkb(lkb);
4618  out:
4619 	dlm_unlock_recovery(ls);
4620 	kfree(ua_tmp);
4621 	return error;
4622 }
4623 
dlm_user_cancel(struct dlm_ls * ls,struct dlm_user_args * ua_tmp,uint32_t flags,uint32_t lkid)4624 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4625 		    uint32_t flags, uint32_t lkid)
4626 {
4627 	struct dlm_lkb *lkb;
4628 	struct dlm_args args;
4629 	struct dlm_user_args *ua;
4630 	int error;
4631 
4632 	dlm_lock_recovery(ls);
4633 
4634 	error = find_lkb(ls, lkid, &lkb);
4635 	if (error)
4636 		goto out;
4637 
4638 	ua = lkb->lkb_ua;
4639 	if (ua_tmp->castparam)
4640 		ua->castparam = ua_tmp->castparam;
4641 	ua->user_lksb = ua_tmp->user_lksb;
4642 
4643 	error = set_unlock_args(flags, ua, &args);
4644 	if (error)
4645 		goto out_put;
4646 
4647 	error = cancel_lock(ls, lkb, &args);
4648 
4649 	if (error == -DLM_ECANCEL)
4650 		error = 0;
4651 	/* from validate_unlock_args() */
4652 	if (error == -EBUSY)
4653 		error = 0;
4654  out_put:
4655 	dlm_put_lkb(lkb);
4656  out:
4657 	dlm_unlock_recovery(ls);
4658 	kfree(ua_tmp);
4659 	return error;
4660 }
4661 
dlm_user_deadlock(struct dlm_ls * ls,uint32_t flags,uint32_t lkid)4662 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4663 {
4664 	struct dlm_lkb *lkb;
4665 	struct dlm_args args;
4666 	struct dlm_user_args *ua;
4667 	struct dlm_rsb *r;
4668 	int error;
4669 
4670 	dlm_lock_recovery(ls);
4671 
4672 	error = find_lkb(ls, lkid, &lkb);
4673 	if (error)
4674 		goto out;
4675 
4676 	ua = lkb->lkb_ua;
4677 
4678 	error = set_unlock_args(flags, ua, &args);
4679 	if (error)
4680 		goto out_put;
4681 
4682 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4683 
4684 	r = lkb->lkb_resource;
4685 	hold_rsb(r);
4686 	lock_rsb(r);
4687 
4688 	error = validate_unlock_args(lkb, &args);
4689 	if (error)
4690 		goto out_r;
4691 	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4692 
4693 	error = _cancel_lock(r, lkb);
4694  out_r:
4695 	unlock_rsb(r);
4696 	put_rsb(r);
4697 
4698 	if (error == -DLM_ECANCEL)
4699 		error = 0;
4700 	/* from validate_unlock_args() */
4701 	if (error == -EBUSY)
4702 		error = 0;
4703  out_put:
4704 	dlm_put_lkb(lkb);
4705  out:
4706 	dlm_unlock_recovery(ls);
4707 	return error;
4708 }
4709 
4710 /* lkb's that are removed from the waiters list by revert are just left on the
4711    orphans list with the granted orphan locks, to be freed by purge */
4712 
orphan_proc_lock(struct dlm_ls * ls,struct dlm_lkb * lkb)4713 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4714 {
4715 	struct dlm_args args;
4716 	int error;
4717 
4718 	hold_lkb(lkb);
4719 	mutex_lock(&ls->ls_orphans_mutex);
4720 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4721 	mutex_unlock(&ls->ls_orphans_mutex);
4722 
4723 	set_unlock_args(0, lkb->lkb_ua, &args);
4724 
4725 	error = cancel_lock(ls, lkb, &args);
4726 	if (error == -DLM_ECANCEL)
4727 		error = 0;
4728 	return error;
4729 }
4730 
4731 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4732    Regardless of what rsb queue the lock is on, it's removed and freed. */
4733 
unlock_proc_lock(struct dlm_ls * ls,struct dlm_lkb * lkb)4734 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4735 {
4736 	struct dlm_args args;
4737 	int error;
4738 
4739 	set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4740 
4741 	error = unlock_lock(ls, lkb, &args);
4742 	if (error == -DLM_EUNLOCK)
4743 		error = 0;
4744 	return error;
4745 }
4746 
4747 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4748    (which does lock_rsb) due to deadlock with receiving a message that does
4749    lock_rsb followed by dlm_user_add_ast() */
4750 
del_proc_lock(struct dlm_ls * ls,struct dlm_user_proc * proc)4751 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4752 				     struct dlm_user_proc *proc)
4753 {
4754 	struct dlm_lkb *lkb = NULL;
4755 
4756 	mutex_lock(&ls->ls_clear_proc_locks);
4757 	if (list_empty(&proc->locks))
4758 		goto out;
4759 
4760 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4761 	list_del_init(&lkb->lkb_ownqueue);
4762 
4763 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4764 		lkb->lkb_flags |= DLM_IFL_ORPHAN;
4765 	else
4766 		lkb->lkb_flags |= DLM_IFL_DEAD;
4767  out:
4768 	mutex_unlock(&ls->ls_clear_proc_locks);
4769 	return lkb;
4770 }
4771 
4772 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4773    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4774    which we clear here. */
4775 
4776 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4777    list, and no more device_writes should add lkb's to proc->locks list; so we
4778    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4779    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4780    them ourselves. */
4781 
dlm_clear_proc_locks(struct dlm_ls * ls,struct dlm_user_proc * proc)4782 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4783 {
4784 	struct dlm_lkb *lkb, *safe;
4785 
4786 	dlm_lock_recovery(ls);
4787 
4788 	while (1) {
4789 		lkb = del_proc_lock(ls, proc);
4790 		if (!lkb)
4791 			break;
4792 		del_timeout(lkb);
4793 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4794 			orphan_proc_lock(ls, lkb);
4795 		else
4796 			unlock_proc_lock(ls, lkb);
4797 
4798 		/* this removes the reference for the proc->locks list
4799 		   added by dlm_user_request, it may result in the lkb
4800 		   being freed */
4801 
4802 		dlm_put_lkb(lkb);
4803 	}
4804 
4805 	mutex_lock(&ls->ls_clear_proc_locks);
4806 
4807 	/* in-progress unlocks */
4808 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4809 		list_del_init(&lkb->lkb_ownqueue);
4810 		lkb->lkb_flags |= DLM_IFL_DEAD;
4811 		dlm_put_lkb(lkb);
4812 	}
4813 
4814 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4815 		lkb->lkb_ast_type = 0;
4816 		list_del(&lkb->lkb_astqueue);
4817 		dlm_put_lkb(lkb);
4818 	}
4819 
4820 	mutex_unlock(&ls->ls_clear_proc_locks);
4821 	dlm_unlock_recovery(ls);
4822 }
4823 
purge_proc_locks(struct dlm_ls * ls,struct dlm_user_proc * proc)4824 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4825 {
4826 	struct dlm_lkb *lkb, *safe;
4827 
4828 	while (1) {
4829 		lkb = NULL;
4830 		spin_lock(&proc->locks_spin);
4831 		if (!list_empty(&proc->locks)) {
4832 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
4833 					 lkb_ownqueue);
4834 			list_del_init(&lkb->lkb_ownqueue);
4835 		}
4836 		spin_unlock(&proc->locks_spin);
4837 
4838 		if (!lkb)
4839 			break;
4840 
4841 		lkb->lkb_flags |= DLM_IFL_DEAD;
4842 		unlock_proc_lock(ls, lkb);
4843 		dlm_put_lkb(lkb); /* ref from proc->locks list */
4844 	}
4845 
4846 	spin_lock(&proc->locks_spin);
4847 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4848 		list_del_init(&lkb->lkb_ownqueue);
4849 		lkb->lkb_flags |= DLM_IFL_DEAD;
4850 		dlm_put_lkb(lkb);
4851 	}
4852 	spin_unlock(&proc->locks_spin);
4853 
4854 	spin_lock(&proc->asts_spin);
4855 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4856 		list_del(&lkb->lkb_astqueue);
4857 		dlm_put_lkb(lkb);
4858 	}
4859 	spin_unlock(&proc->asts_spin);
4860 }
4861 
4862 /* pid of 0 means purge all orphans */
4863 
do_purge(struct dlm_ls * ls,int nodeid,int pid)4864 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4865 {
4866 	struct dlm_lkb *lkb, *safe;
4867 
4868 	mutex_lock(&ls->ls_orphans_mutex);
4869 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4870 		if (pid && lkb->lkb_ownpid != pid)
4871 			continue;
4872 		unlock_proc_lock(ls, lkb);
4873 		list_del_init(&lkb->lkb_ownqueue);
4874 		dlm_put_lkb(lkb);
4875 	}
4876 	mutex_unlock(&ls->ls_orphans_mutex);
4877 }
4878 
send_purge(struct dlm_ls * ls,int nodeid,int pid)4879 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4880 {
4881 	struct dlm_message *ms;
4882 	struct dlm_mhandle *mh;
4883 	int error;
4884 
4885 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4886 				DLM_MSG_PURGE, &ms, &mh);
4887 	if (error)
4888 		return error;
4889 	ms->m_nodeid = nodeid;
4890 	ms->m_pid = pid;
4891 
4892 	return send_message(mh, ms);
4893 }
4894 
dlm_user_purge(struct dlm_ls * ls,struct dlm_user_proc * proc,int nodeid,int pid)4895 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4896 		   int nodeid, int pid)
4897 {
4898 	int error = 0;
4899 
4900 	if (nodeid != dlm_our_nodeid()) {
4901 		error = send_purge(ls, nodeid, pid);
4902 	} else {
4903 		dlm_lock_recovery(ls);
4904 		if (pid == current->pid)
4905 			purge_proc_locks(ls, proc);
4906 		else
4907 			do_purge(ls, nodeid, pid);
4908 		dlm_unlock_recovery(ls);
4909 	}
4910 	return error;
4911 }
4912 
4913