// SPDX-License-Identifier: GPL-2.0-or-later
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * userdlm.c
 *
 * Code which implements the kernel side of a minimal userspace
 * interface to our DLM.
 *
 * Many of the functions here are pared down versions of dlmglue.c
 * functions.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 */

#include <linux/signal.h>
#include <linux/sched/signal.h>

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/crc32.h>

#include "../ocfs2_lockingver.h"
#include "../stackglue.h"
#include "userdlm.h"

#define MLOG_MASK_PREFIX ML_DLMFS
#include "../cluster/masklog.h"


static inline struct user_lock_res *user_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
	return container_of(lksb, struct user_lock_res, l_lksb);
}
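
/* Sample a single flag bit under l_lock so that the wait_event()
 * helpers below always test a consistent snapshot of l_flags. */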
static inline int user_check_wait_flag(struct user_lock_res *lockres,
				       int flag)
{
	int ret;

	spin_lock(&lockres->l_lock);
	ret = lockres->l_flags & flag;
	spin_unlock(&lockres->l_lock);

	return ret;
}

static inline void user_wait_on_busy_lock(struct user_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !user_check_wait_flag(lockres, USER_LOCK_BUSY));
}

static inline void user_wait_on_blocked_lock(struct user_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !user_check_wait_flag(lockres, USER_LOCK_BLOCKED));
}

/* I heart container_of... */
static inline struct ocfs2_cluster_connection *
cluster_connection_from_user_lockres(struct user_lock_res *lockres)
{
	struct dlmfs_inode_private *ip;

	ip = container_of(lockres,
			  struct dlmfs_inode_private,
			  ip_lockres);
	return ip->ip_conn;
}

static struct inode *
user_dlm_inode_from_user_lockres(struct user_lock_res *lockres)
{
	struct dlmfs_inode_private *ip;

	ip = container_of(lockres,
			  struct dlmfs_inode_private,
			  ip_lockres);
	return &ip->ip_vfs_inode;
}
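
/* Undo the USER_LOCK_BUSY we set optimistically before calling into
 * the DLM, for the case where that call fails. */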
static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
{
	spin_lock(&lockres->l_lock);
	lockres->l_flags &= ~USER_LOCK_BUSY;
	spin_unlock(&lockres->l_lock);
}

#define user_log_dlm_error(_func, _stat, _lockres) do {			\
	mlog(ML_ERROR, "Dlm error %d while calling %s on "		\
		"resource %.*s\n", _stat, _func,			\
		_lockres->l_namelen, _lockres->l_name);			\
} while (0)

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int user_highest_compat_lock_level(int level)
{
	int new_level = DLM_LOCK_EX;

	if (level == DLM_LOCK_EX)
		new_level = DLM_LOCK_NL;
	else if (level == DLM_LOCK_PR)
		new_level = DLM_LOCK_PR;
	return new_level;
}
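
/* Lock AST: fired when a lock or convert request completes. Records
 * the newly granted level, clears USER_LOCK_BUSY (and, if the grant
 * also satisfied a pending downconvert, USER_LOCK_BLOCKED), then
 * wakes anyone waiting on l_event. */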
static void user_ast(struct ocfs2_dlm_lksb *lksb)
{
	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
	int status;

	mlog(ML_BASTS, "AST fired for lockres %.*s, level %d => %d\n",
	     lockres->l_namelen, lockres->l_name, lockres->l_level,
	     lockres->l_requested);

	spin_lock(&lockres->l_lock);

	status = ocfs2_dlm_lock_status(&lockres->l_lksb);
	if (status) {
		mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
		     status, lockres->l_namelen, lockres->l_name);
		spin_unlock(&lockres->l_lock);
		return;
	}

	mlog_bug_on_msg(lockres->l_requested == DLM_LOCK_IV,
			"Lockres %.*s, requested ivmode. flags 0x%x\n",
			lockres->l_namelen, lockres->l_name, lockres->l_flags);

	/* we're downconverting. */
	if (lockres->l_requested < lockres->l_level) {
		if (lockres->l_requested <=
		    user_highest_compat_lock_level(lockres->l_blocking)) {
			lockres->l_blocking = DLM_LOCK_NL;
			lockres->l_flags &= ~USER_LOCK_BLOCKED;
		}
	}

	lockres->l_level = lockres->l_requested;
	lockres->l_requested = DLM_LOCK_IV;
	lockres->l_flags |= USER_LOCK_ATTACHED;
	lockres->l_flags &= ~USER_LOCK_BUSY;

	spin_unlock(&lockres->l_lock);

	wake_up(&lockres->l_event);
}

static inline void user_dlm_grab_inode_ref(struct user_lock_res *lockres)
{
	struct inode *inode;
	inode = user_dlm_inode_from_user_lockres(lockres);
	if (!igrab(inode))
		BUG();
}

static void user_dlm_unblock_lock(struct work_struct *work);
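
/* Called with l_lock held. The inode reference taken here keeps the
 * lockres alive until user_dlm_unblock_lock() drops it. */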
static void __user_dlm_queue_lockres(struct user_lock_res *lockres)
{
	if (!(lockres->l_flags & USER_LOCK_QUEUED)) {
		user_dlm_grab_inode_ref(lockres);

		INIT_WORK(&lockres->l_work, user_dlm_unblock_lock);

		queue_work(user_dlm_worker, &lockres->l_work);
		lockres->l_flags |= USER_LOCK_QUEUED;
	}
}
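
/* Called with l_lock held. Only queue the unblock worker once the
 * last holder incompatible with the blocking request has gone away. */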
static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
{
	int queue = 0;

	if (!(lockres->l_flags & USER_LOCK_BLOCKED))
		return;

	switch (lockres->l_blocking) {
	case DLM_LOCK_EX:
		if (!lockres->l_ex_holders && !lockres->l_ro_holders)
			queue = 1;
		break;
	case DLM_LOCK_PR:
		if (!lockres->l_ex_holders)
			queue = 1;
		break;
	default:
		BUG();
	}

	if (queue)
		__user_dlm_queue_lockres(lockres);
}
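
/* Blocking AST: another node wants a level incompatible with ours.
 * Record the highest blocking level seen so far and queue the unblock
 * worker to cancel or downconvert on our behalf. */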
static void user_bast(struct ocfs2_dlm_lksb *lksb, int level)
{
	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);

	mlog(ML_BASTS, "BAST fired for lockres %.*s, blocking %d, level %d\n",
	     lockres->l_namelen, lockres->l_name, level, lockres->l_level);

	spin_lock(&lockres->l_lock);
	lockres->l_flags |= USER_LOCK_BLOCKED;
	if (level > lockres->l_blocking)
		lockres->l_blocking = level;

	__user_dlm_queue_lockres(lockres);
	spin_unlock(&lockres->l_lock);

	wake_up(&lockres->l_event);
}
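
/* Unlock AST: completion of either a real unlock (teardown) or the
 * cancel of an in-flight convert request. */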
static void user_unlock_ast(struct ocfs2_dlm_lksb *lksb, int status)
{
	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);

	mlog(ML_BASTS, "UNLOCK AST fired for lockres %.*s, flags 0x%x\n",
	     lockres->l_namelen, lockres->l_name, lockres->l_flags);

	if (status)
		mlog(ML_ERROR, "dlm returns status %d\n", status);

	spin_lock(&lockres->l_lock);
	/* The teardown flag gets set early during the unlock process,
	 * so test the cancel flag to make sure that this ast isn't
	 * for a concurrent cancel. */
	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN
	    && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
		lockres->l_level = DLM_LOCK_IV;
	} else if (status == DLM_CANCELGRANT) {
		/* We tried to cancel a convert request, but it was
		 * already granted. Don't clear the busy flag - the
		 * ast should've done this already. */
		BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
		lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
		goto out_noclear;
	} else {
		BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
		/* Cancel succeeded, we want to re-queue */
		lockres->l_requested = DLM_LOCK_IV; /* cancel an
						    * upconvert
						    * request. */
		lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
		/* we want the unblock thread to look at it again
		 * now. */
		if (lockres->l_flags & USER_LOCK_BLOCKED)
			__user_dlm_queue_lockres(lockres);
	}

	lockres->l_flags &= ~USER_LOCK_BUSY;
out_noclear:
	spin_unlock(&lockres->l_lock);

	wake_up(&lockres->l_event);
}

/*
 * This is the userdlmfs locking protocol version.
 *
 * See fs/ocfs2/dlmglue.c for more details on locking versions.
 */
static struct ocfs2_locking_protocol user_dlm_lproto = {
	.lp_max_version = {
		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
	},
	.lp_lock_ast		= user_ast,
	.lp_blocking_ast	= user_bast,
	.lp_unlock_ast		= user_unlock_ast,
};

static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres)
{
	struct inode *inode;
	inode = user_dlm_inode_from_user_lockres(lockres);
	iput(inode);
}
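
/* Worker function, run from user_dlm_worker. Resolves a blocked
 * lockres by cancelling an in-flight request or, once the last
 * incompatible holder is gone, downconverting to the highest level
 * still compatible with the blocking request. */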
static void user_dlm_unblock_lock(struct work_struct *work)
{
	int new_level, status;
	struct user_lock_res *lockres =
		container_of(work, struct user_lock_res, l_work);
	struct ocfs2_cluster_connection *conn =
		cluster_connection_from_user_lockres(lockres);

	mlog(0, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);

	spin_lock(&lockres->l_lock);

	mlog_bug_on_msg(!(lockres->l_flags & USER_LOCK_QUEUED),
			"Lockres %.*s, flags 0x%x\n",
			lockres->l_namelen, lockres->l_name, lockres->l_flags);

	/* notice that we don't clear USER_LOCK_BLOCKED here. If it's
	 * set, we want user_ast to clear it. */
	lockres->l_flags &= ~USER_LOCK_QUEUED;

	/* It's valid to get here and no longer be blocked - if we get
	 * several basts in a row, we might be queued by the first
	 * one, the unblock thread might run and clear the queued
	 * flag, and finally we might get another bast which re-queues
	 * us before our ast for the downconvert is called. */
	if (!(lockres->l_flags & USER_LOCK_BLOCKED)) {
		mlog(ML_BASTS, "lockres %.*s USER_LOCK_BLOCKED\n",
		     lockres->l_namelen, lockres->l_name);
		spin_unlock(&lockres->l_lock);
		goto drop_ref;
	}

	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
		mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_TEARDOWN\n",
		     lockres->l_namelen, lockres->l_name);
		spin_unlock(&lockres->l_lock);
		goto drop_ref;
	}

	if (lockres->l_flags & USER_LOCK_BUSY) {
		if (lockres->l_flags & USER_LOCK_IN_CANCEL) {
			mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_CANCEL\n",
			     lockres->l_namelen, lockres->l_name);
			spin_unlock(&lockres->l_lock);
			goto drop_ref;
		}

		lockres->l_flags |= USER_LOCK_IN_CANCEL;
		spin_unlock(&lockres->l_lock);

		status = ocfs2_dlm_unlock(conn, &lockres->l_lksb,
					  DLM_LKF_CANCEL);
		if (status)
			user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
		goto drop_ref;
	}

	/* If there are still incompat holders, we can exit safely
	 * without worrying about re-queueing this lock as that will
	 * happen on the last call to user_cluster_unlock. */
	if ((lockres->l_blocking == DLM_LOCK_EX)
	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
		spin_unlock(&lockres->l_lock);
		mlog(ML_BASTS, "lockres %.*s, EX/PR Holders %u,%u\n",
		     lockres->l_namelen, lockres->l_name,
		     lockres->l_ex_holders, lockres->l_ro_holders);
		goto drop_ref;
	}

	if ((lockres->l_blocking == DLM_LOCK_PR)
	    && lockres->l_ex_holders) {
		spin_unlock(&lockres->l_lock);
		mlog(ML_BASTS, "lockres %.*s, EX Holders %u\n",
		     lockres->l_namelen, lockres->l_name,
		     lockres->l_ex_holders);
		goto drop_ref;
	}

	/* yay, we can downconvert now. */
	new_level = user_highest_compat_lock_level(lockres->l_blocking);
	lockres->l_requested = new_level;
	lockres->l_flags |= USER_LOCK_BUSY;
	mlog(ML_BASTS, "lockres %.*s, downconvert %d => %d\n",
	     lockres->l_namelen, lockres->l_name, lockres->l_level, new_level);
	spin_unlock(&lockres->l_lock);

	/* need lock downconvert request now... */
	status = ocfs2_dlm_lock(conn, new_level, &lockres->l_lksb,
				DLM_LKF_CONVERT|DLM_LKF_VALBLK,
				lockres->l_name,
				lockres->l_namelen);
	if (status) {
		user_log_dlm_error("ocfs2_dlm_lock", status, lockres);
		user_recover_from_dlm_error(lockres);
	}

drop_ref:
	user_dlm_drop_inode_ref(lockres);
}

static inline void user_dlm_inc_holders(struct user_lock_res *lockres,
					int level)
{
	switch(level) {
	case DLM_LOCK_EX:
		lockres->l_ex_holders++;
		break;
	case DLM_LOCK_PR:
		lockres->l_ro_holders++;
		break;
	default:
		BUG();
	}
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int
user_may_continue_on_blocked_lock(struct user_lock_res *lockres,
				  int wanted)
{
	BUG_ON(!(lockres->l_flags & USER_LOCK_BLOCKED));

	return wanted <= user_highest_compat_lock_level(lockres->l_blocking);
}
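
/* Take a cluster lock at 'level' (DLM_LOCK_EX or DLM_LOCK_PR) on
 * behalf of a userspace holder, upconverting if the lock is already
 * attached at a lower level. May sleep. Returns 0 on success,
 * -EINVAL on a bad level, -ERESTARTSYS if signalled while waiting,
 * and -EAGAIN if the lockres is being torn down or a DLM_LKF_NOQUEUE
 * request could not be granted immediately. */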
int user_dlm_cluster_lock(struct user_lock_res *lockres,
			  int level,
			  int lkm_flags)
{
	int status, local_flags;
	struct ocfs2_cluster_connection *conn =
		cluster_connection_from_user_lockres(lockres);

	if (level != DLM_LOCK_EX &&
	    level != DLM_LOCK_PR) {
		mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
		     lockres->l_namelen, lockres->l_name);
		status = -EINVAL;
		goto bail;
	}

	mlog(ML_BASTS, "lockres %.*s, level %d, flags = 0x%x\n",
	     lockres->l_namelen, lockres->l_name, level, lkm_flags);

again:
	if (signal_pending(current)) {
		status = -ERESTARTSYS;
		goto bail;
	}

	spin_lock(&lockres->l_lock);
	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
		spin_unlock(&lockres->l_lock);
		status = -EAGAIN;
		goto bail;
	}

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if ((lockres->l_flags & USER_LOCK_BUSY) &&
	    (level > lockres->l_level)) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		spin_unlock(&lockres->l_lock);

		user_wait_on_busy_lock(lockres);
		goto again;
	}

	if ((lockres->l_flags & USER_LOCK_BLOCKED) &&
	    (!user_may_continue_on_blocked_lock(lockres, level))) {
		/* the lock is currently blocked on behalf of
		 * another node. */
		spin_unlock(&lockres->l_lock);

		user_wait_on_blocked_lock(lockres);
		goto again;
	}

	if (level > lockres->l_level) {
		local_flags = lkm_flags | DLM_LKF_VALBLK;
		if (lockres->l_level != DLM_LOCK_IV)
			local_flags |= DLM_LKF_CONVERT;

		lockres->l_requested = level;
		lockres->l_flags |= USER_LOCK_BUSY;
		spin_unlock(&lockres->l_lock);

		BUG_ON(level == DLM_LOCK_IV);
		BUG_ON(level == DLM_LOCK_NL);

		/* call dlm_lock to upgrade lock now */
		status = ocfs2_dlm_lock(conn, level, &lockres->l_lksb,
					local_flags, lockres->l_name,
					lockres->l_namelen);
		if (status) {
			if ((lkm_flags & DLM_LKF_NOQUEUE) &&
			    (status != -EAGAIN))
				user_log_dlm_error("ocfs2_dlm_lock",
						   status, lockres);
			user_recover_from_dlm_error(lockres);
			goto bail;
		}

		user_wait_on_busy_lock(lockres);
		goto again;
	}

	user_dlm_inc_holders(lockres, level);
	spin_unlock(&lockres->l_lock);

	status = 0;
bail:
	return status;
}

static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
					int level)
{
	switch(level) {
	case DLM_LOCK_EX:
		BUG_ON(!lockres->l_ex_holders);
		lockres->l_ex_holders--;
		break;
	case DLM_LOCK_PR:
		BUG_ON(!lockres->l_ro_holders);
		lockres->l_ro_holders--;
		break;
	default:
		BUG();
	}
}

void user_dlm_cluster_unlock(struct user_lock_res *lockres,
			     int level)
{
	if (level != DLM_LOCK_EX &&
	    level != DLM_LOCK_PR) {
		mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
		     lockres->l_namelen, lockres->l_name);
		return;
	}

	spin_lock(&lockres->l_lock);
	user_dlm_dec_holders(lockres, level);
	__user_dlm_cond_queue_lockres(lockres);
	spin_unlock(&lockres->l_lock);
}
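
/* Copy a new value into the lock value block. The caller must hold
 * the lock at DLM_LOCK_EX (enforced below); DLM_LKF_VALBLK on the
 * later convert or unlock call publishes the new value. */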
void user_dlm_write_lvb(struct inode *inode,
			const char *val,
			unsigned int len)
{
	struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
	char *lvb;

	BUG_ON(len > DLM_LVB_LEN);

	spin_lock(&lockres->l_lock);

	BUG_ON(lockres->l_level < DLM_LOCK_EX);
	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
	memcpy(lvb, val, len);

	spin_unlock(&lockres->l_lock);
}
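
/* Copy the lock value block out to 'val'. The caller must hold the
 * lock at DLM_LOCK_PR or better. Returns false if the DLM has marked
 * the LVB invalid (e.g. after a writer died), in which case 'val' is
 * left untouched. */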
bool user_dlm_read_lvb(struct inode *inode, char *val)
{
	struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
	char *lvb;
	bool ret = true;

	spin_lock(&lockres->l_lock);

	BUG_ON(lockres->l_level < DLM_LOCK_PR);
	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)) {
		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
		memcpy(val, lvb, DLM_LVB_LEN);
	} else
		ret = false;

	spin_unlock(&lockres->l_lock);
	return ret;
}

void user_dlm_lock_res_init(struct user_lock_res *lockres,
			    struct dentry *dentry)
{
	memset(lockres, 0, sizeof(*lockres));

	spin_lock_init(&lockres->l_lock);
	init_waitqueue_head(&lockres->l_event);
	lockres->l_level = DLM_LOCK_IV;
	lockres->l_requested = DLM_LOCK_IV;
	lockres->l_blocking = DLM_LOCK_IV;

	/* should have been checked before getting here. */
	BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN);

	memcpy(lockres->l_name,
	       dentry->d_name.name,
	       dentry->d_name.len);
	lockres->l_namelen = dentry->d_name.len;
}
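
/* Tear down a lockres: wait out any in-flight request, refuse if
 * userspace still holds the lock, then drop our DLM lock. Returns
 * -EBUSY if holders remain or a teardown is already in progress. */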
int user_dlm_destroy_lock(struct user_lock_res *lockres)
{
	int status = -EBUSY;
	struct ocfs2_cluster_connection *conn =
		cluster_connection_from_user_lockres(lockres);

	mlog(ML_BASTS, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);

	spin_lock(&lockres->l_lock);
	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
		spin_unlock(&lockres->l_lock);
		goto bail;
	}

	lockres->l_flags |= USER_LOCK_IN_TEARDOWN;

	while (lockres->l_flags & USER_LOCK_BUSY) {
		spin_unlock(&lockres->l_lock);

		user_wait_on_busy_lock(lockres);

		spin_lock(&lockres->l_lock);
	}

	if (lockres->l_ro_holders || lockres->l_ex_holders) {
		lockres->l_flags &= ~USER_LOCK_IN_TEARDOWN;
		spin_unlock(&lockres->l_lock);
		goto bail;
	}

	status = 0;
	if (!(lockres->l_flags & USER_LOCK_ATTACHED)) {
		/*
		 * The lock was never requested; leave USER_LOCK_IN_TEARDOWN
		 * set to prevent new lock requests from coming in.
		 */
		spin_unlock(&lockres->l_lock);
		goto bail;
	}

	lockres->l_flags &= ~USER_LOCK_ATTACHED;
	lockres->l_flags |= USER_LOCK_BUSY;
	spin_unlock(&lockres->l_lock);

	status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, DLM_LKF_VALBLK);
	if (status) {
		spin_lock(&lockres->l_lock);
		lockres->l_flags &= ~USER_LOCK_IN_TEARDOWN;
		lockres->l_flags &= ~USER_LOCK_BUSY;
		spin_unlock(&lockres->l_lock);
		user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
		goto bail;
	}

	user_wait_on_busy_lock(lockres);

	status = 0;
bail:
	return status;
}

static void user_dlm_recovery_handler_noop(int node_num,
					   void *recovery_data)
{
	/* We ignore recovery events */
	return;
}

void user_dlm_set_locking_protocol(void)
{
	ocfs2_stack_glue_set_max_proto_version(&user_dlm_lproto.lp_max_version);
}

struct ocfs2_cluster_connection *user_dlm_register(const struct qstr *name)
{
	int rc;
	struct ocfs2_cluster_connection *conn;

	rc = ocfs2_cluster_connect_agnostic(name->name, name->len,
					    &user_dlm_lproto,
					    user_dlm_recovery_handler_noop,
					    NULL, &conn);
	if (rc)
		mlog_errno(rc);

	return rc ? ERR_PTR(rc) : conn;
}

void user_dlm_unregister(struct ocfs2_cluster_connection *conn)
{
	ocfs2_cluster_disconnect(conn, 0);
}