/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_MDC

#include <linux/module.h>

#include <lustre_intent.h>
#include <obd.h>
#include <obd_class.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_mdc.h>
#include <lustre_net.h>
#include <lustre_req_layout.h>
#include <lustre_swab.h>

#include "mdc_internal.h"

struct mdc_getattr_args {
	struct obd_export	   *ga_exp;
	struct md_enqueue_info      *ga_minfo;
};

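/*
 * Map an intent open result onto a particular phase of the operation:
 * find the deepest DISP_* step the server actually executed and return
 * it->it_status if @phase is at or beyond that step, or 0 if the server
 * got past @phase (so @phase itself must have succeeded).
 */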
int it_open_error(int phase, struct lookup_intent *it)
{
	if (it_disposition(it, DISP_OPEN_LEASE)) {
		if (phase >= DISP_OPEN_LEASE)
			return it->it_status;
		else
			return 0;
	}
	if (it_disposition(it, DISP_OPEN_OPEN)) {
		if (phase >= DISP_OPEN_OPEN)
			return it->it_status;
		else
			return 0;
	}

	if (it_disposition(it, DISP_OPEN_CREATE)) {
		if (phase >= DISP_OPEN_CREATE)
			return it->it_status;
		else
			return 0;
	}

	if (it_disposition(it, DISP_LOOKUP_EXECD)) {
		if (phase >= DISP_LOOKUP_EXECD)
			return it->it_status;
		else
			return 0;
	}

	if (it_disposition(it, DISP_IT_EXECD)) {
		if (phase >= DISP_IT_EXECD)
			return it->it_status;
		else
			return 0;
	}
	CERROR("it disp: %X, status: %d\n", it->it_disposition,
	       it->it_status);
	LBUG();
	return 0;
}
EXPORT_SYMBOL(it_open_error);

/* this must be called on a lockh that is known to have a referenced lock */
int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
		      void *data, __u64 *bits)
{
	struct ldlm_lock *lock;
	struct inode *new_inode = data;

	if (bits)
		*bits = 0;

	if (!lustre_handle_is_used(lockh))
		return 0;

	lock = ldlm_handle2lock(lockh);

	LASSERT(lock);
	lock_res_and_lock(lock);
	if (lock->l_resource->lr_lvb_inode &&
	    lock->l_resource->lr_lvb_inode != data) {
		struct inode *old_inode = lock->l_resource->lr_lvb_inode;

		LASSERTF(old_inode->i_state & I_FREEING,
			 "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
			 old_inode, old_inode->i_ino, old_inode->i_generation,
			 old_inode->i_state, new_inode, new_inode->i_ino,
			 new_inode->i_generation);
	}
	lock->l_resource->lr_lvb_inode = new_inode;
	if (bits)
		*bits = lock->l_policy_data.l_inodebits.bits;

	unlock_res_and_lock(lock);
	LDLM_LOCK_PUT(lock);

	return 0;
}

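/*
 * Look for an already granted lock in the client namespace that covers
 * @fid with the requested mode and inodebits; on a hit the matched mode
 * is returned and @lockh is filled in, otherwise 0 is returned.
 */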
enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
			      const struct lu_fid *fid, enum ldlm_type type,
			      union ldlm_policy_data *policy,
			      enum ldlm_mode mode,
			      struct lustre_handle *lockh)
{
	struct ldlm_res_id res_id;
	enum ldlm_mode rc;

	fid_build_reg_res_name(fid, &res_id);
	/* LU-4405: Clear bits not supported by server */
	policy->l_inodebits.bits &= exp_connect_ibits(exp);
	rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
			     &res_id, type, policy, mode, lockh, 0);
	return rc;
}

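/*
 * Cancel every unused lock this client holds on the resource named by
 * @fid that matches the given mode and inodebits policy.
 */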
int mdc_cancel_unused(struct obd_export *exp,
		      const struct lu_fid *fid,
		      union ldlm_policy_data *policy,
		      enum ldlm_mode mode,
		      enum ldlm_cancel_flags flags,
		      void *opaque)
{
	struct ldlm_res_id res_id;
	struct obd_device *obd = class_exp2obd(exp);
	int rc;

	fid_build_reg_res_name(fid, &res_id);
	rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
					     policy, mode, flags, opaque);
	return rc;
}

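/*
 * Detach any cached inode pointer from the LDLM resource for @fid, so
 * that existing locks no longer reference the inode being cleaned up.
 */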
int mdc_null_inode(struct obd_export *exp,
		   const struct lu_fid *fid)
{
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;

	LASSERTF(ns, "no namespace passed\n");

	fid_build_reg_res_name(fid, &res_id);

	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (IS_ERR(res))
		return 0;

	lock_res(res);
	res->lr_lvb_inode = NULL;
	unlock_res(res);

	ldlm_resource_putref(res);
	return 0;
}

static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
{
	/* Don't hold error requests for replay. */
	if (req->rq_replay) {
		spin_lock(&req->rq_lock);
		req->rq_replay = 0;
		spin_unlock(&req->rq_lock);
	}
	if (rc && req->rq_transno != 0) {
		DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
		LBUG();
	}
}

/* Save a large LOV EA into the request buffer so that it is available
 * for replay.  We don't do this in the initial request because the
 * original request doesn't need this buffer (at most it sends just the
 * lov_mds_md): sending the empty buffer would waste RAM/bandwidth, and
 * it may also be difficult to allocate and save a very large request
 * buffer for each open. (bug 5707)
 *
 * OOM here may cause recovery failure if the lmm is needed (only for
 * the original open if the MDS crashed just when this client also
 * OOM'd), but this is incredibly unlikely, and it is questionable
 * whether the client could do MDS recovery under OOM anyway...
 */
static void mdc_realloc_openmsg(struct ptlrpc_request *req,
				struct mdt_body *body)
{
	int     rc;

	/* FIXME: remove this explicit offset. */
	rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
					body->mbo_eadatasize);
	if (rc) {
		CERROR("Can't enlarge segment %d size to %d\n",
		       DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
		body->mbo_valid &= ~OBD_MD_FLEASIZE;
		body->mbo_eadatasize = 0;
	}
}

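/*
 * Build an LDLM_INTENT_OPEN enqueue request: cancel conflicting OPEN
 * locks on the child (and the parent's UPDATE lock for creates), then
 * pack the open intent and reserve reply space for the returned MD.
 */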
static struct ptlrpc_request *
mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
		     struct md_op_data *op_data)
{
	struct ptlrpc_request *req;
	struct obd_device     *obddev = class_exp2obd(exp);
	struct ldlm_intent    *lit;
	const void *lmm = op_data->op_data;
	u32 lmmsize = op_data->op_data_size;
	LIST_HEAD(cancels);
	int		    count = 0;
	int		    mode;
	int		    rc;

	it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;

	/* XXX: openlock is not cancelled for cross-refs. */
	/* If inode is known, cancel conflicting OPEN locks. */
	if (fid_is_sane(&op_data->op_fid2)) {
		if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
			if (it->it_flags & FMODE_WRITE)
				mode = LCK_EX;
			else
				mode = LCK_PR;
		} else {
			if (it->it_flags & (FMODE_WRITE | MDS_OPEN_TRUNC))
				mode = LCK_CW;
			else if (it->it_flags & __FMODE_EXEC)
				mode = LCK_PR;
			else
				mode = LCK_CR;
		}
		count = mdc_resource_get_unused(exp, &op_data->op_fid2,
						&cancels, mode,
						MDS_INODELOCK_OPEN);
	}

	/* If CREATE, cancel parent's UPDATE lock. */
	if (it->it_op & IT_CREAT)
		mode = LCK_EX;
	else
		mode = LCK_CR;
	count += mdc_resource_get_unused(exp, &op_data->op_fid1,
					 &cancels, mode,
					 MDS_INODELOCK_UPDATE);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_OPEN);
	if (!req) {
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		return ERR_PTR(-ENOMEM);
	}

	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);
	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
			     max(lmmsize, obddev->u.cli.cl_default_mds_easize));

	rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
	if (rc < 0) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	spin_lock(&req->rq_lock);
	req->rq_replay = req->rq_import->imp_replayable;
	spin_unlock(&req->rq_lock);

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	/* pack the intended request */
	mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
		      lmmsize);

	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
			     obddev->u.cli.cl_max_mds_easize);

	ptlrpc_request_set_replen(req);
	return req;
}

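/*
 * Build an LDLM_INTENT_GETXATTR enqueue request, sizing the reply
 * buffers for xattr names, values and value lengths from the maximum
 * EA size the server advertised at connect time.
 */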
static struct ptlrpc_request *
mdc_intent_getxattr_pack(struct obd_export *exp,
			 struct lookup_intent *it,
			 struct md_op_data *op_data)
{
	struct ptlrpc_request	*req;
	struct ldlm_intent	*lit;
	int rc, count = 0;
	u32 maxdata;
	LIST_HEAD(cancels);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_GETXATTR);
	if (!req)
		return ERR_PTR(-ENOMEM);

	rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = IT_GETXATTR;

	maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;

	/* pack the intended request */
	mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
		      0);

	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, maxdata);

	req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER, maxdata);

	req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
			     RCL_SERVER, maxdata);

	ptlrpc_request_set_replen(req);

	return req;
}

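/*
 * Build an LDLM_INTENT_UNLINK enqueue request carrying the name to be
 * removed; reply space is reserved for the MD returned by the server.
 */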
static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
						     struct lookup_intent *it,
						     struct md_op_data *op_data)
{
	struct ptlrpc_request *req;
	struct obd_device     *obddev = class_exp2obd(exp);
	struct ldlm_intent    *lit;
	int		    rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_UNLINK);
	if (!req)
		return ERR_PTR(-ENOMEM);

	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);

	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
	if (rc) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	/* pack the intended request */
	mdc_unlink_pack(req, op_data);

	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
			     obddev->u.cli.cl_default_mds_easize);
	ptlrpc_request_set_replen(req);
	return req;
}

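/*
 * Build an LDLM_INTENT_GETATTR enqueue request asking for the full set
 * of attribute, EA and ACL data; the reply MD buffer is sized from the
 * default (or, failing that, the maximum) MDS EA size.
 */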
static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
						      struct lookup_intent *it,
						     struct md_op_data *op_data)
{
	struct ptlrpc_request *req;
	struct obd_device     *obddev = class_exp2obd(exp);
	u64		       valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
				       OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
				       OBD_MD_MEA | OBD_MD_FLACL;
	struct ldlm_intent    *lit;
	int		    rc;
	u32 easize;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_GETATTR);
	if (!req)
		return ERR_PTR(-ENOMEM);

	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);

	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
	if (rc) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	if (obddev->u.cli.cl_default_mds_easize > 0)
		easize = obddev->u.cli.cl_default_mds_easize;
	else
		easize = obddev->u.cli.cl_max_mds_easize;

	/* pack the intended request */
	mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);

	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
	ptlrpc_request_set_replen(req);
	return req;
}

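/*
 * Build an LDLM_INTENT_LAYOUT enqueue request; the layout itself comes
 * back in the DLM LVB, so the LVB reply buffer is sized from the
 * default MDS EA size.
 */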
static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
						     struct lookup_intent *it,
						     struct md_op_data *unused)
{
	struct obd_device     *obd = class_exp2obd(exp);
	struct ptlrpc_request *req;
	struct ldlm_intent    *lit;
	struct layout_intent  *layout;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_LAYOUT);
	if (!req)
		return ERR_PTR(-ENOMEM);

	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
	if (rc) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	/* pack the layout intent request */
	layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
	/* LAYOUT_INTENT_ACCESS is generic, specific operation will be
	 * set for replication
	 */
	layout->li_opc = LAYOUT_INTENT_ACCESS;

	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
			     obd->u.cli.cl_default_mds_easize);
	ptlrpc_request_set_replen(req);
	return req;
}

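/*
 * Build a plain LDLM enqueue request (no intent), reserving @lvb_len
 * bytes of reply space for the lock value block.
 */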
static struct ptlrpc_request *
mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
{
	struct ptlrpc_request *req;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
	if (!req)
		return ERR_PTR(-ENOMEM);

	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
	if (rc) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
	ptlrpc_request_set_replen(req);
	return req;
}

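/*
 * Post-process a completed intent enqueue: fix up the granted lock mode
 * (or clear the handle if the lock was aborted), copy the server's
 * disposition and status into the intent, drop the replay flag for
 * failed opens, and stash any EA/layout LVB data returned in the reply.
 */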
static int mdc_finish_enqueue(struct obd_export *exp,
			      struct ptlrpc_request *req,
			      struct ldlm_enqueue_info *einfo,
			      struct lookup_intent *it,
			      struct lustre_handle *lockh,
			      int rc)
{
	struct req_capsule  *pill = &req->rq_pill;
	struct ldlm_request *lockreq;
	struct ldlm_reply   *lockrep;
	struct ldlm_lock    *lock;
	void		*lvb_data = NULL;
	u32 lvb_len = 0;

	LASSERT(rc >= 0);
	/* Similarly, if we're going to replay this request, we don't want to
	 * actually get a lock, just perform the intent.
	 */
	if (req->rq_transno || req->rq_replay) {
		lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
		lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
	}

	if (rc == ELDLM_LOCK_ABORTED) {
		einfo->ei_mode = 0;
		memset(lockh, 0, sizeof(*lockh));
		rc = 0;
	} else { /* rc = 0 */
		lock = ldlm_handle2lock(lockh);

		/* If the server gave us back a different lock mode, we should
		 * fix up our variables.
		 */
		if (lock->l_req_mode != einfo->ei_mode) {
			ldlm_lock_addref(lockh, lock->l_req_mode);
			ldlm_lock_decref(lockh, einfo->ei_mode);
			einfo->ei_mode = lock->l_req_mode;
		}
		LDLM_LOCK_PUT(lock);
	}

	lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);

	it->it_disposition = (int)lockrep->lock_policy_res1;
	it->it_status = (int)lockrep->lock_policy_res2;
	it->it_lock_mode = einfo->ei_mode;
	it->it_lock_handle = lockh->cookie;
	it->it_request = req;

	/* Technically speaking rq_transno must already be zero if
	 * it_status is in error, so the check is a bit redundant
	 */
	if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
		mdc_clear_replay_flag(req, it->it_status);

	/* If we're doing an IT_OPEN which did not result in an actual
	 * successful open, then we need to remove the bit which saves
	 * this request for unconditional replay.
	 *
	 * It's important that we do this first!  Otherwise we might exit the
	 * function without doing so, and try to replay a failed create
	 * (bug 3440)
	 */
	if (it->it_op & IT_OPEN && req->rq_replay &&
	    (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
		mdc_clear_replay_flag(req, it->it_status);

	DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
		  it->it_op, it->it_disposition, it->it_status);

	/* We know what to expect, so we do any byte flipping required here */
	if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
		struct mdt_body *body;

		body = req_capsule_server_get(pill, &RMF_MDT_BODY);
		if (!body) {
			CERROR("Can't swab mdt_body\n");
			return -EPROTO;
		}

		if (it_disposition(it, DISP_OPEN_OPEN) &&
		    !it_open_error(DISP_OPEN_OPEN, it)) {
			/*
			 * If this is a successful OPEN request, we need to set
			 * replay handler and data early, so that if replay
			 * happens immediately after swabbing below, new reply
			 * is swabbed by that handler correctly.
			 */
			mdc_set_open_replay_data(NULL, NULL, it);
		}

		if ((body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
			void *eadata;

			mdc_update_max_ea_from_body(exp, body);

			/*
			 * The eadata is opaque; just check that it is there.
			 * Eventually, obd_unpackmd() will check the contents.
			 */
			eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
							      body->mbo_eadatasize);
			if (!eadata)
				return -EPROTO;

			/* save lvb data and length in case this is for layout
			 * lock
			 */
			lvb_data = eadata;
			lvb_len = body->mbo_eadatasize;

			/*
			 * We save the reply LOV EA in case we have to replay a
			 * create for recovery.  If we didn't allocate a large
			 * enough request buffer above we need to reallocate it
			 * here to hold the actual LOV EA.
			 *
			 * Do not save the LOV EA if the request is not going
			 * to be replayed (for example, an error reply).
			 */
			if ((it->it_op & IT_OPEN) && req->rq_replay) {
				void *lmm;

				if (req_capsule_get_size(pill, &RMF_EADATA,
							 RCL_CLIENT) <
				    body->mbo_eadatasize)
					mdc_realloc_openmsg(req, body);
				else
					req_capsule_shrink(pill, &RMF_EADATA,
							   body->mbo_eadatasize,
							   RCL_CLIENT);

				req_capsule_set_size(pill, &RMF_EADATA,
						     RCL_CLIENT,
						     body->mbo_eadatasize);

				lmm = req_capsule_client_get(pill, &RMF_EADATA);
				if (lmm)
					memcpy(lmm, eadata, body->mbo_eadatasize);
			}
		}
	} else if (it->it_op & IT_LAYOUT) {
		/* maybe the lock was granted right away and layout
		 * is packed into RMF_DLM_LVB of req
		 */
		lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
		if (lvb_len > 0) {
			lvb_data = req_capsule_server_sized_get(pill,
								&RMF_DLM_LVB,
								lvb_len);
			if (!lvb_data)
				return -EPROTO;
		}
	}

	/* fill in stripe data for layout lock */
	lock = ldlm_handle2lock(lockh);
	if (lock && ldlm_has_layout(lock) && lvb_data) {
		void *lmm;

		LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
			   ldlm_it2str(it->it_op), lvb_len);

		lmm = libcfs_kvzalloc(lvb_len, GFP_NOFS);
		if (!lmm) {
			LDLM_LOCK_PUT(lock);
			return -ENOMEM;
		}
		memcpy(lmm, lvb_data, lvb_len);

		/* install lvb_data */
		lock_res_and_lock(lock);
		if (!lock->l_lvb_data) {
			lock->l_lvb_type = LVB_T_LAYOUT;
			lock->l_lvb_data = lmm;
			lock->l_lvb_len = lvb_len;
			lmm = NULL;
		}
		unlock_res_and_lock(lock);
		if (lmm)
			kvfree(lmm);
	}
	if (lock)
		LDLM_LOCK_PUT(lock);

	return rc;
}

/* We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type.
 */
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
		const union ldlm_policy_data *policy,
		struct lookup_intent *it, struct md_op_data *op_data,
		struct lustre_handle *lockh, u64 extra_lock_flags)
{
	static const union ldlm_policy_data lookup_policy = {
		.l_inodebits = { MDS_INODELOCK_LOOKUP }
	};
	static const union ldlm_policy_data update_policy = {
		.l_inodebits = { MDS_INODELOCK_UPDATE }
	};
	static const union ldlm_policy_data layout_policy = {
		.l_inodebits = { MDS_INODELOCK_LAYOUT }
	};
	static const union ldlm_policy_data getxattr_policy = {
		.l_inodebits = { MDS_INODELOCK_XATTR }
	};
	struct obd_device *obddev = class_exp2obd(exp);
	struct ptlrpc_request *req = NULL;
	u64 flags, saved_flags = extra_lock_flags;
	struct ldlm_res_id res_id;
	int generation, resends = 0;
	struct ldlm_reply *lockrep;
	enum lvb_type lvb_type = LVB_T_NONE;
	int rc;

	LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
		 einfo->ei_type);
	fid_build_reg_res_name(&op_data->op_fid1, &res_id);

	if (it) {
		LASSERT(!policy);

		saved_flags |= LDLM_FL_HAS_INTENT;
		if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
			policy = &update_policy;
		else if (it->it_op & IT_LAYOUT)
			policy = &layout_policy;
		else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
			policy = &getxattr_policy;
		else
			policy = &lookup_policy;
	}

	generation = obddev->u.cli.cl_import->imp_generation;
resend:
	flags = saved_flags;
	if (!it) {
		/* The only way right now is FLOCK. */
		LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
			 einfo->ei_type);
		res_id.name[3] = LDLM_FLOCK;
	} else if (it->it_op & IT_OPEN) {
		req = mdc_intent_open_pack(exp, it, op_data);
	} else if (it->it_op & IT_UNLINK) {
		req = mdc_intent_unlink_pack(exp, it, op_data);
	} else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
		req = mdc_intent_getattr_pack(exp, it, op_data);
	} else if (it->it_op & IT_READDIR) {
		req = mdc_enqueue_pack(exp, 0);
	} else if (it->it_op & IT_LAYOUT) {
		if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
			return -EOPNOTSUPP;
		req = mdc_intent_layout_pack(exp, it, op_data);
		lvb_type = LVB_T_LAYOUT;
	} else if (it->it_op & IT_GETXATTR) {
		req = mdc_intent_getxattr_pack(exp, it, op_data);
	} else {
		LBUG();
		return -EINVAL;
	}

	if (IS_ERR(req))
		return PTR_ERR(req);

	if (resends) {
		req->rq_generation_set = 1;
		req->rq_import_generation = generation;
		req->rq_sent = ktime_get_real_seconds() + resends;
	}

	/* It is important to obtain modify RPC slot first (if applicable), so
	 * that threads that are waiting for a modify RPC slot are not polluting
	 * our rpcs in flight counter.
	 * We do not do flock request limiting, though
	 */
	if (it) {
		mdc_get_mod_rpc_slot(req, it);
		rc = obd_get_request_slot(&obddev->u.cli);
		if (rc != 0) {
			mdc_put_mod_rpc_slot(req, it);
			mdc_clear_replay_flag(req, 0);
			ptlrpc_req_finished(req);
			return rc;
		}
	}

	rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
			      0, lvb_type, lockh, 0);
	if (!it) {
		/* For flock requests we return immediately and let the caller
		 * deal with the rest, since the metadata processing in the
		 * remainder of this function makes no sense for flock
		 * requests. But if communication with the server failed
		 * (ETIMEDOUT) or we were hit by a signal/kill (EINTR), we
		 * cannot rely on the caller; this mainly matters for F_UNLCKs
		 * (explicit or generated automatically by the kernel to clean
		 * up current flocks on exit) which must not be dropped
		 */
		if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
		    (einfo->ei_type == LDLM_FLOCK) &&
		    (einfo->ei_mode == LCK_NL))
			goto resend;
		return rc;
	}

	obd_put_request_slot(&obddev->u.cli);
	mdc_put_mod_rpc_slot(req, it);

	if (rc < 0) {
		CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
		       obddev->obd_name, rc);

		mdc_clear_replay_flag(req, rc);
		ptlrpc_req_finished(req);
		return rc;
	}

	lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);

	lockrep->lock_policy_res2 =
		ptlrpc_status_ntoh(lockrep->lock_policy_res2);

	/*
	 * Retry infinitely when the server returns -EINPROGRESS for the
	 * intent operation; when the server returns -EINPROGRESS for
	 * acquiring the intent lock, we retry in after_reply().
	 */
	if (it->it_op && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
		mdc_clear_replay_flag(req, rc);
		ptlrpc_req_finished(req);
		resends++;

		CDEBUG(D_HA, "%s: resend:%d op:%d " DFID "/" DFID "\n",
		       obddev->obd_name, resends, it->it_op,
		       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));

		if (generation == obddev->u.cli.cl_import->imp_generation) {
			goto resend;
		} else {
			CDEBUG(D_HA, "resend cross eviction\n");
			return -EIO;
		}
	}

	rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
	if (rc < 0) {
		if (lustre_handle_is_used(lockh)) {
			ldlm_lock_decref(lockh, einfo->ei_mode);
			memset(lockh, 0, sizeof(*lockh));
		}
		ptlrpc_req_finished(req);

		it->it_lock_handle = 0;
		it->it_lock_mode = 0;
		it->it_request = NULL;
	}

	return rc;
}

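/*
 * Translate the intent reply into the state the caller expects: check
 * the per-phase status, take extra request references for successful
 * CREATE/OPEN phases (dropped later in ll_create_node/ll_file_open),
 * and if we already hold a matching lock, cancel the new one and reuse
 * the old handle.
 */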
static int mdc_finish_intent_lock(struct obd_export *exp,
				  struct ptlrpc_request *request,
				  struct md_op_data *op_data,
				  struct lookup_intent *it,
				  struct lustre_handle *lockh)
{
	struct lustre_handle old_lock;
	struct mdt_body *mdt_body;
	struct ldlm_lock *lock;
	int rc;

	LASSERT(request != LP_POISON);
	LASSERT(request->rq_repmsg != LP_POISON);

	if (it->it_op & IT_READDIR)
		return 0;

	if (!it_disposition(it, DISP_IT_EXECD)) {
		/* The server failed before it even started executing the
		 * intent, i.e. because it couldn't unpack the request.
		 */
		LASSERT(it->it_status != 0);
		return it->it_status;
	}
	rc = it_open_error(DISP_IT_EXECD, it);
	if (rc)
		return rc;

	mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
	LASSERT(mdt_body);      /* mdc_enqueue checked */

	rc = it_open_error(DISP_LOOKUP_EXECD, it);
	if (rc)
		return rc;

	/* Keep requests around for the multiple phases of the call;
	 * this shows the DISP_XX bits must guarantee we make it into the call.
	 */
	if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
	    it_disposition(it, DISP_OPEN_CREATE) &&
	    !it_open_error(DISP_OPEN_CREATE, it)) {
		it_set_disposition(it, DISP_ENQ_CREATE_REF);
		ptlrpc_request_addref(request); /* balanced in ll_create_node */
	}
	if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
	    it_disposition(it, DISP_OPEN_OPEN) &&
	    !it_open_error(DISP_OPEN_OPEN, it)) {
		it_set_disposition(it, DISP_ENQ_OPEN_REF);
		ptlrpc_request_addref(request); /* balanced in ll_file_open */
		/* BUG 11546 - eviction in the middle of open rpc processing */
		OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
	}

	if (it->it_op & IT_CREAT)
		/* XXX this belongs in ll_create_it */
		;
	else if (it->it_op == IT_OPEN)
		LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
	else
		LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));

	/* If we already have a matching lock, then cancel the new
	 * one.  We have to set the data here instead of in
	 * mdc_enqueue, because we need to use the child's inode as
	 * the l_ast_data to match, and that's not available until
	 * intent_finish has performed the iget().
	 */
	lock = ldlm_handle2lock(lockh);
	if (lock) {
		union ldlm_policy_data policy = lock->l_policy_data;

		LDLM_DEBUG(lock, "matching against this");

		LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
					 &lock->l_resource->lr_name),
			 "Lock res_id: " DLDLMRES ", fid: " DFID "\n",
			 PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
		LDLM_LOCK_PUT(lock);

		memcpy(&old_lock, lockh, sizeof(*lockh));
		if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
				    LDLM_IBITS, &policy, LCK_NL,
				    &old_lock, 0)) {
			ldlm_lock_decref_and_cancel(lockh,
						    it->it_lock_mode);
			memcpy(lockh, &old_lock, sizeof(old_lock));
			it->it_lock_handle = lockh->cookie;
		}
	}
	CDEBUG(D_DENTRY,
	       "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
	       (int)op_data->op_namelen, op_data->op_name,
	       ldlm_it2str(it->it_op), it->it_status, it->it_disposition, rc);
	return rc;
}

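/*
 * Check whether the intent is already covered by a lock we hold: either
 * revalidate the handle stashed in the intent, or match against the
 * inodebits implied by the intent operation. Returns 1 and fills in the
 * intent lock handle/mode on success, 0 otherwise.
 */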
int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
			struct lu_fid *fid, __u64 *bits)
{
	/* We could just return 1 immediately, but since we should only
	 * be called in revalidate_it if we already have a lock, let's
	 * verify that.
	 */
	struct ldlm_res_id res_id;
	struct lustre_handle lockh;
	union ldlm_policy_data policy;
	enum ldlm_mode mode;

	if (it->it_lock_handle) {
		lockh.cookie = it->it_lock_handle;
		mode = ldlm_revalidate_lock_handle(&lockh, bits);
	} else {
		fid_build_reg_res_name(fid, &res_id);
		switch (it->it_op) {
		case IT_GETATTR:
			/* File attributes are held under multiple bits:
			 * nlink is under lookup lock, size and times are
			 * under UPDATE lock and recently we've also got
			 * a separate permissions lock for owner/group/acl that
			 * were protected by lookup lock before.
			 * Getattr must provide all of that information,
			 * so we need to ensure we have all of those locks.
			 * Unfortunately, if the bits are split across multiple
			 * locks, there's no easy way to match all of them here,
			 * so an extra RPC would be performed to fetch all
			 * of those bits at once for now.
			 */
			/* For new MDTs (> 2.4), UPDATE|PERM should be enough,
			 * but for old MDTs (< 2.4), permission is covered
			 * by the LOOKUP lock, so it needs to match all bits here.
			 */
			policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
						  MDS_INODELOCK_LOOKUP |
						  MDS_INODELOCK_PERM;
			break;
		case IT_READDIR:
			policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
			break;
		case IT_LAYOUT:
			policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
			break;
		default:
			policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
			break;
		}

		mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
				      LDLM_IBITS, &policy,
				      LCK_CR | LCK_CW | LCK_PR | LCK_PW,
				      &lockh);
	}

	if (mode) {
		it->it_lock_handle = lockh.cookie;
		it->it_lock_mode = mode;
	} else {
		it->it_lock_handle = 0;
		it->it_lock_mode = 0;
	}

	return !!mode;
}

/*
 * This long block is all about fixing up the lock and request state
 * so that it is correct as of the moment _before_ the operation was
 * applied; that way, the VFS will think that everything is normal and
 * call Lustre's regular VFS methods.
 *
 * If we're performing a creation, that means that unless the creation
 * failed with EEXIST, we should fake up a negative dentry.
 *
 * For everything else, we want the lookup to succeed.
 *
 * One additional note: if CREATE or OPEN succeeded, we add an extra
 * reference to the request because we need to keep it around until
 * ll_create/ll_open gets called.
 *
 * The server will return to us, in it_disposition, an indication of
 * exactly what it_status refers to.
 *
 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
 * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
 * was successful.
 *
 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
 * child lookup.
 */
int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
		    struct lookup_intent *it, struct ptlrpc_request **reqp,
		    ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
{
	struct ldlm_enqueue_info einfo = {
		.ei_type	= LDLM_IBITS,
		.ei_mode	= it_to_lock_mode(it),
		.ei_cb_bl	= cb_blocking,
		.ei_cb_cp	= ldlm_completion_ast,
	};
	struct lustre_handle lockh;
	int rc = 0;

	LASSERT(it);

	CDEBUG(D_DLMTRACE, "(name: %.*s," DFID ") in obj " DFID
		", intent: %s flags %#Lo\n", (int)op_data->op_namelen,
		op_data->op_name, PFID(&op_data->op_fid2),
		PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
		it->it_flags);

	lockh.cookie = 0;
	if (fid_is_sane(&op_data->op_fid2) &&
	    (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
		/* We could just return 1 immediately, but since we should only
		 * be called in revalidate_it if we already have a lock, let's
		 * verify that.
		 */
		it->it_lock_handle = 0;
		rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
		/* Only return failure if it was not GETATTR by cfid
		 * (from inode_revalidate)
		 */
		if (rc || op_data->op_namelen != 0)
			return rc;
	}

	/* If the upper layer did not allocate a fid, do it now. */
	if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
		rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
		if (rc < 0) {
			CERROR("Can't alloc new fid, rc %d\n", rc);
			return rc;
		}
	}
	rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
			 extra_lock_flags);
	if (rc < 0)
		return rc;

	*reqp = it->it_request;
	rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
	return rc;
}

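/*
 * Interpret callback for an asynchronous intent getattr enqueue: finish
 * the enqueue, post-process the intent reply, and hand the result to
 * the caller through the mi_cb completion callback.
 */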
static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
					      struct ptlrpc_request *req,
					      void *args, int rc)
{
	struct mdc_getattr_args  *ga = args;
	struct obd_export	*exp = ga->ga_exp;
	struct md_enqueue_info   *minfo = ga->ga_minfo;
	struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
	struct lookup_intent     *it;
	struct lustre_handle     *lockh;
	struct obd_device	*obddev;
	struct ldlm_reply	 *lockrep;
	__u64		     flags = LDLM_FL_HAS_INTENT;

	it    = &minfo->mi_it;
	lockh = &minfo->mi_lockh;

	obddev = class_exp2obd(exp);

	obd_put_request_slot(&obddev->u.cli);
	if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
		rc = -ETIMEDOUT;

	rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
				   &flags, NULL, 0, lockh, rc);
	if (rc < 0) {
		CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
		mdc_clear_replay_flag(req, rc);
		goto out;
	}

	lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);

	lockrep->lock_policy_res2 =
		ptlrpc_status_ntoh(lockrep->lock_policy_res2);

	rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
	if (rc)
		goto out;

	rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);

out:
	minfo->mi_cb(req, minfo, rc);
	return 0;
}

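/*
 * Fire off an intent getattr enqueue asynchronously via ptlrpcd; the
 * reply is handled in mdc_intent_getattr_async_interpret(), which then
 * invokes the completion callback carried in @minfo.
 */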
int mdc_intent_getattr_async(struct obd_export *exp,
			     struct md_enqueue_info *minfo)
{
	struct md_op_data       *op_data = &minfo->mi_data;
	struct lookup_intent    *it = &minfo->mi_it;
	struct ptlrpc_request   *req;
	struct mdc_getattr_args *ga;
	struct obd_device       *obddev = class_exp2obd(exp);
	struct ldlm_res_id       res_id;
	union ldlm_policy_data policy = {
		.l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
	};
	int		      rc = 0;
	__u64		    flags = LDLM_FL_HAS_INTENT;

	CDEBUG(D_DLMTRACE,
	       "name: %.*s in inode " DFID ", intent: %s flags %#Lo\n",
	       (int)op_data->op_namelen, op_data->op_name,
	       PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);

	fid_build_reg_res_name(&op_data->op_fid1, &res_id);
	req = mdc_intent_getattr_pack(exp, it, op_data);
	if (IS_ERR(req))
		return PTR_ERR(req);

	rc = obd_get_request_slot(&obddev->u.cli);
	if (rc != 0) {
		ptlrpc_req_finished(req);
		return rc;
	}

	rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
			      &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
	if (rc < 0) {
		obd_put_request_slot(&obddev->u.cli);
		ptlrpc_req_finished(req);
		return rc;
	}

	BUILD_BUG_ON(sizeof(*ga) > sizeof(req->rq_async_args));
	ga = ptlrpc_req_async_args(req);
	ga->ga_exp = exp;
	ga->ga_minfo = minfo;

	req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
	ptlrpcd_add_req(req);

	return 0;
}
1202