1 /*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26 /*
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32 /*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lustre/obdclass/genops.c
37 *
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
40 */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_class.h"
44 #include "../include/lprocfs_status.h"
45
/* List of registered obd types; defined elsewhere.  The functions in this
 * file walk and modify it while holding obd_types_lock. */
extern struct list_head obd_types;
spinlock_t obd_types_lock;

/* Slab caches created by obd_init_caches() and torn down by
 * obd_cleanup_caches(). */
struct kmem_cache *obd_device_cachep;
struct kmem_cache *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
struct kmem_cache *import_cachep;

/* Zombie import/export bookkeeping.  The code that fills and drains these
 * lists is not in this part of the file; class_export_put() and
 * class_import_put() hand objects over through the *_add() helpers on the
 * final reference drop. */
struct list_head obd_zombie_imports;
struct list_head obd_zombie_exports;
spinlock_t obd_zombie_impexp_lock;
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
			      const char *status, int locks);

/* Hook used by class_export_destroy() and class_import_destroy() to release
 * a ptlrpc connection; it is assigned outside this part of the file. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
65
66 /*
67 * support functions: we could use inter-module communication, but this
68 * is more portable to other OS's
69 */
obd_device_alloc(void)70 static struct obd_device *obd_device_alloc(void)
71 {
72 struct obd_device *obd;
73
74 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
75 if (obd != NULL) {
76 obd->obd_magic = OBD_DEVICE_MAGIC;
77 }
78 return obd;
79 }
80
obd_device_free(struct obd_device * obd)81 static void obd_device_free(struct obd_device *obd)
82 {
83 LASSERT(obd != NULL);
84 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
85 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
86 if (obd->obd_namespace != NULL) {
87 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
88 obd, obd->obd_namespace, obd->obd_force);
89 LBUG();
90 }
91 lu_ref_fini(&obd->obd_reference);
92 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
93 }
94
class_search_type(const char * name)95 struct obd_type *class_search_type(const char *name)
96 {
97 struct list_head *tmp;
98 struct obd_type *type;
99
100 spin_lock(&obd_types_lock);
101 list_for_each(tmp, &obd_types) {
102 type = list_entry(tmp, struct obd_type, typ_chain);
103 if (strcmp(type->typ_name, name) == 0) {
104 spin_unlock(&obd_types_lock);
105 return type;
106 }
107 }
108 spin_unlock(&obd_types_lock);
109 return NULL;
110 }
111 EXPORT_SYMBOL(class_search_type);
112
class_get_type(const char * name)113 struct obd_type *class_get_type(const char *name)
114 {
115 struct obd_type *type = class_search_type(name);
116
117 if (!type) {
118 const char *modname = name;
119
120 if (strcmp(modname, "obdfilter") == 0)
121 modname = "ofd";
122
123 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
124 modname = LUSTRE_OSP_NAME;
125
126 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
127 modname = LUSTRE_MDT_NAME;
128
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
132 } else {
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134 modname);
135 }
136 }
137 if (type) {
138 spin_lock(&type->obd_type_lock);
139 type->typ_refcnt++;
140 try_module_get(type->typ_dt_ops->o_owner);
141 spin_unlock(&type->obd_type_lock);
142 }
143 return type;
144 }
145 EXPORT_SYMBOL(class_get_type);
146
class_put_type(struct obd_type * type)147 void class_put_type(struct obd_type *type)
148 {
149 LASSERT(type);
150 spin_lock(&type->obd_type_lock);
151 type->typ_refcnt--;
152 module_put(type->typ_dt_ops->o_owner);
153 spin_unlock(&type->obd_type_lock);
154 }
155 EXPORT_SYMBOL(class_put_type);
156
157 #define CLASS_MAX_NAME 1024
158
class_register_type(struct obd_ops * dt_ops,struct md_ops * md_ops,struct lprocfs_vars * vars,const char * name,struct lu_device_type * ldt)159 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
160 struct lprocfs_vars *vars, const char *name,
161 struct lu_device_type *ldt)
162 {
163 struct obd_type *type;
164 int rc = 0;
165
166 /* sanity check */
167 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
168
169 if (class_search_type(name)) {
170 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
171 return -EEXIST;
172 }
173
174 rc = -ENOMEM;
175 OBD_ALLOC(type, sizeof(*type));
176 if (type == NULL)
177 return rc;
178
179 OBD_ALLOC_PTR(type->typ_dt_ops);
180 OBD_ALLOC_PTR(type->typ_md_ops);
181 OBD_ALLOC(type->typ_name, strlen(name) + 1);
182
183 if (type->typ_dt_ops == NULL ||
184 type->typ_md_ops == NULL ||
185 type->typ_name == NULL)
186 goto failed;
187
188 *(type->typ_dt_ops) = *dt_ops;
189 /* md_ops is optional */
190 if (md_ops)
191 *(type->typ_md_ops) = *md_ops;
192 strcpy(type->typ_name, name);
193 spin_lock_init(&type->obd_type_lock);
194
195 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
196 vars, type);
197 if (IS_ERR(type->typ_procroot)) {
198 rc = PTR_ERR(type->typ_procroot);
199 type->typ_procroot = NULL;
200 goto failed;
201 }
202
203 if (ldt != NULL) {
204 type->typ_lu = ldt;
205 rc = lu_device_type_init(ldt);
206 if (rc != 0)
207 goto failed;
208 }
209
210 spin_lock(&obd_types_lock);
211 list_add(&type->typ_chain, &obd_types);
212 spin_unlock(&obd_types_lock);
213
214 return 0;
215
216 failed:
217 if (type->typ_name != NULL)
218 OBD_FREE(type->typ_name, strlen(name) + 1);
219 if (type->typ_md_ops != NULL)
220 OBD_FREE_PTR(type->typ_md_ops);
221 if (type->typ_dt_ops != NULL)
222 OBD_FREE_PTR(type->typ_dt_ops);
223 OBD_FREE(type, sizeof(*type));
224 return rc;
225 }
226 EXPORT_SYMBOL(class_register_type);
227
class_unregister_type(const char * name)228 int class_unregister_type(const char *name)
229 {
230 struct obd_type *type = class_search_type(name);
231
232 if (!type) {
233 CERROR("unknown obd type\n");
234 return -EINVAL;
235 }
236
237 if (type->typ_refcnt) {
238 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
239 /* This is a bad situation, let's make the best of it */
240 /* Remove ops, but leave the name for debugging */
241 OBD_FREE_PTR(type->typ_dt_ops);
242 OBD_FREE_PTR(type->typ_md_ops);
243 return -EBUSY;
244 }
245
246 if (type->typ_procroot) {
247 lprocfs_remove(&type->typ_procroot);
248 }
249
250 if (type->typ_lu)
251 lu_device_type_fini(type->typ_lu);
252
253 spin_lock(&obd_types_lock);
254 list_del(&type->typ_chain);
255 spin_unlock(&obd_types_lock);
256 OBD_FREE(type->typ_name, strlen(name) + 1);
257 if (type->typ_dt_ops != NULL)
258 OBD_FREE_PTR(type->typ_dt_ops);
259 if (type->typ_md_ops != NULL)
260 OBD_FREE_PTR(type->typ_md_ops);
261 OBD_FREE(type, sizeof(*type));
262 return 0;
263 } /* class_unregister_type */
264 EXPORT_SYMBOL(class_unregister_type);
265
266 /**
267 * Create a new obd device.
268 *
269 * Find an empty slot in ::obd_devs[], create a new obd device in it.
270 *
271 * \param[in] type_name obd device type string.
272 * \param[in] name obd device name.
273 *
274 * \retval NULL if create fails, otherwise return the obd device
275 * pointer created.
276 */
class_newdev(const char * type_name,const char * name)277 struct obd_device *class_newdev(const char *type_name, const char *name)
278 {
279 struct obd_device *result = NULL;
280 struct obd_device *newdev;
281 struct obd_type *type = NULL;
282 int i;
283 int new_obd_minor = 0;
284
285 if (strlen(name) >= MAX_OBD_NAME) {
286 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287 return ERR_PTR(-EINVAL);
288 }
289
290 type = class_get_type(type_name);
291 if (type == NULL){
292 CERROR("OBD: unknown type: %s\n", type_name);
293 return ERR_PTR(-ENODEV);
294 }
295
296 newdev = obd_device_alloc();
297 if (newdev == NULL) {
298 result = ERR_PTR(-ENOMEM);
299 goto out_type;
300 }
301
302 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
303
304 write_lock(&obd_dev_lock);
305 for (i = 0; i < class_devno_max(); i++) {
306 struct obd_device *obd = class_num2obd(i);
307
308 if (obd && (strcmp(name, obd->obd_name) == 0)) {
309 CERROR("Device %s already exists at %d, won't add\n",
310 name, i);
311 if (result) {
312 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
313 "%p obd_magic %08x != %08x\n", result,
314 result->obd_magic, OBD_DEVICE_MAGIC);
315 LASSERTF(result->obd_minor == new_obd_minor,
316 "%p obd_minor %d != %d\n", result,
317 result->obd_minor, new_obd_minor);
318
319 obd_devs[result->obd_minor] = NULL;
320 result->obd_name[0]='\0';
321 }
322 result = ERR_PTR(-EEXIST);
323 break;
324 }
325 if (!result && !obd) {
326 result = newdev;
327 result->obd_minor = i;
328 new_obd_minor = i;
329 result->obd_type = type;
330 strncpy(result->obd_name, name,
331 sizeof(result->obd_name) - 1);
332 obd_devs[i] = result;
333 }
334 }
335 write_unlock(&obd_dev_lock);
336
337 if (result == NULL && i >= class_devno_max()) {
338 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339 class_devno_max());
340 result = ERR_PTR(-EOVERFLOW);
341 goto out;
342 }
343
344 if (IS_ERR(result))
345 goto out;
346
347 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
348 result->obd_name, result);
349
350 return result;
351 out:
352 obd_device_free(newdev);
353 out_type:
354 class_put_type(type);
355 return result;
356 }
357
class_release_dev(struct obd_device * obd)358 void class_release_dev(struct obd_device *obd)
359 {
360 struct obd_type *obd_type = obd->obd_type;
361
362 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
363 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
364 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
365 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
366 LASSERT(obd_type != NULL);
367
368 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
369 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
370
371 write_lock(&obd_dev_lock);
372 obd_devs[obd->obd_minor] = NULL;
373 write_unlock(&obd_dev_lock);
374 obd_device_free(obd);
375
376 class_put_type(obd_type);
377 }
378
class_name2dev(const char * name)379 int class_name2dev(const char *name)
380 {
381 int i;
382
383 if (!name)
384 return -1;
385
386 read_lock(&obd_dev_lock);
387 for (i = 0; i < class_devno_max(); i++) {
388 struct obd_device *obd = class_num2obd(i);
389
390 if (obd && strcmp(name, obd->obd_name) == 0) {
391 /* Make sure we finished attaching before we give
392 out any references */
393 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
394 if (obd->obd_attached) {
395 read_unlock(&obd_dev_lock);
396 return i;
397 }
398 break;
399 }
400 }
401 read_unlock(&obd_dev_lock);
402
403 return -1;
404 }
405 EXPORT_SYMBOL(class_name2dev);
406
class_name2obd(const char * name)407 struct obd_device *class_name2obd(const char *name)
408 {
409 int dev = class_name2dev(name);
410
411 if (dev < 0 || dev > class_devno_max())
412 return NULL;
413 return class_num2obd(dev);
414 }
415 EXPORT_SYMBOL(class_name2obd);
416
class_uuid2dev(struct obd_uuid * uuid)417 int class_uuid2dev(struct obd_uuid *uuid)
418 {
419 int i;
420
421 read_lock(&obd_dev_lock);
422 for (i = 0; i < class_devno_max(); i++) {
423 struct obd_device *obd = class_num2obd(i);
424
425 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
426 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
427 read_unlock(&obd_dev_lock);
428 return i;
429 }
430 }
431 read_unlock(&obd_dev_lock);
432
433 return -1;
434 }
435 EXPORT_SYMBOL(class_uuid2dev);
436
class_uuid2obd(struct obd_uuid * uuid)437 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
438 {
439 int dev = class_uuid2dev(uuid);
440 if (dev < 0)
441 return NULL;
442 return class_num2obd(dev);
443 }
444 EXPORT_SYMBOL(class_uuid2obd);
445
446 /**
447 * Get obd device from ::obd_devs[]
448 *
449 * \param num [in] array index
450 *
451 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
452 * otherwise return the obd device there.
453 */
class_num2obd(int num)454 struct obd_device *class_num2obd(int num)
455 {
456 struct obd_device *obd = NULL;
457
458 if (num < class_devno_max()) {
459 obd = obd_devs[num];
460 if (obd == NULL)
461 return NULL;
462
463 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
464 "%p obd_magic %08x != %08x\n",
465 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
466 LASSERTF(obd->obd_minor == num,
467 "%p obd_minor %0d != %0d\n",
468 obd, obd->obd_minor, num);
469 }
470
471 return obd;
472 }
473 EXPORT_SYMBOL(class_num2obd);
474
475 /**
476 * Get obd devices count. Device in any
477 * state are counted
478 * \retval obd device count
479 */
get_devices_count(void)480 int get_devices_count(void)
481 {
482 int index, max_index = class_devno_max(), dev_count = 0;
483
484 read_lock(&obd_dev_lock);
485 for (index = 0; index <= max_index; index++) {
486 struct obd_device *obd = class_num2obd(index);
487 if (obd != NULL)
488 dev_count++;
489 }
490 read_unlock(&obd_dev_lock);
491
492 return dev_count;
493 }
494 EXPORT_SYMBOL(get_devices_count);
495
class_obd_list(void)496 void class_obd_list(void)
497 {
498 char *status;
499 int i;
500
501 read_lock(&obd_dev_lock);
502 for (i = 0; i < class_devno_max(); i++) {
503 struct obd_device *obd = class_num2obd(i);
504
505 if (obd == NULL)
506 continue;
507 if (obd->obd_stopping)
508 status = "ST";
509 else if (obd->obd_set_up)
510 status = "UP";
511 else if (obd->obd_attached)
512 status = "AT";
513 else
514 status = "--";
515 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
516 i, status, obd->obd_type->typ_name,
517 obd->obd_name, obd->obd_uuid.uuid,
518 atomic_read(&obd->obd_refcount));
519 }
520 read_unlock(&obd_dev_lock);
521 return;
522 }
523
524 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
525 specified, then only the client with that uuid is returned,
526 otherwise any client connected to the tgt is returned. */
class_find_client_obd(struct obd_uuid * tgt_uuid,const char * typ_name,struct obd_uuid * grp_uuid)527 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
528 const char * typ_name,
529 struct obd_uuid *grp_uuid)
530 {
531 int i;
532
533 read_lock(&obd_dev_lock);
534 for (i = 0; i < class_devno_max(); i++) {
535 struct obd_device *obd = class_num2obd(i);
536
537 if (obd == NULL)
538 continue;
539 if ((strncmp(obd->obd_type->typ_name, typ_name,
540 strlen(typ_name)) == 0)) {
541 if (obd_uuid_equals(tgt_uuid,
542 &obd->u.cli.cl_target_uuid) &&
543 ((grp_uuid)? obd_uuid_equals(grp_uuid,
544 &obd->obd_uuid) : 1)) {
545 read_unlock(&obd_dev_lock);
546 return obd;
547 }
548 }
549 }
550 read_unlock(&obd_dev_lock);
551
552 return NULL;
553 }
554 EXPORT_SYMBOL(class_find_client_obd);
555
556 /* Iterate the obd_device list looking devices have grp_uuid. Start
557 searching at *next, and if a device is found, the next index to look
558 at is saved in *next. If next is NULL, then the first matching device
559 will always be returned. */
class_devices_in_group(struct obd_uuid * grp_uuid,int * next)560 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
561 {
562 int i;
563
564 if (next == NULL)
565 i = 0;
566 else if (*next >= 0 && *next < class_devno_max())
567 i = *next;
568 else
569 return NULL;
570
571 read_lock(&obd_dev_lock);
572 for (; i < class_devno_max(); i++) {
573 struct obd_device *obd = class_num2obd(i);
574
575 if (obd == NULL)
576 continue;
577 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
578 if (next != NULL)
579 *next = i+1;
580 read_unlock(&obd_dev_lock);
581 return obd;
582 }
583 }
584 read_unlock(&obd_dev_lock);
585
586 return NULL;
587 }
588 EXPORT_SYMBOL(class_devices_in_group);
589
590 /**
591 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
592 * adjust sptlrpc settings accordingly.
593 */
class_notify_sptlrpc_conf(const char * fsname,int namelen)594 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
595 {
596 struct obd_device *obd;
597 const char *type;
598 int i, rc = 0, rc2;
599
600 LASSERT(namelen > 0);
601
602 read_lock(&obd_dev_lock);
603 for (i = 0; i < class_devno_max(); i++) {
604 obd = class_num2obd(i);
605
606 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
607 continue;
608
609 /* only notify mdc, osc, mdt, ost */
610 type = obd->obd_type->typ_name;
611 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
612 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
613 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
614 strcmp(type, LUSTRE_OST_NAME) != 0)
615 continue;
616
617 if (strncmp(obd->obd_name, fsname, namelen))
618 continue;
619
620 class_incref(obd, __func__, obd);
621 read_unlock(&obd_dev_lock);
622 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
623 sizeof(KEY_SPTLRPC_CONF),
624 KEY_SPTLRPC_CONF, 0, NULL, NULL);
625 rc = rc ? rc : rc2;
626 class_decref(obd, __func__, obd);
627 read_lock(&obd_dev_lock);
628 }
629 read_unlock(&obd_dev_lock);
630 return rc;
631 }
632 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
633
obd_cleanup_caches(void)634 void obd_cleanup_caches(void)
635 {
636 if (obd_device_cachep) {
637 kmem_cache_destroy(obd_device_cachep);
638 obd_device_cachep = NULL;
639 }
640 if (obdo_cachep) {
641 kmem_cache_destroy(obdo_cachep);
642 obdo_cachep = NULL;
643 }
644 if (import_cachep) {
645 kmem_cache_destroy(import_cachep);
646 import_cachep = NULL;
647 }
648 if (capa_cachep) {
649 kmem_cache_destroy(capa_cachep);
650 capa_cachep = NULL;
651 }
652 }
653
obd_init_caches(void)654 int obd_init_caches(void)
655 {
656 LASSERT(obd_device_cachep == NULL);
657 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
658 sizeof(struct obd_device),
659 0, 0, NULL);
660 if (!obd_device_cachep)
661 goto out;
662
663 LASSERT(obdo_cachep == NULL);
664 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
665 0, 0, NULL);
666 if (!obdo_cachep)
667 goto out;
668
669 LASSERT(import_cachep == NULL);
670 import_cachep = kmem_cache_create("ll_import_cache",
671 sizeof(struct obd_import),
672 0, 0, NULL);
673 if (!import_cachep)
674 goto out;
675
676 LASSERT(capa_cachep == NULL);
677 capa_cachep = kmem_cache_create("capa_cache",
678 sizeof(struct obd_capa), 0, 0, NULL);
679 if (!capa_cachep)
680 goto out;
681
682 return 0;
683 out:
684 obd_cleanup_caches();
685 return -ENOMEM;
686
687 }
688
689 /* map connection to client */
class_conn2export(struct lustre_handle * conn)690 struct obd_export *class_conn2export(struct lustre_handle *conn)
691 {
692 struct obd_export *export;
693
694 if (!conn) {
695 CDEBUG(D_CACHE, "looking for null handle\n");
696 return NULL;
697 }
698
699 if (conn->cookie == -1) { /* this means assign a new connection */
700 CDEBUG(D_CACHE, "want a new connection\n");
701 return NULL;
702 }
703
704 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
705 export = class_handle2object(conn->cookie);
706 return export;
707 }
708 EXPORT_SYMBOL(class_conn2export);
709
class_exp2obd(struct obd_export * exp)710 struct obd_device *class_exp2obd(struct obd_export *exp)
711 {
712 if (exp)
713 return exp->exp_obd;
714 return NULL;
715 }
716 EXPORT_SYMBOL(class_exp2obd);
717
class_conn2obd(struct lustre_handle * conn)718 struct obd_device *class_conn2obd(struct lustre_handle *conn)
719 {
720 struct obd_export *export;
721 export = class_conn2export(conn);
722 if (export) {
723 struct obd_device *obd = export->exp_obd;
724 class_export_put(export);
725 return obd;
726 }
727 return NULL;
728 }
729 EXPORT_SYMBOL(class_conn2obd);
730
class_exp2cliimp(struct obd_export * exp)731 struct obd_import *class_exp2cliimp(struct obd_export *exp)
732 {
733 struct obd_device *obd = exp->exp_obd;
734 if (obd == NULL)
735 return NULL;
736 return obd->u.cli.cl_import;
737 }
738 EXPORT_SYMBOL(class_exp2cliimp);
739
class_conn2cliimp(struct lustre_handle * conn)740 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
741 {
742 struct obd_device *obd = class_conn2obd(conn);
743 if (obd == NULL)
744 return NULL;
745 return obd->u.cli.cl_import;
746 }
747 EXPORT_SYMBOL(class_conn2cliimp);
748
749 /* Export management functions */
class_export_destroy(struct obd_export * exp)750 static void class_export_destroy(struct obd_export *exp)
751 {
752 struct obd_device *obd = exp->exp_obd;
753
754 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
755 LASSERT(obd != NULL);
756
757 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
758 exp->exp_client_uuid.uuid, obd->obd_name);
759
760 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
761 if (exp->exp_connection)
762 ptlrpc_put_connection_superhack(exp->exp_connection);
763
764 LASSERT(list_empty(&exp->exp_outstanding_replies));
765 LASSERT(list_empty(&exp->exp_uncommitted_replies));
766 LASSERT(list_empty(&exp->exp_req_replay_queue));
767 LASSERT(list_empty(&exp->exp_hp_rpcs));
768 obd_destroy_export(exp);
769 class_decref(obd, "export", exp);
770
771 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
772 }
773
export_handle_addref(void * export)774 static void export_handle_addref(void *export)
775 {
776 class_export_get(export);
777 }
778
779 static struct portals_handle_ops export_handle_ops = {
780 .hop_addref = export_handle_addref,
781 .hop_free = NULL,
782 };
783
class_export_get(struct obd_export * exp)784 struct obd_export *class_export_get(struct obd_export *exp)
785 {
786 atomic_inc(&exp->exp_refcount);
787 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
788 atomic_read(&exp->exp_refcount));
789 return exp;
790 }
791 EXPORT_SYMBOL(class_export_get);
792
class_export_put(struct obd_export * exp)793 void class_export_put(struct obd_export *exp)
794 {
795 LASSERT(exp != NULL);
796 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
797 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
798 atomic_read(&exp->exp_refcount) - 1);
799
800 if (atomic_dec_and_test(&exp->exp_refcount)) {
801 LASSERT(!list_empty(&exp->exp_obd_chain));
802 CDEBUG(D_IOCTL, "final put %p/%s\n",
803 exp, exp->exp_client_uuid.uuid);
804
805 /* release nid stat refererence */
806 lprocfs_exp_cleanup(exp);
807
808 obd_zombie_export_add(exp);
809 }
810 }
811 EXPORT_SYMBOL(class_export_put);
812
813 /* Creates a new export, adds it to the hash table, and returns a
814 * pointer to it. The refcount is 2: one for the hash reference, and
815 * one for the pointer returned by this function. */
class_new_export(struct obd_device * obd,struct obd_uuid * cluuid)816 struct obd_export *class_new_export(struct obd_device *obd,
817 struct obd_uuid *cluuid)
818 {
819 struct obd_export *export;
820 struct cfs_hash *hash = NULL;
821 int rc = 0;
822
823 OBD_ALLOC_PTR(export);
824 if (!export)
825 return ERR_PTR(-ENOMEM);
826
827 export->exp_conn_cnt = 0;
828 export->exp_lock_hash = NULL;
829 export->exp_flock_hash = NULL;
830 atomic_set(&export->exp_refcount, 2);
831 atomic_set(&export->exp_rpc_count, 0);
832 atomic_set(&export->exp_cb_count, 0);
833 atomic_set(&export->exp_locks_count, 0);
834 #if LUSTRE_TRACKS_LOCK_EXP_REFS
835 INIT_LIST_HEAD(&export->exp_locks_list);
836 spin_lock_init(&export->exp_locks_list_guard);
837 #endif
838 atomic_set(&export->exp_replay_count, 0);
839 export->exp_obd = obd;
840 INIT_LIST_HEAD(&export->exp_outstanding_replies);
841 spin_lock_init(&export->exp_uncommitted_replies_lock);
842 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
843 INIT_LIST_HEAD(&export->exp_req_replay_queue);
844 INIT_LIST_HEAD(&export->exp_handle.h_link);
845 INIT_LIST_HEAD(&export->exp_hp_rpcs);
846 class_handle_hash(&export->exp_handle, &export_handle_ops);
847 export->exp_last_request_time = get_seconds();
848 spin_lock_init(&export->exp_lock);
849 spin_lock_init(&export->exp_rpc_lock);
850 INIT_HLIST_NODE(&export->exp_uuid_hash);
851 INIT_HLIST_NODE(&export->exp_nid_hash);
852 spin_lock_init(&export->exp_bl_list_lock);
853 INIT_LIST_HEAD(&export->exp_bl_list);
854
855 export->exp_sp_peer = LUSTRE_SP_ANY;
856 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
857 export->exp_client_uuid = *cluuid;
858 obd_init_export(export);
859
860 spin_lock(&obd->obd_dev_lock);
861 /* shouldn't happen, but might race */
862 if (obd->obd_stopping) {
863 rc = -ENODEV;
864 goto exit_unlock;
865 }
866
867 hash = cfs_hash_getref(obd->obd_uuid_hash);
868 if (hash == NULL) {
869 rc = -ENODEV;
870 goto exit_unlock;
871 }
872 spin_unlock(&obd->obd_dev_lock);
873
874 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
875 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
876 if (rc != 0) {
877 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
878 obd->obd_name, cluuid->uuid, rc);
879 rc = -EALREADY;
880 goto exit_err;
881 }
882 }
883
884 spin_lock(&obd->obd_dev_lock);
885 if (obd->obd_stopping) {
886 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
887 rc = -ENODEV;
888 goto exit_unlock;
889 }
890
891 class_incref(obd, "export", export);
892 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
893 list_add_tail(&export->exp_obd_chain_timed,
894 &export->exp_obd->obd_exports_timed);
895 export->exp_obd->obd_num_exports++;
896 spin_unlock(&obd->obd_dev_lock);
897 cfs_hash_putref(hash);
898 return export;
899
900 exit_unlock:
901 spin_unlock(&obd->obd_dev_lock);
902 exit_err:
903 if (hash)
904 cfs_hash_putref(hash);
905 class_handle_unhash(&export->exp_handle);
906 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
907 obd_destroy_export(export);
908 OBD_FREE_PTR(export);
909 return ERR_PTR(rc);
910 }
911 EXPORT_SYMBOL(class_new_export);
912
class_unlink_export(struct obd_export * exp)913 void class_unlink_export(struct obd_export *exp)
914 {
915 class_handle_unhash(&exp->exp_handle);
916
917 spin_lock(&exp->exp_obd->obd_dev_lock);
918 /* delete an uuid-export hashitem from hashtables */
919 if (!hlist_unhashed(&exp->exp_uuid_hash))
920 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
921 &exp->exp_client_uuid,
922 &exp->exp_uuid_hash);
923
924 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
925 list_del_init(&exp->exp_obd_chain_timed);
926 exp->exp_obd->obd_num_exports--;
927 spin_unlock(&exp->exp_obd->obd_dev_lock);
928 class_export_put(exp);
929 }
930 EXPORT_SYMBOL(class_unlink_export);
931
932 /* Import management functions */
class_import_destroy(struct obd_import * imp)933 void class_import_destroy(struct obd_import *imp)
934 {
935 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
936 imp->imp_obd->obd_name);
937
938 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
939
940 ptlrpc_put_connection_superhack(imp->imp_connection);
941
942 while (!list_empty(&imp->imp_conn_list)) {
943 struct obd_import_conn *imp_conn;
944
945 imp_conn = list_entry(imp->imp_conn_list.next,
946 struct obd_import_conn, oic_item);
947 list_del_init(&imp_conn->oic_item);
948 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
949 OBD_FREE(imp_conn, sizeof(*imp_conn));
950 }
951
952 LASSERT(imp->imp_sec == NULL);
953 class_decref(imp->imp_obd, "import", imp);
954 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
955 }
956
import_handle_addref(void * import)957 static void import_handle_addref(void *import)
958 {
959 class_import_get(import);
960 }
961
962 static struct portals_handle_ops import_handle_ops = {
963 .hop_addref = import_handle_addref,
964 .hop_free = NULL,
965 };
966
class_import_get(struct obd_import * import)967 struct obd_import *class_import_get(struct obd_import *import)
968 {
969 atomic_inc(&import->imp_refcount);
970 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
971 atomic_read(&import->imp_refcount),
972 import->imp_obd->obd_name);
973 return import;
974 }
975 EXPORT_SYMBOL(class_import_get);
976
class_import_put(struct obd_import * imp)977 void class_import_put(struct obd_import *imp)
978 {
979 LASSERT(list_empty(&imp->imp_zombie_chain));
980 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
981
982 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
983 atomic_read(&imp->imp_refcount) - 1,
984 imp->imp_obd->obd_name);
985
986 if (atomic_dec_and_test(&imp->imp_refcount)) {
987 CDEBUG(D_INFO, "final put import %p\n", imp);
988 obd_zombie_import_add(imp);
989 }
990
991 /* catch possible import put race */
992 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
993 }
994 EXPORT_SYMBOL(class_import_put);
995
init_imp_at(struct imp_at * at)996 static void init_imp_at(struct imp_at *at) {
997 int i;
998 at_init(&at->iat_net_latency, 0, 0);
999 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1000 /* max service estimates are tracked on the server side, so
1001 don't use the AT history here, just use the last reported
1002 val. (But keep hist for proc histogram, worst_ever) */
1003 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1004 AT_FLG_NOHIST);
1005 }
1006 }
1007
class_new_import(struct obd_device * obd)1008 struct obd_import *class_new_import(struct obd_device *obd)
1009 {
1010 struct obd_import *imp;
1011
1012 OBD_ALLOC(imp, sizeof(*imp));
1013 if (imp == NULL)
1014 return NULL;
1015
1016 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1017 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1018 INIT_LIST_HEAD(&imp->imp_replay_list);
1019 INIT_LIST_HEAD(&imp->imp_sending_list);
1020 INIT_LIST_HEAD(&imp->imp_delayed_list);
1021 INIT_LIST_HEAD(&imp->imp_committed_list);
1022 imp->imp_replay_cursor = &imp->imp_committed_list;
1023 spin_lock_init(&imp->imp_lock);
1024 imp->imp_last_success_conn = 0;
1025 imp->imp_state = LUSTRE_IMP_NEW;
1026 imp->imp_obd = class_incref(obd, "import", imp);
1027 mutex_init(&imp->imp_sec_mutex);
1028 init_waitqueue_head(&imp->imp_recovery_waitq);
1029
1030 atomic_set(&imp->imp_refcount, 2);
1031 atomic_set(&imp->imp_unregistering, 0);
1032 atomic_set(&imp->imp_inflight, 0);
1033 atomic_set(&imp->imp_replay_inflight, 0);
1034 atomic_set(&imp->imp_inval_count, 0);
1035 INIT_LIST_HEAD(&imp->imp_conn_list);
1036 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1037 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1038 init_imp_at(&imp->imp_at);
1039
1040 /* the default magic is V2, will be used in connect RPC, and
1041 * then adjusted according to the flags in request/reply. */
1042 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1043
1044 return imp;
1045 }
1046 EXPORT_SYMBOL(class_new_import);
1047
class_destroy_import(struct obd_import * import)1048 void class_destroy_import(struct obd_import *import)
1049 {
1050 LASSERT(import != NULL);
1051 LASSERT(import != LP_POISON);
1052
1053 class_handle_unhash(&import->imp_handle);
1054
1055 spin_lock(&import->imp_lock);
1056 import->imp_generation++;
1057 spin_unlock(&import->imp_lock);
1058 class_import_put(import);
1059 }
1060 EXPORT_SYMBOL(class_destroy_import);
1061
1062 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1063
__class_export_add_lock_ref(struct obd_export * exp,struct ldlm_lock * lock)1064 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1065 {
1066 spin_lock(&exp->exp_locks_list_guard);
1067
1068 LASSERT(lock->l_exp_refs_nr >= 0);
1069
1070 if (lock->l_exp_refs_target != NULL &&
1071 lock->l_exp_refs_target != exp) {
1072 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1073 exp, lock, lock->l_exp_refs_target);
1074 }
1075 if ((lock->l_exp_refs_nr ++) == 0) {
1076 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1077 lock->l_exp_refs_target = exp;
1078 }
1079 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1080 lock, exp, lock->l_exp_refs_nr);
1081 spin_unlock(&exp->exp_locks_list_guard);
1082 }
1083 EXPORT_SYMBOL(__class_export_add_lock_ref);
1084
__class_export_del_lock_ref(struct obd_export * exp,struct ldlm_lock * lock)1085 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1086 {
1087 spin_lock(&exp->exp_locks_list_guard);
1088 LASSERT(lock->l_exp_refs_nr > 0);
1089 if (lock->l_exp_refs_target != exp) {
1090 LCONSOLE_WARN("lock %p, "
1091 "mismatching export pointers: %p, %p\n",
1092 lock, lock->l_exp_refs_target, exp);
1093 }
1094 if (-- lock->l_exp_refs_nr == 0) {
1095 list_del_init(&lock->l_exp_refs_link);
1096 lock->l_exp_refs_target = NULL;
1097 }
1098 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1099 lock, exp, lock->l_exp_refs_nr);
1100 spin_unlock(&exp->exp_locks_list_guard);
1101 }
1102 EXPORT_SYMBOL(__class_export_del_lock_ref);
1103 #endif
1104
1105 /* A connection defines an export context in which preallocation can
1106 be managed. This releases the export pointer reference, and returns
1107 the export handle, so the export refcount is 1 when this function
1108 returns. */
class_connect(struct lustre_handle * conn,struct obd_device * obd,struct obd_uuid * cluuid)1109 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1110 struct obd_uuid *cluuid)
1111 {
1112 struct obd_export *export;
1113 LASSERT(conn != NULL);
1114 LASSERT(obd != NULL);
1115 LASSERT(cluuid != NULL);
1116
1117 export = class_new_export(obd, cluuid);
1118 if (IS_ERR(export))
1119 return PTR_ERR(export);
1120
1121 conn->cookie = export->exp_handle.h_cookie;
1122 class_export_put(export);
1123
1124 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1125 cluuid->uuid, conn->cookie);
1126 return 0;
1127 }
1128 EXPORT_SYMBOL(class_connect);
1129
1130 /* if export is involved in recovery then clean up related things */
class_export_recovery_cleanup(struct obd_export * exp)1131 void class_export_recovery_cleanup(struct obd_export *exp)
1132 {
1133 struct obd_device *obd = exp->exp_obd;
1134
1135 spin_lock(&obd->obd_recovery_task_lock);
1136 if (exp->exp_delayed)
1137 obd->obd_delayed_clients--;
1138 if (obd->obd_recovering) {
1139 if (exp->exp_in_recovery) {
1140 spin_lock(&exp->exp_lock);
1141 exp->exp_in_recovery = 0;
1142 spin_unlock(&exp->exp_lock);
1143 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1144 atomic_dec(&obd->obd_connected_clients);
1145 }
1146
1147 /* if called during recovery then should update
1148 * obd_stale_clients counter,
1149 * lightweight exports are not counted */
1150 if (exp->exp_failed &&
1151 (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1152 exp->exp_obd->obd_stale_clients++;
1153 }
1154 spin_unlock(&obd->obd_recovery_task_lock);
1155 /** Cleanup req replay fields */
1156 if (exp->exp_req_replay_needed) {
1157 spin_lock(&exp->exp_lock);
1158 exp->exp_req_replay_needed = 0;
1159 spin_unlock(&exp->exp_lock);
1160 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1161 atomic_dec(&obd->obd_req_replay_clients);
1162 }
1163 /** Cleanup lock replay data */
1164 if (exp->exp_lock_replay_needed) {
1165 spin_lock(&exp->exp_lock);
1166 exp->exp_lock_replay_needed = 0;
1167 spin_unlock(&exp->exp_lock);
1168 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1169 atomic_dec(&obd->obd_lock_replay_clients);
1170 }
1171 }
1172
1173 /* This function removes 1-3 references from the export:
1174 * 1 - for export pointer passed
1175 * and if disconnect really need
1176 * 2 - removing from hash
1177 * 3 - in client_unlink_export
1178 * The export pointer passed to this function can destroyed */
class_disconnect(struct obd_export * export)1179 int class_disconnect(struct obd_export *export)
1180 {
1181 int already_disconnected;
1182
1183 if (export == NULL) {
1184 CWARN("attempting to free NULL export %p\n", export);
1185 return -EINVAL;
1186 }
1187
1188 spin_lock(&export->exp_lock);
1189 already_disconnected = export->exp_disconnected;
1190 export->exp_disconnected = 1;
1191 spin_unlock(&export->exp_lock);
1192
1193 /* class_cleanup(), abort_recovery(), and class_fail_export()
1194 * all end up in here, and if any of them race we shouldn't
1195 * call extra class_export_puts(). */
1196 if (already_disconnected) {
1197 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1198 goto no_disconn;
1199 }
1200
1201 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1202 export->exp_handle.h_cookie);
1203
1204 if (!hlist_unhashed(&export->exp_nid_hash))
1205 cfs_hash_del(export->exp_obd->obd_nid_hash,
1206 &export->exp_connection->c_peer.nid,
1207 &export->exp_nid_hash);
1208
1209 class_export_recovery_cleanup(export);
1210 class_unlink_export(export);
1211 no_disconn:
1212 class_export_put(export);
1213 return 0;
1214 }
1215 EXPORT_SYMBOL(class_disconnect);
1216
1217 /* Return non-zero for a fully connected export */
class_connected_export(struct obd_export * exp)1218 int class_connected_export(struct obd_export *exp)
1219 {
1220 if (exp) {
1221 int connected;
1222 spin_lock(&exp->exp_lock);
1223 connected = (exp->exp_conn_cnt > 0);
1224 spin_unlock(&exp->exp_lock);
1225 return connected;
1226 }
1227 return 0;
1228 }
1229 EXPORT_SYMBOL(class_connected_export);
1230
class_disconnect_export_list(struct list_head * list,enum obd_option flags)1231 static void class_disconnect_export_list(struct list_head *list,
1232 enum obd_option flags)
1233 {
1234 int rc;
1235 struct obd_export *exp;
1236
1237 /* It's possible that an export may disconnect itself, but
1238 * nothing else will be added to this list. */
1239 while (!list_empty(list)) {
1240 exp = list_entry(list->next, struct obd_export,
1241 exp_obd_chain);
1242 /* need for safe call CDEBUG after obd_disconnect */
1243 class_export_get(exp);
1244
1245 spin_lock(&exp->exp_lock);
1246 exp->exp_flags = flags;
1247 spin_unlock(&exp->exp_lock);
1248
1249 if (obd_uuid_equals(&exp->exp_client_uuid,
1250 &exp->exp_obd->obd_uuid)) {
1251 CDEBUG(D_HA,
1252 "exp %p export uuid == obd uuid, don't discon\n",
1253 exp);
1254 /* Need to delete this now so we don't end up pointing
1255 * to work_list later when this export is cleaned up. */
1256 list_del_init(&exp->exp_obd_chain);
1257 class_export_put(exp);
1258 continue;
1259 }
1260
1261 class_export_get(exp);
1262 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1263 "last request at "CFS_TIME_T"\n",
1264 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1265 exp, exp->exp_last_request_time);
1266 /* release one export reference anyway */
1267 rc = obd_disconnect(exp);
1268
1269 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1270 obd_export_nid2str(exp), exp, rc);
1271 class_export_put(exp);
1272 }
1273 }
1274
class_disconnect_exports(struct obd_device * obd)1275 void class_disconnect_exports(struct obd_device *obd)
1276 {
1277 struct list_head work_list;
1278
1279 /* Move all of the exports from obd_exports to a work list, en masse. */
1280 INIT_LIST_HEAD(&work_list);
1281 spin_lock(&obd->obd_dev_lock);
1282 list_splice_init(&obd->obd_exports, &work_list);
1283 list_splice_init(&obd->obd_delayed_exports, &work_list);
1284 spin_unlock(&obd->obd_dev_lock);
1285
1286 if (!list_empty(&work_list)) {
1287 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1288 "disconnecting them\n", obd->obd_minor, obd);
1289 class_disconnect_export_list(&work_list,
1290 exp_flags_from_obd(obd));
1291 } else
1292 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1293 obd->obd_minor, obd);
1294 }
1295 EXPORT_SYMBOL(class_disconnect_exports);
1296
1297 /* Remove exports that have not completed recovery.
1298 */
class_disconnect_stale_exports(struct obd_device * obd,int (* test_export)(struct obd_export *))1299 void class_disconnect_stale_exports(struct obd_device *obd,
1300 int (*test_export)(struct obd_export *))
1301 {
1302 struct list_head work_list;
1303 struct obd_export *exp, *n;
1304 int evicted = 0;
1305
1306 INIT_LIST_HEAD(&work_list);
1307 spin_lock(&obd->obd_dev_lock);
1308 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1309 exp_obd_chain) {
1310 /* don't count self-export as client */
1311 if (obd_uuid_equals(&exp->exp_client_uuid,
1312 &exp->exp_obd->obd_uuid))
1313 continue;
1314
1315 /* don't evict clients which have no slot in last_rcvd
1316 * (e.g. lightweight connection) */
1317 if (exp->exp_target_data.ted_lr_idx == -1)
1318 continue;
1319
1320 spin_lock(&exp->exp_lock);
1321 if (exp->exp_failed || test_export(exp)) {
1322 spin_unlock(&exp->exp_lock);
1323 continue;
1324 }
1325 exp->exp_failed = 1;
1326 spin_unlock(&exp->exp_lock);
1327
1328 list_move(&exp->exp_obd_chain, &work_list);
1329 evicted++;
1330 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1331 obd->obd_name, exp->exp_client_uuid.uuid,
1332 exp->exp_connection == NULL ? "<unknown>" :
1333 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1334 print_export_data(exp, "EVICTING", 0);
1335 }
1336 spin_unlock(&obd->obd_dev_lock);
1337
1338 if (evicted)
1339 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1340 obd->obd_name, evicted);
1341
1342 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1343 OBD_OPT_ABORT_RECOV);
1344 }
1345 EXPORT_SYMBOL(class_disconnect_stale_exports);
1346
class_fail_export(struct obd_export * exp)1347 void class_fail_export(struct obd_export *exp)
1348 {
1349 int rc, already_failed;
1350
1351 spin_lock(&exp->exp_lock);
1352 already_failed = exp->exp_failed;
1353 exp->exp_failed = 1;
1354 spin_unlock(&exp->exp_lock);
1355
1356 if (already_failed) {
1357 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1358 exp, exp->exp_client_uuid.uuid);
1359 return;
1360 }
1361
1362 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1363 exp, exp->exp_client_uuid.uuid);
1364
1365 if (obd_dump_on_timeout)
1366 libcfs_debug_dumplog();
1367
1368 /* need for safe call CDEBUG after obd_disconnect */
1369 class_export_get(exp);
1370
1371 /* Most callers into obd_disconnect are removing their own reference
1372 * (request, for example) in addition to the one from the hash table.
1373 * We don't have such a reference here, so make one. */
1374 class_export_get(exp);
1375 rc = obd_disconnect(exp);
1376 if (rc)
1377 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1378 else
1379 CDEBUG(D_HA, "disconnected export %p/%s\n",
1380 exp, exp->exp_client_uuid.uuid);
1381 class_export_put(exp);
1382 }
1383 EXPORT_SYMBOL(class_fail_export);
1384
obd_export_nid2str(struct obd_export * exp)1385 char *obd_export_nid2str(struct obd_export *exp)
1386 {
1387 if (exp->exp_connection != NULL)
1388 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1389
1390 return "(no nid)";
1391 }
1392 EXPORT_SYMBOL(obd_export_nid2str);
1393
obd_export_evict_by_nid(struct obd_device * obd,const char * nid)1394 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1395 {
1396 struct cfs_hash *nid_hash;
1397 struct obd_export *doomed_exp = NULL;
1398 int exports_evicted = 0;
1399
1400 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1401
1402 spin_lock(&obd->obd_dev_lock);
1403 /* umount has run already, so evict thread should leave
1404 * its task to umount thread now */
1405 if (obd->obd_stopping) {
1406 spin_unlock(&obd->obd_dev_lock);
1407 return exports_evicted;
1408 }
1409 nid_hash = obd->obd_nid_hash;
1410 cfs_hash_getref(nid_hash);
1411 spin_unlock(&obd->obd_dev_lock);
1412
1413 do {
1414 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1415 if (doomed_exp == NULL)
1416 break;
1417
1418 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1419 "nid %s found, wanted nid %s, requested nid %s\n",
1420 obd_export_nid2str(doomed_exp),
1421 libcfs_nid2str(nid_key), nid);
1422 LASSERTF(doomed_exp != obd->obd_self_export,
1423 "self-export is hashed by NID?\n");
1424 exports_evicted++;
1425 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1426 "request\n", obd->obd_name,
1427 obd_uuid2str(&doomed_exp->exp_client_uuid),
1428 obd_export_nid2str(doomed_exp));
1429 class_fail_export(doomed_exp);
1430 class_export_put(doomed_exp);
1431 } while (1);
1432
1433 cfs_hash_putref(nid_hash);
1434
1435 if (!exports_evicted)
1436 CDEBUG(D_HA,
1437 "%s: can't disconnect NID '%s': no exports found\n",
1438 obd->obd_name, nid);
1439 return exports_evicted;
1440 }
1441 EXPORT_SYMBOL(obd_export_evict_by_nid);
1442
obd_export_evict_by_uuid(struct obd_device * obd,const char * uuid)1443 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1444 {
1445 struct cfs_hash *uuid_hash;
1446 struct obd_export *doomed_exp = NULL;
1447 struct obd_uuid doomed_uuid;
1448 int exports_evicted = 0;
1449
1450 spin_lock(&obd->obd_dev_lock);
1451 if (obd->obd_stopping) {
1452 spin_unlock(&obd->obd_dev_lock);
1453 return exports_evicted;
1454 }
1455 uuid_hash = obd->obd_uuid_hash;
1456 cfs_hash_getref(uuid_hash);
1457 spin_unlock(&obd->obd_dev_lock);
1458
1459 obd_str2uuid(&doomed_uuid, uuid);
1460 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1461 CERROR("%s: can't evict myself\n", obd->obd_name);
1462 cfs_hash_putref(uuid_hash);
1463 return exports_evicted;
1464 }
1465
1466 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1467
1468 if (doomed_exp == NULL) {
1469 CERROR("%s: can't disconnect %s: no exports found\n",
1470 obd->obd_name, uuid);
1471 } else {
1472 CWARN("%s: evicting %s at administrative request\n",
1473 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1474 class_fail_export(doomed_exp);
1475 class_export_put(doomed_exp);
1476 exports_evicted++;
1477 }
1478 cfs_hash_putref(uuid_hash);
1479
1480 return exports_evicted;
1481 }
1482 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1483
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback.  print_export_data() invokes it, when
 * it is set, if the caller asked for lock details.  Starts out as NULL
 * (static storage is zero-initialised). */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1488
print_export_data(struct obd_export * exp,const char * status,int locks)1489 static void print_export_data(struct obd_export *exp, const char *status,
1490 int locks)
1491 {
1492 struct ptlrpc_reply_state *rs;
1493 struct ptlrpc_reply_state *first_reply = NULL;
1494 int nreplies = 0;
1495
1496 spin_lock(&exp->exp_lock);
1497 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1498 rs_exp_list) {
1499 if (nreplies == 0)
1500 first_reply = rs;
1501 nreplies++;
1502 }
1503 spin_unlock(&exp->exp_lock);
1504
1505 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu\n",
1506 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1507 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1508 atomic_read(&exp->exp_rpc_count),
1509 atomic_read(&exp->exp_cb_count),
1510 atomic_read(&exp->exp_locks_count),
1511 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1512 nreplies, first_reply, nreplies > 3 ? "..." : "",
1513 exp->exp_last_committed);
1514 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1515 if (locks && class_export_dump_hook != NULL)
1516 class_export_dump_hook(exp);
1517 #endif
1518 }
1519
dump_exports(struct obd_device * obd,int locks)1520 void dump_exports(struct obd_device *obd, int locks)
1521 {
1522 struct obd_export *exp;
1523
1524 spin_lock(&obd->obd_dev_lock);
1525 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1526 print_export_data(exp, "ACTIVE", locks);
1527 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1528 print_export_data(exp, "UNLINKED", locks);
1529 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1530 print_export_data(exp, "DELAYED", locks);
1531 spin_unlock(&obd->obd_dev_lock);
1532 spin_lock(&obd_zombie_impexp_lock);
1533 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1534 print_export_data(exp, "ZOMBIE", locks);
1535 spin_unlock(&obd_zombie_impexp_lock);
1536 }
1537 EXPORT_SYMBOL(dump_exports);
1538
obd_exports_barrier(struct obd_device * obd)1539 void obd_exports_barrier(struct obd_device *obd)
1540 {
1541 int waited = 2;
1542 LASSERT(list_empty(&obd->obd_exports));
1543 spin_lock(&obd->obd_dev_lock);
1544 while (!list_empty(&obd->obd_unlinked_exports)) {
1545 spin_unlock(&obd->obd_dev_lock);
1546 set_current_state(TASK_UNINTERRUPTIBLE);
1547 schedule_timeout(cfs_time_seconds(waited));
1548 if (waited > 5 && IS_PO2(waited)) {
1549 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1550 "more than %d seconds. "
1551 "The obd refcount = %d. Is it stuck?\n",
1552 obd->obd_name, waited,
1553 atomic_read(&obd->obd_refcount));
1554 dump_exports(obd, 1);
1555 }
1556 waited *= 2;
1557 spin_lock(&obd->obd_dev_lock);
1558 }
1559 spin_unlock(&obd->obd_dev_lock);
1560 }
1561 EXPORT_SYMBOL(obd_exports_barrier);
1562
/* Number of zombie imports and exports still waiting to be destroyed.
 * Modified and read under obd_zombie_impexp_lock; starts at zero. */
static int zombies_count;
1565
1566 /**
1567 * kill zombie imports and exports
1568 */
obd_zombie_impexp_cull(void)1569 void obd_zombie_impexp_cull(void)
1570 {
1571 struct obd_import *import;
1572 struct obd_export *export;
1573
1574 do {
1575 spin_lock(&obd_zombie_impexp_lock);
1576
1577 import = NULL;
1578 if (!list_empty(&obd_zombie_imports)) {
1579 import = list_entry(obd_zombie_imports.next,
1580 struct obd_import,
1581 imp_zombie_chain);
1582 list_del_init(&import->imp_zombie_chain);
1583 }
1584
1585 export = NULL;
1586 if (!list_empty(&obd_zombie_exports)) {
1587 export = list_entry(obd_zombie_exports.next,
1588 struct obd_export,
1589 exp_obd_chain);
1590 list_del_init(&export->exp_obd_chain);
1591 }
1592
1593 spin_unlock(&obd_zombie_impexp_lock);
1594
1595 if (import != NULL) {
1596 class_import_destroy(import);
1597 spin_lock(&obd_zombie_impexp_lock);
1598 zombies_count--;
1599 spin_unlock(&obd_zombie_impexp_lock);
1600 }
1601
1602 if (export != NULL) {
1603 class_export_destroy(export);
1604 spin_lock(&obd_zombie_impexp_lock);
1605 zombies_count--;
1606 spin_unlock(&obd_zombie_impexp_lock);
1607 }
1608
1609 cond_resched();
1610 } while (import != NULL || export != NULL);
1611 }
1612
1613 static struct completion obd_zombie_start;
1614 static struct completion obd_zombie_stop;
1615 static unsigned long obd_zombie_flags;
1616 static wait_queue_head_t obd_zombie_waitq;
1617 static pid_t obd_zombie_pid;
1618
1619 enum {
1620 OBD_ZOMBIE_STOP = 0x0001,
1621 };
1622
1623 /**
1624 * check for work for kill zombie import/export thread.
1625 */
obd_zombie_impexp_check(void * arg)1626 static int obd_zombie_impexp_check(void *arg)
1627 {
1628 int rc;
1629
1630 spin_lock(&obd_zombie_impexp_lock);
1631 rc = (zombies_count == 0) &&
1632 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1633 spin_unlock(&obd_zombie_impexp_lock);
1634
1635 return rc;
1636 }
1637
1638 /**
1639 * Add export to the obd_zombie thread and notify it.
1640 */
obd_zombie_export_add(struct obd_export * exp)1641 static void obd_zombie_export_add(struct obd_export *exp) {
1642 spin_lock(&exp->exp_obd->obd_dev_lock);
1643 LASSERT(!list_empty(&exp->exp_obd_chain));
1644 list_del_init(&exp->exp_obd_chain);
1645 spin_unlock(&exp->exp_obd->obd_dev_lock);
1646 spin_lock(&obd_zombie_impexp_lock);
1647 zombies_count++;
1648 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1649 spin_unlock(&obd_zombie_impexp_lock);
1650
1651 obd_zombie_impexp_notify();
1652 }
1653
1654 /**
1655 * Add import to the obd_zombie thread and notify it.
1656 */
obd_zombie_import_add(struct obd_import * imp)1657 static void obd_zombie_import_add(struct obd_import *imp) {
1658 LASSERT(imp->imp_sec == NULL);
1659 LASSERT(imp->imp_rq_pool == NULL);
1660 spin_lock(&obd_zombie_impexp_lock);
1661 LASSERT(list_empty(&imp->imp_zombie_chain));
1662 zombies_count++;
1663 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1664 spin_unlock(&obd_zombie_impexp_lock);
1665
1666 obd_zombie_impexp_notify();
1667 }
1668
1669 /**
1670 * notify import/export destroy thread about new zombie.
1671 */
obd_zombie_impexp_notify(void)1672 static void obd_zombie_impexp_notify(void)
1673 {
1674 /*
1675 * Make sure obd_zombie_impexp_thread get this notification.
1676 * It is possible this signal only get by obd_zombie_barrier, and
1677 * barrier gulps this notification and sleeps away and hangs ensues
1678 */
1679 wake_up_all(&obd_zombie_waitq);
1680 }
1681
1682 /**
1683 * check whether obd_zombie is idle
1684 */
obd_zombie_is_idle(void)1685 static int obd_zombie_is_idle(void)
1686 {
1687 int rc;
1688
1689 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1690 spin_lock(&obd_zombie_impexp_lock);
1691 rc = (zombies_count == 0);
1692 spin_unlock(&obd_zombie_impexp_lock);
1693 return rc;
1694 }
1695
1696 /**
1697 * wait when obd_zombie import/export queues become empty
1698 */
obd_zombie_barrier(void)1699 void obd_zombie_barrier(void)
1700 {
1701 struct l_wait_info lwi = { 0 };
1702
1703 if (obd_zombie_pid == current_pid())
1704 /* don't wait for myself */
1705 return;
1706 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1707 }
1708 EXPORT_SYMBOL(obd_zombie_barrier);
1709
1710
1711 /**
1712 * destroy zombie export/import thread.
1713 */
obd_zombie_impexp_thread(void * unused)1714 static int obd_zombie_impexp_thread(void *unused)
1715 {
1716 unshare_fs_struct();
1717 complete(&obd_zombie_start);
1718
1719 obd_zombie_pid = current_pid();
1720
1721 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1722 struct l_wait_info lwi = { 0 };
1723
1724 l_wait_event(obd_zombie_waitq,
1725 !obd_zombie_impexp_check(NULL), &lwi);
1726 obd_zombie_impexp_cull();
1727
1728 /*
1729 * Notify obd_zombie_barrier callers that queues
1730 * may be empty.
1731 */
1732 wake_up(&obd_zombie_waitq);
1733 }
1734
1735 complete(&obd_zombie_stop);
1736
1737 return 0;
1738 }
1739
1740
1741 /**
1742 * start destroy zombie import/export thread
1743 */
obd_zombie_impexp_init(void)1744 int obd_zombie_impexp_init(void)
1745 {
1746 struct task_struct *task;
1747
1748 INIT_LIST_HEAD(&obd_zombie_imports);
1749 INIT_LIST_HEAD(&obd_zombie_exports);
1750 spin_lock_init(&obd_zombie_impexp_lock);
1751 init_completion(&obd_zombie_start);
1752 init_completion(&obd_zombie_stop);
1753 init_waitqueue_head(&obd_zombie_waitq);
1754 obd_zombie_pid = 0;
1755
1756 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1757 if (IS_ERR(task))
1758 return PTR_ERR(task);
1759
1760 wait_for_completion(&obd_zombie_start);
1761 return 0;
1762 }
1763 /**
1764 * stop destroy zombie import/export thread
1765 */
obd_zombie_impexp_stop(void)1766 void obd_zombie_impexp_stop(void)
1767 {
1768 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1769 obd_zombie_impexp_notify();
1770 wait_for_completion(&obd_zombie_stop);
1771 }
1772
1773 /***** Kernel-userspace comm helpers *******/
1774
1775 /* Get length of entire message, including header */
kuc_len(int payload_len)1776 int kuc_len(int payload_len)
1777 {
1778 return sizeof(struct kuc_hdr) + payload_len;
1779 }
1780 EXPORT_SYMBOL(kuc_len);
1781
1782 /* Get a pointer to kuc header, given a ptr to the payload
1783 * @param p Pointer to payload area
1784 * @returns Pointer to kuc header
1785 */
kuc_ptr(void * p)1786 struct kuc_hdr * kuc_ptr(void *p)
1787 {
1788 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1789 LASSERT(lh->kuc_magic == KUC_MAGIC);
1790 return lh;
1791 }
1792 EXPORT_SYMBOL(kuc_ptr);
1793
1794 /* Test if payload is part of kuc message
1795 * @param p Pointer to payload area
1796 * @returns boolean
1797 */
kuc_ispayload(void * p)1798 int kuc_ispayload(void *p)
1799 {
1800 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1801
1802 if (kh->kuc_magic == KUC_MAGIC)
1803 return 1;
1804 else
1805 return 0;
1806 }
1807 EXPORT_SYMBOL(kuc_ispayload);
1808
1809 /* Alloc space for a message, and fill in header
1810 * @return Pointer to payload area
1811 */
kuc_alloc(int payload_len,int transport,int type)1812 void *kuc_alloc(int payload_len, int transport, int type)
1813 {
1814 struct kuc_hdr *lh;
1815 int len = kuc_len(payload_len);
1816
1817 OBD_ALLOC(lh, len);
1818 if (lh == NULL)
1819 return ERR_PTR(-ENOMEM);
1820
1821 lh->kuc_magic = KUC_MAGIC;
1822 lh->kuc_transport = transport;
1823 lh->kuc_msgtype = type;
1824 lh->kuc_msglen = len;
1825
1826 return (void *)(lh + 1);
1827 }
1828 EXPORT_SYMBOL(kuc_alloc);
1829
/* Free a kuc message previously obtained from kuc_alloc().
 *
 * Deliberately not declared "inline": the function has external linkage and
 * is exported, so other modules call it through the symbol.  With ISO
 * C99/C11 inline semantics a plain "inline" definition would not provide
 * that external definition.
 *
 * @param p            Pointer to payload area (not to the header)
 * @param payload_len  Payload length; the size freed is
 *                     kuc_len(payload_len), so it should be the value the
 *                     message was allocated with
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1837