1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
6 * This file is part of the Lustre file system, http://www.lustre.org
7 * Lustre is a trademark of Cluster File Systems, Inc.
9 * You may have signed or agreed to another license before downloading
10 * this software. If so, you are bound by the terms and conditions
11 * of that agreement, and the following does not apply to you. See the
12 * LICENSE file included with this distribution for more information.
14 * If you did not agree to a different license, then this copy of Lustre
15 * is open source software; you can redistribute it and/or modify it
16 * under the terms of version 2 of the GNU General Public License as
17 * published by the Free Software Foundation.
19 * In either case, Lustre is distributed in the hope that it will be
20 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 * license text for more details.
26 # define EXPORT_SYMTAB
28 #define DEBUG_SUBSYSTEM S_LOV
31 #include <libcfs/libcfs.h>
33 #include <liblustre.h>
36 #include <obd_class.h>
38 #include <lustre/lustre_idl.h>
40 #include "lov_internal.h"
/* Put a newly allocated request set into its initial state: no completed
 * requests, an empty request list and a reference count of one.
 * NOTE(review): some statements of this function are not visible in this
 * view; other counters are presumably reset here too -- confirm. */
42 static void lov_init_set(struct lov_request_set *set)
45 set->set_completes = 0;
48 CFS_INIT_LIST_HEAD(&set->set_list);
49 atomic_set(&set->set_refcount, 1);
/* Tear down a request set.  Every request on set->set_list is unlinked and
 * its obdo, private stripe-md buffer (rq_buflen bytes), statfs buffer (when
 * present) and the request itself are released; afterwards the page array
 * (set_oabufs entries) is freed, the lock-handle reference is dropped and
 * the set is freed.
 * NOTE(review): the guards around several of these releases are not visible
 * in this view -- presumably each is conditional on the pointer being set. */
52 static void lov_finish_set(struct lov_request_set *set)
54 struct list_head *pos, *n;
/* _safe iteration: each entry is removed and freed inside the loop. */
58 list_for_each_safe(pos, n, &set->set_list) {
59 struct lov_request *req = list_entry(pos, struct lov_request,
61 list_del_init(&req->rq_link);
64 OBDO_FREE(req->rq_oi.oi_oa);
66 OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
67 if (req->rq_oi.oi_osfs)
68 OBD_FREE(req->rq_oi.oi_osfs,
69 sizeof(*req->rq_oi.oi_osfs));
70 OBD_FREE(req, sizeof(*req));
74 int len = set->set_oabufs * sizeof(*set->set_pga);
75 OBD_FREE(set->set_pga, len);
78 lov_llh_put(set->set_lockh);
80 OBD_FREE(set, sizeof(*set));
/* Per-request completion bookkeeping shared by the lov_update_*_set()
 * handlers in this file, each of which calls it with the request and its rc.
 * NOTE(review): the body is not visible in this view -- presumably it stores
 * rc in the request and bumps the set's completion/success counters. */
84 void lov_update_set(struct lov_request_set *set,
85 struct lov_request *req, int rc)
/* Generic completion handler: account the request via lov_update_set(), then
 * treat a failure as tolerable when the request's target slot is empty or
 * the target is marked inactive.
 * NOTE(review): the statement guarded by that test and the return are not
 * visible in this view -- presumably rc is reset to 0 and returned. */
95 int lov_update_common_set(struct lov_request_set *set,
96 struct lov_request *req, int rc)
98 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
101 lov_update_set(set, req, rc);
103 /* grace error on inactive ost */
104 if (rc && !(lov->lov_tgts[req->rq_idx] &&
105 lov->lov_tgts[req->rq_idx]->ltd_active))
108 /* FIXME in raid1 regime, should return 0 */
/* Queue req at the tail of the set's request list.
 * NOTE(review): the rest of the body is not visible in this view; callers
 * test set->set_count after queuing, so the counter is presumably bumped
 * here -- confirm. */
112 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
114 list_add_tail(&req->rq_link, &set->set_list);
/* Fold the result of one per-stripe enqueue into the parent lock state.
 *
 * - rc == ELDLM_OK: copy the LVB returned for the sub-request into the
 *   stripe's lov_oinfo under the stripe lock, derive a candidate KMS from the
 *   LVB size capped at the lock's extent end + 1, and allow the lock to be
 *   matched.
 * - rc == ELDLM_LOCK_ABORTED on an intent enqueue (glimpse): clear the stripe
 *   lock handle and copy only the LVB.
 * - anything else: clear the stripe lock handle and, when the target is
 *   present and active, log the failure unless rc is -EINTR or -EUSERS.
 *
 * The request is then accounted with lov_update_set().
 * NOTE(review): several statements (including the KMS store and the return)
 * are not visible in this view; `mode` is unused in the visible lines. */
118 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
120 struct lov_request_set *set = req->rq_rqset;
121 struct lustre_handle *lov_lockhp;
122 struct lov_oinfo *loi;
125 LASSERT(set != NULL);
126 LASSERT(set->set_oi != NULL);
/* Per-stripe slots: the lock handle and the stripe info of the parent. */
128 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
129 loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
131 /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
132 * and that copy can be arbitrarily out of date.
134 * The LOV API is due for a serious rewriting anyways, and this
135 * can be addressed then. */
137 if (rc == ELDLM_OK) {
138 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
141 LASSERT(lock != NULL);
142 lov_stripe_lock(set->set_oi->oi_md);
143 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
144 tmp = loi->loi_lvb.lvb_size;
145 /* Extend KMS up to the end of this lock and no further
146 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
147 if (tmp > lock->l_policy_data.l_extent.end)
148 tmp = lock->l_policy_data.l_extent.end + 1;
149 if (tmp >= loi->loi_kms) {
150 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
151 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
153 loi->loi_kms_valid = 1;
155 LDLM_DEBUG(lock, "lock acquired, setting rss="
156 LPU64"; leaving kms="LPU64", end="LPU64,
157 loi->loi_lvb.lvb_size, loi->loi_kms,
158 lock->l_policy_data.l_extent.end);
160 lov_stripe_unlock(set->set_oi->oi_md);
161 ldlm_lock_allow_match(lock);
163 } else if ((rc == ELDLM_LOCK_ABORTED) &&
164 (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
165 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
166 lov_stripe_lock(set->set_oi->oi_md);
167 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
168 lov_stripe_unlock(set->set_oi->oi_md);
169 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
170 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
173 struct obd_export *exp = set->set_exp;
174 struct lov_obd *lov = &exp->exp_obd->u.lov;
176 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
177 if (lov->lov_tgts[req->rq_idx] &&
178 lov->lov_tgts[req->rq_idx]->ltd_active) {
179 /* -EUSERS used by OST to report file contention */
180 if (rc != -EINTR && rc != -EUSERS)
181 CERROR("enqueue objid "LPX64" subobj "
182 LPX64" on OST idx %d: rc %d\n",
183 set->set_oi->oi_md->lsm_object_id,
184 loi->loi_id, loi->loi_ost_idx, rc);
189 lov_update_set(set, req, rc);
193 /* The callback for osc_enqueue that updates lov info for every OSC request. */
/* Recovers the enclosing lov_request from oinfo with container_of() and
 * forwards rc together with the enqueue mode stored in the set's
 * ldlm_enqueue_info (set_ei->ei_mode) to lov_update_enqueue_set(). */
194 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
196 struct ldlm_enqueue_info *einfo;
197 struct lov_request *lovreq;
199 lovreq = container_of(oinfo, struct lov_request, rq_oi);
200 einfo = lovreq->rq_rqset->set_ei;
201 return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
/* Post-process an enqueue/match set.  When every completed request succeeded
 * there is nothing to undo.  Otherwise the requests are walked and, for those
 * that pass the completion/rc and used-handle tests, the stripe lock is
 * cancelled on its target with obd_cancel(); a failed cancel is logged when
 * the target is present and active.  Finally a reference on the set's
 * lock-handle container is dropped.
 * NOTE(review): obd_cancel() dereferences lov->lov_tgts[req->rq_idx] before
 * the NULL test in the logging condition that follows it.  The statements
 * taken when a request is filtered out (presumably `continue`) and the
 * return are not visible in this view. */
204 static int enqueue_done(struct lov_request_set *set, __u32 mode)
206 struct lov_request *req;
207 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
211 /* enqueue/match success, just return */
212 if (set->set_completes && set->set_completes == set->set_success)
215 /* cancel enqueued/matched locks */
216 list_for_each_entry(req, &set->set_list, rq_link) {
217 struct lustre_handle *lov_lockhp;
219 if (!req->rq_complete || req->rq_rc)
222 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
224 if (!lustre_handle_is_used(lov_lockhp))
227 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
228 req->rq_oi.oi_md, mode, lov_lockhp);
229 if (rc && lov->lov_tgts[req->rq_idx] &&
230 lov->lov_tgts[req->rq_idx]->ltd_active)
231 CERROR("cancelling obdjid "LPX64" on OST "
232 "idx %d error: rc = %d\n",
233 req->rq_oi.oi_md->lsm_object_id,
237 lov_llh_put(set->set_lockh);
/* Finish an enqueue request set and drop one reference on it.
 * One branch resets set_completes and runs enqueue_done(); the other merely
 * drops the lock-handle reference when one exists.  Returns rc when it is
 * non-zero, otherwise the value produced by enqueue_done().
 * NOTE(review): the condition selecting between the branches, and the end of
 * the in-body comment that explains it, are not visible in this view;
 * `rqset` is unused in the visible lines. */
241 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
242 struct ptlrpc_request_set *rqset)
249 LASSERT(set->set_exp);
250 /* Do enqueue_done only for sync requests and if any request
254 set->set_completes = 0;
255 ret = enqueue_done(set, mode);
256 } else if (set->set_lockh)
257 lov_llh_put(set->set_lockh);
259 if (atomic_dec_and_test(&set->set_refcount))
262 RETURN(rc ? rc : ret);
/* Build the request set for a striped lock enqueue.
 * A lock-handle container for oinfo->oi_md is created and its cookie is
 * published through oinfo->oi_lockh.  Each stripe is tested against the
 * requested extent and against its target's presence/active state; for the
 * stripes that are kept, a lov_request is allocated with a private
 * one-stripe md, the stripe's lock-handle slot, the enqueue callback, the
 * caller's flags and gid, the start/end extent for the stripe, and the
 * stripe's current KMS/LVB values.
 * Allocation failures yield -ENOMEM; the visible -EIO exit is presumably
 * taken when no request was queued.  The exit path calls
 * lov_fini_enqueue_set() -- presumably only on error.
 * NOTE(review): what happens to filtered-out stripes (presumably `continue`)
 * and the success return are not visible in this view. */
265 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
266 struct ldlm_enqueue_info *einfo,
267 struct lov_request_set **reqset)
269 struct lov_obd *lov = &exp->exp_obd->u.lov;
270 struct lov_request_set *set;
272 struct lov_oinfo *loi;
275 OBD_ALLOC(set, sizeof(*set));
283 set->set_lockh = lov_llh_new(oinfo->oi_md);
284 if (set->set_lockh == NULL)
285 GOTO(out_set, rc = -ENOMEM);
286 oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
288 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
289 struct lov_request *req;
292 loi = oinfo->oi_md->lsm_oinfo[i];
293 if (!lov_stripe_intersects(oinfo->oi_md, i,
294 oinfo->oi_policy.l_extent.start,
295 oinfo->oi_policy.l_extent.end,
299 if (!lov->lov_tgts[loi->loi_ost_idx] ||
300 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
301 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
305 OBD_ALLOC(req, sizeof(*req));
307 GOTO(out_set, rc = -ENOMEM);
/* A single buffer holds the stripe md, one lsm_oinfo[] pointer slot and
 * the lov_oinfo that slot is pointed at just below. */
309 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
310 sizeof(struct lov_oinfo *) +
311 sizeof(struct lov_oinfo);
312 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
313 if (req->rq_oi.oi_md == NULL) {
314 OBD_FREE(req, sizeof(*req));
315 GOTO(out_set, rc = -ENOMEM);
317 req->rq_oi.oi_md->lsm_oinfo[0] =
318 ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
319 sizeof(struct lov_oinfo *);
323 /* Set lov request specific parameters. */
324 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
325 req->rq_oi.oi_cb_up = cb_update_enqueue;
326 req->rq_oi.oi_flags = oinfo->oi_flags;
328 LASSERT(req->rq_oi.oi_lockh);
330 req->rq_oi.oi_policy.l_extent.gid =
331 oinfo->oi_policy.l_extent.gid;
332 req->rq_oi.oi_policy.l_extent.start = start;
333 req->rq_oi.oi_policy.l_extent.end = end;
335 req->rq_idx = loi->loi_ost_idx;
338 /* XXX LOV STACKING: submd should be from the subobj */
339 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
340 req->rq_oi.oi_md->lsm_stripe_count = 0;
341 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
343 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
344 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
346 lov_set_add_req(req, set);
349 GOTO(out_set, rc = -EIO);
353 lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
/* Completion handler for one per-stripe lock match: accounts the request via
 * lov_update_set() using a locally derived `ret`.
 * NOTE(review): the remaining parameters and most of the body are not
 * visible in this view, so how `ret` is computed cannot be stated here. */
357 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
367 lov_update_set(set, req, ret);
/* Finish a match request set: run enqueue_done() for `mode`, drop an extra
 * lock-handle reference when set_count equals set_success and the caller
 * passed LDLM_FL_TEST_LOCK, then drop one reference on the set.
 * NOTE(review): the action taken when the last reference goes away
 * (presumably lov_finish_set()) and the return are not visible here. */
371 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
378 LASSERT(set->set_exp);
379 rc = enqueue_done(set, mode);
380 if ((set->set_count == set->set_success) &&
381 (flags & LDLM_FL_TEST_LOCK))
382 lov_llh_put(set->set_lockh);
384 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for matching an existing striped lock.
 * A new lock-handle container for lsm is created and its cookie is stored in
 * lockh.  For each stripe that passes the extent intersection test a
 * lov_request is allocated carrying the stripe's start/end extent, the
 * caller's gid, the target index and a minimal md naming the stripe object.
 * A missing or inactive target aborts with -EIO; allocation failures give
 * -ENOMEM.  The exit path calls lov_fini_match_set() -- presumably only on
 * error.
 * NOTE(review): handling of non-intersecting stripes (presumably `continue`),
 * the condition on the final -EIO and the success return are not visible in
 * this view. */
390 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
391 struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
392 __u32 mode, struct lustre_handle *lockh,
393 struct lov_request_set **reqset)
395 struct lov_obd *lov = &exp->exp_obd->u.lov;
396 struct lov_request_set *set;
398 struct lov_oinfo *loi;
401 OBD_ALLOC(set, sizeof(*set));
408 set->set_oi->oi_md = lsm;
409 set->set_lockh = lov_llh_new(lsm);
410 if (set->set_lockh == NULL)
411 GOTO(out_set, rc = -ENOMEM);
412 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
414 for (i = 0; i < lsm->lsm_stripe_count; i++){
415 struct lov_request *req;
418 loi = lsm->lsm_oinfo[i];
419 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
420 policy->l_extent.end, &start, &end))
423 /* FIXME raid1 should grace this error */
424 if (!lov->lov_tgts[loi->loi_ost_idx] ||
425 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
426 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
427 GOTO(out_set, rc = -EIO);
430 OBD_ALLOC(req, sizeof(*req));
432 GOTO(out_set, rc = -ENOMEM);
434 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
435 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
436 if (req->rq_oi.oi_md == NULL) {
437 OBD_FREE(req, sizeof(*req));
438 GOTO(out_set, rc = -ENOMEM);
441 req->rq_oi.oi_policy.l_extent.start = start;
442 req->rq_oi.oi_policy.l_extent.end = end;
443 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
445 req->rq_idx = loi->loi_ost_idx;
448 /* XXX LOV STACKING: submd should be from the subobj */
449 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
450 req->rq_oi.oi_md->lsm_stripe_count = 0;
452 lov_set_add_req(req, set);
455 GOTO(out_set, rc = -EIO);
459 lov_fini_match_set(set, mode, 0);
/* Finish a cancel request set: drop the lock-handle reference held through
 * set->set_lockh and then one reference on the set itself.
 * NOTE(review): any guards, the action on the last reference (presumably
 * lov_finish_set()) and the return value are not visible in this view. */
463 int lov_fini_cancel_set(struct lov_request_set *set)
471 LASSERT(set->set_exp);
473 lov_llh_put(set->set_lockh);
475 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for cancelling a striped lock.
 * lockh is resolved to its lock-handle container with lov_handle2llh();
 * failure is logged and reported as -EINVAL.  For each stripe the matching
 * handle slot is examined: an unused slot is only traced (the stripe is
 * presumably skipped), otherwise a lov_request with a minimal md naming the
 * stripe object is queued.  Allocation failures give -ENOMEM.  The exit path
 * calls lov_fini_cancel_set() -- presumably only on error.
 * NOTE(review): the condition on the final -EIO and the success return are
 * not visible in this view; `mode` is unused in the visible lines. */
481 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
482 struct lov_stripe_md *lsm, __u32 mode,
483 struct lustre_handle *lockh,
484 struct lov_request_set **reqset)
486 struct lov_request_set *set;
488 struct lov_oinfo *loi;
491 OBD_ALLOC(set, sizeof(*set));
498 set->set_oi->oi_md = lsm;
499 set->set_lockh = lov_handle2llh(lockh);
500 if (set->set_lockh == NULL) {
501 CERROR("LOV: invalid lov lock handle %p\n", lockh);
502 GOTO(out_set, rc = -EINVAL);
504 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
506 for (i = 0; i < lsm->lsm_stripe_count; i++){
507 struct lov_request *req;
508 struct lustre_handle *lov_lockhp;
510 loi = lsm->lsm_oinfo[i];
511 lov_lockhp = set->set_lockh->llh_handles + i;
512 if (!lustre_handle_is_used(lov_lockhp)) {
513 CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n",
514 loi->loi_ost_idx, loi->loi_id);
518 OBD_ALLOC(req, sizeof(*req));
520 GOTO(out_set, rc = -ENOMEM);
522 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
523 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
524 if (req->rq_oi.oi_md == NULL) {
525 OBD_FREE(req, sizeof(*req));
526 GOTO(out_set, rc = -ENOMEM);
529 req->rq_idx = loi->loi_ost_idx;
532 /* XXX LOV STACKING: submd should be from the subobj */
533 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
534 req->rq_oi.oi_md->lsm_stripe_count = 0;
536 lov_set_add_req(req, set);
539 GOTO(out_set, rc = -EIO);
543 lov_fini_cancel_set(set);
/* Complete an object-create request set.
 * When set_count differs from set_success, requests are re-armed
 * (set_completes decremented, rq_complete cleared) and retried through
 * qos_remedy_create(), with the outcome fed to lov_update_create_set().
 * A partial result shrinks set_count to the number of successes.  The
 * attributes of the requests that completed without error are merged into
 * ret_oa, which then replaces *src_oa while keeping src_oa's original o_id,
 * and the md is handed back through lsmp.
 * In the visible cleanup code each successfully created object is destroyed
 * again with obd_destroy() and the md is freed.  At the end the llog cookie
 * pointer in oti is restored from set_cookies and OBD_MD_FLCOOKIE in src_oa
 * is set or cleared depending on set_cookie_sent.
 * NOTE(review): labels, several conditions and the return are not visible in
 * this view, so the exact control flow between these steps must be
 * confirmed.  The destroy loop dereferences lov->lov_tgts[req->rq_idx]
 * without a NULL test. */
547 static int create_done(struct obd_export *exp, struct lov_request_set *set,
548 struct lov_stripe_md **lsmp)
550 struct lov_obd *lov = &exp->exp_obd->u.lov;
551 struct obd_trans_info *oti = set->set_oti;
552 struct obdo *src_oa = set->set_oi->oi_oa;
553 struct lov_request *req;
554 struct obdo *ret_oa = NULL;
555 int attrset = 0, rc = 0;
558 LASSERT(set->set_completes);
560 /* try alloc objects on other osts if osc_create fails for
561 * exceptions: RPC failure, ENOSPC, etc */
562 if (set->set_count != set->set_success) {
563 list_for_each_entry (req, &set->set_list, rq_link) {
567 set->set_completes--;
568 req->rq_complete = 0;
570 rc = qos_remedy_create(set, req);
571 lov_update_create_set(set, req, rc);
578 /* no successful creates */
579 if (set->set_success == 0)
582 /* If there was an explicit stripe set, fail. Otherwise, we
583 * got some objects and that's not bad. */
584 if (set->set_count != set->set_success) {
587 set->set_count = set->set_success;
593 GOTO(cleanup, rc = -ENOMEM);
595 list_for_each_entry(req, &set->set_list, rq_link) {
596 if (!req->rq_complete || req->rq_rc)
598 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
599 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
600 req->rq_stripe, &attrset);
602 if (src_oa->o_valid & OBD_MD_FLSIZE &&
603 ret_oa->o_size != src_oa->o_size) {
604 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
605 src_oa->o_size, ret_oa->o_size);
608 ret_oa->o_id = src_oa->o_id;
609 memcpy(src_oa, ret_oa, sizeof(*src_oa));
612 *lsmp = set->set_oi->oi_md;
616 list_for_each_entry(req, &set->set_list, rq_link) {
617 struct obd_export *sub_exp;
620 if (!req->rq_complete || req->rq_rc)
623 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
624 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
626 CERROR("Failed to uncreate objid "LPX64" subobj "
627 LPX64" on OST idx %d: rc = %d\n",
628 src_oa->o_id, req->rq_oi.oi_oa->o_id,
632 obd_free_memmd(exp, &set->set_oi->oi_md);
634 if (oti && set->set_cookies) {
635 oti->oti_logcookies = set->set_cookies;
636 if (!set->set_cookie_sent) {
637 oti_free_cookies(oti);
638 src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
640 src_oa->o_valid |= OBD_MD_FLCOOKIE;
/* Finish a create request set: when at least one request completed, run
 * create_done() (which may hand the md back through lsmp), then drop one
 * reference on the set.
 * NOTE(review): the action on the last reference (presumably
 * lov_finish_set()) and the return are not visible in this view. */
646 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
653 LASSERT(set->set_exp);
654 if (set->set_completes)
655 rc = create_done(set->set_exp, set, lsmp);
657 if (atomic_dec_and_test(&set->set_refcount))
/* Completion handler for one object create.
 * The request is assigned the stripe slot numbered by the current success
 * count.  A failure on a present, active target is logged.  After the
 * request is accounted with lov_update_set(), the new object id is recorded
 * in oti->oti_objid[] (when supplied) and, together with the target index,
 * in the stripe's lov_oinfo; the llog cookie cursor and the sent-cookie
 * counter are advanced when applicable.
 * NOTE(review): the conditions separating the error and success paths, the
 * condition on the "invalid err" message and the return are not visible in
 * this view. */
663 int lov_update_create_set(struct lov_request_set *set,
664 struct lov_request *req, int rc)
666 struct obd_trans_info *oti = set->set_oti;
667 struct lov_stripe_md *lsm = set->set_oi->oi_md;
668 struct lov_oinfo *loi;
669 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
672 req->rq_stripe = set->set_success;
673 loi = lsm->lsm_oinfo[req->rq_stripe];
675 if (rc && lov->lov_tgts[req->rq_idx] &&
676 lov->lov_tgts[req->rq_idx]->ltd_active) {
677 CERROR("error creating fid "LPX64" sub-object"
678 " on OST idx %d/%d: rc = %d\n",
679 set->set_oi->oi_oa->o_id, req->rq_idx,
680 lsm->lsm_stripe_count, rc);
682 CERROR("obd_create returned invalid err %d\n", rc);
686 lov_update_set(set, req, rc);
690 if (oti && oti->oti_objid)
691 oti->oti_objid[req->rq_idx] = req->rq_oi.oi_oa->o_id;
693 loi->loi_id = req->rq_oi.oi_oa->o_id;
694 loi->loi_ost_idx = req->rq_idx;
/* The sub-object id is printed twice on purpose: once with LPX64 and once
 * with LPU64. */
695 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
696 lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
699 if (oti && set->set_cookies)
700 ++oti->oti_logcookies;
701 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
702 set->set_cookie_sent++;
/* Build the request set for creating a striped object.
 * The set is allocated, pointed at the caller's md (*lsmp) and source obdo,
 * and the per-OST create requests are then prepared by qos_prep_create().
 * lov_fini_create_set() is called on the exit path -- presumably only when
 * qos_prep_create() failed.
 * NOTE(review): the remaining set initialisation, the use of oti and reqset,
 * and the return are not visible in this view. */
707 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
708 struct lov_stripe_md **lsmp, struct obdo *src_oa,
709 struct obd_trans_info *oti,
710 struct lov_request_set **reqset)
712 struct lov_request_set *set;
716 OBD_ALLOC(set, sizeof(*set));
723 set->set_oi->oi_md = *lsmp;
724 set->set_oi->oi_oa = src_oa;
727 rc = qos_prep_create(exp, set);
729 lov_fini_create_set(set, lsmp);
/* Merge the attributes returned by the successful sub-requests of a set into
 * the caller's obdo (set->set_oi->oi_oa).
 * Requests that did not complete, failed, or returned an obdo with no valid
 * bits are filtered out; the rest are folded into a scratch obdo with
 * lov_merge_attrs().  The scratch obdo then replaces the caller's obdo while
 * keeping the caller's o_id.  An error is logged when no stripe contributed
 * valid attributes.
 * NOTE(review): the early-exit actions for a missing obdo / zero successes,
 * the scratch allocation and free, and the return are not visible here. */
735 static int common_attr_done(struct lov_request_set *set)
737 struct list_head *pos;
738 struct lov_request *req;
740 int rc = 0, attrset = 0;
743 LASSERT(set->set_oi != NULL);
745 if (set->set_oi->oi_oa == NULL)
748 if (!set->set_success)
753 GOTO(out, rc = -ENOMEM);
755 list_for_each (pos, &set->set_list) {
756 req = list_entry(pos, struct lov_request, rq_link);
758 if (!req->rq_complete || req->rq_rc)
760 if (req->rq_oi.oi_oa->o_valid == 0) /* inactive stripe */
762 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
763 req->rq_oi.oi_oa->o_valid,
764 set->set_oi->oi_md, req->rq_stripe, &attrset);
767 CERROR("No stripes had valid attrs\n");
770 tmp_oa->o_id = set->set_oi->oi_oa->o_id;
771 memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
/* After a bulk read/write, refresh the cached block count of each stripe
 * from the obdo returned by its request, when that obdo carries
 * OBD_MD_FLBLOCKS.
 * NOTE(review): the action for requests failing the completion/rc test
 * (presumably `continue`) and the return are not visible in this view. */
779 static int brw_done(struct lov_request_set *set)
781 struct lov_stripe_md *lsm = set->set_oi->oi_md;
782 struct lov_oinfo *loi = NULL;
783 struct list_head *pos;
784 struct lov_request *req;
787 list_for_each (pos, &set->set_list) {
788 req = list_entry(pos, struct lov_request, rq_link);
790 if (!req->rq_complete || req->rq_rc)
793 loi = lsm->lsm_oinfo[req->rq_stripe];
795 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
796 loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
/* Finish a bulk read/write request set and drop one reference on it.
 * NOTE(review): the work done when set_completes is non-zero (presumably a
 * call to brw_done()), the action on the last reference (presumably
 * lov_finish_set()) and the return are not visible in this view. */
802 int lov_fini_brw_set(struct lov_request_set *set)
809 LASSERT(set->set_exp);
810 if (set->set_completes) {
812 /* FIXME update qos data here */
814 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for a striped bulk read/write.
 *
 * 1. Allocate the set, a private page array of oa_bufs entries (set_pga) and
 *    a scratch table `info` with one entry per stripe.
 * 2. Count how many of the caller's pages fall on each stripe.
 * 3. For every stripe with at least one page: fail with -EIO if its target is
 *    missing or inactive; otherwise queue a lov_request holding a copy of the
 *    caller's obdo re-addressed to the stripe object, a minimal md, the page
 *    count and the starting index of its pages inside set_pga.
 * 4. Scatter the caller's pages into set_pga so that each stripe's pages are
 *    contiguous, rewriting each page offset to a stripe-local offset with
 *    lov_stripe_offset().
 *
 * The scratch table is freed before returning; allocation failures give
 * -ENOMEM, and lov_fini_brw_set() is called on the exit path -- presumably
 * only on error.
 * NOTE(review): the initial value of `shift`, the update of info[].off in
 * the scatter loop, the condition on the final -EIO and the return are not
 * visible in this view. */
820 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
821 obd_count oa_bufs, struct brw_page *pga,
822 struct obd_trans_info *oti,
823 struct lov_request_set **reqset)
830 struct lov_request_set *set;
831 struct lov_oinfo *loi = NULL;
832 struct lov_obd *lov = &exp->exp_obd->u.lov;
833 int rc = 0, i, shift;
836 OBD_ALLOC(set, sizeof(*set));
844 set->set_oabufs = oa_bufs;
845 OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
847 GOTO(out, rc = -ENOMEM);
849 OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
851 GOTO(out, rc = -ENOMEM);
853 /* calculate the page count for each stripe */
854 for (i = 0; i < oa_bufs; i++) {
855 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
856 info[stripe].count++;
859 /* alloc and initialize lov request */
861 for (i = 0 ; i < oinfo->oi_md->lsm_stripe_count; i++){
862 struct lov_request *req;
864 if (info[i].count == 0)
867 loi = oinfo->oi_md->lsm_oinfo[i];
868 if (!lov->lov_tgts[loi->loi_ost_idx] ||
869 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
870 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
871 GOTO(out, rc = -EIO);
874 OBD_ALLOC(req, sizeof(*req));
876 GOTO(out, rc = -ENOMEM);
878 OBDO_ALLOC(req->rq_oi.oi_oa);
879 if (req->rq_oi.oi_oa == NULL) {
880 OBD_FREE(req, sizeof(*req));
881 GOTO(out, rc = -ENOMEM);
885 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
886 sizeof(*req->rq_oi.oi_oa));
888 req->rq_oi.oi_oa->o_id = loi->loi_id;
889 req->rq_oi.oi_oa->o_stripe_idx = i;
891 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
892 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
893 if (req->rq_oi.oi_md == NULL) {
894 OBDO_FREE(req->rq_oi.oi_oa);
895 OBD_FREE(req, sizeof(*req));
896 GOTO(out, rc = -ENOMEM);
899 req->rq_idx = loi->loi_ost_idx;
902 /* XXX LOV STACKING */
903 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
904 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
/* `shift` is the running start index in set_pga for the next stripe. */
905 req->rq_oabufs = info[i].count;
906 req->rq_pgaidx = shift;
907 shift += req->rq_oabufs;
909 /* remember the index for sort brw_page array */
910 info[i].index = req->rq_pgaidx;
912 lov_set_add_req(req, set);
915 GOTO(out, rc = -EIO);
917 /* rotate & sort the brw_page array */
918 for (i = 0; i < oa_bufs; i++) {
919 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
921 shift = info[stripe].index + info[stripe].off;
922 LASSERT(shift < oa_bufs);
923 set->set_pga[shift] = pga[i];
924 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
925 &set->set_pga[shift].off);
930 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
935 lov_fini_brw_set(set);
/* Finish a getattr request set: when at least one request completed, merge
 * the per-stripe attributes with common_attr_done(), then drop one reference
 * on the set.
 * NOTE(review): the action on the last reference (presumably
 * lov_finish_set()) and the return are not visible in this view. */
940 int lov_fini_getattr_set(struct lov_request_set *set)
947 LASSERT(set->set_exp);
948 if (set->set_completes)
949 rc = common_attr_done(set);
951 if (atomic_dec_and_test(&set->set_refcount))
957 /* The callback for osc_getattr_async that finalizes a request info when a
958 * response is received. */
/* Recovers the enclosing lov_request from oinfo with container_of() and
 * hands it, with rc, to lov_update_common_set(). */
959 static int cb_getattr_update(struct obd_info *oinfo, int rc)
961 struct lov_request *lovreq;
962 lovreq = container_of(oinfo, struct lov_request, rq_oi);
963 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
/* Build the request set for a striped getattr.
 * For each stripe that passes the target presence/active test a lov_request
 * is queued holding a private copy of oinfo->oi_oa re-addressed to the
 * stripe object (o_id = loi_id) and wired to cb_getattr_update().
 * Returns -ENOMEM on allocation failure and -EIO when no request was queued.
 * lov_fini_getattr_set() is called on the exit path -- presumably only on
 * error.
 * NOTE(review): what happens to stripes on inactive targets (presumably
 * `continue`) and the success return are not visible in this view. */
966 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
967 struct lov_request_set **reqset)
969 struct lov_request_set *set;
970 struct lov_oinfo *loi = NULL;
971 struct lov_obd *lov = &exp->exp_obd->u.lov;
975 OBD_ALLOC(set, sizeof(*set));
983 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
984 struct lov_request *req;
986 loi = oinfo->oi_md->lsm_oinfo[i];
987 if (!lov->lov_tgts[loi->loi_ost_idx] ||
988 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
989 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
993 OBD_ALLOC(req, sizeof(*req));
995 GOTO(out_set, rc = -ENOMEM);
998 req->rq_idx = loi->loi_ost_idx;
1000 OBDO_ALLOC(req->rq_oi.oi_oa);
1001 if (req->rq_oi.oi_oa == NULL) {
1002 OBD_FREE(req, sizeof(*req));
1003 GOTO(out_set, rc = -ENOMEM);
1005 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1006 sizeof(*req->rq_oi.oi_oa));
1007 req->rq_oi.oi_oa->o_id = loi->loi_id;
1008 req->rq_oi.oi_cb_up = cb_getattr_update;
1010 lov_set_add_req(req, set);
1012 if (!set->set_count)
1013 GOTO(out_set, rc = -EIO);
1017 lov_fini_getattr_set(set);
/* Finish a destroy request set: drop one reference and release the set via
 * lov_finish_set() when that was the last one.  No per-completion work is
 * done yet (see the FIXME).
 * NOTE(review): the return statement is not visible in this view. */
1021 int lov_fini_destroy_set(struct lov_request_set *set)
1027 LASSERT(set->set_exp);
1028 if (set->set_completes) {
1029 /* FIXME update qos data here */
1032 if (atomic_dec_and_test(&set->set_refcount))
1033 lov_finish_set(set);
/* Build the request set for destroying a striped object.
 * The caller's obd_info is attached to the set and pointed at lsm and
 * src_oa; when oti is given and src_oa carries OBD_MD_FLCOOKIE, the current
 * llog cookie pointer is remembered in set_cookies.  For each stripe that
 * passes the target presence/active test a lov_request is queued holding a
 * copy of src_oa re-addressed to the stripe object.
 * Returns -ENOMEM on allocation failure and -EIO when no request was queued.
 * lov_fini_destroy_set() is called on the exit path -- presumably only on
 * error.
 * NOTE(review): what happens to stripes on inactive targets (presumably
 * `continue`) and the success return are not visible in this view. */
1038 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1039 struct obdo *src_oa, struct lov_stripe_md *lsm,
1040 struct obd_trans_info *oti,
1041 struct lov_request_set **reqset)
1043 struct lov_request_set *set;
1044 struct lov_oinfo *loi = NULL;
1045 struct lov_obd *lov = &exp->exp_obd->u.lov;
1049 OBD_ALLOC(set, sizeof(*set));
1055 set->set_oi = oinfo;
1056 set->set_oi->oi_md = lsm;
1057 set->set_oi->oi_oa = src_oa;
1059 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1060 set->set_cookies = oti->oti_logcookies;
1062 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1063 struct lov_request *req;
1065 loi = lsm->lsm_oinfo[i];
1066 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1067 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1068 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1072 OBD_ALLOC(req, sizeof(*req));
1074 GOTO(out_set, rc = -ENOMEM);
1077 req->rq_idx = loi->loi_ost_idx;
1079 OBDO_ALLOC(req->rq_oi.oi_oa);
1080 if (req->rq_oi.oi_oa == NULL) {
1081 OBD_FREE(req, sizeof(*req));
1082 GOTO(out_set, rc = -ENOMEM);
1084 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1085 req->rq_oi.oi_oa->o_id = loi->loi_id;
1086 lov_set_add_req(req, set);
1088 if (!set->set_count)
1089 GOTO(out_set, rc = -EIO);
1093 lov_fini_destroy_set(set);
/* Finish a setattr request set: when at least one request completed, merge
 * the returned attributes with common_attr_done(); then drop one reference
 * and release the set via lov_finish_set() when that was the last one.
 * NOTE(review): the return statement is not visible -- presumably rc. */
1097 int lov_fini_setattr_set(struct lov_request_set *set)
1104 LASSERT(set->set_exp);
1105 if (set->set_completes) {
1106 rc = common_attr_done(set);
1107 /* FIXME update qos data here */
1110 if (atomic_dec_and_test(&set->set_refcount))
1111 lov_finish_set(set);
/* Completion handler for one per-stripe setattr.
 * The request is accounted with lov_update_set(); a failure is tolerated
 * when the target slot is empty or the target is inactive.  The ctime, mtime
 * and atime carried by the request's obdo are then copied into the stripe's
 * cached LVB, each only when its OBD_MD_FL*TIME bit is valid.
 * Note that lov and lsm are reached through req->rq_rqset rather than `set`.
 * NOTE(review): the statement guarded by the inactive-target test (presumably
 * rc = 0), the condition guarding the timestamp copies and the return are
 * not visible in this view. */
1115 int lov_update_setattr_set(struct lov_request_set *set,
1116 struct lov_request *req, int rc)
1118 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1119 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1122 lov_update_set(set, req, rc);
1124 /* grace error on inactive ost */
1125 if (rc && !(lov->lov_tgts[req->rq_idx] &&
1126 lov->lov_tgts[req->rq_idx]->ltd_active))
1130 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1131 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1132 req->rq_oi.oi_oa->o_ctime;
1133 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1134 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1135 req->rq_oi.oi_oa->o_mtime;
1136 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1137 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1138 req->rq_oi.oi_oa->o_atime;
1144 /* The callback for osc_setattr_async that finalizes a request info when a
1145 * response is received. */
/* Recovers the enclosing lov_request from oinfo with container_of() and
 * hands it, with rc, to lov_update_setattr_set(). */
1146 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1148 struct lov_request *lovreq;
1149 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1150 return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
/* Build the request set for a striped setattr.
 * The caller's obd_info is attached to the set; when oti is given and the
 * obdo carries OBD_MD_FLCOOKIE, the current llog cookie pointer is
 * remembered in set_cookies.  For each stripe that passes the target
 * presence/active test a lov_request is queued holding a copy of the
 * caller's obdo re-addressed to the stripe object, tagged with the stripe
 * index, wired to cb_setattr_update() and linked back to the set.
 * When OBD_MD_FLSIZE is valid, the file size is translated into a size for
 * this stripe with lov_stripe_offset(), and decremented by one when that
 * call returns a negative value and the translated size is non-zero.
 * Returns -ENOMEM on allocation failure and -EIO when no request was queued.
 * lov_fini_setattr_set() is called on the exit path -- presumably only on
 * error.
 * NOTE(review): what happens to stripes on inactive targets (presumably
 * `continue`) and the success return are not visible in this view. */
1153 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1154 struct obd_trans_info *oti,
1155 struct lov_request_set **reqset)
1157 struct lov_request_set *set;
1158 struct lov_oinfo *loi = NULL;
1159 struct lov_obd *lov = &exp->exp_obd->u.lov;
1163 OBD_ALLOC(set, sizeof(*set));
1170 set->set_oi = oinfo;
1171 if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1172 set->set_cookies = oti->oti_logcookies;
1174 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1175 struct lov_request *req;
1177 loi = oinfo->oi_md->lsm_oinfo[i];
1178 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1179 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1180 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1184 OBD_ALLOC(req, sizeof(*req));
1186 GOTO(out_set, rc = -ENOMEM);
1188 req->rq_idx = loi->loi_ost_idx;
1190 OBDO_ALLOC(req->rq_oi.oi_oa);
1191 if (req->rq_oi.oi_oa == NULL) {
1192 OBD_FREE(req, sizeof(*req));
1193 GOTO(out_set, rc = -ENOMEM);
1195 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1196 sizeof(*req->rq_oi.oi_oa));
1197 req->rq_oi.oi_oa->o_id = loi->loi_id;
1198 req->rq_oi.oi_oa->o_stripe_idx = i;
1199 req->rq_oi.oi_cb_up = cb_setattr_update;
1200 req->rq_rqset = set;
1202 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1203 int off = lov_stripe_offset(oinfo->oi_md,
1204 oinfo->oi_oa->o_size, i,
1205 &req->rq_oi.oi_oa->o_size);
1207 if (off < 0 && req->rq_oi.oi_oa->o_size)
1208 req->rq_oi.oi_oa->o_size--;
1210 CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1211 i, req->rq_oi.oi_oa->o_size,
1212 oinfo->oi_oa->o_size);
1214 lov_set_add_req(req, set);
1216 if (!set->set_count)
1217 GOTO(out_set, rc = -EIO);
1221 lov_fini_setattr_set(set);
/* Finish a punch (truncate) request set: when requests completed and at
 * least one succeeded, merge the returned attributes with
 * common_attr_done(); then drop one reference and release the set via
 * lov_finish_set() when that was the last one.
 * NOTE(review): the statement between the two tests and the return are not
 * visible in this view. */
1225 int lov_fini_punch_set(struct lov_request_set *set)
1232 LASSERT(set->set_exp);
1233 if (set->set_completes) {
1235 /* FIXME update qos data here */
1236 if (set->set_success)
1237 rc = common_attr_done(set);
1240 if (atomic_dec_and_test(&set->set_refcount))
1241 lov_finish_set(set);
/* Completion handler for one per-stripe punch (truncate).
 * The request is accounted with lov_update_set(); a failure is tolerated
 * when the target slot is empty or the target is inactive.  Under the stripe
 * lock, the block count returned in the request's obdo is then copied into
 * the stripe's cached LVB when OBD_MD_FLBLOCKS is valid.
 * The inactive-target test checks the lov_tgts[] slot for NULL before
 * dereferencing it, matching lov_update_common_set() and
 * lov_update_setattr_set(); the slot was non-NULL when the request was
 * prepared but is re-read here at completion time.
 * NOTE(review): the statement guarded by that test (presumably rc = 0), the
 * condition guarding the LVB update and the return are not visible in this
 * view. */
1246 int lov_update_punch_set(struct lov_request_set *set,
1247 struct lov_request *req, int rc)
1249 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1250 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1253 lov_update_set(set, req, rc);
1255 /* grace error on inactive ost */
1256 if (rc && !(lov->lov_tgts[req->rq_idx] && lov->lov_tgts[req->rq_idx]->ltd_active))
1260 lov_stripe_lock(lsm);
1261 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1262 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1263 req->rq_oi.oi_oa->o_blocks;
1266 /* Do we need to update lvb_size here? It needn't because
1267 * it have been done in ll_truncate(). -jay */
1268 lov_stripe_unlock(lsm);
1274 /* The callback for osc_punch that finalizes a request info when a response
1275 * is received. */
/* Recovers the enclosing lov_request from oinfo with container_of() and
 * hands it, with rc, to lov_update_punch_set(). */
1276 static int cb_update_punch(struct obd_info *oinfo, int rc)
1278 struct lov_request *lovreq;
1279 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1280 return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
/* Build the request set for a striped punch (truncate).
 * Each stripe is tested for target presence/active state and for
 * intersection with the extent in oinfo->oi_policy; for the stripes that are
 * kept a lov_request is queued holding a copy of the caller's obdo
 * re-addressed to the stripe object, tagged with the stripe index, wired to
 * cb_update_punch(), linked back to the set and given the extent rs..re with
 * gid -1.
 * Returns -ENOMEM on allocation failure and -EIO when no request was queued.
 * lov_fini_punch_set() is called on the exit path -- presumably only on
 * error.
 * NOTE(review): the declarations of rs/re, the tail of the
 * lov_stripe_intersects() call, the handling of filtered-out stripes
 * (presumably `continue`) and the success return are not visible here;
 * `oti` is unused in the visible lines. */
1283 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1284 struct obd_trans_info *oti,
1285 struct lov_request_set **reqset)
1287 struct lov_request_set *set;
1288 struct lov_oinfo *loi = NULL;
1289 struct lov_obd *lov = &exp->exp_obd->u.lov;
1293 OBD_ALLOC(set, sizeof(*set));
1298 set->set_oi = oinfo;
1301 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1302 struct lov_request *req;
1305 loi = oinfo->oi_md->lsm_oinfo[i];
1306 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1307 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1308 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1312 if (!lov_stripe_intersects(oinfo->oi_md, i,
1313 oinfo->oi_policy.l_extent.start,
1314 oinfo->oi_policy.l_extent.end,
1318 OBD_ALLOC(req, sizeof(*req));
1320 GOTO(out_set, rc = -ENOMEM);
1322 req->rq_idx = loi->loi_ost_idx;
1324 OBDO_ALLOC(req->rq_oi.oi_oa);
1325 if (req->rq_oi.oi_oa == NULL) {
1326 OBD_FREE(req, sizeof(*req));
1327 GOTO(out_set, rc = -ENOMEM);
1329 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1330 sizeof(*req->rq_oi.oi_oa));
1331 req->rq_oi.oi_oa->o_id = loi->loi_id;
1332 req->rq_oi.oi_oa->o_stripe_idx = i;
1333 req->rq_oi.oi_cb_up = cb_update_punch;
1334 req->rq_rqset = set;
1336 req->rq_oi.oi_policy.l_extent.start = rs;
1337 req->rq_oi.oi_policy.l_extent.end = re;
1338 req->rq_oi.oi_policy.l_extent.gid = -1;
1340 lov_set_add_req(req, set);
1342 if (!set->set_count)
1343 GOTO(out_set, rc = -EIO);
1347 lov_fini_punch_set(set);
/* Finish a sync request set: inspect the outcome when requests completed,
 * then drop one reference and release the set via lov_finish_set() when that
 * was the last one.
 * NOTE(review): the statement taken when no request succeeded (presumably an
 * error code is assigned to rc) and the return are not visible here. */
1351 int lov_fini_sync_set(struct lov_request_set *set)
1358 LASSERT(set->set_exp);
1359 if (set->set_completes) {
1360 if (!set->set_success)
1362 /* FIXME update qos data here */
1365 if (atomic_dec_and_test(&set->set_refcount))
1366 lov_finish_set(set);
/* Build the request set for syncing a byte range of a striped object.
 * The caller's obd_info is attached to the set and pointed at lsm and
 * src_oa.  Each stripe is tested for target presence/active state and for
 * intersection with [start, end]; for the stripes that are kept a
 * lov_request is queued holding a copy of src_oa re-addressed to the stripe
 * object, tagged with the stripe index and given the stripe extent rs..re
 * with gid -1.
 * Returns -ENOMEM on allocation failure and -EIO when no request was queued.
 * lov_fini_sync_set() is called on the exit path -- presumably only on
 * error.
 * NOTE(review): the handling of filtered-out stripes (presumably `continue`)
 * and the success return are not visible in this view. */
1371 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1372 struct obdo *src_oa, struct lov_stripe_md *lsm,
1373 obd_off start, obd_off end,
1374 struct lov_request_set **reqset)
1376 struct lov_request_set *set;
1377 struct lov_oinfo *loi = NULL;
1378 struct lov_obd *lov = &exp->exp_obd->u.lov;
1382 OBD_ALLOC(set, sizeof(*set));
1388 set->set_oi = oinfo;
1389 set->set_oi->oi_md = lsm;
1390 set->set_oi->oi_oa = src_oa;
1392 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1393 struct lov_request *req;
1396 loi = lsm->lsm_oinfo[i];
1397 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1398 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1399 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1403 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1406 OBD_ALLOC(req, sizeof(*req));
1408 GOTO(out_set, rc = -ENOMEM);
1410 req->rq_idx = loi->loi_ost_idx;
1412 OBDO_ALLOC(req->rq_oi.oi_oa);
1413 if (req->rq_oi.oi_oa == NULL) {
1414 OBD_FREE(req, sizeof(*req));
1415 GOTO(out_set, rc = -ENOMEM);
1417 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1418 req->rq_oi.oi_oa->o_id = loi->loi_id;
1419 req->rq_oi.oi_oa->o_stripe_idx = i;
1421 req->rq_oi.oi_policy.l_extent.start = rs;
1422 req->rq_oi.oi_policy.l_extent.end = re;
1423 req->rq_oi.oi_policy.l_extent.gid = -1;
1425 lov_set_add_req(req, set);
1427 if (!set->set_count)
1428 GOTO(out_set, rc = -EIO);
1432 lov_fini_sync_set(set);
/* LOV_U64_MAX is the all-ones 64-bit value.
 * LOV_SUM_MAX(tot, add) detects unsigned wrap-around of tot + add and, in
 * that case, pins tot at LOV_U64_MAX.
 * NOTE(review): the rest of the macro is not visible in this view --
 * presumably the non-overflow branch performs tot += add. */
1436 #define LOV_U64_MAX ((__u64)~0ULL)
1437 #define LOV_SUM_MAX(tot, add) \
1439 if ((tot) + (add) < (tot)) \
1440 (tot) = LOV_U64_MAX; \
/* Post-process an aggregated statfs result.
 * The file counters os_files and os_ffree are divided by the value returned
 * by lov_get_stripecnt(&obd->u.lov, 0), unless they hold the LOV_U64_MAX
 * sentinel.  The result is then cached in obd->obd_osfs under obd_osfs_lock
 * and its age is stamped with the current jiffies.
 * NOTE(review): the condition involving `success` that guards this work and
 * the return values are not visible in this view. */
1445 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1450 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1452 if (osfs->os_files != LOV_U64_MAX)
1453 do_div(osfs->os_files, expected_stripes);
1454 if (osfs->os_ffree != LOV_U64_MAX)
1455 do_div(osfs->os_ffree, expected_stripes);
1457 spin_lock(&obd->obd_osfs_lock);
1458 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1459 obd->obd_osfs_age = get_jiffies_64();
1460 spin_unlock(&obd->obd_osfs_lock);
/* Finish a statfs request set: when requests completed, post-process the
 * aggregated result with lov_fini_statfs(); then drop one reference and
 * release the set via lov_finish_set() when that was the last one.
 * NOTE(review): the final argument of the lov_fini_statfs() call and the
 * return are not visible in this view. */
1467 int lov_fini_statfs_set(struct lov_request_set *set)
1475 if (set->set_completes) {
1476 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1480 if (atomic_dec_and_test(&set->set_refcount))
1481 lov_finish_set(set);
/* Fold one statfs reply into a running aggregate.
 * @obd:     device whose cached obd_osfs is refreshed from lov_sfs (under
 *           obd_osfs_lock, with obd_osfs_age set to the current jiffies).
 * @osfs:    running aggregate; updated in place.
 * @lov_sfs: the single reply being merged; its block counters may be
 *           right-shifted in place when block sizes differ.
 * @success: not referenced on any line visible in this excerpt -- presumably
 *           it selects between the wholesale copy into osfs (first reply)
 *           and the merge path; confirm against the full source.
 *
 * Merge path as visible here: when the two os_bsize values differ, the side
 * with the smaller block size has os_bfree/os_bavail/os_blocks shifted right
 * by `shift` (and osfs adopts the larger os_bsize); block counters are then
 * combined and the file counters are added with LOV_SUM_MAX. */
1486 void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1487 struct obd_statfs *lov_sfs, int success)
1489 int shift = 0, quit = 0;
1491 spin_lock(&obd->obd_osfs_lock);
1492 memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1493 obd->obd_osfs_age = get_jiffies_64();
1494 spin_unlock(&obd->obd_osfs_lock);
1497 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1499 if (osfs->os_bsize != lov_sfs->os_bsize) {
1500 /* assume all block sizes are always powers of 2 */
1501 /* get the bits difference */
1502 tmp = osfs->os_bsize | lov_sfs->os_bsize;
/* NOTE(review): the loop body is not visible in this excerpt.  The bound
 * lets shift reach 64, which would be too large a shift count for a 64-bit
 * counter below -- confirm the loop always exits earlier. */
1503 for (shift = 0; shift <= 64; ++shift) {
1515 if (osfs->os_bsize < lov_sfs->os_bsize) {
1516 osfs->os_bsize = lov_sfs->os_bsize;
1518 osfs->os_bfree >>= shift;
1519 osfs->os_bavail >>= shift;
1520 osfs->os_blocks >>= shift;
1521 } else if (shift != 0) {
1522 lov_sfs->os_bfree >>= shift;
1523 lov_sfs->os_bavail >>= shift;
1524 lov_sfs->os_blocks >>= shift;
/* NOTE(review): both a "minimum" update and a "sum" update of
 * os_bfree/os_bavail appear below; whatever selects between them is not
 * visible in this excerpt. */
1527 /* Sandia requested that df (and so, statfs) only
1528 returned minimal available space on
1529 a single OST, so people would be able to
1530 write this much data guaranteed. */
1531 if (osfs->os_bavail > lov_sfs->os_bavail) {
1532 /* Presumably if new bavail is smaller,
1533 new bfree is bigger as well */
1534 osfs->os_bfree = lov_sfs->os_bfree;
1535 osfs->os_bavail = lov_sfs->os_bavail;
1538 osfs->os_bfree += lov_sfs->os_bfree;
1539 osfs->os_bavail += lov_sfs->os_bavail;
1541 osfs->os_blocks += lov_sfs->os_blocks;
1542 /* XXX not sure about this one - depends on policy.
1543 * - could be minimum if we always stripe on all OBDs
1544 * (but that would be wrong for any other policy,
1545 * if one of the OBDs has no more objects left)
1546 * - could be sum if we stripe whole objects
1547 * - could be average, just to give a nice number
1549 * To give a "reasonable" (if not wholly accurate)
1550 * number, we divide the total number of free objects
1551 * by expected stripe count (watch out for overflow).
1553 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1554 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1558 /* The callback for osc_statfs_async that finalizes a request info when a
1559 * response is received. */
/* @oinfo: the obd_info embedded in a lov_request (rq_oi); the owning request
 *         is recovered with container_of().
 * @rc:    result of the request, forwarded to lov_update_set().
 * The reply buffer (oinfo->oi_osfs) is merged into the set-wide buffer
 * (set_oi->oi_osfs) by lov_update_statfs().  The return statement and the
 * body of the rc/inactive-target check are not visible in this excerpt. */
1560 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1562 struct lov_request *lovreq;
1563 struct obd_statfs *osfs, *lov_sfs;
1564 struct obd_device *obd;
1565 struct lov_obd *lov;
1569 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1570 lov = &lovreq->rq_rqset->set_obd->u.lov;
/* NOTE(review): lov_tgts[rq_idx] is dereferenced here without a NULL check,
 * while the condition further down allows for it being NULL -- confirm. */
1571 obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1573 osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1574 lov_sfs = oinfo->oi_osfs;
/* set_success is sampled before lov_update_set() runs; presumably so the
 * merge below sees the count from before this reply -- confirm. */
1576 success = lovreq->rq_rqset->set_success;
1578 /* XXX: the same is done in lov_update_common_set, however
1579 lovset->set_exp is not initialized. */
1580 lov_update_set(lovreq->rq_rqset, lovreq, rc);
1582 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1583 lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1588 lov_update_statfs(obd, osfs, lov_sfs, success);
/* Build a request set for a statfs over the LOV's targets.
 * @obd:    device whose u.lov supplies the target table (lov_tgts) and the
 *          target count (desc.ld_tgt_count).
 * @oinfo:  caller's obd_info; stored as set->set_oi.
 * @reqset: output parameter; the assignment to it is not visible in this
 *          excerpt.
 *
 * For each target slot a lov_request is allocated together with its own
 * obd_statfs buffer, cb_statfs_update is installed as its oi_cb_up callback,
 * and the request is added to the set.  Missing or inactive targets are
 * logged (and presumably skipped -- the skip itself is not visible here).
 * Visible error codes: -ENOMEM when an allocation fails, -EIO when no
 * request was added; both jump to out_set.  lov_fini_statfs_set() is called
 * on the set at the end, presumably under the out_set label. */
1594 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1595 struct lov_request_set **reqset)
1597 struct lov_request_set *set;
1598 struct lov_obd *lov = &obd->u.lov;
1602 OBD_ALLOC(set, sizeof(*set));
1608 set->set_oi = oinfo;
1610 /* We only get block data from the OBD */
1611 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1612 struct lov_request *req;
1614 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
1615 CDEBUG(D_HA, "lov idx %d inactive\n", i);
1619 OBD_ALLOC(req, sizeof(*req));
1621 GOTO(out_set, rc = -ENOMEM);
1623 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
/* The request is not yet on the set's list, so it is freed by hand here
 * before bailing out. */
1624 if (req->rq_oi.oi_osfs == NULL) {
1625 OBD_FREE(req, sizeof(*req));
1626 GOTO(out_set, rc = -ENOMEM);
1630 req->rq_oi.oi_cb_up = cb_statfs_update;
1631 req->rq_rqset = set;
1633 lov_set_add_req(req, set);
1635 if (!set->set_count)
1636 GOTO(out_set, rc = -EIO);
1640 lov_fini_statfs_set(set);