1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_LOV
43 #include <libcfs/libcfs.h>
45 #include <liblustre.h>
48 #include <obd_class.h>
50 #include <lustre/lustre_idl.h>
52 #include "lov_internal.h"
/* Prepare a freshly allocated request set for use: clear the completion
 * counter, initialize the (empty) request list, set the reference count to 1
 * and initialize the wait queue and spinlock.
 * NOTE(review): this listing omits some source lines, so further fields may
 * be initialized here as well -- confirm against the full file. */
54 static void lov_init_set(struct lov_request_set *set)
57 set->set_completes = 0;
60 CFS_INIT_LIST_HEAD(&set->set_list);
61 atomic_set(&set->set_refcount, 1);
62 cfs_waitq_init(&set->set_waitq);
63 spin_lock_init(&set->set_lock);
/* Tear down a request set and everything it owns.
 * Walks set_list with the deletion-safe iterator, unlinks every request and
 * frees its per-request obdo, sub-md buffer (rq_buflen bytes), optional
 * statfs buffer and the request itself.  Afterwards the page array
 * (set_oabufs entries), the lock-handle holder reference and the set itself
 * are released.
 * NOTE(review): the guards around several of these frees are not visible in
 * this listing; presumably each free is conditional on the pointer being
 * set -- confirm. */
66 void lov_finish_set(struct lov_request_set *set)
68 struct list_head *pos, *n;
72 list_for_each_safe(pos, n, &set->set_list) {
73 struct lov_request *req = list_entry(pos, struct lov_request,
75 list_del_init(&req->rq_link);
78 OBDO_FREE(req->rq_oi.oi_oa);
80 OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
81 if (req->rq_oi.oi_osfs)
82 OBD_FREE(req->rq_oi.oi_osfs,
83 sizeof(*req->rq_oi.oi_osfs));
84 OBD_FREE(req, sizeof(*req));
88 int len = set->set_oabufs * sizeof(*set->set_pga);
89 OBD_FREE(set->set_pga, len);
92 lov_llh_put(set->set_lockh);
94 OBD_FREE(set, sizeof(*set));
/* Report whether every request in the set has completed.
 * Logs the progress at D_INFO and returns non-zero when set_completes
 * equals set_count, zero otherwise. */
98 int lov_finished_set(struct lov_request_set *set)
100 CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
102 return set->set_completes == set->set_count;
/* Record that one request of the set has finished: mark the request
 * complete, bump the set's completion counter and wake any waiter on
 * set_waitq.
 * NOTE(review): how rc is stored or counted is not visible in this listing
 * (other functions read req->rq_rc and set->set_success) -- confirm. */
106 void lov_update_set(struct lov_request_set *set,
107 struct lov_request *req, int rc)
109 req->rq_complete = 1;
112 set->set_completes++;
116 cfs_waitq_signal(&set->set_waitq);
/* Generic per-request completion handler: records the result through
 * lov_update_set() and then special-cases a non-zero rc that comes from a
 * target slot that is empty or not active (the "grace" case below).
 * NOTE(review): the statement executed in the grace case and the return
 * value are not visible in this listing. */
119 int lov_update_common_set(struct lov_request_set *set,
120 struct lov_request *req, int rc)
122 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
125 lov_update_set(set, req, rc);
127 /* grace error on inactive ost */
128 if (rc && !(lov->lov_tgts[req->rq_idx] &&
129 lov->lov_tgts[req->rq_idx]->ltd_active))
132 /* FIXME in raid1 regime, should return 0 */
/* Append a request to the tail of the set's request list.
 * NOTE(review): callers test set->set_count and callbacks read
 * req->rq_rqset, so the omitted lines presumably update both -- confirm. */
136 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
138 list_add_tail(&req->rq_link, &set->set_list);
/* Process the result of a lock enqueue for one stripe of the set.
 *
 * Three outcomes are handled:
 *  - rc == ELDLM_OK: the LVB returned in the request's one-stripe sub-md is
 *    copied to the stripe's lov_oinfo under the stripe lock; a candidate
 *    known-minimum-size value is derived from the LVB size, limited to the
 *    end of the granted extent + 1, and compared with loi_kms; the lock is
 *    then allowed to match.
 *  - rc == ELDLM_LOCK_ABORTED with LDLM_FL_HAS_INTENT set on the parent
 *    oinfo (glimpse): the stripe's lock handle is zeroed and only the LVB
 *    is copied.
 *  - anything else: the stripe's lock handle is zeroed and, if the target
 *    is present and active and rc is neither -EINTR nor -EUSERS, an error
 *    is logged.
 * In every case the result is recorded through lov_update_set().
 *
 * NOTE(review): several statements (declaration of tmp, the KMS assignment,
 * reference drop on lock, the return) are not visible in this listing. */
143 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
145 struct lov_request_set *set = req->rq_rqset;
146 struct lustre_handle *lov_lockhp;
147 struct lov_oinfo *loi;
150 LASSERT(set != NULL);
151 LASSERT(set->set_oi != NULL);
153 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
154 loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
156 /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
157 * and that copy can be arbitrarily out of date.
159 * The LOV API is due for a serious rewriting anyways, and this
160 * can be addressed then. */
162 if (rc == ELDLM_OK) {
163 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
166 LASSERT(lock != NULL);
167 lov_stripe_lock(set->set_oi->oi_md);
168 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
169 tmp = loi->loi_lvb.lvb_size;
170 /* Extend KMS up to the end of this lock and no further
171 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
172 if (tmp > lock->l_policy_data.l_extent.end)
173 tmp = lock->l_policy_data.l_extent.end + 1;
174 if (tmp >= loi->loi_kms) {
175 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
176 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
178 loi->loi_kms_valid = 1;
180 LDLM_DEBUG(lock, "lock acquired, setting rss="
181 LPU64"; leaving kms="LPU64", end="LPU64,
182 loi->loi_lvb.lvb_size, loi->loi_kms,
183 lock->l_policy_data.l_extent.end);
185 lov_stripe_unlock(set->set_oi->oi_md);
186 ldlm_lock_allow_match(lock);
188 } else if ((rc == ELDLM_LOCK_ABORTED) &&
189 (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
190 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
191 lov_stripe_lock(set->set_oi->oi_md);
192 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
193 lov_stripe_unlock(set->set_oi->oi_md);
194 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
195 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
198 struct obd_export *exp = set->set_exp;
199 struct lov_obd *lov = &exp->exp_obd->u.lov;
201 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
202 if (lov->lov_tgts[req->rq_idx] &&
203 lov->lov_tgts[req->rq_idx]->ltd_active) {
204 /* -EUSERS used by OST to report file contention */
205 if (rc != -EINTR && rc != -EUSERS)
206 CERROR("enqueue objid "LPX64" subobj "
207 LPX64" on OST idx %d: rc %d\n",
208 set->set_oi->oi_md->lsm_object_id,
209 loi->loi_id, loi->loi_ost_idx, rc);
214 lov_update_set(set, req, rc);
218 /* The callback for osc_enqueue that updates lov info for every OSC request. */
/* The obd_info passed in is the rq_oi member embedded in a lov_request, so
 * container_of() recovers the owning request; the lock mode is taken from
 * the enqueue info stored on the request's set. */
219 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
221 struct ldlm_enqueue_info *einfo;
222 struct lov_request *lovreq;
224 lovreq = container_of(oinfo, struct lov_request, rq_oi);
225 einfo = lovreq->rq_rqset->set_ei;
226 return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
/* Finish an enqueue/match operation on a set.
 * If at least one request completed and all completed requests succeeded
 * (set_completes == set_success) nothing has to be undone.  Otherwise the
 * requests that completed without error and still hold a used lock handle
 * have their sub-lock cancelled through obd_cancel(); cancel failures on a
 * present, active target are logged.  The lock-handle holder reference is
 * dropped afterwards.
 * NOTE(review): obd_cancel() dereferences lov->lov_tgts[req->rq_idx] before
 * the NULL test that follows it; if the slot can be NULL here this is a
 * NULL dereference -- confirm against the full file. */
229 static int enqueue_done(struct lov_request_set *set, __u32 mode)
231 struct lov_request *req;
232 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
236 /* enqueue/match success, just return */
237 if (set->set_completes && set->set_completes == set->set_success)
240 /* cancel enqueued/matched locks */
241 list_for_each_entry(req, &set->set_list, rq_link) {
242 struct lustre_handle *lov_lockhp;
244 if (!req->rq_complete || req->rq_rc)
247 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
249 if (!lustre_handle_is_used(lov_lockhp))
252 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
253 req->rq_oi.oi_md, mode, lov_lockhp, 0, 0);
254 if (rc && lov->lov_tgts[req->rq_idx] &&
255 lov->lov_tgts[req->rq_idx]->ltd_active)
256 CERROR("cancelling obdjid "LPX64" on OST "
257 "idx %d error: rc = %d\n",
258 req->rq_oi.oi_md->lsm_object_id,
262 lov_llh_put(set->set_lockh);
/* Finalize an enqueue request set.
 * Asserts the set has an export.  In one branch the completion counter is
 * reset and enqueue_done() is run; in the other the lock-handle holder
 * reference is dropped if present.  Returns the caller-supplied rc when it
 * is non-zero, otherwise the result of enqueue_done().
 * NOTE(review): the condition selecting the branches is not visible in this
 * listing (the in-code comment says enqueue_done is for sync requests), and
 * that in-code comment is cut off before its closing delimiter. */
266 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
267 struct ptlrpc_request_set *rqset)
274 LASSERT(set->set_exp);
275 /* Do enqueue_done only for sync requests and if any request
279 set->set_completes = 0;
280 ret = enqueue_done(set, mode);
281 } else if (set->set_lockh)
282 lov_llh_put(set->set_lockh);
286 RETURN(rc ? rc : ret);
/* Build the request set for a striped lock enqueue.
 *
 * Allocates the set and a lock-handle holder (lov_llh_new) and publishes the
 * holder's cookie in oinfo->oi_lockh.  For every stripe whose range
 * intersects the requested extent and whose target slot is present and
 * active, a lov_request is allocated together with a one-stripe sub-md
 * buffer (the md header, one lov_oinfo pointer and one lov_oinfo laid out
 * back to back).  The request is filled with the per-stripe lock handle,
 * the cb_update_enqueue callback, the caller's flags, the stripe-local
 * extent, the target index, object id/group and the current KMS/LVB, then
 * added to the set.
 *
 * Failures jump to out_set with -ENOMEM (allocation) or -EIO; the set is
 * released through lov_fini_enqueue_set().
 * NOTE(review): the handling of non-intersecting / inactive stripes and the
 * condition for the final -EIO are not visible in this listing. */
289 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
290 struct ldlm_enqueue_info *einfo,
291 struct lov_request_set **reqset)
293 struct lov_obd *lov = &exp->exp_obd->u.lov;
294 struct lov_request_set *set;
296 struct lov_oinfo *loi;
299 OBD_ALLOC(set, sizeof(*set));
307 set->set_lockh = lov_llh_new(oinfo->oi_md);
308 if (set->set_lockh == NULL)
309 GOTO(out_set, rc = -ENOMEM);
310 oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
312 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
313 struct lov_request *req;
316 loi = oinfo->oi_md->lsm_oinfo[i];
317 if (!lov_stripe_intersects(oinfo->oi_md, i,
318 oinfo->oi_policy.l_extent.start,
319 oinfo->oi_policy.l_extent.end,
323 if (!lov->lov_tgts[loi->loi_ost_idx] ||
324 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
325 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
329 OBD_ALLOC(req, sizeof(*req));
331 GOTO(out_set, rc = -ENOMEM);
333 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
334 sizeof(struct lov_oinfo *) +
335 sizeof(struct lov_oinfo);
336 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
337 if (req->rq_oi.oi_md == NULL) {
338 OBD_FREE(req, sizeof(*req));
339 GOTO(out_set, rc = -ENOMEM);
341 req->rq_oi.oi_md->lsm_oinfo[0] =
342 ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
343 sizeof(struct lov_oinfo *);
345 /* Set lov request specific parameters. */
346 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
347 req->rq_oi.oi_cb_up = cb_update_enqueue;
348 req->rq_oi.oi_flags = oinfo->oi_flags;
350 LASSERT(req->rq_oi.oi_lockh);
352 req->rq_oi.oi_policy.l_extent.gid =
353 oinfo->oi_policy.l_extent.gid;
354 req->rq_oi.oi_policy.l_extent.start = start;
355 req->rq_oi.oi_policy.l_extent.end = end;
357 req->rq_idx = loi->loi_ost_idx;
360 /* XXX LOV STACKING: submd should be from the subobj */
361 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
362 req->rq_oi.oi_md->lsm_object_gr = loi->loi_gr;
363 req->rq_oi.oi_md->lsm_stripe_count = 0;
364 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
366 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
367 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
369 lov_set_add_req(req, set);
372 GOTO(out_set, rc = -EIO);
376 lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
/* Record the outcome of a lock match for one request by passing `ret` to
 * lov_update_set().
 * NOTE(review): the rest of the parameter list and the derivation of `ret`
 * are not visible in this listing. */
380 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
390 lov_update_set(set, req, ret);
/* Finalize a match request set: asserts the set has an export, runs
 * enqueue_done() for the given mode and, when every request succeeded
 * (set_count == set_success) and the caller only tested for a lock
 * (LDLM_FL_TEST_LOCK), drops the lock-handle holder reference.
 * NOTE(review): early-exit checks and the return are not visible here. */
394 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
401 LASSERT(set->set_exp);
402 rc = enqueue_done(set, mode);
403 if ((set->set_count == set->set_success) &&
404 (flags & LDLM_FL_TEST_LOCK))
405 lov_llh_put(set->set_lockh);
/* Build the request set for a striped lock match.
 *
 * Allocates the set, attaches lsm to the parent oinfo, creates a lock-handle
 * holder and publishes its cookie in *lockh.  For every stripe that
 * intersects policy's extent a lov_request with a bare sub-md is created,
 * carrying the stripe-local extent, the caller's gid, the target index and
 * the object id/group.  Unlike the enqueue variant, an empty or inactive
 * target slot aborts the whole operation with -EIO.
 *
 * Failures jump to out_set (-ENOMEM / -EIO); the set is released through
 * lov_fini_match_set(set, mode, 0).
 * NOTE(review): the handling of non-intersecting stripes and the condition
 * for the final -EIO are not visible in this listing. */
412 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
413 struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
414 __u32 mode, struct lustre_handle *lockh,
415 struct lov_request_set **reqset)
417 struct lov_obd *lov = &exp->exp_obd->u.lov;
418 struct lov_request_set *set;
420 struct lov_oinfo *loi;
423 OBD_ALLOC(set, sizeof(*set));
430 set->set_oi->oi_md = lsm;
431 set->set_lockh = lov_llh_new(lsm);
432 if (set->set_lockh == NULL)
433 GOTO(out_set, rc = -ENOMEM);
434 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
436 for (i = 0; i < lsm->lsm_stripe_count; i++){
437 struct lov_request *req;
440 loi = lsm->lsm_oinfo[i];
441 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
442 policy->l_extent.end, &start, &end))
445 /* FIXME raid1 should grace this error */
446 if (!lov->lov_tgts[loi->loi_ost_idx] ||
447 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
448 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
449 GOTO(out_set, rc = -EIO);
452 OBD_ALLOC(req, sizeof(*req));
454 GOTO(out_set, rc = -ENOMEM);
456 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
457 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
458 if (req->rq_oi.oi_md == NULL) {
459 OBD_FREE(req, sizeof(*req));
460 GOTO(out_set, rc = -ENOMEM);
463 req->rq_oi.oi_policy.l_extent.start = start;
464 req->rq_oi.oi_policy.l_extent.end = end;
465 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
467 req->rq_idx = loi->loi_ost_idx;
470 /* XXX LOV STACKING: submd should be from the subobj */
471 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
472 req->rq_oi.oi_md->lsm_object_gr = loi->loi_gr;
473 req->rq_oi.oi_md->lsm_stripe_count = 0;
475 lov_set_add_req(req, set);
478 GOTO(out_set, rc = -EIO);
482 lov_fini_match_set(set, mode, 0);
/* Finalize a cancel request set: asserts the set has an export and drops a
 * reference on the lock-handle holder.
 * NOTE(review): any guard on set_lockh, the release of the set itself and
 * the return value are not visible in this listing. */
486 int lov_fini_cancel_set(struct lov_request_set *set)
494 LASSERT(set->set_exp);
496 lov_llh_put(set->set_lockh);
/* Build the request set for cancelling a striped lock.
 *
 * Allocates the set, attaches lsm to the parent oinfo and resolves the LOV
 * lock handle via lov_handle2llh(); an unknown handle fails with -EINVAL.
 * For each stripe whose per-stripe handle is in use a lov_request with a
 * bare sub-md (object id, stripe count 0) is created and added to the set;
 * stripes without a lock are only logged at D_INFO.
 *
 * Failures jump to out_set (-ENOMEM / -EINVAL / -EIO); the set is released
 * through lov_fini_cancel_set().
 * NOTE(review): the condition for the final -EIO is not visible here. */
503 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
504 struct lov_stripe_md *lsm, __u32 mode,
505 struct lustre_handle *lockh,
506 struct lov_request_set **reqset)
508 struct lov_request_set *set;
510 struct lov_oinfo *loi;
513 OBD_ALLOC(set, sizeof(*set));
520 set->set_oi->oi_md = lsm;
521 set->set_lockh = lov_handle2llh(lockh);
522 if (set->set_lockh == NULL) {
523 CERROR("LOV: invalid lov lock handle %p\n", lockh);
524 GOTO(out_set, rc = -EINVAL);
526 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
528 for (i = 0; i < lsm->lsm_stripe_count; i++){
529 struct lov_request *req;
530 struct lustre_handle *lov_lockhp;
532 loi = lsm->lsm_oinfo[i];
533 lov_lockhp = set->set_lockh->llh_handles + i;
534 if (!lustre_handle_is_used(lov_lockhp)) {
535 CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
536 loi->loi_ost_idx, loi->loi_id);
540 OBD_ALLOC(req, sizeof(*req));
542 GOTO(out_set, rc = -ENOMEM);
544 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
545 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
546 if (req->rq_oi.oi_md == NULL) {
547 OBD_FREE(req, sizeof(*req));
548 GOTO(out_set, rc = -ENOMEM);
551 req->rq_idx = loi->loi_ost_idx;
554 /* XXX LOV STACKING: submd should be from the subobj */
555 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
556 req->rq_oi.oi_md->lsm_stripe_count = 0;
558 lov_set_add_req(req, set);
561 GOTO(out_set, rc = -EIO);
565 lov_fini_cancel_set(set);
/* Complete a striped object create once the per-OST creates have finished.
 *
 * Steps visible in this listing:
 *  1. If not every request succeeded, requests are taken back out of the
 *     completed state and retried on other OSTs via qos_remedy_create(),
 *     with the result fed to lov_update_create_set().
 *  2. If some creates still failed, set_count is reduced to set_success.
 *  3. The attributes of all successfully completed requests are merged into
 *     a temporary obdo (ret_oa); a size mismatch with the caller's obdo is
 *     logged; ret_oa (with the caller's o_id) is copied over src_oa and the
 *     stripe md is returned through *lsmp.
 *  4. On the failure path each object that was created is destroyed again
 *     with obd_destroy() and the stripe md is freed.
 *  5. Log cookies saved on the set are restored into oti; they are freed
 *     and OBD_MD_FLCOOKIE cleared if none were sent, otherwise the flag is
 *     set.
 * NOTE(review): labels, the retry filter inside the first loop and the
 * allocation of ret_oa are not visible in this listing, so the exact
 * boundaries between the steps above should be confirmed. */
569 static int create_done(struct obd_export *exp, struct lov_request_set *set,
570 struct lov_stripe_md **lsmp)
572 struct lov_obd *lov = &exp->exp_obd->u.lov;
573 struct obd_trans_info *oti = set->set_oti;
574 struct obdo *src_oa = set->set_oi->oi_oa;
575 struct lov_request *req;
576 struct obdo *ret_oa = NULL;
577 int attrset = 0, rc = 0;
580 LASSERT(set->set_completes);
582 /* try alloc objects on other osts if osc_create fails for
583 * exceptions: RPC failure, ENOSPC, etc */
584 if (set->set_count != set->set_success) {
585 list_for_each_entry (req, &set->set_list, rq_link) {
589 set->set_completes--;
590 req->rq_complete = 0;
592 rc = qos_remedy_create(set, req);
593 lov_update_create_set(set, req, rc);
597 /* no successful creates */
598 if (set->set_success == 0)
601 if (set->set_count != set->set_success) {
602 set->set_count = set->set_success;
608 GOTO(cleanup, rc = -ENOMEM);
610 list_for_each_entry(req, &set->set_list, rq_link) {
611 if (!req->rq_complete || req->rq_rc)
613 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
614 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
615 req->rq_stripe, &attrset);
617 if (src_oa->o_valid & OBD_MD_FLSIZE &&
618 ret_oa->o_size != src_oa->o_size) {
619 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
620 src_oa->o_size, ret_oa->o_size);
623 ret_oa->o_id = src_oa->o_id;
624 memcpy(src_oa, ret_oa, sizeof(*src_oa));
627 *lsmp = set->set_oi->oi_md;
631 list_for_each_entry(req, &set->set_list, rq_link) {
632 struct obd_export *sub_exp;
635 if (!req->rq_complete || req->rq_rc)
638 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
639 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
641 CERROR("Failed to uncreate objid "LPX64" subobj "
642 LPX64" on OST idx %d: rc = %d\n",
643 src_oa->o_id, req->rq_oi.oi_oa->o_id,
647 obd_free_memmd(exp, &set->set_oi->oi_md);
649 if (oti && set->set_cookies) {
650 oti->oti_logcookies = set->set_cookies;
651 if (!set->set_cookie_sent) {
652 oti_free_cookies(oti);
653 src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
655 src_oa->o_valid |= OBD_MD_FLCOOKIE;
/* Finalize a create request set: asserts the set has an export and, if any
 * request completed, runs create_done() to merge results (or undo the
 * create), returning the stripe md through lsmp.
 * NOTE(review): release of the set and the return are not visible here. */
661 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
668 LASSERT(set->set_exp);
669 if (set->set_completes)
670 rc = create_done(set->set_exp, set, lsmp);
/* Record the result of one per-OST object create.
 *
 * A failure on a present, active target is logged (a second message flags
 * an rc the code considers invalid).  Under set_lock the request is
 * assigned the next free stripe slot (index == current set_success) and the
 * matching lov_oinfo is looked up.  Two code paths then call
 * lov_update_set() and drop the lock: judging by the duplicated
 * update/unlock pair, the first is the error path and the second the
 * success path, which also stores the new object id and OST index in the
 * stripe, advances the log-cookie pointer and counts sent cookies.
 * NOTE(review): the branch conditions separating the two paths are not
 * visible in this listing -- confirm against the full file. */
676 int lov_update_create_set(struct lov_request_set *set,
677 struct lov_request *req, int rc)
679 struct obd_trans_info *oti = set->set_oti;
680 struct lov_stripe_md *lsm = set->set_oi->oi_md;
681 struct lov_oinfo *loi;
682 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
685 if (rc && lov->lov_tgts[req->rq_idx] &&
686 lov->lov_tgts[req->rq_idx]->ltd_active) {
687 CERROR("error creating fid "LPX64" sub-object"
688 " on OST idx %d/%d: rc = %d\n",
689 set->set_oi->oi_oa->o_id, req->rq_idx,
690 lsm->lsm_stripe_count, rc);
692 CERROR("obd_create returned invalid err %d\n", rc);
697 spin_lock(&set->set_lock);
698 req->rq_stripe = set->set_success;
699 loi = lsm->lsm_oinfo[req->rq_stripe];
701 lov_update_set(set, req, rc);
702 spin_unlock(&set->set_lock);
706 loi->loi_id = req->rq_oi.oi_oa->o_id;
707 loi->loi_ost_idx = req->rq_idx;
710 if (oti && set->set_cookies)
711 ++oti->oti_logcookies;
712 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
713 set->set_cookie_sent++;
715 lov_update_set(set, req, rc);
716 spin_unlock(&set->set_lock);
718 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
719 lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
/* Completion callback for a per-OST create.  Recovers the lov_request that
 * embeds oinfo (rq_oi), records the result with lov_update_create_set() and,
 * once every request of the set has completed, drops a reference on the set
 * via lov_put_reqset(). */
724 int cb_create_update(struct obd_info *oinfo, int rc)
726 struct lov_request *lovreq;
728 lovreq = container_of(oinfo, struct lov_request, rq_oi);
729 rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
730 if (lov_finished_set(lovreq->rq_rqset))
731 lov_put_reqset(lovreq->rq_rqset);
/* Build the request set for a striped object create.
 * Allocates the set, attaches the caller's stripe md (*lsmp) and source
 * obdo to the parent oinfo and lets qos_prep_create() choose targets and
 * populate the requests.  *lsmp is refreshed from oinfo->oi_md afterwards
 * because the QoS code may have replaced the md.  lov_fini_create_set() is
 * used to release the set.
 * NOTE(review): the condition under which the set is finalized here is not
 * visible in this listing (presumably the error path) -- confirm. */
736 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
737 struct lov_stripe_md **lsmp, struct obdo *src_oa,
738 struct obd_trans_info *oti,
739 struct lov_request_set **reqset)
741 struct lov_request_set *set;
745 OBD_ALLOC(set, sizeof(*set));
752 set->set_oi->oi_md = *lsmp;
753 set->set_oi->oi_oa = src_oa;
757 rc = qos_prep_create(exp, set);
758 /* qos_shrink_lsm() may have allocated a new lsm */
759 *lsmp = oinfo->oi_md;
761 lov_fini_create_set(set, lsmp);
/* Merge the attributes returned by the per-stripe requests into the
 * caller's obdo (set->set_oi->oi_oa).
 * Early exits exist for a missing caller obdo and for a set without any
 * success.  Otherwise the obdo of every request that completed without
 * error and reports valid attributes is folded into a temporary obdo with
 * lov_merge_attrs(); if nothing was merged an error is logged.  The merged
 * result, carrying the caller's original o_id, finally overwrites the
 * caller's obdo.
 * NOTE(review): the return codes of the early exits and the allocation and
 * release of tmp_oa are not visible in this listing. */
769 static int common_attr_done(struct lov_request_set *set)
771 struct list_head *pos;
772 struct lov_request *req;
774 int rc = 0, attrset = 0;
777 LASSERT(set->set_oi != NULL);
779 if (set->set_oi->oi_oa == NULL)
782 if (!set->set_success)
787 GOTO(out, rc = -ENOMEM);
789 list_for_each (pos, &set->set_list) {
790 req = list_entry(pos, struct lov_request, rq_link);
792 if (!req->rq_complete || req->rq_rc)
794 if (req->rq_oi.oi_oa->o_valid == 0) /* inactive stripe */
796 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
797 req->rq_oi.oi_oa->o_valid,
798 set->set_oi->oi_md, req->rq_stripe, &attrset);
801 CERROR("No stripes had valid attrs\n");
804 tmp_oa->o_id = set->set_oi->oi_oa->o_id;
805 memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
/* Post-process a finished bulk read/write set: for every request that
 * completed without error, copy the block count reported by the OST
 * (when OBD_MD_FLBLOCKS is valid) into the LVB of the matching stripe. */
813 static int brw_done(struct lov_request_set *set)
815 struct lov_stripe_md *lsm = set->set_oi->oi_md;
816 struct lov_oinfo *loi = NULL;
817 struct list_head *pos;
818 struct lov_request *req;
821 list_for_each (pos, &set->set_list) {
822 req = list_entry(pos, struct lov_request, rq_link);
824 if (!req->rq_complete || req->rq_rc)
827 loi = lsm->lsm_oinfo[req->rq_stripe];
829 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
830 loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
/* Finalize a bulk read/write request set: asserts the set has an export and
 * performs completion work when at least one request completed.
 * NOTE(review): the body of the completion branch (presumably a call to
 * brw_done), release of the set and the return are not visible here. */
836 int lov_fini_brw_set(struct lov_request_set *set)
843 LASSERT(set->set_exp);
844 if (set->set_completes) {
846 /* FIXME update qos data here */
/* Build the request set for a striped bulk read/write.
 *
 * The caller's page array (pga, oa_bufs entries) is regrouped by stripe:
 *  1. a per-stripe counter array (info) is filled by mapping each page
 *     offset to its stripe with lov_stripe_number();
 *  2. for every stripe that has pages a lov_request is created with a copy
 *     of the caller's obdo (o_id and o_stripe_idx adjusted), a bare sub-md,
 *     the page count and the start index of its slice in the reordered
 *     array; an empty or inactive target fails the whole call with -EIO;
 *  3. each page is copied into its stripe's slice of set->set_pga and its
 *     offset is translated to a stripe-local offset with lov_stripe_offset().
 * The temporary info array is freed and, on the path that finalizes here,
 * the set is released through lov_fini_brw_set().
 *
 * NOTE(review): `shift` accumulates slice start indices but its
 * initialization is not visible in this listing; it must be 0 before the
 * stripe loop.  The increment of info[stripe].off in the last loop and the
 * condition for the final -EIO are likewise not visible -- confirm. */
853 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
854 obd_count oa_bufs, struct brw_page *pga,
855 struct obd_trans_info *oti,
856 struct lov_request_set **reqset)
863 struct lov_request_set *set;
864 struct lov_oinfo *loi = NULL;
865 struct lov_obd *lov = &exp->exp_obd->u.lov;
866 int rc = 0, i, shift;
869 OBD_ALLOC(set, sizeof(*set));
877 set->set_oabufs = oa_bufs;
878 OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
880 GOTO(out, rc = -ENOMEM);
882 OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
884 GOTO(out, rc = -ENOMEM);
886 /* calculate the page count for each stripe */
887 for (i = 0; i < oa_bufs; i++) {
888 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
889 info[stripe].count++;
892 /* alloc and initialize lov request */
894 for (i = 0 ; i < oinfo->oi_md->lsm_stripe_count; i++){
895 struct lov_request *req;
897 if (info[i].count == 0)
900 loi = oinfo->oi_md->lsm_oinfo[i];
901 if (!lov->lov_tgts[loi->loi_ost_idx] ||
902 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
903 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
904 GOTO(out, rc = -EIO);
907 OBD_ALLOC(req, sizeof(*req));
909 GOTO(out, rc = -ENOMEM);
911 OBDO_ALLOC(req->rq_oi.oi_oa);
912 if (req->rq_oi.oi_oa == NULL) {
913 OBD_FREE(req, sizeof(*req));
914 GOTO(out, rc = -ENOMEM);
918 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
919 sizeof(*req->rq_oi.oi_oa));
921 req->rq_oi.oi_oa->o_id = loi->loi_id;
922 req->rq_oi.oi_oa->o_stripe_idx = i;
924 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
925 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
926 if (req->rq_oi.oi_md == NULL) {
927 OBDO_FREE(req->rq_oi.oi_oa);
928 OBD_FREE(req, sizeof(*req));
929 GOTO(out, rc = -ENOMEM);
932 req->rq_idx = loi->loi_ost_idx;
935 /* XXX LOV STACKING */
936 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
937 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
938 req->rq_oabufs = info[i].count;
939 req->rq_pgaidx = shift;
940 shift += req->rq_oabufs;
942 /* remember the index for sort brw_page array */
943 info[i].index = req->rq_pgaidx;
945 lov_set_add_req(req, set);
948 GOTO(out, rc = -EIO);
950 /* rotate & sort the brw_page array */
951 for (i = 0; i < oa_bufs; i++) {
952 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
954 shift = info[stripe].index + info[stripe].off;
955 LASSERT(shift < oa_bufs);
956 set->set_pga[shift] = pga[i];
957 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
958 &set->set_pga[shift].off);
963 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
968 lov_fini_brw_set(set);
/* Finalize a getattr request set: asserts the set has an export and, when
 * at least one request completed, merges the per-stripe attributes into the
 * caller's obdo via common_attr_done().
 * NOTE(review): release of the set and the return are not visible here. */
973 int lov_fini_getattr_set(struct lov_request_set *set)
980 LASSERT(set->set_exp);
981 if (set->set_completes)
982 rc = common_attr_done(set);
989 /* The callback for osc_getattr_async that finalizes a request info when a
990 * response is received.  The lov_request is recovered from its embedded
 * rq_oi member and the result is recorded with lov_update_common_set(). */
991 static int cb_getattr_update(struct obd_info *oinfo, int rc)
993 struct lov_request *lovreq;
994 lovreq = container_of(oinfo, struct lov_request, rq_oi);
995 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
/* Build the request set for a striped getattr.
 * Allocates the set and, for every stripe, creates a lov_request holding a
 * private copy of the caller's obdo with o_id replaced by the stripe's
 * object id and cb_getattr_update installed as completion callback.
 * Stripes whose target slot is empty or inactive are logged at D_HA.
 * Fails with -ENOMEM on allocation failure and with -EIO when no request
 * could be added; the set is released through lov_fini_getattr_set().
 * NOTE(review): the statement following the "inactive" message (presumably
 * a skip of that stripe) is not visible in this listing. */
998 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
999 struct lov_request_set **reqset)
1001 struct lov_request_set *set;
1002 struct lov_oinfo *loi = NULL;
1003 struct lov_obd *lov = &exp->exp_obd->u.lov;
1007 OBD_ALLOC(set, sizeof(*set));
1013 set->set_oi = oinfo;
1015 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1016 struct lov_request *req;
1018 loi = oinfo->oi_md->lsm_oinfo[i];
1019 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1020 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1021 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1025 OBD_ALLOC(req, sizeof(*req));
1027 GOTO(out_set, rc = -ENOMEM);
1030 req->rq_idx = loi->loi_ost_idx;
1032 OBDO_ALLOC(req->rq_oi.oi_oa);
1033 if (req->rq_oi.oi_oa == NULL) {
1034 OBD_FREE(req, sizeof(*req));
1035 GOTO(out_set, rc = -ENOMEM);
1037 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1038 sizeof(*req->rq_oi.oi_oa));
1039 req->rq_oi.oi_oa->o_id = loi->loi_id;
1040 req->rq_oi.oi_cb_up = cb_getattr_update;
1042 lov_set_add_req(req, set);
1044 if (!set->set_count)
1045 GOTO(out_set, rc = -EIO);
1049 lov_fini_getattr_set(set);
/* Finalize a destroy request set: asserts the set has an export, leaves a
 * placeholder for QoS accounting when requests completed, and drops a
 * reference on the set with lov_put_reqset(). */
1053 int lov_fini_destroy_set(struct lov_request_set *set)
1059 LASSERT(set->set_exp);
1060 if (set->set_completes) {
1061 /* FIXME update qos data here */
1064 lov_put_reqset(set);
/* Build the request set for destroying a striped object.
 * Allocates the set, attaches lsm and src_oa to the parent oinfo and saves
 * the caller's log-cookie pointer when oti is given and src_oa carries
 * OBD_MD_FLCOOKIE.  For every stripe a lov_request is created with a copy
 * of src_oa whose o_id is the stripe's object id.  Stripes whose target
 * slot is empty or inactive are logged at D_HA.
 * Fails with -ENOMEM on allocation failure and with -EIO when no request
 * could be added; the set is released through lov_fini_destroy_set().
 * NOTE(review): the statement following the "inactive" message (presumably
 * a skip of that stripe) is not visible in this listing. */
1069 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1070 struct obdo *src_oa, struct lov_stripe_md *lsm,
1071 struct obd_trans_info *oti,
1072 struct lov_request_set **reqset)
1074 struct lov_request_set *set;
1075 struct lov_oinfo *loi = NULL;
1076 struct lov_obd *lov = &exp->exp_obd->u.lov;
1080 OBD_ALLOC(set, sizeof(*set));
1086 set->set_oi = oinfo;
1087 set->set_oi->oi_md = lsm;
1088 set->set_oi->oi_oa = src_oa;
1090 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1091 set->set_cookies = oti->oti_logcookies;
1093 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1094 struct lov_request *req;
1096 loi = lsm->lsm_oinfo[i];
1097 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1098 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1099 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1103 OBD_ALLOC(req, sizeof(*req));
1105 GOTO(out_set, rc = -ENOMEM);
1108 req->rq_idx = loi->loi_ost_idx;
1110 OBDO_ALLOC(req->rq_oi.oi_oa);
1111 if (req->rq_oi.oi_oa == NULL) {
1112 OBD_FREE(req, sizeof(*req));
1113 GOTO(out_set, rc = -ENOMEM);
1115 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1116 req->rq_oi.oi_oa->o_id = loi->loi_id;
1117 lov_set_add_req(req, set);
1119 if (!set->set_count)
1120 GOTO(out_set, rc = -EIO);
1124 lov_fini_destroy_set(set);
/* Finalize a setattr request set: asserts the set has an export, merges the
 * per-stripe attributes into the caller's obdo via common_attr_done() when
 * requests completed, and drops a reference on the set. */
1128 int lov_fini_setattr_set(struct lov_request_set *set)
1135 LASSERT(set->set_exp);
1136 if (set->set_completes) {
1137 rc = common_attr_done(set);
1138 /* FIXME update qos data here */
1141 lov_put_reqset(set);
/* Record the result of a per-stripe setattr.
 * The result is stored through lov_update_set(); an error coming from an
 * empty or inactive target slot is special-cased (the "grace" check).  The
 * ctime, mtime and atime returned in the request's obdo are then copied
 * into the LVB of the matching stripe, each only when its OBD_MD_FL*TIME
 * valid bit is set.
 * NOTE(review): the statement of the grace case and the condition guarding
 * the timestamp copies are not visible in this listing. */
1145 int lov_update_setattr_set(struct lov_request_set *set,
1146 struct lov_request *req, int rc)
1148 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1149 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1152 lov_update_set(set, req, rc);
1154 /* grace error on inactive ost */
1155 if (rc && !(lov->lov_tgts[req->rq_idx] &&
1156 lov->lov_tgts[req->rq_idx]->ltd_active))
1160 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1161 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1162 req->rq_oi.oi_oa->o_ctime;
1163 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1164 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1165 req->rq_oi.oi_oa->o_mtime;
1166 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1167 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1168 req->rq_oi.oi_oa->o_atime;
1174 /* The callback for osc_setattr_async that finalizes a request info when a
1175 * response is received.  The lov_request is recovered from its embedded
 * rq_oi member and the result is recorded with lov_update_setattr_set(). */
1176 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1178 struct lov_request *lovreq;
1179 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1180 return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
/* Build the request set for a striped setattr.
 * Allocates the set and saves the caller's log-cookie pointer when oti is
 * given and the obdo carries OBD_MD_FLCOOKIE.  For every stripe a
 * lov_request is created with a copy of the caller's obdo (o_id and
 * o_stripe_idx adjusted) and cb_setattr_update as completion callback.
 * When a size is being set, the file size is translated to a stripe-local
 * size with lov_stripe_offset(); a negative result with a non-zero size
 * decrements the stripe size by one.
 * Stripes whose target slot is empty or inactive are logged at D_HA.
 * Fails with -ENOMEM on allocation failure and with -EIO when no request
 * could be added; the set is released through lov_fini_setattr_set().
 * NOTE(review): the statement following the "inactive" message (presumably
 * a skip of that stripe) is not visible in this listing. */
1183 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1184 struct obd_trans_info *oti,
1185 struct lov_request_set **reqset)
1187 struct lov_request_set *set;
1188 struct lov_oinfo *loi = NULL;
1189 struct lov_obd *lov = &exp->exp_obd->u.lov;
1193 OBD_ALLOC(set, sizeof(*set));
1200 set->set_oi = oinfo;
1201 if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1202 set->set_cookies = oti->oti_logcookies;
1204 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1205 struct lov_request *req;
1207 loi = oinfo->oi_md->lsm_oinfo[i];
1208 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1209 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1210 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1214 OBD_ALLOC(req, sizeof(*req));
1216 GOTO(out_set, rc = -ENOMEM);
1218 req->rq_idx = loi->loi_ost_idx;
1220 OBDO_ALLOC(req->rq_oi.oi_oa);
1221 if (req->rq_oi.oi_oa == NULL) {
1222 OBD_FREE(req, sizeof(*req));
1223 GOTO(out_set, rc = -ENOMEM);
1225 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1226 sizeof(*req->rq_oi.oi_oa));
1227 req->rq_oi.oi_oa->o_id = loi->loi_id;
1228 req->rq_oi.oi_oa->o_stripe_idx = i;
1229 req->rq_oi.oi_cb_up = cb_setattr_update;
1231 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1232 int off = lov_stripe_offset(oinfo->oi_md,
1233 oinfo->oi_oa->o_size, i,
1234 &req->rq_oi.oi_oa->o_size);
1236 if (off < 0 && req->rq_oi.oi_oa->o_size)
1237 req->rq_oi.oi_oa->o_size--;
1239 CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1240 i, req->rq_oi.oi_oa->o_size,
1241 oinfo->oi_oa->o_size);
1243 lov_set_add_req(req, set);
1245 if (!set->set_count)
1246 GOTO(out_set, rc = -EIO);
1250 lov_fini_setattr_set(set);
/* Finalize a punch (truncate) request set: asserts the set has an export;
 * when requests completed and at least one succeeded, merges the per-stripe
 * attributes via common_attr_done(); finally drops a reference on the set.
 * NOTE(review): one statement at the start of the completion branch is not
 * visible in this listing. */
1254 int lov_fini_punch_set(struct lov_request_set *set)
1261 LASSERT(set->set_exp);
1262 if (set->set_completes) {
1264 /* FIXME update qos data here */
1265 if (set->set_success)
1266 rc = common_attr_done(set);
1269 lov_put_reqset(set);
/* Record the result of a per-stripe punch.
 * The result is stored through lov_update_set(); an error from an inactive
 * target is special-cased (the "grace" check).  Under the stripe lock the
 * block count returned by the OST is copied into the stripe's LVB when
 * OBD_MD_FLBLOCKS is valid.
 * NOTE(review): unlike lov_update_setattr_set() and lov_update_common_set(),
 * the grace check below dereferences lov->lov_tgts[req->rq_idx] without
 * first testing the slot for NULL; if a target can be removed while a punch
 * is in flight this is a NULL dereference -- confirm and align with the
 * sibling functions.  The statement of the grace case and the condition
 * guarding the locked section are not visible in this listing. */
1274 int lov_update_punch_set(struct lov_request_set *set,
1275 struct lov_request *req, int rc)
1277 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1278 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1281 lov_update_set(set, req, rc);
1283 /* grace error on inactive ost */
1284 if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1288 lov_stripe_lock(lsm);
1289 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1290 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1291 req->rq_oi.oi_oa->o_blocks;
1294 /* Do we need to update lvb_size here? It needn't because
1295 * it have been done in ll_truncate(). -jay */
1296 lov_stripe_unlock(lsm);
1302 /* The callback for osc_punch that finalizes a request info when a response is received. */
/* Completion callback for a per-stripe punch: recovers the lov_request that
 * embeds oinfo (rq_oi) and records the result with lov_update_punch_set(). */
1304 static int cb_update_punch(struct obd_info *oinfo, int rc)
1306 struct lov_request *lovreq;
1307 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1308 return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
/* Build the request set for a striped punch (truncate).
 * Allocates the set and, for every stripe that intersects the extent in
 * oinfo->oi_policy, creates a lov_request with a copy of the caller's obdo
 * (o_id and o_stripe_idx adjusted), cb_update_punch as completion callback
 * and the stripe-local extent [rs, re] with gid -1.  An empty or inactive
 * target slot fails the whole call with -EIO.
 * Fails with -ENOMEM on allocation failure and with -EIO when no request
 * could be added; the set is released through lov_fini_punch_set().
 * NOTE(review): the declarations of rs/re and the handling of stripes that
 * do not intersect are not visible in this listing. */
1311 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1312 struct obd_trans_info *oti,
1313 struct lov_request_set **reqset)
1315 struct lov_request_set *set;
1316 struct lov_oinfo *loi = NULL;
1317 struct lov_obd *lov = &exp->exp_obd->u.lov;
1321 OBD_ALLOC(set, sizeof(*set));
1326 set->set_oi = oinfo;
1329 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1330 struct lov_request *req;
1333 if (!lov_stripe_intersects(oinfo->oi_md, i,
1334 oinfo->oi_policy.l_extent.start,
1335 oinfo->oi_policy.l_extent.end,
1339 loi = oinfo->oi_md->lsm_oinfo[i];
1340 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1341 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1342 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1343 GOTO(out_set, rc = -EIO);
1346 OBD_ALLOC(req, sizeof(*req));
1348 GOTO(out_set, rc = -ENOMEM);
1350 req->rq_idx = loi->loi_ost_idx;
1352 OBDO_ALLOC(req->rq_oi.oi_oa);
1353 if (req->rq_oi.oi_oa == NULL) {
1354 OBD_FREE(req, sizeof(*req));
1355 GOTO(out_set, rc = -ENOMEM);
1357 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1358 sizeof(*req->rq_oi.oi_oa));
1359 req->rq_oi.oi_oa->o_id = loi->loi_id;
1360 req->rq_oi.oi_oa->o_stripe_idx = i;
1361 req->rq_oi.oi_cb_up = cb_update_punch;
1363 req->rq_oi.oi_policy.l_extent.start = rs;
1364 req->rq_oi.oi_policy.l_extent.end = re;
1365 req->rq_oi.oi_policy.l_extent.gid = -1;
1367 lov_set_add_req(req, set);
1369 if (!set->set_count)
1370 GOTO(out_set, rc = -EIO);
1374 lov_fini_punch_set(set);
/* Finalize a sync request set: asserts the set has an export, checks for
 * the "completed but nothing succeeded" case, and drops a reference on the
 * set.
 * NOTE(review): the action taken when no request succeeded (presumably an
 * error code assigned to rc) is not visible in this listing. */
1378 int lov_fini_sync_set(struct lov_request_set *set)
1385 LASSERT(set->set_exp);
1386 if (set->set_completes) {
1387 if (!set->set_success)
1389 /* FIXME update qos data here */
1392 lov_put_reqset(set);
1397 /* The callback for osc_sync that finalizes a request info when a
1398 * response is received.  The lov_request is recovered from its embedded
 * rq_oi member and the result is recorded with lov_update_common_set(). */
1399 static int cb_sync_update(struct obd_info *oinfo, int rc)
1401 struct lov_request *lovreq;
1402 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1403 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
/* Build the request set for syncing a byte range of a striped object.
 * Allocates the set and, for every stripe whose target slot is present and
 * active and whose range intersects [start, end], creates a lov_request
 * with a copy of the caller's obdo (o_id and o_stripe_idx adjusted), the
 * stripe-local extent [rs, re] with gid -1 and cb_sync_update as completion
 * callback.  Inactive targets are logged at D_HA.
 * Fails with -ENOMEM on allocation failure and with -EIO when no request
 * could be added; the set is released through lov_fini_sync_set().
 * NOTE(review): the two assignments after `set->set_oi = oinfo` copy
 * oinfo's fields onto themselves and look redundant.  The declarations of
 * rs/re and the skip statements for inactive / non-intersecting stripes
 * are not visible in this listing. */
1406 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1407 obd_off start, obd_off end,
1408 struct lov_request_set **reqset)
1410 struct lov_request_set *set;
1411 struct lov_oinfo *loi = NULL;
1412 struct lov_obd *lov = &exp->exp_obd->u.lov;
1416 OBD_ALLOC(set, sizeof(*set));
1422 set->set_oi = oinfo;
1423 set->set_oi->oi_md = oinfo->oi_md;
1424 set->set_oi->oi_oa = oinfo->oi_oa;
1426 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1427 struct lov_request *req;
1430 loi = oinfo->oi_md->lsm_oinfo[i];
1431 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1432 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1433 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1437 if (!lov_stripe_intersects(oinfo->oi_md, i, start,
1441 OBD_ALLOC(req, sizeof(*req));
1443 GOTO(out_set, rc = -ENOMEM);
1445 req->rq_idx = loi->loi_ost_idx;
1447 OBDO_ALLOC(req->rq_oi.oi_oa);
1448 if (req->rq_oi.oi_oa == NULL) {
1449 OBD_FREE(req, sizeof(*req));
1450 GOTO(out_set, rc = -ENOMEM);
1452 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1453 sizeof(*req->rq_oi.oi_oa));
1454 req->rq_oi.oi_oa->o_id = loi->loi_id;
1455 req->rq_oi.oi_oa->o_stripe_idx = i;
1457 req->rq_oi.oi_policy.l_extent.start = rs;
1458 req->rq_oi.oi_policy.l_extent.end = re;
1459 req->rq_oi.oi_policy.l_extent.gid = -1;
1460 req->rq_oi.oi_cb_up = cb_sync_update;
1462 lov_set_add_req(req, set);
1464 if (!set->set_count)
1465 GOTO(out_set, rc = -EIO);
1469 lov_fini_sync_set(set);
/* LOV_U64_MAX is the all-ones __u64 value; it serves as the saturation
 * value for summed statfs counters.
 *
 * LOV_SUM_MAX(tot, add): when (tot) + (add) wraps around below (tot), pin
 * (tot) at LOV_U64_MAX. NOTE(review): the non-overflow arm of the macro is
 * not visible in this excerpt; presumably it performs the plain addition. */
1473 #define LOV_U64_MAX ((__u64)~0ULL)
1474 #define LOV_SUM_MAX(tot, add) \
1476 if ((tot) + (add) < (tot)) \
1477 (tot) = LOV_U64_MAX; \
/*
 * Post-process an aggregated statfs result and cache it on the LOV device.
 *
 * os_files and os_ffree are divided by the stripe count returned by
 * lov_get_stripecnt(&obd->u.lov, 0), unless they already sit at
 * LOV_U64_MAX. The result is then copied into obd->obd_osfs and
 * obd_osfs_age is set to the current time, both under obd_osfs_lock.
 *
 * NOTE(review): any test on "success" that guards this work, and the
 * return value, are on lines not visible here. Also confirm that
 * lov_get_stripecnt() cannot return 0, since its result is used as a
 * divisor.
 */
1482 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1487 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
/* Counters already pinned at LOV_U64_MAX are left untouched. */
1489 if (osfs->os_files != LOV_U64_MAX)
1490 do_div(osfs->os_files, expected_stripes);
1491 if (osfs->os_ffree != LOV_U64_MAX)
1492 do_div(osfs->os_ffree, expected_stripes);
/* Store the result as the device's cached statfs data and stamp its age. */
1494 spin_lock(&obd->obd_osfs_lock);
1495 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1496 obd->obd_osfs_age = cfs_time_current_64();
1497 spin_unlock(&obd->obd_osfs_lock);
/*
 * Finish a statfs request set: when at least one request has completed,
 * run lov_fini_statfs() on the set's device and aggregate buffer, then
 * hand the set to lov_put_reqset().
 *
 * NOTE(review): the last argument of the lov_fini_statfs() call and the
 * return statement are not visible in this excerpt.
 */
1504 int lov_fini_statfs_set(struct lov_request_set *set)
1512 if (set->set_completes) {
1513 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1516 lov_put_reqset(set);
/*
 * Merge one target's statfs result (lov_sfs) into the running aggregate
 * (osfs).
 *
 * Visible behaviour: osfs can be seeded with a straight copy of lov_sfs;
 * otherwise, when the two block sizes differ, a shift count is derived
 * from the OR of both sizes and the block counters of whichever side has
 * the smaller block size are shifted right so that both sides use the
 * larger block size (osfs->os_bsize is raised to match when it was the
 * smaller one). os_blocks is summed, and os_files / os_ffree are summed
 * through LOV_SUM_MAX.
 *
 * NOTE(review): several controlling lines are missing from this excerpt --
 * the remaining parameter(s), the test that selects the seeding memcpy,
 * the body of the shift-finding loop, and whatever selects between the
 * "minimum bavail" and the "+=" forms of the bfree/bavail merge. Check the
 * full file before relying on any of those.
 */
1520 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1523 int shift = 0, quit = 0;
1527 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1529 if (osfs->os_bsize != lov_sfs->os_bsize) {
1530 /* assume all block sizes are always powers of 2 */
1531 /* get the bits difference */
1532 tmp = osfs->os_bsize | lov_sfs->os_bsize;
1533 for (shift = 0; shift <= 64; ++shift) {
1545 if (osfs->os_bsize < lov_sfs->os_bsize) {
1546 osfs->os_bsize = lov_sfs->os_bsize;
1548 osfs->os_bfree >>= shift;
1549 osfs->os_bavail >>= shift;
1550 osfs->os_blocks >>= shift;
1551 } else if (shift != 0) {
1552 lov_sfs->os_bfree >>= shift;
1553 lov_sfs->os_bavail >>= shift;
1554 lov_sfs->os_blocks >>= shift;
1557 /* Sandia requested that df (and so, statfs) only
1558 returned minimal available space on
1559 a single OST, so people would be able to
1560 write this much data guaranteed. */
1561 if (osfs->os_bavail > lov_sfs->os_bavail) {
1562 /* Presumably if new bavail is smaller,
1563 new bfree is bigger as well */
1564 osfs->os_bfree = lov_sfs->os_bfree;
1565 osfs->os_bavail = lov_sfs->os_bavail;
1568 osfs->os_bfree += lov_sfs->os_bfree;
1569 osfs->os_bavail += lov_sfs->os_bavail;
1571 osfs->os_blocks += lov_sfs->os_blocks;
1572 /* XXX not sure about this one - depends on policy.
1573 * - could be minimum if we always stripe on all OBDs
1574 * (but that would be wrong for any other policy,
1575 * if one of the OBDs has no more objects left)
1576 * - could be sum if we stripe whole objects
1577 * - could be average, just to give a nice number
1579 * To give a "reasonable" (if not wholly accurate)
1580 * number, we divide the total number of free objects
1581 * by expected stripe count (watch out for overflow).
1583 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1584 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1588 /* The callback for osc_statfs_async that finalizes a request info when a
1589 * response is received. */
1590 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1592 struct lov_request *lovreq;
1593 struct obd_statfs *osfs, *lov_sfs;
1594 struct obd_device *obd;
1595 struct lov_obd *lov;
/* oinfo is the rq_oi member embedded in a lov_request; step back to the
 * enclosing request. */
1599 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1600 lov = &lovreq->rq_rqset->set_obd->u.lov;
/* NOTE(review): lov_tgts[rq_idx] is dereferenced here without a check,
 * while the error path further down tests the same slot for NULL. */
1601 obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
/* osfs is the set-wide aggregate buffer; lov_sfs is this request's own
 * statfs buffer. */
1603 osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1604 lov_sfs = oinfo->oi_osfs;
/* set_success is sampled before lov_update_set() runs for this request;
 * the sampled value is what gets passed to lov_update_statfs() below. */
1606 success = lovreq->rq_rqset->set_success;
1607 /* XXX: the same is done in lov_update_common_set, however
1608 lovset->set_exp is not initialized. */
1609 lov_update_set(lovreq->rq_rqset, lovreq, rc);
1611 /* XXX ignore error for disconnected ost ? */
1612 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1613 lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1618 spin_lock(&obd->obd_osfs_lock);
1619 memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1620 if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1621 obd->obd_osfs_age = cfs_time_current_64();
1622 spin_unlock(&obd->obd_osfs_lock);
1624 lov_update_statfs(osfs, lov_sfs, success);
/* When the set was issued with OBD_STATFS_PTLRPCD and lov_finished_set()
 * reports it finished, the interpret step is invoked directly from this
 * callback; its last argument is non-zero when not every request in the
 * set succeeded. */
1627 if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1628 lov_finished_set(lovreq->rq_rqset)) {
1629 lov_statfs_interpret(NULL, lovreq->rq_rqset,
1630 lovreq->rq_rqset->set_success !=
1631 lovreq->rq_rqset->set_count);
1632 qos_statfs_done(lov);
/*
 * Build the request set for a statfs across the LOV's targets.
 *
 * A lov_request_set is allocated and bound to oinfo, then every target
 * index below lov->desc.ld_tgt_count is examined. A target is logged as
 * inactive when its slot is empty, or when it is not active and the
 * caller passed OBD_STATFS_NODELAY; it is logged as administratively
 * disabled when it has no export. For the remaining targets a
 * lov_request with its own obd_statfs buffer is allocated, wired to
 * cb_statfs_update with the caller's oi_flags, and queued with
 * lov_set_add_req().
 *
 * Allocation failures jump to out_set with -ENOMEM; an empty set jumps
 * there with -EIO. lov_fini_statfs_set() is called on the set after the
 * loop (presumably under the out_set label).
 *
 * NOTE(review): the statements that follow each CDEBUG (presumably
 * "continue"), the per-request index setup, and the hand-off of the
 * finished set through reqset are on lines not visible in this excerpt.
 */
1638 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1639 struct lov_request_set **reqset)
1641 struct lov_request_set *set;
1642 struct lov_obd *lov = &obd->u.lov;
1646 OBD_ALLOC(set, sizeof(*set));
1652 set->set_oi = oinfo;
1654 /* We only get block data from the OBD */
1655 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1656 struct lov_request *req;
1658 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1659 && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1660 CDEBUG(D_HA, "lov idx %d inactive\n", i);
1664 /* skip targets that have been explicitly disabled by the
1666 if (!lov->lov_tgts[i]->ltd_exp) {
1667 CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1671 OBD_ALLOC(req, sizeof(*req));
1673 GOTO(out_set, rc = -ENOMEM);
1675 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1676 if (req->rq_oi.oi_osfs == NULL) {
1677 OBD_FREE(req, sizeof(*req));
1678 GOTO(out_set, rc = -ENOMEM);
1682 req->rq_oi.oi_cb_up = cb_statfs_update;
1683 req->rq_oi.oi_flags = oinfo->oi_flags;
1685 lov_set_add_req(req, set);
1687 if (!set->set_count)
1688 GOTO(out_set, rc = -EIO);
1692 lov_fini_statfs_set(set);