1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
6 * This file is part of the Lustre file system, http://www.lustre.org
7 * Lustre is a trademark of Cluster File Systems, Inc.
9 * You may have signed or agreed to another license before downloading
10 * this software. If so, you are bound by the terms and conditions
11 * of that agreement, and the following does not apply to you. See the
12 * LICENSE file included with this distribution for more information.
14 * If you did not agree to a different license, then this copy of Lustre
15 * is open source software; you can redistribute it and/or modify it
16 * under the terms of version 2 of the GNU General Public License as
17 * published by the Free Software Foundation.
19 * In either case, Lustre is distributed in the hope that it will be
20 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 * license text for more details.
26 # define EXPORT_SYMTAB
28 #define DEBUG_SUBSYSTEM S_LOV
31 #include <asm/div64.h>
33 #include <liblustre.h>
36 #include <linux/obd_class.h>
37 #include <linux/obd_lov.h>
38 #include <linux/lustre_idl.h>
40 #include "lov_internal.h"
/*
 * Initialise a request set: clear the completion counter, start with an
 * empty request list and set the reference count to one.
 */
42 static void lov_init_set(struct lov_request_set *set)
45 set->set_completes = 0;
47 INIT_LIST_HEAD(&set->set_list);
/* this initial reference is the one the lov_fini_*_set() helpers drop */
48 atomic_set(&set->set_refcount, 1);
/*
 * Tear down a request set: unlink and free every request on set_list, then
 * release the page array and the lock-handle reference and free the set.
 */
51 static void lov_finish_set(struct lov_request_set *set)
53 struct list_head *pos, *n;
/* the _safe iterator is required because entries are unlinked while walking */
57 list_for_each_safe(pos, n, &set->set_list) {
58 struct lov_request *req = list_entry(pos, struct lov_request,
60 list_del_init(&req->rq_link);
63 obdo_free(req->rq_oa);
/* rq_buflen is the size rq_md was allocated with by the prep helpers */
65 OBD_FREE(req->rq_md, req->rq_buflen);
66 OBD_FREE(req, sizeof(*req));
/* set_pga was allocated with set_oabufs entries */
70 int len = set->set_oabufs * sizeof(*set->set_pga);
71 OBD_FREE(set->set_pga, len);
74 lov_llh_put(set->set_lockh);
76 OBD_FREE(set, sizeof(*set));
/*
 * Record the result @rc of sub-request @req in @set.  Used by the
 * lov_update_*_set() completion helpers in this file.
 * NOTE(review): the body is not visible in this view; presumably it marks the
 * request complete and bumps set_completes/set_success -- confirm.
 */
80 static void lov_update_set(struct lov_request_set *set,
81 struct lov_request *req, int rc)
/*
 * Generic completion callback: record @rc for @req in @set, then give special
 * treatment to an error that came from a target which is not active.
 * NOTE(review): the statement under the inactive-target check is not visible
 * here; going by the "grace" comment it presumably clears rc -- confirm.
 */
91 int lov_update_common_set(struct lov_request_set *set,
92 struct lov_request *req, int rc)
94 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
97 lov_update_set(set, req, rc);
99 /* grace error on inactive ost */
100 if (rc && !lov->tgts[req->rq_idx].active)
103 /* FIXME in raid1 regime, should return 0 */
/*
 * Append @req to the tail of @set's request list.
 * NOTE(review): callers test set->set_count right after queueing, so the
 * count is presumably incremented here on a line not visible -- confirm.
 */
107 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
109 list_add_tail(&req->rq_link, &set->set_list);
/*
 * Completion callback for one stripe of a striped lock enqueue.
 *
 * On ELDLM_OK the stripe's LVB is refreshed from the sub-request and the
 * known minimum size (kms) is considered for extension, bounded by the end
 * of the granted extent.  On an aborted intent enqueue (a glimpse) the
 * stripe's lock handle is cleared and only the LVB is refreshed.  Any other
 * result clears the handle and is logged when the target is active.  The
 * outcome is finally recorded in @set through lov_update_set().
 */
113 int lov_update_enqueue_set(struct lov_request_set *set,
114 struct lov_request *req, int rc, int flags)
116 struct lustre_handle *lov_lockhp;
117 struct lov_oinfo *loi;
/* per-stripe lock handle and object info, both selected by rq_stripe */
120 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
121 loi = &set->set_md->lsm_oinfo[req->rq_stripe];
123 /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
124 * It belongs in the OSC, except that the OSC doesn't have
125 * access to the real LOI -- it gets a copy, that we created
126 * above, and that copy can be arbitrarily out of date.
128 * The LOV API is due for a serious rewriting anyways, and this
129 * can be addressed then. */
130 if (rc == ELDLM_OK) {
131 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
/* tmp is the candidate kms; it starts as the size from the sub-request LVB */
132 __u64 tmp = req->rq_md->lsm_oinfo->loi_lvb.lvb_size;
134 LASSERT(lock != NULL);
/* the stripe md is updated under the stripe lock */
135 lov_stripe_lock(set->set_md);
136 loi->loi_lvb = req->rq_md->lsm_oinfo->loi_lvb;
137 /* Extend KMS up to the end of this lock and no further
138 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
139 if (tmp > lock->l_policy_data.l_extent.end)
140 tmp = lock->l_policy_data.l_extent.end + 1;
/* only a candidate that does not shrink the current kms takes this branch */
141 if (tmp >= loi->loi_kms) {
142 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
143 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
145 loi->loi_kms_valid = 1;
147 LDLM_DEBUG(lock, "lock acquired, setting rss="
148 LPU64"; leaving kms="LPU64", end="LPU64,
149 loi->loi_lvb.lvb_size, loi->loi_kms,
150 lock->l_policy_data.l_extent.end);
152 lov_stripe_unlock(set->set_md);
153 ldlm_lock_allow_match(lock);
155 } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
/* no lock is kept for this stripe: wipe its handle, keep the fresh LVB */
156 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
157 lov_stripe_lock(set->set_md);
158 loi->loi_lvb = req->rq_md->lsm_oinfo->loi_lvb;
159 lov_stripe_unlock(set->set_md);
160 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
161 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
164 struct obd_export *exp = set->set_exp;
165 struct lov_obd *lov = &exp->exp_obd->u.lov;
/* failed enqueue: wipe the handle; errors are reported for active targets */
167 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
168 if (lov->tgts[req->rq_idx].active) {
169 CERROR("error: enqueue objid "LPX64" subobj "
170 LPX64" on OST idx %d: rc = %d\n",
171 set->set_md->lsm_object_id, loi->loi_id,
172 loi->loi_ost_idx, rc);
177 lov_update_set(set, req, rc);
/*
 * Post-process a finished enqueue/match set.  When every completed request
 * succeeded there is nothing to undo.  Otherwise the sub-locks that were
 * obtained are cancelled on their targets and a reference on the LOV lock
 * handle is dropped.
 */
181 static int enqueue_done(struct lov_request_set *set, __u32 mode)
183 struct lov_request *req;
184 struct lustre_handle *lov_lockhp = NULL;
185 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
189 LASSERT(set->set_completes);
190 /* enqueue/match success, just return */
191 if (set->set_completes == set->set_success)
194 /* cancel enqueued/matched locks */
195 list_for_each_entry(req, &set->set_list, rq_link) {
/* this test selects requests that did not complete or that failed */
196 if (!req->rq_complete || req->rq_rc)
199 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
201 if (!lustre_handle_is_used(lov_lockhp))
204 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
/* cancel failures are only reported for targets that are still active */
206 if (rc && lov->tgts[req->rq_idx].active)
207 CERROR("cancelling obdjid "LPX64" on OST "
208 "idx %d error: rc = %d\n",
209 req->rq_md->lsm_object_id, req->rq_idx, rc);
211 lov_llh_put(set->set_lockh);
/*
 * Finish an enqueue request set: run enqueue_done() when at least one
 * request completed, then drop one reference on the set.
 * NOTE(review): the condition guarding the lov_llh_put() below and the
 * statement under atomic_dec_and_test() are not visible in this view;
 * lov_fini_setattr_set() calls lov_finish_set() at that point -- confirm.
 */
215 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
222 LASSERT(set->set_exp);
223 if (set->set_completes)
224 rc = enqueue_done(set, mode);
226 lov_llh_put(set->set_lockh);
228 if (atomic_dec_and_test(&set->set_refcount))
/*
 * Build the request set for an extent-lock enqueue on @lsm.
 *
 * A new LOV lock handle is created and its cookie stored in @lockh.  For
 * each stripe whose object range intersects policy->l_extent a lov_request
 * is queued, carrying the stripe-local extent and a private single-object
 * stripe md seeded with the stripe's current kms and LVB.  Stripes on
 * inactive targets are logged (and presumably skipped -- the statement is
 * not visible here).  Allocation failures yield -ENOMEM; the set is handed
 * to lov_fini_enqueue_set() for release.
 */
234 int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
235 ldlm_policy_data_t *policy, __u32 mode,
236 struct lustre_handle *lockh,
237 struct lov_request_set **reqset)
239 struct lov_obd *lov = &exp->exp_obd->u.lov;
240 struct lov_request_set *set;
242 struct lov_oinfo *loi;
245 OBD_ALLOC(set, sizeof(*set));
252 set->set_lockh = lov_llh_new(lsm);
253 if (set->set_lockh == NULL)
254 GOTO(out_set, rc = -ENOMEM);
255 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
257 loi = lsm->lsm_oinfo;
258 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
259 struct lov_request *req;
262 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
263 policy->l_extent.end, &start, &end))
266 if (lov->tgts[loi->loi_ost_idx].active == 0) {
267 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
271 OBD_ALLOC(req, sizeof(*req));
273 GOTO(out_set, rc = -ENOMEM);
/* room for the md header plus the single lov_oinfo filled in below */
275 req->rq_buflen = sizeof(*req->rq_md) +
276 sizeof(struct lov_oinfo);
277 OBD_ALLOC(req->rq_md, req->rq_buflen);
278 if (req->rq_md == NULL) {
/* req is not on set_list yet, so the set teardown would never free it */
OBD_FREE(req, sizeof(*req));
279 GOTO(out_set, rc = -ENOMEM);
}
281 req->rq_extent.start = start;
282 req->rq_extent.end = end;
283 req->rq_extent.gid = policy->l_extent.gid;
285 req->rq_idx = loi->loi_ost_idx;
288 /* XXX LOV STACKING: submd should be from the subobj */
289 req->rq_md->lsm_object_id = loi->loi_id;
290 req->rq_md->lsm_stripe_count = 0;
291 req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
292 req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
293 req->rq_md->lsm_oinfo->loi_lvb = loi->loi_lvb;
295 lov_set_add_req(req, set);
298 GOTO(out_set, rc = -EIO);
302 lov_fini_enqueue_set(set, mode);
/*
 * Completion callback for one stripe of a lock match: records `ret` for
 * @req in @set through lov_update_set().
 * NOTE(review): how `ret` is derived from the callback's result is not
 * visible in this view -- confirm before relying on it.
 */
306 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
314 lov_update_set(set, req, ret);
/*
 * Finish a match request set.  When requests completed, an extra lock-handle
 * reference is dropped if every queued request succeeded and the caller only
 * tested for a lock (LDLM_FL_TEST_LOCK); enqueue_done() then handles the
 * rest.  Finally one reference on the set is dropped.
 * NOTE(review): the condition guarding the second lov_llh_put() and the
 * statement under atomic_dec_and_test() are not visible here -- confirm.
 */
318 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
325 LASSERT(set->set_exp);
326 if (set->set_completes) {
327 if (set->set_count == set->set_success &&
328 flags & LDLM_FL_TEST_LOCK)
329 lov_llh_put(set->set_lockh);
330 rc = enqueue_done(set, mode);
332 lov_llh_put(set->set_lockh);
335 if (atomic_dec_and_test(&set->set_refcount))
/*
 * Build the request set for matching an existing extent lock on @lsm.
 *
 * A new LOV lock handle is created and its cookie stored in @lockh.  For
 * each stripe whose object range intersects policy->l_extent a lov_request
 * with the stripe-local extent and a minimal stripe md is queued.  Unlike
 * the enqueue path, an inactive target fails the whole match with -EIO.
 * Allocation failures yield -ENOMEM; the set is handed to
 * lov_fini_match_set() for release.
 */
341 int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
342 ldlm_policy_data_t *policy, __u32 mode,
343 struct lustre_handle *lockh,
344 struct lov_request_set **reqset)
346 struct lov_obd *lov = &exp->exp_obd->u.lov;
347 struct lov_request_set *set;
349 struct lov_oinfo *loi;
352 OBD_ALLOC(set, sizeof(*set));
359 set->set_lockh = lov_llh_new(lsm);
360 if (set->set_lockh == NULL)
361 GOTO(out_set, rc = -ENOMEM);
362 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
364 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
365 struct lov_request *req;
368 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
369 policy->l_extent.end, &start, &end))
372 /* FIXME raid1 should grace this error */
373 if (lov->tgts[loi->loi_ost_idx].active == 0) {
374 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
375 GOTO(out_set, rc = -EIO);
378 OBD_ALLOC(req, sizeof(*req));
380 GOTO(out_set, rc = -ENOMEM);
/* only the md header is allocated; no per-stripe oinfo is used for a match */
382 req->rq_buflen = sizeof(*req->rq_md);
383 OBD_ALLOC(req->rq_md, req->rq_buflen);
384 if (req->rq_md == NULL) {
/* req is not on set_list yet, so the set teardown would never free it */
OBD_FREE(req, sizeof(*req));
385 GOTO(out_set, rc = -ENOMEM);
}
387 req->rq_extent.start = start;
388 req->rq_extent.end = end;
389 req->rq_extent.gid = policy->l_extent.gid;
391 req->rq_idx = loi->loi_ost_idx;
394 /* XXX LOV STACKING: submd should be from the subobj */
395 req->rq_md->lsm_object_id = loi->loi_id;
396 req->rq_md->lsm_stripe_count = 0;
398 lov_set_add_req(req, set);
401 GOTO(out_set, rc = -EIO);
405 lov_fini_match_set(set, mode, 0);
/*
 * Finish a cancel request set: drop a reference on the LOV lock handle and
 * one reference on the set.
 * NOTE(review): any condition around lov_llh_put() and the statement under
 * atomic_dec_and_test() are not visible in this view -- confirm.
 */
409 int lov_fini_cancel_set(struct lov_request_set *set)
417 LASSERT(set->set_exp);
419 lov_llh_put(set->set_lockh);
421 if (atomic_dec_and_test(&set->set_refcount))
/*
 * Build the request set for cancelling a striped lock.
 *
 * @lockh is resolved to the LOV lock handle (-EINVAL when it is not valid).
 * For every stripe whose per-stripe handle is in use a lov_request with a
 * minimal stripe md is queued; stripes without a lock are only logged.
 * Allocation failures yield -ENOMEM; the set is handed to
 * lov_fini_cancel_set() for release.
 */
427 int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
428 __u32 mode, struct lustre_handle *lockh,
429 struct lov_request_set **reqset)
431 struct lov_request_set *set;
433 struct lov_oinfo *loi;
436 OBD_ALLOC(set, sizeof(*set));
443 set->set_lockh = lov_handle2llh(lockh);
444 if (set->set_lockh == NULL) {
445 CERROR("LOV: invalid lov lock handle %p\n", lockh);
446 GOTO(out_set, rc = -EINVAL);
448 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
450 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
451 struct lov_request *req;
452 struct lustre_handle *lov_lockhp;
/* handles are stored per stripe, in stripe order */
454 lov_lockhp = set->set_lockh->llh_handles + i;
455 if (!lustre_handle_is_used(lov_lockhp)) {
456 CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
457 loi->loi_ost_idx, loi->loi_id);
461 OBD_ALLOC(req, sizeof(*req));
463 GOTO(out_set, rc = -ENOMEM);
465 req->rq_buflen = sizeof(*req->rq_md);
466 OBD_ALLOC(req->rq_md, req->rq_buflen);
467 if (req->rq_md == NULL) {
/* req is not on set_list yet, so the set teardown would never free it */
OBD_FREE(req, sizeof(*req));
468 GOTO(out_set, rc = -ENOMEM);
}
470 req->rq_idx = loi->loi_ost_idx;
473 /* XXX LOV STACKING: submd should be from the subobj */
474 req->rq_md->lsm_object_id = loi->loi_id;
475 req->rq_md->lsm_stripe_count = 0;
477 lov_set_add_req(req, set);
480 GOTO(out_set, rc = -EIO);
484 lov_fini_cancel_set(set);
/*
 * Post-process a finished create set.
 *
 * If not every request succeeded, qos_remedy_create() is asked to retry
 * requests elsewhere and the outcome is fed back through
 * lov_update_create_set().  The attributes of the successful sub-objects are
 * then merged into a temporary obdo which replaces *set_oa, keeping the
 * original o_id.  The later part destroys successfully created sub-objects
 * again and frees the stripe md (cleanup path; its entry condition is not
 * visible in this view).  Finally the log cookies are handed back to the oti
 * and OBD_MD_FLCOOKIE in set_oa is updated to reflect whether any were sent.
 */
488 static int create_done(struct obd_export *exp, struct lov_request_set *set,
489 struct lov_stripe_md **lsmp)
491 struct lov_obd *lov = &exp->exp_obd->u.lov;
492 struct obd_trans_info *oti = set->set_oti;
493 struct obdo *src_oa = set->set_oa;
494 struct lov_request *req;
495 struct obdo *ret_oa = NULL;
496 int attrset = 0, rc = 0;
499 LASSERT(set->set_completes);
501 /* try alloc objects on other osts if osc_create fails for
502 * exceptions: RPC failure, ENOSPC, etc */
503 if (set->set_count != set->set_success) {
504 list_for_each_entry (req, &set->set_list, rq_link) {
/* un-complete the request so that the retry result can be recorded again */
508 set->set_completes--;
509 req->rq_complete = 0;
511 rc = qos_remedy_create(set, req);
512 lov_update_create_set(set, req, rc);
519 /* no successful creates */
520 if (set->set_success == 0)
523 /* If there was an explicit stripe set, fail. Otherwise, we
524 * got some objects and that's not bad. */
525 if (set->set_count != set->set_success) {
528 set->set_count = set->set_success;
532 ret_oa = obdo_alloc();
534 GOTO(cleanup, rc = -ENOMEM);
/* fold the attributes of every successful sub-object into ret_oa */
536 list_for_each_entry(req, &set->set_list, rq_link) {
537 if (!req->rq_complete || req->rq_rc)
539 lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
540 set->set_md, req->rq_stripe, &attrset);
542 if (src_oa->o_valid & OBD_MD_FLSIZE &&
543 ret_oa->o_size != src_oa->o_size) {
544 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
545 src_oa->o_size, ret_oa->o_size);
/* the caller's obdo keeps its own object id but takes the merged attrs */
548 ret_oa->o_id = src_oa->o_id;
549 memcpy(src_oa, ret_oa, sizeof(*src_oa));
/* undo: destroy the sub-objects that had been created successfully */
556 list_for_each_entry(req, &set->set_list, rq_link) {
557 struct obd_export *sub_exp;
560 if (!req->rq_complete || req->rq_rc)
563 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
564 err = obd_destroy(sub_exp, req->rq_oa, NULL, oti, NULL);
566 CERROR("Failed to uncreate objid "LPX64" subobj "
567 LPX64" on OST idx %d: rc = %d\n",
568 set->set_oa->o_id, req->rq_oa->o_id,
572 obd_free_memmd(exp, &set->set_md);
/* restore the cookie array start saved in set_cookies */
574 if (oti && set->set_cookies) {
575 oti->oti_logcookies = set->set_cookies;
576 if (!set->set_cookie_sent) {
577 oti_free_cookies(oti);
578 src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
580 src_oa->o_valid |= OBD_MD_FLCOOKIE;
/*
 * Finish a create request set: run create_done() when at least one request
 * completed, then drop one reference on the set.
 * NOTE(review): the statement under atomic_dec_and_test() is not visible
 * here; lov_fini_setattr_set() calls lov_finish_set() there -- confirm.
 */
586 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
593 LASSERT(set->set_exp);
594 if (set->set_completes) {
595 rc = create_done(set->set_exp, set, lsmp);
596 /* FIXME update qos data here */
599 if (atomic_dec_and_test(&set->set_refcount))
/*
 * Completion callback for one sub-object create.
 *
 * The request is assigned the next free stripe slot (the number of
 * successes so far).  Errors from active targets are logged.  The result is
 * recorded through lov_update_set(), after which the new object id and
 * target index are stored in the stripe's lov_oinfo and, when the oti has an
 * objid array, in oti_objid[].  Log-cookie bookkeeping is advanced when the
 * set tracks cookies.
 * NOTE(review): the conditions between these steps (e.g. an early return on
 * error) are not visible in this view.
 */
605 int lov_update_create_set(struct lov_request_set *set,
606 struct lov_request *req, int rc)
608 struct obd_trans_info *oti = set->set_oti;
609 struct lov_stripe_md *lsm = set->set_md;
610 struct lov_oinfo *loi;
611 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
/* successful creates fill the stripe slots in completion order */
614 req->rq_stripe = set->set_success;
615 loi = &lsm->lsm_oinfo[req->rq_stripe];
617 if (rc && lov->tgts[req->rq_idx].active) {
618 CERROR("error creating fid "LPX64" sub-object"
619 " on OST idx %d/%d: rc = %d\n",
620 set->set_oa->o_id, req->rq_idx,
621 lsm->lsm_stripe_count, rc);
623 CERROR("obd_create returned invalid err %d\n", rc);
627 lov_update_set(set, req, rc);
631 if (oti && oti->oti_objid)
632 oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
634 loi->loi_id = req->rq_oa->o_id;
635 loi->loi_ost_idx = req->rq_idx;
636 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
637 lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
/* move on to the next cookie slot; count the cookies the target filled in */
640 if (set->set_cookies)
641 ++oti->oti_logcookies;
642 if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
643 set->set_cookie_sent++;
/*
 * Build the request set for creating the objects of a striped file.
 *
 * When the set has no stripe md yet, one is allocated here.  Its stripe
 * count starts from lov_get_stripecnt(lov, 0) and is raised when src_oa
 * carries a size that needs more stripes; -EFBIG is returned when that would
 * need more stripes than there are active targets.  qos_prep_create() then
 * sets up the per-target requests, and log-cookie space is reserved in the
 * oti when the caller asked for cookies (OBD_MD_FLCOOKIE).
 * On failure the stripe md is freed and the set goes through
 * lov_fini_create_set() (the exact conditions are not visible in this view).
 */
648 int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **lsmp,
649 struct obdo *src_oa, struct obd_trans_info *oti,
650 struct lov_request_set **reqset)
652 struct lov_obd *lov = &exp->exp_obd->u.lov;
653 struct lov_request_set *set;
654 int rc = 0, newea = 0;
657 OBD_ALLOC(set, sizeof(*set));
664 set->set_oa = src_oa;
667 if (set->set_md == NULL) {
668 int stripes, stripe_cnt;
669 stripe_cnt = lov_get_stripecnt(lov, 0);
671 /* If the MDS file was truncated up to some size, stripe over
672 * enough OSTs to allow the file to be created at that size. */
673 if (src_oa->o_valid & OBD_MD_FLSIZE) {
/* works in 4096-byte units (>> 12); rounds up to whole max-size stripes */
674 stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
675 do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
677 if (stripes > lov->desc.ld_active_tgt_count)
678 GOTO(out_set, rc = -EFBIG);
/* never go below the configured stripe count */
679 if (stripes < stripe_cnt)
680 stripes = stripe_cnt;
682 stripes = stripe_cnt;
685 rc = lov_alloc_memmd(&set->set_md, stripes,
686 lov->desc.ld_pattern ?
687 lov->desc.ld_pattern : LOV_PATTERN_RAID0,
694 rc = qos_prep_create(lov, set, newea);
698 if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
699 oti_alloc_cookies(oti, set->set_count);
700 if (!oti->oti_logcookies)
/* remember the array start; the update callback advances oti_logcookies */
702 set->set_cookies = oti->oti_logcookies;
709 obd_free_memmd(exp, &set->set_md);
711 lov_fini_create_set(set, lsmp);
/*
 * Merge the attributes returned by the successful sub-requests into the
 * caller's obdo (set->set_oa).
 *
 * Attributes are accumulated in a temporary obdo via lov_merge_attrs();
 * requests that failed, did not complete, or returned no valid attributes
 * are left out.  The merged result replaces *set_oa, keeping the original
 * object id.  Returns -ENOMEM when the temporary obdo cannot be allocated.
 */
715 static int common_attr_done(struct lov_request_set *set)
717 struct list_head *pos;
718 struct lov_request *req;
720 int rc = 0, attrset = 0;
723 if (set->set_oa == NULL)
726 if (!set->set_success)
729 tmp_oa = obdo_alloc();
731 GOTO(out, rc = -ENOMEM);
733 list_for_each (pos, &set->set_list) {
734 req = list_entry(pos, struct lov_request, rq_link);
736 if (!req->rq_complete || req->rq_rc)
738 if (req->rq_oa->o_valid == 0) /* inactive stripe */
740 lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
741 set->set_md, req->rq_stripe, &attrset);
744 CERROR("No stripes had valid attrs\n");
/* the caller's obdo keeps its own object id but takes the merged attrs */
747 tmp_oa->o_id = set->set_oa->o_id;
748 memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
/*
 * Post-process a finished bulk read/write set: for each request that
 * reported a block count (OBD_MD_FLBLOCKS), store it in the cached LVB of
 * the stripe the request belongs to.
 * NOTE(review): the statement under the rq_complete/rq_rc test is not
 * visible; it presumably skips failed or incomplete requests -- confirm.
 */
756 static int brw_done(struct lov_request_set *set)
758 struct lov_stripe_md *lsm = set->set_md;
759 struct lov_oinfo *loi = NULL;
760 struct list_head *pos;
761 struct lov_request *req;
764 list_for_each (pos, &set->set_list) {
765 req = list_entry(pos, struct lov_request, rq_link);
767 if (!req->rq_complete || req->rq_rc)
770 loi = &lsm->lsm_oinfo[req->rq_stripe];
772 if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
773 loi->loi_lvb.lvb_blocks = req->rq_oa->o_blocks;
/*
 * Finish a bulk read/write request set and drop one reference on it.
 * NOTE(review): the work done when set_completes is non-zero (presumably a
 * call to brw_done()) and the statement under atomic_dec_and_test() are not
 * visible in this view -- confirm.
 */
779 int lov_fini_brw_set(struct lov_request_set *set)
786 LASSERT(set->set_exp);
787 if (set->set_completes) {
789 /* FIXME update qos data here */
791 if (atomic_dec_and_test(&set->set_refcount))
/*
 * Build the request set for a striped bulk read/write.
 *
 * The @oa_bufs pages in @pga are first counted per stripe.  One lov_request
 * is then created for every stripe that has pages; an inactive target fails
 * the whole operation with -EIO.  Each request gets a private copy of
 * @src_oa addressed at the stripe's object and owns a contiguous slice of
 * set->set_pga, namely rq_oabufs entries starting at rq_pgaidx.  Finally
 * every page is copied into its stripe's slice with its offset translated
 * by lov_stripe_offset().
 *
 * Allocation failures yield -ENOMEM.  The temporary per-stripe table is
 * always freed; the set is handed to lov_fini_brw_set() for release.
 */
797 int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
798 struct lov_stripe_md *lsm, obd_count oa_bufs,
799 struct brw_page *pga, struct obd_trans_info *oti,
800 struct lov_request_set **reqset)
807 struct lov_request_set *set;
808 struct lov_oinfo *loi = NULL;
809 struct lov_obd *lov = &exp->exp_obd->u.lov;
810 int rc = 0, i, shift;
813 OBD_ALLOC(set, sizeof(*set));
820 set->set_oa = src_oa;
822 set->set_oabufs = oa_bufs;
823 OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
825 GOTO(out, rc = -ENOMEM);
/* one bookkeeping entry per stripe */
827 OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
829 GOTO(out, rc = -ENOMEM);
831 /* calculate the page count for each stripe */
832 for (i = 0; i < oa_bufs; i++) {
833 int stripe = lov_stripe_number(lsm, pga[i].off);
834 info[stripe].count++;
837 /* alloc and initialize lov request */
839 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
840 struct lov_request *req;
842 if (info[i].count == 0)
845 if (lov->tgts[loi->loi_ost_idx].active == 0) {
846 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
847 GOTO(out, rc = -EIO);
850 OBD_ALLOC(req, sizeof(*req));
852 GOTO(out, rc = -ENOMEM);
854 req->rq_oa = obdo_alloc();
855 if (req->rq_oa == NULL) {
/* req is not on set_list yet, so the set teardown would never free it */
OBD_FREE(req, sizeof(*req));
856 GOTO(out, rc = -ENOMEM);
}
859 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
860 req->rq_oa->o_id = loi->loi_id;
861 req->rq_oa->o_stripe_idx = i;
863 req->rq_buflen = sizeof(*req->rq_md);
864 OBD_ALLOC(req->rq_md, req->rq_buflen);
865 if (req->rq_md == NULL) {
/* still unqueued: release the obdo and the request by hand */
obdo_free(req->rq_oa);
OBD_FREE(req, sizeof(*req));
866 GOTO(out, rc = -ENOMEM);
}
868 req->rq_idx = loi->loi_ost_idx;
871 /* XXX LOV STACKING */
872 req->rq_md->lsm_object_id = loi->loi_id;
873 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
/* this request's pages occupy set_pga[shift .. shift + count - 1] */
874 req->rq_oabufs = info[i].count;
875 req->rq_pgaidx = shift;
876 shift += req->rq_oabufs;
878 /* remember the index for sort brw_page array */
879 info[i].index = req->rq_pgaidx;
881 lov_set_add_req(req, set);
884 GOTO(out, rc = -EIO);
886 /* rotate & sort the brw_page array */
887 for (i = 0; i < oa_bufs; i++) {
888 int stripe = lov_stripe_number(lsm, pga[i].off);
/* destination slot: start of the stripe's slice plus its running offset */
890 shift = info[stripe].index + info[stripe].off;
891 LASSERT(shift < oa_bufs);
892 set->set_pga[shift] = pga[i];
/* rewrite the copied page's offset as an offset within the stripe object */
893 lov_stripe_offset(lsm, pga[i].off, stripe,
894 &set->set_pga[shift].off);
899 OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
904 lov_fini_brw_set(set);
/*
 * Finish a getattr request set: merge the per-stripe attributes with
 * common_attr_done() when at least one request completed, then drop one
 * reference on the set.
 * NOTE(review): the statement under atomic_dec_and_test() is not visible
 * here; lov_fini_setattr_set() calls lov_finish_set() there -- confirm.
 */
909 int lov_fini_getattr_set(struct lov_request_set *set)
916 LASSERT(set->set_exp);
917 if (set->set_completes)
918 rc = common_attr_done(set);
920 if (atomic_dec_and_test(&set->set_refcount))
/*
 * Build the request set for a striped getattr.
 *
 * For every stripe of @lsm a lov_request is queued with a private copy of
 * @src_oa addressed at the stripe's object.  Stripes on inactive targets
 * are logged (and presumably skipped -- the statement is not visible here).
 * Allocation failures yield -ENOMEM; the set is handed to
 * lov_fini_getattr_set() for release.
 */
926 int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
927 struct lov_stripe_md *lsm,
928 struct lov_request_set **reqset)
930 struct lov_request_set *set;
931 struct lov_oinfo *loi = NULL;
932 struct lov_obd *lov = &exp->exp_obd->u.lov;
936 OBD_ALLOC(set, sizeof(*set));
943 set->set_oa = src_oa;
945 loi = lsm->lsm_oinfo;
946 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
947 struct lov_request *req;
949 if (lov->tgts[loi->loi_ost_idx].active == 0) {
950 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
954 OBD_ALLOC(req, sizeof(*req));
956 GOTO(out_set, rc = -ENOMEM);
959 req->rq_idx = loi->loi_ost_idx;
961 req->rq_oa = obdo_alloc();
962 if (req->rq_oa == NULL) {
/* req is not on set_list yet, so the set teardown would never free it */
OBD_FREE(req, sizeof(*req));
963 GOTO(out_set, rc = -ENOMEM);
}
/* start from the caller's obdo, then point it at this stripe's object */
964 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
965 req->rq_oa->o_id = loi->loi_id;
967 lov_set_add_req(req, set);
970 GOTO(out_set, rc = -EIO);
974 lov_fini_getattr_set(set);
/*
 * Finish a destroy request set and drop one reference on it.
 * NOTE(review): nothing but a FIXME is visible inside the set_completes
 * branch, and the statement under atomic_dec_and_test() is not visible
 * either -- confirm against the full file.
 */
978 int lov_fini_destroy_set(struct lov_request_set *set)
984 LASSERT(set->set_exp);
985 if (set->set_completes) {
986 /* FIXME update qos data here */
989 if (atomic_dec_and_test(&set->set_refcount))
/*
 * Build the request set for destroying the objects of a striped file.
 *
 * For every stripe of @lsm a lov_request is queued with a private copy of
 * @src_oa addressed at the stripe's object.  Stripes on inactive targets
 * are logged (and presumably skipped -- the statement is not visible here).
 * When the caller passes log cookies (OBD_MD_FLCOOKIE with a non-NULL @oti)
 * the start of the cookie array is remembered in set_cookies and
 * oti_logcookies is positioned at the first queued stripe's cookie.
 * Returns -ENOMEM on allocation failure and -EIO when no request could be
 * queued; the set is handed to lov_fini_destroy_set() for release.
 */
995 int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
996 struct lov_stripe_md *lsm,
997 struct obd_trans_info *oti,
998 struct lov_request_set **reqset)
1000 struct lov_request_set *set;
1001 struct lov_oinfo *loi = NULL;
1002 struct lov_obd *lov = &exp->exp_obd->u.lov;
1003 int rc = 0, cookie_set = 0, i;
1006 OBD_ALLOC(set, sizeof(*set));
1013 set->set_oa = src_oa;
1015 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1016 set->set_cookies = oti->oti_logcookies;
1018 loi = lsm->lsm_oinfo;
1019 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1020 struct lov_request *req;
1022 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1023 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1027 OBD_ALLOC(req, sizeof(*req));
1029 GOTO(out_set, rc = -ENOMEM);
1032 req->rq_idx = loi->loi_ost_idx;
1034 req->rq_oa = obdo_alloc();
1035 if (req->rq_oa == NULL) {
/* req is not on set_list yet, so the set teardown would never free it */
OBD_FREE(req, sizeof(*req));
1036 GOTO(out_set, rc = -ENOMEM);
}
1037 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1038 req->rq_oa->o_id = loi->loi_id;
1040 /* Setup the first request's cookie position */
1041 if (!cookie_set && set->set_cookies) {
/* cookies are indexed by stripe number */
1042 oti->oti_logcookies = set->set_cookies + i;
1045 lov_set_add_req(req, set);
1047 if (!set->set_count)
1048 GOTO(out_set, rc = -EIO);
1052 lov_fini_destroy_set(set);
/*
 * Finish a setattr request set: merge the per-stripe attributes with
 * common_attr_done() when at least one request completed, then drop one
 * reference on the set and free it once the last reference is gone.
 */
1056 int lov_fini_setattr_set(struct lov_request_set *set)
1063 LASSERT(set->set_exp);
1064 if (set->set_completes) {
1065 rc = common_attr_done(set);
1066 /* FIXME update qos data here */
/* the last reference holder frees the requests and the set itself */
1069 if (atomic_dec_and_test(&set->set_refcount))
1070 lov_finish_set(set);
/*
 * Build the request set for a striped setattr.
 *
 * For every stripe of @lsm a lov_request is queued with a private copy of
 * @src_oa addressed at the stripe's object.  When @src_oa carries a size
 * (OBD_MD_FLSIZE) it is translated by lov_stripe_offset() into the size of
 * that stripe's object.  Stripes on inactive targets are logged (and
 * presumably skipped -- the statement is not visible here).
 * Returns -ENOMEM on allocation failure and -EIO when no request could be
 * queued; the set is handed to lov_fini_setattr_set() for release.
 */
1074 int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
1075 struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1076 struct lov_request_set **reqset)
1078 struct lov_request_set *set;
1079 struct lov_oinfo *loi = NULL;
1080 struct lov_obd *lov = &exp->exp_obd->u.lov;
1084 OBD_ALLOC(set, sizeof(*set));
1091 set->set_oa = src_oa;
1093 loi = lsm->lsm_oinfo;
1094 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1095 struct lov_request *req;
1097 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1098 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1102 OBD_ALLOC(req, sizeof(*req));
1104 GOTO(out_set, rc = -ENOMEM);
1106 req->rq_idx = loi->loi_ost_idx;
1108 req->rq_oa = obdo_alloc();
1109 if (req->rq_oa == NULL) {
/* req is not on set_list yet, so the set teardown would never free it */
OBD_FREE(req, sizeof(*req));
1110 GOTO(out_set, rc = -ENOMEM);
}
1111 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1112 req->rq_oa->o_id = loi->loi_id;
1113 req->rq_oa->o_stripe_idx = i;
1115 if (src_oa->o_valid & OBD_MD_FLSIZE) {
1116 if (lov_stripe_offset(lsm, src_oa->o_size, i,
1117 &req->rq_oa->o_size) < 0 &&
1119 req->rq_oa->o_size--;
1120 CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1121 i, req->rq_oa->o_size, src_oa->o_size);
1123 lov_set_add_req(req, set);
1125 if (!set->set_count)
1126 GOTO(out_set, rc = -EIO);
1130 lov_fini_setattr_set(set);
/*
 * Completion callback for one stripe of a setattr.
 *
 * The result is recorded through lov_update_set(); an error from a target
 * that is not active gets the same special treatment as in
 * lov_update_common_set().  The timestamps returned by the target are then
 * copied into the stripe's cached LVB, each one only when its own valid bit
 * (OBD_MD_FLCTIME / OBD_MD_FLMTIME / OBD_MD_FLATIME) is set in the reply.
 * NOTE(review): the statement under the inactive-target check and any
 * condition around the timestamp updates are not visible in this view.
 */
1134 int lov_update_setattr_set(struct lov_request_set *set,
1135 struct lov_request *req, int rc)
1137 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1138 struct lov_stripe_md *lsm = set->set_md;
1141 lov_update_set(set, req, rc);
1143 /* grace error on inactive ost */
1144 if (rc && !lov->tgts[req->rq_idx].active)
1147 /* FIXME: LOV STACKING update loi data should be done by OSC *
1148 * when this is gone we can go back to using lov_update_common_set() */
/* ctime must be guarded by its own flag, not by the mtime flag */
1150 if (req->rq_oa->o_valid & OBD_MD_FLCTIME)
1151 lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_ctime =
1152 req->rq_oa->o_ctime;
1153 if (req->rq_oa->o_valid & OBD_MD_FLMTIME)
1154 lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_mtime =
1155 req->rq_oa->o_mtime;
1156 if (req->rq_oa->o_valid & OBD_MD_FLATIME)
1157 lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_atime =
1158 req->rq_oa->o_atime;
/*
 * Completion callback for one stripe of a punch (truncate): record @rc for
 * @req in @set, then give special treatment to an error that came from a
 * target which is not active.
 * NOTE(review): the statement under the inactive-target check is not
 * visible here; presumably it clears rc as the sibling callbacks do.
 */
1164 int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
1167 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1170 lov_update_set(set, req, rc);
1171 if (rc && !lov->tgts[req->rq_idx].active)
1173 /* FIXME in raid1 regime, should return 0 */
/*
 * Finish a punch request set: check whether any request succeeded, then
 * drop one reference on the set and free it once the last reference is gone.
 * NOTE(review): the statement under the !set_success test is not visible in
 * this view (presumably it sets an error code) -- confirm.
 */
1177 int lov_fini_punch_set(struct lov_request_set *set)
1184 LASSERT(set->set_exp);
1185 if (set->set_completes) {
1186 if (!set->set_success)
1188 /* FIXME update qos data here */
/* the last reference holder frees the requests and the set itself */
1191 if (atomic_dec_and_test(&set->set_refcount))
1192 lov_finish_set(set);
/*
 * Build the request set for punching (truncating) the range [@start, @end]
 * of a striped object.
 *
 * For each stripe whose object range intersects the requested range a
 * lov_request is queued with a private copy of @src_oa addressed at the
 * stripe's object and with the stripe-local extent in rq_extent (gid -1).
 * Stripes on inactive targets are logged (and presumably skipped -- the
 * statement is not visible here).
 * Returns -ENOMEM on allocation failure and -EIO when no request could be
 * queued; the set is handed to lov_fini_punch_set() for release.
 */
1197 int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
1198 struct lov_stripe_md *lsm, obd_off start,
1199 obd_off end, struct obd_trans_info *oti,
1200 struct lov_request_set **reqset)
1202 struct lov_request_set *set;
1203 struct lov_oinfo *loi = NULL;
1204 struct lov_obd *lov = &exp->exp_obd->u.lov;
1208 OBD_ALLOC(set, sizeof(*set));
1215 set->set_oa = src_oa;
1217 loi = lsm->lsm_oinfo;
1218 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1219 struct lov_request *req;
1222 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1223 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
/* rs/re receive the part of the range that falls into stripe i */
1227 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1230 OBD_ALLOC(req, sizeof(*req));
1232 GOTO(out_set, rc = -ENOMEM);
1234 req->rq_idx = loi->loi_ost_idx;
1236 req->rq_oa = obdo_alloc();
1237 if (req->rq_oa == NULL) {
/* req is not on set_list yet, so the set teardown would never free it */
OBD_FREE(req, sizeof(*req));
1238 GOTO(out_set, rc = -ENOMEM);
}
1239 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1240 req->rq_oa->o_id = loi->loi_id;
1241 req->rq_oa->o_stripe_idx = i;
1243 req->rq_extent.start = rs;
1244 req->rq_extent.end = re;
1245 req->rq_extent.gid = -1;
1247 lov_set_add_req(req, set);
1249 if (!set->set_count)
1250 GOTO(out_set, rc = -EIO);
1254 lov_fini_punch_set(set);
/*
 * Finish a sync request set: check whether any request succeeded, then drop
 * one reference on the set and free it once the last reference is gone.
 * NOTE(review): the statement under the !set_success test is not visible in
 * this view (presumably it sets an error code) -- confirm.
 */
1258 int lov_fini_sync_set(struct lov_request_set *set)
1265 LASSERT(set->set_exp);
1266 if (set->set_completes) {
1267 if (!set->set_success)
1269 /* FIXME update qos data here */
/* the last reference holder frees the requests and the set itself */
1272 if (atomic_dec_and_test(&set->set_refcount))
1273 lov_finish_set(set);
/*
 * Build the request set for syncing the range [@start, @end] of a striped
 * object.
 *
 * For each stripe whose object range intersects the requested range a
 * lov_request is queued with a private copy of @src_oa addressed at the
 * stripe's object and with the stripe-local extent in rq_extent (gid -1).
 * Stripes on inactive targets are logged (and presumably skipped -- the
 * statement is not visible here).
 * Returns -ENOMEM on allocation failure and -EIO when no request could be
 * queued; the set is handed to lov_fini_sync_set() for release.
 */
1278 int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
1279 struct lov_stripe_md *lsm, obd_off start,
1280 obd_off end, struct lov_request_set **reqset)
1282 struct lov_request_set *set;
1283 struct lov_oinfo *loi = NULL;
1284 struct lov_obd *lov = &exp->exp_obd->u.lov;
1288 OBD_ALLOC(set, sizeof(*set));
1295 set->set_oa = src_oa;
1297 loi = lsm->lsm_oinfo;
1298 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1299 struct lov_request *req;
1302 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1303 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
/* rs/re receive the part of the range that falls into stripe i */
1307 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1310 OBD_ALLOC(req, sizeof(*req));
1312 GOTO(out_set, rc = -ENOMEM);
1314 req->rq_idx = loi->loi_ost_idx;
1316 req->rq_oa = obdo_alloc();
1317 if (req->rq_oa == NULL) {
/* req is not on set_list yet, so the set teardown would never free it */
OBD_FREE(req, sizeof(*req));
1318 GOTO(out_set, rc = -ENOMEM);
}
1319 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1320 req->rq_oa->o_id = loi->loi_id;
1321 req->rq_oa->o_stripe_idx = i;
1323 req->rq_extent.start = rs;
1324 req->rq_extent.end = re;
1325 req->rq_extent.gid = -1;
1327 lov_set_add_req(req, set);
1329 if (!set->set_count)
1330 GOTO(out_set, rc = -EIO);
1334 lov_fini_sync_set(set);