1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/lu_object.c
37  *
38  * Lustre Object.
39  * These are the only exported functions; they provide some generic
40  * infrastructure for managing object devices.
41  *
42  *   Author: Nikita Danilov <nikita.danilov@sun.com>
43  */
44 
45 #define DEBUG_SUBSYSTEM S_CLASS
46 
47 #include "../../include/linux/libcfs/libcfs.h"
48 
49 #include <linux/module.h>
50 
51 /* hash_long() */
52 #include "../../include/linux/libcfs/libcfs_hash.h"
53 #include "../include/obd_class.h"
54 #include "../include/obd_support.h"
55 #include "../include/lustre_disk.h"
56 #include "../include/lustre_fid.h"
57 #include "../include/lu_object.h"
58 #include "../include/lu_ref.h"
59 #include <linux/list.h>
60 
61 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
62 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
63 
64 /**
65  * Decrease the reference counter on an object. If the last reference is
66  * dropped, return the object to the cache, unless lu_object_is_dying(o)
67  * holds. In the latter case, free the object immediately.
68  */
69 void lu_object_put(const struct lu_env *env, struct lu_object *o)
70 {
71 	struct lu_site_bkt_data *bkt;
72 	struct lu_object_header *top;
73 	struct lu_site	  *site;
74 	struct lu_object	*orig;
75 	struct cfs_hash_bd	    bd;
76 	const struct lu_fid     *fid;
77 
78 	top  = o->lo_header;
79 	site = o->lo_dev->ld_site;
80 	orig = o;
81 
82 	/*
83 	 * Till fids-on-OST is fully implemented, anonymous objects
84 	 * are possible in OSP. Such an object isn't listed in the site,
85 	 * so we should not remove it from the site.
86 	 */
87 	fid = lu_object_fid(o);
88 	if (fid_is_zero(fid)) {
89 		LASSERT(top->loh_hash.next == NULL
90 			&& top->loh_hash.pprev == NULL);
91 		LASSERT(list_empty(&top->loh_lru));
92 		if (!atomic_dec_and_test(&top->loh_ref))
93 			return;
94 		list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
95 			if (o->lo_ops->loo_object_release != NULL)
96 				o->lo_ops->loo_object_release(env, o);
97 		}
98 		lu_object_free(env, orig);
99 		return;
100 	}
101 
102 	cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
103 	bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
104 
105 	if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
106 		if (lu_object_is_dying(top)) {
107 
108 			/*
109 			 * Somebody may be waiting for this; currently it is only
110 			 * used for cl_object, see cl_object_put_last().
111 			 */
112 			wake_up_all(&bkt->lsb_marche_funebre);
113 		}
114 		return;
115 	}
116 
117 	/*
118 	 * When the last reference is released, iterate over object
119 	 * layers and notify them that the object is no longer busy.
120 	 */
121 	list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
122 		if (o->lo_ops->loo_object_release != NULL)
123 			o->lo_ops->loo_object_release(env, o);
124 	}
125 
126 	if (!lu_object_is_dying(top)) {
127 		LASSERT(list_empty(&top->loh_lru));
128 		list_add_tail(&top->loh_lru, &bkt->lsb_lru);
129 		bkt->lsb_lru_len++;
130 		lprocfs_counter_incr(site->ls_stats, LU_SS_LRU_LEN);
131 		CDEBUG(D_INODE, "Add %p to site lru. hash: %p, bkt: %p, lru_len: %ld\n",
132 		       o, site->ls_obj_hash, bkt, bkt->lsb_lru_len);
133 		cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
134 		return;
135 	}
136 
137 	/*
138 	 * If the object is dying (will not be cached), remove it
139 	 * from the hash table and LRU.
140 	 *
141 	 * This is done with the hash table and LRU lists locked. As the only
142 	 * ways to acquire the first reference to a previously unreferenced
143 	 * object are hash-table lookup (lu_object_find()) and
144 	 * LRU scanning (lu_site_purge()), both done under the hash-table
145 	 * and LRU lock, no race with a concurrent object lookup is possible
146 	 * and we can safely destroy the object below.
147 	 */
148 	if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
149 		cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
150 	cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
151 	/*
152 	 * The object was already removed from the hash and LRU above,
153 	 * so we can kill it now.
154 	 */
155 	lu_object_free(env, orig);
156 }
157 EXPORT_SYMBOL(lu_object_put);
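
/*
 * Illustrative sketch (not part of this file): the reference lifecycle a
 * typical caller follows. "env", "dev" and "fid" are assumed to be an
 * initialized lu_env, lu_device and lu_fid owned by the hypothetical
 * caller; lu_object_find_at() returns with a reference held, and
 * lu_object_put() is the matching release:
 *
 *	struct lu_object *obj;
 *
 *	obj = lu_object_find_at(env, dev, fid, NULL);
 *	if (IS_ERR(obj))
 *		return PTR_ERR(obj);
 *	... use the object ...
 *	lu_object_put(env, obj);
 */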
158 
159 /**
160  * Kill the object and take it out of LRU cache.
161  * Currently used by client code for layout change.
162  */
163 void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
164 {
165 	struct lu_object_header *top;
166 
167 	top = o->lo_header;
168 	set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
169 	if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
170 		struct lu_site *site = o->lo_dev->ld_site;
171 		struct cfs_hash *obj_hash = site->ls_obj_hash;
172 		struct cfs_hash_bd bd;
173 
174 		cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
175 		if (!list_empty(&top->loh_lru)) {
176 			struct lu_site_bkt_data *bkt;
177 
178 			list_del_init(&top->loh_lru);
179 			bkt = cfs_hash_bd_extra_get(obj_hash, &bd);
180 			bkt->lsb_lru_len--;
181 			lprocfs_counter_decr(site->ls_stats, LU_SS_LRU_LEN);
182 		}
183 		cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
184 		cfs_hash_bd_unlock(obj_hash, &bd, 1);
185 	}
186 }
187 EXPORT_SYMBOL(lu_object_unhash);
188 
189 /**
190  * Allocate new object.
191  *
192  * This follows object creation protocol, described in the comment within
193  * struct lu_device_operations definition.
194  */
195 static struct lu_object *lu_object_alloc(const struct lu_env *env,
196 					 struct lu_device *dev,
197 					 const struct lu_fid *f,
198 					 const struct lu_object_conf *conf)
199 {
200 	struct lu_object *scan;
201 	struct lu_object *top;
202 	struct list_head *layers;
203 	unsigned int init_mask = 0;
204 	unsigned int init_flag;
205 	int clean;
206 	int result;
207 
208 	/*
209 	 * Create top-level object slice. This will also create
210 	 * lu_object_header.
211 	 */
212 	top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
213 	if (top == NULL)
214 		return ERR_PTR(-ENOMEM);
215 	if (IS_ERR(top))
216 		return top;
217 	/*
218 	 * This is the only place where object fid is assigned. It's constant
219 	 * after this point.
220 	 */
221 	top->lo_header->loh_fid = *f;
222 	layers = &top->lo_header->loh_layers;
223 
224 	do {
225 		/*
226 		 * Call ->loo_object_init() repeatedly, until no more new
227 		 * object slices are created.
228 		 */
229 		clean = 1;
230 		init_flag = 1;
231 		list_for_each_entry(scan, layers, lo_linkage) {
232 			if (init_mask & init_flag)
233 				goto next;
234 			clean = 0;
235 			scan->lo_header = top->lo_header;
236 			result = scan->lo_ops->loo_object_init(env, scan, conf);
237 			if (result != 0) {
238 				lu_object_free(env, top);
239 				return ERR_PTR(result);
240 			}
241 			init_mask |= init_flag;
242 next:
243 			init_flag <<= 1;
244 		}
245 	} while (!clean);
246 
247 	list_for_each_entry_reverse(scan, layers, lo_linkage) {
248 		if (scan->lo_ops->loo_object_start != NULL) {
249 			result = scan->lo_ops->loo_object_start(env, scan);
250 			if (result != 0) {
251 				lu_object_free(env, top);
252 				return ERR_PTR(result);
253 			}
254 		}
255 	}
256 
257 	lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
258 	return top;
259 }
260 
261 /**
262  * Free an object.
263  */
264 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
265 {
266 	struct lu_site_bkt_data *bkt;
267 	struct lu_site	  *site;
268 	struct lu_object	*scan;
269 	struct list_head	      *layers;
270 	struct list_head	       splice;
271 
272 	site   = o->lo_dev->ld_site;
273 	layers = &o->lo_header->loh_layers;
274 	bkt    = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
275 	/*
276 	 * First call ->loo_object_delete() method to release all resources.
277 	 */
278 	list_for_each_entry_reverse(scan, layers, lo_linkage) {
279 		if (scan->lo_ops->loo_object_delete != NULL)
280 			scan->lo_ops->loo_object_delete(env, scan);
281 	}
282 
283 	/*
284 	 * Then, splice object layers into stand-alone list, and call
285 	 * ->loo_object_free() on all layers to free memory. Splice is
286 	 * necessary, because lu_object_header is freed together with the
287 	 * top-level slice.
288 	 */
289 	INIT_LIST_HEAD(&splice);
290 	list_splice_init(layers, &splice);
291 	while (!list_empty(&splice)) {
292 		/*
293 		 * Free layers in bottom-to-top order, so that object header
294 		 * lives as long as possible and ->loo_object_free() methods
295 		 * can look at its contents.
296 		 */
297 		o = container_of0(splice.prev, struct lu_object, lo_linkage);
298 		list_del_init(&o->lo_linkage);
299 		LASSERT(o->lo_ops->loo_object_free != NULL);
300 		o->lo_ops->loo_object_free(env, o);
301 	}
302 
303 	if (waitqueue_active(&bkt->lsb_marche_funebre))
304 		wake_up_all(&bkt->lsb_marche_funebre);
305 }
306 
307 /**
308  * Free \a nr objects from the cold end of the site LRU list.
309  */
310 int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
311 {
312 	struct lu_object_header *h;
313 	struct lu_object_header *temp;
314 	struct lu_site_bkt_data *bkt;
315 	struct cfs_hash_bd	    bd;
316 	struct cfs_hash_bd	    bd2;
317 	struct list_head	       dispose;
318 	int		      did_sth;
319 	int		      start;
320 	int		      count;
321 	int		      bnr;
322 	int		      i;
323 
324 	if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
325 		return 0;
326 
327 	INIT_LIST_HEAD(&dispose);
328 	/*
329 	 * Under LRU list lock, scan LRU list and move unreferenced objects to
330 	 * the dispose list, removing them from LRU and hash table.
331 	 */
332 	start = s->ls_purge_start;
333 	bnr = (nr == ~0) ? -1 : nr / CFS_HASH_NBKT(s->ls_obj_hash) + 1;
334  again:
335 	did_sth = 0;
336 	cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
337 		if (i < start)
338 			continue;
339 		count = bnr;
340 		cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
341 		bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
342 
343 		list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
344 			LASSERT(atomic_read(&h->loh_ref) == 0);
345 
346 			cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
347 			LASSERT(bd.bd_bucket == bd2.bd_bucket);
348 
349 			cfs_hash_bd_del_locked(s->ls_obj_hash,
350 					       &bd2, &h->loh_hash);
351 			list_move(&h->loh_lru, &dispose);
352 			bkt->lsb_lru_len--;
353 			lprocfs_counter_decr(s->ls_stats, LU_SS_LRU_LEN);
354 			if (did_sth == 0)
355 				did_sth = 1;
356 
357 			if (nr != ~0 && --nr == 0)
358 				break;
359 
360 			if (count > 0 && --count == 0)
361 				break;
362 
363 		}
364 		cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
365 		cond_resched();
366 		/*
367 		 * Free everything on the dispose list. This is safe against
368 		 * races due to the reasons described in lu_object_put().
369 		 */
370 		while (!list_empty(&dispose)) {
371 			h = container_of0(dispose.next,
372 					  struct lu_object_header, loh_lru);
373 			list_del_init(&h->loh_lru);
374 			lu_object_free(env, lu_object_top(h));
375 			lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
376 		}
377 
378 		if (nr == 0)
379 			break;
380 	}
381 
382 	if (nr != 0 && did_sth && start != 0) {
383 		start = 0; /* restart from the first bucket */
384 		goto again;
385 	}
386 	/* race on s->ls_purge_start, but nobody cares */
387 	s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);
388 
389 	return nr;
390 }
391 EXPORT_SYMBOL(lu_site_purge);
392 
393 /*
394  * Object printing.
395  *
396  * The code below has to jump through certain hoops to output an object
397  * description into a libcfs_debug_msg-based log. The problem is that
398  * lu_object_print() composes the object description from strings that are
399  * parts of _lines_ of output (i.e., strings not terminated by a newline).
400  * This doesn't fit very well into the libcfs_debug_msg() interface, which
401  * assumes that each message supplied to it is a self-contained output line.
402  *
403  * To work around this, strings are collected in a temporary buffer
404  * (implemented as a value of lu_cdebug_key key), until terminating newline
405  * character is detected.
406  *
407  */
408 
409 enum {
410 	/**
411 	 * Maximal line size.
412 	 *
413 	 * XXX overflow is not handled correctly.
414 	 */
415 	LU_CDEBUG_LINE = 512
416 };
417 
418 struct lu_cdebug_data {
419 	/**
420 	 * Temporary buffer.
421 	 */
422 	char lck_area[LU_CDEBUG_LINE];
423 };
424 
425 /* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
426 LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
427 
428 /**
429  * Key, holding temporary buffer. This key is registered very early by
430  * lu_global_init().
431  */
432 static struct lu_context_key lu_global_key = {
433 	.lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
434 		    LCT_MG_THREAD | LCT_CL_THREAD | LCT_LOCAL,
435 	.lct_init = lu_global_key_init,
436 	.lct_fini = lu_global_key_fini
437 };
438 
439 /**
440  * Printer function emitting messages through libcfs_debug_msg().
441  */
442 int lu_cdebug_printer(const struct lu_env *env,
443 		      void *cookie, const char *format, ...)
444 {
445 	struct libcfs_debug_msg_data *msgdata = cookie;
446 	struct lu_cdebug_data	*key;
447 	int used;
448 	int complete;
449 	va_list args;
450 
451 	va_start(args, format);
452 
453 	key = lu_context_key_get(&env->le_ctx, &lu_global_key);
454 	LASSERT(key != NULL);
455 
456 	used = strlen(key->lck_area);
457 	complete = format[strlen(format) - 1] == '\n';
458 	/*
459 	 * Append new chunk to the buffer.
460 	 */
461 	vsnprintf(key->lck_area + used,
462 		  ARRAY_SIZE(key->lck_area) - used, format, args);
463 	if (complete) {
464 		if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
465 			libcfs_debug_msg(msgdata, "%s", key->lck_area);
466 		key->lck_area[0] = 0;
467 	}
468 	va_end(args);
469 	return 0;
470 }
471 EXPORT_SYMBOL(lu_cdebug_printer);
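
/*
 * Illustrative sketch of the buffering behaviour described above: chunks
 * without a trailing newline accumulate in the per-context buffer, and a
 * debug line is emitted only when a chunk ending in '\n' arrives. "env" and
 * "msgdata" belong to a hypothetical caller:
 *
 *	lu_cdebug_printer(env, msgdata, "header@%p[", hdr);
 *	lu_cdebug_printer(env, msgdata, "%d", refs);
 *	lu_cdebug_printer(env, msgdata, "]\n");	 (flushed here)
 */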
472 
473 /**
474  * Print object header.
475  */
476 void lu_object_header_print(const struct lu_env *env, void *cookie,
477 			    lu_printer_t printer,
478 			    const struct lu_object_header *hdr)
479 {
480 	(*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
481 		   hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
482 		   PFID(&hdr->loh_fid),
483 		   hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
484 		   list_empty((struct list_head *)&hdr->loh_lru) ? \
485 		   "" : " lru",
486 		   hdr->loh_attr & LOHA_EXISTS ? " exist":"");
487 }
488 EXPORT_SYMBOL(lu_object_header_print);
489 
490 /**
491  * Print human readable representation of the \a o to the \a printer.
492  */
493 void lu_object_print(const struct lu_env *env, void *cookie,
494 		     lu_printer_t printer, const struct lu_object *o)
495 {
496 	static const char ruler[] = "........................................";
497 	struct lu_object_header *top;
498 	int depth = 4;
499 
500 	top = o->lo_header;
501 	lu_object_header_print(env, cookie, printer, top);
502 	(*printer)(env, cookie, "{\n");
503 
504 	list_for_each_entry(o, &top->loh_layers, lo_linkage) {
505 		/*
506 		 * print `.' \a depth times followed by type name and address
507 		 */
508 		(*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
509 			   o->lo_dev->ld_type->ldt_name, o);
510 
511 		if (o->lo_ops->loo_object_print != NULL)
512 			(*o->lo_ops->loo_object_print)(env, cookie, printer, o);
513 
514 		(*printer)(env, cookie, "\n");
515 	}
516 
517 	(*printer)(env, cookie, "} header@%p\n", top);
518 }
519 EXPORT_SYMBOL(lu_object_print);
520 
521 static struct lu_object *htable_lookup(struct lu_site *s,
522 				       struct cfs_hash_bd *bd,
523 				       const struct lu_fid *f,
524 				       wait_queue_t *waiter,
525 				       __u64 *version)
526 {
527 	struct lu_site_bkt_data *bkt;
528 	struct lu_object_header *h;
529 	struct hlist_node	*hnode;
530 	__u64  ver = cfs_hash_bd_version_get(bd);
531 
532 	if (*version == ver)
533 		return ERR_PTR(-ENOENT);
534 
535 	*version = ver;
536 	bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
537 	/* cfs_hash_bd_peek_locked is a somewhat "internal" function
538 	 * of cfs_hash; it doesn't take a reference on the object. */
539 	hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
540 	if (hnode == NULL) {
541 		lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
542 		return ERR_PTR(-ENOENT);
543 	}
544 
545 	h = container_of0(hnode, struct lu_object_header, loh_hash);
546 	if (likely(!lu_object_is_dying(h))) {
547 		cfs_hash_get(s->ls_obj_hash, hnode);
548 		lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
549 		if (!list_empty(&h->loh_lru)) {
550 			list_del_init(&h->loh_lru);
551 			bkt->lsb_lru_len--;
552 			lprocfs_counter_decr(s->ls_stats, LU_SS_LRU_LEN);
553 		}
554 		return lu_object_top(h);
555 	}
556 
557 	/*
558 	 * Lookup found an object being destroyed; this object cannot be
559 	 * returned (to assure that references to dying objects are eventually
560 	 * drained). Moreover, the lookup has to wait until the object is freed.
561 	 */
562 
563 	init_waitqueue_entry(waiter, current);
564 	add_wait_queue(&bkt->lsb_marche_funebre, waiter);
565 	set_current_state(TASK_UNINTERRUPTIBLE);
566 	lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
567 	return ERR_PTR(-EAGAIN);
568 }
569 
570 /**
571  * Search cache for an object with the fid \a f. If such object is found,
572  * return it. Otherwise, create new object, insert it into cache and return
573  * it. In any case, additional reference is acquired on the returned object.
574  */
575 static struct lu_object *lu_object_find(const struct lu_env *env,
576 					struct lu_device *dev,
577 					const struct lu_fid *f,
578 					const struct lu_object_conf *conf)
579 {
580 	return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
581 }
582 
583 static struct lu_object *lu_object_new(const struct lu_env *env,
584 				       struct lu_device *dev,
585 				       const struct lu_fid *f,
586 				       const struct lu_object_conf *conf)
587 {
588 	struct lu_object	*o;
589 	struct cfs_hash	      *hs;
590 	struct cfs_hash_bd	    bd;
591 
592 	o = lu_object_alloc(env, dev, f, conf);
593 	if (IS_ERR(o))
594 		return o;
595 
596 	hs = dev->ld_site->ls_obj_hash;
597 	cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
598 	cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
599 	cfs_hash_bd_unlock(hs, &bd, 1);
600 	return o;
601 }
602 
603 /**
604  * Core logic of lu_object_find*() functions.
605  */
606 static struct lu_object *lu_object_find_try(const struct lu_env *env,
607 					    struct lu_device *dev,
608 					    const struct lu_fid *f,
609 					    const struct lu_object_conf *conf,
610 					    wait_queue_t *waiter)
611 {
612 	struct lu_object      *o;
613 	struct lu_object      *shadow;
614 	struct lu_site	*s;
615 	struct cfs_hash	    *hs;
616 	struct cfs_hash_bd	  bd;
617 	__u64		  version = 0;
618 
619 	/*
620 	 * This uses standard index maintenance protocol:
621 	 *
622 	 *     - search index under lock, and return object if found;
623 	 *     - otherwise, unlock index, allocate new object;
624 	 *     - lock index and search again;
625 	 *     - if nothing is found (usual case), insert newly created
626 	 *       object into index;
627 	 *     - otherwise (race: other thread inserted object), free
628 	 *       object just allocated.
629 	 *     - unlock index;
630 	 *     - return object.
631 	 *
632 	 * For the "LOC_F_NEW" case, we are sure the object is newly created.
633 	 * It is unnecessary to perform lookup-alloc-lookup-insert; instead,
634 	 * just allocate and insert directly.
635 	 *
636 	 * If dying object is found during index search, add @waiter to the
637 	 * site wait-queue and return ERR_PTR(-EAGAIN).
638 	 */
639 	if (conf != NULL && conf->loc_flags & LOC_F_NEW)
640 		return lu_object_new(env, dev, f, conf);
641 
642 	s  = dev->ld_site;
643 	hs = s->ls_obj_hash;
644 	cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
645 	o = htable_lookup(s, &bd, f, waiter, &version);
646 	cfs_hash_bd_unlock(hs, &bd, 1);
647 	if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
648 		return o;
649 
650 	/*
651 	 * Allocate new object. This may result in rather complicated
652 	 * operations, including fld queries, inode loading, etc.
653 	 */
654 	o = lu_object_alloc(env, dev, f, conf);
655 	if (IS_ERR(o))
656 		return o;
657 
658 	LASSERT(lu_fid_eq(lu_object_fid(o), f));
659 
660 	cfs_hash_bd_lock(hs, &bd, 1);
661 
662 	shadow = htable_lookup(s, &bd, f, waiter, &version);
663 	if (likely(PTR_ERR(shadow) == -ENOENT)) {
664 		cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
665 		cfs_hash_bd_unlock(hs, &bd, 1);
666 		return o;
667 	}
668 
669 	lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
670 	cfs_hash_bd_unlock(hs, &bd, 1);
671 	lu_object_free(env, o);
672 	return shadow;
673 }
674 
675 /**
676  * Much like lu_object_find(), but top level device of object is specifically
677  * \a dev rather than top level device of the site. This interface allows
678  * objects of different "stacking" to be created within the same site.
679  */
680 struct lu_object *lu_object_find_at(const struct lu_env *env,
681 				    struct lu_device *dev,
682 				    const struct lu_fid *f,
683 				    const struct lu_object_conf *conf)
684 {
685 	struct lu_site_bkt_data *bkt;
686 	struct lu_object	*obj;
687 	wait_queue_t	   wait;
688 
689 	while (1) {
690 		obj = lu_object_find_try(env, dev, f, conf, &wait);
691 		if (obj != ERR_PTR(-EAGAIN))
692 			return obj;
693 		/*
694 		 * lu_object_find_try() already added waiter into the
695 		 * wait queue.
696 		 */
697 		schedule();
698 		bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
699 		remove_wait_queue(&bkt->lsb_marche_funebre, &wait);
700 	}
701 }
702 EXPORT_SYMBOL(lu_object_find_at);
703 
704 /**
705  * Find object with given fid, and return its slice belonging to given device.
706  */
707 struct lu_object *lu_object_find_slice(const struct lu_env *env,
708 				       struct lu_device *dev,
709 				       const struct lu_fid *f,
710 				       const struct lu_object_conf *conf)
711 {
712 	struct lu_object *top;
713 	struct lu_object *obj;
714 
715 	top = lu_object_find(env, dev, f, conf);
716 	if (!IS_ERR(top)) {
717 		obj = lu_object_locate(top->lo_header, dev->ld_type);
718 		if (obj == NULL)
719 			lu_object_put(env, top);
720 	} else
721 		obj = top;
722 	return obj;
723 }
724 EXPORT_SYMBOL(lu_object_find_slice);
725 
726 /**
727  * Global list of all device types.
728  */
729 static LIST_HEAD(lu_device_types);
730 
731 int lu_device_type_init(struct lu_device_type *ldt)
732 {
733 	int result = 0;
734 
735 	INIT_LIST_HEAD(&ldt->ldt_linkage);
736 	if (ldt->ldt_ops->ldto_init)
737 		result = ldt->ldt_ops->ldto_init(ldt);
738 	if (result == 0)
739 		list_add(&ldt->ldt_linkage, &lu_device_types);
740 	return result;
741 }
742 EXPORT_SYMBOL(lu_device_type_init);
743 
744 void lu_device_type_fini(struct lu_device_type *ldt)
745 {
746 	list_del_init(&ldt->ldt_linkage);
747 	if (ldt->ldt_ops->ldto_fini)
748 		ldt->ldt_ops->ldto_fini(ldt);
749 }
750 EXPORT_SYMBOL(lu_device_type_fini);
751 
752 void lu_types_stop(void)
753 {
754 	struct lu_device_type *ldt;
755 
756 	list_for_each_entry(ldt, &lu_device_types, ldt_linkage) {
757 		if (ldt->ldt_device_nr == 0 && ldt->ldt_ops->ldto_stop)
758 			ldt->ldt_ops->ldto_stop(ldt);
759 	}
760 }
761 EXPORT_SYMBOL(lu_types_stop);
762 
763 /**
764  * Global list of all sites on this node
765  */
766 static LIST_HEAD(lu_sites);
767 static DEFINE_MUTEX(lu_sites_guard);
768 
769 /**
770  * Global environment used by site shrinker.
771  */
772 static struct lu_env lu_shrink_env;
773 
774 struct lu_site_print_arg {
775 	struct lu_env   *lsp_env;
776 	void	    *lsp_cookie;
777 	lu_printer_t     lsp_printer;
778 };
779 
780 static int
781 lu_site_obj_print(struct cfs_hash *hs, struct cfs_hash_bd *bd,
782 		  struct hlist_node *hnode, void *data)
783 {
784 	struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
785 	struct lu_object_header  *h;
786 
787 	h = hlist_entry(hnode, struct lu_object_header, loh_hash);
788 	if (!list_empty(&h->loh_layers)) {
789 		const struct lu_object *o;
790 
791 		o = lu_object_top(h);
792 		lu_object_print(arg->lsp_env, arg->lsp_cookie,
793 				arg->lsp_printer, o);
794 	} else {
795 		lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
796 				       arg->lsp_printer, h);
797 	}
798 	return 0;
799 }
800 
801 /**
802  * Print all objects in \a s.
803  */
804 void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
805 		   lu_printer_t printer)
806 {
807 	struct lu_site_print_arg arg = {
808 		.lsp_env     = (struct lu_env *)env,
809 		.lsp_cookie  = cookie,
810 		.lsp_printer = printer,
811 	};
812 
813 	cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
814 }
815 EXPORT_SYMBOL(lu_site_print);
816 
817 enum {
818 	LU_CACHE_PERCENT_MAX     = 50,
819 	LU_CACHE_PERCENT_DEFAULT = 20
820 };
821 
822 static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
823 module_param(lu_cache_percent, int, 0644);
824 MODULE_PARM_DESC(lu_cache_percent, "Percentage of memory to be used as lu_object cache");
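
/*
 * Illustrative usage of the parameter above (assuming this file is built
 * into the obdclass module, as in the Lustre client build):
 *
 *	modprobe obdclass lu_cache_percent=30
 *	echo 30 > /sys/module/obdclass/parameters/lu_cache_percent
 */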
825 
826 /**
827  * Return desired hash table order.
828  */
829 static int lu_htable_order(void)
830 {
831 	unsigned long cache_size;
832 	int bits;
833 
834 	/*
835 	 * Calculate hash table size, assuming that we want reasonable
836 	 * performance when 20% of total memory is occupied by cache of
837 	 * lu_objects.
838 	 *
839 	 * The size of a lu_object is (arbitrarily) taken as 1K (together with its inode).
840 	 */
841 	cache_size = totalram_pages;
842 
843 #if BITS_PER_LONG == 32
844 	/* limit hashtable size for lowmem systems to low RAM */
845 	if (cache_size > 1 << (30 - PAGE_CACHE_SHIFT))
846 		cache_size = (1 << (30 - PAGE_CACHE_SHIFT)) * 3 / 4;
847 #endif
848 
849 	/* Reset an unreasonable cache setting to the default. */
850 	if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
851 		CWARN("obdclass: invalid lu_cache_percent: %u, it must be in the range of (0, %u]. Will use default value: %u.\n",
852 		      lu_cache_percent, LU_CACHE_PERCENT_MAX,
853 		      LU_CACHE_PERCENT_DEFAULT);
854 
855 		lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
856 	}
857 	cache_size = cache_size / 100 * lu_cache_percent *
858 		(PAGE_CACHE_SIZE / 1024);
859 
860 	for (bits = 1; (1 << bits) < cache_size; ++bits) {
861 		;
862 	}
863 	return bits;
864 }
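
/*
 * Worked example (illustrative, assuming a 64-bit node with 4 KiB pages and
 * the default lu_cache_percent of 20): with totalram_pages = 2097152 (8 GiB),
 *
 *	cache_size = 2097152 / 100 * 20 * (4096 / 1024)
 *		   = 20971 * 20 * 4 = 1677680
 *
 * and the loop above picks the smallest bits with (1 << bits) >= 1677680,
 * i.e. bits = 21, which lu_site_init() then clamps to the range
 * [LU_SITE_BITS_MIN, LU_SITE_BITS_MAX].
 */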
865 
866 static unsigned lu_obj_hop_hash(struct cfs_hash *hs,
867 				const void *key, unsigned mask)
868 {
869 	struct lu_fid  *fid = (struct lu_fid *)key;
870 	__u32	   hash;
871 
872 	hash = fid_flatten32(fid);
873 	hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
874 	hash = hash_long(hash, hs->hs_bkt_bits);
875 
876 	/* give me another random factor */
877 	hash -= hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
878 
879 	hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
880 	hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
881 
882 	return hash & mask;
883 }
884 
885 static void *lu_obj_hop_object(struct hlist_node *hnode)
886 {
887 	return hlist_entry(hnode, struct lu_object_header, loh_hash);
888 }
889 
890 static void *lu_obj_hop_key(struct hlist_node *hnode)
891 {
892 	struct lu_object_header *h;
893 
894 	h = hlist_entry(hnode, struct lu_object_header, loh_hash);
895 	return &h->loh_fid;
896 }
897 
898 static int lu_obj_hop_keycmp(const void *key, struct hlist_node *hnode)
899 {
900 	struct lu_object_header *h;
901 
902 	h = hlist_entry(hnode, struct lu_object_header, loh_hash);
903 	return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
904 }
905 
906 static void lu_obj_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
907 {
908 	struct lu_object_header *h;
909 
910 	h = hlist_entry(hnode, struct lu_object_header, loh_hash);
911 	atomic_inc(&h->loh_ref);
912 }
913 
914 static void lu_obj_hop_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
915 {
916 	LBUG(); /* we should never be called here */
917 }
918 
919 struct cfs_hash_ops lu_site_hash_ops = {
920 	.hs_hash	= lu_obj_hop_hash,
921 	.hs_key		= lu_obj_hop_key,
922 	.hs_keycmp      = lu_obj_hop_keycmp,
923 	.hs_object      = lu_obj_hop_object,
924 	.hs_get		= lu_obj_hop_get,
925 	.hs_put_locked  = lu_obj_hop_put_locked,
926 };
927 
928 static void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
929 {
930 	spin_lock(&s->ls_ld_lock);
931 	if (list_empty(&d->ld_linkage))
932 		list_add(&d->ld_linkage, &s->ls_ld_linkage);
933 	spin_unlock(&s->ls_ld_lock);
934 }
935 
936 /**
937  * Initialize site \a s, with \a top as the top-level device.
938  */
939 #define LU_SITE_BITS_MIN    12
940 #define LU_SITE_BITS_MAX    24
941 /**
942  * 256 buckets in total; we don't want too many buckets because they:
943  * - consume too much memory
944  * - lead to unbalanced LRU lists
945  */
946 #define LU_SITE_BKT_BITS    8
947 
948 int lu_site_init(struct lu_site *s, struct lu_device *top)
949 {
950 	struct lu_site_bkt_data *bkt;
951 	struct cfs_hash_bd bd;
952 	char name[16];
953 	int bits;
954 	int i;
955 
956 	memset(s, 0, sizeof(*s));
957 	bits = lu_htable_order();
958 	snprintf(name, 16, "lu_site_%s", top->ld_type->ldt_name);
959 	for (bits = min(max(LU_SITE_BITS_MIN, bits), LU_SITE_BITS_MAX);
960 	     bits >= LU_SITE_BITS_MIN; bits--) {
961 		s->ls_obj_hash = cfs_hash_create(name, bits, bits,
962 						 bits - LU_SITE_BKT_BITS,
963 						 sizeof(*bkt), 0, 0,
964 						 &lu_site_hash_ops,
965 						 CFS_HASH_SPIN_BKTLOCK |
966 						 CFS_HASH_NO_ITEMREF |
967 						 CFS_HASH_DEPTH |
968 						 CFS_HASH_ASSERT_EMPTY);
969 		if (s->ls_obj_hash != NULL)
970 			break;
971 	}
972 
973 	if (s->ls_obj_hash == NULL) {
974 		CERROR("failed to create lu_site hash with bits: %d\n", bits);
975 		return -ENOMEM;
976 	}
977 
978 	cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
979 		bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
980 		INIT_LIST_HEAD(&bkt->lsb_lru);
981 		init_waitqueue_head(&bkt->lsb_marche_funebre);
982 	}
983 
984 	s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
985 	if (s->ls_stats == NULL) {
986 		cfs_hash_putref(s->ls_obj_hash);
987 		s->ls_obj_hash = NULL;
988 		return -ENOMEM;
989 	}
990 
991 	lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
992 			     0, "created", "created");
993 	lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
994 			     0, "cache_hit", "cache_hit");
995 	lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
996 			     0, "cache_miss", "cache_miss");
997 	lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
998 			     0, "cache_race", "cache_race");
999 	lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
1000 			     0, "cache_death_race", "cache_death_race");
1001 	lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
1002 			     0, "lru_purged", "lru_purged");
1003 	/*
1004 	 * Unlike other counters, lru_len can be decremented, so we
1005 	 * need lc_sum instead of just lc_count.
1006 	 */
1007 	lprocfs_counter_init(s->ls_stats, LU_SS_LRU_LEN,
1008 			     LPROCFS_CNTR_AVGMINMAX, "lru_len", "lru_len");
1009 
1010 	INIT_LIST_HEAD(&s->ls_linkage);
1011 	s->ls_top_dev = top;
1012 	top->ld_site = s;
1013 	lu_device_get(top);
1014 	lu_ref_add(&top->ld_reference, "site-top", s);
1015 
1016 	INIT_LIST_HEAD(&s->ls_ld_linkage);
1017 	spin_lock_init(&s->ls_ld_lock);
1018 
1019 	lu_dev_add_linkage(s, top);
1020 
1021 	return 0;
1022 }
1023 EXPORT_SYMBOL(lu_site_init);
1024 
1025 /**
1026  * Finalize \a s and release its resources.
1027  */
1028 void lu_site_fini(struct lu_site *s)
1029 {
1030 	mutex_lock(&lu_sites_guard);
1031 	list_del_init(&s->ls_linkage);
1032 	mutex_unlock(&lu_sites_guard);
1033 
1034 	if (s->ls_obj_hash != NULL) {
1035 		cfs_hash_putref(s->ls_obj_hash);
1036 		s->ls_obj_hash = NULL;
1037 	}
1038 
1039 	if (s->ls_top_dev != NULL) {
1040 		s->ls_top_dev->ld_site = NULL;
1041 		lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
1042 		lu_device_put(s->ls_top_dev);
1043 		s->ls_top_dev = NULL;
1044 	}
1045 
1046 	if (s->ls_stats != NULL)
1047 		lprocfs_free_stats(&s->ls_stats);
1048 }
1049 EXPORT_SYMBOL(lu_site_fini);
1050 
1051 /**
1052  * Called when initialization of stack for this site is completed.
1053  */
1054 int lu_site_init_finish(struct lu_site *s)
1055 {
1056 	int result;
1057 
1058 	mutex_lock(&lu_sites_guard);
1059 	result = lu_context_refill(&lu_shrink_env.le_ctx);
1060 	if (result == 0)
1061 		list_add(&s->ls_linkage, &lu_sites);
1062 	mutex_unlock(&lu_sites_guard);
1063 	return result;
1064 }
1065 EXPORT_SYMBOL(lu_site_init_finish);
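
/*
 * Illustrative setup/teardown sequence for a site (a sketch; "site" and
 * "topdev" belong to a hypothetical caller that has allocated a struct
 * lu_site and created its top-level lu_device):
 *
 *	rc = lu_site_init(site, topdev);
 *	if (rc == 0) {
 *		rc = lu_site_init_finish(site);
 *		if (rc != 0)
 *			lu_site_fini(site);
 *	}
 *	...
 *	lu_site_fini(site);
 */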
1066 
1067 /**
1068  * Acquire additional reference on device \a d
1069  */
1070 void lu_device_get(struct lu_device *d)
1071 {
1072 	atomic_inc(&d->ld_ref);
1073 }
1074 EXPORT_SYMBOL(lu_device_get);
1075 
1076 /**
1077  * Release reference on device \a d.
1078  */
1079 void lu_device_put(struct lu_device *d)
1080 {
1081 	LASSERT(atomic_read(&d->ld_ref) > 0);
1082 	atomic_dec(&d->ld_ref);
1083 }
1084 EXPORT_SYMBOL(lu_device_put);
1085 
1086 /**
1087  * Initialize device \a d of type \a t.
1088  */
1089 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
1090 {
1091 	if (t->ldt_device_nr++ == 0 && t->ldt_ops->ldto_start != NULL)
1092 		t->ldt_ops->ldto_start(t);
1093 	memset(d, 0, sizeof(*d));
1094 	atomic_set(&d->ld_ref, 0);
1095 	d->ld_type = t;
1096 	lu_ref_init(&d->ld_reference);
1097 	INIT_LIST_HEAD(&d->ld_linkage);
1098 	return 0;
1099 }
1100 EXPORT_SYMBOL(lu_device_init);
1101 
1102 /**
1103  * Finalize device \a d.
1104  */
1105 void lu_device_fini(struct lu_device *d)
1106 {
1107 	struct lu_device_type *t;
1108 
1109 	t = d->ld_type;
1110 	if (d->ld_obd != NULL) {
1111 		d->ld_obd->obd_lu_dev = NULL;
1112 		d->ld_obd = NULL;
1113 	}
1114 
1115 	lu_ref_fini(&d->ld_reference);
1116 	LASSERTF(atomic_read(&d->ld_ref) == 0,
1117 		 "Refcount is %u\n", atomic_read(&d->ld_ref));
1118 	LASSERT(t->ldt_device_nr > 0);
1119 	if (--t->ldt_device_nr == 0 && t->ldt_ops->ldto_stop != NULL)
1120 		t->ldt_ops->ldto_stop(t);
1121 }
1122 EXPORT_SYMBOL(lu_device_fini);
1123 
1124 /**
1125  * Initialize object \a o that is part of compound object \a h and was created
1126  * by device \a d.
1127  */
1128 int lu_object_init(struct lu_object *o, struct lu_object_header *h,
1129 		   struct lu_device *d)
1130 {
1131 	memset(o, 0, sizeof(*o));
1132 	o->lo_header = h;
1133 	o->lo_dev = d;
1134 	lu_device_get(d);
1135 	lu_ref_add_at(&d->ld_reference, &o->lo_dev_ref, "lu_object", o);
1136 	INIT_LIST_HEAD(&o->lo_linkage);
1137 
1138 	return 0;
1139 }
1140 EXPORT_SYMBOL(lu_object_init);
1141 
1142 /**
1143  * Finalize object and release its resources.
1144  */
1145 void lu_object_fini(struct lu_object *o)
1146 {
1147 	struct lu_device *dev = o->lo_dev;
1148 
1149 	LASSERT(list_empty(&o->lo_linkage));
1150 
1151 	if (dev != NULL) {
1152 		lu_ref_del_at(&dev->ld_reference, &o->lo_dev_ref,
1153 			      "lu_object", o);
1154 		lu_device_put(dev);
1155 		o->lo_dev = NULL;
1156 	}
1157 }
1158 EXPORT_SYMBOL(lu_object_fini);
1159 
1160 /**
1161  * Add object \a o as first layer of compound object \a h
1162  *
1163  * This is typically called by the ->ldo_object_alloc() method of top-level
1164  * device.
1165  */
1166 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
1167 {
1168 	list_move(&o->lo_linkage, &h->loh_layers);
1169 }
1170 EXPORT_SYMBOL(lu_object_add_top);
1171 
1172 /**
1173  * Add object \a o as a layer of compound object, going after \a before.
1174  *
1175  * This is typically called by the ->ldo_object_alloc() method of \a
1176  * before->lo_dev.
1177  */
1178 void lu_object_add(struct lu_object *before, struct lu_object *o)
1179 {
1180 	list_move(&o->lo_linkage, &before->lo_linkage);
1181 }
1182 EXPORT_SYMBOL(lu_object_add);
1183 
1184 /**
1185  * Initialize compound object.
1186  */
1187 int lu_object_header_init(struct lu_object_header *h)
1188 {
1189 	memset(h, 0, sizeof(*h));
1190 	atomic_set(&h->loh_ref, 1);
1191 	INIT_HLIST_NODE(&h->loh_hash);
1192 	INIT_LIST_HEAD(&h->loh_lru);
1193 	INIT_LIST_HEAD(&h->loh_layers);
1194 	lu_ref_init(&h->loh_reference);
1195 	return 0;
1196 }
1197 EXPORT_SYMBOL(lu_object_header_init);
1198 
1199 /**
1200  * Finalize compound object.
1201  */
1202 void lu_object_header_fini(struct lu_object_header *h)
1203 {
1204 	LASSERT(list_empty(&h->loh_layers));
1205 	LASSERT(list_empty(&h->loh_lru));
1206 	LASSERT(hlist_unhashed(&h->loh_hash));
1207 	lu_ref_fini(&h->loh_reference);
1208 }
1209 EXPORT_SYMBOL(lu_object_header_fini);
1210 
1211 /**
1212  * Given a compound object, find its slice, corresponding to the device type
1213  * \a dtype.
1214  */
1215 struct lu_object *lu_object_locate(struct lu_object_header *h,
1216 				   const struct lu_device_type *dtype)
1217 {
1218 	struct lu_object *o;
1219 
1220 	list_for_each_entry(o, &h->loh_layers, lo_linkage) {
1221 		if (o->lo_dev->ld_type == dtype)
1222 			return o;
1223 	}
1224 	return NULL;
1225 }
1226 EXPORT_SYMBOL(lu_object_locate);
1227 
1228 /**
1229  * Finalize and free devices in the device stack.
1230  *
1231  * Finalize device stack by purging object cache, and calling
1232  * lu_device_type_operations::ldto_device_fini() and
1233  * lu_device_type_operations::ldto_device_free() on all devices in the stack.
1234  */
1235 void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
1236 {
1237 	struct lu_site   *site = top->ld_site;
1238 	struct lu_device *scan;
1239 	struct lu_device *next;
1240 
1241 	lu_site_purge(env, site, ~0);
1242 	for (scan = top; scan != NULL; scan = next) {
1243 		next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
1244 		lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
1245 		lu_device_put(scan);
1246 	}
1247 
1248 	/* purge again. */
1249 	lu_site_purge(env, site, ~0);
1250 
1251 	for (scan = top; scan != NULL; scan = next) {
1252 		const struct lu_device_type *ldt = scan->ld_type;
1253 		struct obd_type	     *type;
1254 
1255 		next = ldt->ldt_ops->ldto_device_free(env, scan);
1256 		type = ldt->ldt_obd_type;
1257 		if (type != NULL) {
1258 			type->typ_refcnt--;
1259 			class_put_type(type);
1260 		}
1261 	}
1262 }
1263 EXPORT_SYMBOL(lu_stack_fini);
1264 
1265 enum {
1266 	/**
1267 	 * Maximal number of tld slots.
1268 	 */
1269 	LU_CONTEXT_KEY_NR = 40
1270 };
1271 
1272 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
1273 
1274 static DEFINE_SPINLOCK(lu_keys_guard);
1275 
1276 /**
1277  * Global counter incremented whenever a key is registered, unregistered,
1278  * revived or quiesced. This is used to avoid unnecessary calls to
1279  * lu_context_refill(). No locking is provided, as initialization and shutdown
1280  * are supposed to be externally serialized.
1281  */
1282 static unsigned key_set_version;
1283 
1284 /**
1285  * Register new key.
1286  */
1287 int lu_context_key_register(struct lu_context_key *key)
1288 {
1289 	int result;
1290 	int i;
1291 
1292 	LASSERT(key->lct_init != NULL);
1293 	LASSERT(key->lct_fini != NULL);
1294 	LASSERT(key->lct_tags != 0);
1295 
1296 	result = -ENFILE;
1297 	spin_lock(&lu_keys_guard);
1298 	for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1299 		if (lu_keys[i] == NULL) {
1300 			key->lct_index = i;
1301 			atomic_set(&key->lct_used, 1);
1302 			lu_keys[i] = key;
1303 			lu_ref_init(&key->lct_reference);
1304 			result = 0;
1305 			++key_set_version;
1306 			break;
1307 		}
1308 	}
1309 	spin_unlock(&lu_keys_guard);
1310 	return result;
1311 }
1312 EXPORT_SYMBOL(lu_context_key_register);
1313 
1314 static void key_fini(struct lu_context *ctx, int index)
1315 {
1316 	if (ctx->lc_value != NULL && ctx->lc_value[index] != NULL) {
1317 		struct lu_context_key *key;
1318 
1319 		key = lu_keys[index];
1320 		LASSERT(key != NULL);
1321 		LASSERT(key->lct_fini != NULL);
1322 		LASSERT(atomic_read(&key->lct_used) > 1);
1323 
1324 		key->lct_fini(ctx, key, ctx->lc_value[index]);
1325 		lu_ref_del(&key->lct_reference, "ctx", ctx);
1326 		atomic_dec(&key->lct_used);
1327 
1328 		if ((ctx->lc_tags & LCT_NOREF) == 0) {
1329 #ifdef CONFIG_MODULE_UNLOAD
1330 			LINVRNT(module_refcount(key->lct_owner) > 0);
1331 #endif
1332 			module_put(key->lct_owner);
1333 		}
1334 		ctx->lc_value[index] = NULL;
1335 	}
1336 }
1337 
1338 /**
1339  * Deregister key.
1340  */
1341 void lu_context_key_degister(struct lu_context_key *key)
1342 {
1343 	LASSERT(atomic_read(&key->lct_used) >= 1);
1344 	LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1345 
1346 	lu_context_key_quiesce(key);
1347 
1348 	++key_set_version;
1349 	spin_lock(&lu_keys_guard);
1350 	key_fini(&lu_shrink_env.le_ctx, key->lct_index);
1351 	if (lu_keys[key->lct_index]) {
1352 		lu_keys[key->lct_index] = NULL;
1353 		lu_ref_fini(&key->lct_reference);
1354 	}
1355 	spin_unlock(&lu_keys_guard);
1356 
1357 	LASSERTF(atomic_read(&key->lct_used) == 1,
1358 		 "key has instances: %d\n",
1359 		 atomic_read(&key->lct_used));
1360 }
1361 EXPORT_SYMBOL(lu_context_key_degister);
1362 
1363 /**
1364  * Register a number of keys. This has to be called after all keys have been
1365  * initialized by a call to LU_CONTEXT_KEY_INIT().
1366  */
1367 int lu_context_key_register_many(struct lu_context_key *k, ...)
1368 {
1369 	struct lu_context_key *key = k;
1370 	va_list args;
1371 	int result;
1372 
1373 	va_start(args, k);
1374 	do {
1375 		result = lu_context_key_register(key);
1376 		if (result)
1377 			break;
1378 		key = va_arg(args, struct lu_context_key *);
1379 	} while (key != NULL);
1380 	va_end(args);
1381 
1382 	if (result != 0) {
1383 		va_start(args, k);
1384 		while (k != key) {
1385 			lu_context_key_degister(k);
1386 			k = va_arg(args, struct lu_context_key *);
1387 		}
1388 		va_end(args);
1389 	}
1390 
1391 	return result;
1392 }
1393 EXPORT_SYMBOL(lu_context_key_register_many);
1394 
1395 /**
1396  * De-register a number of keys. This is a dual to
1397  * lu_context_key_register_many().
1398  */
1399 void lu_context_key_degister_many(struct lu_context_key *k, ...)
1400 {
1401 	va_list args;
1402 
1403 	va_start(args, k);
1404 	do {
1405 		lu_context_key_degister(k);
1406 		k = va_arg(args, struct lu_context_key*);
1407 	} while (k != NULL);
1408 	va_end(args);
1409 }
1410 EXPORT_SYMBOL(lu_context_key_degister_many);
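
/*
 * Illustrative sketch: the variadic helpers take a NULL-terminated list of
 * keys. "foo_key" and "bar_key" are hypothetical keys set up with
 * LU_CONTEXT_KEY_INIT()-style initialization:
 *
 *	rc = lu_context_key_register_many(&foo_key, &bar_key, NULL);
 *	...
 *	lu_context_key_degister_many(&foo_key, &bar_key, NULL);
 */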
1411 
1412 /**
1413  * Revive a number of keys.
1414  */
1415 void lu_context_key_revive_many(struct lu_context_key *k, ...)
1416 {
1417 	va_list args;
1418 
1419 	va_start(args, k);
1420 	do {
1421 		lu_context_key_revive(k);
1422 		k = va_arg(args, struct lu_context_key*);
1423 	} while (k != NULL);
1424 	va_end(args);
1425 }
1426 EXPORT_SYMBOL(lu_context_key_revive_many);
1427 
1428 /**
1429  * Quiesce a number of keys.
1430  */
1431 void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
1432 {
1433 	va_list args;
1434 
1435 	va_start(args, k);
1436 	do {
1437 		lu_context_key_quiesce(k);
1438 		k = va_arg(args, struct lu_context_key*);
1439 	} while (k != NULL);
1440 	va_end(args);
1441 }
1442 EXPORT_SYMBOL(lu_context_key_quiesce_many);
1443 
1444 /**
1445  * Return value associated with key \a key in context \a ctx.
1446  */
1447 void *lu_context_key_get(const struct lu_context *ctx,
1448 			 const struct lu_context_key *key)
1449 {
1450 	LINVRNT(ctx->lc_state == LCS_ENTERED);
1451 	LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1452 	LASSERT(lu_keys[key->lct_index] == key);
1453 	return ctx->lc_value[key->lct_index];
1454 }
1455 EXPORT_SYMBOL(lu_context_key_get);
1456 
1457 /**
1458  * List of remembered contexts. XXX document me.
1459  */
1460 static LIST_HEAD(lu_context_remembered);
1461 
1462 /**
1463  * Destroy \a key in all remembered contexts. This is used to destroy key
1464  * values in "shared" contexts (like service threads), when a module owning
1465  * the key is about to be unloaded.
1466  */
1467 void lu_context_key_quiesce(struct lu_context_key *key)
1468 {
1469 	struct lu_context *ctx;
1470 
1471 	if (!(key->lct_tags & LCT_QUIESCENT)) {
1472 		/*
1473 		 * XXX layering violation.
1474 		 */
1475 		key->lct_tags |= LCT_QUIESCENT;
1476 		/*
1477 		 * XXX memory barrier has to go here.
1478 		 */
1479 		spin_lock(&lu_keys_guard);
1480 		list_for_each_entry(ctx, &lu_context_remembered,
1481 					lc_remember)
1482 			key_fini(ctx, key->lct_index);
1483 		spin_unlock(&lu_keys_guard);
1484 		++key_set_version;
1485 	}
1486 }
1487 EXPORT_SYMBOL(lu_context_key_quiesce);
1488 
1489 void lu_context_key_revive(struct lu_context_key *key)
1490 {
1491 	key->lct_tags &= ~LCT_QUIESCENT;
1492 	++key_set_version;
1493 }
1494 EXPORT_SYMBOL(lu_context_key_revive);
1495 
1496 static void keys_fini(struct lu_context *ctx)
1497 {
1498 	int	i;
1499 
1500 	if (ctx->lc_value == NULL)
1501 		return;
1502 
1503 	for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
1504 		key_fini(ctx, i);
1505 
1506 	kfree(ctx->lc_value);
1507 	ctx->lc_value = NULL;
1508 }
1509 
1510 static int keys_fill(struct lu_context *ctx)
1511 {
1512 	int i;
1513 
1514 	LINVRNT(ctx->lc_value != NULL);
1515 	for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1516 		struct lu_context_key *key;
1517 
1518 		key = lu_keys[i];
1519 		if (ctx->lc_value[i] == NULL && key != NULL &&
1520 		    (key->lct_tags & ctx->lc_tags) &&
1521 		    /*
1522 		     * Don't create values for a LCT_QUIESCENT key, as this
1523 		     * will pin module owning a key.
1524 		     */
1525 		    !(key->lct_tags & LCT_QUIESCENT)) {
1526 			void *value;
1527 
1528 			LINVRNT(key->lct_init != NULL);
1529 			LINVRNT(key->lct_index == i);
1530 
1531 			value = key->lct_init(ctx, key);
1532 			if (IS_ERR(value))
1533 				return PTR_ERR(value);
1534 
1535 			if (!(ctx->lc_tags & LCT_NOREF))
1536 				try_module_get(key->lct_owner);
1537 			lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
1538 			atomic_inc(&key->lct_used);
1539 			/*
1540 			 * This is the only place in the code where an
1541 			 * element of the ctx->lc_value[] array is set to a
1542 			 * non-NULL value.
1543 			 */
1544 			ctx->lc_value[i] = value;
1545 			if (key->lct_exit != NULL)
1546 				ctx->lc_tags |= LCT_HAS_EXIT;
1547 		}
1548 		ctx->lc_version = key_set_version;
1549 	}
1550 	return 0;
1551 }
1552 
1553 static int keys_init(struct lu_context *ctx)
1554 {
1555 	ctx->lc_value = kcalloc(ARRAY_SIZE(lu_keys), sizeof(ctx->lc_value[0]),
1556 				GFP_NOFS);
1557 	if (likely(ctx->lc_value != NULL))
1558 		return keys_fill(ctx);
1559 
1560 	return -ENOMEM;
1561 }
1562 
1563 /**
1564  * Initialize context data-structure. Create values for all keys.
1565  */
1566 int lu_context_init(struct lu_context *ctx, __u32 tags)
1567 {
1568 	int	rc;
1569 
1570 	memset(ctx, 0, sizeof(*ctx));
1571 	ctx->lc_state = LCS_INITIALIZED;
1572 	ctx->lc_tags = tags;
1573 	if (tags & LCT_REMEMBER) {
1574 		spin_lock(&lu_keys_guard);
1575 		list_add(&ctx->lc_remember, &lu_context_remembered);
1576 		spin_unlock(&lu_keys_guard);
1577 	} else {
1578 		INIT_LIST_HEAD(&ctx->lc_remember);
1579 	}
1580 
1581 	rc = keys_init(ctx);
1582 	if (rc != 0)
1583 		lu_context_fini(ctx);
1584 
1585 	return rc;
1586 }
1587 EXPORT_SYMBOL(lu_context_init);
1588 
1589 /**
1590  * Finalize context data-structure. Destroy key values.
1591  */
1592 void lu_context_fini(struct lu_context *ctx)
1593 {
1594 	LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1595 	ctx->lc_state = LCS_FINALIZED;
1596 
1597 	if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
1598 		LASSERT(list_empty(&ctx->lc_remember));
1599 		keys_fini(ctx);
1600 
1601 	} else { /* could race with key degister */
1602 		spin_lock(&lu_keys_guard);
1603 		keys_fini(ctx);
1604 		list_del_init(&ctx->lc_remember);
1605 		spin_unlock(&lu_keys_guard);
1606 	}
1607 }
1608 EXPORT_SYMBOL(lu_context_fini);
1609 
1610 /**
1611  * Called before entering context.
1612  */
1613 void lu_context_enter(struct lu_context *ctx)
1614 {
1615 	LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1616 	ctx->lc_state = LCS_ENTERED;
1617 }
1618 EXPORT_SYMBOL(lu_context_enter);
1619 
1620 /**
1621  * Called after exiting from \a ctx
1622  */
1623 void lu_context_exit(struct lu_context *ctx)
1624 {
1625 	int i;
1626 
1627 	LINVRNT(ctx->lc_state == LCS_ENTERED);
1628 	ctx->lc_state = LCS_LEFT;
1629 	if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
1630 		for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1631 			if (ctx->lc_value[i] != NULL) {
1632 				struct lu_context_key *key;
1633 
1634 				key = lu_keys[i];
1635 				LASSERT(key != NULL);
1636 				if (key->lct_exit != NULL)
1637 					key->lct_exit(ctx,
1638 						      key, ctx->lc_value[i]);
1639 			}
1640 		}
1641 	}
1642 }
1643 EXPORT_SYMBOL(lu_context_exit);
1644 
1645 /**
1646  * Allocate for context all missing keys that were registered after context
1647  * creation. key_set_version is only changed in rare cases when modules
1648  * are loaded and removed.
1649  */
1650 int lu_context_refill(struct lu_context *ctx)
1651 {
1652 	return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
1653 }
1654 EXPORT_SYMBOL(lu_context_refill);
1655 
1656 /**
1657  * lu_ctx_tags/lu_ses_tags will be updated if new types of obd are
1658  * added. Currently this is only used on the client side, specifically
1659  * for the echo device client. For other stacks (like ptlrpc threads),
1660  * contexts are predefined when the lu_device type is registered, during
1661  * the module probe phase.
1662  */
1663 __u32 lu_context_tags_default;
1664 __u32 lu_session_tags_default;
1665 
1666 int lu_env_init(struct lu_env *env, __u32 tags)
1667 {
1668 	int result;
1669 
1670 	env->le_ses = NULL;
1671 	result = lu_context_init(&env->le_ctx, tags);
1672 	if (likely(result == 0))
1673 		lu_context_enter(&env->le_ctx);
1674 	return result;
1675 }
1676 EXPORT_SYMBOL(lu_env_init);
1677 
1678 void lu_env_fini(struct lu_env *env)
1679 {
1680 	lu_context_exit(&env->le_ctx);
1681 	lu_context_fini(&env->le_ctx);
1682 	env->le_ses = NULL;
1683 }
1684 EXPORT_SYMBOL(lu_env_fini);
1685 
1686 int lu_env_refill(struct lu_env *env)
1687 {
1688 	int result;
1689 
1690 	result = lu_context_refill(&env->le_ctx);
1691 	if (result == 0 && env->le_ses != NULL)
1692 		result = lu_context_refill(env->le_ses);
1693 	return result;
1694 }
1695 EXPORT_SYMBOL(lu_env_refill);
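
/*
 * Illustrative environment lifecycle (a sketch; the tag choice depends on
 * the caller, LCT_DT_THREAD is just one of the tags used above):
 *
 *	struct lu_env env;
 *	int rc;
 *
 *	rc = lu_env_init(&env, LCT_DT_THREAD);
 *	if (rc != 0)
 *		return rc;
 *	... use &env with lu_object_find_at()/lu_object_put() etc. ...
 *	lu_env_fini(&env);
 */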
1696 
1697 struct lu_site_stats {
1698 	unsigned	lss_populated;
1699 	unsigned	lss_max_search;
1700 	unsigned	lss_total;
1701 	unsigned	lss_busy;
1702 };
1703 
1704 static void lu_site_stats_get(struct cfs_hash *hs,
1705 			      struct lu_site_stats *stats, int populated)
1706 {
1707 	struct cfs_hash_bd bd;
1708 	int	   i;
1709 
1710 	cfs_hash_for_each_bucket(hs, &bd, i) {
1711 		struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
1712 		struct hlist_head	*hhead;
1713 
1714 		cfs_hash_bd_lock(hs, &bd, 1);
1715 		stats->lss_busy  +=
1716 			cfs_hash_bd_count_get(&bd) - bkt->lsb_lru_len;
1717 		stats->lss_total += cfs_hash_bd_count_get(&bd);
1718 		stats->lss_max_search = max((int)stats->lss_max_search,
1719 					    cfs_hash_bd_depmax_get(&bd));
1720 		if (!populated) {
1721 			cfs_hash_bd_unlock(hs, &bd, 1);
1722 			continue;
1723 		}
1724 
1725 		cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
1726 			if (!hlist_empty(hhead))
1727 				stats->lss_populated++;
1728 		}
1729 		cfs_hash_bd_unlock(hs, &bd, 1);
1730 	}
1731 }
1732 
1733 /*
1734  * lu_cache_shrink_count returns the number of cached objects that are
1735  * candidates to be freed by shrink_slab(). A counter, which tracks
1736  * the number of items in the site's lru, is maintained in the per cpu
1737  * stats of each site. The counter is incremented when an object is added
1738  * to a site's lru and decremented when one is removed. The number of
1739  * free-able objects is the sum of all per cpu counters for all sites.
1740  *
1741  * Using a per cpu counter is a compromise solution to concurrent access:
1742  * lu_object_put() can update the counter without locking the site and
1743  * lu_cache_shrink_count can sum the counters without locking each
1744  * ls_obj_hash bucket.
1745  */
1746 static unsigned long lu_cache_shrink_count(struct shrinker *sk,
1747 					   struct shrink_control *sc)
1748 {
1749 	struct lu_site *s;
1750 	struct lu_site *tmp;
1751 	unsigned long cached = 0;
1752 
1753 	if (!(sc->gfp_mask & __GFP_FS))
1754 		return 0;
1755 
1756 	mutex_lock(&lu_sites_guard);
1757 	list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1758 		cached += ls_stats_read(s->ls_stats, LU_SS_LRU_LEN);
1759 	}
1760 	mutex_unlock(&lu_sites_guard);
1761 
1762 	cached = (cached / 100) * sysctl_vfs_cache_pressure;
1763 	CDEBUG(D_INODE, "%ld objects cached, cache pressure %d\n",
1764 	       cached, sysctl_vfs_cache_pressure);
1765 
1766 	return cached;
1767 }
1768 
1769 static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
1770 					  struct shrink_control *sc)
1771 {
1772 	struct lu_site *s;
1773 	struct lu_site *tmp;
1774 	unsigned long remain = sc->nr_to_scan, freed = 0;
1775 	LIST_HEAD(splice);
1776 
1777 	if (!(sc->gfp_mask & __GFP_FS))
1778 		/* We must not take the lu_sites_guard lock when
1779 		 * __GFP_FS is *not* set because of the deadlock
1780 		 * possibility detailed above. Additionally,
1781 		 * since we cannot determine the number of
1782 		 * objects in the cache without taking this
1783 		 * lock, we're in a particularly tough spot. As
1784 		 * a result, we'll just lie and say our cache is
1785 		 * empty. This _should_ be ok, as we can't
1786 		 * reclaim objects when __GFP_FS is *not* set
1787 		 * anyways.
1788 		 */
1789 		return SHRINK_STOP;
1790 
1791 	mutex_lock(&lu_sites_guard);
1792 	list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1793 		freed = lu_site_purge(&lu_shrink_env, s, remain);
1794 		remain -= freed;
1795 		/*
1796 		 * Move just shrunk site to the tail of site list to
1797 		 * assure shrinking fairness.
1798 		 */
1799 		list_move_tail(&s->ls_linkage, &splice);
1800 	}
1801 	list_splice(&splice, lu_sites.prev);
1802 	mutex_unlock(&lu_sites_guard);
1803 
1804 	return sc->nr_to_scan - remain;
1805 }
1806 
1807 /**
1808  * Site shrinker: releases unreferenced objects from the site caches.
1809  */
1810 static struct shrinker lu_site_shrinker = {
1811 	.count_objects	= lu_cache_shrink_count,
1812 	.scan_objects	= lu_cache_shrink_scan,
1813 	.seeks 		= DEFAULT_SEEKS,
1814 };
1815 
1816 /**
1817  * Initialization of global lu_* data.
1818  */
1819 int lu_global_init(void)
1820 {
1821 	int result;
1822 
1823 	CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
1824 
1825 	result = lu_ref_global_init();
1826 	if (result != 0)
1827 		return result;
1828 
1829 	LU_CONTEXT_KEY_INIT(&lu_global_key);
1830 	result = lu_context_key_register(&lu_global_key);
1831 	if (result != 0)
1832 		return result;
1833 
1834 	/*
1835 	 * At this level, we don't know what tags are needed, so allocate them
1836 	 * conservatively. This should not be too bad, because this
1837 	 * environment is global.
1838 	 */
1839 	mutex_lock(&lu_sites_guard);
1840 	result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
1841 	mutex_unlock(&lu_sites_guard);
1842 	if (result != 0)
1843 		return result;
1844 
1845 	/*
1846 	 * seeks estimation: 3 seeks to read a record from the oi, one to read
1847 	 * the inode, one for the ea. Unfortunately, setting such a high value
1848 	 * results in the lu_object/inode cache consuming all the memory.
1849 	 */
1850 	register_shrinker(&lu_site_shrinker);
1851 
1852 	return result;
1853 }
1854 
1855 /**
1856  * Dual to lu_global_init().
1857  */
1858 void lu_global_fini(void)
1859 {
1860 	unregister_shrinker(&lu_site_shrinker);
1861 	lu_context_key_degister(&lu_global_key);
1862 
1863 	/*
1864 	 * Tear shrinker environment down _after_ de-registering
1865 	 * lu_global_key, because the latter has a value in the former.
1866 	 */
1867 	mutex_lock(&lu_sites_guard);
1868 	lu_env_fini(&lu_shrink_env);
1869 	mutex_unlock(&lu_sites_guard);
1870 
1871 	lu_ref_global_fini();
1872 }
1873 
1874 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
1875 {
1876 	struct lprocfs_counter ret;
1877 
1878 	lprocfs_stats_collect(stats, idx, &ret);
1879 	if (idx == LU_SS_LRU_LEN)
1880 		/*
1881 		 * protect against counter on cpu A being decremented
1882 		 * before counter is incremented on cpu B; unlikely
1883 		 */
1884 		return (__u32)((ret.lc_sum > 0) ? ret.lc_sum : 0);
1885 
1886 	return (__u32)ret.lc_count;
1887 }
1888 
1889 /**
1890  * Output site statistical counters into a buffer. Suitable for
1891  * lprocfs_rd_*()-style functions.
1892  */
1893 int lu_site_stats_print(const struct lu_site *s, struct seq_file *m)
1894 {
1895 	struct lu_site_stats stats;
1896 
1897 	memset(&stats, 0, sizeof(stats));
1898 	lu_site_stats_get(s->ls_obj_hash, &stats, 1);
1899 
1900 	seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d %d\n",
1901 		   stats.lss_busy,
1902 		   stats.lss_total,
1903 		   stats.lss_populated,
1904 		   CFS_HASH_NHLIST(s->ls_obj_hash),
1905 		   stats.lss_max_search,
1906 		   ls_stats_read(s->ls_stats, LU_SS_CREATED),
1907 		   ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
1908 		   ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
1909 		   ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
1910 		   ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
1911 		   ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED),
1912 		   ls_stats_read(s->ls_stats, LU_SS_LRU_LEN));
1913 	return 0;
1914 }
1915 EXPORT_SYMBOL(lu_site_stats_print);
1916 
1917 /**
1918  * Helper function to initialize a number of kmem slab caches at once.
1919  */
1920 int lu_kmem_init(struct lu_kmem_descr *caches)
1921 {
1922 	int result;
1923 	struct lu_kmem_descr *iter = caches;
1924 
1925 	for (result = 0; iter->ckd_cache != NULL; ++iter) {
1926 		*iter->ckd_cache = kmem_cache_create(iter->ckd_name,
1927 							iter->ckd_size,
1928 							0, 0, NULL);
1929 		if (*iter->ckd_cache == NULL) {
1930 			result = -ENOMEM;
1931 			/* free all previously allocated caches */
1932 			lu_kmem_fini(caches);
1933 			break;
1934 		}
1935 	}
1936 	return result;
1937 }
1938 EXPORT_SYMBOL(lu_kmem_init);
1939 
1940 /**
1941  * Helper function to finalize a number of kmem slab caches at once. Dual to
1942  * lu_kmem_init().
1943  */
1944 void lu_kmem_fini(struct lu_kmem_descr *caches)
1945 {
1946 	for (; caches->ckd_cache != NULL; ++caches) {
1947 		kmem_cache_destroy(*caches->ckd_cache);
1948 		*caches->ckd_cache = NULL;
1949 	}
1950 }
1951 EXPORT_SYMBOL(lu_kmem_fini);
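
/*
 * Illustrative sketch of the descriptor array that lu_kmem_init() and
 * lu_kmem_fini() expect; "foo_object" and "foo_kmem_cache" are hypothetical
 * names, and the array is terminated by a NULL ckd_cache pointer:
 *
 *	static struct kmem_cache *foo_kmem_cache;
 *
 *	static struct lu_kmem_descr foo_caches[] = {
 *		{
 *			.ckd_cache = &foo_kmem_cache,
 *			.ckd_name  = "foo_object",
 *			.ckd_size  = sizeof(struct foo_object)
 *		},
 *		{
 *			.ckd_cache = NULL
 *		}
 *	};
 *
 *	rc = lu_kmem_init(foo_caches);
 *	...
 *	lu_kmem_fini(foo_caches);
 */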
1952