1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 # define EXPORT_SYMTAB
25 #define DEBUG_SUBSYSTEM S_LOV
28 #include <asm/div64.h>
30 #include <liblustre.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_idl.h>
37 #include "lov_internal.h"
/*
 * Put a request set into its initial state: zero the completion counter,
 * start with an empty request list and hold a single reference.
 */
39 static void lov_init_set(struct lov_request_set *set)
42 set->set_completes = 0;
44 INIT_LIST_HEAD(&set->set_list);
45 atomic_set(&set->set_refcount, 1);
/*
 * Tear down a request set and everything hanging off it: each queued
 * lov_request (with its obdo and sub-md buffer), the page array, the LOV
 * lock handle reference and finally the set itself.
 * NOTE(review): the conditions guarding the individual releases are not
 * visible here -- confirm each pointer is checked before it is released.
 */
48 static void lov_finish_set(struct lov_request_set *set)
50 struct list_head *pos, *n;
/* _safe iteration: entries are unlinked and freed while walking the list */
54 list_for_each_safe(pos, n, &set->set_list) {
55 struct lov_request *req = list_entry(pos, struct lov_request,
57 list_del_init(&req->rq_link);
60 obdo_free(req->rq_oa);
/* rq_buflen is the size the sub-md was allocated with */
62 OBD_FREE(req->rq_md, req->rq_buflen);
63 OBD_FREE(req, sizeof(*req));
/* the page array holds set_oabufs entries */
67 int len = set->set_oabufs * sizeof(*set->set_pga);
68 OBD_FREE(set->set_pga, len);
71 lov_llh_put(set->set_lockh);
73 OBD_FREE(set, sizeof(*set));
/*
 * Record the result rc of sub-request req in set.
 * NOTE(review): presumably marks req as complete and updates the
 * completion/success counters of the set -- confirm against the body.
 */
77 static void lov_update_set(struct lov_request_set *set,
78 struct lov_request *req, int rc)
/*
 * Default completion handler for a sub-request: records rc in the set.
 * A failure reported for a target that is not active is singled out for
 * lenient treatment (the grace branch below).
 * NOTE(review): what the grace branch does to the returned rc is not
 * visible here -- confirm.
 */
88 int lov_update_common_set(struct lov_request_set *set,
89 struct lov_request *req, int rc)
91 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
94 lov_update_set(set, req, rc);
96 /* grace error on inactive ost */
97 if (rc && !lov->tgts[req->rq_idx].active)
100 /* FIXME in raid1 regime, should return 0 */
/*
 * Append req to the tail of the request list of set.
 * NOTE(review): any further bookkeeping (for example a request counter)
 * is not visible here -- confirm.
 */
104 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
106 list_add_tail(&req->rq_link, &set->set_list);
/*
 * Completion handler for the enqueue of one stripe.
 *
 * set   - request set the enqueue belongs to
 * req   - sub-request that just completed
 * rc    - result of the sub-enqueue
 * flags - LDLM flags; LDLM_FL_HAS_INTENT selects the glimpse branch
 *
 * Three outcomes are distinguished: the lock was granted (ELDLM_OK), the
 * enqueue was aborted while carrying an intent (ELDLM_LOCK_ABORTED), or
 * anything else, in which case the stripe lock handle is cleared and an
 * error is logged if the target is still active.  The result is then
 * recorded in the set through lov_update_set().
 * NOTE(review): the return statement is not visible here.
 */
110 int lov_update_enqueue_set(struct lov_request_set *set,
111 struct lov_request *req, int rc, int flags)
113 struct lustre_handle *lov_lockhp;
114 struct lov_oinfo *loi;
/* lock handle slot and object info of the stripe this request covers */
117 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
118 loi = &set->set_md->lsm_oinfo[req->rq_stripe];
120 /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
121 * It belongs in the OSC, except that the OSC doesn't have
122 * access to the real LOI -- it gets a copy, that we created
123 * above, and that copy can be arbitrarily out of date.
125 * The LOV API is due for a serious rewriting anyways, and this
126 * can be addressed then. */
127 if (rc == ELDLM_OK) {
128 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
/* tmp starts as the size reported in the sub-md */
129 __u64 tmp = req->rq_md->lsm_oinfo->loi_rss;
131 LASSERT(lock != NULL);
/* copy the attributes returned in the sub-md back to the stripe */
133 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
134 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
135 /* Extend KMS up to the end of this lock and no further
136 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
137 if (tmp > lock->l_policy_data.l_extent.end)
138 tmp = lock->l_policy_data.l_extent.end + 1;
/*
 * NOTE(review): the stores to loi_rss and loi_kms are not visible here;
 * per the debug messages kms is set only when tmp >= the current kms.
 */
139 if (tmp >= loi->loi_kms) {
140 CDEBUG(D_INODE, "lock acquired, setting rss="
141 LPU64", kms="LPU64"\n", loi->loi_rss, tmp);
143 loi->loi_kms_valid = 1;
145 CDEBUG(D_INODE, "lock acquired, setting rss="
146 LPU64"; leaving kms="LPU64", end="LPU64
147 "\n", loi->loi_rss, loi->loi_kms,
148 lock->l_policy_data.l_extent.end);
150 ldlm_lock_allow_match(lock);
152 } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
/* glimpse: clear the handle but still take the returned attributes */
153 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
154 loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss;
155 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
156 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
157 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
158 " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
161 struct obd_export *exp = set->set_exp;
162 struct lov_obd *lov = &exp->exp_obd->u.lov;
/* failure: forget the handle; only complain if the target is active */
164 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
165 if (lov->tgts[req->rq_idx].active) {
166 CERROR("error: enqueue objid "LPX64" subobj "
167 LPX64" on OST idx %d: rc = %d\n",
168 set->set_md->lsm_object_id, loi->loi_id,
169 loi->loi_ost_idx, rc);
174 lov_update_set(set, req, rc);
/*
 * Settle an enqueue or match after sub-requests have completed.
 *
 * When every completed sub-request succeeded nothing has to be undone;
 * the LOV lock handle reference is dropped only for LDLM_FL_TEST_LOCK.
 * Otherwise the stripe locks that were obtained are cancelled on their
 * targets and the handle reference is dropped.
 * NOTE(review): the return statements and the loop skips are not visible
 * here -- confirm the returned value on both paths.
 */
178 static int enqueue_done(struct lov_request_set *set, __u32 mode, int flags)
180 struct list_head *pos;
181 struct lov_request *req;
182 struct lustre_handle *lov_lockhp = NULL;
183 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
/* the callers in this file check set_completes before calling */
187 LASSERT(set->set_completes);
188 /* enqueue/match success, just return */
189 if (set->set_completes == set->set_success) {
190 if (flags & LDLM_FL_TEST_LOCK)
191 lov_llh_put(set->set_lockh);
195 /* cancel enqueued/matched locks */
196 list_for_each (pos, &set->set_list) {
197 req = list_entry(pos, struct lov_request, rq_link);
/* incomplete or failed sub-requests are presumably skipped */
199 if (!req->rq_complete || req->rq_rc)
201 if (flags & LDLM_FL_TEST_LOCK)
204 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
/* a zero cookie means no lock is recorded for this stripe */
206 if (lov_lockhp->cookie == 0)
209 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
/* cancel failures are reported only for targets that are still active */
211 if (rc && lov->tgts[req->rq_idx].active)
212 CERROR("cancelling obdjid "LPX64" on OST "
213 "idx %d error: rc = %d\n",
214 req->rq_md->lsm_object_id, req->rq_idx, rc);
216 lov_llh_put(set->set_lockh);
/*
 * Release an enqueue request set.  If any sub-request completed, the
 * outcome is settled through enqueue_done() (with flags 0) first; then one
 * reference on the set is dropped.
 * NOTE(review): the condition under which lov_llh_put() runs and the call
 * made when the last reference goes away (presumably lov_finish_set(), as
 * in lov_fini_setattr_set()) are not visible here -- confirm.
 */
220 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
225 LASSERT(set->set_exp);
228 if (set->set_completes)
229 rc = enqueue_done(set, mode, 0);
231 lov_llh_put(set->set_lockh);
233 if (atomic_dec_and_test(&set->set_refcount))
/*
 * Build the request set for enqueueing an extent lock on a striped object.
 *
 * exp    - LOV export
 * lsm    - stripe metadata of the object
 * policy - extent to lock (l_extent.start / l_extent.end)
 * mode   - lock mode, handed to lov_fini_enqueue_set() on the error path
 * lockh  - out: receives the cookie of the newly created LOV lock handle
 * reqset - out: NOTE(review): presumably receives the new set on success
 *
 * One lov_request is queued for every stripe that intersects the extent.
 * Each request carries a private sub-md with room for a single lov_oinfo,
 * seeded from the current size and block information of the stripe.
 * Allocation failures end in -ENOMEM through the out_set label.
 */
239 int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
240 ldlm_policy_data_t *policy, __u32 mode,
241 struct lustre_handle *lockh,
242 struct lov_request_set **reqset)
244 struct lov_obd *lov = &exp->exp_obd->u.lov;
245 struct lov_request_set *set;
247 struct lov_oinfo *loi;
250 OBD_ALLOC(set, sizeof(*set));
/* LOV-level lock handle; its cookie is what the caller gets back */
257 set->set_lockh = lov_llh_new(lsm);
258 if (set->set_lockh == NULL)
259 GOTO(out_set, rc = -ENOMEM);
260 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
262 loi = lsm->lsm_oinfo;
263 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
264 struct lov_request *req;
/* start/end receive the part of the extent that falls on stripe i */
267 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
268 policy->l_extent.end, &start, &end))
/* NOTE(review): what follows the debug message is not visible here */
271 if (lov->tgts[loi->loi_ost_idx].active == 0) {
272 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
276 OBD_ALLOC(req, sizeof(*req));
278 GOTO(out_set, rc = -ENOMEM);
/* sub-md buffer: the md itself plus exactly one lov_oinfo */
280 req->rq_buflen = sizeof(*req->rq_md) +
281 sizeof(struct lov_oinfo);
282 OBD_ALLOC(req->rq_md, req->rq_buflen);
283 if (req->rq_md == NULL)
284 GOTO(out_set, rc = -ENOMEM);
286 req->rq_extent.start = start;
287 req->rq_extent.end = end;
289 req->rq_idx = loi->loi_ost_idx;
292 /* XXX LOV STACKING: submd should be from the subobj */
293 req->rq_md->lsm_object_id = loi->loi_id;
294 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
295 req->rq_md->lsm_stripe_count = 0;
296 req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
297 req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss;
298 req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
299 req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks;
/*
 * NOTE(review): this assignment copies from the sub-md allocated above
 * back into the stripe, the opposite direction of the four assignments
 * before it -- confirm this is intended and not a reversed copy.
 */
300 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
302 lov_set_add_req(req, set);
/* NOTE(review): the condition guarding this -EIO exit is not visible */
305 GOTO(out_set, rc = -EIO);
309 lov_fini_enqueue_set(set, mode);
/*
 * Completion handler for a per-stripe lock match.
 * The result is logically inverted before it is recorded: lov_update_set()
 * receives !rc, so a nonzero rc is stored as 0 and a zero rc as 1.
 * NOTE(review): presumably because a match returns nonzero when a lock was
 * found -- confirm against the obd_match() contract.
 */
313 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
317 lov_update_set(set, req, !rc);
/*
 * Release a match request set.  If any sub-request completed, the outcome
 * is settled through enqueue_done() with the caller flags; then one
 * reference on the set is dropped.
 * NOTE(review): the condition under which lov_llh_put() runs and the call
 * made when the last reference goes away are not visible here -- confirm.
 */
321 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
326 LASSERT(set->set_exp);
329 if (set->set_completes)
330 rc = enqueue_done(set, mode, flags);
332 lov_llh_put(set->set_lockh);
334 if (atomic_dec_and_test(&set->set_refcount))
/*
 * Build the request set for matching an existing extent lock.
 *
 * exp    - LOV export
 * lsm    - stripe metadata of the object
 * policy - extent to match (l_extent.start / l_extent.end)
 * mode   - lock mode, handed to lov_fini_match_set() on the error path
 * lockh  - out: receives the cookie of the newly created LOV lock handle
 * reqset - out: NOTE(review): presumably receives the new set on success
 *
 * One lov_request is queued per stripe intersecting the extent.  An
 * inactive target among those stripes fails the whole operation with -EIO;
 * allocation failures yield -ENOMEM.
 */
340 int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
341 ldlm_policy_data_t *policy, __u32 mode,
342 struct lustre_handle *lockh,
343 struct lov_request_set **reqset)
345 struct lov_obd *lov = &exp->exp_obd->u.lov;
346 struct lov_request_set *set;
348 struct lov_oinfo *loi;
351 OBD_ALLOC(set, sizeof(*set));
/* LOV-level lock handle; its cookie is what the caller gets back */
358 set->set_lockh = lov_llh_new(lsm);
359 if (set->set_lockh == NULL)
360 GOTO(out_set, rc = -ENOMEM);
361 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
363 loi = lsm->lsm_oinfo;
364 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
365 struct lov_request *req;
/* start/end receive the part of the extent that falls on stripe i */
368 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
369 policy->l_extent.end, &start, &end))
372 /* FIXME raid1 should grace this error */
373 if (lov->tgts[loi->loi_ost_idx].active == 0) {
374 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
375 GOTO(out_set, rc = -EIO);
378 OBD_ALLOC(req, sizeof(*req));
380 GOTO(out_set, rc = -ENOMEM);
/* sub-md without any trailing lov_oinfo */
382 req->rq_buflen = sizeof(*req->rq_md);
383 OBD_ALLOC(req->rq_md, req->rq_buflen);
384 if (req->rq_md == NULL)
385 GOTO(out_set, rc = -ENOMEM);
387 req->rq_extent.start = start;
388 req->rq_extent.end = end;
390 req->rq_idx = loi->loi_ost_idx;
393 /* XXX LOV STACKING: submd should be from the subobj */
394 req->rq_md->lsm_object_id = loi->loi_id;
395 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
396 req->rq_md->lsm_stripe_count = 0;
397 lov_set_add_req(req, set);
/* NOTE(review): the condition guarding this -EIO exit is not visible */
400 GOTO(out_set, rc = -EIO);
/* error path: release the partially built set */
404 lov_fini_match_set(set, mode, 0);
/*
 * Release a cancel request set: the LOV lock handle reference is put and
 * one reference on the set is dropped.
 * NOTE(review): the condition under which lov_llh_put() runs and the call
 * made when the last reference goes away are not visible here -- confirm.
 */
408 int lov_fini_cancel_set(struct lov_request_set *set)
413 LASSERT(set->set_exp);
418 lov_llh_put(set->set_lockh);
420 if (atomic_dec_and_test(&set->set_refcount))
/*
 * Build the request set for cancelling a LOV lock.
 *
 * exp    - LOV export
 * lsm    - stripe metadata of the object
 * mode   - lock mode (not used in the lines visible here)
 * lockh  - LOV lock handle to cancel; looked up with lov_handle2llh()
 * reqset - out: NOTE(review): presumably receives the new set on success
 *
 * Fails with -EINVAL if lockh does not resolve to a LOV lock handle and
 * with -ENOMEM on allocation failure.  A lov_request is queued for each
 * stripe whose lock handle slot holds a nonzero cookie.
 */
426 int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
427 __u32 mode, struct lustre_handle *lockh,
428 struct lov_request_set **reqset)
430 struct lov_request_set *set;
432 struct lov_oinfo *loi;
435 OBD_ALLOC(set, sizeof(*set));
442 set->set_lockh = lov_handle2llh(lockh);
443 if (set->set_lockh == NULL) {
444 CERROR("LOV: invalid lov lock handle %p\n", lockh);
445 GOTO(out_set, rc = -EINVAL);
447 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
449 loi = lsm->lsm_oinfo;
450 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
451 struct lov_request *req;
452 struct lustre_handle *lov_lockhp;
/* handle slot i belongs to stripe i; zero cookie means no lock there */
454 lov_lockhp = set->set_lockh->llh_handles + i;
455 if (lov_lockhp->cookie == 0) {
456 CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
457 loi->loi_ost_idx, loi->loi_id);
461 OBD_ALLOC(req, sizeof(*req));
463 GOTO(out_set, rc = -ENOMEM);
465 req->rq_buflen = sizeof(*req->rq_md);
466 OBD_ALLOC(req->rq_md, req->rq_buflen);
467 if (req->rq_md == NULL)
468 GOTO(out_set, rc = -ENOMEM);
470 req->rq_idx = loi->loi_ost_idx;
473 /* XXX LOV STACKING: submd should be from the subobj */
474 req->rq_md->lsm_object_id = loi->loi_id;
475 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
476 req->rq_md->lsm_stripe_count = 0;
477 lov_set_add_req(req, set);
/* NOTE(review): the condition guarding this -EIO exit is not visible */
480 GOTO(out_set, rc = -EIO);
/* error path: release the partially built set */
484 lov_fini_cancel_set(set);
/*
 * Finish a create after sub-requests have completed.
 *
 * exp - LOV export
 * set - the create request set
 * ea  - in/out stripe md pointer supplied by the caller
 *
 * With no successful sub-request the result is -EIO.  Otherwise the
 * attributes of the successful sub-objects are merged into a scratch obdo
 * that is copied over the caller obdo (object id and group preserved).
 * The second loop destroys the sub-objects that had been created, and the
 * final part settles the log cookies recorded in oti.
 * NOTE(review): the labels, the success exit and the assignment to *ea are
 * not visible here -- confirm which of the paths below run on success and
 * which only on failure.
 */
488 static int create_done(struct obd_export *exp, struct lov_request_set *set,
489 struct lov_stripe_md **ea)
491 struct lov_obd *lov = &exp->exp_obd->u.lov;
492 struct obd_trans_info *oti = set->set_oti;
493 struct obdo *src_oa = set->set_oa;
494 struct list_head *pos;
495 struct lov_request *req;
496 struct obdo *ret_oa = NULL;
497 int attrset = 0, rc = 0;
500 LASSERT(set->set_completes);
502 if (!set->set_success)
503 GOTO(cleanup, rc = -EIO);
/* new md and some creates failed: shrink the count to what succeeded */
505 if (*ea == NULL && set->set_count != set->set_success) {
506 set->set_count = set->set_success;
510 ret_oa = obdo_alloc();
512 GOTO(cleanup, rc = -ENOMEM);
/* merge the attributes of every successfully created sub-object */
514 list_for_each (pos, &set->set_list) {
515 req = list_entry(pos, struct lov_request, rq_link);
516 if (!req->rq_complete || req->rq_rc)
518 lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
519 set->set_md, req->rq_stripe, &attrset);
/* a requested size must be reflected in the merged result */
521 if (src_oa->o_valid & OBD_MD_FLSIZE &&
522 ret_oa->o_size != src_oa->o_size) {
523 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
524 src_oa->o_size, ret_oa->o_size);
/* keep the caller object id and group, then publish the merged obdo */
527 ret_oa->o_id = src_oa->o_id;
528 ret_oa->o_gr = src_oa->o_gr;
529 ret_oa->o_valid |= OBD_MD_FLGROUP;
530 memcpy(src_oa, ret_oa, sizeof(*src_oa));
/* undo: destroy the sub-objects that were created */
538 list_for_each (pos, &set->set_list) {
539 struct obd_export *sub_exp;
541 req = list_entry(pos, struct lov_request, rq_link);
543 if (!req->rq_complete || req->rq_rc)
/*
 * NOTE(review): the next line ends in a comma, so it and the following
 * assignment form one comma expression; the effect matches two separate
 * statements but it looks like a typo for a semicolon.
 */
546 sub_exp = lov->tgts[req->rq_idx].ltd_exp,
547 err = obd_destroy(sub_exp, req->rq_oa, NULL, oti);
549 CERROR("Failed to uncreate objid "LPX64" subobj "
550 LPX64" on OST idx %d: rc = %d\n",
551 set->set_oa->o_id, req->rq_oa->o_id,
555 obd_free_memmd(exp, &set->set_md);
/*
 * Point oti back at the start of the cookie array; the cookies are freed
 * and OBD_MD_FLCOOKIE cleared when no sub-request reported one.
 */
557 if (oti && set->set_cookies) {
558 oti->oti_logcookies = set->set_cookies;
559 if (!set->set_cookie_sent) {
560 oti_free_cookies(oti);
561 src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
563 src_oa->o_valid |= OBD_MD_FLCOOKIE;
/*
 * Release a create request set.  If any sub-request completed, the result
 * is settled through create_done() first; then one reference on the set is
 * dropped.
 * NOTE(review): the call made when the last reference goes away is not
 * visible here (presumably lov_finish_set()) -- confirm.
 */
569 int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea)
574 LASSERT(set->set_exp);
577 if (set->set_completes) {
578 rc = create_done(set->set_exp, set, ea);
579 /* FIXME update qos data here */
582 if (atomic_dec_and_test(&set->set_refcount))
/*
 * Completion handler for the create of one sub-object.
 *
 * The request is assigned the stripe slot indexed by the number of
 * successes recorded so far, errors on active targets are logged, the
 * result is recorded in the set, and the stripe object info is filled in
 * from the reply obdo.  Log cookie bookkeeping follows at the end.
 * NOTE(review): the early exit taken on failure and the return statement
 * are not visible here -- confirm the loi update runs only on success.
 */
588 int lov_update_create_set(struct lov_request_set *set,
589 struct lov_request *req, int rc)
591 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
592 struct obd_trans_info *oti = set->set_oti;
593 struct lov_stripe_md *lsm = set->set_md;
594 struct lov_oinfo *loi;
/* next free stripe slot == number of successes recorded so far */
597 req->rq_stripe = set->set_success;
598 loi = &lsm->lsm_oinfo[req->rq_stripe];
600 if (rc && lov->tgts[req->rq_idx].active) {
601 CERROR("error creating objid "LPX64" sub-object"
602 " on OST idx %d/%d: rc = %d\n",
603 set->set_oa->o_id, req->rq_idx,
604 lsm->lsm_stripe_count, rc);
/* NOTE(review): the condition selecting this message is not visible */
606 CERROR("obd_create returned invalid err %d\n", rc);
610 lov_update_set(set, req, rc);
/* remember the object id created on this target, indexed by OST */
614 if (oti && oti->oti_objid)
615 oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
617 loi->loi_id = req->rq_oa->o_id;
618 loi->loi_gr = req->rq_oa->o_gr;
619 loi->loi_ost_idx = req->rq_idx;
/*
 * NOTE(review): loi_id is passed for both sub-object fields of the
 * message; the second was possibly meant to be loi_gr -- confirm.
 */
620 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
621 lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
/* step to the next cookie slot; count replies that carried a cookie */
624 if (set->set_cookies)
625 ++oti->oti_logcookies;
626 if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
627 set->set_cookie_sent++;
/*
 * Build the request set for creating a striped object.
 *
 * exp    - LOV export
 * ea     - in/out stripe md pointer; also handed to lov_fini_create_set()
 * src_oa - attributes requested for the new object
 * oti    - transaction info; may be NULL
 * reqset - out: NOTE(review): presumably receives the new set on success
 *
 * When the set has no stripe md yet, one is allocated with enough stripes
 * to hold src_oa->o_size (when OBD_MD_FLSIZE is valid) and never fewer
 * than the default stripe count; -EFBIG is returned if that needs more
 * stripes than there are active targets.  The sub-requests themselves are
 * set up by qos_prep_create().  Log cookies are allocated when the caller
 * asked for them with OBD_MD_FLCOOKIE.
 */
632 int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea,
633 struct obdo *src_oa, struct obd_trans_info *oti,
634 struct lov_request_set **reqset)
636 struct lov_obd *lov = &exp->exp_obd->u.lov;
637 struct lov_request_set *set;
638 int rc = 0, newea = 0;
641 OBD_ALLOC(set, sizeof(*set));
648 set->set_oa = src_oa;
651 if (set->set_md == NULL) {
652 int stripes, stripe_cnt;
653 stripe_cnt = lov_get_stripecnt(lov, 0);
655 /* If the MDS file was truncated up to some size, stripe over
656 * enough OSTs to allow the file to be created at that size. */
657 if (src_oa->o_valid & OBD_MD_FLSIZE) {
/* work in 4096-byte units (>> 12) so the quotient fits the divisor */
658 stripes = ((src_oa->o_size + LUSTRE_STRIPE_MAXBYTES) >> 12) - 1;
/*
 * NOTE(review): stripes is declared int, while the Linux do_div() macro
 * expects a 64-bit dividend -- confirm this is safe on all targets.
 */
659 do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
661 if (stripes > lov->desc.ld_active_tgt_count)
662 GOTO(out_set, rc = -EFBIG);
663 if (stripes < stripe_cnt)
664 stripes = stripe_cnt;
666 stripes = stripe_cnt;
/* fall back to RAID0 when the descriptor carries no pattern */
669 rc = lov_alloc_memmd(&set->set_md, stripes,
670 lov->desc.ld_pattern ?
671 lov->desc.ld_pattern : LOV_PATTERN_RAID0);
677 rc = qos_prep_create(lov, set, newea);
/* one log cookie per sub-request when the caller wants cookies */
681 if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
682 oti_alloc_cookies(oti, set->set_count);
683 if (!oti->oti_logcookies)
685 set->set_cookies = oti->oti_logcookies;
/* error path: free the md and release the set */
692 obd_free_memmd(exp, &set->set_md);
694 lov_fini_create_set(set, ea);
/*
 * Merge the attributes returned by the sub-requests into set->set_oa.
 *
 * Only sub-requests that completed without error and whose obdo has a
 * nonzero o_valid take part.  The merged result is built in a scratch
 * obdo, given the object id of the caller obdo and then copied over it.
 * An error is logged when no stripe supplied valid attributes.
 * NOTE(review): the early exits for a NULL set_oa and for zero successes,
 * and the release of the scratch obdo, are not visible here -- confirm
 * the values returned on those paths.
 */
698 static int common_attr_done(struct lov_request_set *set)
700 struct list_head *pos;
701 struct lov_request *req;
703 int rc = 0, attrset = 0;
706 if (set->set_oa == NULL)
709 if (!set->set_success)
712 tmp_oa = obdo_alloc();
714 GOTO(out, rc = -ENOMEM);
716 list_for_each (pos, &set->set_list) {
717 req = list_entry(pos, struct lov_request, rq_link);
719 if (!req->rq_complete || req->rq_rc)
721 if (req->rq_oa->o_valid == 0) /* inactive stripe */
723 lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
724 set->set_md, req->rq_stripe, &attrset);
727 CERROR("No stripes had valid attrs\n");
/* keep the caller object id, then publish the merged attributes */
730 tmp_oa->o_id = set->set_oa->o_id;
731 memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
/*
 * After a bulk read/write, refresh the cached block count of each stripe
 * from the reply obdo of its sub-request, when that reply marks
 * OBD_MD_FLBLOCKS as valid.
 * NOTE(review): the loop skip for incomplete or failed requests and the
 * return value are not visible here -- confirm.
 */
739 static int brw_done(struct lov_request_set *set)
741 struct lov_stripe_md *lsm = set->set_md;
742 struct lov_oinfo *loi = NULL;
743 struct list_head *pos;
744 struct lov_request *req;
747 list_for_each (pos, &set->set_list) {
748 req = list_entry(pos, struct lov_request, rq_link);
750 if (!req->rq_complete || req->rq_rc)
753 loi = &lsm->lsm_oinfo[req->rq_stripe];
755 if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
756 loi->loi_blocks = req->rq_oa->o_blocks;
/*
 * Release a bulk read/write request set and drop one reference on it.
 * NOTE(review): the work done when sub-requests completed (presumably a
 * call to brw_done()) and the call made when the last reference goes away
 * are not visible here -- confirm.
 */
762 int lov_fini_brw_set(struct lov_request_set *set)
767 LASSERT(set->set_exp);
770 if (set->set_completes) {
772 /* FIXME update qos data here */
774 if (atomic_dec_and_test(&set->set_refcount))
/*
 * Build the request set for a bulk read/write over a striped object.
 *
 * exp     - LOV export
 * src_oa  - caller obdo; copied into every sub-request
 * lsm     - stripe metadata of the object
 * oa_bufs - number of entries in pga
 * pga     - caller page array
 * oti     - transaction info
 * reqset  - out: NOTE(review): presumably receives the new set on success
 *
 * Works in three passes: count the pages that fall on each stripe, create
 * one lov_request per stripe that has pages (recording where its pages
 * start in the regrouped array), then copy the caller pages into
 * set->set_pga grouped by stripe with their disk offsets rewritten by
 * lov_stripe_offset().  An inactive target among the touched stripes fails
 * the operation with -EIO; allocation failures yield -ENOMEM.
 */
780 int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
781 struct lov_stripe_md *lsm, obd_count oa_bufs,
782 struct brw_page *pga, struct obd_trans_info *oti,
783 struct lov_request_set **reqset)
790 struct lov_request_set *set;
791 struct lov_oinfo *loi = NULL;
792 struct lov_obd *lov = &exp->exp_obd->u.lov;
793 int rc = 0, i, shift;
796 OBD_ALLOC(set, sizeof(*set));
803 set->set_oa = src_oa;
805 set->set_oabufs = oa_bufs;
/* regrouped copy of the caller page array */
806 OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
808 GOTO(out, rc = -ENOMEM);
/* scratch bookkeeping, one entry per stripe */
810 OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
812 GOTO(out, rc = -ENOMEM);
814 /* calculate the page count for each stripe */
815 for (i = 0; i < oa_bufs; i++) {
816 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
817 info[stripe].count++;
820 /* alloc and initialize lov request */
821 loi = lsm->lsm_oinfo;
823 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
824 struct lov_request *req;
/* stripes without pages need no request */
826 if (info[i].count == 0)
829 if (lov->tgts[loi->loi_ost_idx].active == 0) {
830 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
831 GOTO(out, rc = -EIO);
834 OBD_ALLOC(req, sizeof(*req));
836 GOTO(out, rc = -ENOMEM);
838 req->rq_oa = obdo_alloc();
839 if (req->rq_oa == NULL)
840 GOTO(out, rc = -ENOMEM);
/* start from the caller attributes, but address the sub-object */
843 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
844 req->rq_oa->o_id = loi->loi_id;
845 req->rq_buflen = sizeof(*req->rq_md);
846 OBD_ALLOC(req->rq_md, req->rq_buflen);
847 if (req->rq_md == NULL)
848 GOTO(out, rc = -ENOMEM);
850 req->rq_idx = loi->loi_ost_idx;
853 /* XXX LOV STACKING */
854 req->rq_md->lsm_object_id = loi->loi_id;
855 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
/*
 * The pages of this request occupy rq_oabufs consecutive slots starting
 * at rq_pgaidx.  NOTE(review): the initialisation of shift is not
 * visible here -- confirm it starts at 0 before this loop.
 */
856 req->rq_oabufs = info[i].count;
857 req->rq_pgaidx = shift;
858 shift += req->rq_oabufs;
860 /* remember the index for sort brw_page array */
861 info[i].index = req->rq_pgaidx;
862 lov_set_add_req(req, set);
/* NOTE(review): the condition guarding this -EIO exit is not visible */
865 GOTO(out, rc = -EIO);
867 /* rotate & sort the brw_page array */
868 for (i = 0; i < oa_bufs; i++) {
869 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
/*
 * Destination slot: start of the stripe run plus pages already placed.
 * NOTE(review): the advance of info[stripe].off is not visible here.
 */
871 shift = info[stripe].index + info[stripe].off;
872 LASSERT(shift < oa_bufs);
873 set->set_pga[shift] = pga[i];
874 lov_stripe_offset(lsm, pga[i].disk_offset, stripe,
875 &set->set_pga[shift].disk_offset);
880 OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
/* error path: release the partially built set */
885 lov_fini_brw_set(set);
/* Getattr completion: delegates to common_attr_done() and returns its result. */
890 static int getattr_done(struct lov_request_set *set)
892 return common_attr_done(set);
/*
 * Release a getattr request set.  If any sub-request completed, the
 * attributes are merged through getattr_done() first; then one reference
 * on the set is dropped.
 * NOTE(review): the call made when the last reference goes away is not
 * visible here (presumably lov_finish_set()) -- confirm.
 */
895 int lov_fini_getattr_set(struct lov_request_set *set)
900 LASSERT(set->set_exp);
903 if (set->set_completes)
904 rc = getattr_done(set);
906 if (atomic_dec_and_test(&set->set_refcount))
/*
 * Build the request set for fetching the attributes of a striped object.
 *
 * exp    - LOV export
 * src_oa - caller obdo; copied into every sub-request
 * lsm    - stripe metadata of the object
 * reqset - out: NOTE(review): presumably receives the new set on success
 *
 * A lov_request with its own obdo (a copy of src_oa addressed to the
 * sub-object id) is queued per stripe.  Allocation failures yield -ENOMEM.
 * NOTE(review): how inactive targets are handled after the debug message
 * is not visible here -- confirm.
 */
912 int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
913 struct lov_stripe_md *lsm,
914 struct lov_request_set **reqset)
916 struct lov_request_set *set;
917 struct lov_oinfo *loi = NULL;
918 struct lov_obd *lov = &exp->exp_obd->u.lov;
922 OBD_ALLOC(set, sizeof(*set));
929 set->set_oa = src_oa;
931 loi = lsm->lsm_oinfo;
932 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
933 struct lov_request *req;
935 if (lov->tgts[loi->loi_ost_idx].active == 0) {
936 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
940 OBD_ALLOC(req, sizeof(*req));
942 GOTO(out_set, rc = -ENOMEM);
945 req->rq_idx = loi->loi_ost_idx;
947 req->rq_oa = obdo_alloc();
948 if (req->rq_oa == NULL)
949 GOTO(out_set, rc = -ENOMEM);
/* start from the caller attributes, but address the sub-object */
950 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
951 req->rq_oa->o_id = loi->loi_id;
952 lov_set_add_req(req, set);
/* NOTE(review): the condition guarding this -EIO exit is not visible */
955 GOTO(out_set, rc = -EIO);
/* error path: release the partially built set */
959 lov_fini_getattr_set(set);
/*
 * Release a destroy request set and drop one reference on it.
 * NOTE(review): the call made when the last reference goes away and the
 * return value are not visible here -- confirm.
 */
963 int lov_fini_destroy_set(struct lov_request_set *set)
967 LASSERT(set->set_exp);
970 if (set->set_completes) {
971 /* FIXME update qos data here */
974 if (atomic_dec_and_test(&set->set_refcount))
/*
 * Build the request set for destroying a striped object.
 *
 * exp    - LOV export
 * src_oa - caller obdo; copied into every sub-request
 * lsm    - stripe metadata of the object
 * oti    - transaction info; supplies the log cookies when src_oa has
 *          OBD_MD_FLCOOKIE set.  May be NULL.
 * reqset - out: NOTE(review): presumably receives the new set on success
 *
 * A lov_request is queued per stripe.  Returns -EIO when no request could
 * be queued and -ENOMEM on allocation failure.
 * NOTE(review): how inactive targets are handled after the debug message
 * is not visible here -- confirm.
 */
980 int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
981 struct lov_stripe_md *lsm,
982 struct obd_trans_info *oti,
983 struct lov_request_set **reqset)
985 struct lov_request_set *set;
986 struct lov_oinfo *loi = NULL;
987 struct lov_obd *lov = &exp->exp_obd->u.lov;
988 int rc = 0, cookie_set = 0, i;
991 OBD_ALLOC(set, sizeof(*set));
998 set->set_oa = src_oa;
/* remember the start of the cookie array handed in by the caller */
1000 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1001 set->set_cookies = oti->oti_logcookies;
1003 loi = lsm->lsm_oinfo;
1004 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1005 struct lov_request *req;
1007 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1008 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1012 OBD_ALLOC(req, sizeof(*req));
1014 GOTO(out_set, rc = -ENOMEM);
1017 req->rq_idx = loi->loi_ost_idx;
1019 req->rq_oa = obdo_alloc();
1020 if (req->rq_oa == NULL)
1021 GOTO(out_set, rc = -ENOMEM);
/* start from the caller attributes, but address the sub-object */
1022 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1023 req->rq_oa->o_id = loi->loi_id;
1025 /* Setup the first request's cookie position */
/* cookie slot i corresponds to stripe i */
1026 if (!cookie_set && set->set_cookies) {
1027 oti->oti_logcookies = set->set_cookies + i;
1030 lov_set_add_req(req, set);
1032 if (!set->set_count)
1033 GOTO(out_set, rc = -EIO);
/* error path: release the partially built set */
1037 lov_fini_destroy_set(set);
/* Setattr completion: delegates to common_attr_done() and returns its result. */
1041 static int setattr_done(struct lov_request_set *set)
1043 return common_attr_done(set);
/*
 * Release a setattr request set.  If any sub-request completed, the
 * attributes are merged through setattr_done() first.  One reference on
 * the set is then dropped and the set is destroyed with lov_finish_set()
 * when that was the last one.
 */
1046 int lov_fini_setattr_set(struct lov_request_set *set)
1051 LASSERT(set->set_exp);
1054 if (set->set_completes) {
1055 rc = setattr_done(set);
1056 /* FIXME update qos data here */
1059 if (atomic_dec_and_test(&set->set_refcount))
1060 lov_finish_set(set);
/*
 * Build the request set for changing the attributes of a striped object.
 *
 * exp    - LOV export
 * src_oa - attributes to set; copied into every sub-request
 * lsm    - stripe metadata of the object
 * oti    - transaction info
 * reqset - out: NOTE(review): presumably receives the new set on success
 *
 * A lov_request is queued per stripe.  When a size is being set
 * (OBD_MD_FLSIZE) the size of each sub-request is derived from
 * src_oa->o_size with lov_stripe_offset().  Returns -EIO when no request
 * could be queued and -ENOMEM on allocation failure.
 * NOTE(review): how inactive targets are handled after the debug message
 * is not visible here -- confirm.
 */
1064 int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
1065 struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1066 struct lov_request_set **reqset)
1068 struct lov_request_set *set;
1069 struct lov_oinfo *loi = NULL;
1070 struct lov_obd *lov = &exp->exp_obd->u.lov;
1074 OBD_ALLOC(set, sizeof(*set));
1081 set->set_oa = src_oa;
1083 loi = lsm->lsm_oinfo;
1084 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1085 struct lov_request *req;
1087 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1088 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1092 OBD_ALLOC(req, sizeof(*req));
1094 GOTO(out_set, rc = -ENOMEM);
1096 req->rq_idx = loi->loi_ost_idx;
1098 req->rq_oa = obdo_alloc();
1099 if (req->rq_oa == NULL)
1100 GOTO(out_set, rc = -ENOMEM);
/* start from the caller attributes, but address the sub-object */
1101 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1102 req->rq_oa->o_id = loi->loi_id;
/* if a group is marked valid it must be a positive one */
1103 LASSERT(!(req->rq_oa->o_valid & OBD_MD_FLGROUP) || req->rq_oa->o_gr>0);
/*
 * Per-stripe size: lov_stripe_offset() stores its result in the
 * sub-request o_size; on a negative return (combined with a further
 * condition not visible here) that value is decremented by one.
 */
1105 if (src_oa->o_valid & OBD_MD_FLSIZE) {
1106 if (lov_stripe_offset(lsm, src_oa->o_size, i,
1107 &req->rq_oa->o_size) < 0 &&
1109 req->rq_oa->o_size--;
1110 CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1111 i, req->rq_oa->o_size, src_oa->o_size);
1113 lov_set_add_req(req, set);
1115 if (!set->set_count)
1116 GOTO(out_set, rc = -EIO);
/* error path: release the partially built set */
1120 lov_fini_setattr_set(set);
/*
 * Completion handler for the punch of one stripe: records rc in the set.
 * A failure reported for a target that is not active is singled out for
 * separate treatment.
 * NOTE(review): what that branch does to the returned rc is not visible
 * here -- confirm.
 */
1124 int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
1127 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1130 lov_update_set(set, req, rc);
1131 if (rc && !lov->tgts[req->rq_idx].active)
1133 /* FIXME in raid1 regime, should return 0 */
/*
 * Release a punch request set.  One reference on the set is dropped and
 * the set is destroyed with lov_finish_set() when that was the last one.
 * NOTE(review): the result chosen when sub-requests completed but none
 * succeeded is not visible here -- confirm.
 */
1137 int lov_fini_punch_set(struct lov_request_set *set)
1142 LASSERT(set->set_exp);
1145 if (set->set_completes) {
1146 if (!set->set_success)
1148 /* FIXME update qos data here */
1151 if (atomic_dec_and_test(&set->set_refcount))
1152 lov_finish_set(set);
/*
 * Build the request set for punching (truncating) a byte range of a
 * striped object.
 *
 * exp    - LOV export
 * src_oa - caller obdo; copied into every sub-request
 * lsm    - stripe metadata of the object
 * start  - first byte of the range
 * end    - last byte of the range
 * oti    - transaction info
 * reqset - out: NOTE(review): presumably receives the new set on success
 *
 * A lov_request is queued for each stripe that intersects [start, end],
 * carrying the per-stripe sub-range in rq_extent.  Returns -EIO when no
 * request could be queued and -ENOMEM on allocation failure.
 * NOTE(review): how inactive targets are handled after the debug message
 * is not visible here -- confirm.
 */
1157 int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
1158 struct lov_stripe_md *lsm, obd_off start,
1159 obd_off end, struct obd_trans_info *oti,
1160 struct lov_request_set **reqset)
1162 struct lov_request_set *set;
1163 struct lov_oinfo *loi = NULL;
1164 struct lov_obd *lov = &exp->exp_obd->u.lov;
1168 OBD_ALLOC(set, sizeof(*set));
1175 set->set_oa = src_oa;
1177 loi = lsm->lsm_oinfo;
1178 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1179 struct lov_request *req;
1182 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1183 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
/* rs/re receive the part of the range that falls on stripe i */
1187 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1190 OBD_ALLOC(req, sizeof(*req));
1192 GOTO(out_set, rc = -ENOMEM);
1194 req->rq_idx = loi->loi_ost_idx;
1196 req->rq_oa = obdo_alloc();
1197 if (req->rq_oa == NULL)
1198 GOTO(out_set, rc = -ENOMEM);
/* caller attributes, re-addressed to the sub-object id and group */
1199 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1200 req->rq_oa->o_id = loi->loi_id;
1201 req->rq_oa->o_gr = loi->loi_gr;
1202 req->rq_oa->o_valid |= OBD_MD_FLGROUP;
1204 req->rq_extent.start = rs;
1205 req->rq_extent.end = re;
1207 lov_set_add_req(req, set);
1209 if (!set->set_count)
1210 GOTO(out_set, rc = -EIO);
/* error path: release the partially built set */
1214 lov_fini_punch_set(set);
/*
 * Release a sync request set.  One reference on the set is dropped and the
 * set is destroyed with lov_finish_set() when that was the last one.
 * NOTE(review): the result chosen when sub-requests completed but none
 * succeeded is not visible here -- confirm.
 */
1218 int lov_fini_sync_set(struct lov_request_set *set)
1223 LASSERT(set->set_exp);
1226 if (set->set_completes) {
1227 if (!set->set_success)
1229 /* FIXME update qos data here */
1232 if (atomic_dec_and_test(&set->set_refcount))
1233 lov_finish_set(set);
/*
 * Build the request set for syncing a byte range of a striped object.
 *
 * exp    - LOV export
 * src_oa - caller obdo; copied into every sub-request
 * lsm    - stripe metadata of the object
 * start  - first byte of the range
 * end    - last byte of the range
 * reqset - out: NOTE(review): presumably receives the new set on success
 *
 * A lov_request is queued for each stripe that intersects [start, end],
 * carrying the per-stripe sub-range in rq_extent.  Returns -EIO when no
 * request could be queued and -ENOMEM on allocation failure.
 * NOTE(review): how inactive targets are handled after the debug message
 * is not visible here -- confirm.
 */
1238 int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
1239 struct lov_stripe_md *lsm, obd_off start,
1240 obd_off end, struct lov_request_set **reqset)
1242 struct lov_request_set *set;
1243 struct lov_oinfo *loi = NULL;
1244 struct lov_obd *lov = &exp->exp_obd->u.lov;
1248 OBD_ALLOC(set, sizeof(*set));
1255 set->set_oa = src_oa;
1257 loi = lsm->lsm_oinfo;
1258 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1259 struct lov_request *req;
1262 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1263 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
/* rs/re receive the part of the range that falls on stripe i */
1267 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1270 OBD_ALLOC(req, sizeof(*req));
1272 GOTO(out_set, rc = -ENOMEM);
1274 req->rq_idx = loi->loi_ost_idx;
1276 req->rq_oa = obdo_alloc();
1277 if (req->rq_oa == NULL)
1278 GOTO(out_set, rc = -ENOMEM);
/* start from the caller attributes, but address the sub-object */
1279 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1280 req->rq_oa->o_id = loi->loi_id;
1281 req->rq_extent.start = rs;
1282 req->rq_extent.end = re;
1283 lov_set_add_req(req, set);
1285 if (!set->set_count)
1286 GOTO(out_set, rc = -EIO);
/* error path: release the partially built set */
1290 lov_fini_sync_set(set);