1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 # define EXPORT_SYMTAB
25 #define DEBUG_SUBSYSTEM S_LOV
28 #include <asm/div64.h>
30 #include <liblustre.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_idl.h>
37 #include "lov_internal.h"
/* Reset the bookkeeping of a request set: clear the completion counter,
 * make the request list empty and set the reference count to one. */
39 static void lov_init_set(struct lov_request_set *set)
42 set->set_completes = 0;
44 INIT_LIST_HEAD(&set->set_list);
45 atomic_set(&set->set_refcount, 1);
/* Tear down a request set.
 * Every request is unlinked from set_list; its obdo goes to obdo_free(),
 * its rq_md buffer (rq_buflen bytes) and the request itself go to
 * OBD_FREE().  The set_pga array (set_oabufs entries), set_lockh (through
 * lov_llh_put()) and finally the set structure are handled afterwards. */
48 static void lov_finish_set(struct lov_request_set *set)
50 struct list_head *pos, *n;
/* _safe iteration: the current entry is unlinked and freed inside the loop */
54 list_for_each_safe(pos, n, &set->set_list) {
55 struct lov_request *req = list_entry(pos, struct lov_request,
57 list_del_init(&req->rq_link);
60 obdo_free(req->rq_oa);
62 OBD_FREE(req->rq_md, req->rq_buflen);
63 OBD_FREE(req, sizeof(*req));
/* size in bytes of the page array: set_oabufs entries */
67 int len = set->set_oabufs * sizeof(*set->set_pga);
68 OBD_FREE(set->set_pga, len);
71 lov_llh_put(set->set_lockh);
73 OBD_FREE(set, sizeof(*set));
77 static void lov_update_set(struct lov_request_set *set,
78 struct lov_request *req, int rc)
/* Generic completion hook for one request of a set.
 * The result rc is recorded in the set through lov_update_set().  A
 * non-zero rc reported for a target that is not marked active is then
 * treated separately from other failures. */
88 int lov_update_common_set(struct lov_request_set *set,
89 struct lov_request *req, int rc)
91 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
94 lov_update_set(set, req, rc);
96 /* grace error on inactive ost */
97 if (rc && !lov->tgts[req->rq_idx].active)
100 /* FIXME in raid1 regime, should return 0 */
/* Queue req on set: the request is linked at the tail of set->set_list
 * through its rq_link member. */
104 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
106 list_add_tail(&req->rq_link, &set->set_list);
/* Fold the result of one stripe's lock enqueue into the LOV-level state.
 *
 * lov_lockhp is the slot for this stripe in the set's lock-handle array and
 * loi is the stripe's entry in the set's md; both are indexed by
 * req->rq_stripe.
 *
 *  - rc == ELDLM_OK: mtime and blocks are copied from the request's private
 *    md into loi, a candidate KMS is derived from the request's rss and the
 *    granted extent, and ldlm_lock_allow_match() is called on the lock.
 *  - rc == ELDLM_LOCK_ABORTED with LDLM_FL_HAS_INTENT in flags: the
 *    stripe's lock handle is zeroed and rss, mtime and blocks are copied
 *    into loi; kms is left alone.
 *  - remaining path: the stripe's lock handle is zeroed and an error is
 *    logged when the target is marked active.
 *
 * In all cases the result is recorded with lov_update_set(). */
110 int lov_update_enqueue_set(struct lov_request_set *set,
111 struct lov_request *req, int rc, int flags)
113 struct lustre_handle *lov_lockhp;
114 struct lov_oinfo *loi;
117 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
118 loi = &set->set_md->lsm_oinfo[req->rq_stripe];
120 /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
121 * It belongs in the OSC, except that the OSC doesn't have
122 * access to the real LOI -- it gets a copy, that we created
123 * above, and that copy can be arbitrarily out of date.
125 * The LOV API is due for a serious rewriting anyways, and this
126 * can be addressed then. */
127 if (rc == ELDLM_OK) {
128 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
/* tmp starts as the rss found in the request's md and becomes the
 * candidate KMS after clamping below */
129 __u64 tmp = req->rq_md->lsm_oinfo->loi_rss;
131 LASSERT(lock != NULL);
133 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
134 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
135 /* Extend KMS up to the end of this lock and no further
136 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
137 if (tmp > lock->l_policy_data.l_extent.end)
138 tmp = lock->l_policy_data.l_extent.end + 1;
/* the candidate is only of interest when it does not shrink the
 * current kms */
139 if (tmp >= loi->loi_kms) {
140 CDEBUG(D_INODE, "lock acquired, setting rss="
141 LPU64", kms="LPU64"\n", loi->loi_rss, tmp);
143 loi->loi_kms_valid = 1;
145 CDEBUG(D_INODE, "lock acquired, setting rss="
146 LPU64"; leaving kms="LPU64", end="LPU64
147 "\n", loi->loi_rss, loi->loi_kms,
148 lock->l_policy_data.l_extent.end);
150 ldlm_lock_allow_match(lock);
152 } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
/* no lock is held for this stripe: forget the handle, keep the
 * attributes found in the request's md */
153 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
154 loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss;
155 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
156 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
157 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
158 " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
161 struct obd_export *exp = set->set_exp;
162 struct lov_obd *lov = &exp->exp_obd->u.lov;
164 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
/* only complain when the target is marked active */
165 if (lov->tgts[req->rq_idx].active) {
166 CERROR("error: enqueue objid "LPX64" subobj "
167 LPX64" on OST idx %d: rc = %d\n",
168 set->set_md->lsm_object_id, loi->loi_id,
169 loi->loi_ost_idx, rc);
174 lov_update_set(set, req, rc);
/* Post-process an enqueue/match set of which at least one request has
 * completed (asserted).  The case set_completes == set_success is tested
 * first.  The loop then visits the requests, testing each for "not
 * complete or failed" and for an empty (zero cookie) stripe lock handle,
 * and calls obd_cancel() on the stripe's target; a cancel failure is logged
 * when that target is marked active.  set_lockh is passed to lov_llh_put()
 * afterwards.
 * mode: lock mode supplied by the caller. */
178 static int enqueue_done(struct lov_request_set *set, __u32 mode)
180 struct list_head *pos;
181 struct lov_request *req;
182 struct lustre_handle *lov_lockhp = NULL;
183 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
187 LASSERT(set->set_completes);
188 /* enqueue/match success, just return */
189 if (set->set_completes == set->set_success)
192 /* cancel enqueued/matched locks */
193 list_for_each (pos, &set->set_list) {
194 req = list_entry(pos, struct lov_request, rq_link);
196 if (!req->rq_complete || req->rq_rc)
199 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
/* a zero cookie means no lock handle is recorded for this stripe */
201 if (lov_lockhp->cookie == 0)
204 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
206 if (rc && lov->tgts[req->rq_idx].active)
207 CERROR("cancelling obdjid "LPX64" on OST "
208 "idx %d error: rc = %d\n",
209 req->rq_md->lsm_object_id, req->rq_idx, rc);
211 lov_llh_put(set->set_lockh);
/* Finish an enqueue request set.  The set must have an export (asserted).
 * When at least one request completed, enqueue_done() supplies rc.  The
 * function also involves lov_llh_put() on set_lockh and drops one reference
 * on the set, testing whether it was the last one.
 * mode: lock mode, forwarded to enqueue_done(). */
215 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
220 LASSERT(set->set_exp);
223 if (set->set_completes)
224 rc = enqueue_done(set, mode);
226 lov_llh_put(set->set_lockh);
228 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for a striped lock enqueue.
 *
 * A LOV lock handle is created for lsm with lov_llh_new() (-ENOMEM when
 * that fails) and its cookie is stored in the caller's lockh.  The stripes
 * of lsm are then walked: lov_stripe_intersects() is asked whether policy's
 * extent touches the stripe and yields the stripe-local [start, end]; an
 * inactive target is logged.  For a stripe that gets a request, a
 * lov_request plus a private md with room for one lov_oinfo are allocated,
 * filled from the stripe's loi and queued with lov_set_add_req().
 *
 * Allocation failures leave through out_set with -ENOMEM; -EIO is another
 * possible result.  lov_fini_enqueue_set() is used to dispose of the set.
 *
 * exp:    LOV export; policy: requested extent; mode: lock mode;
 * lockh:  receives the cookie of the new LOV lock handle;
 * reqset: out parameter for the prepared set. */
234 int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
235 ldlm_policy_data_t *policy, __u32 mode,
236 struct lustre_handle *lockh,
237 struct lov_request_set **reqset)
239 struct lov_obd *lov = &exp->exp_obd->u.lov;
240 struct lov_request_set *set;
242 struct lov_oinfo *loi;
245 OBD_ALLOC(set, sizeof(*set));
252 set->set_lockh = lov_llh_new(lsm);
253 if (set->set_lockh == NULL)
254 GOTO(out_set, rc = -ENOMEM);
255 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
257 loi = lsm->lsm_oinfo;
/* i and loi advance together: loi is the oinfo of stripe i */
258 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
259 struct lov_request *req;
262 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
263 policy->l_extent.end, &start, &end))
266 if (lov->tgts[loi->loi_ost_idx].active == 0) {
267 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
271 OBD_ALLOC(req, sizeof(*req));
273 GOTO(out_set, rc = -ENOMEM);
/* the private md carries exactly one trailing lov_oinfo */
275 req->rq_buflen = sizeof(*req->rq_md) +
276 sizeof(struct lov_oinfo);
277 OBD_ALLOC(req->rq_md, req->rq_buflen);
278 if (req->rq_md == NULL)
279 GOTO(out_set, rc = -ENOMEM);
281 req->rq_extent.start = start;
282 req->rq_extent.end = end;
284 req->rq_idx = loi->loi_ost_idx;
287 /* XXX LOV STACKING: submd should be from the subobj */
288 req->rq_md->lsm_object_id = loi->loi_id;
289 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
290 req->rq_md->lsm_stripe_count = 0;
291 req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
292 req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss;
293 req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
294 req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks;
/* NOTE(review): unlike the four copies above, this assignment goes from
 * the request's md back into the stripe's loi -- confirm the direction
 * is intended */
295 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
297 lov_set_add_req(req, set);
300 GOTO(out_set, rc = -EIO);
304 lov_fini_enqueue_set(set, mode);
/* Completion hook for one stripe's lock match: the outcome (ret) is
 * recorded in the set through lov_update_set(). */
308 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
316 lov_update_set(set, req, ret);
/* Finish a match request set.  The set must have an export (asserted).
 * When at least one request completed: if every request of the set
 * succeeded and the caller passed LDLM_FL_TEST_LOCK, set_lockh is handed to
 * lov_llh_put(); enqueue_done() then supplies rc.  One reference on the set
 * is dropped, testing whether it was the last one.
 * mode: lock mode, forwarded to enqueue_done(); flags: LDLM_FL_* bits. */
320 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
325 LASSERT(set->set_exp);
328 if (set->set_completes) {
329 if (set->set_count == set->set_success &&
330 flags & LDLM_FL_TEST_LOCK)
331 lov_llh_put(set->set_lockh);
332 rc = enqueue_done(set, mode);
334 lov_llh_put(set->set_lockh);
337 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for a striped lock match.
 *
 * A LOV lock handle is created for lsm with lov_llh_new() (-ENOMEM when
 * that fails) and its cookie is stored in the caller's lockh.  For every
 * stripe, lov_stripe_intersects() is asked whether policy's extent touches
 * it and yields the stripe-local [start, end].  An inactive target makes
 * the whole preparation fail with -EIO.  Each request gets a private md
 * (no trailing lov_oinfo) describing the stripe's object and is queued with
 * lov_set_add_req().
 *
 * Allocation failures give -ENOMEM.  lov_fini_match_set() (flags 0) is used
 * to dispose of the set.
 *
 * lockh:  receives the cookie of the new LOV lock handle;
 * reqset: out parameter for the prepared set. */
343 int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
344 ldlm_policy_data_t *policy, __u32 mode,
345 struct lustre_handle *lockh,
346 struct lov_request_set **reqset)
348 struct lov_obd *lov = &exp->exp_obd->u.lov;
349 struct lov_request_set *set;
351 struct lov_oinfo *loi;
354 OBD_ALLOC(set, sizeof(*set));
361 set->set_lockh = lov_llh_new(lsm);
362 if (set->set_lockh == NULL)
363 GOTO(out_set, rc = -ENOMEM);
364 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
366 loi = lsm->lsm_oinfo;
/* i and loi advance together: loi is the oinfo of stripe i */
367 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
368 struct lov_request *req;
371 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
372 policy->l_extent.end, &start, &end))
375 /* FIXME raid1 should grace this error */
376 if (lov->tgts[loi->loi_ost_idx].active == 0) {
377 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
378 GOTO(out_set, rc = -EIO);
381 OBD_ALLOC(req, sizeof(*req));
383 GOTO(out_set, rc = -ENOMEM);
385 req->rq_buflen = sizeof(*req->rq_md);
386 OBD_ALLOC(req->rq_md, req->rq_buflen);
387 if (req->rq_md == NULL)
388 GOTO(out_set, rc = -ENOMEM);
390 req->rq_extent.start = start;
391 req->rq_extent.end = end;
393 req->rq_idx = loi->loi_ost_idx;
396 /* XXX LOV STACKING: submd should be from the subobj */
397 req->rq_md->lsm_object_id = loi->loi_id;
398 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
399 req->rq_md->lsm_stripe_count = 0;
400 lov_set_add_req(req, set);
403 GOTO(out_set, rc = -EIO);
407 lov_fini_match_set(set, mode, 0);
/* Finish a cancel request set.  The set must have an export (asserted).
 * The function involves lov_llh_put() on set_lockh and drops one reference
 * on the set, testing whether it was the last one. */
411 int lov_fini_cancel_set(struct lov_request_set *set)
416 LASSERT(set->set_exp);
421 lov_llh_put(set->set_lockh);
423 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for cancelling a striped lock.
 *
 * The LOV lock handle is looked up from lockh with lov_handle2llh(); an
 * unknown handle is logged and gives -EINVAL.  The stripes of lsm are
 * walked together with the per-stripe handles (llh_handles + i); a stripe
 * whose handle has a zero cookie is logged ("no lock?").  Each request gets
 * a private md describing the stripe's object and is queued with
 * lov_set_add_req().
 *
 * Allocation failures give -ENOMEM; -EIO is another possible result.
 * lov_fini_cancel_set() is used to dispose of the set.
 *
 * reqset: out parameter for the prepared set. */
429 int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
430 __u32 mode, struct lustre_handle *lockh,
431 struct lov_request_set **reqset)
433 struct lov_request_set *set;
435 struct lov_oinfo *loi;
438 OBD_ALLOC(set, sizeof(*set));
445 set->set_lockh = lov_handle2llh(lockh);
446 if (set->set_lockh == NULL) {
447 CERROR("LOV: invalid lov lock handle %p\n", lockh);
448 GOTO(out_set, rc = -EINVAL);
450 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
452 loi = lsm->lsm_oinfo;
/* i and loi advance together: loi is the oinfo of stripe i */
453 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
454 struct lov_request *req;
455 struct lustre_handle *lov_lockhp;
457 lov_lockhp = set->set_lockh->llh_handles + i;
458 if (lov_lockhp->cookie == 0) {
459 CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
460 loi->loi_ost_idx, loi->loi_id);
464 OBD_ALLOC(req, sizeof(*req));
466 GOTO(out_set, rc = -ENOMEM);
468 req->rq_buflen = sizeof(*req->rq_md);
469 OBD_ALLOC(req->rq_md, req->rq_buflen);
470 if (req->rq_md == NULL)
471 GOTO(out_set, rc = -ENOMEM);
473 req->rq_idx = loi->loi_ost_idx;
476 /* XXX LOV STACKING: submd should be from the subobj */
477 req->rq_md->lsm_object_id = loi->loi_id;
478 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
479 req->rq_md->lsm_stripe_count = 0;
480 lov_set_add_req(req, set);
483 GOTO(out_set, rc = -EIO);
487 lov_fini_cancel_set(set);
/* Post-process a create set of which at least one request has completed
 * (asserted).
 *
 * With no successful request the result is -EIO.  When no md was supplied
 * by the caller (*ea == NULL) and only some requests succeeded, set_count
 * is lowered to set_success.  The attributes of every request that
 * completed without error are merged into a scratch obdo with
 * lov_merge_attrs(); a size mismatch against src_oa is logged.  The scratch
 * obdo takes the id and group of src_oa (OBD_MD_FLGROUP is set) and is
 * copied over src_oa.
 *
 * The second loop calls obd_destroy() on the sub-objects of requests that
 * completed without error and logs failures; the set's md is freed with
 * obd_free_memmd().
 *
 * Last, when llog cookies are in use, oti_logcookies is pointed back at the
 * start of the cookie array; the cookies are freed and OBD_MD_FLCOOKIE is
 * cleared in src_oa if no reply carried a cookie, otherwise the flag is
 * set. */
491 static int create_done(struct obd_export *exp, struct lov_request_set *set,
492 struct lov_stripe_md **ea)
494 struct lov_obd *lov = &exp->exp_obd->u.lov;
495 struct obd_trans_info *oti = set->set_oti;
496 struct obdo *src_oa = set->set_oa;
497 struct list_head *pos;
498 struct lov_request *req;
499 struct obdo *ret_oa = NULL;
500 int attrset = 0, rc = 0;
503 LASSERT(set->set_completes);
505 if (!set->set_success)
506 GOTO(cleanup, rc = -EIO);
508 if (*ea == NULL && set->set_count != set->set_success) {
509 set->set_count = set->set_success;
513 ret_oa = obdo_alloc();
515 GOTO(cleanup, rc = -ENOMEM);
517 list_for_each (pos, &set->set_list) {
518 req = list_entry(pos, struct lov_request, rq_link);
519 if (!req->rq_complete || req->rq_rc)
521 lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
522 set->set_md, req->rq_stripe, &attrset);
524 if (src_oa->o_valid & OBD_MD_FLSIZE &&
525 ret_oa->o_size != src_oa->o_size) {
526 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
527 src_oa->o_size, ret_oa->o_size);
/* the merged result keeps the caller's object id and group */
530 ret_oa->o_id = src_oa->o_id;
531 ret_oa->o_gr = src_oa->o_gr;
532 ret_oa->o_valid |= OBD_MD_FLGROUP;
533 memcpy(src_oa, ret_oa, sizeof(*src_oa));
541 list_for_each (pos, &set->set_list) {
542 struct obd_export *sub_exp;
544 req = list_entry(pos, struct lov_request, rq_link);
546 if (!req->rq_complete || req->rq_rc)
549 sub_exp = lov->tgts[req->rq_idx].ltd_exp,
550 err = obd_destroy(sub_exp, req->rq_oa, NULL, oti);
552 CERROR("Failed to uncreate objid "LPX64" subobj "
553 LPX64" on OST idx %d: rc = %d\n",
554 set->set_oa->o_id, req->rq_oa->o_id,
558 obd_free_memmd(exp, &set->set_md);
560 if (oti && set->set_cookies) {
/* rewind the cookie cursor to the start of the array */
561 oti->oti_logcookies = set->set_cookies;
562 if (!set->set_cookie_sent) {
563 oti_free_cookies(oti);
564 src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
566 src_oa->o_valid |= OBD_MD_FLCOOKIE;
/* Finish a create request set.  The set must have an export (asserted).
 * When at least one request completed, create_done() supplies rc.  One
 * reference on the set is dropped, testing whether it was the last one.
 * ea: the caller's md pointer, forwarded to create_done(). */
572 int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea)
577 LASSERT(set->set_exp);
580 if (set->set_completes) {
581 rc = create_done(set->set_exp, set, ea);
582 /* FIXME update qos data here */
585 if (atomic_dec_and_test(&set->set_refcount))
/* Completion hook for one sub-object create.
 *
 * The request is given the next stripe slot of the set's md: its index is
 * the number of successes so far (set_success).  Failures are logged, and
 * the result is recorded with lov_update_set().  The new object's id is
 * stored in oti_objid[] (when oti provides it) and id, group and OST index
 * are stored in the stripe's loi.  When llog cookies are in use the oti
 * cookie cursor is advanced, and set_cookie_sent counts the replies whose
 * obdo carries OBD_MD_FLCOOKIE. */
591 int lov_update_create_set(struct lov_request_set *set,
592 struct lov_request *req, int rc)
594 struct obd_trans_info *oti = set->set_oti;
595 struct lov_stripe_md *lsm = set->set_md;
596 struct lov_oinfo *loi;
597 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
/* stripes are filled in completion order, not in request order */
600 req->rq_stripe = set->set_success;
601 loi = &lsm->lsm_oinfo[req->rq_stripe];
603 if (rc && lov->tgts[req->rq_idx].active) {
604 CERROR("error creating objid "LPX64" sub-object"
605 " on OST idx %d/%d: rc = %d\n",
606 set->set_oa->o_id, req->rq_idx,
607 lsm->lsm_stripe_count, rc);
609 CERROR("obd_create returned invalid err %d\n", rc);
613 lov_update_set(set, req, rc);
617 if (oti && oti->oti_objid)
618 oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
620 loi->loi_id = req->rq_oa->o_id;
621 loi->loi_gr = req->rq_oa->o_gr;
622 loi->loi_ost_idx = req->rq_idx;
/* NOTE(review): loi_id is passed for both halves of the "subobj X/Y"
 * pair; the second was perhaps meant to be loi_gr -- confirm */
623 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
624 lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
627 if (set->set_cookies)
628 ++oti->oti_logcookies;
629 if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
630 set->set_cookie_sent++;
/* Build the request set for creating a striped object.
 *
 * When the set has no md yet, one is allocated with lov_alloc_memmd().  Its
 * stripe count starts from lov_get_stripecnt(lov, 0); if src_oa carries a
 * size, the number of stripes needed to hold that size is computed from
 * LUSTRE_STRIPE_MAXBYTES, -EFBIG results when more stripes are needed than
 * there are active targets, and the larger of the two counts is used.  The
 * pattern is the descriptor's ld_pattern, or LOV_PATTERN_RAID0 when that is
 * zero.
 *
 * qos_prep_create() then prepares the per-target requests.  If the caller
 * asked for llog cookies (oti set and OBD_MD_FLCOOKIE in src_oa), one
 * cookie per request is allocated in oti and the array start is remembered
 * in set_cookies.
 *
 * obd_free_memmd() and lov_fini_create_set() are used to dispose of the md
 * and the set.
 *
 * ea:     the caller's md pointer; reqset: out parameter for the set. */
635 int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea,
636 struct obdo *src_oa, struct obd_trans_info *oti,
637 struct lov_request_set **reqset)
639 struct lov_obd *lov = &exp->exp_obd->u.lov;
640 struct lov_request_set *set;
641 int rc = 0, newea = 0;
644 OBD_ALLOC(set, sizeof(*set));
651 set->set_oa = src_oa;
654 if (set->set_md == NULL) {
655 int stripes, stripe_cnt;
656 stripe_cnt = lov_get_stripecnt(lov, 0);
658 /* If the MDS file was truncated up to some size, stripe over
659 * enough OSTs to allow the file to be created at that size. */
660 if (src_oa->o_valid & OBD_MD_FLSIZE) {
/* NOTE(review): stripes is declared int here, while the kernel's
 * do_div() is documented for a 64-bit dividend, and the shifted
 * o_size is narrowed on assignment -- confirm the type */
661 stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
662 do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
664 if (stripes > lov->desc.ld_active_tgt_count)
665 GOTO(out_set, rc = -EFBIG);
666 if (stripes < stripe_cnt)
667 stripes = stripe_cnt;
669 stripes = stripe_cnt;
672 rc = lov_alloc_memmd(&set->set_md, stripes,
673 lov->desc.ld_pattern ?
674 lov->desc.ld_pattern : LOV_PATTERN_RAID0);
680 rc = qos_prep_create(lov, set, newea);
684 if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
685 oti_alloc_cookies(oti, set->set_count);
686 if (!oti->oti_logcookies)
688 set->set_cookies = oti->oti_logcookies;
695 obd_free_memmd(exp, &set->set_md);
697 lov_fini_create_set(set, ea);
/* Merge the per-stripe attributes of a finished set back into set_oa.
 * Used by getattr_done() and setattr_done().
 *
 * A set without set_oa and a set without any success are tested for first.
 * Inside the loop each request is tested for "not complete or failed" and
 * for an obdo with o_valid == 0 (inactive stripe); lov_merge_attrs()
 * accumulates attributes into a scratch obdo.  The scratch obdo takes
 * set_oa's object id and is then copied over set_oa.
 *
 * -ENOMEM results when the scratch obdo cannot be allocated; "No stripes
 * had valid attrs" is logged as an error. */
701 static int common_attr_done(struct lov_request_set *set)
703 struct list_head *pos;
704 struct lov_request *req;
706 int rc = 0, attrset = 0;
709 if (set->set_oa == NULL)
712 if (!set->set_success)
715 tmp_oa = obdo_alloc();
717 GOTO(out, rc = -ENOMEM);
719 list_for_each (pos, &set->set_list) {
720 req = list_entry(pos, struct lov_request, rq_link);
722 if (!req->rq_complete || req->rq_rc)
724 if (req->rq_oa->o_valid == 0) /* inactive stripe */
726 lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
727 set->set_md, req->rq_stripe, &attrset);
730 CERROR("No stripes had valid attrs\n");
/* the merged result keeps the caller's object id */
733 tmp_oa->o_id = set->set_oa->o_id;
734 memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
/* Post-process a bulk read/write set: walk the requests, test each for
 * "not complete or failed", and copy the block count from a reply obdo that
 * carries OBD_MD_FLBLOCKS into the matching stripe's loi_blocks. */
742 static int brw_done(struct lov_request_set *set)
744 struct lov_stripe_md *lsm = set->set_md;
745 struct lov_oinfo *loi = NULL;
746 struct list_head *pos;
747 struct lov_request *req;
750 list_for_each (pos, &set->set_list) {
751 req = list_entry(pos, struct lov_request, rq_link);
753 if (!req->rq_complete || req->rq_rc)
756 loi = &lsm->lsm_oinfo[req->rq_stripe];
758 if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
759 loi->loi_blocks = req->rq_oa->o_blocks;
/* Finish a bulk read/write request set.  The set must have an export
 * (asserted).  Completed sets get post-processing; one reference on the set
 * is dropped, testing whether it was the last one. */
765 int lov_fini_brw_set(struct lov_request_set *set)
770 LASSERT(set->set_exp);
773 if (set->set_completes) {
775 /* FIXME update qos data here */
777 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for a bulk read/write.
 *
 * The set gets a private page array set_pga of oa_bufs entries and a
 * scratch array info[] with one entry per stripe.  The pages of pga are
 * first counted per stripe (info[].count, stripe chosen by
 * lov_stripe_number() on disk_offset).  One request is then created for
 * every stripe that has pages; it carries a copy of src_oa with the
 * stripe's object id, a private md, and a window of set_pga that starts at
 * rq_pgaidx and is rq_oabufs entries long (shift accumulates the window
 * starts).  The last loop copies each page of pga into its stripe's window
 * and stores the result of lov_stripe_offset() in the copy's disk_offset.
 *
 * A stripe with pages on an inactive target gives -EIO; allocation failures
 * give -ENOMEM.  info[] is freed before leaving, and lov_fini_brw_set() is
 * used to dispose of the set.
 *
 * reqset: out parameter for the prepared set. */
783 int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
784 struct lov_stripe_md *lsm, obd_count oa_bufs,
785 struct brw_page *pga, struct obd_trans_info *oti,
786 struct lov_request_set **reqset)
793 struct lov_request_set *set;
794 struct lov_oinfo *loi = NULL;
795 struct lov_obd *lov = &exp->exp_obd->u.lov;
796 int rc = 0, i, shift;
799 OBD_ALLOC(set, sizeof(*set));
806 set->set_oa = src_oa;
808 set->set_oabufs = oa_bufs;
809 OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
811 GOTO(out, rc = -ENOMEM);
813 OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
815 GOTO(out, rc = -ENOMEM);
817 /* calculate the page count for each stripe */
818 for (i = 0; i < oa_bufs; i++) {
819 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
820 info[stripe].count++;
823 /* alloc and initialize lov request */
824 loi = lsm->lsm_oinfo;
826 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
827 struct lov_request *req;
829 if (info[i].count == 0)
832 if (lov->tgts[loi->loi_ost_idx].active == 0) {
833 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
834 GOTO(out, rc = -EIO);
837 OBD_ALLOC(req, sizeof(*req));
839 GOTO(out, rc = -ENOMEM);
841 req->rq_oa = obdo_alloc();
842 if (req->rq_oa == NULL)
843 GOTO(out, rc = -ENOMEM);
/* per-stripe obdo: the caller's attributes with the stripe's object id */
846 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
847 req->rq_oa->o_id = loi->loi_id;
848 req->rq_buflen = sizeof(*req->rq_md);
849 OBD_ALLOC(req->rq_md, req->rq_buflen);
850 if (req->rq_md == NULL)
851 GOTO(out, rc = -ENOMEM);
853 req->rq_idx = loi->loi_ost_idx;
856 /* XXX LOV STACKING */
857 req->rq_md->lsm_object_id = loi->loi_id;
858 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
/* this request owns set_pga[shift .. shift + count - 1] */
859 req->rq_oabufs = info[i].count;
860 req->rq_pgaidx = shift;
861 shift += req->rq_oabufs;
863 /* remember the index for sort brw_page array */
864 info[i].index = req->rq_pgaidx;
865 lov_set_add_req(req, set);
868 GOTO(out, rc = -EIO);
870 /* rotate & sort the brw_page array */
871 for (i = 0; i < oa_bufs; i++) {
872 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
/* destination slot: start of the stripe's window plus info[].off */
874 shift = info[stripe].index + info[stripe].off;
875 LASSERT(shift < oa_bufs);
876 set->set_pga[shift] = pga[i];
877 lov_stripe_offset(lsm, pga[i].disk_offset, stripe,
878 &set->set_pga[shift].disk_offset);
883 OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
888 lov_fini_brw_set(set);
/* Post-process a getattr set: delegates to common_attr_done() and returns
 * its result. */
893 static int getattr_done(struct lov_request_set *set)
895 return common_attr_done(set);
/* Finish a getattr request set.  The set must have an export (asserted).
 * When at least one request completed, getattr_done() supplies rc.  One
 * reference on the set is dropped, testing whether it was the last one. */
898 int lov_fini_getattr_set(struct lov_request_set *set)
903 LASSERT(set->set_exp);
906 if (set->set_completes)
907 rc = getattr_done(set);
909 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for a striped getattr.
 *
 * The caller's obdo is remembered in set_oa.  The stripes of lsm are
 * walked; an inactive target is logged.  Each request carries the target
 * index and its own obdo: a copy of src_oa with the stripe's object id.
 * Requests are queued with lov_set_add_req().
 *
 * Allocation failures give -ENOMEM; -EIO is another possible result.
 * lov_fini_getattr_set() is used to dispose of the set.
 *
 * reqset: out parameter for the prepared set. */
915 int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
916 struct lov_stripe_md *lsm,
917 struct lov_request_set **reqset)
919 struct lov_request_set *set;
920 struct lov_oinfo *loi = NULL;
921 struct lov_obd *lov = &exp->exp_obd->u.lov;
925 OBD_ALLOC(set, sizeof(*set));
932 set->set_oa = src_oa;
934 loi = lsm->lsm_oinfo;
/* i and loi advance together: loi is the oinfo of stripe i */
935 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
936 struct lov_request *req;
938 if (lov->tgts[loi->loi_ost_idx].active == 0) {
939 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
943 OBD_ALLOC(req, sizeof(*req));
945 GOTO(out_set, rc = -ENOMEM);
948 req->rq_idx = loi->loi_ost_idx;
950 req->rq_oa = obdo_alloc();
951 if (req->rq_oa == NULL)
952 GOTO(out_set, rc = -ENOMEM);
953 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
954 req->rq_oa->o_id = loi->loi_id;
955 lov_set_add_req(req, set);
958 GOTO(out_set, rc = -EIO);
962 lov_fini_getattr_set(set);
/* Finish a destroy request set.  The set must have an export (asserted).
 * One reference on the set is dropped, testing whether it was the last
 * one. */
966 int lov_fini_destroy_set(struct lov_request_set *set)
970 LASSERT(set->set_exp);
973 if (set->set_completes) {
974 /* FIXME update qos data here */
977 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for destroying a striped object.
 *
 * The caller's obdo is remembered in set_oa.  If oti is given and src_oa
 * carries OBD_MD_FLCOOKIE, the start of oti's llog cookie array is
 * remembered in set_cookies.  The stripes of lsm are walked; an inactive
 * target is logged.  Each request carries the target index and a copy of
 * src_oa with the stripe's object id.  While cookie_set is still zero and
 * cookies are in use, oti_logcookies is pointed at the cookie of stripe i.
 *
 * Allocation failures give -ENOMEM; an empty set gives -EIO.
 * lov_fini_destroy_set() is used to dispose of the set.
 *
 * reqset: out parameter for the prepared set. */
983 int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
984 struct lov_stripe_md *lsm,
985 struct obd_trans_info *oti,
986 struct lov_request_set **reqset)
988 struct lov_request_set *set;
989 struct lov_oinfo *loi = NULL;
990 struct lov_obd *lov = &exp->exp_obd->u.lov;
991 int rc = 0, cookie_set = 0, i;
994 OBD_ALLOC(set, sizeof(*set));
1001 set->set_oa = src_oa;
1003 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1004 set->set_cookies = oti->oti_logcookies;
1006 loi = lsm->lsm_oinfo;
/* i and loi advance together: loi is the oinfo of stripe i */
1007 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1008 struct lov_request *req;
1010 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1011 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1015 OBD_ALLOC(req, sizeof(*req));
1017 GOTO(out_set, rc = -ENOMEM);
1020 req->rq_idx = loi->loi_ost_idx;
1022 req->rq_oa = obdo_alloc();
1023 if (req->rq_oa == NULL)
1024 GOTO(out_set, rc = -ENOMEM);
1025 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1026 req->rq_oa->o_id = loi->loi_id;
1028 /* Setup the first request's cookie position */
1029 if (!cookie_set && set->set_cookies) {
1030 oti->oti_logcookies = set->set_cookies + i;
1033 lov_set_add_req(req, set);
1035 if (!set->set_count)
1036 GOTO(out_set, rc = -EIO);
1040 lov_fini_destroy_set(set);
/* Post-process a setattr set: delegates to common_attr_done() and returns
 * its result. */
1044 static int setattr_done(struct lov_request_set *set)
1046 return common_attr_done(set);
/* Finish a setattr request set.  The set must have an export (asserted).
 * When at least one request completed, setattr_done() supplies rc.  One
 * reference on the set is dropped; the last reference tears the set down
 * with lov_finish_set(). */
1049 int lov_fini_setattr_set(struct lov_request_set *set)
1054 LASSERT(set->set_exp);
1057 if (set->set_completes) {
1058 rc = setattr_done(set);
1059 /* FIXME update qos data here */
1062 if (atomic_dec_and_test(&set->set_refcount))
1063 lov_finish_set(set);
/* Build the request set for a striped setattr.
 *
 * The caller's obdo is remembered in set_oa.  The stripes of lsm are
 * walked; an inactive target is logged.  Each request carries the target
 * index and a copy of src_oa with the stripe's object id; if the copy has
 * OBD_MD_FLGROUP set, its group must be positive (asserted).  When src_oa
 * carries a size, lov_stripe_offset() converts the file size into the
 * stripe's own size, which is decremented under a condition that involves
 * a negative return value.
 *
 * Allocation failures give -ENOMEM; an empty set gives -EIO.
 * lov_fini_setattr_set() is used to dispose of the set.
 *
 * reqset: out parameter for the prepared set. */
1067 int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
1068 struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1069 struct lov_request_set **reqset)
1071 struct lov_request_set *set;
1072 struct lov_oinfo *loi = NULL;
1073 struct lov_obd *lov = &exp->exp_obd->u.lov;
1077 OBD_ALLOC(set, sizeof(*set));
1084 set->set_oa = src_oa;
1086 loi = lsm->lsm_oinfo;
/* i and loi advance together: loi is the oinfo of stripe i */
1087 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1088 struct lov_request *req;
1090 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1091 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1095 OBD_ALLOC(req, sizeof(*req));
1097 GOTO(out_set, rc = -ENOMEM);
1099 req->rq_idx = loi->loi_ost_idx;
1101 req->rq_oa = obdo_alloc();
1102 if (req->rq_oa == NULL)
1103 GOTO(out_set, rc = -ENOMEM);
1104 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1105 req->rq_oa->o_id = loi->loi_id;
1106 LASSERT(!(req->rq_oa->o_valid & OBD_MD_FLGROUP) || req->rq_oa->o_gr>0);
1108 if (src_oa->o_valid & OBD_MD_FLSIZE) {
1109 if (lov_stripe_offset(lsm, src_oa->o_size, i,
1110 &req->rq_oa->o_size) < 0 &&
1112 req->rq_oa->o_size--;
1113 CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1114 i, req->rq_oa->o_size, src_oa->o_size);
1116 lov_set_add_req(req, set);
1118 if (!set->set_count)
1119 GOTO(out_set, rc = -EIO);
1123 lov_fini_setattr_set(set);
/* Completion hook for one stripe's punch: the result is recorded with
 * lov_update_set(); a non-zero rc from a target that is not marked active
 * is then treated separately from other failures. */
1127 int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
1130 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1133 lov_update_set(set, req, rc);
1134 if (rc && !lov->tgts[req->rq_idx].active)
1136 /* FIXME in raid1 regime, should return 0 */
/* Finish a punch request set.  The set must have an export (asserted).
 * For a set with completed requests, the case of no success at all is
 * tested for.  One reference on the set is dropped; the last reference
 * tears the set down with lov_finish_set(). */
1140 int lov_fini_punch_set(struct lov_request_set *set)
1145 LASSERT(set->set_exp);
1148 if (set->set_completes) {
1149 if (!set->set_success)
1151 /* FIXME update qos data here */
1154 if (atomic_dec_and_test(&set->set_refcount))
1155 lov_finish_set(set);
/* Build the request set for punching the range [start, end] of a striped
 * object.
 *
 * The caller's obdo is remembered in set_oa.  The stripes of lsm are
 * walked; an inactive target is logged, and lov_stripe_intersects() is
 * asked whether the range touches the stripe, yielding the stripe-local
 * range [rs, re].  Each request carries the target index, a copy of src_oa
 * with the stripe's object id and group (OBD_MD_FLGROUP set), and the
 * stripe-local range in rq_extent.
 *
 * Allocation failures give -ENOMEM; an empty set gives -EIO.
 * lov_fini_punch_set() is used to dispose of the set.
 *
 * reqset: out parameter for the prepared set. */
1160 int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
1161 struct lov_stripe_md *lsm, obd_off start,
1162 obd_off end, struct obd_trans_info *oti,
1163 struct lov_request_set **reqset)
1165 struct lov_request_set *set;
1166 struct lov_oinfo *loi = NULL;
1167 struct lov_obd *lov = &exp->exp_obd->u.lov;
1171 OBD_ALLOC(set, sizeof(*set));
1178 set->set_oa = src_oa;
1180 loi = lsm->lsm_oinfo;
/* i and loi advance together: loi is the oinfo of stripe i */
1181 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1182 struct lov_request *req;
1185 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1186 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1190 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1193 OBD_ALLOC(req, sizeof(*req));
1195 GOTO(out_set, rc = -ENOMEM);
1197 req->rq_idx = loi->loi_ost_idx;
1199 req->rq_oa = obdo_alloc();
1200 if (req->rq_oa == NULL)
1201 GOTO(out_set, rc = -ENOMEM);
1202 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1203 req->rq_oa->o_id = loi->loi_id;
1204 req->rq_oa->o_gr = loi->loi_gr;
1205 req->rq_oa->o_valid |= OBD_MD_FLGROUP;
1207 req->rq_extent.start = rs;
1208 req->rq_extent.end = re;
1210 lov_set_add_req(req, set);
1212 if (!set->set_count)
1213 GOTO(out_set, rc = -EIO);
1217 lov_fini_punch_set(set);
/* Finish a sync request set.  The set must have an export (asserted).
 * For a set with completed requests, the case of no success at all is
 * tested for.  One reference on the set is dropped; the last reference
 * tears the set down with lov_finish_set(). */
1221 int lov_fini_sync_set(struct lov_request_set *set)
1226 LASSERT(set->set_exp);
1229 if (set->set_completes) {
1230 if (!set->set_success)
1232 /* FIXME update qos data here */
1235 if (atomic_dec_and_test(&set->set_refcount))
1236 lov_finish_set(set);
/* Build the request set for syncing the range [start, end] of a striped
 * object.
 *
 * The caller's obdo is remembered in set_oa.  The stripes of lsm are
 * walked; an inactive target is logged, and lov_stripe_intersects() is
 * asked whether the range touches the stripe, yielding the stripe-local
 * range [rs, re].  Each request carries the target index, a copy of src_oa
 * with the stripe's object id, and the stripe-local range in rq_extent.
 *
 * Allocation failures give -ENOMEM; an empty set gives -EIO.
 * lov_fini_sync_set() is used to dispose of the set.
 *
 * reqset: out parameter for the prepared set. */
1241 int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
1242 struct lov_stripe_md *lsm, obd_off start,
1243 obd_off end, struct lov_request_set **reqset)
1245 struct lov_request_set *set;
1246 struct lov_oinfo *loi = NULL;
1247 struct lov_obd *lov = &exp->exp_obd->u.lov;
1251 OBD_ALLOC(set, sizeof(*set));
1258 set->set_oa = src_oa;
1260 loi = lsm->lsm_oinfo;
/* i and loi advance together: loi is the oinfo of stripe i */
1261 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1262 struct lov_request *req;
1265 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1266 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1270 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1273 OBD_ALLOC(req, sizeof(*req));
1275 GOTO(out_set, rc = -ENOMEM);
1277 req->rq_idx = loi->loi_ost_idx;
1279 req->rq_oa = obdo_alloc();
1280 if (req->rq_oa == NULL)
1281 GOTO(out_set, rc = -ENOMEM);
1282 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1283 req->rq_oa->o_id = loi->loi_id;
1284 req->rq_extent.start = rs;
1285 req->rq_extent.end = re;
1286 lov_set_add_req(req, set);
1288 if (!set->set_count)
1289 GOTO(out_set, rc = -EIO);
1293 lov_fini_sync_set(set);