• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36 
37 #define DEBUG_SUBSYSTEM S_LOV
38 
39 #include "../../include/linux/libcfs/libcfs.h"
40 
41 #include "../include/obd_class.h"
42 #include "../include/lustre/lustre_idl.h"
43 #include "lov_internal.h"
44 
lov_init_set(struct lov_request_set * set)45 static void lov_init_set(struct lov_request_set *set)
46 {
47 	set->set_count = 0;
48 	atomic_set(&set->set_completes, 0);
49 	atomic_set(&set->set_success, 0);
50 	atomic_set(&set->set_finish_checked, 0);
51 	set->set_cookies = NULL;
52 	INIT_LIST_HEAD(&set->set_list);
53 	atomic_set(&set->set_refcount, 1);
54 	init_waitqueue_head(&set->set_waitq);
55 	spin_lock_init(&set->set_lock);
56 }
57 
lov_finish_set(struct lov_request_set * set)58 void lov_finish_set(struct lov_request_set *set)
59 {
60 	struct list_head *pos, *n;
61 
62 	LASSERT(set);
63 	list_for_each_safe(pos, n, &set->set_list) {
64 		struct lov_request *req = list_entry(pos,
65 							 struct lov_request,
66 							 rq_link);
67 		list_del_init(&req->rq_link);
68 
69 		if (req->rq_oi.oi_oa)
70 			kmem_cache_free(obdo_cachep, req->rq_oi.oi_oa);
71 		kfree(req->rq_oi.oi_osfs);
72 		kfree(req);
73 	}
74 	kfree(set);
75 }
76 
lov_set_finished(struct lov_request_set * set,int idempotent)77 int lov_set_finished(struct lov_request_set *set, int idempotent)
78 {
79 	int completes = atomic_read(&set->set_completes);
80 
81 	CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
82 
83 	if (completes == set->set_count) {
84 		if (idempotent)
85 			return 1;
86 		if (atomic_inc_return(&set->set_finish_checked) == 1)
87 			return 1;
88 	}
89 	return 0;
90 }
91 
lov_update_set(struct lov_request_set * set,struct lov_request * req,int rc)92 void lov_update_set(struct lov_request_set *set,
93 		    struct lov_request *req, int rc)
94 {
95 	req->rq_complete = 1;
96 	req->rq_rc = rc;
97 
98 	atomic_inc(&set->set_completes);
99 	if (rc == 0)
100 		atomic_inc(&set->set_success);
101 
102 	wake_up(&set->set_waitq);
103 }
104 
lov_update_common_set(struct lov_request_set * set,struct lov_request * req,int rc)105 int lov_update_common_set(struct lov_request_set *set,
106 			  struct lov_request *req, int rc)
107 {
108 	struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
109 
110 	lov_update_set(set, req, rc);
111 
112 	/* grace error on inactive ost */
113 	if (rc && !(lov->lov_tgts[req->rq_idx] &&
114 		    lov->lov_tgts[req->rq_idx]->ltd_active))
115 		rc = 0;
116 
117 	/* FIXME in raid1 regime, should return 0 */
118 	return rc;
119 }
120 
lov_set_add_req(struct lov_request * req,struct lov_request_set * set)121 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
122 {
123 	list_add_tail(&req->rq_link, &set->set_list);
124 	set->set_count++;
125 	req->rq_rqset = set;
126 }
127 
lov_check_set(struct lov_obd * lov,int idx)128 static int lov_check_set(struct lov_obd *lov, int idx)
129 {
130 	int rc;
131 	struct lov_tgt_desc *tgt;
132 
133 	mutex_lock(&lov->lov_lock);
134 	tgt = lov->lov_tgts[idx];
135 	rc = !tgt || tgt->ltd_active ||
136 		(tgt->ltd_exp &&
137 		 class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried);
138 	mutex_unlock(&lov->lov_lock);
139 
140 	return rc;
141 }
142 
143 /* Check if the OSC connection exists and is active.
144  * If the OSC has not yet had a chance to connect to the OST the first time,
145  * wait once for it to connect instead of returning an error.
146  */
lov_check_and_wait_active(struct lov_obd * lov,int ost_idx)147 int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
148 {
149 	wait_queue_head_t waitq;
150 	struct l_wait_info lwi;
151 	struct lov_tgt_desc *tgt;
152 	int rc = 0;
153 
154 	mutex_lock(&lov->lov_lock);
155 
156 	tgt = lov->lov_tgts[ost_idx];
157 
158 	if (unlikely(tgt == NULL)) {
159 		rc = 0;
160 		goto out;
161 	}
162 
163 	if (likely(tgt->ltd_active)) {
164 		rc = 1;
165 		goto out;
166 	}
167 
168 	if (tgt->ltd_exp && class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried) {
169 		rc = 0;
170 		goto out;
171 	}
172 
173 	mutex_unlock(&lov->lov_lock);
174 
175 	init_waitqueue_head(&waitq);
176 	lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout),
177 				   cfs_time_seconds(1), NULL, NULL);
178 
179 	rc = l_wait_event(waitq, lov_check_set(lov, ost_idx), &lwi);
180 	if (tgt != NULL && tgt->ltd_active)
181 		return 1;
182 
183 	return 0;
184 
185 out:
186 	mutex_unlock(&lov->lov_lock);
187 	return rc;
188 }
189 
common_attr_done(struct lov_request_set * set)190 static int common_attr_done(struct lov_request_set *set)
191 {
192 	struct list_head *pos;
193 	struct lov_request *req;
194 	struct obdo *tmp_oa;
195 	int rc = 0, attrset = 0;
196 
197 	LASSERT(set->set_oi != NULL);
198 
199 	if (set->set_oi->oi_oa == NULL)
200 		return 0;
201 
202 	if (!atomic_read(&set->set_success))
203 		return -EIO;
204 
205 	tmp_oa = kmem_cache_alloc(obdo_cachep, GFP_NOFS | __GFP_ZERO);
206 	if (tmp_oa == NULL) {
207 		rc = -ENOMEM;
208 		goto out;
209 	}
210 
211 	list_for_each(pos, &set->set_list) {
212 		req = list_entry(pos, struct lov_request, rq_link);
213 
214 		if (!req->rq_complete || req->rq_rc)
215 			continue;
216 		if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
217 			continue;
218 		lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
219 				req->rq_oi.oi_oa->o_valid,
220 				set->set_oi->oi_md, req->rq_stripe, &attrset);
221 	}
222 	if (!attrset) {
223 		CERROR("No stripes had valid attrs\n");
224 		rc = -EIO;
225 	}
226 	if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
227 	    (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
228 		/* When we take attributes of some epoch, we require all the
229 		 * ost to be active. */
230 		CERROR("Not all the stripes had valid attrs\n");
231 		rc = -EIO;
232 		goto out;
233 	}
234 
235 	tmp_oa->o_oi = set->set_oi->oi_oa->o_oi;
236 	memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
237 out:
238 	if (tmp_oa)
239 		kmem_cache_free(obdo_cachep, tmp_oa);
240 	return rc;
241 
242 }
243 
lov_fini_getattr_set(struct lov_request_set * set)244 int lov_fini_getattr_set(struct lov_request_set *set)
245 {
246 	int rc = 0;
247 
248 	if (set == NULL)
249 		return 0;
250 	LASSERT(set->set_exp);
251 	if (atomic_read(&set->set_completes))
252 		rc = common_attr_done(set);
253 
254 	lov_put_reqset(set);
255 
256 	return rc;
257 }
258 
259 /* The callback for osc_getattr_async that finalizes a request info when a
260  * response is received. */
cb_getattr_update(void * cookie,int rc)261 static int cb_getattr_update(void *cookie, int rc)
262 {
263 	struct obd_info *oinfo = cookie;
264 	struct lov_request *lovreq;
265 
266 	lovreq = container_of(oinfo, struct lov_request, rq_oi);
267 	return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
268 }
269 
lov_prep_getattr_set(struct obd_export * exp,struct obd_info * oinfo,struct lov_request_set ** reqset)270 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
271 			 struct lov_request_set **reqset)
272 {
273 	struct lov_request_set *set;
274 	struct lov_obd *lov = &exp->exp_obd->u.lov;
275 	int rc = 0, i;
276 
277 	set = kzalloc(sizeof(*set), GFP_NOFS);
278 	if (!set)
279 		return -ENOMEM;
280 	lov_init_set(set);
281 
282 	set->set_exp = exp;
283 	set->set_oi = oinfo;
284 
285 	for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
286 		struct lov_oinfo *loi;
287 		struct lov_request *req;
288 
289 		loi = oinfo->oi_md->lsm_oinfo[i];
290 		if (lov_oinfo_is_dummy(loi))
291 			continue;
292 
293 		if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
294 			CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
295 			if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH) {
296 				/* SOM requires all the OSTs to be active. */
297 				rc = -EIO;
298 				goto out_set;
299 			}
300 			continue;
301 		}
302 
303 		req = kzalloc(sizeof(*req), GFP_NOFS);
304 		if (!req) {
305 			rc = -ENOMEM;
306 			goto out_set;
307 		}
308 
309 		req->rq_stripe = i;
310 		req->rq_idx = loi->loi_ost_idx;
311 
312 		req->rq_oi.oi_oa = kmem_cache_alloc(obdo_cachep,
313 						    GFP_NOFS | __GFP_ZERO);
314 		if (req->rq_oi.oi_oa == NULL) {
315 			kfree(req);
316 			rc = -ENOMEM;
317 			goto out_set;
318 		}
319 		memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
320 		       sizeof(*req->rq_oi.oi_oa));
321 		req->rq_oi.oi_oa->o_oi = loi->loi_oi;
322 		req->rq_oi.oi_cb_up = cb_getattr_update;
323 
324 		lov_set_add_req(req, set);
325 	}
326 	if (!set->set_count) {
327 		rc = -EIO;
328 		goto out_set;
329 	}
330 	*reqset = set;
331 	return rc;
332 out_set:
333 	lov_fini_getattr_set(set);
334 	return rc;
335 }
336 
lov_fini_destroy_set(struct lov_request_set * set)337 int lov_fini_destroy_set(struct lov_request_set *set)
338 {
339 	if (set == NULL)
340 		return 0;
341 	LASSERT(set->set_exp);
342 	if (atomic_read(&set->set_completes)) {
343 		/* FIXME update qos data here */
344 	}
345 
346 	lov_put_reqset(set);
347 
348 	return 0;
349 }
350 
lov_prep_destroy_set(struct obd_export * exp,struct obd_info * oinfo,struct obdo * src_oa,struct lov_stripe_md * lsm,struct obd_trans_info * oti,struct lov_request_set ** reqset)351 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
352 			 struct obdo *src_oa, struct lov_stripe_md *lsm,
353 			 struct obd_trans_info *oti,
354 			 struct lov_request_set **reqset)
355 {
356 	struct lov_request_set *set;
357 	struct lov_obd *lov = &exp->exp_obd->u.lov;
358 	int rc = 0, i;
359 
360 	set = kzalloc(sizeof(*set), GFP_NOFS);
361 	if (!set)
362 		return -ENOMEM;
363 	lov_init_set(set);
364 
365 	set->set_exp = exp;
366 	set->set_oi = oinfo;
367 	set->set_oi->oi_md = lsm;
368 	set->set_oi->oi_oa = src_oa;
369 	set->set_oti = oti;
370 	if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
371 		set->set_cookies = oti->oti_logcookies;
372 
373 	for (i = 0; i < lsm->lsm_stripe_count; i++) {
374 		struct lov_oinfo *loi;
375 		struct lov_request *req;
376 
377 		loi = lsm->lsm_oinfo[i];
378 		if (lov_oinfo_is_dummy(loi))
379 			continue;
380 
381 		if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
382 			CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
383 			continue;
384 		}
385 
386 		req = kzalloc(sizeof(*req), GFP_NOFS);
387 		if (!req) {
388 			rc = -ENOMEM;
389 			goto out_set;
390 		}
391 
392 		req->rq_stripe = i;
393 		req->rq_idx = loi->loi_ost_idx;
394 
395 		req->rq_oi.oi_oa = kmem_cache_alloc(obdo_cachep,
396 						    GFP_NOFS | __GFP_ZERO);
397 		if (req->rq_oi.oi_oa == NULL) {
398 			kfree(req);
399 			rc = -ENOMEM;
400 			goto out_set;
401 		}
402 		memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
403 		req->rq_oi.oi_oa->o_oi = loi->loi_oi;
404 		lov_set_add_req(req, set);
405 	}
406 	if (!set->set_count) {
407 		rc = -EIO;
408 		goto out_set;
409 	}
410 	*reqset = set;
411 	return rc;
412 out_set:
413 	lov_fini_destroy_set(set);
414 	return rc;
415 }
416 
lov_fini_setattr_set(struct lov_request_set * set)417 int lov_fini_setattr_set(struct lov_request_set *set)
418 {
419 	int rc = 0;
420 
421 	if (set == NULL)
422 		return 0;
423 	LASSERT(set->set_exp);
424 	if (atomic_read(&set->set_completes)) {
425 		rc = common_attr_done(set);
426 		/* FIXME update qos data here */
427 	}
428 
429 	lov_put_reqset(set);
430 	return rc;
431 }
432 
lov_update_setattr_set(struct lov_request_set * set,struct lov_request * req,int rc)433 int lov_update_setattr_set(struct lov_request_set *set,
434 			   struct lov_request *req, int rc)
435 {
436 	struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
437 	struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
438 
439 	lov_update_set(set, req, rc);
440 
441 	/* grace error on inactive ost */
442 	if (rc && !(lov->lov_tgts[req->rq_idx] &&
443 		    lov->lov_tgts[req->rq_idx]->ltd_active))
444 		rc = 0;
445 
446 	if (rc == 0) {
447 		if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
448 			lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
449 				req->rq_oi.oi_oa->o_ctime;
450 		if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
451 			lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
452 				req->rq_oi.oi_oa->o_mtime;
453 		if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
454 			lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
455 				req->rq_oi.oi_oa->o_atime;
456 	}
457 
458 	return rc;
459 }
460 
461 /* The callback for osc_setattr_async that finalizes a request info when a
462  * response is received. */
cb_setattr_update(void * cookie,int rc)463 static int cb_setattr_update(void *cookie, int rc)
464 {
465 	struct obd_info *oinfo = cookie;
466 	struct lov_request *lovreq;
467 
468 	lovreq = container_of(oinfo, struct lov_request, rq_oi);
469 	return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
470 }
471 
lov_prep_setattr_set(struct obd_export * exp,struct obd_info * oinfo,struct obd_trans_info * oti,struct lov_request_set ** reqset)472 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
473 			 struct obd_trans_info *oti,
474 			 struct lov_request_set **reqset)
475 {
476 	struct lov_request_set *set;
477 	struct lov_obd *lov = &exp->exp_obd->u.lov;
478 	int rc = 0, i;
479 
480 	set = kzalloc(sizeof(*set), GFP_NOFS);
481 	if (!set)
482 		return -ENOMEM;
483 	lov_init_set(set);
484 
485 	set->set_exp = exp;
486 	set->set_oti = oti;
487 	set->set_oi = oinfo;
488 	if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
489 		set->set_cookies = oti->oti_logcookies;
490 
491 	for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
492 		struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
493 		struct lov_request *req;
494 
495 		if (lov_oinfo_is_dummy(loi))
496 			continue;
497 
498 		if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
499 			CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
500 			continue;
501 		}
502 
503 		req = kzalloc(sizeof(*req), GFP_NOFS);
504 		if (!req) {
505 			rc = -ENOMEM;
506 			goto out_set;
507 		}
508 		req->rq_stripe = i;
509 		req->rq_idx = loi->loi_ost_idx;
510 
511 		req->rq_oi.oi_oa = kmem_cache_alloc(obdo_cachep,
512 						    GFP_NOFS | __GFP_ZERO);
513 		if (req->rq_oi.oi_oa == NULL) {
514 			kfree(req);
515 			rc = -ENOMEM;
516 			goto out_set;
517 		}
518 		memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
519 		       sizeof(*req->rq_oi.oi_oa));
520 		req->rq_oi.oi_oa->o_oi = loi->loi_oi;
521 		req->rq_oi.oi_oa->o_stripe_idx = i;
522 		req->rq_oi.oi_cb_up = cb_setattr_update;
523 
524 		if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
525 			int off = lov_stripe_offset(oinfo->oi_md,
526 						    oinfo->oi_oa->o_size, i,
527 						    &req->rq_oi.oi_oa->o_size);
528 
529 			if (off < 0 && req->rq_oi.oi_oa->o_size)
530 				req->rq_oi.oi_oa->o_size--;
531 
532 			CDEBUG(D_INODE, "stripe %d has size %llu/%llu\n",
533 			       i, req->rq_oi.oi_oa->o_size,
534 			       oinfo->oi_oa->o_size);
535 		}
536 		lov_set_add_req(req, set);
537 	}
538 	if (!set->set_count) {
539 		rc = -EIO;
540 		goto out_set;
541 	}
542 	*reqset = set;
543 	return rc;
544 out_set:
545 	lov_fini_setattr_set(set);
546 	return rc;
547 }
548 
549 #define LOV_U64_MAX ((__u64)~0ULL)
550 #define LOV_SUM_MAX(tot, add)					   \
551 	do {							    \
552 		if ((tot) + (add) < (tot))			      \
553 			(tot) = LOV_U64_MAX;			    \
554 		else						    \
555 			(tot) += (add);				 \
556 	} while (0)
557 
lov_fini_statfs(struct obd_device * obd,struct obd_statfs * osfs,int success)558 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,
559 		    int success)
560 {
561 	if (success) {
562 		__u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
563 							   LOV_MAGIC, 0);
564 		if (osfs->os_files != LOV_U64_MAX)
565 			lov_do_div64(osfs->os_files, expected_stripes);
566 		if (osfs->os_ffree != LOV_U64_MAX)
567 			lov_do_div64(osfs->os_ffree, expected_stripes);
568 
569 		spin_lock(&obd->obd_osfs_lock);
570 		memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
571 		obd->obd_osfs_age = cfs_time_current_64();
572 		spin_unlock(&obd->obd_osfs_lock);
573 		return 0;
574 	}
575 
576 	return -EIO;
577 }
578 
lov_fini_statfs_set(struct lov_request_set * set)579 int lov_fini_statfs_set(struct lov_request_set *set)
580 {
581 	int rc = 0;
582 
583 	if (set == NULL)
584 		return 0;
585 
586 	if (atomic_read(&set->set_completes)) {
587 		rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
588 				     atomic_read(&set->set_success));
589 	}
590 	lov_put_reqset(set);
591 	return rc;
592 }
593 
lov_update_statfs(struct obd_statfs * osfs,struct obd_statfs * lov_sfs,int success)594 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
595 		       int success)
596 {
597 	int shift = 0, quit = 0;
598 	__u64 tmp;
599 
600 	if (success == 0) {
601 		memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
602 	} else {
603 		if (osfs->os_bsize != lov_sfs->os_bsize) {
604 			/* assume all block sizes are always powers of 2 */
605 			/* get the bits difference */
606 			tmp = osfs->os_bsize | lov_sfs->os_bsize;
607 			for (shift = 0; shift <= 64; ++shift) {
608 				if (tmp & 1) {
609 					if (quit)
610 						break;
611 					quit = 1;
612 					shift = 0;
613 				}
614 				tmp >>= 1;
615 			}
616 		}
617 
618 		if (osfs->os_bsize < lov_sfs->os_bsize) {
619 			osfs->os_bsize = lov_sfs->os_bsize;
620 
621 			osfs->os_bfree  >>= shift;
622 			osfs->os_bavail >>= shift;
623 			osfs->os_blocks >>= shift;
624 		} else if (shift != 0) {
625 			lov_sfs->os_bfree  >>= shift;
626 			lov_sfs->os_bavail >>= shift;
627 			lov_sfs->os_blocks >>= shift;
628 		}
629 		osfs->os_bfree += lov_sfs->os_bfree;
630 		osfs->os_bavail += lov_sfs->os_bavail;
631 		osfs->os_blocks += lov_sfs->os_blocks;
632 		/* XXX not sure about this one - depends on policy.
633 		 *   - could be minimum if we always stripe on all OBDs
634 		 *     (but that would be wrong for any other policy,
635 		 *     if one of the OBDs has no more objects left)
636 		 *   - could be sum if we stripe whole objects
637 		 *   - could be average, just to give a nice number
638 		 *
639 		 * To give a "reasonable" (if not wholly accurate)
640 		 * number, we divide the total number of free objects
641 		 * by expected stripe count (watch out for overflow).
642 		 */
643 		LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
644 		LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
645 	}
646 }
647 
648 /* The callback for osc_statfs_async that finalizes a request info when a
649  * response is received. */
cb_statfs_update(void * cookie,int rc)650 static int cb_statfs_update(void *cookie, int rc)
651 {
652 	struct obd_info *oinfo = cookie;
653 	struct lov_request *lovreq;
654 	struct lov_request_set *set;
655 	struct obd_statfs *osfs, *lov_sfs;
656 	struct lov_obd *lov;
657 	struct lov_tgt_desc *tgt;
658 	struct obd_device *lovobd, *tgtobd;
659 	int success;
660 
661 	lovreq = container_of(oinfo, struct lov_request, rq_oi);
662 	set = lovreq->rq_rqset;
663 	lovobd = set->set_obd;
664 	lov = &lovobd->u.lov;
665 	osfs = set->set_oi->oi_osfs;
666 	lov_sfs = oinfo->oi_osfs;
667 	success = atomic_read(&set->set_success);
668 	/* XXX: the same is done in lov_update_common_set, however
669 	   lovset->set_exp is not initialized. */
670 	lov_update_set(set, lovreq, rc);
671 	if (rc)
672 		goto out;
673 
674 	obd_getref(lovobd);
675 	tgt = lov->lov_tgts[lovreq->rq_idx];
676 	if (!tgt || !tgt->ltd_active)
677 		goto out_update;
678 
679 	tgtobd = class_exp2obd(tgt->ltd_exp);
680 	spin_lock(&tgtobd->obd_osfs_lock);
681 	memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
682 	if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
683 		tgtobd->obd_osfs_age = cfs_time_current_64();
684 	spin_unlock(&tgtobd->obd_osfs_lock);
685 
686 out_update:
687 	lov_update_statfs(osfs, lov_sfs, success);
688 	obd_putref(lovobd);
689 
690 out:
691 	if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
692 	    lov_set_finished(set, 0)) {
693 		lov_statfs_interpret(NULL, set, set->set_count !=
694 				     atomic_read(&set->set_success));
695 	}
696 
697 	return 0;
698 }
699 
lov_prep_statfs_set(struct obd_device * obd,struct obd_info * oinfo,struct lov_request_set ** reqset)700 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
701 			struct lov_request_set **reqset)
702 {
703 	struct lov_request_set *set;
704 	struct lov_obd *lov = &obd->u.lov;
705 	int rc = 0, i;
706 
707 	set = kzalloc(sizeof(*set), GFP_NOFS);
708 	if (!set)
709 		return -ENOMEM;
710 	lov_init_set(set);
711 
712 	set->set_obd = obd;
713 	set->set_oi = oinfo;
714 
715 	/* We only get block data from the OBD */
716 	for (i = 0; i < lov->desc.ld_tgt_count; i++) {
717 		struct lov_request *req;
718 
719 		if (lov->lov_tgts[i] == NULL ||
720 		    (!lov_check_and_wait_active(lov, i) &&
721 		     (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
722 			CDEBUG(D_HA, "lov idx %d inactive\n", i);
723 			continue;
724 		}
725 
726 		/* skip targets that have been explicitly disabled by the
727 		 * administrator */
728 		if (!lov->lov_tgts[i]->ltd_exp) {
729 			CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
730 			continue;
731 		}
732 
733 		req = kzalloc(sizeof(*req), GFP_NOFS);
734 		if (!req) {
735 			rc = -ENOMEM;
736 			goto out_set;
737 		}
738 
739 		req->rq_oi.oi_osfs = kzalloc(sizeof(*req->rq_oi.oi_osfs),
740 					     GFP_NOFS);
741 		if (!req->rq_oi.oi_osfs) {
742 			kfree(req);
743 			rc = -ENOMEM;
744 			goto out_set;
745 		}
746 
747 		req->rq_idx = i;
748 		req->rq_oi.oi_cb_up = cb_statfs_update;
749 		req->rq_oi.oi_flags = oinfo->oi_flags;
750 
751 		lov_set_add_req(req, set);
752 	}
753 	if (!set->set_count) {
754 		rc = -EIO;
755 		goto out_set;
756 	}
757 	*reqset = set;
758 	return rc;
759 out_set:
760 	lov_fini_statfs_set(set);
761 	return rc;
762 }
763