1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
32 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
41 # define EXPORT_SYMTAB
43 #define DEBUG_SUBSYSTEM S_LOV
46 #include <libcfs/libcfs.h>
48 #include <liblustre.h>
51 #include <obd_class.h>
53 #include <lustre/lustre_idl.h>
55 #include "lov_internal.h"
/* Put @set into its initial state: no completed requests, an empty request
 * list, a reference count of 1, and an initialised wait queue and spinlock. */
57 static void lov_init_set(struct lov_request_set *set)
60 set->set_completes = 0;
63 CFS_INIT_LIST_HEAD(&set->set_list);
64 cfs_atomic_set(&set->set_refcount, 1);
65 cfs_waitq_init(&set->set_waitq);
66 cfs_spin_lock_init(&set->set_lock);
/* Release everything owned by @set. Each request on set->set_list is
 * unlinked and freed together with its obdo (rq_oi.oi_oa), its sub-md buffer
 * (rq_oi.oi_md, rq_buflen bytes) and, when present, its statfs buffer
 * (rq_oi.oi_osfs). Afterwards the page array (set_oabufs entries of set_pga)
 * is freed, the reference on set->set_lockh is dropped and the set itself is
 * freed.
 * NOTE(review): the guards around the obdo, sub-md, page-array and
 * lock-handle releases are not visible in this listing - confirm. */
69 void lov_finish_set(struct lov_request_set *set)
/* The _safe iterator is used because the current entry is unlinked and freed
 * inside the loop body. */
75 cfs_list_for_each_safe(pos, n, &set->set_list) {
76 struct lov_request *req = cfs_list_entry(pos,
79 cfs_list_del_init(&req->rq_link);
82 OBDO_FREE(req->rq_oi.oi_oa);
84 OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
85 if (req->rq_oi.oi_osfs)
86 OBD_FREE(req->rq_oi.oi_osfs,
87 sizeof(*req->rq_oi.oi_osfs));
88 OBD_FREE(req, sizeof(*req));
92 int len = set->set_oabufs * sizeof(*set->set_pga);
93 OBD_FREE_LARGE(set->set_pga, len);
96 lov_llh_put(set->set_lockh);
98 OBD_FREE(set, sizeof(*set));
/* Return nonzero once the number of completed requests in @set equals
 * set->set_count. The current counters are logged at D_INFO level first. */
102 int lov_finished_set(struct lov_request_set *set)
104 CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
106 return set->set_completes == set->set_count;
/* Record that @req has finished: mark the request complete, increment the
 * completion counter of @set and signal set->set_waitq.
 * NOTE(review): how @rc is stored and whether a success counter is updated
 * is not visible in this listing - confirm. */
109 void lov_update_set(struct lov_request_set *set,
110 struct lov_request *req, int rc)
112 req->rq_complete = 1;
115 set->set_completes++;
119 cfs_waitq_signal(&set->set_waitq);
/* Generic per-request completion handler. The result is first recorded in
 * @set through lov_update_set. A nonzero @rc is then treated specially when
 * the target slot lov_tgts[req->rq_idx] is empty or not active; per the
 * in-code comment the error is forgiven in that case (the statement that does
 * so is not visible in this listing). */
122 int lov_update_common_set(struct lov_request_set *set,
123 struct lov_request *req, int rc)
125 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
128 lov_update_set(set, req, rc);
130 /* grace error on inactive ost */
131 if (rc && !(lov->lov_tgts[req->rq_idx] &&
132 lov->lov_tgts[req->rq_idx]->ltd_active))
135 /* FIXME in raid1 regime, should return 0 */
/* Append @req to the tail of the request list of @set.
 * NOTE(review): any further bookkeeping (request count, back pointer to the
 * set) is not visible in this listing - confirm. */
139 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
141 cfs_list_add_tail(&req->rq_link, &set->set_list);
/* Declaration of osc_update_enqueue, which lov_update_enqueue_set calls with
 * the per-stripe lock handle, the stripe lov_oinfo, the enqueue flags, the
 * lock value block of the request, the lock mode and the enqueue result.
 * The definition is not part of this listing. */
146 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
147 struct lov_oinfo *loi, int flags,
148 struct ost_lvb *lvb, __u32 mode, int rc);
/* LOV-level post-processing of one sub-enqueue result. When @rc is neither
 * ELDLM_OK nor an intent enqueue that ended with ELDLM_LOCK_ABORTED, the
 * per-stripe lock handle is zeroed. If the target at @idx is present and
 * active, the failure is logged unless @rc is -EINTR or -EUSERS.
 * NOTE(review): the tail of the parameter list and the returned value are
 * not visible in this listing. */
150 static int lov_update_enqueue_lov(struct obd_export *exp,
151 struct lustre_handle *lov_lockhp,
152 struct lov_oinfo *loi, int flags, int idx,
155 struct lov_obd *lov = &exp->exp_obd->u.lov;
157 if (rc != ELDLM_OK &&
158 !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
159 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
160 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
161 /* -EUSERS used by OST to report file contention */
162 if (rc != -EINTR && rc != -EUSERS)
163 CERROR("enqueue objid "LPX64" subobj "
164 LPX64" on OST idx %d: rc %d\n",
165 oid, loi->loi_id, loi->loi_ost_idx, rc);
/* Process the result @rc of the sub-enqueue carried by @req, which was
 * issued with lock mode @mode. While holding the stripe lock of the parent
 * md, the result is handed to osc_update_enqueue together with the lock
 * value block of the request, the per-stripe lock handle is zeroed when an
 * intent enqueue was aborted, and lov_update_enqueue_lov may rewrite rc.
 * After the stripe lock is released the final rc is recorded in the set
 * through lov_update_set. */
172 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
174 struct lov_request_set *set = req->rq_rqset;
175 struct lustre_handle *lov_lockhp;
176 struct obd_info *oi = set->set_oi;
177 struct lov_oinfo *loi;
/* Both the lock handle slot and the stripe info are selected by the stripe
 * number stored in the request. */
182 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
183 loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
185 /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
186 * and that copy can be arbitrarily out of date.
188 * The LOV API is due for a serious rewriting anyways, and this
189 * can be addressed then. */
191 lov_stripe_lock(oi->oi_md);
192 osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
193 &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
194 if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
195 memset(lov_lockhp, 0, sizeof *lov_lockhp);
196 rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
197 req->rq_idx, oi->oi_md->lsm_object_id, rc);
198 lov_stripe_unlock(oi->oi_md);
199 lov_update_set(set, req, rc);
203 /* The callback for osc_enqueue that updates lov info for every OSC request. */
/* @cookie is the obd_info embedded in a lov_request as rq_oi; the enclosing
 * request is recovered with container_of. The lock mode comes from the
 * enqueue info stored in the owning request set. Returns whatever
 * lov_update_enqueue_set returns for @rc. */
204 static int cb_update_enqueue(void *cookie, int rc)
206 struct obd_info *oinfo = cookie;
207 struct ldlm_enqueue_info *einfo;
208 struct lov_request *lovreq;
210 lovreq = container_of(oinfo, struct lov_request, rq_oi);
211 einfo = lovreq->rq_rqset->set_ei;
212 return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
/* Finish an enqueue or match operation on @set. When at least one request
 * completed and every completed request succeeded, nothing has to be undone
 * (see the in-code comment). Otherwise the loop visits every request that
 * completed without error and whose per-stripe lock handle is in use, and
 * cancels that lock with obd_cancel in mode @mode; a cancel failure on a
 * present, active target is logged. Finally the reference on set->set_lockh
 * is dropped. */
215 static int enqueue_done(struct lov_request_set *set, __u32 mode)
217 struct lov_request *req;
218 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
222 /* enqueue/match success, just return */
223 if (set->set_completes && set->set_completes == set->set_success)
226 /* cancel enqueued/matched locks */
227 cfs_list_for_each_entry(req, &set->set_list, rq_link) {
228 struct lustre_handle *lov_lockhp;
230 if (!req->rq_complete || req->rq_rc)
233 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
235 if (!lustre_handle_is_used(lov_lockhp))
/* NOTE(review): the target slot is dereferenced here before the NULL test
 * that follows the call - confirm the slot cannot be empty at this point. */
238 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
239 req->rq_oi.oi_md, mode, lov_lockhp);
240 if (rc && lov->lov_tgts[req->rq_idx] &&
241 lov->lov_tgts[req->rq_idx]->ltd_active)
242 CERROR("cancelling obdjid "LPX64" on OST "
243 "idx %d error: rc = %d\n",
244 req->rq_oi.oi_md->lsm_object_id,
248 lov_llh_put(set->set_lockh);
/* Finish an enqueue request set. Visible behaviour: set->set_exp must be
 * non-NULL; on one path set_completes is reset to 0 and enqueue_done is run
 * with @mode, on the alternative path the reference on set->set_lockh is
 * dropped when a lock-handle set exists. The function returns @rc when it is
 * nonzero and the enqueue_done result otherwise.
 * NOTE(review): the conditions that select between the two paths, and the
 * role of @rqset in them, are not visible in this listing - confirm. */
252 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
253 struct ptlrpc_request_set *rqset)
260 LASSERT(set->set_exp);
261 /* Do enqueue_done only for sync requests and if any request
265 set->set_completes = 0;
266 ret = enqueue_done(set, mode);
267 } else if (set->set_lockh)
268 lov_llh_put(set->set_lockh);
272 RETURN(rc ? rc : ret);
/* Build the request set for an extent-lock enqueue over the stripes of
 * oinfo->oi_md. A lock-handle set is created with lov_llh_new and its cookie
 * is published through oinfo->oi_lockh. For each stripe, the extent in
 * oinfo->oi_policy is tested with lov_stripe_intersects and the target is
 * checked for presence and activity; a request is then allocated with a
 * private single-stripe sub-md, filled with the lock handle slot, callback,
 * flags, extent, target index, object id/seq and the stripe KMS and LVB
 * state, and queued on the set.
 * Failures jump to out_set with -ENOMEM (allocation) or -EIO;
 * lov_fini_enqueue_set is called with the current rc (the surrounding label
 * and the handling of skipped stripes are not visible in this listing). */
275 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
276 struct ldlm_enqueue_info *einfo,
277 struct lov_request_set **reqset)
279 struct lov_obd *lov = &exp->exp_obd->u.lov;
280 struct lov_request_set *set;
284 OBD_ALLOC(set, sizeof(*set));
292 set->set_lockh = lov_llh_new(oinfo->oi_md);
293 if (set->set_lockh == NULL)
294 GOTO(out_set, rc = -ENOMEM);
295 oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
297 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
298 struct lov_oinfo *loi;
299 struct lov_request *req;
302 loi = oinfo->oi_md->lsm_oinfo[i];
303 if (!lov_stripe_intersects(oinfo->oi_md, i,
304 oinfo->oi_policy.l_extent.start,
305 oinfo->oi_policy.l_extent.end,
309 if (!lov->lov_tgts[loi->loi_ost_idx] ||
310 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
311 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
315 OBD_ALLOC(req, sizeof(*req));
317 GOTO(out_set, rc = -ENOMEM);
/* One buffer holds the md header, a single lsm_oinfo pointer slot and the
 * lov_oinfo that slot is made to point at below. */
319 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
320 sizeof(struct lov_oinfo *) +
321 sizeof(struct lov_oinfo);
322 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
323 if (req->rq_oi.oi_md == NULL) {
324 OBD_FREE(req, sizeof(*req));
325 GOTO(out_set, rc = -ENOMEM);
327 req->rq_oi.oi_md->lsm_oinfo[0] =
328 ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
329 sizeof(struct lov_oinfo *);
331 /* Set lov request specific parameters. */
332 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
333 req->rq_oi.oi_cb_up = cb_update_enqueue;
334 req->rq_oi.oi_flags = oinfo->oi_flags;
336 LASSERT(req->rq_oi.oi_lockh);
338 req->rq_oi.oi_policy.l_extent.gid =
339 oinfo->oi_policy.l_extent.gid;
340 req->rq_oi.oi_policy.l_extent.start = start;
341 req->rq_oi.oi_policy.l_extent.end = end;
343 req->rq_idx = loi->loi_ost_idx;
346 /* XXX LOV STACKING: submd should be from the subobj */
347 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
348 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
349 req->rq_oi.oi_md->lsm_stripe_count = 0;
350 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
352 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
353 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
355 lov_set_add_req(req, set);
358 GOTO(out_set, rc = -EIO);
362 lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
/* Record the outcome of the sub-match carried by @req in @set through
 * lov_update_set.
 * NOTE(review): the remaining parameters and the derivation of the value
 * passed as ret are not visible in this listing. */
366 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
376 lov_update_set(set, req, ret);
/* Finish a lock-match request set. set->set_exp must be non-NULL. The set is
 * completed with enqueue_done in mode @mode; when every request succeeded and
 * @flags contains LDLM_FL_TEST_LOCK, one more reference on set->set_lockh is
 * dropped.
 * NOTE(review): any guard around these statements and the final release of
 * the set are not visible in this listing. */
380 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
387 LASSERT(set->set_exp);
388 rc = enqueue_done(set, mode);
389 if ((set->set_count == set->set_success) &&
390 (flags & LDLM_FL_TEST_LOCK))
391 lov_llh_put(set->set_lockh);
/* Build the request set for matching an extent lock over the stripes of
 * @lsm. A lock-handle set is created with lov_llh_new and its cookie is
 * published through @lockh. Stripes are tested against policy->l_extent with
 * lov_stripe_intersects, which also yields the per-stripe start and end. A
 * missing or inactive target aborts the whole operation with -EIO. Each
 * request gets a header-only sub-md carrying the stripe object id/seq, the
 * per-stripe extent with the caller gid, and the target index.
 * Failures jump to out_set with -ENOMEM or -EIO; lov_fini_match_set is
 * called with flags 0 (the surrounding label is not visible here). */
398 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
399 struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
400 __u32 mode, struct lustre_handle *lockh,
401 struct lov_request_set **reqset)
403 struct lov_obd *lov = &exp->exp_obd->u.lov;
404 struct lov_request_set *set;
408 OBD_ALLOC(set, sizeof(*set));
415 set->set_oi->oi_md = lsm;
416 set->set_lockh = lov_llh_new(lsm);
417 if (set->set_lockh == NULL)
418 GOTO(out_set, rc = -ENOMEM);
419 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
421 for (i = 0; i < lsm->lsm_stripe_count; i++){
422 struct lov_oinfo *loi;
423 struct lov_request *req;
426 loi = lsm->lsm_oinfo[i];
427 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
428 policy->l_extent.end, &start, &end))
431 /* FIXME raid1 should grace this error */
432 if (!lov->lov_tgts[loi->loi_ost_idx] ||
433 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
434 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
435 GOTO(out_set, rc = -EIO);
438 OBD_ALLOC(req, sizeof(*req));
440 GOTO(out_set, rc = -ENOMEM);
442 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
443 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
444 if (req->rq_oi.oi_md == NULL) {
445 OBD_FREE(req, sizeof(*req));
446 GOTO(out_set, rc = -ENOMEM);
449 req->rq_oi.oi_policy.l_extent.start = start;
450 req->rq_oi.oi_policy.l_extent.end = end;
451 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
453 req->rq_idx = loi->loi_ost_idx;
456 /* XXX LOV STACKING: submd should be from the subobj */
457 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
458 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
459 req->rq_oi.oi_md->lsm_stripe_count = 0;
461 lov_set_add_req(req, set);
464 GOTO(out_set, rc = -EIO);
468 lov_fini_match_set(set, mode, 0);
/* Finish a lock-cancel request set: set->set_exp must be non-NULL and the
 * reference on set->set_lockh is dropped.
 * NOTE(review): any guard on set_lockh, the release of the set and the
 * return value are not visible in this listing. */
472 int lov_fini_cancel_set(struct lov_request_set *set)
480 LASSERT(set->set_exp);
482 lov_llh_put(set->set_lockh);
/* Build the request set for cancelling the LOV lock identified by @lockh on
 * the stripes of @lsm. The lock-handle set is looked up with lov_handle2llh;
 * an unknown handle is logged and fails with -EINVAL. The per-stripe handle
 * slot i is checked with lustre_handle_is_used, and a stripe without a lock
 * is only logged at D_INFO level. Each queued request gets a header-only
 * sub-md carrying the stripe object id/seq, plus the target index.
 * Failures jump to out_set with -EINVAL, -ENOMEM or -EIO;
 * lov_fini_cancel_set is called (the surrounding label is not visible
 * here). */
489 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
490 struct lov_stripe_md *lsm, __u32 mode,
491 struct lustre_handle *lockh,
492 struct lov_request_set **reqset)
494 struct lov_request_set *set;
498 OBD_ALLOC(set, sizeof(*set));
505 set->set_oi->oi_md = lsm;
506 set->set_lockh = lov_handle2llh(lockh);
507 if (set->set_lockh == NULL) {
508 CERROR("LOV: invalid lov lock handle %p\n", lockh);
509 GOTO(out_set, rc = -EINVAL);
511 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
513 for (i = 0; i < lsm->lsm_stripe_count; i++){
514 struct lov_request *req;
515 struct lustre_handle *lov_lockhp;
516 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
518 lov_lockhp = set->set_lockh->llh_handles + i;
519 if (!lustre_handle_is_used(lov_lockhp)) {
520 CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
521 loi->loi_ost_idx, loi->loi_id);
525 OBD_ALLOC(req, sizeof(*req));
527 GOTO(out_set, rc = -ENOMEM);
529 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
530 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
531 if (req->rq_oi.oi_md == NULL) {
532 OBD_FREE(req, sizeof(*req));
533 GOTO(out_set, rc = -ENOMEM);
536 req->rq_idx = loi->loi_ost_idx;
539 /* XXX LOV STACKING: submd should be from the subobj */
540 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
541 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
542 req->rq_oi.oi_md->lsm_stripe_count = 0;
544 lov_set_add_req(req, set);
547 GOTO(out_set, rc = -EIO);
551 lov_fini_cancel_set(set);
/* Completion handler for one sub-object create. A failure on a present,
 * active target is logged. Under set->set_lock the request takes the next
 * stripe slot, that slot in the parent md is filled with the new object
 * id/seq and the target index, the llog cookie cursor and the sent-cookie
 * counter are advanced when applicable, and the result is recorded with
 * lov_update_set. The stripe assignment is finally logged at D_INODE level.
 * NOTE(review): two lov_update_set/unlock pairs are visible; the first one
 * presumably belongs to an early-exit error path whose condition is not
 * shown in this listing - confirm. */
555 static int lov_update_create_set(struct lov_request_set *set,
556 struct lov_request *req, int rc)
558 struct obd_trans_info *oti = set->set_oti;
559 struct lov_stripe_md *lsm = set->set_oi->oi_md;
560 struct lov_oinfo *loi;
561 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
564 if (rc && lov->lov_tgts[req->rq_idx] &&
565 lov->lov_tgts[req->rq_idx]->ltd_active) {
566 CERROR("error creating fid "LPX64" sub-object"
567 " on OST idx %d/%d: rc = %d\n",
568 set->set_oi->oi_oa->o_id, req->rq_idx,
569 lsm->lsm_stripe_count, rc);
571 CERROR("obd_create returned invalid err %d\n", rc);
576 cfs_spin_lock(&set->set_lock);
/* The number of successes so far is used as the index of the stripe slot
 * this request fills. */
577 req->rq_stripe = set->set_success;
578 loi = lsm->lsm_oinfo[req->rq_stripe];
582 lov_update_set(set, req, rc);
583 cfs_spin_unlock(&set->set_lock);
587 loi->loi_id = req->rq_oi.oi_oa->o_id;
588 loi->loi_seq = req->rq_oi.oi_oa->o_seq;
589 loi->loi_ost_idx = req->rq_idx;
592 if (oti && set->set_cookies)
593 ++oti->oti_logcookies;
594 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
595 set->set_cookie_sent++;
597 lov_update_set(set, req, rc);
598 cfs_spin_unlock(&set->set_lock);
600 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
601 lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
/* Finish a create request set once at least one request has completed.
 * Visible phases:
 *  - if not every request succeeded, requests are re-driven through
 *    qos_remedy_create and lov_update_create_set (the filter selecting which
 *    requests are retried is not visible in this listing);
 *  - with no success at all the function bails out; with partial success
 *    set_count is lowered to set_success;
 *  - attributes of all successful requests are merged into a temporary obdo
 *    with lov_merge_attrs, a size mismatch against the source obdo is
 *    logged, the source id/seq are kept and the merged obdo is copied over
 *    the source obdo; the resulting md is returned through @lsmp;
 *  - a cleanup loop destroys the objects of successful requests with
 *    obd_destroy, logging failures, and the md is freed with obd_free_memmd;
 *  - when llog cookies are in use, the cookie cursor is rewound to
 *    set->set_cookies and OBD_MD_FLCOOKIE in the source obdo is cleared
 *    (freeing the cookies) or set depending on set_cookie_sent.
 * NOTE(review): the labels and conditions separating the success path from
 * the cleanup path are not visible in this listing - confirm. */
605 static int create_done(struct obd_export *exp, struct lov_request_set *set,
606 struct lov_stripe_md **lsmp)
608 struct lov_obd *lov = &exp->exp_obd->u.lov;
609 struct obd_trans_info *oti = set->set_oti;
610 struct obdo *src_oa = set->set_oi->oi_oa;
611 struct lov_request *req;
612 struct obdo *ret_oa = NULL;
613 int attrset = 0, rc = 0;
616 LASSERT(set->set_completes);
618 /* try alloc objects on other osts if osc_create fails for
619 * exceptions: RPC failure, ENOSPC, etc */
620 if (set->set_count != set->set_success) {
621 cfs_list_for_each_entry (req, &set->set_list, rq_link) {
625 set->set_completes--;
626 req->rq_complete = 0;
628 rc = qos_remedy_create(set, req);
629 lov_update_create_set(set, req, rc);
633 /* no successful creates */
634 if (set->set_success == 0)
637 if (set->set_count != set->set_success) {
638 set->set_count = set->set_success;
644 GOTO(cleanup, rc = -ENOMEM);
646 cfs_list_for_each_entry(req, &set->set_list, rq_link) {
647 if (!req->rq_complete || req->rq_rc)
649 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
650 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
651 req->rq_stripe, &attrset);
653 if (src_oa->o_valid & OBD_MD_FLSIZE &&
654 ret_oa->o_size != src_oa->o_size) {
655 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
656 src_oa->o_size, ret_oa->o_size);
659 ret_oa->o_id = src_oa->o_id;
660 ret_oa->o_seq = src_oa->o_seq;
661 ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
662 memcpy(src_oa, ret_oa, sizeof(*src_oa));
665 *lsmp = set->set_oi->oi_md;
669 cfs_list_for_each_entry(req, &set->set_list, rq_link) {
670 struct obd_export *sub_exp;
673 if (!req->rq_complete || req->rq_rc)
676 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
677 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
680 CERROR("Failed to uncreate objid "LPX64" subobj "
681 LPX64" on OST idx %d: rc = %d\n",
682 src_oa->o_id, req->rq_oi.oi_oa->o_id,
686 obd_free_memmd(exp, &set->set_oi->oi_md);
688 if (oti && set->set_cookies) {
689 oti->oti_logcookies = set->set_cookies;
690 if (!set->set_cookie_sent) {
691 oti_free_cookies(oti);
692 src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
694 src_oa->o_valid |= OBD_MD_FLCOOKIE;
/* Finish a create request set: set->set_exp must be non-NULL, and when at
 * least one request has completed the set is finalised with create_done,
 * which may update *lsmp.
 * NOTE(review): the release of the set and the return statement are not
 * visible in this listing. */
700 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
707 LASSERT(set->set_exp);
708 if (set->set_completes)
709 rc = create_done(set->set_exp, set, lsmp);
/* Completion callback for a sub-object create. @cookie is the obd_info
 * embedded in a lov_request as rq_oi; the request is recovered with
 * container_of. A fault-injection hook tests OBD_FAIL_MDS_OSC_CREATE_FAIL
 * and compares the target index with cfs_fail_val (the statement it guards
 * is not visible in this listing). The result is then processed by
 * lov_update_create_set, and when the whole set has finished a reference on
 * the set is dropped with lov_put_reqset. */
715 int cb_create_update(void *cookie, int rc)
717 struct obd_info *oinfo = cookie;
718 struct lov_request *lovreq;
720 lovreq = container_of(oinfo, struct lov_request, rq_oi);
722 if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL))
723 if (lovreq->rq_idx == cfs_fail_val)
726 rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
727 if (lov_finished_set(lovreq->rq_rqset))
728 lov_put_reqset(lovreq->rq_rqset);
/* Build the request set for creating the stripe objects of a file. The set
 * records *lsmp as its md and @src_oa as its obdo, and the requests
 * themselves are prepared by qos_prep_create. Because that step may replace
 * the md (see the in-code comment), *lsmp is refreshed from oinfo->oi_md
 * afterwards. lov_fini_create_set is called on the set (presumably only when
 * preparation failed; the condition is not visible in this listing). */
732 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
733 struct lov_stripe_md **lsmp, struct obdo *src_oa,
734 struct obd_trans_info *oti,
735 struct lov_request_set **reqset)
737 struct lov_request_set *set;
741 OBD_ALLOC(set, sizeof(*set));
748 set->set_oi->oi_md = *lsmp;
749 set->set_oi->oi_oa = src_oa;
753 rc = qos_prep_create(exp, set);
754 /* qos_shrink_lsm() may have allocated a new lsm */
755 *lsmp = oinfo->oi_md;
757 lov_fini_create_set(set, lsmp);
/* Merge the attributes returned by the sub-requests of @set into the obdo of
 * the caller (set->set_oi->oi_oa). Requests that did not complete, failed,
 * or returned an obdo with no valid bits are tested for and presumably
 * skipped (the statements those tests guard are not visible in this
 * listing). The remaining obdos are folded into a temporary obdo with
 * lov_merge_attrs, which also counts contributing stripes in attrset. The
 * merged obdo keeps the original o_id and is copied over the caller obdo.
 * Errors visible here: -ENOMEM when the temporary obdo cannot be allocated,
 * and -EIO when OBD_MD_FLEPOCH is requested but the number of contributing
 * stripes differs from lsm_stripe_count. */
765 static int common_attr_done(struct lov_request_set *set)
768 struct lov_request *req;
770 int rc = 0, attrset = 0;
773 LASSERT(set->set_oi != NULL);
775 if (set->set_oi->oi_oa == NULL)
778 if (!set->set_success)
783 GOTO(out, rc = -ENOMEM);
785 cfs_list_for_each (pos, &set->set_list) {
786 req = cfs_list_entry(pos, struct lov_request, rq_link);
788 if (!req->rq_complete || req->rq_rc)
790 if (req->rq_oi.oi_oa->o_valid == 0) /* inactive stripe */
792 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
793 req->rq_oi.oi_oa->o_valid,
794 set->set_oi->oi_md, req->rq_stripe, &attrset);
797 CERROR("No stripes had valid attrs\n");
800 if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
801 (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
802 /* When we take attributes of some epoch, we require all the
803 * ost to be active. */
804 CERROR("Not all the stripes had valid attrs\n");
805 GOTO(out, rc = -EIO);
808 tmp_oa->o_id = set->set_oi->oi_oa->o_id;
809 memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
/* Post-process a finished bulk read/write set: walk the request list and,
 * for the stripe of each request whose reply obdo has OBD_MD_FLBLOCKS set,
 * store the reported block count in the lock value block of that stripe.
 * Requests that did not complete or failed are tested for first (the
 * statement that test guards is not visible in this listing). */
817 static int brw_done(struct lov_request_set *set)
819 struct lov_stripe_md *lsm = set->set_oi->oi_md;
820 struct lov_oinfo *loi = NULL;
822 struct lov_request *req;
825 cfs_list_for_each (pos, &set->set_list) {
826 req = cfs_list_entry(pos, struct lov_request, rq_link);
828 if (!req->rq_complete || req->rq_rc)
831 loi = lsm->lsm_oinfo[req->rq_stripe];
833 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
834 loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
/* Finish a bulk read/write request set. set->set_exp must be non-NULL; the
 * work done when at least one request has completed is not visible in this
 * listing apart from the FIXME below. */
840 int lov_fini_brw_set(struct lov_request_set *set)
847 LASSERT(set->set_exp);
848 if (set->set_completes) {
850 /* FIXME update qos data here */
/* Build the request set for a bulk read/write of @oa_bufs pages described by
 * @pga, splitting the pages by stripe.
 *  1. A private page array of oa_bufs entries and a per-stripe scratch array
 *     (info) are allocated.
 *  2. The first loop counts the pages that fall on each stripe.
 *  3. The second loop creates one request per stripe that has pages: the
 *     caller obdo is copied and retargeted at the stripe object, a
 *     header-only sub-md is attached, and the request is given the slice of
 *     the private page array starting at rq_pgaidx with rq_oabufs entries.
 *     A missing or inactive target fails the whole set with -EIO.
 *  4. The third loop copies every page into the slice of its stripe and
 *     rewrites its offset to a stripe-object offset with lov_stripe_offset.
 * Allocation failures return -ENOMEM through the out label; on that path
 * lov_fini_brw_set is called (label layout not visible in this listing).
 * NOTE(review): the initial value of shift before the second loop and the
 * update of info[stripe].off in the third loop are not visible here -
 * confirm. */
857 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
858 obd_count oa_bufs, struct brw_page *pga,
859 struct obd_trans_info *oti,
860 struct lov_request_set **reqset)
867 struct lov_request_set *set;
868 struct lov_obd *lov = &exp->exp_obd->u.lov;
869 int rc = 0, i, shift;
872 OBD_ALLOC(set, sizeof(*set));
880 set->set_oabufs = oa_bufs;
881 OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
883 GOTO(out, rc = -ENOMEM);
885 OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
887 GOTO(out, rc = -ENOMEM);
889 /* calculate the page count for each stripe */
890 for (i = 0; i < oa_bufs; i++) {
891 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
892 info[stripe].count++;
895 /* alloc and initialize lov request */
897 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
898 struct lov_oinfo *loi = NULL;
899 struct lov_request *req;
901 if (info[i].count == 0)
904 loi = oinfo->oi_md->lsm_oinfo[i];
905 if (!lov->lov_tgts[loi->loi_ost_idx] ||
906 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
907 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
908 GOTO(out, rc = -EIO);
911 OBD_ALLOC(req, sizeof(*req));
913 GOTO(out, rc = -ENOMEM);
915 OBDO_ALLOC(req->rq_oi.oi_oa);
916 if (req->rq_oi.oi_oa == NULL) {
917 OBD_FREE(req, sizeof(*req));
918 GOTO(out, rc = -ENOMEM);
922 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
923 sizeof(*req->rq_oi.oi_oa));
925 req->rq_oi.oi_oa->o_id = loi->loi_id;
926 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
927 req->rq_oi.oi_oa->o_stripe_idx = i;
929 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
930 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
931 if (req->rq_oi.oi_md == NULL) {
932 OBDO_FREE(req->rq_oi.oi_oa);
933 OBD_FREE(req, sizeof(*req));
934 GOTO(out, rc = -ENOMEM);
937 req->rq_idx = loi->loi_ost_idx;
940 /* XXX LOV STACKING */
941 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
942 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
/* shift is the running start index into the private page array; each
 * request claims the next info[i].count entries. */
943 req->rq_oabufs = info[i].count;
944 req->rq_pgaidx = shift;
945 shift += req->rq_oabufs;
947 /* remember the index for sort brw_page array */
948 info[i].index = req->rq_pgaidx;
950 req->rq_oi.oi_capa = oinfo->oi_capa;
952 lov_set_add_req(req, set);
955 GOTO(out, rc = -EIO);
957 /* rotate & sort the brw_page array */
958 for (i = 0; i < oa_bufs; i++) {
959 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
961 shift = info[stripe].index + info[stripe].off;
962 LASSERT(shift < oa_bufs);
963 set->set_pga[shift] = pga[i];
964 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
965 &set->set_pga[shift].off);
971 sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
976 lov_fini_brw_set(set);
/* Finish a getattr request set: set->set_exp must be non-NULL, and when at
 * least one request has completed the per-stripe attributes are merged into
 * the caller obdo by common_attr_done.
 * NOTE(review): the release of the set and the return statement are not
 * visible in this listing. */
981 int lov_fini_getattr_set(struct lov_request_set *set)
988 LASSERT(set->set_exp);
989 if (set->set_completes)
990 rc = common_attr_done(set);
997 /* The callback for osc_getattr_async that finalizes a request info when a
998 * response is received. */
/* @cookie is the obd_info embedded in a lov_request as rq_oi; the request is
 * recovered with container_of and @rc is handed to lov_update_common_set,
 * whose return value is returned. */
999 static int cb_getattr_update(void *cookie, int rc)
1001 struct obd_info *oinfo = cookie;
1002 struct lov_request *lovreq;
1003 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1004 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
/* Build the request set for a getattr over the stripes of oinfo->oi_md. A
 * missing or inactive target is logged; when the caller obdo has
 * OBD_MD_FLEPOCH set this is fatal and fails with -EIO. Each queued request
 * carries a private copy of the caller obdo retargeted at the stripe object
 * id/seq, the cb_getattr_update completion callback and the caller
 * capability. An empty set fails with -EIO and allocation failures with
 * -ENOMEM; both jump to out_set, where lov_fini_getattr_set is called
 * (label layout not visible in this listing). */
1007 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
1008 struct lov_request_set **reqset)
1010 struct lov_request_set *set;
1011 struct lov_obd *lov = &exp->exp_obd->u.lov;
1015 OBD_ALLOC(set, sizeof(*set));
1021 set->set_oi = oinfo;
1023 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1024 struct lov_oinfo *loi;
1025 struct lov_request *req;
1027 loi = oinfo->oi_md->lsm_oinfo[i];
1028 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1029 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1030 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1031 if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
1032 /* SOM requires all the OSTs to be active. */
1033 GOTO(out_set, rc = -EIO);
1037 OBD_ALLOC(req, sizeof(*req));
1039 GOTO(out_set, rc = -ENOMEM);
1042 req->rq_idx = loi->loi_ost_idx;
1044 OBDO_ALLOC(req->rq_oi.oi_oa);
1045 if (req->rq_oi.oi_oa == NULL) {
1046 OBD_FREE(req, sizeof(*req));
1047 GOTO(out_set, rc = -ENOMEM);
1049 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1050 sizeof(*req->rq_oi.oi_oa));
1051 req->rq_oi.oi_oa->o_id = loi->loi_id;
1052 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1053 req->rq_oi.oi_cb_up = cb_getattr_update;
1054 req->rq_oi.oi_capa = oinfo->oi_capa;
1056 lov_set_add_req(req, set);
1058 if (!set->set_count)
1059 GOTO(out_set, rc = -EIO);
1063 lov_fini_getattr_set(set);
/* Finish a destroy request set: set->set_exp must be non-NULL and the
 * reference held on the set is dropped with lov_put_reqset. Nothing is done
 * yet for completed requests beyond the FIXME below. */
1067 int lov_fini_destroy_set(struct lov_request_set *set)
1073 LASSERT(set->set_exp);
1074 if (set->set_completes) {
1075 /* FIXME update qos data here */
1078 lov_put_reqset(set);
/* Build the request set for destroying the stripe objects of @lsm. The
 * caller obd_info is adopted as set->set_oi and has its md and obdo fields
 * overwritten with @lsm and @src_oa. When @oti is given and @src_oa carries
 * OBD_MD_FLCOOKIE, the llog cookie cursor is remembered in the set. A
 * missing or inactive target is logged. Each queued request carries a
 * private copy of @src_oa retargeted at the stripe object id/seq. An empty
 * set fails with -EIO and allocation failures with -ENOMEM; both jump to
 * out_set, where lov_fini_destroy_set is called (label layout not visible in
 * this listing). */
1083 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1084 struct obdo *src_oa, struct lov_stripe_md *lsm,
1085 struct obd_trans_info *oti,
1086 struct lov_request_set **reqset)
1088 struct lov_request_set *set;
1089 struct lov_obd *lov = &exp->exp_obd->u.lov;
1093 OBD_ALLOC(set, sizeof(*set));
1099 set->set_oi = oinfo;
1100 set->set_oi->oi_md = lsm;
1101 set->set_oi->oi_oa = src_oa;
1103 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1104 set->set_cookies = oti->oti_logcookies;
1106 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1107 struct lov_oinfo *loi;
1108 struct lov_request *req;
1110 loi = lsm->lsm_oinfo[i];
1111 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1112 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1113 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1117 OBD_ALLOC(req, sizeof(*req));
1119 GOTO(out_set, rc = -ENOMEM);
1122 req->rq_idx = loi->loi_ost_idx;
1124 OBDO_ALLOC(req->rq_oi.oi_oa);
1125 if (req->rq_oi.oi_oa == NULL) {
1126 OBD_FREE(req, sizeof(*req));
1127 GOTO(out_set, rc = -ENOMEM);
1129 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1130 req->rq_oi.oi_oa->o_id = loi->loi_id;
1131 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1132 lov_set_add_req(req, set);
1134 if (!set->set_count)
1135 GOTO(out_set, rc = -EIO);
1139 lov_fini_destroy_set(set);
/* Finish a setattr request set: set->set_exp must be non-NULL; when at least
 * one request has completed the per-stripe attributes are merged by
 * common_attr_done; the reference held on the set is then dropped with
 * lov_put_reqset. */
1143 int lov_fini_setattr_set(struct lov_request_set *set)
1150 LASSERT(set->set_exp);
1151 if (set->set_completes) {
1152 rc = common_attr_done(set);
1153 /* FIXME update qos data here */
1156 lov_put_reqset(set);
/* Completion handler for one sub-setattr. The result is recorded in @set
 * with lov_update_set. A nonzero @rc is treated specially when the target
 * slot is empty or not active (per the in-code comment the error is
 * forgiven; the statement that does so is not visible in this listing).
 * The ctime, mtime and atime of the reply obdo are then copied into the lock
 * value block of the stripe, each only when its valid bit is set in the
 * reply (any enclosing success check is not visible here). */
1160 int lov_update_setattr_set(struct lov_request_set *set,
1161 struct lov_request *req, int rc)
1163 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1164 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1167 lov_update_set(set, req, rc);
1169 /* grace error on inactive ost */
1170 if (rc && !(lov->lov_tgts[req->rq_idx] &&
1171 lov->lov_tgts[req->rq_idx]->ltd_active))
1175 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1176 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1177 req->rq_oi.oi_oa->o_ctime;
1178 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1179 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1180 req->rq_oi.oi_oa->o_mtime;
1181 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1182 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1183 req->rq_oi.oi_oa->o_atime;
1189 /* The callback for osc_setattr_async that finalizes a request info when a
1190 * response is received. */
/* @cookie is the obd_info embedded in a lov_request as rq_oi; the request is
 * recovered with container_of and @rc is handed to lov_update_setattr_set,
 * whose return value is returned. */
1191 static int cb_setattr_update(void *cookie, int rc)
1193 struct obd_info *oinfo = cookie;
1194 struct lov_request *lovreq;
1195 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1196 return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
/* Build the request set for a setattr over the stripes of oinfo->oi_md.
 * When @oti is given and the caller obdo carries OBD_MD_FLCOOKIE, the llog
 * cookie cursor is remembered in the set. A missing or inactive target is
 * logged. Each queued request carries a private copy of the caller obdo
 * retargeted at the stripe object id/seq, the stripe index, the
 * cb_setattr_update callback and the caller capability. When the caller sets
 * a size (OBD_MD_FLSIZE), the file size is converted to a per-stripe object
 * size with lov_stripe_offset. An empty set fails with -EIO and allocation
 * failures with -ENOMEM; both jump to out_set, where lov_fini_setattr_set is
 * called (label layout not visible in this listing). */
1199 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1200 struct obd_trans_info *oti,
1201 struct lov_request_set **reqset)
1203 struct lov_request_set *set;
1204 struct lov_obd *lov = &exp->exp_obd->u.lov;
1208 OBD_ALLOC(set, sizeof(*set));
1215 set->set_oi = oinfo;
1216 if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1217 set->set_cookies = oti->oti_logcookies;
1219 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1220 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1221 struct lov_request *req;
1223 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1224 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1225 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1229 OBD_ALLOC(req, sizeof(*req));
1231 GOTO(out_set, rc = -ENOMEM);
1233 req->rq_idx = loi->loi_ost_idx;
1235 OBDO_ALLOC(req->rq_oi.oi_oa);
1236 if (req->rq_oi.oi_oa == NULL) {
1237 OBD_FREE(req, sizeof(*req));
1238 GOTO(out_set, rc = -ENOMEM);
1240 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1241 sizeof(*req->rq_oi.oi_oa));
1242 req->rq_oi.oi_oa->o_id = loi->loi_id;
1243 req->rq_oi.oi_oa->o_seq= loi->loi_seq;
1244 req->rq_oi.oi_oa->o_stripe_idx = i;
1245 req->rq_oi.oi_cb_up = cb_setattr_update;
1246 req->rq_oi.oi_capa = oinfo->oi_capa;
1248 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1249 int off = lov_stripe_offset(oinfo->oi_md,
1250 oinfo->oi_oa->o_size, i,
1251 &req->rq_oi.oi_oa->o_size);
/* A negative result from lov_stripe_offset with a nonzero converted size
 * makes the stripe size one byte smaller. NOTE(review): presumably this
 * marks a file size that lies before this stripe - confirm against
 * lov_stripe_offset. */
1253 if (off < 0 && req->rq_oi.oi_oa->o_size)
1254 req->rq_oi.oi_oa->o_size--;
1256 CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1257 i, req->rq_oi.oi_oa->o_size,
1258 oinfo->oi_oa->o_size);
1260 lov_set_add_req(req, set);
1262 if (!set->set_count)
1263 GOTO(out_set, rc = -EIO);
1267 lov_fini_setattr_set(set);
/* Finish a punch (truncate) request set: set->set_exp must be non-NULL; when
 * requests have completed and at least one succeeded the per-stripe
 * attributes are merged by common_attr_done; the reference held on the set
 * is then dropped with lov_put_reqset.
 * NOTE(review): the value of rc when requests completed but none succeeded
 * is not visible in this listing. */
1271 int lov_fini_punch_set(struct lov_request_set *set)
1278 LASSERT(set->set_exp);
1279 if (set->set_completes) {
1281 /* FIXME update qos data here */
1282 if (set->set_success)
1283 rc = common_attr_done(set);
1286 lov_put_reqset(set);
/* Completion handler for one sub-punch. The result is recorded in @set with
 * lov_update_set, and a nonzero @rc is treated specially when the target is
 * not active (per the in-code comment the error is forgiven; the statement
 * that does so is not visible in this listing). Under the stripe lock of the
 * parent md, the block count of the reply obdo is copied into the lock value
 * block of the stripe when OBD_MD_FLBLOCKS is valid (any enclosing success
 * check is not visible here).
 * NOTE(review): unlike lov_update_common_set and lov_update_setattr_set,
 * the inactive-target test here dereferences lov_tgts[req->rq_idx] without
 * first checking the slot for NULL - confirm the slot cannot be empty. */
1291 int lov_update_punch_set(struct lov_request_set *set,
1292 struct lov_request *req, int rc)
1294 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1295 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1298 lov_update_set(set, req, rc);
1300 /* grace error on inactive ost */
1301 if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1305 lov_stripe_lock(lsm);
1306 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1307 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1308 req->rq_oi.oi_oa->o_blocks;
1311 /* Do we need to update lvb_size here? It needn't because
1312 * it have been done in ll_truncate(). -jay */
1313 lov_stripe_unlock(lsm);
1319 /* The callback for osc_punch that finalizes a request info when a response is received. */
/* Completion callback installed on punch requests. @cookie is the obd_info
 * embedded in a lov_request as rq_oi; the request is recovered with
 * container_of and @rc is handed to lov_update_punch_set, whose return value
 * is returned. */
1321 static int cb_update_punch(void *cookie, int rc)
1323 struct obd_info *oinfo = cookie;
1324 struct lov_request *lovreq;
1325 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1326 return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
/* Build the request set for punching (truncating) the extent in
 * oinfo->oi_policy across the stripes of oinfo->oi_md. Stripes are tested
 * against the extent with lov_stripe_intersects. A missing or inactive
 * target fails the whole set with -EIO. Each queued request carries a
 * private copy of the caller obdo retargeted at the stripe object id/seq
 * with OBD_MD_FLGROUP set, the stripe index, the cb_update_punch callback,
 * the per-stripe extent (rs, re) with gid -1, and the caller capability. An
 * empty set fails with -EIO and allocation failures with -ENOMEM; both jump
 * to out_set, where lov_fini_punch_set is called (label layout not visible
 * in this listing). */
1329 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1330 struct obd_trans_info *oti,
1331 struct lov_request_set **reqset)
1333 struct lov_request_set *set;
1334 struct lov_obd *lov = &exp->exp_obd->u.lov;
1338 OBD_ALLOC(set, sizeof(*set));
1343 set->set_oi = oinfo;
1346 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1347 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1348 struct lov_request *req;
1351 if (!lov_stripe_intersects(oinfo->oi_md, i,
1352 oinfo->oi_policy.l_extent.start,
1353 oinfo->oi_policy.l_extent.end,
1357 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1358 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1359 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1360 GOTO(out_set, rc = -EIO);
1363 OBD_ALLOC(req, sizeof(*req));
1365 GOTO(out_set, rc = -ENOMEM);
1367 req->rq_idx = loi->loi_ost_idx;
1369 OBDO_ALLOC(req->rq_oi.oi_oa);
1370 if (req->rq_oi.oi_oa == NULL) {
1371 OBD_FREE(req, sizeof(*req));
1372 GOTO(out_set, rc = -ENOMEM);
1374 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1375 sizeof(*req->rq_oi.oi_oa));
1376 req->rq_oi.oi_oa->o_id = loi->loi_id;
1377 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1378 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1380 req->rq_oi.oi_oa->o_stripe_idx = i;
1381 req->rq_oi.oi_cb_up = cb_update_punch;
1383 req->rq_oi.oi_policy.l_extent.start = rs;
1384 req->rq_oi.oi_policy.l_extent.end = re;
1385 req->rq_oi.oi_policy.l_extent.gid = -1;
1387 req->rq_oi.oi_capa = oinfo->oi_capa;
1389 lov_set_add_req(req, set);
1391 if (!set->set_count)
1392 GOTO(out_set, rc = -EIO);
1396 lov_fini_punch_set(set);
/* Finish a sync request set: set->set_exp must be non-NULL; when requests
 * have completed, the case of no success at all is tested for (the
 * statement it guards is not visible in this listing); the reference held
 * on the set is then dropped with lov_put_reqset. */
1400 int lov_fini_sync_set(struct lov_request_set *set)
1407 LASSERT(set->set_exp);
1408 if (set->set_completes) {
1409 if (!set->set_success)
1411 /* FIXME update qos data here */
1414 lov_put_reqset(set);
1419 /* The callback for osc_sync that finalizes a request info when a
1420 * response is received. */
/* @cookie is the obd_info embedded in a lov_request as rq_oi; the request is
 * recovered with container_of and @rc is handed to lov_update_common_set,
 * whose return value is returned. */
1421 static int cb_sync_update(void *cookie, int rc)
1423 struct obd_info *oinfo = cookie;
1424 struct lov_request *lovreq;
1426 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1427 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
/* Build the request set for syncing the byte range [@start, @end] across the
 * stripes of oinfo->oi_md. A missing or inactive target is logged, and
 * stripes are tested against the range with lov_stripe_intersects. Each
 * queued request carries a private copy of the caller obdo (structure
 * assignment) retargeted at the stripe object id/seq, the stripe index, the
 * per-stripe extent (rs, re) with gid -1 and the cb_sync_update callback. An
 * empty set fails with -EIO and allocation failures with -ENOMEM; both jump
 * to out_set, where lov_fini_sync_set is called (label layout not visible in
 * this listing).
 * NOTE(review): the allocation of the set and of each request is not visible
 * in this listing. */
1430 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1431 obd_off start, obd_off end,
1432 struct lov_request_set **reqset)
1434 struct lov_request_set *set;
1435 struct lov_obd *lov = &exp->exp_obd->u.lov;
1445 set->set_oi = oinfo;
1447 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1448 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1449 struct lov_request *req;
1452 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1453 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1454 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1458 if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
1464 GOTO(out_set, rc = -ENOMEM);
1466 req->rq_idx = loi->loi_ost_idx;
1468 OBDO_ALLOC(req->rq_oi.oi_oa);
1469 if (req->rq_oi.oi_oa == NULL) {
1470 OBD_FREE(req, sizeof(*req));
1471 GOTO(out_set, rc = -ENOMEM);
1473 *req->rq_oi.oi_oa = *oinfo->oi_oa;
1474 req->rq_oi.oi_oa->o_id = loi->loi_id;
1475 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1476 req->rq_oi.oi_oa->o_stripe_idx = i;
1478 req->rq_oi.oi_policy.l_extent.start = rs;
1479 req->rq_oi.oi_policy.l_extent.end = re;
1480 req->rq_oi.oi_policy.l_extent.gid = -1;
1481 req->rq_oi.oi_cb_up = cb_sync_update;
1483 lov_set_add_req(req, set);
1485 if (!set->set_count)
1486 GOTO(out_set, rc = -EIO);
1490 lov_fini_sync_set(set);
/* Largest value representable in a __u64 (all bits set).  lov_fini_statfs()
 * leaves counters that are equal to this value untouched. */
1494 #define LOV_U64_MAX ((__u64)~0ULL)
/* Saturating accumulate helper: when (tot) + (add) wraps around, i.e. the
 * sum compares smaller than (tot), (tot) is pinned to LOV_U64_MAX.
 * NOTE(review): the non-overflow branch of the macro is not visible in this
 * excerpt.  Both arguments are evaluated more than once, so callers should
 * not pass expressions with side effects. */
1495 #define LOV_SUM_MAX(tot, add) \
1497 if ((tot) + (add) < (tot)) \
1498 (tot) = LOV_U64_MAX; \
/* Post-process aggregated statfs data and cache it in the LOV device.
 *
 * obd     - LOV obd device; receives the cached copy in obd_osfs
 * osfs    - aggregated statfs data, modified in place
 * success - NOTE(review): its use is not visible in this excerpt
 *
 * The file counters are divided by the expected stripe count unless they
 * are saturated at LOV_U64_MAX, then the result is copied into
 * obd->obd_osfs and time-stamped while holding obd_osfs_lock.
 * NOTE(review): the return statement is not visible here. */
1503 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
/* NOTE(review): used as a divisor below -- assumes lov_get_stripecnt()
 * never returns 0; confirm */
1508 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
/* saturated counters are kept as they are instead of being scaled */
1510 if (osfs->os_files != LOV_U64_MAX)
1511 do_div(osfs->os_files, expected_stripes);
1512 if (osfs->os_ffree != LOV_U64_MAX)
1513 do_div(osfs->os_ffree, expected_stripes);
/* publish the data and its age together under the osfs lock */
1515 cfs_spin_lock(&obd->obd_osfs_lock);
1516 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1517 obd->obd_osfs_age = cfs_time_current_64();
1518 cfs_spin_unlock(&obd->obd_osfs_lock);
/* Tear-down step for a statfs request set.
 *
 * When set_completes is non-zero, lov_fini_statfs() is run on the set's obd
 * device and on the caller's statfs buffer (set_oi->oi_osfs) and its result
 * is stored in rc; afterwards the set is handed to lov_put_reqset().
 * NOTE(review): the remaining arguments of the lov_fini_statfs() call and
 * the return statement are not visible in this excerpt. */
1525 int lov_fini_statfs_set(struct lov_request_set *set)
1533 if (set->set_completes) {
1534 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1537 lov_put_reqset(set);
/* Merge one target's statfs reply (lov_sfs) into the running total (osfs).
 *
 * The visible code contains a plain copy of lov_sfs into osfs, a
 * normalisation of the block counters when the two block sizes differ, a
 * "minimum available space" variant, a summing variant for
 * os_bfree/os_bavail, a sum of os_blocks, and saturating sums of
 * os_files/os_ffree via LOV_SUM_MAX.
 * NOTE(review): the conditions selecting between the copy and the merge,
 * and between the minimum and the summing variants, are not visible in this
 * excerpt, nor is the body of the shift-computing loop.  lov_sfs may be
 * modified (its block counters are shifted in one branch). */
1541 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1544 int shift = 0, quit = 0;
/* take the reply as-is (guarding condition not visible in this excerpt) */
1548 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1550 if (osfs->os_bsize != lov_sfs->os_bsize) {
1551 /* assume all block sizes are always powers of 2 */
1552 /* get the bits difference */
1553 tmp = osfs->os_bsize | lov_sfs->os_bsize;
1554 for (shift = 0; shift <= 64; ++shift) {
/* the running total has the smaller block size: adopt the reply's block
 * size and scale the total's block counters down by 'shift' bits */
1566 if (osfs->os_bsize < lov_sfs->os_bsize) {
1567 osfs->os_bsize = lov_sfs->os_bsize;
1569 osfs->os_bfree >>= shift;
1570 osfs->os_bavail >>= shift;
1571 osfs->os_blocks >>= shift;
/* otherwise scale the reply's block counters down instead */
1572 } else if (shift != 0) {
1573 lov_sfs->os_bfree >>= shift;
1574 lov_sfs->os_bavail >>= shift;
1575 lov_sfs->os_blocks >>= shift;
1578 /* Sandia requested that df (and so, statfs) only
1579 returned minimal available space on
1580 a single OST, so people would be able to
1581 write this much data guaranteed. */
1582 if (osfs->os_bavail > lov_sfs->os_bavail) {
1583 /* Presumably if new bavail is smaller,
1584 new bfree is bigger as well */
1585 osfs->os_bfree = lov_sfs->os_bfree;
1586 osfs->os_bavail = lov_sfs->os_bavail;
/* summing variant: add the reply's free/available blocks to the total */
1589 osfs->os_bfree += lov_sfs->os_bfree;
1590 osfs->os_bavail += lov_sfs->os_bavail;
/* total block count is accumulated with a plain (non-saturating) add */
1592 osfs->os_blocks += lov_sfs->os_blocks;
1593 /* XXX not sure about this one - depends on policy.
1594 * - could be minimum if we always stripe on all OBDs
1595 * (but that would be wrong for any other policy,
1596 * if one of the OBDs has no more objects left)
1597 * - could be sum if we stripe whole objects
1598 * - could be average, just to give a nice number
1600 * To give a "reasonable" (if not wholly accurate)
1601 * number, we divide the total number of free objects
1602 * by expected stripe count (watch out for overflow).
1604 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1605 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1609 /* The callback for osc_statfs_async that finalizes a request info when a
1610 * response is received. */
1611 static int cb_statfs_update(void *cookie, int rc)
/* cookie is the obd_info that is embedded in a lov_request as rq_oi */
1613 struct obd_info *oinfo = cookie;
1614 struct lov_request *lovreq;
1615 struct obd_statfs *osfs, *lov_sfs;
1616 struct lov_obd *lov;
1617 struct lov_tgt_desc *tgt;
1618 struct obd_device *lovobd, *tgtobd;
/* recover the request, its LOV device, the caller's aggregate buffer
 * (osfs) and this request's own reply buffer (lov_sfs) */
1622 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1623 lovobd = lovreq->rq_rqset->set_obd;
1624 lov = &lovobd->u.lov;
1625 osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1626 lov_sfs = oinfo->oi_osfs;
/* sampled before lov_update_set() runs and later passed on to
 * lov_update_statfs(); presumably lov_update_set() may change set_success
 * -- confirm */
1627 success = lovreq->rq_rqset->set_success;
1628 /* XXX: the same is done in lov_update_common_set, however
1629 lovset->set_exp is not initialized. */
1630 lov_update_set(lovreq->rq_rqset, lovreq, rc);
/* a missing or inactive target gets no per-target cache refresh */
1635 tgt = lov->lov_tgts[lovreq->rq_idx];
1636 if (!tgt || !tgt->ltd_active)
1637 GOTO(out_update, rc);
/* cache the reply in the target's obd device under its osfs lock; the age
 * is left untouched when OBD_STATFS_FROM_CACHE is set in the flags */
1639 tgtobd = class_exp2obd(tgt->ltd_exp);
1640 cfs_spin_lock(&tgtobd->obd_osfs_lock);
1641 memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1642 if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1643 tgtobd->obd_osfs_age = cfs_time_current_64();
1644 cfs_spin_unlock(&tgtobd->obd_osfs_lock);
/* fold this reply into the caller's aggregate */
1647 lov_update_statfs(osfs, lov_sfs, success);
/* when the caller's flags contain OBD_STATFS_PTLRPCD and
 * lov_finished_set() returns non-zero, run the interpret step from here;
 * its last argument is non-zero when set_success differs from set_count */
1652 if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1653 lov_finished_set(lovreq->rq_rqset)) {
1654 lov_statfs_interpret(NULL, lovreq->rq_rqset,
1655 lovreq->rq_rqset->set_success !=
1656 lovreq->rq_rqset->set_count);
1657 qos_statfs_done(lov);
/* Prepare a request set for a statfs over the targets of a LOV device.
 *
 * obd    - LOV obd device whose target table is walked
 * oinfo  - caller's request info; stored in the set and the source of the
 *          flags copied into every per-target request
 * reqset - output pointer for the prepared set (the assignment itself is
 *          not visible in this excerpt)
 *
 * For each of the ld_tgt_count target slots that passes the checks in the
 * loop, a lov_request with its own obd_statfs buffer is allocated, its
 * completion callback is set to cb_statfs_update(), and it is added to the
 * set.  Visible error paths jump to out_set with -ENOMEM when an allocation
 * fails and with -EIO when no request was added; the tail of the function
 * calls lov_fini_statfs_set() on the set.
 * NOTE(review): several statements (NULL checks, loop 'continue's, the
 * success return and the out_set label) are not visible in this excerpt. */
1663 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1664 struct lov_request_set **reqset)
1666 struct lov_request_set *set;
1667 struct lov_obd *lov = &obd->u.lov;
1671 OBD_ALLOC(set, sizeof(*set));
1677 set->set_oi = oinfo;
1679 /* We only get block data from the OBD */
1680 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1681 struct lov_request *req;
/* the slot is empty, or the target is inactive while the caller passed
 * OBD_STATFS_NODELAY */
1683 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1684 && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1685 CDEBUG(D_HA, "lov idx %d inactive\n", i);
1689 /* skip targets that have been explicitly disabled by the
1691 if (!lov->lov_tgts[i]->ltd_exp) {
1692 CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1696 OBD_ALLOC(req, sizeof(*req));
1698 GOTO(out_set, rc = -ENOMEM);
1700 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1701 if (req->rq_oi.oi_osfs == NULL) {
1702 OBD_FREE(req, sizeof(*req));
1703 GOTO(out_set, rc = -ENOMEM);
1707 req->rq_oi.oi_cb_up = cb_statfs_update;
1708 req->rq_oi.oi_flags = oinfo->oi_flags;
1710 lov_set_add_req(req, set);
1712 if (!set->set_count)
1713 GOTO(out_set, rc = -EIO);
1717 lov_fini_statfs_set(set);