1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_LOV
43 #include <libcfs/libcfs.h>
45 #include <liblustre.h>
48 #include <obd_class.h>
50 #include <lustre/lustre_idl.h>
52 #include "lov_internal.h"
54 static void lov_init_set(struct lov_request_set *set)
57 set->set_completes = 0;
60 CFS_INIT_LIST_HEAD(&set->set_list);
61 atomic_set(&set->set_refcount, 1);
62 cfs_waitq_init(&set->set_waitq);
63 spin_lock_init(&set->set_lock);
66 void lov_finish_set(struct lov_request_set *set)
68 struct list_head *pos, *n;
72 list_for_each_safe(pos, n, &set->set_list) {
73 struct lov_request *req = list_entry(pos, struct lov_request,
75 list_del_init(&req->rq_link);
78 OBDO_FREE(req->rq_oi.oi_oa);
80 OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
81 if (req->rq_oi.oi_osfs)
82 OBD_FREE(req->rq_oi.oi_osfs,
83 sizeof(*req->rq_oi.oi_osfs));
84 OBD_FREE(req, sizeof(*req));
88 int len = set->set_oabufs * sizeof(*set->set_pga);
89 OBD_FREE(set->set_pga, len);
92 lov_llh_put(set->set_lockh);
94 OBD_FREE(set, sizeof(*set));
98 int lov_finished_set(struct lov_request_set *set)
100 CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
102 return set->set_completes == set->set_count;
106 void lov_update_set(struct lov_request_set *set,
107 struct lov_request *req, int rc)
109 req->rq_complete = 1;
112 set->set_completes++;
116 cfs_waitq_signal(&set->set_waitq);
119 int lov_update_common_set(struct lov_request_set *set,
120 struct lov_request *req, int rc)
122 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
125 lov_update_set(set, req, rc);
127 /* grace error on inactive ost */
128 if (rc && !(lov->lov_tgts[req->rq_idx] &&
129 lov->lov_tgts[req->rq_idx]->ltd_active))
132 /* FIXME in raid1 regime, should return 0 */
136 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
138 list_add_tail(&req->rq_link, &set->set_list);
143 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
145 struct lov_request_set *set = req->rq_rqset;
146 struct lustre_handle *lov_lockhp;
147 struct lov_oinfo *loi;
150 LASSERT(set != NULL);
151 LASSERT(set->set_oi != NULL);
153 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
154 loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
156 /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
157 * and that copy can be arbitrarily out of date.
159 * The LOV API is due for a serious rewriting anyways, and this
160 * can be addressed then. */
162 if (rc == ELDLM_OK) {
163 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
166 LASSERT(lock != NULL);
167 lov_stripe_lock(set->set_oi->oi_md);
168 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
169 tmp = loi->loi_lvb.lvb_size;
170 /* Extend KMS up to the end of this lock and no further
171 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
172 if (tmp > lock->l_policy_data.l_extent.end)
173 tmp = lock->l_policy_data.l_extent.end + 1;
174 if (tmp >= loi->loi_kms) {
175 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
176 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
178 loi->loi_kms_valid = 1;
180 LDLM_DEBUG(lock, "lock acquired, setting rss="
181 LPU64"; leaving kms="LPU64", end="LPU64,
182 loi->loi_lvb.lvb_size, loi->loi_kms,
183 lock->l_policy_data.l_extent.end);
185 lov_stripe_unlock(set->set_oi->oi_md);
186 ldlm_lock_allow_match(lock);
188 } else if ((rc == ELDLM_LOCK_ABORTED) &&
189 (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
190 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
191 lov_stripe_lock(set->set_oi->oi_md);
192 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
193 lov_stripe_unlock(set->set_oi->oi_md);
194 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
195 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
198 struct obd_export *exp = set->set_exp;
199 struct lov_obd *lov = &exp->exp_obd->u.lov;
201 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
202 if (lov->lov_tgts[req->rq_idx] &&
203 lov->lov_tgts[req->rq_idx]->ltd_active) {
204 /* -EUSERS used by OST to report file contention */
205 if (rc != -EINTR && rc != -EUSERS)
206 CERROR("enqueue objid "LPX64" subobj "
207 LPX64" on OST idx %d: rc %d\n",
208 set->set_oi->oi_md->lsm_object_id,
209 loi->loi_id, loi->loi_ost_idx, rc);
214 lov_update_set(set, req, rc);
218 /* The callback for osc_enqueue that updates lov info for every OSC request. */
219 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
221 struct ldlm_enqueue_info *einfo;
222 struct lov_request *lovreq;
224 lovreq = container_of(oinfo, struct lov_request, rq_oi);
225 einfo = lovreq->rq_rqset->set_ei;
226 return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
229 static int enqueue_done(struct lov_request_set *set, __u32 mode)
231 struct lov_request *req;
232 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
236 /* enqueue/match success, just return */
237 if (set->set_completes && set->set_completes == set->set_success)
240 /* cancel enqueued/matched locks */
241 list_for_each_entry(req, &set->set_list, rq_link) {
242 struct lustre_handle *lov_lockhp;
244 if (!req->rq_complete || req->rq_rc)
247 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
249 if (!lustre_handle_is_used(lov_lockhp))
252 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
253 req->rq_oi.oi_md, mode, lov_lockhp, 0, 0);
254 if (rc && lov->lov_tgts[req->rq_idx] &&
255 lov->lov_tgts[req->rq_idx]->ltd_active)
256 CERROR("cancelling obdjid "LPX64" on OST "
257 "idx %d error: rc = %d\n",
258 req->rq_oi.oi_md->lsm_object_id,
262 lov_llh_put(set->set_lockh);
266 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
267 struct ptlrpc_request_set *rqset)
274 LASSERT(set->set_exp);
275 /* Do enqueue_done only for sync requests and if any request
279 set->set_completes = 0;
280 ret = enqueue_done(set, mode);
281 } else if (set->set_lockh)
282 lov_llh_put(set->set_lockh);
286 RETURN(rc ? rc : ret);
289 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
290 struct ldlm_enqueue_info *einfo,
291 struct lov_request_set **reqset)
293 struct lov_obd *lov = &exp->exp_obd->u.lov;
294 struct lov_request_set *set;
296 struct lov_oinfo *loi;
299 OBD_ALLOC(set, sizeof(*set));
307 set->set_lockh = lov_llh_new(oinfo->oi_md);
308 if (set->set_lockh == NULL)
309 GOTO(out_set, rc = -ENOMEM);
310 oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
312 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
313 struct lov_request *req;
316 loi = oinfo->oi_md->lsm_oinfo[i];
317 if (!lov_stripe_intersects(oinfo->oi_md, i,
318 oinfo->oi_policy.l_extent.start,
319 oinfo->oi_policy.l_extent.end,
323 if (!lov->lov_tgts[loi->loi_ost_idx] ||
324 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
325 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
329 OBD_ALLOC(req, sizeof(*req));
331 GOTO(out_set, rc = -ENOMEM);
333 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
334 sizeof(struct lov_oinfo *) +
335 sizeof(struct lov_oinfo);
336 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
337 if (req->rq_oi.oi_md == NULL) {
338 OBD_FREE(req, sizeof(*req));
339 GOTO(out_set, rc = -ENOMEM);
341 req->rq_oi.oi_md->lsm_oinfo[0] =
342 ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
343 sizeof(struct lov_oinfo *);
345 /* Set lov request specific parameters. */
346 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
347 req->rq_oi.oi_cb_up = cb_update_enqueue;
348 req->rq_oi.oi_flags = oinfo->oi_flags;
350 LASSERT(req->rq_oi.oi_lockh);
352 req->rq_oi.oi_policy.l_extent.gid =
353 oinfo->oi_policy.l_extent.gid;
354 req->rq_oi.oi_policy.l_extent.start = start;
355 req->rq_oi.oi_policy.l_extent.end = end;
357 req->rq_idx = loi->loi_ost_idx;
360 /* XXX LOV STACKING: submd should be from the subobj */
361 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
362 req->rq_oi.oi_md->lsm_object_gr = loi->loi_gr;
363 req->rq_oi.oi_md->lsm_stripe_count = 0;
364 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
366 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
367 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
369 lov_set_add_req(req, set);
372 GOTO(out_set, rc = -EIO);
376 lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
380 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
390 lov_update_set(set, req, ret);
394 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
401 LASSERT(set->set_exp);
402 rc = enqueue_done(set, mode);
403 if ((set->set_count == set->set_success) &&
404 (flags & LDLM_FL_TEST_LOCK))
405 lov_llh_put(set->set_lockh);
412 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
413 struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
414 __u32 mode, struct lustre_handle *lockh,
415 struct lov_request_set **reqset)
417 struct lov_obd *lov = &exp->exp_obd->u.lov;
418 struct lov_request_set *set;
420 struct lov_oinfo *loi;
423 OBD_ALLOC(set, sizeof(*set));
430 set->set_oi->oi_md = lsm;
431 set->set_lockh = lov_llh_new(lsm);
432 if (set->set_lockh == NULL)
433 GOTO(out_set, rc = -ENOMEM);
434 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
436 for (i = 0; i < lsm->lsm_stripe_count; i++){
437 struct lov_request *req;
440 loi = lsm->lsm_oinfo[i];
441 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
442 policy->l_extent.end, &start, &end))
445 /* FIXME raid1 should grace this error */
446 if (!lov->lov_tgts[loi->loi_ost_idx] ||
447 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
448 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
449 GOTO(out_set, rc = -EIO);
452 OBD_ALLOC(req, sizeof(*req));
454 GOTO(out_set, rc = -ENOMEM);
456 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
457 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
458 if (req->rq_oi.oi_md == NULL) {
459 OBD_FREE(req, sizeof(*req));
460 GOTO(out_set, rc = -ENOMEM);
463 req->rq_oi.oi_policy.l_extent.start = start;
464 req->rq_oi.oi_policy.l_extent.end = end;
465 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
467 req->rq_idx = loi->loi_ost_idx;
470 /* XXX LOV STACKING: submd should be from the subobj */
471 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
472 req->rq_oi.oi_md->lsm_object_gr = loi->loi_gr;
473 req->rq_oi.oi_md->lsm_stripe_count = 0;
475 lov_set_add_req(req, set);
478 GOTO(out_set, rc = -EIO);
482 lov_fini_match_set(set, mode, 0);
486 int lov_fini_cancel_set(struct lov_request_set *set)
494 LASSERT(set->set_exp);
496 lov_llh_put(set->set_lockh);
503 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
504 struct lov_stripe_md *lsm, __u32 mode,
505 struct lustre_handle *lockh,
506 struct lov_request_set **reqset)
508 struct lov_request_set *set;
510 struct lov_oinfo *loi;
513 OBD_ALLOC(set, sizeof(*set));
520 set->set_oi->oi_md = lsm;
521 set->set_lockh = lov_handle2llh(lockh);
522 if (set->set_lockh == NULL) {
523 CERROR("LOV: invalid lov lock handle %p\n", lockh);
524 GOTO(out_set, rc = -EINVAL);
526 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
528 for (i = 0; i < lsm->lsm_stripe_count; i++){
529 struct lov_request *req;
530 struct lustre_handle *lov_lockhp;
532 loi = lsm->lsm_oinfo[i];
533 lov_lockhp = set->set_lockh->llh_handles + i;
534 if (!lustre_handle_is_used(lov_lockhp)) {
535 CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
536 loi->loi_ost_idx, loi->loi_id);
540 OBD_ALLOC(req, sizeof(*req));
542 GOTO(out_set, rc = -ENOMEM);
544 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
545 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
546 if (req->rq_oi.oi_md == NULL) {
547 OBD_FREE(req, sizeof(*req));
548 GOTO(out_set, rc = -ENOMEM);
551 req->rq_idx = loi->loi_ost_idx;
554 /* XXX LOV STACKING: submd should be from the subobj */
555 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
556 req->rq_oi.oi_md->lsm_stripe_count = 0;
558 lov_set_add_req(req, set);
561 GOTO(out_set, rc = -EIO);
565 lov_fini_cancel_set(set);
569 static int create_done(struct obd_export *exp, struct lov_request_set *set,
570 struct lov_stripe_md **lsmp)
572 struct lov_obd *lov = &exp->exp_obd->u.lov;
573 struct obd_trans_info *oti = set->set_oti;
574 struct obdo *src_oa = set->set_oi->oi_oa;
575 struct lov_request *req;
576 struct obdo *ret_oa = NULL;
577 int attrset = 0, rc = 0;
580 LASSERT(set->set_completes);
582 /* try alloc objects on other osts if osc_create fails for
583 * exceptions: RPC failure, ENOSPC, etc */
584 if (set->set_count != set->set_success) {
585 list_for_each_entry (req, &set->set_list, rq_link) {
589 set->set_completes--;
590 req->rq_complete = 0;
592 rc = qos_remedy_create(set, req);
593 lov_update_create_set(set, req, rc);
597 /* no successful creates */
598 if (set->set_success == 0)
601 if (set->set_count != set->set_success) {
602 set->set_count = set->set_success;
608 GOTO(cleanup, rc = -ENOMEM);
610 list_for_each_entry(req, &set->set_list, rq_link) {
611 if (!req->rq_complete || req->rq_rc)
613 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
614 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
615 req->rq_stripe, &attrset);
617 if (src_oa->o_valid & OBD_MD_FLSIZE &&
618 ret_oa->o_size != src_oa->o_size) {
619 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
620 src_oa->o_size, ret_oa->o_size);
623 ret_oa->o_id = src_oa->o_id;
624 ret_oa->o_gr = src_oa->o_gr;
625 ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
626 memcpy(src_oa, ret_oa, sizeof(*src_oa));
629 *lsmp = set->set_oi->oi_md;
633 list_for_each_entry(req, &set->set_list, rq_link) {
634 struct obd_export *sub_exp;
637 if (!req->rq_complete || req->rq_rc)
640 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
641 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
643 CERROR("Failed to uncreate objid "LPX64" subobj "
644 LPX64" on OST idx %d: rc = %d\n",
645 src_oa->o_id, req->rq_oi.oi_oa->o_id,
649 obd_free_memmd(exp, &set->set_oi->oi_md);
651 if (oti && set->set_cookies) {
652 oti->oti_logcookies = set->set_cookies;
653 if (!set->set_cookie_sent) {
654 oti_free_cookies(oti);
655 src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
657 src_oa->o_valid |= OBD_MD_FLCOOKIE;
663 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
670 LASSERT(set->set_exp);
671 if (set->set_completes)
672 rc = create_done(set->set_exp, set, lsmp);
678 int lov_update_create_set(struct lov_request_set *set,
679 struct lov_request *req, int rc)
681 struct obd_trans_info *oti = set->set_oti;
682 struct lov_stripe_md *lsm = set->set_oi->oi_md;
683 struct lov_oinfo *loi;
684 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
687 if (rc && lov->lov_tgts[req->rq_idx] &&
688 lov->lov_tgts[req->rq_idx]->ltd_active) {
689 CERROR("error creating fid "LPX64" sub-object"
690 " on OST idx %d/%d: rc = %d\n",
691 set->set_oi->oi_oa->o_id, req->rq_idx,
692 lsm->lsm_stripe_count, rc);
694 CERROR("obd_create returned invalid err %d\n", rc);
699 spin_lock(&set->set_lock);
700 req->rq_stripe = set->set_success;
701 loi = lsm->lsm_oinfo[req->rq_stripe];
703 lov_update_set(set, req, rc);
704 spin_unlock(&set->set_lock);
708 loi->loi_id = req->rq_oi.oi_oa->o_id;
709 loi->loi_gr = req->rq_oi.oi_oa->o_gr;
710 loi->loi_ost_idx = req->rq_idx;
713 if (oti && set->set_cookies)
714 ++oti->oti_logcookies;
715 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
716 set->set_cookie_sent++;
718 lov_update_set(set, req, rc);
719 spin_unlock(&set->set_lock);
721 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
722 lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
727 int cb_create_update(struct obd_info *oinfo, int rc)
729 struct lov_request *lovreq;
731 lovreq = container_of(oinfo, struct lov_request, rq_oi);
732 rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
733 if (lov_finished_set(lovreq->rq_rqset))
734 lov_put_reqset(lovreq->rq_rqset);
739 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
740 struct lov_stripe_md **lsmp, struct obdo *src_oa,
741 struct obd_trans_info *oti,
742 struct lov_request_set **reqset)
744 struct lov_request_set *set;
748 OBD_ALLOC(set, sizeof(*set));
755 set->set_oi->oi_md = *lsmp;
756 set->set_oi->oi_oa = src_oa;
760 rc = qos_prep_create(exp, set);
761 /* qos_shrink_lsm() may have allocated a new lsm */
762 *lsmp = oinfo->oi_md;
764 lov_fini_create_set(set, lsmp);
772 static int common_attr_done(struct lov_request_set *set)
774 struct list_head *pos;
775 struct lov_request *req;
777 int rc = 0, attrset = 0;
780 LASSERT(set->set_oi != NULL);
782 if (set->set_oi->oi_oa == NULL)
785 if (!set->set_success)
790 GOTO(out, rc = -ENOMEM);
792 list_for_each (pos, &set->set_list) {
793 req = list_entry(pos, struct lov_request, rq_link);
795 if (!req->rq_complete || req->rq_rc)
797 if (req->rq_oi.oi_oa->o_valid == 0) /* inactive stripe */
799 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
800 req->rq_oi.oi_oa->o_valid,
801 set->set_oi->oi_md, req->rq_stripe, &attrset);
804 CERROR("No stripes had valid attrs\n");
807 tmp_oa->o_id = set->set_oi->oi_oa->o_id;
808 tmp_oa->o_gr = set->set_oi->oi_oa->o_gr;
809 memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
817 static int brw_done(struct lov_request_set *set)
819 struct lov_stripe_md *lsm = set->set_oi->oi_md;
820 struct lov_oinfo *loi = NULL;
821 struct list_head *pos;
822 struct lov_request *req;
825 list_for_each (pos, &set->set_list) {
826 req = list_entry(pos, struct lov_request, rq_link);
828 if (!req->rq_complete || req->rq_rc)
831 loi = lsm->lsm_oinfo[req->rq_stripe];
833 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
834 loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
840 int lov_fini_brw_set(struct lov_request_set *set)
847 LASSERT(set->set_exp);
848 if (set->set_completes) {
850 /* FIXME update qos data here */
857 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
858 obd_count oa_bufs, struct brw_page *pga,
859 struct obd_trans_info *oti,
860 struct lov_request_set **reqset)
867 struct lov_request_set *set;
868 struct lov_oinfo *loi = NULL;
869 struct lov_obd *lov = &exp->exp_obd->u.lov;
870 int rc = 0, i, shift;
873 OBD_ALLOC(set, sizeof(*set));
881 set->set_oabufs = oa_bufs;
882 OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
884 GOTO(out, rc = -ENOMEM);
886 OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
888 GOTO(out, rc = -ENOMEM);
890 /* calculate the page count for each stripe */
891 for (i = 0; i < oa_bufs; i++) {
892 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
893 info[stripe].count++;
896 /* alloc and initialize lov request */
898 for (i = 0 ; i < oinfo->oi_md->lsm_stripe_count; i++){
899 struct lov_request *req;
901 if (info[i].count == 0)
904 loi = oinfo->oi_md->lsm_oinfo[i];
905 if (!lov->lov_tgts[loi->loi_ost_idx] ||
906 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
907 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
908 GOTO(out, rc = -EIO);
911 OBD_ALLOC(req, sizeof(*req));
913 GOTO(out, rc = -ENOMEM);
915 OBDO_ALLOC(req->rq_oi.oi_oa);
916 if (req->rq_oi.oi_oa == NULL) {
917 OBD_FREE(req, sizeof(*req));
918 GOTO(out, rc = -ENOMEM);
922 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
923 sizeof(*req->rq_oi.oi_oa));
925 req->rq_oi.oi_oa->o_id = loi->loi_id;
926 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
927 req->rq_oi.oi_oa->o_stripe_idx = i;
929 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
930 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
931 if (req->rq_oi.oi_md == NULL) {
932 OBDO_FREE(req->rq_oi.oi_oa);
933 OBD_FREE(req, sizeof(*req));
934 GOTO(out, rc = -ENOMEM);
937 req->rq_idx = loi->loi_ost_idx;
940 /* XXX LOV STACKING */
941 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
942 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
943 req->rq_oabufs = info[i].count;
944 req->rq_pgaidx = shift;
945 shift += req->rq_oabufs;
947 /* remember the index for sort brw_page array */
948 info[i].index = req->rq_pgaidx;
950 lov_set_add_req(req, set);
953 GOTO(out, rc = -EIO);
955 /* rotate & sort the brw_page array */
956 for (i = 0; i < oa_bufs; i++) {
957 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
959 shift = info[stripe].index + info[stripe].off;
960 LASSERT(shift < oa_bufs);
961 set->set_pga[shift] = pga[i];
962 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
963 &set->set_pga[shift].off);
968 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
973 lov_fini_brw_set(set);
978 int lov_fini_getattr_set(struct lov_request_set *set)
985 LASSERT(set->set_exp);
986 if (set->set_completes)
987 rc = common_attr_done(set);
994 /* The callback for osc_getattr_async that finilizes a request info when a
995 * response is recieved. */
996 static int cb_getattr_update(struct obd_info *oinfo, int rc)
998 struct lov_request *lovreq;
999 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1000 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1003 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
1004 struct lov_request_set **reqset)
1006 struct lov_request_set *set;
1007 struct lov_oinfo *loi = NULL;
1008 struct lov_obd *lov = &exp->exp_obd->u.lov;
1012 OBD_ALLOC(set, sizeof(*set));
1018 set->set_oi = oinfo;
1020 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1021 struct lov_request *req;
1023 loi = oinfo->oi_md->lsm_oinfo[i];
1024 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1025 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1026 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1030 OBD_ALLOC(req, sizeof(*req));
1032 GOTO(out_set, rc = -ENOMEM);
1035 req->rq_idx = loi->loi_ost_idx;
1037 OBDO_ALLOC(req->rq_oi.oi_oa);
1038 if (req->rq_oi.oi_oa == NULL) {
1039 OBD_FREE(req, sizeof(*req));
1040 GOTO(out_set, rc = -ENOMEM);
1042 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1043 sizeof(*req->rq_oi.oi_oa));
1044 req->rq_oi.oi_oa->o_id = loi->loi_id;
1045 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1046 req->rq_oi.oi_cb_up = cb_getattr_update;
1048 lov_set_add_req(req, set);
1050 if (!set->set_count)
1051 GOTO(out_set, rc = -EIO);
1055 lov_fini_getattr_set(set);
1059 int lov_fini_destroy_set(struct lov_request_set *set)
1065 LASSERT(set->set_exp);
1066 if (set->set_completes) {
1067 /* FIXME update qos data here */
1070 lov_put_reqset(set);
1075 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1076 struct obdo *src_oa, struct lov_stripe_md *lsm,
1077 struct obd_trans_info *oti,
1078 struct lov_request_set **reqset)
1080 struct lov_request_set *set;
1081 struct lov_oinfo *loi = NULL;
1082 struct lov_obd *lov = &exp->exp_obd->u.lov;
1086 OBD_ALLOC(set, sizeof(*set));
1092 set->set_oi = oinfo;
1093 set->set_oi->oi_md = lsm;
1094 set->set_oi->oi_oa = src_oa;
1096 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1097 set->set_cookies = oti->oti_logcookies;
1099 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1100 struct lov_request *req;
1102 loi = lsm->lsm_oinfo[i];
1103 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1104 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1105 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1109 OBD_ALLOC(req, sizeof(*req));
1111 GOTO(out_set, rc = -ENOMEM);
1114 req->rq_idx = loi->loi_ost_idx;
1116 OBDO_ALLOC(req->rq_oi.oi_oa);
1117 if (req->rq_oi.oi_oa == NULL) {
1118 OBD_FREE(req, sizeof(*req));
1119 GOTO(out_set, rc = -ENOMEM);
1121 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1122 req->rq_oi.oi_oa->o_id = loi->loi_id;
1123 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1124 lov_set_add_req(req, set);
1126 if (!set->set_count)
1127 GOTO(out_set, rc = -EIO);
1131 lov_fini_destroy_set(set);
1135 int lov_fini_setattr_set(struct lov_request_set *set)
1142 LASSERT(set->set_exp);
1143 if (set->set_completes) {
1144 rc = common_attr_done(set);
1145 /* FIXME update qos data here */
1148 lov_put_reqset(set);
1152 int lov_update_setattr_set(struct lov_request_set *set,
1153 struct lov_request *req, int rc)
1155 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1156 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1159 lov_update_set(set, req, rc);
1161 /* grace error on inactive ost */
1162 if (rc && !(lov->lov_tgts[req->rq_idx] &&
1163 lov->lov_tgts[req->rq_idx]->ltd_active))
1167 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1168 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1169 req->rq_oi.oi_oa->o_ctime;
1170 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1171 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1172 req->rq_oi.oi_oa->o_mtime;
1173 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1174 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1175 req->rq_oi.oi_oa->o_atime;
1181 /* The callback for osc_setattr_async that finilizes a request info when a
1182 * response is recieved. */
1183 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1185 struct lov_request *lovreq;
1186 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1187 return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1190 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1191 struct obd_trans_info *oti,
1192 struct lov_request_set **reqset)
1194 struct lov_request_set *set;
1195 struct lov_oinfo *loi = NULL;
1196 struct lov_obd *lov = &exp->exp_obd->u.lov;
1200 OBD_ALLOC(set, sizeof(*set));
1207 set->set_oi = oinfo;
1208 if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1209 set->set_cookies = oti->oti_logcookies;
1211 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1212 struct lov_request *req;
1214 loi = oinfo->oi_md->lsm_oinfo[i];
1215 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1216 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1217 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1221 OBD_ALLOC(req, sizeof(*req));
1223 GOTO(out_set, rc = -ENOMEM);
1225 req->rq_idx = loi->loi_ost_idx;
1227 OBDO_ALLOC(req->rq_oi.oi_oa);
1228 if (req->rq_oi.oi_oa == NULL) {
1229 OBD_FREE(req, sizeof(*req));
1230 GOTO(out_set, rc = -ENOMEM);
1232 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1233 sizeof(*req->rq_oi.oi_oa));
1234 req->rq_oi.oi_oa->o_id = loi->loi_id;
1235 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1236 req->rq_oi.oi_oa->o_stripe_idx = i;
1237 req->rq_oi.oi_cb_up = cb_setattr_update;
1239 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1240 int off = lov_stripe_offset(oinfo->oi_md,
1241 oinfo->oi_oa->o_size, i,
1242 &req->rq_oi.oi_oa->o_size);
1244 if (off < 0 && req->rq_oi.oi_oa->o_size)
1245 req->rq_oi.oi_oa->o_size--;
1247 CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1248 i, req->rq_oi.oi_oa->o_size,
1249 oinfo->oi_oa->o_size);
1251 lov_set_add_req(req, set);
1253 if (!set->set_count)
1254 GOTO(out_set, rc = -EIO);
1258 lov_fini_setattr_set(set);
1262 int lov_fini_punch_set(struct lov_request_set *set)
1269 LASSERT(set->set_exp);
1270 if (set->set_completes) {
1272 /* FIXME update qos data here */
1273 if (set->set_success)
1274 rc = common_attr_done(set);
1277 lov_put_reqset(set);
1282 int lov_update_punch_set(struct lov_request_set *set,
1283 struct lov_request *req, int rc)
1285 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1286 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1289 lov_update_set(set, req, rc);
1291 /* grace error on inactive ost */
1292 if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1296 lov_stripe_lock(lsm);
1297 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1298 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1299 req->rq_oi.oi_oa->o_blocks;
1302 /* Do we need to update lvb_size here? It needn't because
1303 * it have been done in ll_truncate(). -jay */
1304 lov_stripe_unlock(lsm);
1310 /* The callback for osc_punch that finilizes a request info when a response
1312 static int cb_update_punch(struct obd_info *oinfo, int rc)
1314 struct lov_request *lovreq;
1315 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1316 return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1319 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1320 struct obd_trans_info *oti,
1321 struct lov_request_set **reqset)
1323 struct lov_request_set *set;
1324 struct lov_oinfo *loi = NULL;
1325 struct lov_obd *lov = &exp->exp_obd->u.lov;
1329 OBD_ALLOC(set, sizeof(*set));
1334 set->set_oi = oinfo;
1337 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1338 struct lov_request *req;
1341 if (!lov_stripe_intersects(oinfo->oi_md, i,
1342 oinfo->oi_policy.l_extent.start,
1343 oinfo->oi_policy.l_extent.end,
1347 loi = oinfo->oi_md->lsm_oinfo[i];
1348 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1349 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1350 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1351 GOTO(out_set, rc = -EIO);
1354 OBD_ALLOC(req, sizeof(*req));
1356 GOTO(out_set, rc = -ENOMEM);
1358 req->rq_idx = loi->loi_ost_idx;
1360 OBDO_ALLOC(req->rq_oi.oi_oa);
1361 if (req->rq_oi.oi_oa == NULL) {
1362 OBD_FREE(req, sizeof(*req));
1363 GOTO(out_set, rc = -ENOMEM);
1365 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1366 sizeof(*req->rq_oi.oi_oa));
1367 req->rq_oi.oi_oa->o_id = loi->loi_id;
1368 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1369 req->rq_oi.oi_oa->o_stripe_idx = i;
1370 req->rq_oi.oi_cb_up = cb_update_punch;
1372 req->rq_oi.oi_policy.l_extent.start = rs;
1373 req->rq_oi.oi_policy.l_extent.end = re;
1374 req->rq_oi.oi_policy.l_extent.gid = -1;
1376 lov_set_add_req(req, set);
1378 if (!set->set_count)
1379 GOTO(out_set, rc = -EIO);
1383 lov_fini_punch_set(set);
1387 int lov_fini_sync_set(struct lov_request_set *set)
1394 LASSERT(set->set_exp);
1395 if (set->set_completes) {
1396 if (!set->set_success)
1398 /* FIXME update qos data here */
1401 lov_put_reqset(set);
1406 /* The callback for osc_sync that finilizes a request info when a
1407 * response is recieved. */
1408 static int cb_sync_update(struct obd_info *oinfo, int rc)
1410 struct lov_request *lovreq;
1411 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1412 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1415 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1416 obd_off start, obd_off end,
1417 struct lov_request_set **reqset)
1419 struct lov_request_set *set;
1420 struct lov_oinfo *loi = NULL;
1421 struct lov_obd *lov = &exp->exp_obd->u.lov;
1425 OBD_ALLOC(set, sizeof(*set));
1431 set->set_oi = oinfo;
1432 set->set_oi->oi_md = oinfo->oi_md;
1433 set->set_oi->oi_oa = oinfo->oi_oa;
1435 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1436 struct lov_request *req;
1439 loi = oinfo->oi_md->lsm_oinfo[i];
1440 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1441 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1442 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1446 if (!lov_stripe_intersects(oinfo->oi_md, i, start,
1450 OBD_ALLOC(req, sizeof(*req));
1452 GOTO(out_set, rc = -ENOMEM);
1454 req->rq_idx = loi->loi_ost_idx;
1456 OBDO_ALLOC(req->rq_oi.oi_oa);
1457 if (req->rq_oi.oi_oa == NULL) {
1458 OBD_FREE(req, sizeof(*req));
1459 GOTO(out_set, rc = -ENOMEM);
1461 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1462 sizeof(*req->rq_oi.oi_oa));
1463 req->rq_oi.oi_oa->o_id = loi->loi_id;
1464 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1465 req->rq_oi.oi_oa->o_stripe_idx = i;
1467 req->rq_oi.oi_policy.l_extent.start = rs;
1468 req->rq_oi.oi_policy.l_extent.end = re;
1469 req->rq_oi.oi_policy.l_extent.gid = -1;
1470 req->rq_oi.oi_cb_up = cb_sync_update;
1472 lov_set_add_req(req, set);
1474 if (!set->set_count)
1475 GOTO(out_set, rc = -EIO);
1479 lov_fini_sync_set(set);
1483 #define LOV_U64_MAX ((__u64)~0ULL)
1484 #define LOV_SUM_MAX(tot, add) \
1486 if ((tot) + (add) < (tot)) \
1487 (tot) = LOV_U64_MAX; \
1492 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1497 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1499 if (osfs->os_files != LOV_U64_MAX)
1500 do_div(osfs->os_files, expected_stripes);
1501 if (osfs->os_ffree != LOV_U64_MAX)
1502 do_div(osfs->os_ffree, expected_stripes);
1504 spin_lock(&obd->obd_osfs_lock);
1505 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1506 obd->obd_osfs_age = cfs_time_current_64();
1507 spin_unlock(&obd->obd_osfs_lock);
1514 int lov_fini_statfs_set(struct lov_request_set *set)
1522 if (set->set_completes) {
1523 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1526 lov_put_reqset(set);
1530 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1533 int shift = 0, quit = 0;
1537 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1539 if (osfs->os_bsize != lov_sfs->os_bsize) {
1540 /* assume all block sizes are always powers of 2 */
1541 /* get the bits difference */
1542 tmp = osfs->os_bsize | lov_sfs->os_bsize;
1543 for (shift = 0; shift <= 64; ++shift) {
1555 if (osfs->os_bsize < lov_sfs->os_bsize) {
1556 osfs->os_bsize = lov_sfs->os_bsize;
1558 osfs->os_bfree >>= shift;
1559 osfs->os_bavail >>= shift;
1560 osfs->os_blocks >>= shift;
1561 } else if (shift != 0) {
1562 lov_sfs->os_bfree >>= shift;
1563 lov_sfs->os_bavail >>= shift;
1564 lov_sfs->os_blocks >>= shift;
1567 /* Sandia requested that df (and so, statfs) only
1568 returned minimal available space on
1569 a single OST, so people would be able to
1570 write this much data guaranteed. */
1571 if (osfs->os_bavail > lov_sfs->os_bavail) {
1572 /* Presumably if new bavail is smaller,
1573 new bfree is bigger as well */
1574 osfs->os_bfree = lov_sfs->os_bfree;
1575 osfs->os_bavail = lov_sfs->os_bavail;
1578 osfs->os_bfree += lov_sfs->os_bfree;
1579 osfs->os_bavail += lov_sfs->os_bavail;
1581 osfs->os_blocks += lov_sfs->os_blocks;
1582 /* XXX not sure about this one - depends on policy.
1583 * - could be minimum if we always stripe on all OBDs
1584 * (but that would be wrong for any other policy,
1585 * if one of the OBDs has no more objects left)
1586 * - could be sum if we stripe whole objects
1587 * - could be average, just to give a nice number
1589 * To give a "reasonable" (if not wholly accurate)
1590 * number, we divide the total number of free objects
1591 * by expected stripe count (watch out for overflow).
1593 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1594 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1598 /* The callback for osc_statfs_async that finilizes a request info when a
1599 * response is received. */
1600 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1602 struct lov_request *lovreq;
1603 struct obd_statfs *osfs, *lov_sfs;
1604 struct obd_device *obd;
1605 struct lov_obd *lov;
1609 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1610 lov = &lovreq->rq_rqset->set_obd->u.lov;
1611 obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1613 osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1614 lov_sfs = oinfo->oi_osfs;
1616 success = lovreq->rq_rqset->set_success;
1617 /* XXX: the same is done in lov_update_common_set, however
1618 lovset->set_exp is not initialized. */
1619 lov_update_set(lovreq->rq_rqset, lovreq, rc);
1621 /* XXX ignore error for disconnected ost ? */
1622 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1623 lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1628 spin_lock(&obd->obd_osfs_lock);
1629 memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1630 if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1631 obd->obd_osfs_age = cfs_time_current_64();
1632 spin_unlock(&obd->obd_osfs_lock);
1634 lov_update_statfs(osfs, lov_sfs, success);
1637 if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1638 lov_finished_set(lovreq->rq_rqset)) {
1639 lov_statfs_interpret(NULL, lovreq->rq_rqset,
1640 lovreq->rq_rqset->set_success !=
1641 lovreq->rq_rqset->set_count);
1642 qos_statfs_done(lov);
1648 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1649 struct lov_request_set **reqset)
1651 struct lov_request_set *set;
1652 struct lov_obd *lov = &obd->u.lov;
1656 OBD_ALLOC(set, sizeof(*set));
1662 set->set_oi = oinfo;
1664 /* We only get block data from the OBD */
1665 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1666 struct lov_request *req;
1668 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1669 && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1670 CDEBUG(D_HA, "lov idx %d inactive\n", i);
1674 /* skip targets that have been explicitely disabled by the
1676 if (!lov->lov_tgts[i]->ltd_exp) {
1677 CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1681 OBD_ALLOC(req, sizeof(*req));
1683 GOTO(out_set, rc = -ENOMEM);
1685 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1686 if (req->rq_oi.oi_osfs == NULL) {
1687 OBD_FREE(req, sizeof(*req));
1688 GOTO(out_set, rc = -ENOMEM);
1692 req->rq_oi.oi_cb_up = cb_statfs_update;
1693 req->rq_oi.oi_flags = oinfo->oi_flags;
1695 lov_set_add_req(req, set);
1697 if (!set->set_count)
1698 GOTO(out_set, rc = -EIO);
1702 lov_fini_statfs_set(set);