1 /*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26 /*
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32 /*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lustre/obdclass/genops.c
37 *
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
40 */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_class.h"
44 #include "../include/lprocfs_status.h"
45
46 spinlock_t obd_types_lock;
47
48 static struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 static struct kmem_cache *import_cachep;
52
53 static struct list_head obd_zombie_imports;
54 static struct list_head obd_zombie_exports;
55 static spinlock_t obd_zombie_impexp_lock;
56 static void obd_zombie_impexp_notify(void);
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59
60 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
61 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
62
63 /*
64 * support functions: we could use inter-module communication, but this
65 * is more portable to other OS's
66 */
obd_device_alloc(void)67 static struct obd_device *obd_device_alloc(void)
68 {
69 struct obd_device *obd;
70
71 obd = kmem_cache_alloc(obd_device_cachep, GFP_NOFS | __GFP_ZERO);
72 if (obd != NULL)
73 obd->obd_magic = OBD_DEVICE_MAGIC;
74 return obd;
75 }
76
obd_device_free(struct obd_device * obd)77 static void obd_device_free(struct obd_device *obd)
78 {
79 LASSERT(obd != NULL);
80 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
81 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
82 if (obd->obd_namespace != NULL) {
83 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
84 obd, obd->obd_namespace, obd->obd_force);
85 LBUG();
86 }
87 lu_ref_fini(&obd->obd_reference);
88 kmem_cache_free(obd_device_cachep, obd);
89 }
90
class_search_type(const char * name)91 static struct obd_type *class_search_type(const char *name)
92 {
93 struct list_head *tmp;
94 struct obd_type *type;
95
96 spin_lock(&obd_types_lock);
97 list_for_each(tmp, &obd_types) {
98 type = list_entry(tmp, struct obd_type, typ_chain);
99 if (strcmp(type->typ_name, name) == 0) {
100 spin_unlock(&obd_types_lock);
101 return type;
102 }
103 }
104 spin_unlock(&obd_types_lock);
105 return NULL;
106 }
107
class_get_type(const char * name)108 static struct obd_type *class_get_type(const char *name)
109 {
110 struct obd_type *type = class_search_type(name);
111
112 if (!type) {
113 const char *modname = name;
114
115 if (strcmp(modname, "obdfilter") == 0)
116 modname = "ofd";
117
118 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
119 modname = LUSTRE_OSP_NAME;
120
121 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
122 modname = LUSTRE_MDT_NAME;
123
124 if (!request_module("%s", modname)) {
125 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126 type = class_search_type(name);
127 } else {
128 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
129 modname);
130 }
131 }
132 if (type) {
133 spin_lock(&type->obd_type_lock);
134 type->typ_refcnt++;
135 try_module_get(type->typ_dt_ops->o_owner);
136 spin_unlock(&type->obd_type_lock);
137 }
138 return type;
139 }
140
class_put_type(struct obd_type * type)141 void class_put_type(struct obd_type *type)
142 {
143 LASSERT(type);
144 spin_lock(&type->obd_type_lock);
145 type->typ_refcnt--;
146 module_put(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
148 }
149 EXPORT_SYMBOL(class_put_type);
150
151 #define CLASS_MAX_NAME 1024
152
class_register_type(struct obd_ops * dt_ops,struct md_ops * md_ops,const char * name,struct lu_device_type * ldt)153 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
154 const char *name,
155 struct lu_device_type *ldt)
156 {
157 struct obd_type *type;
158 int rc = 0;
159
160 /* sanity check */
161 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
162
163 if (class_search_type(name)) {
164 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
165 return -EEXIST;
166 }
167
168 rc = -ENOMEM;
169 type = kzalloc(sizeof(*type), GFP_NOFS);
170 if (!type)
171 return rc;
172
173 type->typ_dt_ops = kzalloc(sizeof(*type->typ_dt_ops), GFP_NOFS);
174 type->typ_md_ops = kzalloc(sizeof(*type->typ_md_ops), GFP_NOFS);
175 type->typ_name = kzalloc(strlen(name) + 1, GFP_NOFS);
176
177 if (!type->typ_dt_ops ||
178 !type->typ_md_ops ||
179 !type->typ_name)
180 goto failed;
181
182 *(type->typ_dt_ops) = *dt_ops;
183 /* md_ops is optional */
184 if (md_ops)
185 *(type->typ_md_ops) = *md_ops;
186 strcpy(type->typ_name, name);
187 spin_lock_init(&type->obd_type_lock);
188
189 type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
190 debugfs_lustre_root,
191 NULL, type);
192 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
193 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
194 : -ENOMEM;
195 type->typ_debugfs_entry = NULL;
196 goto failed;
197 }
198
199 type->typ_kobj = kobject_create_and_add(type->typ_name, lustre_kobj);
200 if (!type->typ_kobj) {
201 rc = -ENOMEM;
202 goto failed;
203 }
204
205 if (ldt != NULL) {
206 type->typ_lu = ldt;
207 rc = lu_device_type_init(ldt);
208 if (rc != 0)
209 goto failed;
210 }
211
212 spin_lock(&obd_types_lock);
213 list_add(&type->typ_chain, &obd_types);
214 spin_unlock(&obd_types_lock);
215
216 return 0;
217
218 failed:
219 if (type->typ_kobj)
220 kobject_put(type->typ_kobj);
221 kfree(type->typ_name);
222 kfree(type->typ_md_ops);
223 kfree(type->typ_dt_ops);
224 kfree(type);
225 return rc;
226 }
227 EXPORT_SYMBOL(class_register_type);
228
class_unregister_type(const char * name)229 int class_unregister_type(const char *name)
230 {
231 struct obd_type *type = class_search_type(name);
232
233 if (!type) {
234 CERROR("unknown obd type\n");
235 return -EINVAL;
236 }
237
238 if (type->typ_refcnt) {
239 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
240 /* This is a bad situation, let's make the best of it */
241 /* Remove ops, but leave the name for debugging */
242 kfree(type->typ_dt_ops);
243 kfree(type->typ_md_ops);
244 return -EBUSY;
245 }
246
247 if (type->typ_kobj)
248 kobject_put(type->typ_kobj);
249
250 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
251 ldebugfs_remove(&type->typ_debugfs_entry);
252
253 if (type->typ_lu)
254 lu_device_type_fini(type->typ_lu);
255
256 spin_lock(&obd_types_lock);
257 list_del(&type->typ_chain);
258 spin_unlock(&obd_types_lock);
259 kfree(type->typ_name);
260 kfree(type->typ_dt_ops);
261 kfree(type->typ_md_ops);
262 kfree(type);
263 return 0;
264 } /* class_unregister_type */
265 EXPORT_SYMBOL(class_unregister_type);
266
267 /**
268 * Create a new obd device.
269 *
270 * Find an empty slot in ::obd_devs[], create a new obd device in it.
271 *
272 * \param[in] type_name obd device type string.
273 * \param[in] name obd device name.
274 *
275 * \retval NULL if create fails, otherwise return the obd device
276 * pointer created.
277 */
class_newdev(const char * type_name,const char * name)278 struct obd_device *class_newdev(const char *type_name, const char *name)
279 {
280 struct obd_device *result = NULL;
281 struct obd_device *newdev;
282 struct obd_type *type = NULL;
283 int i;
284 int new_obd_minor = 0;
285
286 if (strlen(name) >= MAX_OBD_NAME) {
287 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
288 return ERR_PTR(-EINVAL);
289 }
290
291 type = class_get_type(type_name);
292 if (!type) {
293 CERROR("OBD: unknown type: %s\n", type_name);
294 return ERR_PTR(-ENODEV);
295 }
296
297 newdev = obd_device_alloc();
298 if (!newdev) {
299 result = ERR_PTR(-ENOMEM);
300 goto out_type;
301 }
302
303 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
304
305 write_lock(&obd_dev_lock);
306 for (i = 0; i < class_devno_max(); i++) {
307 struct obd_device *obd = class_num2obd(i);
308
309 if (obd && (strcmp(name, obd->obd_name) == 0)) {
310 CERROR("Device %s already exists at %d, won't add\n",
311 name, i);
312 if (result) {
313 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
314 "%p obd_magic %08x != %08x\n", result,
315 result->obd_magic, OBD_DEVICE_MAGIC);
316 LASSERTF(result->obd_minor == new_obd_minor,
317 "%p obd_minor %d != %d\n", result,
318 result->obd_minor, new_obd_minor);
319
320 obd_devs[result->obd_minor] = NULL;
321 result->obd_name[0] = '\0';
322 }
323 result = ERR_PTR(-EEXIST);
324 break;
325 }
326 if (!result && !obd) {
327 result = newdev;
328 result->obd_minor = i;
329 new_obd_minor = i;
330 result->obd_type = type;
331 strncpy(result->obd_name, name,
332 sizeof(result->obd_name) - 1);
333 obd_devs[i] = result;
334 }
335 }
336 write_unlock(&obd_dev_lock);
337
338 if (!result && i >= class_devno_max()) {
339 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
340 class_devno_max());
341 result = ERR_PTR(-EOVERFLOW);
342 goto out;
343 }
344
345 if (IS_ERR(result))
346 goto out;
347
348 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
349 result->obd_name, result);
350
351 return result;
352 out:
353 obd_device_free(newdev);
354 out_type:
355 class_put_type(type);
356 return result;
357 }
358
class_release_dev(struct obd_device * obd)359 void class_release_dev(struct obd_device *obd)
360 {
361 struct obd_type *obd_type = obd->obd_type;
362
363 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
364 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
365 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
366 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
367 LASSERT(obd_type != NULL);
368
369 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
370 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
371
372 write_lock(&obd_dev_lock);
373 obd_devs[obd->obd_minor] = NULL;
374 write_unlock(&obd_dev_lock);
375 obd_device_free(obd);
376
377 class_put_type(obd_type);
378 }
379
class_name2dev(const char * name)380 int class_name2dev(const char *name)
381 {
382 int i;
383
384 if (!name)
385 return -1;
386
387 read_lock(&obd_dev_lock);
388 for (i = 0; i < class_devno_max(); i++) {
389 struct obd_device *obd = class_num2obd(i);
390
391 if (obd && strcmp(name, obd->obd_name) == 0) {
392 /* Make sure we finished attaching before we give
393 out any references */
394 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
395 if (obd->obd_attached) {
396 read_unlock(&obd_dev_lock);
397 return i;
398 }
399 break;
400 }
401 }
402 read_unlock(&obd_dev_lock);
403
404 return -1;
405 }
406 EXPORT_SYMBOL(class_name2dev);
407
class_name2obd(const char * name)408 struct obd_device *class_name2obd(const char *name)
409 {
410 int dev = class_name2dev(name);
411
412 if (dev < 0 || dev > class_devno_max())
413 return NULL;
414 return class_num2obd(dev);
415 }
416 EXPORT_SYMBOL(class_name2obd);
417
class_uuid2dev(struct obd_uuid * uuid)418 int class_uuid2dev(struct obd_uuid *uuid)
419 {
420 int i;
421
422 read_lock(&obd_dev_lock);
423 for (i = 0; i < class_devno_max(); i++) {
424 struct obd_device *obd = class_num2obd(i);
425
426 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
427 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
428 read_unlock(&obd_dev_lock);
429 return i;
430 }
431 }
432 read_unlock(&obd_dev_lock);
433
434 return -1;
435 }
436 EXPORT_SYMBOL(class_uuid2dev);
437
438 /**
439 * Get obd device from ::obd_devs[]
440 *
441 * \param num [in] array index
442 *
443 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
444 * otherwise return the obd device there.
445 */
class_num2obd(int num)446 struct obd_device *class_num2obd(int num)
447 {
448 struct obd_device *obd = NULL;
449
450 if (num < class_devno_max()) {
451 obd = obd_devs[num];
452 if (!obd)
453 return NULL;
454
455 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
456 "%p obd_magic %08x != %08x\n",
457 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
458 LASSERTF(obd->obd_minor == num,
459 "%p obd_minor %0d != %0d\n",
460 obd, obd->obd_minor, num);
461 }
462
463 return obd;
464 }
465 EXPORT_SYMBOL(class_num2obd);
466
467 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
468 specified, then only the client with that uuid is returned,
469 otherwise any client connected to the tgt is returned. */
class_find_client_obd(struct obd_uuid * tgt_uuid,const char * typ_name,struct obd_uuid * grp_uuid)470 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
471 const char *typ_name,
472 struct obd_uuid *grp_uuid)
473 {
474 int i;
475
476 read_lock(&obd_dev_lock);
477 for (i = 0; i < class_devno_max(); i++) {
478 struct obd_device *obd = class_num2obd(i);
479
480 if (!obd)
481 continue;
482 if ((strncmp(obd->obd_type->typ_name, typ_name,
483 strlen(typ_name)) == 0)) {
484 if (obd_uuid_equals(tgt_uuid,
485 &obd->u.cli.cl_target_uuid) &&
486 ((grp_uuid) ? obd_uuid_equals(grp_uuid,
487 &obd->obd_uuid) : 1)) {
488 read_unlock(&obd_dev_lock);
489 return obd;
490 }
491 }
492 }
493 read_unlock(&obd_dev_lock);
494
495 return NULL;
496 }
497 EXPORT_SYMBOL(class_find_client_obd);
498
499 /* Iterate the obd_device list looking devices have grp_uuid. Start
500 searching at *next, and if a device is found, the next index to look
501 at is saved in *next. If next is NULL, then the first matching device
502 will always be returned. */
class_devices_in_group(struct obd_uuid * grp_uuid,int * next)503 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
504 {
505 int i;
506
507 if (!next)
508 i = 0;
509 else if (*next >= 0 && *next < class_devno_max())
510 i = *next;
511 else
512 return NULL;
513
514 read_lock(&obd_dev_lock);
515 for (; i < class_devno_max(); i++) {
516 struct obd_device *obd = class_num2obd(i);
517
518 if (!obd)
519 continue;
520 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
521 if (next)
522 *next = i+1;
523 read_unlock(&obd_dev_lock);
524 return obd;
525 }
526 }
527 read_unlock(&obd_dev_lock);
528
529 return NULL;
530 }
531 EXPORT_SYMBOL(class_devices_in_group);
532
533 /**
534 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
535 * adjust sptlrpc settings accordingly.
536 */
class_notify_sptlrpc_conf(const char * fsname,int namelen)537 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
538 {
539 struct obd_device *obd;
540 const char *type;
541 int i, rc = 0, rc2;
542
543 LASSERT(namelen > 0);
544
545 read_lock(&obd_dev_lock);
546 for (i = 0; i < class_devno_max(); i++) {
547 obd = class_num2obd(i);
548
549 if (!obd || obd->obd_set_up == 0 || obd->obd_stopping)
550 continue;
551
552 /* only notify mdc, osc, mdt, ost */
553 type = obd->obd_type->typ_name;
554 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
555 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
556 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
557 strcmp(type, LUSTRE_OST_NAME) != 0)
558 continue;
559
560 if (strncmp(obd->obd_name, fsname, namelen))
561 continue;
562
563 class_incref(obd, __func__, obd);
564 read_unlock(&obd_dev_lock);
565 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
566 sizeof(KEY_SPTLRPC_CONF),
567 KEY_SPTLRPC_CONF, 0, NULL, NULL);
568 rc = rc ? rc : rc2;
569 class_decref(obd, __func__, obd);
570 read_lock(&obd_dev_lock);
571 }
572 read_unlock(&obd_dev_lock);
573 return rc;
574 }
575 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
576
obd_cleanup_caches(void)577 void obd_cleanup_caches(void)
578 {
579 kmem_cache_destroy(obd_device_cachep);
580 obd_device_cachep = NULL;
581 kmem_cache_destroy(obdo_cachep);
582 obdo_cachep = NULL;
583 kmem_cache_destroy(import_cachep);
584 import_cachep = NULL;
585 }
586
obd_init_caches(void)587 int obd_init_caches(void)
588 {
589 LASSERT(!obd_device_cachep);
590 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
591 sizeof(struct obd_device),
592 0, 0, NULL);
593 if (!obd_device_cachep)
594 goto out;
595
596 LASSERT(!obdo_cachep);
597 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
598 0, 0, NULL);
599 if (!obdo_cachep)
600 goto out;
601
602 LASSERT(!import_cachep);
603 import_cachep = kmem_cache_create("ll_import_cache",
604 sizeof(struct obd_import),
605 0, 0, NULL);
606 if (!import_cachep)
607 goto out;
608
609 return 0;
610 out:
611 obd_cleanup_caches();
612 return -ENOMEM;
613
614 }
615
616 /* map connection to client */
class_conn2export(struct lustre_handle * conn)617 struct obd_export *class_conn2export(struct lustre_handle *conn)
618 {
619 struct obd_export *export;
620
621 if (!conn) {
622 CDEBUG(D_CACHE, "looking for null handle\n");
623 return NULL;
624 }
625
626 if (conn->cookie == -1) { /* this means assign a new connection */
627 CDEBUG(D_CACHE, "want a new connection\n");
628 return NULL;
629 }
630
631 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
632 export = class_handle2object(conn->cookie);
633 return export;
634 }
635 EXPORT_SYMBOL(class_conn2export);
636
class_exp2obd(struct obd_export * exp)637 struct obd_device *class_exp2obd(struct obd_export *exp)
638 {
639 if (exp)
640 return exp->exp_obd;
641 return NULL;
642 }
643 EXPORT_SYMBOL(class_exp2obd);
644
class_exp2cliimp(struct obd_export * exp)645 struct obd_import *class_exp2cliimp(struct obd_export *exp)
646 {
647 struct obd_device *obd = exp->exp_obd;
648
649 if (!obd)
650 return NULL;
651 return obd->u.cli.cl_import;
652 }
653 EXPORT_SYMBOL(class_exp2cliimp);
654
655 /* Export management functions */
class_export_destroy(struct obd_export * exp)656 static void class_export_destroy(struct obd_export *exp)
657 {
658 struct obd_device *obd = exp->exp_obd;
659
660 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
661 LASSERT(obd != NULL);
662
663 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
664 exp->exp_client_uuid.uuid, obd->obd_name);
665
666 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
667 if (exp->exp_connection)
668 ptlrpc_put_connection_superhack(exp->exp_connection);
669
670 LASSERT(list_empty(&exp->exp_outstanding_replies));
671 LASSERT(list_empty(&exp->exp_uncommitted_replies));
672 LASSERT(list_empty(&exp->exp_req_replay_queue));
673 LASSERT(list_empty(&exp->exp_hp_rpcs));
674 obd_destroy_export(exp);
675 class_decref(obd, "export", exp);
676
677 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
678 }
679
export_handle_addref(void * export)680 static void export_handle_addref(void *export)
681 {
682 class_export_get(export);
683 }
684
685 static struct portals_handle_ops export_handle_ops = {
686 .hop_addref = export_handle_addref,
687 .hop_free = NULL,
688 };
689
class_export_get(struct obd_export * exp)690 struct obd_export *class_export_get(struct obd_export *exp)
691 {
692 atomic_inc(&exp->exp_refcount);
693 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
694 atomic_read(&exp->exp_refcount));
695 return exp;
696 }
697 EXPORT_SYMBOL(class_export_get);
698
class_export_put(struct obd_export * exp)699 void class_export_put(struct obd_export *exp)
700 {
701 LASSERT(exp != NULL);
702 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
703 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
704 atomic_read(&exp->exp_refcount) - 1);
705
706 if (atomic_dec_and_test(&exp->exp_refcount)) {
707 LASSERT(!list_empty(&exp->exp_obd_chain));
708 CDEBUG(D_IOCTL, "final put %p/%s\n",
709 exp, exp->exp_client_uuid.uuid);
710
711 /* release nid stat refererence */
712 lprocfs_exp_cleanup(exp);
713
714 obd_zombie_export_add(exp);
715 }
716 }
717 EXPORT_SYMBOL(class_export_put);
718
719 /* Creates a new export, adds it to the hash table, and returns a
720 * pointer to it. The refcount is 2: one for the hash reference, and
721 * one for the pointer returned by this function. */
class_new_export(struct obd_device * obd,struct obd_uuid * cluuid)722 struct obd_export *class_new_export(struct obd_device *obd,
723 struct obd_uuid *cluuid)
724 {
725 struct obd_export *export;
726 struct cfs_hash *hash = NULL;
727 int rc = 0;
728
729 export = kzalloc(sizeof(*export), GFP_NOFS);
730 if (!export)
731 return ERR_PTR(-ENOMEM);
732
733 export->exp_conn_cnt = 0;
734 export->exp_lock_hash = NULL;
735 export->exp_flock_hash = NULL;
736 atomic_set(&export->exp_refcount, 2);
737 atomic_set(&export->exp_rpc_count, 0);
738 atomic_set(&export->exp_cb_count, 0);
739 atomic_set(&export->exp_locks_count, 0);
740 #if LUSTRE_TRACKS_LOCK_EXP_REFS
741 INIT_LIST_HEAD(&export->exp_locks_list);
742 spin_lock_init(&export->exp_locks_list_guard);
743 #endif
744 atomic_set(&export->exp_replay_count, 0);
745 export->exp_obd = obd;
746 INIT_LIST_HEAD(&export->exp_outstanding_replies);
747 spin_lock_init(&export->exp_uncommitted_replies_lock);
748 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
749 INIT_LIST_HEAD(&export->exp_req_replay_queue);
750 INIT_LIST_HEAD(&export->exp_handle.h_link);
751 INIT_LIST_HEAD(&export->exp_hp_rpcs);
752 class_handle_hash(&export->exp_handle, &export_handle_ops);
753 spin_lock_init(&export->exp_lock);
754 spin_lock_init(&export->exp_rpc_lock);
755 INIT_HLIST_NODE(&export->exp_uuid_hash);
756 spin_lock_init(&export->exp_bl_list_lock);
757 INIT_LIST_HEAD(&export->exp_bl_list);
758
759 export->exp_sp_peer = LUSTRE_SP_ANY;
760 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
761 export->exp_client_uuid = *cluuid;
762 obd_init_export(export);
763
764 spin_lock(&obd->obd_dev_lock);
765 /* shouldn't happen, but might race */
766 if (obd->obd_stopping) {
767 rc = -ENODEV;
768 goto exit_unlock;
769 }
770
771 hash = cfs_hash_getref(obd->obd_uuid_hash);
772 if (!hash) {
773 rc = -ENODEV;
774 goto exit_unlock;
775 }
776 spin_unlock(&obd->obd_dev_lock);
777
778 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
779 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
780 if (rc != 0) {
781 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
782 obd->obd_name, cluuid->uuid, rc);
783 rc = -EALREADY;
784 goto exit_err;
785 }
786 }
787
788 spin_lock(&obd->obd_dev_lock);
789 if (obd->obd_stopping) {
790 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
791 rc = -ENODEV;
792 goto exit_unlock;
793 }
794
795 class_incref(obd, "export", export);
796 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
797 export->exp_obd->obd_num_exports++;
798 spin_unlock(&obd->obd_dev_lock);
799 cfs_hash_putref(hash);
800 return export;
801
802 exit_unlock:
803 spin_unlock(&obd->obd_dev_lock);
804 exit_err:
805 if (hash)
806 cfs_hash_putref(hash);
807 class_handle_unhash(&export->exp_handle);
808 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
809 obd_destroy_export(export);
810 kfree(export);
811 return ERR_PTR(rc);
812 }
813 EXPORT_SYMBOL(class_new_export);
814
class_unlink_export(struct obd_export * exp)815 void class_unlink_export(struct obd_export *exp)
816 {
817 class_handle_unhash(&exp->exp_handle);
818
819 spin_lock(&exp->exp_obd->obd_dev_lock);
820 /* delete an uuid-export hashitem from hashtables */
821 if (!hlist_unhashed(&exp->exp_uuid_hash))
822 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
823 &exp->exp_client_uuid,
824 &exp->exp_uuid_hash);
825
826 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
827 exp->exp_obd->obd_num_exports--;
828 spin_unlock(&exp->exp_obd->obd_dev_lock);
829 class_export_put(exp);
830 }
831 EXPORT_SYMBOL(class_unlink_export);
832
833 /* Import management functions */
class_import_destroy(struct obd_import * imp)834 static void class_import_destroy(struct obd_import *imp)
835 {
836 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
837 imp->imp_obd->obd_name);
838
839 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
840
841 ptlrpc_put_connection_superhack(imp->imp_connection);
842
843 while (!list_empty(&imp->imp_conn_list)) {
844 struct obd_import_conn *imp_conn;
845
846 imp_conn = list_entry(imp->imp_conn_list.next,
847 struct obd_import_conn, oic_item);
848 list_del_init(&imp_conn->oic_item);
849 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
850 kfree(imp_conn);
851 }
852
853 LASSERT(!imp->imp_sec);
854 class_decref(imp->imp_obd, "import", imp);
855 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
856 }
857
import_handle_addref(void * import)858 static void import_handle_addref(void *import)
859 {
860 class_import_get(import);
861 }
862
863 static struct portals_handle_ops import_handle_ops = {
864 .hop_addref = import_handle_addref,
865 .hop_free = NULL,
866 };
867
class_import_get(struct obd_import * import)868 struct obd_import *class_import_get(struct obd_import *import)
869 {
870 atomic_inc(&import->imp_refcount);
871 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
872 atomic_read(&import->imp_refcount),
873 import->imp_obd->obd_name);
874 return import;
875 }
876 EXPORT_SYMBOL(class_import_get);
877
class_import_put(struct obd_import * imp)878 void class_import_put(struct obd_import *imp)
879 {
880 LASSERT(list_empty(&imp->imp_zombie_chain));
881 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
882
883 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
884 atomic_read(&imp->imp_refcount) - 1,
885 imp->imp_obd->obd_name);
886
887 if (atomic_dec_and_test(&imp->imp_refcount)) {
888 CDEBUG(D_INFO, "final put import %p\n", imp);
889 obd_zombie_import_add(imp);
890 }
891
892 /* catch possible import put race */
893 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
894 }
895 EXPORT_SYMBOL(class_import_put);
896
init_imp_at(struct imp_at * at)897 static void init_imp_at(struct imp_at *at)
898 {
899 int i;
900
901 at_init(&at->iat_net_latency, 0, 0);
902 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
903 /* max service estimates are tracked on the server side, so
904 don't use the AT history here, just use the last reported
905 val. (But keep hist for proc histogram, worst_ever) */
906 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
907 AT_FLG_NOHIST);
908 }
909 }
910
class_new_import(struct obd_device * obd)911 struct obd_import *class_new_import(struct obd_device *obd)
912 {
913 struct obd_import *imp;
914
915 imp = kzalloc(sizeof(*imp), GFP_NOFS);
916 if (!imp)
917 return NULL;
918
919 INIT_LIST_HEAD(&imp->imp_pinger_chain);
920 INIT_LIST_HEAD(&imp->imp_zombie_chain);
921 INIT_LIST_HEAD(&imp->imp_replay_list);
922 INIT_LIST_HEAD(&imp->imp_sending_list);
923 INIT_LIST_HEAD(&imp->imp_delayed_list);
924 INIT_LIST_HEAD(&imp->imp_committed_list);
925 imp->imp_replay_cursor = &imp->imp_committed_list;
926 spin_lock_init(&imp->imp_lock);
927 imp->imp_last_success_conn = 0;
928 imp->imp_state = LUSTRE_IMP_NEW;
929 imp->imp_obd = class_incref(obd, "import", imp);
930 mutex_init(&imp->imp_sec_mutex);
931 init_waitqueue_head(&imp->imp_recovery_waitq);
932
933 atomic_set(&imp->imp_refcount, 2);
934 atomic_set(&imp->imp_unregistering, 0);
935 atomic_set(&imp->imp_inflight, 0);
936 atomic_set(&imp->imp_replay_inflight, 0);
937 atomic_set(&imp->imp_inval_count, 0);
938 INIT_LIST_HEAD(&imp->imp_conn_list);
939 INIT_LIST_HEAD(&imp->imp_handle.h_link);
940 class_handle_hash(&imp->imp_handle, &import_handle_ops);
941 init_imp_at(&imp->imp_at);
942
943 /* the default magic is V2, will be used in connect RPC, and
944 * then adjusted according to the flags in request/reply. */
945 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
946
947 return imp;
948 }
949 EXPORT_SYMBOL(class_new_import);
950
class_destroy_import(struct obd_import * import)951 void class_destroy_import(struct obd_import *import)
952 {
953 LASSERT(import != NULL);
954 LASSERT(import != LP_POISON);
955
956 class_handle_unhash(&import->imp_handle);
957
958 spin_lock(&import->imp_lock);
959 import->imp_generation++;
960 spin_unlock(&import->imp_lock);
961 class_import_put(import);
962 }
963 EXPORT_SYMBOL(class_destroy_import);
964
965 #if LUSTRE_TRACKS_LOCK_EXP_REFS
966
__class_export_add_lock_ref(struct obd_export * exp,struct ldlm_lock * lock)967 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
968 {
969 spin_lock(&exp->exp_locks_list_guard);
970
971 LASSERT(lock->l_exp_refs_nr >= 0);
972
973 if (lock->l_exp_refs_target != NULL &&
974 lock->l_exp_refs_target != exp) {
975 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
976 exp, lock, lock->l_exp_refs_target);
977 }
978 if ((lock->l_exp_refs_nr++) == 0) {
979 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
980 lock->l_exp_refs_target = exp;
981 }
982 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
983 lock, exp, lock->l_exp_refs_nr);
984 spin_unlock(&exp->exp_locks_list_guard);
985 }
986 EXPORT_SYMBOL(__class_export_add_lock_ref);
987
__class_export_del_lock_ref(struct obd_export * exp,struct ldlm_lock * lock)988 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
989 {
990 spin_lock(&exp->exp_locks_list_guard);
991 LASSERT(lock->l_exp_refs_nr > 0);
992 if (lock->l_exp_refs_target != exp) {
993 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
994 lock, lock->l_exp_refs_target, exp);
995 }
996 if (-- lock->l_exp_refs_nr == 0) {
997 list_del_init(&lock->l_exp_refs_link);
998 lock->l_exp_refs_target = NULL;
999 }
1000 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1001 lock, exp, lock->l_exp_refs_nr);
1002 spin_unlock(&exp->exp_locks_list_guard);
1003 }
1004 EXPORT_SYMBOL(__class_export_del_lock_ref);
1005 #endif
1006
1007 /* A connection defines an export context in which preallocation can
1008 be managed. This releases the export pointer reference, and returns
1009 the export handle, so the export refcount is 1 when this function
1010 returns. */
class_connect(struct lustre_handle * conn,struct obd_device * obd,struct obd_uuid * cluuid)1011 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1012 struct obd_uuid *cluuid)
1013 {
1014 struct obd_export *export;
1015
1016 LASSERT(conn != NULL);
1017 LASSERT(obd != NULL);
1018 LASSERT(cluuid != NULL);
1019
1020 export = class_new_export(obd, cluuid);
1021 if (IS_ERR(export))
1022 return PTR_ERR(export);
1023
1024 conn->cookie = export->exp_handle.h_cookie;
1025 class_export_put(export);
1026
1027 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1028 cluuid->uuid, conn->cookie);
1029 return 0;
1030 }
1031 EXPORT_SYMBOL(class_connect);
1032
1033 /* This function removes 1-3 references from the export:
1034 * 1 - for export pointer passed
1035 * and if disconnect really need
1036 * 2 - removing from hash
1037 * 3 - in client_unlink_export
1038 * The export pointer passed to this function can destroyed */
class_disconnect(struct obd_export * export)1039 int class_disconnect(struct obd_export *export)
1040 {
1041 int already_disconnected;
1042
1043 if (!export) {
1044 CWARN("attempting to free NULL export %p\n", export);
1045 return -EINVAL;
1046 }
1047
1048 spin_lock(&export->exp_lock);
1049 already_disconnected = export->exp_disconnected;
1050 export->exp_disconnected = 1;
1051 spin_unlock(&export->exp_lock);
1052
1053 /* class_cleanup(), abort_recovery(), and class_fail_export()
1054 * all end up in here, and if any of them race we shouldn't
1055 * call extra class_export_puts(). */
1056 if (already_disconnected)
1057 goto no_disconn;
1058
1059 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1060 export->exp_handle.h_cookie);
1061
1062 class_unlink_export(export);
1063 no_disconn:
1064 class_export_put(export);
1065 return 0;
1066 }
1067 EXPORT_SYMBOL(class_disconnect);
1068
class_fail_export(struct obd_export * exp)1069 void class_fail_export(struct obd_export *exp)
1070 {
1071 int rc, already_failed;
1072
1073 spin_lock(&exp->exp_lock);
1074 already_failed = exp->exp_failed;
1075 exp->exp_failed = 1;
1076 spin_unlock(&exp->exp_lock);
1077
1078 if (already_failed) {
1079 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1080 exp, exp->exp_client_uuid.uuid);
1081 return;
1082 }
1083
1084 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1085 exp, exp->exp_client_uuid.uuid);
1086
1087 if (obd_dump_on_timeout)
1088 libcfs_debug_dumplog();
1089
1090 /* need for safe call CDEBUG after obd_disconnect */
1091 class_export_get(exp);
1092
1093 /* Most callers into obd_disconnect are removing their own reference
1094 * (request, for example) in addition to the one from the hash table.
1095 * We don't have such a reference here, so make one. */
1096 class_export_get(exp);
1097 rc = obd_disconnect(exp);
1098 if (rc)
1099 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1100 else
1101 CDEBUG(D_HA, "disconnected export %p/%s\n",
1102 exp, exp->exp_client_uuid.uuid);
1103 class_export_put(exp);
1104 }
1105 EXPORT_SYMBOL(class_fail_export);
1106
1107 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1108 void (*class_export_dump_hook)(struct obd_export *) = NULL;
1109 EXPORT_SYMBOL(class_export_dump_hook);
1110 #endif
1111
1112 /* Total amount of zombies to be destroyed */
1113 static int zombies_count;
1114
1115 /**
1116 * kill zombie imports and exports
1117 */
obd_zombie_impexp_cull(void)1118 static void obd_zombie_impexp_cull(void)
1119 {
1120 struct obd_import *import;
1121 struct obd_export *export;
1122
1123 do {
1124 spin_lock(&obd_zombie_impexp_lock);
1125
1126 import = NULL;
1127 if (!list_empty(&obd_zombie_imports)) {
1128 import = list_entry(obd_zombie_imports.next,
1129 struct obd_import,
1130 imp_zombie_chain);
1131 list_del_init(&import->imp_zombie_chain);
1132 }
1133
1134 export = NULL;
1135 if (!list_empty(&obd_zombie_exports)) {
1136 export = list_entry(obd_zombie_exports.next,
1137 struct obd_export,
1138 exp_obd_chain);
1139 list_del_init(&export->exp_obd_chain);
1140 }
1141
1142 spin_unlock(&obd_zombie_impexp_lock);
1143
1144 if (import != NULL) {
1145 class_import_destroy(import);
1146 spin_lock(&obd_zombie_impexp_lock);
1147 zombies_count--;
1148 spin_unlock(&obd_zombie_impexp_lock);
1149 }
1150
1151 if (export != NULL) {
1152 class_export_destroy(export);
1153 spin_lock(&obd_zombie_impexp_lock);
1154 zombies_count--;
1155 spin_unlock(&obd_zombie_impexp_lock);
1156 }
1157
1158 cond_resched();
1159 } while (import != NULL || export != NULL);
1160 }
1161
1162 static struct completion obd_zombie_start;
1163 static struct completion obd_zombie_stop;
1164 static unsigned long obd_zombie_flags;
1165 static wait_queue_head_t obd_zombie_waitq;
1166 static pid_t obd_zombie_pid;
1167
1168 enum {
1169 OBD_ZOMBIE_STOP = 0x0001,
1170 };
1171
1172 /**
1173 * check for work for kill zombie import/export thread.
1174 */
obd_zombie_impexp_check(void * arg)1175 static int obd_zombie_impexp_check(void *arg)
1176 {
1177 int rc;
1178
1179 spin_lock(&obd_zombie_impexp_lock);
1180 rc = (zombies_count == 0) &&
1181 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1182 spin_unlock(&obd_zombie_impexp_lock);
1183
1184 return rc;
1185 }
1186
1187 /**
1188 * Add export to the obd_zombie thread and notify it.
1189 */
obd_zombie_export_add(struct obd_export * exp)1190 static void obd_zombie_export_add(struct obd_export *exp)
1191 {
1192 spin_lock(&exp->exp_obd->obd_dev_lock);
1193 LASSERT(!list_empty(&exp->exp_obd_chain));
1194 list_del_init(&exp->exp_obd_chain);
1195 spin_unlock(&exp->exp_obd->obd_dev_lock);
1196 spin_lock(&obd_zombie_impexp_lock);
1197 zombies_count++;
1198 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1199 spin_unlock(&obd_zombie_impexp_lock);
1200
1201 obd_zombie_impexp_notify();
1202 }
1203
1204 /**
1205 * Add import to the obd_zombie thread and notify it.
1206 */
obd_zombie_import_add(struct obd_import * imp)1207 static void obd_zombie_import_add(struct obd_import *imp)
1208 {
1209 LASSERT(!imp->imp_sec);
1210 spin_lock(&obd_zombie_impexp_lock);
1211 LASSERT(list_empty(&imp->imp_zombie_chain));
1212 zombies_count++;
1213 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1214 spin_unlock(&obd_zombie_impexp_lock);
1215
1216 obd_zombie_impexp_notify();
1217 }
1218
1219 /**
1220 * notify import/export destroy thread about new zombie.
1221 */
obd_zombie_impexp_notify(void)1222 static void obd_zombie_impexp_notify(void)
1223 {
1224 /*
1225 * Make sure obd_zombie_impexp_thread get this notification.
1226 * It is possible this signal only get by obd_zombie_barrier, and
1227 * barrier gulps this notification and sleeps away and hangs ensues
1228 */
1229 wake_up_all(&obd_zombie_waitq);
1230 }
1231
1232 /**
1233 * check whether obd_zombie is idle
1234 */
obd_zombie_is_idle(void)1235 static int obd_zombie_is_idle(void)
1236 {
1237 int rc;
1238
1239 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1240 spin_lock(&obd_zombie_impexp_lock);
1241 rc = (zombies_count == 0);
1242 spin_unlock(&obd_zombie_impexp_lock);
1243 return rc;
1244 }
1245
1246 /**
1247 * wait when obd_zombie import/export queues become empty
1248 */
obd_zombie_barrier(void)1249 void obd_zombie_barrier(void)
1250 {
1251 struct l_wait_info lwi = { 0 };
1252
1253 if (obd_zombie_pid == current_pid())
1254 /* don't wait for myself */
1255 return;
1256 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1257 }
1258 EXPORT_SYMBOL(obd_zombie_barrier);
1259
1260 /**
1261 * destroy zombie export/import thread.
1262 */
obd_zombie_impexp_thread(void * unused)1263 static int obd_zombie_impexp_thread(void *unused)
1264 {
1265 unshare_fs_struct();
1266 complete(&obd_zombie_start);
1267
1268 obd_zombie_pid = current_pid();
1269
1270 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1271 struct l_wait_info lwi = { 0 };
1272
1273 l_wait_event(obd_zombie_waitq,
1274 !obd_zombie_impexp_check(NULL), &lwi);
1275 obd_zombie_impexp_cull();
1276
1277 /*
1278 * Notify obd_zombie_barrier callers that queues
1279 * may be empty.
1280 */
1281 wake_up(&obd_zombie_waitq);
1282 }
1283
1284 complete(&obd_zombie_stop);
1285
1286 return 0;
1287 }
1288
1289 /**
1290 * start destroy zombie import/export thread
1291 */
obd_zombie_impexp_init(void)1292 int obd_zombie_impexp_init(void)
1293 {
1294 struct task_struct *task;
1295
1296 INIT_LIST_HEAD(&obd_zombie_imports);
1297 INIT_LIST_HEAD(&obd_zombie_exports);
1298 spin_lock_init(&obd_zombie_impexp_lock);
1299 init_completion(&obd_zombie_start);
1300 init_completion(&obd_zombie_stop);
1301 init_waitqueue_head(&obd_zombie_waitq);
1302 obd_zombie_pid = 0;
1303
1304 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1305 if (IS_ERR(task))
1306 return PTR_ERR(task);
1307
1308 wait_for_completion(&obd_zombie_start);
1309 return 0;
1310 }
1311
1312 /**
1313 * stop destroy zombie import/export thread
1314 */
obd_zombie_impexp_stop(void)1315 void obd_zombie_impexp_stop(void)
1316 {
1317 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1318 obd_zombie_impexp_notify();
1319 wait_for_completion(&obd_zombie_stop);
1320 }
1321