/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lockd.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#include "../../include/linux/libcfs/libcfs.h"
#include "../include/lustre_dlm.h"
#include "../include/obd_class.h"
#include <linux/list.h>
#include "ldlm_internal.h"

static int ldlm_num_threads;
module_param(ldlm_num_threads, int, 0444);
MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");

static char *ldlm_cpts;
module_param(ldlm_cpts, charp, 0444);
MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");

static struct mutex	ldlm_ref_mutex;
static int ldlm_refcount;

static struct kobject *ldlm_kobj;
struct kset *ldlm_ns_kset;
static struct kset *ldlm_svc_kset;

struct ldlm_cb_async_args {
	struct ldlm_cb_set_arg *ca_set_arg;
	struct ldlm_lock       *ca_lock;
};

/* LDLM state */

static struct ldlm_state *ldlm_state;

#define ELT_STOPPED   0
#define ELT_READY     1
#define ELT_TERMINATE 2

struct ldlm_bl_pool {
	spinlock_t		blp_lock;

	/*
	 * blp_prio_list is used for callbacks that should be handled
	 * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
	 * see bug 13843
	 */
	struct list_head	blp_prio_list;

	/*
	 * blp_list is used for all other callbacks which are likely
	 * to take longer to process.
	 */
	struct list_head	blp_list;

	wait_queue_head_t	blp_waitq;
	struct completion	blp_comp;
	atomic_t		blp_num_threads;
	atomic_t		blp_busy_threads;
	int			blp_min_threads;
	int			blp_max_threads;
};

struct ldlm_bl_work_item {
	struct list_head	blwi_entry;
	struct ldlm_namespace	*blwi_ns;
	struct ldlm_lock_desc	blwi_ld;
	struct ldlm_lock	*blwi_lock;
	struct list_head	blwi_head;
	int			blwi_count;
	struct completion	blwi_comp;
	ldlm_cancel_flags_t	blwi_flags;
	int			blwi_mem_pressure;
};

/**
 * Callback handler for receiving incoming blocking ASTs.
 *
 * This can only happen on the client side.
 */
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
			     struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
{
	int do_ast;

	LDLM_DEBUG(lock, "client blocking AST callback handler");

	lock_res_and_lock(lock);
	lock->l_flags |= LDLM_FL_CBPENDING;

	if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
		lock->l_flags |= LDLM_FL_CANCEL;

	do_ast = !lock->l_readers && !lock->l_writers;
	unlock_res_and_lock(lock);

	if (do_ast) {
		CDEBUG(D_DLMTRACE,
		       "Lock %p already unused, calling callback (%p)\n", lock,
		       lock->l_blocking_ast);
		if (lock->l_blocking_ast != NULL)
			lock->l_blocking_ast(lock, ld, lock->l_ast_data,
					     LDLM_CB_BLOCKING);
	} else {
		CDEBUG(D_DLMTRACE,
		       "Lock %p is referenced, will be cancelled later\n",
		       lock);
	}

	LDLM_DEBUG(lock, "client blocking callback handler END");
	LDLM_LOCK_RELEASE(lock);
}

/**
 * Callback handler for receiving incoming completion ASTs.
 *
 * This can only happen on the client side.
 */
static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
				    struct ldlm_namespace *ns,
				    struct ldlm_request *dlm_req,
				    struct ldlm_lock *lock)
{
	int lvb_len;
	LIST_HEAD(ast_list);
	int rc = 0;

	LDLM_DEBUG(lock, "client completion callback handler START");

	if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
		int to = cfs_time_seconds(1);

		while (to > 0) {
			set_current_state(TASK_INTERRUPTIBLE);
			schedule_timeout(to);
			if (lock->l_granted_mode == lock->l_req_mode ||
			    lock->l_flags & LDLM_FL_DESTROYED)
				break;
		}
	}

	lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
	if (lvb_len < 0) {
		LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", lvb_len);
		rc = lvb_len;
		goto out;
	} else if (lvb_len > 0) {
		if (lock->l_lvb_len > 0) {
			/* for extent lock, lvb contains ost_lvb{}. */
			LASSERT(lock->l_lvb_data != NULL);

			if (unlikely(lock->l_lvb_len < lvb_len)) {
				LDLM_ERROR(lock, "Replied LVB is larger than expectation, expected = %d, replied = %d",
					   lock->l_lvb_len, lvb_len);
				rc = -EINVAL;
				goto out;
			}
		} else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
						     * variable length */
			void *lvb_data;

			lvb_data = kzalloc(lvb_len, GFP_NOFS);
			if (!lvb_data) {
				LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
				rc = -ENOMEM;
				goto out;
			}

			lock_res_and_lock(lock);
			LASSERT(lock->l_lvb_data == NULL);
			lock->l_lvb_type = LVB_T_LAYOUT;
			lock->l_lvb_data = lvb_data;
			lock->l_lvb_len = lvb_len;
			unlock_res_and_lock(lock);
		}
	}

	lock_res_and_lock(lock);
	if ((lock->l_flags & LDLM_FL_DESTROYED) ||
	    lock->l_granted_mode == lock->l_req_mode) {
		/* bug 11300: the lock has already been granted */
		unlock_res_and_lock(lock);
		LDLM_DEBUG(lock, "Double grant race happened");
		rc = 0;
		goto out;
	}

	/* If we receive the completion AST before the actual enqueue returned,
	 * then we might need to switch lock modes, resources, or extents. */
	if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
		lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
		LDLM_DEBUG(lock, "completion AST, new lock mode");
	}

	if (lock->l_resource->lr_type != LDLM_PLAIN) {
		ldlm_convert_policy_to_local(req->rq_export,
					  dlm_req->lock_desc.l_resource.lr_type,
					  &dlm_req->lock_desc.l_policy_data,
					  &lock->l_policy_data);
		LDLM_DEBUG(lock, "completion AST, new policy data");
	}

	ldlm_resource_unlink_lock(lock);
	if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
		   &lock->l_resource->lr_name,
		   sizeof(lock->l_resource->lr_name)) != 0) {
		unlock_res_and_lock(lock);
		rc = ldlm_lock_change_resource(ns, lock,
				&dlm_req->lock_desc.l_resource.lr_name);
		if (rc < 0) {
			LDLM_ERROR(lock, "Failed to allocate resource");
			goto out;
		}
		LDLM_DEBUG(lock, "completion AST, new resource");
		CERROR("change resource!\n");
		lock_res_and_lock(lock);
	}

	if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
		/* BL_AST locks are not needed in LRU.
		 * Let ldlm_cancel_lru() be fast. */
		ldlm_lock_remove_from_lru(lock);
		lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
		LDLM_DEBUG(lock, "completion AST includes blocking AST");
	}

	if (lock->l_lvb_len > 0) {
		rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
				   lock->l_lvb_data, lvb_len);
		if (rc < 0) {
			unlock_res_and_lock(lock);
			goto out;
		}
	}

	ldlm_grant_lock(lock, &ast_list);
	unlock_res_and_lock(lock);

	LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");

	/* Let Enqueue call osc_lock_upcall() and initialize
	 * l_ast_data */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);

	ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);

	LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
			  lock);
	goto out;

out:
	if (rc < 0) {
		lock_res_and_lock(lock);
		lock->l_flags |= LDLM_FL_FAILED;
		unlock_res_and_lock(lock);
		wake_up(&lock->l_waitq);
	}
	LDLM_LOCK_RELEASE(lock);
}

/**
 * Callback handler for receiving incoming glimpse ASTs.
 *
 * This can only happen on the client side.  After handling the glimpse AST
 * we also consider dropping the lock here if it is unused locally for a
 * long time.
 */
static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
				    struct ldlm_namespace *ns,
				    struct ldlm_request *dlm_req,
				    struct ldlm_lock *lock)
{
	int rc = -ENOSYS;

	LDLM_DEBUG(lock, "client glimpse AST callback handler");

	if (lock->l_glimpse_ast != NULL)
		rc = lock->l_glimpse_ast(lock, req);

	if (req->rq_repmsg != NULL) {
		ptlrpc_reply(req);
	} else {
		req->rq_status = rc;
		ptlrpc_error(req);
	}

	lock_res_and_lock(lock);
	if (lock->l_granted_mode == LCK_PW &&
	    !lock->l_readers && !lock->l_writers &&
	    cfs_time_after(cfs_time_current(),
			   cfs_time_add(lock->l_last_used,
					cfs_time_seconds(10)))) {
		unlock_res_and_lock(lock);
		if (ldlm_bl_to_thread_lock(ns, NULL, lock))
			ldlm_handle_bl_callback(ns, NULL, lock);

		return;
	}
	unlock_res_and_lock(lock);
	LDLM_LOCK_RELEASE(lock);
}

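/* Send a reply for an LDLM callback request, packing the reply buffer
 * first if it has not been packed yet. */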
static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
{
	if (req->rq_no_reply)
		return 0;

	req->rq_status = rc;
	if (!req->rq_packed_final) {
		rc = lustre_pack_reply(req, 1, NULL, NULL);
		if (rc)
			return rc;
	}
	return ptlrpc_reply(req);
}

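/* Queue a blocking-AST work item on the ldlm_bl thread pool and, unless
 * LCF_ASYNC was requested, wait for a blocking thread to complete it. */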
static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
			       ldlm_cancel_flags_t cancel_flags)
{
	struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;

	spin_lock(&blp->blp_lock);
	if (blwi->blwi_lock &&
	    blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) {
		/* add LDLM_FL_DISCARD_DATA requests to the priority list */
		list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
	} else {
		/* other blocking callbacks are added to the regular list */
		list_add_tail(&blwi->blwi_entry, &blp->blp_list);
	}
	spin_unlock(&blp->blp_lock);

	wake_up(&blp->blp_waitq);

	/* cannot check blwi->blwi_flags as blwi could already be freed in
	   LCF_ASYNC mode */
	if (!(cancel_flags & LCF_ASYNC))
		wait_for_completion(&blwi->blwi_comp);

	return 0;
}

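/* Initialize a blocking work item with either a single lock or a list of
 * \a count locks to cancel. */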
static inline void init_blwi(struct ldlm_bl_work_item *blwi,
			     struct ldlm_namespace *ns,
			     struct ldlm_lock_desc *ld,
			     struct list_head *cancels, int count,
			     struct ldlm_lock *lock,
			     ldlm_cancel_flags_t cancel_flags)
{
	init_completion(&blwi->blwi_comp);
	INIT_LIST_HEAD(&blwi->blwi_head);

	if (memory_pressure_get())
		blwi->blwi_mem_pressure = 1;

	blwi->blwi_ns = ns;
	blwi->blwi_flags = cancel_flags;
	if (ld != NULL)
		blwi->blwi_ld = *ld;
	if (count) {
		list_add(&blwi->blwi_head, cancels);
		list_del_init(cancels);
		blwi->blwi_count = count;
	} else {
		blwi->blwi_lock = lock;
	}
}

/**
 * Queues a list of locks \a cancels containing \a count locks
 * for later processing by a blocking thread.  If \a count is zero,
 * then the lock referenced as \a lock is queued instead.
 *
 * The blocking thread will then call the ->l_blocking_ast callback in the
 * lock.  If list addition fails, an error is returned and the caller is
 * expected to call ->l_blocking_ast itself.
 */
static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
			     struct ldlm_lock_desc *ld,
			     struct ldlm_lock *lock,
			     struct list_head *cancels, int count,
			     ldlm_cancel_flags_t cancel_flags)
{
	if (cancels && count == 0)
		return 0;

	if (cancel_flags & LCF_ASYNC) {
		struct ldlm_bl_work_item *blwi;

		blwi = kzalloc(sizeof(*blwi), GFP_NOFS);
		if (!blwi)
			return -ENOMEM;
		init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);

		return __ldlm_bl_to_thread(blwi, cancel_flags);
	} else {
		/* if it is a synchronous call, do minimal memory allocation,
		 * as it could be triggered from the kernel shrinker
		 */
		struct ldlm_bl_work_item blwi;

		memset(&blwi, 0, sizeof(blwi));
		init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
		return __ldlm_bl_to_thread(&blwi, cancel_flags);
	}
}

int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
			   struct ldlm_lock *lock)
{
	return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
}

int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
			   struct list_head *cancels, int count,
			   ldlm_cancel_flags_t cancel_flags)
{
	return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
}

/* Setinfo coming from Server (eg MDT) to Client (eg MDC)! */
static int ldlm_handle_setinfo(struct ptlrpc_request *req)
{
	struct obd_device *obd = req->rq_export->exp_obd;
	char *key;
	void *val;
	int keylen, vallen;
	int rc = -ENOSYS;

	DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);

	req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);

	key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
	if (key == NULL) {
		DEBUG_REQ(D_IOCTL, req, "no set_info key");
		return -EFAULT;
	}
	keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
				      RCL_CLIENT);
	val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
	if (val == NULL) {
		DEBUG_REQ(D_IOCTL, req, "no set_info val");
		return -EFAULT;
	}
	vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
				      RCL_CLIENT);

	/* We are responsible for swabbing contents of val */

	if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
		/* Pass it on to mdc (the "export" in this case) */
		rc = obd_set_info_async(req->rq_svc_thread->t_env,
					req->rq_export,
					sizeof(KEY_HSM_COPYTOOL_SEND),
					KEY_HSM_COPYTOOL_SEND,
					vallen, val, NULL);
	else
		DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);

	return rc;
}

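/* Log the outcome of handling a callback request, including the peer NID
 * and the lock handle involved. */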
static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
					const char *msg, int rc,
					struct lustre_handle *handle)
{
	DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
		  "%s: [nid %s] [rc %d] [lock %#llx]",
		  msg, libcfs_id2str(req->rq_peer), rc,
		  handle ? handle->cookie : 0);
	if (req->rq_no_reply)
		CWARN("No reply was sent, maybe cause bug 21636.\n");
	else if (rc)
		CWARN("Send reply failed, maybe cause bug 21636.\n");
}

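/* Handle a quota-check callback: unpack the obd_quotactl and record the
 * quota check status in the client obd. */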
static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
{
	struct obd_quotactl *oqctl;
	struct client_obd *cli = &req->rq_export->exp_obd->u.cli;

	oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
	if (oqctl == NULL) {
		CERROR("Can't unpack obd_quotactl\n");
		return -EPROTO;
	}

	oqctl->qc_stat = ptlrpc_status_ntoh(oqctl->qc_stat);

	cli->cl_qchk_stat = oqctl->qc_stat;
	return 0;
}

/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
	struct ldlm_namespace *ns;
	struct ldlm_request *dlm_req;
	struct ldlm_lock *lock;
	int rc;

	/* Requests arrive in sender's byte order.  The ptlrpc service
	 * handler has already checked and, if necessary, byte-swapped the
	 * incoming request message body, but I am responsible for the
	 * message buffers. */

	/* do nothing for sec context finalize */
	if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
		return 0;

	req_capsule_init(&req->rq_pill, req, RCL_SERVER);

	if (req->rq_export == NULL) {
		rc = ldlm_callback_reply(req, -ENOTCONN);
		ldlm_callback_errmsg(req, "Operate on unconnected server",
				     rc, NULL);
		return 0;
	}

	LASSERT(req->rq_export != NULL);
	LASSERT(req->rq_export->exp_obd != NULL);

	switch (lustre_msg_get_opc(req->rq_reqmsg)) {
	case LDLM_BL_CALLBACK:
		if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
			return 0;
		break;
	case LDLM_CP_CALLBACK:
		if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
			return 0;
		break;
	case LDLM_GL_CALLBACK:
		if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
			return 0;
		break;
	case LDLM_SET_INFO:
		rc = ldlm_handle_setinfo(req);
		ldlm_callback_reply(req, rc);
		return 0;
	case OBD_QC_CALLBACK:
		req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
		if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
			return 0;
		rc = ldlm_handle_qc_callback(req);
		ldlm_callback_reply(req, rc);
		return 0;
	default:
		CERROR("unknown opcode %u\n",
		       lustre_msg_get_opc(req->rq_reqmsg));
		ldlm_callback_reply(req, -EPROTO);
		return 0;
	}

	ns = req->rq_export->exp_obd->obd_namespace;
	LASSERT(ns != NULL);

	req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);

	dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
	if (dlm_req == NULL) {
		rc = ldlm_callback_reply(req, -EPROTO);
		ldlm_callback_errmsg(req, "Operate without parameter", rc,
				     NULL);
		return 0;
	}

	/* Force a known safe race, send a cancel to the server for a lock
	 * which the server has already started a blocking callback on. */
	if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
	    lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
		rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
		if (rc < 0)
			CERROR("ldlm_cli_cancel: %d\n", rc);
	}

	lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
	if (!lock) {
		CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock disappeared\n",
		       dlm_req->lock_handle[0].cookie);
		rc = ldlm_callback_reply(req, -EINVAL);
		ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
				     &dlm_req->lock_handle[0]);
		return 0;
	}

	if ((lock->l_flags & LDLM_FL_FAIL_LOC) &&
	    lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
		OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

	/* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
	lock_res_and_lock(lock);
	lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
					      LDLM_AST_FLAGS);
	if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
		/* If somebody cancels the lock and the cache is already
		 * dropped, or the lock fails before the cp_ast is received
		 * on the client, we can tell the server we have no lock.
		 * Otherwise, we should send the cancel after dropping the
		 * cache. */
		if (((lock->l_flags & LDLM_FL_CANCELING) &&
		    (lock->l_flags & LDLM_FL_BL_DONE)) ||
		    (lock->l_flags & LDLM_FL_FAILED)) {
			LDLM_DEBUG(lock, "callback on lock %#llx - lock disappeared\n",
				   dlm_req->lock_handle[0].cookie);
			unlock_res_and_lock(lock);
			LDLM_LOCK_RELEASE(lock);
			rc = ldlm_callback_reply(req, -EINVAL);
			ldlm_callback_errmsg(req, "Operate on stale lock", rc,
					     &dlm_req->lock_handle[0]);
			return 0;
		}
		/* BL_AST locks are not needed in LRU.
		 * Let ldlm_cancel_lru() be fast. */
		ldlm_lock_remove_from_lru(lock);
		lock->l_flags |= LDLM_FL_BL_AST;
	}
	unlock_res_and_lock(lock);

	/* We want the ost thread to get this reply so that it can respond
	 * to ost requests (write cache writeback) that might be triggered
	 * in the callback.
	 *
	 * But we'd also like to be able to indicate in the reply that we're
	 * cancelling right now, because it's unused, or have an intent result
	 * in the reply, so we might have to push the responsibility for sending
	 * the reply down into the AST handlers, alas. */

	switch (lustre_msg_get_opc(req->rq_reqmsg)) {
	case LDLM_BL_CALLBACK:
		CDEBUG(D_INODE, "blocking ast\n");
		req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
		if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
			rc = ldlm_callback_reply(req, 0);
			if (req->rq_no_reply || rc)
				ldlm_callback_errmsg(req, "Normal process", rc,
						     &dlm_req->lock_handle[0]);
		}
		if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
			ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
		break;
	case LDLM_CP_CALLBACK:
		CDEBUG(D_INODE, "completion ast\n");
		req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
		ldlm_callback_reply(req, 0);
		ldlm_handle_cp_callback(req, ns, dlm_req, lock);
		break;
	case LDLM_GL_CALLBACK:
		CDEBUG(D_INODE, "glimpse ast\n");
		req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
		ldlm_handle_gl_callback(req, ns, dlm_req, lock);
		break;
	default:
		LBUG();			 /* checked above */
	}

	return 0;
}

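/* Pick the next blocking work item to process, alternating between the
 * priority list and the regular list so neither is starved. */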
static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
{
	struct ldlm_bl_work_item *blwi = NULL;
	static unsigned int num_bl;

	spin_lock(&blp->blp_lock);
	/* process a request from the blp_list at least every blp_num_threads */
	if (!list_empty(&blp->blp_list) &&
	    (list_empty(&blp->blp_prio_list) || num_bl == 0))
		blwi = list_entry(blp->blp_list.next,
				      struct ldlm_bl_work_item, blwi_entry);
	else
		if (!list_empty(&blp->blp_prio_list))
			blwi = list_entry(blp->blp_prio_list.next,
					      struct ldlm_bl_work_item,
					      blwi_entry);

	if (blwi) {
		if (++num_bl >= atomic_read(&blp->blp_num_threads))
			num_bl = 0;
		list_del(&blwi->blwi_entry);
	}
	spin_unlock(&blp->blp_lock);

	return blwi;
}

/* This only contains temporary data until the thread starts */
struct ldlm_bl_thread_data {
	char			bltd_name[CFS_CURPROC_COMM_MAX];
	struct ldlm_bl_pool	*bltd_blp;
	struct completion	bltd_comp;
	int			bltd_num;
};

static int ldlm_bl_thread_main(void *arg);

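/* Start one more ldlm_bl_NN thread and wait until it has registered itself
 * with the blocking thread pool. */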
static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
{
	struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
	struct task_struct *task;

	init_completion(&bltd.bltd_comp);
	bltd.bltd_num = atomic_read(&blp->blp_num_threads);
	snprintf(bltd.bltd_name, sizeof(bltd.bltd_name),
		"ldlm_bl_%02d", bltd.bltd_num);
	task = kthread_run(ldlm_bl_thread_main, &bltd, "%s", bltd.bltd_name);
	if (IS_ERR(task)) {
		CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
		       atomic_read(&blp->blp_num_threads), PTR_ERR(task));
		return PTR_ERR(task);
	}
	wait_for_completion(&bltd.bltd_comp);

	return 0;
}

/**
 * Main blocking requests processing thread.
 *
 * Callers put locks into its queue by calling ldlm_bl_to_thread.
 * This thread eventually makes the actual call to ->l_blocking_ast
 * for the queued locks.
 */
static int ldlm_bl_thread_main(void *arg)
{
	struct ldlm_bl_pool *blp;

	{
		struct ldlm_bl_thread_data *bltd = arg;

		blp = bltd->bltd_blp;

		atomic_inc(&blp->blp_num_threads);
		atomic_inc(&blp->blp_busy_threads);

		complete(&bltd->bltd_comp);
		/* cannot use bltd after this, it is only on caller's stack */
	}

	while (1) {
		struct l_wait_info lwi = { 0 };
		struct ldlm_bl_work_item *blwi = NULL;
		int busy;

		blwi = ldlm_bl_get_work(blp);

		if (blwi == NULL) {
			atomic_dec(&blp->blp_busy_threads);
			l_wait_event_exclusive(blp->blp_waitq,
					 (blwi = ldlm_bl_get_work(blp)) != NULL,
					 &lwi);
			busy = atomic_inc_return(&blp->blp_busy_threads);
		} else {
			busy = atomic_read(&blp->blp_busy_threads);
		}

		if (blwi->blwi_ns == NULL)
			/* added by ldlm_cleanup() */
			break;

		/* Not fatal if racy and have a few too many threads */
		if (unlikely(busy < blp->blp_max_threads &&
			     busy >= atomic_read(&blp->blp_num_threads) &&
			     !blwi->blwi_mem_pressure))
			/* discard the return value, we tried */
			ldlm_bl_thread_start(blp);

		if (blwi->blwi_mem_pressure)
			memory_pressure_set();

		if (blwi->blwi_count) {
			int count;
			/* In the special case when we cancel locks in the LRU
			 * asynchronously, we pass the list of locks here.
			 * Thus the locks are marked LDLM_FL_CANCELING, but
			 * NOT cancelled locally yet. */
			count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
							   blwi->blwi_count,
							   LCF_BL_AST);
			ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
					     blwi->blwi_flags);
		} else {
			ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
						blwi->blwi_lock);
		}
		if (blwi->blwi_mem_pressure)
			memory_pressure_clr();

		if (blwi->blwi_flags & LCF_ASYNC)
			kfree(blwi);
		else
			complete(&blwi->blwi_comp);
	}

	atomic_dec(&blp->blp_busy_threads);
	atomic_dec(&blp->blp_num_threads);
	complete(&blp->blp_comp);
	return 0;
}

static int ldlm_setup(void);
static int ldlm_cleanup(void);

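/* Take a reference on the LDLM subsystem, setting it up on first use. */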
int ldlm_get_ref(void)
{
	int rc = 0;

	mutex_lock(&ldlm_ref_mutex);
	if (++ldlm_refcount == 1) {
		rc = ldlm_setup();
		if (rc)
			ldlm_refcount--;
	}
	mutex_unlock(&ldlm_ref_mutex);

	return rc;
}
EXPORT_SYMBOL(ldlm_get_ref);

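/* Drop a reference on the LDLM subsystem; tear it down when the last
 * reference is released. */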
void ldlm_put_ref(void)
{
	mutex_lock(&ldlm_ref_mutex);
	if (ldlm_refcount == 1) {
		int rc = ldlm_cleanup();

		if (rc)
			CERROR("ldlm_cleanup failed: %d\n", rc);
		else
			ldlm_refcount--;
	} else {
		ldlm_refcount--;
	}
	mutex_unlock(&ldlm_ref_mutex);
}
EXPORT_SYMBOL(ldlm_put_ref);

extern unsigned int ldlm_cancel_unused_locks_before_replay;

static ssize_t cancel_unused_locks_before_replay_show(struct kobject *kobj,
						      struct attribute *attr,
						      char *buf)
{
	return sprintf(buf, "%d\n", ldlm_cancel_unused_locks_before_replay);
}

static ssize_t cancel_unused_locks_before_replay_store(struct kobject *kobj,
						       struct attribute *attr,
						       const char *buffer,
						       size_t count)
{
	int rc;
	unsigned long val;

	rc = kstrtoul(buffer, 10, &val);
	if (rc)
		return rc;

	ldlm_cancel_unused_locks_before_replay = val;

	return count;
}
LUSTRE_RW_ATTR(cancel_unused_locks_before_replay);

/* These are for root of /sys/fs/lustre/ldlm */
static struct attribute *ldlm_attrs[] = {
	&lustre_attr_cancel_unused_locks_before_replay.attr,
	NULL,
};

static struct attribute_group ldlm_attr_group = {
	.attrs = ldlm_attrs,
};

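/* Bring up global LDLM state: the sysfs/debugfs entries, the callback
 * service and the blocking thread pool. */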
static int ldlm_setup(void)
{
	static struct ptlrpc_service_conf	conf;
	struct ldlm_bl_pool			*blp = NULL;
	int rc = 0;
	int i;

	if (ldlm_state != NULL)
		return -EALREADY;

	ldlm_state = kzalloc(sizeof(*ldlm_state), GFP_NOFS);
	if (!ldlm_state)
		return -ENOMEM;

	ldlm_kobj = kobject_create_and_add("ldlm", lustre_kobj);
	if (!ldlm_kobj) {
		rc = -ENOMEM;
		goto out;
	}

	rc = sysfs_create_group(ldlm_kobj, &ldlm_attr_group);
	if (rc)
		goto out;

	ldlm_ns_kset = kset_create_and_add("namespaces", NULL, ldlm_kobj);
	if (!ldlm_ns_kset) {
		rc = -ENOMEM;
		goto out;
	}

	ldlm_svc_kset = kset_create_and_add("services", NULL, ldlm_kobj);
	if (!ldlm_svc_kset) {
		rc = -ENOMEM;
		goto out;
	}

	rc = ldlm_debugfs_setup();
	if (rc != 0)
		goto out;

	memset(&conf, 0, sizeof(conf));
	conf = (typeof(conf)) {
		.psc_name		= "ldlm_cbd",
		.psc_watchdog_factor	= 2,
		.psc_buf		= {
			.bc_nbufs		= LDLM_CLIENT_NBUFS,
			.bc_buf_size		= LDLM_BUFSIZE,
			.bc_req_max_size	= LDLM_MAXREQSIZE,
			.bc_rep_max_size	= LDLM_MAXREPSIZE,
			.bc_req_portal		= LDLM_CB_REQUEST_PORTAL,
			.bc_rep_portal		= LDLM_CB_REPLY_PORTAL,
		},
		.psc_thr		= {
			.tc_thr_name		= "ldlm_cb",
			.tc_thr_factor		= LDLM_THR_FACTOR,
			.tc_nthrs_init		= LDLM_NTHRS_INIT,
			.tc_nthrs_base		= LDLM_NTHRS_BASE,
			.tc_nthrs_max		= LDLM_NTHRS_MAX,
			.tc_nthrs_user		= ldlm_num_threads,
			.tc_cpu_affinity	= 1,
			.tc_ctx_tags		= LCT_MD_THREAD | LCT_DT_THREAD,
		},
		.psc_cpt		= {
			.cc_pattern		= ldlm_cpts,
		},
		.psc_ops		= {
			.so_req_handler		= ldlm_callback_handler,
		},
	};
	ldlm_state->ldlm_cb_service =
			ptlrpc_register_service(&conf, ldlm_svc_kset,
						ldlm_svc_debugfs_dir);
	if (IS_ERR(ldlm_state->ldlm_cb_service)) {
		CERROR("failed to start service\n");
		rc = PTR_ERR(ldlm_state->ldlm_cb_service);
		ldlm_state->ldlm_cb_service = NULL;
		goto out;
	}

	blp = kzalloc(sizeof(*blp), GFP_NOFS);
	if (!blp) {
		rc = -ENOMEM;
		goto out;
	}
	ldlm_state->ldlm_bl_pool = blp;

	spin_lock_init(&blp->blp_lock);
	INIT_LIST_HEAD(&blp->blp_list);
	INIT_LIST_HEAD(&blp->blp_prio_list);
	init_waitqueue_head(&blp->blp_waitq);
	atomic_set(&blp->blp_num_threads, 0);
	atomic_set(&blp->blp_busy_threads, 0);

	if (ldlm_num_threads == 0) {
		blp->blp_min_threads = LDLM_NTHRS_INIT;
		blp->blp_max_threads = LDLM_NTHRS_MAX;
	} else {
		blp->blp_min_threads = blp->blp_max_threads =
			min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT,
							 ldlm_num_threads));
	}

	for (i = 0; i < blp->blp_min_threads; i++) {
		rc = ldlm_bl_thread_start(blp);
		if (rc < 0)
			goto out;
	}

	rc = ldlm_pools_init();
	if (rc) {
		CERROR("Failed to initialize LDLM pools: %d\n", rc);
		goto out;
	}
	return 0;

 out:
	ldlm_cleanup();
	return rc;
}

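/* Tear down the state set up by ldlm_setup(); fails with -EBUSY if any
 * namespaces still exist. */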
static int ldlm_cleanup(void)
{
	if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
	    !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
		CERROR("ldlm still has namespaces; clean these up first.\n");
		ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
		ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
		return -EBUSY;
	}

	ldlm_pools_fini();

	if (ldlm_state->ldlm_bl_pool != NULL) {
		struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;

		while (atomic_read(&blp->blp_num_threads) > 0) {
			struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };

			init_completion(&blp->blp_comp);

			spin_lock(&blp->blp_lock);
			list_add_tail(&blwi.blwi_entry, &blp->blp_list);
			wake_up(&blp->blp_waitq);
			spin_unlock(&blp->blp_lock);

			wait_for_completion(&blp->blp_comp);
		}

		kfree(blp);
	}

	if (ldlm_state->ldlm_cb_service != NULL)
		ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);

	if (ldlm_ns_kset)
		kset_unregister(ldlm_ns_kset);
	if (ldlm_svc_kset)
		kset_unregister(ldlm_svc_kset);
	if (ldlm_kobj)
		kobject_put(ldlm_kobj);

	ldlm_debugfs_cleanup();

	kfree(ldlm_state);
	ldlm_state = NULL;

	return 0;
}

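/* Module init: set up the locks/mutexes and create the slab caches used
 * for resources, locks and interval nodes. */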
int ldlm_init(void)
{
	mutex_init(&ldlm_ref_mutex);
	mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
	mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
	ldlm_resource_slab = kmem_cache_create("ldlm_resources",
					       sizeof(struct ldlm_resource), 0,
					       SLAB_HWCACHE_ALIGN, NULL);
	if (ldlm_resource_slab == NULL)
		return -ENOMEM;

	ldlm_lock_slab = kmem_cache_create("ldlm_locks",
			      sizeof(struct ldlm_lock), 0,
			      SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU, NULL);
	if (ldlm_lock_slab == NULL) {
		kmem_cache_destroy(ldlm_resource_slab);
		return -ENOMEM;
	}

	ldlm_interval_slab = kmem_cache_create("interval_node",
					sizeof(struct ldlm_interval),
					0, SLAB_HWCACHE_ALIGN, NULL);
	if (ldlm_interval_slab == NULL) {
		kmem_cache_destroy(ldlm_resource_slab);
		kmem_cache_destroy(ldlm_lock_slab);
		return -ENOMEM;
	}
#if LUSTRE_TRACKS_LOCK_EXP_REFS
	class_export_dump_hook = ldlm_dump_export_locks;
#endif
	return 0;
}

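/* Module exit: destroy the slab caches created by ldlm_init(). */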
void ldlm_exit(void)
{
	if (ldlm_refcount)
		CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
	kmem_cache_destroy(ldlm_resource_slab);
	/* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so we need to
	 * call synchronize_rcu() to wait for a grace period to elapse, so
	 * that ldlm_lock_free() gets a chance to be called. */
	synchronize_rcu();
	kmem_cache_destroy(ldlm_lock_slab);
	kmem_cache_destroy(ldlm_interval_slab);
}