1 /*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26 /*
27 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
28 * Developed under the sponsorship of the US Government under
29 * Subcontract No. B514193
30 *
31 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
32 * Use is subject to license terms.
33 *
34 * Copyright (c) 2010, 2012, Intel Corporation.
35 */
36 /*
37 * This file is part of Lustre, http://www.lustre.org/
38 * Lustre is a trademark of Sun Microsystems, Inc.
39 */
40
/**
 * This file implements POSIX lock type for Lustre.
 * Its policy properties are start and end of extent and PID.
 *
 * These locks are only done through MDS due to POSIX semantics requiring
 * e.g. that locks could be only partially released and as such split into
 * two parts, and also that two adjacent locks from the same process may be
 * merged into a single wider lock.
 *
 * Lock modes are mapped like this:
 * PR and PW for READ and WRITE locks
 * NL to request the release of a portion of the lock
 *
 * These flock locks never time out.
 */
56
57 #define DEBUG_SUBSYSTEM S_LDLM
58
59 #include "../include/lustre_dlm.h"
60 #include "../include/obd_support.h"
61 #include "../include/obd_class.h"
62 #include "../include/lustre_lib.h"
63 #include <linux/list.h>
64 #include "ldlm_internal.h"
65
/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 * and safeguard against removal of a list entry.
 * \param pos	the &struct list_head to use as a loop counter. pos MUST
 *		have been initialized prior to using it in this macro, and
 *		it must be an lvalue because the macro assigns to it.
 * \param n	another &struct list_head to use as temporary storage
 * \param head	the head for your list.
 *
 * Iteration starts at the current value of \a pos (not at head->next) and
 * ends once \a pos is equal to \a head.  \a n is loaded with the successor
 * of \a pos before the loop body runs, so the body may unlink \a pos.
 *
 * Every argument is parenthesized in the expansion so that expressions
 * such as *cursor or &obj->list bind as intended.  The arguments are
 * still evaluated more than once, so they must not have side effects.
 */
#define list_for_remaining_safe(pos, n, head) \
	for ((n) = (pos)->next; (pos) != (head); \
	     (pos) = (n), (n) = (pos)->next)
76
77 static inline int
ldlm_same_flock_owner(struct ldlm_lock * lock,struct ldlm_lock * new)78 ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
79 {
80 return((new->l_policy_data.l_flock.owner ==
81 lock->l_policy_data.l_flock.owner) &&
82 (new->l_export == lock->l_export));
83 }
84
85 static inline int
ldlm_flocks_overlap(struct ldlm_lock * lock,struct ldlm_lock * new)86 ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
87 {
88 return((new->l_policy_data.l_flock.start <=
89 lock->l_policy_data.l_flock.end) &&
90 (new->l_policy_data.l_flock.end >=
91 lock->l_policy_data.l_flock.start));
92 }
93
94 static inline void
ldlm_flock_destroy(struct ldlm_lock * lock,ldlm_mode_t mode,__u64 flags)95 ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
96 {
97 LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
98 mode, flags);
99
100 /* Safe to not lock here, since it should be empty anyway */
101 LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));
102
103 list_del_init(&lock->l_res_link);
104 if (flags == LDLM_FL_WAIT_NOREPROC &&
105 !(lock->l_flags & LDLM_FL_FAILED)) {
106 /* client side - set a flag to prevent sending a CANCEL */
107 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
108
109 /* when reaching here, it is under lock_res_and_lock(). Thus,
110 need call the nolock version of ldlm_lock_decref_internal*/
111 ldlm_lock_decref_internal_nolock(lock, mode);
112 }
113
114 ldlm_lock_destroy_nolock(lock);
115 }
116
117 /**
118 * Process a granting attempt for flock lock.
119 * Must be called under ns lock held.
120 *
121 * This function looks for any conflicts for \a lock in the granted or
122 * waiting queues. The lock is granted if no conflicts are found in
123 * either queue.
124 *
125 * It is also responsible for splitting a lock if a portion of the lock
126 * is released.
127 *
128 * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
129 * - blocking ASTs have already been sent
130 *
131 * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
132 * - blocking ASTs have not been sent yet, so list of conflicting locks
133 * would be collected and ASTs sent.
134 */
ldlm_process_flock_lock(struct ldlm_lock * req,__u64 * flags,int first_enq,ldlm_error_t * err,struct list_head * work_list)135 static int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
136 int first_enq, ldlm_error_t *err,
137 struct list_head *work_list)
138 {
139 struct ldlm_resource *res = req->l_resource;
140 struct ldlm_namespace *ns = ldlm_res_to_ns(res);
141 struct list_head *tmp;
142 struct list_head *ownlocks = NULL;
143 struct ldlm_lock *lock = NULL;
144 struct ldlm_lock *new = req;
145 struct ldlm_lock *new2 = NULL;
146 ldlm_mode_t mode = req->l_req_mode;
147 int added = (mode == LCK_NL);
148 int overlaps = 0;
149 int splitted = 0;
150 const struct ldlm_callback_suite null_cbs = { NULL };
151
152 CDEBUG(D_DLMTRACE,
153 "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
154 *flags, new->l_policy_data.l_flock.owner,
155 new->l_policy_data.l_flock.pid, mode,
156 req->l_policy_data.l_flock.start,
157 req->l_policy_data.l_flock.end);
158
159 *err = ELDLM_OK;
160
161 /* No blocking ASTs are sent to the clients for
162 * Posix file & record locks */
163 req->l_blocking_ast = NULL;
164
165 reprocess:
166 if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
167 /* This loop determines where this processes locks start
168 * in the resource lr_granted list. */
169 list_for_each(tmp, &res->lr_granted) {
170 lock = list_entry(tmp, struct ldlm_lock,
171 l_res_link);
172 if (ldlm_same_flock_owner(lock, req)) {
173 ownlocks = tmp;
174 break;
175 }
176 }
177 } else {
178 int reprocess_failed = 0;
179
180 lockmode_verify(mode);
181
182 /* This loop determines if there are existing locks
183 * that conflict with the new lock request. */
184 list_for_each(tmp, &res->lr_granted) {
185 lock = list_entry(tmp, struct ldlm_lock,
186 l_res_link);
187
188 if (ldlm_same_flock_owner(lock, req)) {
189 if (!ownlocks)
190 ownlocks = tmp;
191 continue;
192 }
193
194 /* locks are compatible, overlap doesn't matter */
195 if (lockmode_compat(lock->l_granted_mode, mode))
196 continue;
197
198 if (!ldlm_flocks_overlap(lock, req))
199 continue;
200
201 if (!first_enq) {
202 reprocess_failed = 1;
203 continue;
204 }
205
206 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
207 ldlm_flock_destroy(req, mode, *flags);
208 *err = -EAGAIN;
209 return LDLM_ITER_STOP;
210 }
211
212 if (*flags & LDLM_FL_TEST_LOCK) {
213 ldlm_flock_destroy(req, mode, *flags);
214 req->l_req_mode = lock->l_granted_mode;
215 req->l_policy_data.l_flock.pid =
216 lock->l_policy_data.l_flock.pid;
217 req->l_policy_data.l_flock.start =
218 lock->l_policy_data.l_flock.start;
219 req->l_policy_data.l_flock.end =
220 lock->l_policy_data.l_flock.end;
221 *flags |= LDLM_FL_LOCK_CHANGED;
222 return LDLM_ITER_STOP;
223 }
224
225 ldlm_resource_add_lock(res, &res->lr_waiting, req);
226 *flags |= LDLM_FL_BLOCK_GRANTED;
227 return LDLM_ITER_STOP;
228 }
229 if (reprocess_failed)
230 return LDLM_ITER_CONTINUE;
231 }
232
233 if (*flags & LDLM_FL_TEST_LOCK) {
234 ldlm_flock_destroy(req, mode, *flags);
235 req->l_req_mode = LCK_NL;
236 *flags |= LDLM_FL_LOCK_CHANGED;
237 return LDLM_ITER_STOP;
238 }
239
240 /* Scan the locks owned by this process that overlap this request.
241 * We may have to merge or split existing locks. */
242
243 if (!ownlocks)
244 ownlocks = &res->lr_granted;
245
246 list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
247 lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
248
249 if (!ldlm_same_flock_owner(lock, new))
250 break;
251
252 if (lock->l_granted_mode == mode) {
253 /* If the modes are the same then we need to process
254 * locks that overlap OR adjoin the new lock. The extra
255 * logic condition is necessary to deal with arithmetic
256 * overflow and underflow. */
257 if ((new->l_policy_data.l_flock.start >
258 (lock->l_policy_data.l_flock.end + 1))
259 && (lock->l_policy_data.l_flock.end !=
260 OBD_OBJECT_EOF))
261 continue;
262
263 if ((new->l_policy_data.l_flock.end <
264 (lock->l_policy_data.l_flock.start - 1))
265 && (lock->l_policy_data.l_flock.start != 0))
266 break;
267
268 if (new->l_policy_data.l_flock.start <
269 lock->l_policy_data.l_flock.start) {
270 lock->l_policy_data.l_flock.start =
271 new->l_policy_data.l_flock.start;
272 } else {
273 new->l_policy_data.l_flock.start =
274 lock->l_policy_data.l_flock.start;
275 }
276
277 if (new->l_policy_data.l_flock.end >
278 lock->l_policy_data.l_flock.end) {
279 lock->l_policy_data.l_flock.end =
280 new->l_policy_data.l_flock.end;
281 } else {
282 new->l_policy_data.l_flock.end =
283 lock->l_policy_data.l_flock.end;
284 }
285
286 if (added) {
287 ldlm_flock_destroy(lock, mode, *flags);
288 } else {
289 new = lock;
290 added = 1;
291 }
292 continue;
293 }
294
295 if (new->l_policy_data.l_flock.start >
296 lock->l_policy_data.l_flock.end)
297 continue;
298
299 if (new->l_policy_data.l_flock.end <
300 lock->l_policy_data.l_flock.start)
301 break;
302
303 ++overlaps;
304
305 if (new->l_policy_data.l_flock.start <=
306 lock->l_policy_data.l_flock.start) {
307 if (new->l_policy_data.l_flock.end <
308 lock->l_policy_data.l_flock.end) {
309 lock->l_policy_data.l_flock.start =
310 new->l_policy_data.l_flock.end + 1;
311 break;
312 }
313 ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
314 continue;
315 }
316 if (new->l_policy_data.l_flock.end >=
317 lock->l_policy_data.l_flock.end) {
318 lock->l_policy_data.l_flock.end =
319 new->l_policy_data.l_flock.start - 1;
320 continue;
321 }
322
323 /* split the existing lock into two locks */
324
325 /* if this is an F_UNLCK operation then we could avoid
326 * allocating a new lock and use the req lock passed in
327 * with the request but this would complicate the reply
328 * processing since updates to req get reflected in the
329 * reply. The client side replays the lock request so
330 * it must see the original lock data in the reply. */
331
332 /* XXX - if ldlm_lock_new() can sleep we should
333 * release the lr_lock, allocate the new lock,
334 * and restart processing this lock. */
335 if (!new2) {
336 unlock_res_and_lock(req);
337 new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
338 lock->l_granted_mode, &null_cbs,
339 NULL, 0, LVB_T_NONE);
340 lock_res_and_lock(req);
341 if (!new2) {
342 ldlm_flock_destroy(req, lock->l_granted_mode,
343 *flags);
344 *err = -ENOLCK;
345 return LDLM_ITER_STOP;
346 }
347 goto reprocess;
348 }
349
350 splitted = 1;
351
352 new2->l_granted_mode = lock->l_granted_mode;
353 new2->l_policy_data.l_flock.pid =
354 new->l_policy_data.l_flock.pid;
355 new2->l_policy_data.l_flock.owner =
356 new->l_policy_data.l_flock.owner;
357 new2->l_policy_data.l_flock.start =
358 lock->l_policy_data.l_flock.start;
359 new2->l_policy_data.l_flock.end =
360 new->l_policy_data.l_flock.start - 1;
361 lock->l_policy_data.l_flock.start =
362 new->l_policy_data.l_flock.end + 1;
363 new2->l_conn_export = lock->l_conn_export;
364 if (lock->l_export != NULL) {
365 new2->l_export = class_export_lock_get(lock->l_export,
366 new2);
367 if (new2->l_export->exp_lock_hash &&
368 hlist_unhashed(&new2->l_exp_hash))
369 cfs_hash_add(new2->l_export->exp_lock_hash,
370 &new2->l_remote_handle,
371 &new2->l_exp_hash);
372 }
373 if (*flags == LDLM_FL_WAIT_NOREPROC)
374 ldlm_lock_addref_internal_nolock(new2,
375 lock->l_granted_mode);
376
377 /* insert new2 at lock */
378 ldlm_resource_add_lock(res, ownlocks, new2);
379 LDLM_LOCK_RELEASE(new2);
380 break;
381 }
382
383 /* if new2 is created but never used, destroy it*/
384 if (splitted == 0 && new2 != NULL)
385 ldlm_lock_destroy_nolock(new2);
386
387 /* At this point we're granting the lock request. */
388 req->l_granted_mode = req->l_req_mode;
389
390 if (!added) {
391 list_del_init(&req->l_res_link);
392 /* insert new lock before ownlocks in list. */
393 ldlm_resource_add_lock(res, ownlocks, req);
394 }
395
396 if (*flags != LDLM_FL_WAIT_NOREPROC) {
397 /* The only one possible case for client-side calls flock
398 * policy function is ldlm_flock_completion_ast inside which
399 * carries LDLM_FL_WAIT_NOREPROC flag. */
400 CERROR("Illegal parameter for client-side-only module.\n");
401 LBUG();
402 }
403
404 /* In case we're reprocessing the requested lock we can't destroy
405 * it until after calling ldlm_add_ast_work_item() above so that laawi()
406 * can bump the reference count on \a req. Otherwise \a req
407 * could be freed before the completion AST can be sent. */
408 if (added)
409 ldlm_flock_destroy(req, mode, *flags);
410
411 ldlm_resource_dump(D_INFO, res);
412 return LDLM_ITER_CONTINUE;
413 }
414
/*
 * Context handed to the interrupt callback while a caller sleeps waiting
 * for a flock lock.
 */
struct ldlm_flock_wait_data {
	/* lock the sleeper is waiting on */
	struct ldlm_lock	*fwd_lock;
	/* import generation sampled before going to sleep */
	int			 fwd_generation;
};
419
420 static void
ldlm_flock_interrupted_wait(void * data)421 ldlm_flock_interrupted_wait(void *data)
422 {
423 struct ldlm_lock *lock;
424
425 lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
426
427 lock_res_and_lock(lock);
428
429 /* client side - set flag to prevent lock from being put on LRU list */
430 lock->l_flags |= LDLM_FL_CBPENDING;
431 unlock_res_and_lock(lock);
432 }
433
434 /**
435 * Flock completion callback function.
436 *
437 * \param lock [in,out]: A lock to be handled
438 * \param flags [in]: flags
439 * \param *data [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
440 *
441 * \retval 0 : success
442 * \retval <0 : failure
443 */
444 int
ldlm_flock_completion_ast(struct ldlm_lock * lock,__u64 flags,void * data)445 ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
446 {
447 struct file_lock *getlk = lock->l_ast_data;
448 struct obd_device *obd;
449 struct obd_import *imp = NULL;
450 struct ldlm_flock_wait_data fwd;
451 struct l_wait_info lwi;
452 ldlm_error_t err;
453 int rc = 0;
454
455 CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
456 flags, data, getlk);
457
458 /* Import invalidation. We need to actually release the lock
459 * references being held, so that it can go away. No point in
460 * holding the lock even if app still believes it has it, since
461 * server already dropped it anyway. Only for granted locks too. */
462 if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
463 (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
464 if (lock->l_req_mode == lock->l_granted_mode &&
465 lock->l_granted_mode != LCK_NL &&
466 data == NULL)
467 ldlm_lock_decref_internal(lock, lock->l_req_mode);
468
469 /* Need to wake up the waiter if we were evicted */
470 wake_up(&lock->l_waitq);
471 return 0;
472 }
473
474 LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
475
476 if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
477 LDLM_FL_BLOCK_CONV))) {
478 if (data == NULL)
479 /* mds granted the lock in the reply */
480 goto granted;
481 /* CP AST RPC: lock get granted, wake it up */
482 wake_up(&lock->l_waitq);
483 return 0;
484 }
485
486 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, sleeping");
487 fwd.fwd_lock = lock;
488 obd = class_exp2obd(lock->l_conn_export);
489
490 /* if this is a local lock, there is no import */
491 if (obd != NULL)
492 imp = obd->u.cli.cl_import;
493
494 if (imp != NULL) {
495 spin_lock(&imp->imp_lock);
496 fwd.fwd_generation = imp->imp_generation;
497 spin_unlock(&imp->imp_lock);
498 }
499
500 lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
501
502 /* Go to sleep until the lock is granted. */
503 rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
504
505 if (rc) {
506 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
507 rc);
508 return rc;
509 }
510
511 granted:
512 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
513
514 if (lock->l_flags & LDLM_FL_DESTROYED) {
515 LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
516 return 0;
517 }
518
519 if (lock->l_flags & LDLM_FL_FAILED) {
520 LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
521 return -EIO;
522 }
523
524 if (rc) {
525 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
526 rc);
527 return rc;
528 }
529
530 LDLM_DEBUG(lock, "client-side enqueue granted");
531
532 lock_res_and_lock(lock);
533
534 /* ldlm_lock_enqueue() has already placed lock on the granted list. */
535 list_del_init(&lock->l_res_link);
536
537 if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
538 LDLM_DEBUG(lock, "client-side enqueue deadlock received");
539 rc = -EDEADLK;
540 } else if (flags & LDLM_FL_TEST_LOCK) {
541 /* fcntl(F_GETLK) request */
542 /* The old mode was saved in getlk->fl_type so that if the mode
543 * in the lock changes we can decref the appropriate refcount.*/
544 ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
545 switch (lock->l_granted_mode) {
546 case LCK_PR:
547 getlk->fl_type = F_RDLCK;
548 break;
549 case LCK_PW:
550 getlk->fl_type = F_WRLCK;
551 break;
552 default:
553 getlk->fl_type = F_UNLCK;
554 }
555 getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
556 getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
557 getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
558 } else {
559 __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
560
561 /* We need to reprocess the lock to do merges or splits
562 * with existing locks owned by this process. */
563 ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
564 }
565 unlock_res_and_lock(lock);
566 return rc;
567 }
568 EXPORT_SYMBOL(ldlm_flock_completion_ast);
569
/*
 * Convert flock policy data from the wire format used by old (pre-owner)
 * clients into the local representation.
 *
 * \param wpolicy [in]	wire policy data
 * \param lpolicy [out]	local policy data; cleared before being filled in
 */
void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
				       ldlm_policy_data_t *lpolicy)
{
	memset(lpolicy, 0, sizeof(*lpolicy));

	/* Compat code, old clients had no idea about owner field and
	 * relied solely on pid for ownership. Introduced in LU-104, 2.1,
	 * April 2011 */
	lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
	lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;

	lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
	lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
}
582
/*
 * Convert flock policy data from the wire format that carries an explicit
 * owner field into the local representation.
 *
 * \param wpolicy [in]	wire policy data
 * \param lpolicy [out]	local policy data; cleared before being filled in
 */
void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
				       ldlm_policy_data_t *lpolicy)
{
	memset(lpolicy, 0, sizeof(*lpolicy));

	/* identity of the lock holder */
	lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
	lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;

	/* locked byte range */
	lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
	lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
}
592
/*
 * Convert local flock policy data into the wire format.
 *
 * \param lpolicy [in]	local policy data
 * \param wpolicy [out]	wire policy data; cleared before being filled in
 */
void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
				     ldlm_wire_policy_data_t *wpolicy)
{
	memset(wpolicy, 0, sizeof(*wpolicy));

	/* identity of the lock holder */
	wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
	wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;

	/* locked byte range */
	wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
	wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
}
602