• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41 
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_class.h"
44 #include "../include/lprocfs_status.h"
45 
46 spinlock_t obd_types_lock;
47 
48 static struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 static struct kmem_cache *import_cachep;
52 
53 static struct list_head      obd_zombie_imports;
54 static struct list_head      obd_zombie_exports;
55 static spinlock_t  obd_zombie_impexp_lock;
56 static void obd_zombie_impexp_notify(void);
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 
60 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
61 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
62 
63 /*
64  * support functions: we could use inter-module communication, but this
65  * is more portable to other OS's
66  */
obd_device_alloc(void)67 static struct obd_device *obd_device_alloc(void)
68 {
69 	struct obd_device *obd;
70 
71 	obd = kmem_cache_alloc(obd_device_cachep, GFP_NOFS | __GFP_ZERO);
72 	if (obd != NULL)
73 		obd->obd_magic = OBD_DEVICE_MAGIC;
74 	return obd;
75 }
76 
obd_device_free(struct obd_device * obd)77 static void obd_device_free(struct obd_device *obd)
78 {
79 	LASSERT(obd != NULL);
80 	LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
81 		 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
82 	if (obd->obd_namespace != NULL) {
83 		CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
84 		       obd, obd->obd_namespace, obd->obd_force);
85 		LBUG();
86 	}
87 	lu_ref_fini(&obd->obd_reference);
88 	kmem_cache_free(obd_device_cachep, obd);
89 }
90 
class_search_type(const char * name)91 static struct obd_type *class_search_type(const char *name)
92 {
93 	struct list_head *tmp;
94 	struct obd_type *type;
95 
96 	spin_lock(&obd_types_lock);
97 	list_for_each(tmp, &obd_types) {
98 		type = list_entry(tmp, struct obd_type, typ_chain);
99 		if (strcmp(type->typ_name, name) == 0) {
100 			spin_unlock(&obd_types_lock);
101 			return type;
102 		}
103 	}
104 	spin_unlock(&obd_types_lock);
105 	return NULL;
106 }
107 
class_get_type(const char * name)108 static struct obd_type *class_get_type(const char *name)
109 {
110 	struct obd_type *type = class_search_type(name);
111 
112 	if (!type) {
113 		const char *modname = name;
114 
115 		if (strcmp(modname, "obdfilter") == 0)
116 			modname = "ofd";
117 
118 		if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
119 			modname = LUSTRE_OSP_NAME;
120 
121 		if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
122 			modname = LUSTRE_MDT_NAME;
123 
124 		if (!request_module("%s", modname)) {
125 			CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126 			type = class_search_type(name);
127 		} else {
128 			LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
129 					   modname);
130 		}
131 	}
132 	if (type) {
133 		spin_lock(&type->obd_type_lock);
134 		type->typ_refcnt++;
135 		try_module_get(type->typ_dt_ops->o_owner);
136 		spin_unlock(&type->obd_type_lock);
137 	}
138 	return type;
139 }
140 
class_put_type(struct obd_type * type)141 void class_put_type(struct obd_type *type)
142 {
143 	LASSERT(type);
144 	spin_lock(&type->obd_type_lock);
145 	type->typ_refcnt--;
146 	module_put(type->typ_dt_ops->o_owner);
147 	spin_unlock(&type->obd_type_lock);
148 }
149 EXPORT_SYMBOL(class_put_type);
150 
151 #define CLASS_MAX_NAME 1024
152 
class_register_type(struct obd_ops * dt_ops,struct md_ops * md_ops,const char * name,struct lu_device_type * ldt)153 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
154 			const char *name,
155 			struct lu_device_type *ldt)
156 {
157 	struct obd_type *type;
158 	int rc = 0;
159 
160 	/* sanity check */
161 	LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
162 
163 	if (class_search_type(name)) {
164 		CDEBUG(D_IOCTL, "Type %s already registered\n", name);
165 		return -EEXIST;
166 	}
167 
168 	rc = -ENOMEM;
169 	type = kzalloc(sizeof(*type), GFP_NOFS);
170 	if (!type)
171 		return rc;
172 
173 	type->typ_dt_ops = kzalloc(sizeof(*type->typ_dt_ops), GFP_NOFS);
174 	type->typ_md_ops = kzalloc(sizeof(*type->typ_md_ops), GFP_NOFS);
175 	type->typ_name = kzalloc(strlen(name) + 1, GFP_NOFS);
176 
177 	if (!type->typ_dt_ops ||
178 	    !type->typ_md_ops ||
179 	    !type->typ_name)
180 		goto failed;
181 
182 	*(type->typ_dt_ops) = *dt_ops;
183 	/* md_ops is optional */
184 	if (md_ops)
185 		*(type->typ_md_ops) = *md_ops;
186 	strcpy(type->typ_name, name);
187 	spin_lock_init(&type->obd_type_lock);
188 
189 	type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
190 						    debugfs_lustre_root,
191 						    NULL, type);
192 	if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
193 		rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
194 					     : -ENOMEM;
195 		type->typ_debugfs_entry = NULL;
196 		goto failed;
197 	}
198 
199 	type->typ_kobj = kobject_create_and_add(type->typ_name, lustre_kobj);
200 	if (!type->typ_kobj) {
201 		rc = -ENOMEM;
202 		goto failed;
203 	}
204 
205 	if (ldt != NULL) {
206 		type->typ_lu = ldt;
207 		rc = lu_device_type_init(ldt);
208 		if (rc != 0)
209 			goto failed;
210 	}
211 
212 	spin_lock(&obd_types_lock);
213 	list_add(&type->typ_chain, &obd_types);
214 	spin_unlock(&obd_types_lock);
215 
216 	return 0;
217 
218  failed:
219 	if (type->typ_kobj)
220 		kobject_put(type->typ_kobj);
221 	kfree(type->typ_name);
222 	kfree(type->typ_md_ops);
223 	kfree(type->typ_dt_ops);
224 	kfree(type);
225 	return rc;
226 }
227 EXPORT_SYMBOL(class_register_type);
228 
class_unregister_type(const char * name)229 int class_unregister_type(const char *name)
230 {
231 	struct obd_type *type = class_search_type(name);
232 
233 	if (!type) {
234 		CERROR("unknown obd type\n");
235 		return -EINVAL;
236 	}
237 
238 	if (type->typ_refcnt) {
239 		CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
240 		/* This is a bad situation, let's make the best of it */
241 		/* Remove ops, but leave the name for debugging */
242 		kfree(type->typ_dt_ops);
243 		kfree(type->typ_md_ops);
244 		return -EBUSY;
245 	}
246 
247 	if (type->typ_kobj)
248 		kobject_put(type->typ_kobj);
249 
250 	if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
251 		ldebugfs_remove(&type->typ_debugfs_entry);
252 
253 	if (type->typ_lu)
254 		lu_device_type_fini(type->typ_lu);
255 
256 	spin_lock(&obd_types_lock);
257 	list_del(&type->typ_chain);
258 	spin_unlock(&obd_types_lock);
259 	kfree(type->typ_name);
260 	kfree(type->typ_dt_ops);
261 	kfree(type->typ_md_ops);
262 	kfree(type);
263 	return 0;
264 } /* class_unregister_type */
265 EXPORT_SYMBOL(class_unregister_type);
266 
267 /**
268  * Create a new obd device.
269  *
270  * Find an empty slot in ::obd_devs[], create a new obd device in it.
271  *
272  * \param[in] type_name obd device type string.
273  * \param[in] name      obd device name.
274  *
275  * \retval NULL if create fails, otherwise return the obd device
276  *	 pointer created.
277  */
class_newdev(const char * type_name,const char * name)278 struct obd_device *class_newdev(const char *type_name, const char *name)
279 {
280 	struct obd_device *result = NULL;
281 	struct obd_device *newdev;
282 	struct obd_type *type = NULL;
283 	int i;
284 	int new_obd_minor = 0;
285 
286 	if (strlen(name) >= MAX_OBD_NAME) {
287 		CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
288 		return ERR_PTR(-EINVAL);
289 	}
290 
291 	type = class_get_type(type_name);
292 	if (!type) {
293 		CERROR("OBD: unknown type: %s\n", type_name);
294 		return ERR_PTR(-ENODEV);
295 	}
296 
297 	newdev = obd_device_alloc();
298 	if (!newdev) {
299 		result = ERR_PTR(-ENOMEM);
300 		goto out_type;
301 	}
302 
303 	LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
304 
305 	write_lock(&obd_dev_lock);
306 	for (i = 0; i < class_devno_max(); i++) {
307 		struct obd_device *obd = class_num2obd(i);
308 
309 		if (obd && (strcmp(name, obd->obd_name) == 0)) {
310 			CERROR("Device %s already exists at %d, won't add\n",
311 			       name, i);
312 			if (result) {
313 				LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
314 					 "%p obd_magic %08x != %08x\n", result,
315 					 result->obd_magic, OBD_DEVICE_MAGIC);
316 				LASSERTF(result->obd_minor == new_obd_minor,
317 					 "%p obd_minor %d != %d\n", result,
318 					 result->obd_minor, new_obd_minor);
319 
320 				obd_devs[result->obd_minor] = NULL;
321 				result->obd_name[0] = '\0';
322 			 }
323 			result = ERR_PTR(-EEXIST);
324 			break;
325 		}
326 		if (!result && !obd) {
327 			result = newdev;
328 			result->obd_minor = i;
329 			new_obd_minor = i;
330 			result->obd_type = type;
331 			strncpy(result->obd_name, name,
332 				sizeof(result->obd_name) - 1);
333 			obd_devs[i] = result;
334 		}
335 	}
336 	write_unlock(&obd_dev_lock);
337 
338 	if (!result && i >= class_devno_max()) {
339 		CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
340 		       class_devno_max());
341 		result = ERR_PTR(-EOVERFLOW);
342 		goto out;
343 	}
344 
345 	if (IS_ERR(result))
346 		goto out;
347 
348 	CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
349 	       result->obd_name, result);
350 
351 	return result;
352 out:
353 	obd_device_free(newdev);
354 out_type:
355 	class_put_type(type);
356 	return result;
357 }
358 
class_release_dev(struct obd_device * obd)359 void class_release_dev(struct obd_device *obd)
360 {
361 	struct obd_type *obd_type = obd->obd_type;
362 
363 	LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
364 		 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
365 	LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
366 		 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
367 	LASSERT(obd_type != NULL);
368 
369 	CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
370 	       obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
371 
372 	write_lock(&obd_dev_lock);
373 	obd_devs[obd->obd_minor] = NULL;
374 	write_unlock(&obd_dev_lock);
375 	obd_device_free(obd);
376 
377 	class_put_type(obd_type);
378 }
379 
class_name2dev(const char * name)380 int class_name2dev(const char *name)
381 {
382 	int i;
383 
384 	if (!name)
385 		return -1;
386 
387 	read_lock(&obd_dev_lock);
388 	for (i = 0; i < class_devno_max(); i++) {
389 		struct obd_device *obd = class_num2obd(i);
390 
391 		if (obd && strcmp(name, obd->obd_name) == 0) {
392 			/* Make sure we finished attaching before we give
393 			   out any references */
394 			LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
395 			if (obd->obd_attached) {
396 				read_unlock(&obd_dev_lock);
397 				return i;
398 			}
399 			break;
400 		}
401 	}
402 	read_unlock(&obd_dev_lock);
403 
404 	return -1;
405 }
406 EXPORT_SYMBOL(class_name2dev);
407 
class_name2obd(const char * name)408 struct obd_device *class_name2obd(const char *name)
409 {
410 	int dev = class_name2dev(name);
411 
412 	if (dev < 0 || dev > class_devno_max())
413 		return NULL;
414 	return class_num2obd(dev);
415 }
416 EXPORT_SYMBOL(class_name2obd);
417 
class_uuid2dev(struct obd_uuid * uuid)418 int class_uuid2dev(struct obd_uuid *uuid)
419 {
420 	int i;
421 
422 	read_lock(&obd_dev_lock);
423 	for (i = 0; i < class_devno_max(); i++) {
424 		struct obd_device *obd = class_num2obd(i);
425 
426 		if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
427 			LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
428 			read_unlock(&obd_dev_lock);
429 			return i;
430 		}
431 	}
432 	read_unlock(&obd_dev_lock);
433 
434 	return -1;
435 }
436 EXPORT_SYMBOL(class_uuid2dev);
437 
438 /**
439  * Get obd device from ::obd_devs[]
440  *
441  * \param num [in] array index
442  *
443  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
444  *	 otherwise return the obd device there.
445  */
class_num2obd(int num)446 struct obd_device *class_num2obd(int num)
447 {
448 	struct obd_device *obd = NULL;
449 
450 	if (num < class_devno_max()) {
451 		obd = obd_devs[num];
452 		if (!obd)
453 			return NULL;
454 
455 		LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
456 			 "%p obd_magic %08x != %08x\n",
457 			 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
458 		LASSERTF(obd->obd_minor == num,
459 			 "%p obd_minor %0d != %0d\n",
460 			 obd, obd->obd_minor, num);
461 	}
462 
463 	return obd;
464 }
465 EXPORT_SYMBOL(class_num2obd);
466 
467 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
468    specified, then only the client with that uuid is returned,
469    otherwise any client connected to the tgt is returned. */
class_find_client_obd(struct obd_uuid * tgt_uuid,const char * typ_name,struct obd_uuid * grp_uuid)470 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
471 					  const char *typ_name,
472 					  struct obd_uuid *grp_uuid)
473 {
474 	int i;
475 
476 	read_lock(&obd_dev_lock);
477 	for (i = 0; i < class_devno_max(); i++) {
478 		struct obd_device *obd = class_num2obd(i);
479 
480 		if (!obd)
481 			continue;
482 		if ((strncmp(obd->obd_type->typ_name, typ_name,
483 			     strlen(typ_name)) == 0)) {
484 			if (obd_uuid_equals(tgt_uuid,
485 					    &obd->u.cli.cl_target_uuid) &&
486 			    ((grp_uuid) ? obd_uuid_equals(grp_uuid,
487 							 &obd->obd_uuid) : 1)) {
488 				read_unlock(&obd_dev_lock);
489 				return obd;
490 			}
491 		}
492 	}
493 	read_unlock(&obd_dev_lock);
494 
495 	return NULL;
496 }
497 EXPORT_SYMBOL(class_find_client_obd);
498 
499 /* Iterate the obd_device list looking devices have grp_uuid. Start
500    searching at *next, and if a device is found, the next index to look
501    at is saved in *next. If next is NULL, then the first matching device
502    will always be returned. */
class_devices_in_group(struct obd_uuid * grp_uuid,int * next)503 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
504 {
505 	int i;
506 
507 	if (!next)
508 		i = 0;
509 	else if (*next >= 0 && *next < class_devno_max())
510 		i = *next;
511 	else
512 		return NULL;
513 
514 	read_lock(&obd_dev_lock);
515 	for (; i < class_devno_max(); i++) {
516 		struct obd_device *obd = class_num2obd(i);
517 
518 		if (!obd)
519 			continue;
520 		if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
521 			if (next)
522 				*next = i+1;
523 			read_unlock(&obd_dev_lock);
524 			return obd;
525 		}
526 	}
527 	read_unlock(&obd_dev_lock);
528 
529 	return NULL;
530 }
531 EXPORT_SYMBOL(class_devices_in_group);
532 
533 /**
534  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
535  * adjust sptlrpc settings accordingly.
536  */
class_notify_sptlrpc_conf(const char * fsname,int namelen)537 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
538 {
539 	struct obd_device  *obd;
540 	const char	 *type;
541 	int		 i, rc = 0, rc2;
542 
543 	LASSERT(namelen > 0);
544 
545 	read_lock(&obd_dev_lock);
546 	for (i = 0; i < class_devno_max(); i++) {
547 		obd = class_num2obd(i);
548 
549 		if (!obd || obd->obd_set_up == 0 || obd->obd_stopping)
550 			continue;
551 
552 		/* only notify mdc, osc, mdt, ost */
553 		type = obd->obd_type->typ_name;
554 		if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
555 		    strcmp(type, LUSTRE_OSC_NAME) != 0 &&
556 		    strcmp(type, LUSTRE_MDT_NAME) != 0 &&
557 		    strcmp(type, LUSTRE_OST_NAME) != 0)
558 			continue;
559 
560 		if (strncmp(obd->obd_name, fsname, namelen))
561 			continue;
562 
563 		class_incref(obd, __func__, obd);
564 		read_unlock(&obd_dev_lock);
565 		rc2 = obd_set_info_async(NULL, obd->obd_self_export,
566 					 sizeof(KEY_SPTLRPC_CONF),
567 					 KEY_SPTLRPC_CONF, 0, NULL, NULL);
568 		rc = rc ? rc : rc2;
569 		class_decref(obd, __func__, obd);
570 		read_lock(&obd_dev_lock);
571 	}
572 	read_unlock(&obd_dev_lock);
573 	return rc;
574 }
575 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
576 
obd_cleanup_caches(void)577 void obd_cleanup_caches(void)
578 {
579 	kmem_cache_destroy(obd_device_cachep);
580 	obd_device_cachep = NULL;
581 	kmem_cache_destroy(obdo_cachep);
582 	obdo_cachep = NULL;
583 	kmem_cache_destroy(import_cachep);
584 	import_cachep = NULL;
585 }
586 
obd_init_caches(void)587 int obd_init_caches(void)
588 {
589 	LASSERT(!obd_device_cachep);
590 	obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
591 						 sizeof(struct obd_device),
592 						 0, 0, NULL);
593 	if (!obd_device_cachep)
594 		goto out;
595 
596 	LASSERT(!obdo_cachep);
597 	obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
598 					   0, 0, NULL);
599 	if (!obdo_cachep)
600 		goto out;
601 
602 	LASSERT(!import_cachep);
603 	import_cachep = kmem_cache_create("ll_import_cache",
604 					     sizeof(struct obd_import),
605 					     0, 0, NULL);
606 	if (!import_cachep)
607 		goto out;
608 
609 	return 0;
610  out:
611 	obd_cleanup_caches();
612 	return -ENOMEM;
613 
614 }
615 
616 /* map connection to client */
class_conn2export(struct lustre_handle * conn)617 struct obd_export *class_conn2export(struct lustre_handle *conn)
618 {
619 	struct obd_export *export;
620 
621 	if (!conn) {
622 		CDEBUG(D_CACHE, "looking for null handle\n");
623 		return NULL;
624 	}
625 
626 	if (conn->cookie == -1) {  /* this means assign a new connection */
627 		CDEBUG(D_CACHE, "want a new connection\n");
628 		return NULL;
629 	}
630 
631 	CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
632 	export = class_handle2object(conn->cookie);
633 	return export;
634 }
635 EXPORT_SYMBOL(class_conn2export);
636 
class_exp2obd(struct obd_export * exp)637 struct obd_device *class_exp2obd(struct obd_export *exp)
638 {
639 	if (exp)
640 		return exp->exp_obd;
641 	return NULL;
642 }
643 EXPORT_SYMBOL(class_exp2obd);
644 
class_exp2cliimp(struct obd_export * exp)645 struct obd_import *class_exp2cliimp(struct obd_export *exp)
646 {
647 	struct obd_device *obd = exp->exp_obd;
648 
649 	if (!obd)
650 		return NULL;
651 	return obd->u.cli.cl_import;
652 }
653 EXPORT_SYMBOL(class_exp2cliimp);
654 
655 /* Export management functions */
class_export_destroy(struct obd_export * exp)656 static void class_export_destroy(struct obd_export *exp)
657 {
658 	struct obd_device *obd = exp->exp_obd;
659 
660 	LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
661 	LASSERT(obd != NULL);
662 
663 	CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
664 	       exp->exp_client_uuid.uuid, obd->obd_name);
665 
666 	/* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
667 	if (exp->exp_connection)
668 		ptlrpc_put_connection_superhack(exp->exp_connection);
669 
670 	LASSERT(list_empty(&exp->exp_outstanding_replies));
671 	LASSERT(list_empty(&exp->exp_uncommitted_replies));
672 	LASSERT(list_empty(&exp->exp_req_replay_queue));
673 	LASSERT(list_empty(&exp->exp_hp_rpcs));
674 	obd_destroy_export(exp);
675 	class_decref(obd, "export", exp);
676 
677 	OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
678 }
679 
export_handle_addref(void * export)680 static void export_handle_addref(void *export)
681 {
682 	class_export_get(export);
683 }
684 
685 static struct portals_handle_ops export_handle_ops = {
686 	.hop_addref = export_handle_addref,
687 	.hop_free   = NULL,
688 };
689 
class_export_get(struct obd_export * exp)690 struct obd_export *class_export_get(struct obd_export *exp)
691 {
692 	atomic_inc(&exp->exp_refcount);
693 	CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
694 	       atomic_read(&exp->exp_refcount));
695 	return exp;
696 }
697 EXPORT_SYMBOL(class_export_get);
698 
class_export_put(struct obd_export * exp)699 void class_export_put(struct obd_export *exp)
700 {
701 	LASSERT(exp != NULL);
702 	LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
703 	CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
704 	       atomic_read(&exp->exp_refcount) - 1);
705 
706 	if (atomic_dec_and_test(&exp->exp_refcount)) {
707 		LASSERT(!list_empty(&exp->exp_obd_chain));
708 		CDEBUG(D_IOCTL, "final put %p/%s\n",
709 		       exp, exp->exp_client_uuid.uuid);
710 
711 		/* release nid stat refererence */
712 		lprocfs_exp_cleanup(exp);
713 
714 		obd_zombie_export_add(exp);
715 	}
716 }
717 EXPORT_SYMBOL(class_export_put);
718 
719 /* Creates a new export, adds it to the hash table, and returns a
720  * pointer to it. The refcount is 2: one for the hash reference, and
721  * one for the pointer returned by this function. */
class_new_export(struct obd_device * obd,struct obd_uuid * cluuid)722 struct obd_export *class_new_export(struct obd_device *obd,
723 				    struct obd_uuid *cluuid)
724 {
725 	struct obd_export *export;
726 	struct cfs_hash *hash = NULL;
727 	int rc = 0;
728 
729 	export = kzalloc(sizeof(*export), GFP_NOFS);
730 	if (!export)
731 		return ERR_PTR(-ENOMEM);
732 
733 	export->exp_conn_cnt = 0;
734 	export->exp_lock_hash = NULL;
735 	export->exp_flock_hash = NULL;
736 	atomic_set(&export->exp_refcount, 2);
737 	atomic_set(&export->exp_rpc_count, 0);
738 	atomic_set(&export->exp_cb_count, 0);
739 	atomic_set(&export->exp_locks_count, 0);
740 #if LUSTRE_TRACKS_LOCK_EXP_REFS
741 	INIT_LIST_HEAD(&export->exp_locks_list);
742 	spin_lock_init(&export->exp_locks_list_guard);
743 #endif
744 	atomic_set(&export->exp_replay_count, 0);
745 	export->exp_obd = obd;
746 	INIT_LIST_HEAD(&export->exp_outstanding_replies);
747 	spin_lock_init(&export->exp_uncommitted_replies_lock);
748 	INIT_LIST_HEAD(&export->exp_uncommitted_replies);
749 	INIT_LIST_HEAD(&export->exp_req_replay_queue);
750 	INIT_LIST_HEAD(&export->exp_handle.h_link);
751 	INIT_LIST_HEAD(&export->exp_hp_rpcs);
752 	class_handle_hash(&export->exp_handle, &export_handle_ops);
753 	spin_lock_init(&export->exp_lock);
754 	spin_lock_init(&export->exp_rpc_lock);
755 	INIT_HLIST_NODE(&export->exp_uuid_hash);
756 	spin_lock_init(&export->exp_bl_list_lock);
757 	INIT_LIST_HEAD(&export->exp_bl_list);
758 
759 	export->exp_sp_peer = LUSTRE_SP_ANY;
760 	export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
761 	export->exp_client_uuid = *cluuid;
762 	obd_init_export(export);
763 
764 	spin_lock(&obd->obd_dev_lock);
765 	/* shouldn't happen, but might race */
766 	if (obd->obd_stopping) {
767 		rc = -ENODEV;
768 		goto exit_unlock;
769 	}
770 
771 	hash = cfs_hash_getref(obd->obd_uuid_hash);
772 	if (!hash) {
773 		rc = -ENODEV;
774 		goto exit_unlock;
775 	}
776 	spin_unlock(&obd->obd_dev_lock);
777 
778 	if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
779 		rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
780 		if (rc != 0) {
781 			LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
782 				      obd->obd_name, cluuid->uuid, rc);
783 			rc = -EALREADY;
784 			goto exit_err;
785 		}
786 	}
787 
788 	spin_lock(&obd->obd_dev_lock);
789 	if (obd->obd_stopping) {
790 		cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
791 		rc = -ENODEV;
792 		goto exit_unlock;
793 	}
794 
795 	class_incref(obd, "export", export);
796 	list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
797 	export->exp_obd->obd_num_exports++;
798 	spin_unlock(&obd->obd_dev_lock);
799 	cfs_hash_putref(hash);
800 	return export;
801 
802 exit_unlock:
803 	spin_unlock(&obd->obd_dev_lock);
804 exit_err:
805 	if (hash)
806 		cfs_hash_putref(hash);
807 	class_handle_unhash(&export->exp_handle);
808 	LASSERT(hlist_unhashed(&export->exp_uuid_hash));
809 	obd_destroy_export(export);
810 	kfree(export);
811 	return ERR_PTR(rc);
812 }
813 EXPORT_SYMBOL(class_new_export);
814 
class_unlink_export(struct obd_export * exp)815 void class_unlink_export(struct obd_export *exp)
816 {
817 	class_handle_unhash(&exp->exp_handle);
818 
819 	spin_lock(&exp->exp_obd->obd_dev_lock);
820 	/* delete an uuid-export hashitem from hashtables */
821 	if (!hlist_unhashed(&exp->exp_uuid_hash))
822 		cfs_hash_del(exp->exp_obd->obd_uuid_hash,
823 			     &exp->exp_client_uuid,
824 			     &exp->exp_uuid_hash);
825 
826 	list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
827 	exp->exp_obd->obd_num_exports--;
828 	spin_unlock(&exp->exp_obd->obd_dev_lock);
829 	class_export_put(exp);
830 }
831 EXPORT_SYMBOL(class_unlink_export);
832 
833 /* Import management functions */
class_import_destroy(struct obd_import * imp)834 static void class_import_destroy(struct obd_import *imp)
835 {
836 	CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
837 		imp->imp_obd->obd_name);
838 
839 	LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
840 
841 	ptlrpc_put_connection_superhack(imp->imp_connection);
842 
843 	while (!list_empty(&imp->imp_conn_list)) {
844 		struct obd_import_conn *imp_conn;
845 
846 		imp_conn = list_entry(imp->imp_conn_list.next,
847 					  struct obd_import_conn, oic_item);
848 		list_del_init(&imp_conn->oic_item);
849 		ptlrpc_put_connection_superhack(imp_conn->oic_conn);
850 		kfree(imp_conn);
851 	}
852 
853 	LASSERT(!imp->imp_sec);
854 	class_decref(imp->imp_obd, "import", imp);
855 	OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
856 }
857 
import_handle_addref(void * import)858 static void import_handle_addref(void *import)
859 {
860 	class_import_get(import);
861 }
862 
863 static struct portals_handle_ops import_handle_ops = {
864 	.hop_addref = import_handle_addref,
865 	.hop_free   = NULL,
866 };
867 
class_import_get(struct obd_import * import)868 struct obd_import *class_import_get(struct obd_import *import)
869 {
870 	atomic_inc(&import->imp_refcount);
871 	CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
872 	       atomic_read(&import->imp_refcount),
873 	       import->imp_obd->obd_name);
874 	return import;
875 }
876 EXPORT_SYMBOL(class_import_get);
877 
class_import_put(struct obd_import * imp)878 void class_import_put(struct obd_import *imp)
879 {
880 	LASSERT(list_empty(&imp->imp_zombie_chain));
881 	LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
882 
883 	CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
884 	       atomic_read(&imp->imp_refcount) - 1,
885 	       imp->imp_obd->obd_name);
886 
887 	if (atomic_dec_and_test(&imp->imp_refcount)) {
888 		CDEBUG(D_INFO, "final put import %p\n", imp);
889 		obd_zombie_import_add(imp);
890 	}
891 
892 	/* catch possible import put race */
893 	LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
894 }
895 EXPORT_SYMBOL(class_import_put);
896 
init_imp_at(struct imp_at * at)897 static void init_imp_at(struct imp_at *at)
898 {
899 	int i;
900 
901 	at_init(&at->iat_net_latency, 0, 0);
902 	for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
903 		/* max service estimates are tracked on the server side, so
904 		   don't use the AT history here, just use the last reported
905 		   val. (But keep hist for proc histogram, worst_ever) */
906 		at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
907 			AT_FLG_NOHIST);
908 	}
909 }
910 
class_new_import(struct obd_device * obd)911 struct obd_import *class_new_import(struct obd_device *obd)
912 {
913 	struct obd_import *imp;
914 
915 	imp = kzalloc(sizeof(*imp), GFP_NOFS);
916 	if (!imp)
917 		return NULL;
918 
919 	INIT_LIST_HEAD(&imp->imp_pinger_chain);
920 	INIT_LIST_HEAD(&imp->imp_zombie_chain);
921 	INIT_LIST_HEAD(&imp->imp_replay_list);
922 	INIT_LIST_HEAD(&imp->imp_sending_list);
923 	INIT_LIST_HEAD(&imp->imp_delayed_list);
924 	INIT_LIST_HEAD(&imp->imp_committed_list);
925 	imp->imp_replay_cursor = &imp->imp_committed_list;
926 	spin_lock_init(&imp->imp_lock);
927 	imp->imp_last_success_conn = 0;
928 	imp->imp_state = LUSTRE_IMP_NEW;
929 	imp->imp_obd = class_incref(obd, "import", imp);
930 	mutex_init(&imp->imp_sec_mutex);
931 	init_waitqueue_head(&imp->imp_recovery_waitq);
932 
933 	atomic_set(&imp->imp_refcount, 2);
934 	atomic_set(&imp->imp_unregistering, 0);
935 	atomic_set(&imp->imp_inflight, 0);
936 	atomic_set(&imp->imp_replay_inflight, 0);
937 	atomic_set(&imp->imp_inval_count, 0);
938 	INIT_LIST_HEAD(&imp->imp_conn_list);
939 	INIT_LIST_HEAD(&imp->imp_handle.h_link);
940 	class_handle_hash(&imp->imp_handle, &import_handle_ops);
941 	init_imp_at(&imp->imp_at);
942 
943 	/* the default magic is V2, will be used in connect RPC, and
944 	 * then adjusted according to the flags in request/reply. */
945 	imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
946 
947 	return imp;
948 }
949 EXPORT_SYMBOL(class_new_import);
950 
class_destroy_import(struct obd_import * import)951 void class_destroy_import(struct obd_import *import)
952 {
953 	LASSERT(import != NULL);
954 	LASSERT(import != LP_POISON);
955 
956 	class_handle_unhash(&import->imp_handle);
957 
958 	spin_lock(&import->imp_lock);
959 	import->imp_generation++;
960 	spin_unlock(&import->imp_lock);
961 	class_import_put(import);
962 }
963 EXPORT_SYMBOL(class_destroy_import);
964 
965 #if LUSTRE_TRACKS_LOCK_EXP_REFS
966 
__class_export_add_lock_ref(struct obd_export * exp,struct ldlm_lock * lock)967 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
968 {
969 	spin_lock(&exp->exp_locks_list_guard);
970 
971 	LASSERT(lock->l_exp_refs_nr >= 0);
972 
973 	if (lock->l_exp_refs_target != NULL &&
974 	    lock->l_exp_refs_target != exp) {
975 		LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
976 			      exp, lock, lock->l_exp_refs_target);
977 	}
978 	if ((lock->l_exp_refs_nr++) == 0) {
979 		list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
980 		lock->l_exp_refs_target = exp;
981 	}
982 	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
983 	       lock, exp, lock->l_exp_refs_nr);
984 	spin_unlock(&exp->exp_locks_list_guard);
985 }
986 EXPORT_SYMBOL(__class_export_add_lock_ref);
987 
__class_export_del_lock_ref(struct obd_export * exp,struct ldlm_lock * lock)988 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
989 {
990 	spin_lock(&exp->exp_locks_list_guard);
991 	LASSERT(lock->l_exp_refs_nr > 0);
992 	if (lock->l_exp_refs_target != exp) {
993 		LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
994 			      lock, lock->l_exp_refs_target, exp);
995 	}
996 	if (-- lock->l_exp_refs_nr == 0) {
997 		list_del_init(&lock->l_exp_refs_link);
998 		lock->l_exp_refs_target = NULL;
999 	}
1000 	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1001 	       lock, exp, lock->l_exp_refs_nr);
1002 	spin_unlock(&exp->exp_locks_list_guard);
1003 }
1004 EXPORT_SYMBOL(__class_export_del_lock_ref);
1005 #endif
1006 
1007 /* A connection defines an export context in which preallocation can
1008    be managed. This releases the export pointer reference, and returns
1009    the export handle, so the export refcount is 1 when this function
1010    returns. */
class_connect(struct lustre_handle * conn,struct obd_device * obd,struct obd_uuid * cluuid)1011 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1012 		  struct obd_uuid *cluuid)
1013 {
1014 	struct obd_export *export;
1015 
1016 	LASSERT(conn != NULL);
1017 	LASSERT(obd != NULL);
1018 	LASSERT(cluuid != NULL);
1019 
1020 	export = class_new_export(obd, cluuid);
1021 	if (IS_ERR(export))
1022 		return PTR_ERR(export);
1023 
1024 	conn->cookie = export->exp_handle.h_cookie;
1025 	class_export_put(export);
1026 
1027 	CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1028 	       cluuid->uuid, conn->cookie);
1029 	return 0;
1030 }
1031 EXPORT_SYMBOL(class_connect);
1032 
1033 /* This function removes 1-3 references from the export:
1034  * 1 - for export pointer passed
1035  * and if disconnect really need
1036  * 2 - removing from hash
1037  * 3 - in client_unlink_export
1038  * The export pointer passed to this function can destroyed */
class_disconnect(struct obd_export * export)1039 int class_disconnect(struct obd_export *export)
1040 {
1041 	int already_disconnected;
1042 
1043 	if (!export) {
1044 		CWARN("attempting to free NULL export %p\n", export);
1045 		return -EINVAL;
1046 	}
1047 
1048 	spin_lock(&export->exp_lock);
1049 	already_disconnected = export->exp_disconnected;
1050 	export->exp_disconnected = 1;
1051 	spin_unlock(&export->exp_lock);
1052 
1053 	/* class_cleanup(), abort_recovery(), and class_fail_export()
1054 	 * all end up in here, and if any of them race we shouldn't
1055 	 * call extra class_export_puts(). */
1056 	if (already_disconnected)
1057 		goto no_disconn;
1058 
1059 	CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1060 	       export->exp_handle.h_cookie);
1061 
1062 	class_unlink_export(export);
1063 no_disconn:
1064 	class_export_put(export);
1065 	return 0;
1066 }
1067 EXPORT_SYMBOL(class_disconnect);
1068 
class_fail_export(struct obd_export * exp)1069 void class_fail_export(struct obd_export *exp)
1070 {
1071 	int rc, already_failed;
1072 
1073 	spin_lock(&exp->exp_lock);
1074 	already_failed = exp->exp_failed;
1075 	exp->exp_failed = 1;
1076 	spin_unlock(&exp->exp_lock);
1077 
1078 	if (already_failed) {
1079 		CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1080 		       exp, exp->exp_client_uuid.uuid);
1081 		return;
1082 	}
1083 
1084 	CDEBUG(D_HA, "disconnecting export %p/%s\n",
1085 	       exp, exp->exp_client_uuid.uuid);
1086 
1087 	if (obd_dump_on_timeout)
1088 		libcfs_debug_dumplog();
1089 
1090 	/* need for safe call CDEBUG after obd_disconnect */
1091 	class_export_get(exp);
1092 
1093 	/* Most callers into obd_disconnect are removing their own reference
1094 	 * (request, for example) in addition to the one from the hash table.
1095 	 * We don't have such a reference here, so make one. */
1096 	class_export_get(exp);
1097 	rc = obd_disconnect(exp);
1098 	if (rc)
1099 		CERROR("disconnecting export %p failed: %d\n", exp, rc);
1100 	else
1101 		CDEBUG(D_HA, "disconnected export %p/%s\n",
1102 		       exp, exp->exp_client_uuid.uuid);
1103 	class_export_put(exp);
1104 }
1105 EXPORT_SYMBOL(class_fail_export);
1106 
1107 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1108 void (*class_export_dump_hook)(struct obd_export *) = NULL;
1109 EXPORT_SYMBOL(class_export_dump_hook);
1110 #endif
1111 
1112 /* Total amount of zombies to be destroyed */
1113 static int zombies_count;
1114 
1115 /**
1116  * kill zombie imports and exports
1117  */
obd_zombie_impexp_cull(void)1118 static void obd_zombie_impexp_cull(void)
1119 {
1120 	struct obd_import *import;
1121 	struct obd_export *export;
1122 
1123 	do {
1124 		spin_lock(&obd_zombie_impexp_lock);
1125 
1126 		import = NULL;
1127 		if (!list_empty(&obd_zombie_imports)) {
1128 			import = list_entry(obd_zombie_imports.next,
1129 						struct obd_import,
1130 						imp_zombie_chain);
1131 			list_del_init(&import->imp_zombie_chain);
1132 		}
1133 
1134 		export = NULL;
1135 		if (!list_empty(&obd_zombie_exports)) {
1136 			export = list_entry(obd_zombie_exports.next,
1137 						struct obd_export,
1138 						exp_obd_chain);
1139 			list_del_init(&export->exp_obd_chain);
1140 		}
1141 
1142 		spin_unlock(&obd_zombie_impexp_lock);
1143 
1144 		if (import != NULL) {
1145 			class_import_destroy(import);
1146 			spin_lock(&obd_zombie_impexp_lock);
1147 			zombies_count--;
1148 			spin_unlock(&obd_zombie_impexp_lock);
1149 		}
1150 
1151 		if (export != NULL) {
1152 			class_export_destroy(export);
1153 			spin_lock(&obd_zombie_impexp_lock);
1154 			zombies_count--;
1155 			spin_unlock(&obd_zombie_impexp_lock);
1156 		}
1157 
1158 		cond_resched();
1159 	} while (import != NULL || export != NULL);
1160 }
1161 
1162 static struct completion	obd_zombie_start;
1163 static struct completion	obd_zombie_stop;
1164 static unsigned long		obd_zombie_flags;
1165 static wait_queue_head_t		obd_zombie_waitq;
1166 static pid_t			obd_zombie_pid;
1167 
1168 enum {
1169 	OBD_ZOMBIE_STOP		= 0x0001,
1170 };
1171 
1172 /**
1173  * check for work for kill zombie import/export thread.
1174  */
obd_zombie_impexp_check(void * arg)1175 static int obd_zombie_impexp_check(void *arg)
1176 {
1177 	int rc;
1178 
1179 	spin_lock(&obd_zombie_impexp_lock);
1180 	rc = (zombies_count == 0) &&
1181 	     !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1182 	spin_unlock(&obd_zombie_impexp_lock);
1183 
1184 	return rc;
1185 }
1186 
1187 /**
1188  * Add export to the obd_zombie thread and notify it.
1189  */
obd_zombie_export_add(struct obd_export * exp)1190 static void obd_zombie_export_add(struct obd_export *exp)
1191 {
1192 	spin_lock(&exp->exp_obd->obd_dev_lock);
1193 	LASSERT(!list_empty(&exp->exp_obd_chain));
1194 	list_del_init(&exp->exp_obd_chain);
1195 	spin_unlock(&exp->exp_obd->obd_dev_lock);
1196 	spin_lock(&obd_zombie_impexp_lock);
1197 	zombies_count++;
1198 	list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1199 	spin_unlock(&obd_zombie_impexp_lock);
1200 
1201 	obd_zombie_impexp_notify();
1202 }
1203 
1204 /**
1205  * Add import to the obd_zombie thread and notify it.
1206  */
obd_zombie_import_add(struct obd_import * imp)1207 static void obd_zombie_import_add(struct obd_import *imp)
1208 {
1209 	LASSERT(!imp->imp_sec);
1210 	spin_lock(&obd_zombie_impexp_lock);
1211 	LASSERT(list_empty(&imp->imp_zombie_chain));
1212 	zombies_count++;
1213 	list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1214 	spin_unlock(&obd_zombie_impexp_lock);
1215 
1216 	obd_zombie_impexp_notify();
1217 }
1218 
1219 /**
1220  * notify import/export destroy thread about new zombie.
1221  */
obd_zombie_impexp_notify(void)1222 static void obd_zombie_impexp_notify(void)
1223 {
1224 	/*
1225 	 * Make sure obd_zombie_impexp_thread get this notification.
1226 	 * It is possible this signal only get by obd_zombie_barrier, and
1227 	 * barrier gulps this notification and sleeps away and hangs ensues
1228 	 */
1229 	wake_up_all(&obd_zombie_waitq);
1230 }
1231 
1232 /**
1233  * check whether obd_zombie is idle
1234  */
obd_zombie_is_idle(void)1235 static int obd_zombie_is_idle(void)
1236 {
1237 	int rc;
1238 
1239 	LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1240 	spin_lock(&obd_zombie_impexp_lock);
1241 	rc = (zombies_count == 0);
1242 	spin_unlock(&obd_zombie_impexp_lock);
1243 	return rc;
1244 }
1245 
1246 /**
1247  * wait when obd_zombie import/export queues become empty
1248  */
obd_zombie_barrier(void)1249 void obd_zombie_barrier(void)
1250 {
1251 	struct l_wait_info lwi = { 0 };
1252 
1253 	if (obd_zombie_pid == current_pid())
1254 		/* don't wait for myself */
1255 		return;
1256 	l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1257 }
1258 EXPORT_SYMBOL(obd_zombie_barrier);
1259 
1260 /**
1261  * destroy zombie export/import thread.
1262  */
obd_zombie_impexp_thread(void * unused)1263 static int obd_zombie_impexp_thread(void *unused)
1264 {
1265 	unshare_fs_struct();
1266 	complete(&obd_zombie_start);
1267 
1268 	obd_zombie_pid = current_pid();
1269 
1270 	while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1271 		struct l_wait_info lwi = { 0 };
1272 
1273 		l_wait_event(obd_zombie_waitq,
1274 			     !obd_zombie_impexp_check(NULL), &lwi);
1275 		obd_zombie_impexp_cull();
1276 
1277 		/*
1278 		 * Notify obd_zombie_barrier callers that queues
1279 		 * may be empty.
1280 		 */
1281 		wake_up(&obd_zombie_waitq);
1282 	}
1283 
1284 	complete(&obd_zombie_stop);
1285 
1286 	return 0;
1287 }
1288 
1289 /**
1290  * start destroy zombie import/export thread
1291  */
obd_zombie_impexp_init(void)1292 int obd_zombie_impexp_init(void)
1293 {
1294 	struct task_struct *task;
1295 
1296 	INIT_LIST_HEAD(&obd_zombie_imports);
1297 	INIT_LIST_HEAD(&obd_zombie_exports);
1298 	spin_lock_init(&obd_zombie_impexp_lock);
1299 	init_completion(&obd_zombie_start);
1300 	init_completion(&obd_zombie_stop);
1301 	init_waitqueue_head(&obd_zombie_waitq);
1302 	obd_zombie_pid = 0;
1303 
1304 	task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1305 	if (IS_ERR(task))
1306 		return PTR_ERR(task);
1307 
1308 	wait_for_completion(&obd_zombie_start);
1309 	return 0;
1310 }
1311 
1312 /**
1313  * stop destroy zombie import/export thread
1314  */
obd_zombie_impexp_stop(void)1315 void obd_zombie_impexp_stop(void)
1316 {
1317 	set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1318 	obd_zombie_impexp_notify();
1319 	wait_for_completion(&obd_zombie_stop);
1320 }
1321