/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

/**
 * This file implements the POSIX lock type for Lustre.
 * Its policy properties are the start and end of the extent and the PID.
 *
 * These locks are only handled through the MDS because POSIX semantics
 * require, e.g., that a lock may be only partially released and must then
 * be split into two parts, and that two adjacent locks from the same
 * process may be merged into a single wider lock.
 *
 * Lock modes are mapped as follows:
 * PR and PW for READ and WRITE locks
 * NL to request the release of a portion of a lock
 *
 * These flock locks never time out.
 */

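/*
 * Illustrative sketch only (not part of the original file): how fcntl()
 * lock types would map onto the LDLM modes described above. The helper
 * name is hypothetical; the reverse mapping is what
 * ldlm_flock_completion_ast() performs for F_GETLK further below.
 */
#if 0
static ldlm_mode_t flock_type_to_ldlm_mode(unsigned char fl_type)
{
	switch (fl_type) {
	case F_RDLCK:
		return LCK_PR;	/* shared read lock */
	case F_WRLCK:
		return LCK_PW;	/* exclusive write lock */
	case F_UNLCK:
	default:
		return LCK_NL;	/* release (part of) an existing lock */
	}
}
#endif
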
#define DEBUG_SUBSYSTEM S_LDLM

#include "../include/lustre_dlm.h"
#include "../include/obd_support.h"
#include "../include/obd_class.h"
#include "../include/lustre_lib.h"
#include <linux/list.h>
#include "ldlm_internal.h"

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *	      and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop counter. pos MUST
 *	      have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
	for (n = pos->next; pos != (head); pos = n, n = pos->next)

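/*
 * Minimal usage sketch (assumed, for illustration only): resume iteration
 * from an already-located entry while tolerating deletion of the current
 * entry, as ldlm_process_flock_lock() does over res->lr_granted below.
 */
#if 0
	struct list_head *pos = ownlocks;	/* must already point into the list */
	struct list_head *n;

	list_for_remaining_safe(pos, n, &res->lr_granted) {
		struct ldlm_lock *lk =
			list_entry(pos, struct ldlm_lock, l_res_link);
		/* the entry at pos may be removed here; n keeps the walk safe */
	}
#endif
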
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
	return((new->l_policy_data.l_flock.owner ==
		lock->l_policy_data.l_flock.owner) &&
	       (new->l_export == lock->l_export));
}

static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
	return((new->l_policy_data.l_flock.start <=
		lock->l_policy_data.l_flock.end) &&
	       (new->l_policy_data.l_flock.end >=
		lock->l_policy_data.l_flock.start));
}

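/*
 * Worked example (illustrative): extents [5, 10] and [10, 20] overlap
 * (5 <= 20 and 10 >= 10), while [5, 9] and [10, 20] do not (9 < 10).
 * Note that adjacency (end + 1 == start) does not count as overlap here;
 * merging of adjacent same-mode locks is handled separately in
 * ldlm_process_flock_lock().
 */
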
static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
{
	LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
		   mode, flags);

	/* Safe to not lock here, since it should be empty anyway */
	LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));

	list_del_init(&lock->l_res_link);
	if (flags == LDLM_FL_WAIT_NOREPROC &&
	    !(lock->l_flags & LDLM_FL_FAILED)) {
		/* client side - set a flag to prevent sending a CANCEL */
		lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

		/* When we reach here we are under lock_res_and_lock(), so
		 * we need to call the nolock version of
		 * ldlm_lock_decref_internal(). */
		ldlm_lock_decref_internal_nolock(lock, mode);
	}

	ldlm_lock_destroy_nolock(lock);
}

/**
 * Process a granting attempt for a flock lock.
 * Must be called with the ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * It is also responsible for splitting a lock if a portion of the lock
 * is released.
 *
 * If \a first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *
 * If \a first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so the list of conflicting
 *     locks would be collected and ASTs sent.
 */
static int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
				   int first_enq, ldlm_error_t *err,
				   struct list_head *work_list)
{
	struct ldlm_resource *res = req->l_resource;
	struct ldlm_namespace *ns = ldlm_res_to_ns(res);
	struct list_head *tmp;
	struct list_head *ownlocks = NULL;
	struct ldlm_lock *lock = NULL;
	struct ldlm_lock *new = req;
	struct ldlm_lock *new2 = NULL;
	ldlm_mode_t mode = req->l_req_mode;
	int added = (mode == LCK_NL);
	int overlaps = 0;
	int splitted = 0;
	const struct ldlm_callback_suite null_cbs = { NULL };

	CDEBUG(D_DLMTRACE,
	       "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
	       *flags, new->l_policy_data.l_flock.owner,
	       new->l_policy_data.l_flock.pid, mode,
	       req->l_policy_data.l_flock.start,
	       req->l_policy_data.l_flock.end);

	*err = ELDLM_OK;

	/* No blocking ASTs are sent to the clients for
	 * POSIX file & record locks */
	req->l_blocking_ast = NULL;

reprocess:
	if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
		/* This loop determines where this process's locks start
		 * in the resource lr_granted list. */
		list_for_each(tmp, &res->lr_granted) {
			lock = list_entry(tmp, struct ldlm_lock,
					      l_res_link);
			if (ldlm_same_flock_owner(lock, req)) {
				ownlocks = tmp;
				break;
			}
		}
	} else {
		int reprocess_failed = 0;

		lockmode_verify(mode);

		/* This loop determines if there are existing locks
		 * that conflict with the new lock request. */
		list_for_each(tmp, &res->lr_granted) {
			lock = list_entry(tmp, struct ldlm_lock,
					      l_res_link);

			if (ldlm_same_flock_owner(lock, req)) {
				if (!ownlocks)
					ownlocks = tmp;
				continue;
			}

			/* locks are compatible, overlap doesn't matter */
			if (lockmode_compat(lock->l_granted_mode, mode))
				continue;

			if (!ldlm_flocks_overlap(lock, req))
				continue;

			if (!first_enq) {
				reprocess_failed = 1;
				continue;
			}

			if (*flags & LDLM_FL_BLOCK_NOWAIT) {
				ldlm_flock_destroy(req, mode, *flags);
				*err = -EAGAIN;
				return LDLM_ITER_STOP;
			}

			if (*flags & LDLM_FL_TEST_LOCK) {
				ldlm_flock_destroy(req, mode, *flags);
				req->l_req_mode = lock->l_granted_mode;
				req->l_policy_data.l_flock.pid =
					lock->l_policy_data.l_flock.pid;
				req->l_policy_data.l_flock.start =
					lock->l_policy_data.l_flock.start;
				req->l_policy_data.l_flock.end =
					lock->l_policy_data.l_flock.end;
				*flags |= LDLM_FL_LOCK_CHANGED;
				return LDLM_ITER_STOP;
			}

			ldlm_resource_add_lock(res, &res->lr_waiting, req);
			*flags |= LDLM_FL_BLOCK_GRANTED;
			return LDLM_ITER_STOP;
		}
		if (reprocess_failed)
			return LDLM_ITER_CONTINUE;
	}

	if (*flags & LDLM_FL_TEST_LOCK) {
		ldlm_flock_destroy(req, mode, *flags);
		req->l_req_mode = LCK_NL;
		*flags |= LDLM_FL_LOCK_CHANGED;
		return LDLM_ITER_STOP;
	}

	/* Scan the locks owned by this process that overlap this request.
	 * We may have to merge or split existing locks. */

	if (!ownlocks)
		ownlocks = &res->lr_granted;

	list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
		lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);

		if (!ldlm_same_flock_owner(lock, new))
			break;

		if (lock->l_granted_mode == mode) {
			/* If the modes are the same then we need to process
			 * locks that overlap OR adjoin the new lock. The extra
			 * logic condition is necessary to deal with arithmetic
			 * overflow and underflow. */
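			/* Worked example of the guards below: with
			 * lock->l_policy_data.l_flock.end == OBD_OBJECT_EOF
			 * (~0ULL), "end + 1" wraps to 0 and the first
			 * disjointness test would spuriously pass, hence the
			 * explicit OBD_OBJECT_EOF check; likewise "start - 1"
			 * underflows when start == 0, hence the 0 check. */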
			if ((new->l_policy_data.l_flock.start >
			     (lock->l_policy_data.l_flock.end + 1))
			    && (lock->l_policy_data.l_flock.end !=
				OBD_OBJECT_EOF))
				continue;

			if ((new->l_policy_data.l_flock.end <
			     (lock->l_policy_data.l_flock.start - 1))
			    && (lock->l_policy_data.l_flock.start != 0))
				break;

			if (new->l_policy_data.l_flock.start <
			    lock->l_policy_data.l_flock.start) {
				lock->l_policy_data.l_flock.start =
					new->l_policy_data.l_flock.start;
			} else {
				new->l_policy_data.l_flock.start =
					lock->l_policy_data.l_flock.start;
			}

			if (new->l_policy_data.l_flock.end >
			    lock->l_policy_data.l_flock.end) {
				lock->l_policy_data.l_flock.end =
					new->l_policy_data.l_flock.end;
			} else {
				new->l_policy_data.l_flock.end =
					lock->l_policy_data.l_flock.end;
			}

			if (added) {
				ldlm_flock_destroy(lock, mode, *flags);
			} else {
				new = lock;
				added = 1;
			}
			continue;
		}

		if (new->l_policy_data.l_flock.start >
		    lock->l_policy_data.l_flock.end)
			continue;

		if (new->l_policy_data.l_flock.end <
		    lock->l_policy_data.l_flock.start)
			break;

		++overlaps;

		if (new->l_policy_data.l_flock.start <=
		    lock->l_policy_data.l_flock.start) {
			if (new->l_policy_data.l_flock.end <
			    lock->l_policy_data.l_flock.end) {
				lock->l_policy_data.l_flock.start =
					new->l_policy_data.l_flock.end + 1;
				break;
			}
			ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
			continue;
		}
		if (new->l_policy_data.l_flock.end >=
		    lock->l_policy_data.l_flock.end) {
			lock->l_policy_data.l_flock.end =
				new->l_policy_data.l_flock.start - 1;
			continue;
		}

		/* split the existing lock into two locks */

		/* If this is an F_UNLCK operation then we could avoid
		 * allocating a new lock and use the req lock passed in
		 * with the request, but this would complicate the reply
		 * processing since updates to req get reflected in the
		 * reply. The client side replays the lock request, so
		 * it must see the original lock data in the reply. */

		/* XXX - if ldlm_lock_new() can sleep we should
		 * release the lr_lock, allocate the new lock,
		 * and restart processing this lock. */
		if (!new2) {
			unlock_res_and_lock(req);
			new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
						lock->l_granted_mode, &null_cbs,
						NULL, 0, LVB_T_NONE);
			lock_res_and_lock(req);
			if (!new2) {
				ldlm_flock_destroy(req, lock->l_granted_mode,
						   *flags);
				*err = -ENOLCK;
				return LDLM_ITER_STOP;
			}
			goto reprocess;
		}

		splitted = 1;

		new2->l_granted_mode = lock->l_granted_mode;
		new2->l_policy_data.l_flock.pid =
			new->l_policy_data.l_flock.pid;
		new2->l_policy_data.l_flock.owner =
			new->l_policy_data.l_flock.owner;
		new2->l_policy_data.l_flock.start =
			lock->l_policy_data.l_flock.start;
		new2->l_policy_data.l_flock.end =
			new->l_policy_data.l_flock.start - 1;
		lock->l_policy_data.l_flock.start =
			new->l_policy_data.l_flock.end + 1;
		new2->l_conn_export = lock->l_conn_export;
		if (lock->l_export != NULL) {
			new2->l_export = class_export_lock_get(lock->l_export,
							       new2);
			if (new2->l_export->exp_lock_hash &&
			    hlist_unhashed(&new2->l_exp_hash))
				cfs_hash_add(new2->l_export->exp_lock_hash,
					     &new2->l_remote_handle,
					     &new2->l_exp_hash);
		}
		if (*flags == LDLM_FL_WAIT_NOREPROC)
			ldlm_lock_addref_internal_nolock(new2,
							 lock->l_granted_mode);

		/* insert new2 at lock */
		ldlm_resource_add_lock(res, ownlocks, new2);
		LDLM_LOCK_RELEASE(new2);
		break;
	}

	/* if new2 is created but never used, destroy it */
	if (splitted == 0 && new2 != NULL)
		ldlm_lock_destroy_nolock(new2);

	/* At this point we're granting the lock request. */
	req->l_granted_mode = req->l_req_mode;

	if (!added) {
		list_del_init(&req->l_res_link);
		/* insert new lock before ownlocks in list. */
		ldlm_resource_add_lock(res, ownlocks, req);
	}

	if (*flags != LDLM_FL_WAIT_NOREPROC) {
		/* The only possible case for a client-side call into the
		 * flock policy function is ldlm_flock_completion_ast(),
		 * which always carries the LDLM_FL_WAIT_NOREPROC flag. */
		CERROR("Illegal parameter for client-side-only module.\n");
		LBUG();
	}

	/* In case we're reprocessing the requested lock we can't destroy
	 * it until after calling ldlm_add_ast_work_item() above so that laawi()
	 * can bump the reference count on \a req. Otherwise \a req
	 * could be freed before the completion AST can be sent. */
	if (added)
		ldlm_flock_destroy(req, mode, *flags);

	ldlm_resource_dump(D_INFO, res);
	return LDLM_ITER_CONTINUE;
}

struct ldlm_flock_wait_data {
	struct ldlm_lock	*fwd_lock;
	int			 fwd_generation;
};

static void
ldlm_flock_interrupted_wait(void *data)
{
	struct ldlm_lock *lock;

	lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

	lock_res_and_lock(lock);

	/* client side - set flag to prevent lock from being put on LRU list */
	lock->l_flags |= LDLM_FL_CBPENDING;
	unlock_res_and_lock(lock);
}

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: A lock to be handled
 * \param flags    [in]: flags
 * \param *data    [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    : success
 * \retval <0   : failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
	struct file_lock		*getlk = lock->l_ast_data;
	struct obd_device		*obd;
	struct obd_import		*imp = NULL;
	struct ldlm_flock_wait_data	 fwd;
	struct l_wait_info		 lwi;
	ldlm_error_t			 err;
	int				 rc = 0;

	CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
	       flags, data, getlk);

	/* Import invalidation. We need to actually release the lock
	 * references being held, so that it can go away. There is no point
	 * in holding the lock even if the app still believes it has it,
	 * since the server already dropped it anyway. This applies only to
	 * granted locks. */
	if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
	    (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
		if (lock->l_req_mode == lock->l_granted_mode &&
		    lock->l_granted_mode != LCK_NL &&
		    data == NULL)
			ldlm_lock_decref_internal(lock, lock->l_req_mode);

		/* Need to wake up the waiter if we were evicted */
		wake_up(&lock->l_waitq);
		return 0;
	}

	LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

	if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
		       LDLM_FL_BLOCK_CONV))) {
		if (data == NULL)
			/* MDS granted the lock in the reply */
			goto granted;
		/* CP AST RPC: the lock got granted, wake it up */
		wake_up(&lock->l_waitq);
		return 0;
	}

	LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, sleeping");
	fwd.fwd_lock = lock;
	obd = class_exp2obd(lock->l_conn_export);

	/* if this is a local lock, there is no import */
	if (obd != NULL)
		imp = obd->u.cli.cl_import;

	if (imp != NULL) {
		spin_lock(&imp->imp_lock);
		fwd.fwd_generation = imp->imp_generation;
		spin_unlock(&imp->imp_lock);
	}

	lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

	/* Go to sleep until the lock is granted. */
	rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

	if (rc) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
			   rc);
		return rc;
	}

granted:
	OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

	if (lock->l_flags & LDLM_FL_DESTROYED) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
		return 0;
	}

	if (lock->l_flags & LDLM_FL_FAILED) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
		return -EIO;
	}

	if (rc) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
			   rc);
		return rc;
	}

	LDLM_DEBUG(lock, "client-side enqueue granted");

	lock_res_and_lock(lock);

	/* ldlm_lock_enqueue() has already placed lock on the granted list. */
	list_del_init(&lock->l_res_link);

	if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
		LDLM_DEBUG(lock, "client-side enqueue deadlock received");
		rc = -EDEADLK;
	} else if (flags & LDLM_FL_TEST_LOCK) {
		/* fcntl(F_GETLK) request */
		/* The old mode was saved in getlk->fl_type so that if the mode
		 * in the lock changes we can decref the appropriate refcount. */
		ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
		switch (lock->l_granted_mode) {
		case LCK_PR:
			getlk->fl_type = F_RDLCK;
			break;
		case LCK_PW:
			getlk->fl_type = F_WRLCK;
			break;
		default:
			getlk->fl_type = F_UNLCK;
		}
		getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
		getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
		getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
	} else {
		__u64 noreproc = LDLM_FL_WAIT_NOREPROC;

		/* We need to reprocess the lock to do merges or splits
		 * with existing locks owned by this process. */
		ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
	}
	unlock_res_and_lock(lock);
	return rc;
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

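/*
 * Caller sketch (illustrative, assuming the llite usage): the client file
 * layer passes this completion AST when enqueuing a flock lock, roughly as
 * ll_file_flock() does. Field values other than ei_type and ei_cb_cp are
 * placeholders, not taken from this file.
 */
#if 0
	struct ldlm_enqueue_info einfo = {
		.ei_type	= LDLM_FLOCK,
		.ei_cb_cp	= ldlm_flock_completion_ast,
		.ei_cbdata	= NULL,	/* NULL: wait for grant in the AST */
	};
#endif
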
void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
				       ldlm_policy_data_t *lpolicy)
{
	memset(lpolicy, 0, sizeof(*lpolicy));
	lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
	lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
	lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
	/* Compat code: old clients had no idea about the owner field and
	 * relied solely on the pid for ownership. Introduced in LU-104,
	 * 2.1, April 2011. */
	lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
}

void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
				       ldlm_policy_data_t *lpolicy)
{
	memset(lpolicy, 0, sizeof(*lpolicy));
	lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
	lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
	lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
	lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
				     ldlm_wire_policy_data_t *wpolicy)
{
	memset(wpolicy, 0, sizeof(*wpolicy));
	wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
	wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
	wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
	wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}
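
/*
 * Usage sketch (assumed, for illustration only): unpacking flock policy
 * data sent by an old (pre-2.1) client, where no owner field travels on
 * the wire and the pid doubles as the owner. "wpolicy" here stands for a
 * hypothetical already-unpacked wire blob.
 */
#if 0
	ldlm_policy_data_t lpolicy;

	ldlm_flock_policy_wire18_to_local(&wpolicy, &lpolicy);
	LASSERT(lpolicy.l_flock.owner == wpolicy.l_flock.lfw_pid);
#endif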