1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 # define EXPORT_SYMTAB
25 #define DEBUG_SUBSYSTEM S_LOV
28 #include <asm/div64.h>
30 #include <liblustre.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_idl.h>
37 #include "lov_internal.h"
39 static void lov_init_set(struct lov_request_set *set)
42 set->set_completes = 0;
44 INIT_LIST_HEAD(&set->set_list);
45 atomic_set(&set->set_refcount, 1);
48 static void lov_finish_set(struct lov_request_set *set)
50 struct list_head *pos, *n;
54 list_for_each_safe(pos, n, &set->set_list) {
55 struct lov_request *req = list_entry(pos, struct lov_request,
57 list_del_init(&req->rq_link);
60 obdo_free(req->rq_oa);
62 OBD_FREE(req->rq_md, req->rq_buflen);
63 OBD_FREE(req, sizeof(*req));
67 int len = set->set_oabufs * sizeof(*set->set_pga);
68 OBD_FREE(set->set_pga, len);
71 lov_llh_put(set->set_lockh);
73 OBD_FREE(set, sizeof(*set));
77 static void lov_update_set(struct lov_request_set *set,
78 struct lov_request *req, int rc)
88 int lov_update_common_set(struct lov_request_set *set,
89 struct lov_request *req, int rc)
91 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
94 lov_update_set(set, req, rc);
96 /* grace error on inactive ost */
97 if (rc && !lov->tgts[req->rq_idx].active)
100 /* FIXME in raid1 regime, should return 0 */
104 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
106 list_add_tail(&req->rq_link, &set->set_list);
110 int lov_update_enqueue_set(struct lov_request_set *set,
111 struct lov_request *req, int rc, int flags)
113 struct lustre_handle *lov_lockhp;
114 struct lov_oinfo *loi;
117 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
118 loi = &set->set_md->lsm_oinfo[req->rq_stripe];
120 /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
121 * It belongs in the OSC, except that the OSC doesn't have
122 * access to the real LOI -- it gets a copy, that we created
123 * above, and that copy can be arbitrarily out of date.
125 * The LOV API is due for a serious rewriting anyways, and this
126 * can be addressed then. */
127 if (rc == ELDLM_OK) {
128 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
129 __u64 tmp = req->rq_md->lsm_oinfo->loi_rss;
131 LASSERT(lock != NULL);
133 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
134 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
135 /* Extend KMS up to the end of this lock and no further
136 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
137 if (tmp > lock->l_policy_data.l_extent.end)
138 tmp = lock->l_policy_data.l_extent.end + 1;
139 if (tmp >= loi->loi_kms) {
140 LDLM_DEBUG(lock, "lock acquired, setting rss="
141 LPU64", kms="LPU64, loi->loi_rss, tmp);
143 loi->loi_kms_valid = 1;
145 LDLM_DEBUG(lock, "lock acquired, setting rss="
146 LPU64"; leaving kms="LPU64", end="LPU64,
147 loi->loi_rss, loi->loi_kms,
148 lock->l_policy_data.l_extent.end);
150 ldlm_lock_allow_match(lock);
152 } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
153 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
154 loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss;
155 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
156 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
157 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
158 " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
161 struct obd_export *exp = set->set_exp;
162 struct lov_obd *lov = &exp->exp_obd->u.lov;
164 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
165 if (lov->tgts[req->rq_idx].active) {
166 CERROR("error: enqueue objid "LPX64" subobj "
167 LPX64" on OST idx %d: rc = %d\n",
168 set->set_md->lsm_object_id, loi->loi_id,
169 loi->loi_ost_idx, rc);
174 lov_update_set(set, req, rc);
178 static int enqueue_done(struct lov_request_set *set, __u32 mode)
180 struct lov_request *req;
181 struct lustre_handle *lov_lockhp = NULL;
182 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
186 LASSERT(set->set_completes);
187 /* enqueue/match success, just return */
188 if (set->set_completes == set->set_success)
191 /* cancel enqueued/matched locks */
192 list_for_each_entry(req, &set->set_list, rq_link) {
193 if (!req->rq_complete || req->rq_rc)
196 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
198 if (lov_lockhp->cookie == 0)
201 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
203 if (rc && lov->tgts[req->rq_idx].active)
204 CERROR("cancelling obdjid "LPX64" on OST "
205 "idx %d error: rc = %d\n",
206 req->rq_md->lsm_object_id, req->rq_idx, rc);
208 lov_llh_put(set->set_lockh);
212 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
219 LASSERT(set->set_exp);
220 if (set->set_completes)
221 rc = enqueue_done(set, mode);
223 lov_llh_put(set->set_lockh);
225 if (atomic_dec_and_test(&set->set_refcount))
231 int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
232 ldlm_policy_data_t *policy, __u32 mode,
233 struct lustre_handle *lockh,
234 struct lov_request_set **reqset)
236 struct lov_obd *lov = &exp->exp_obd->u.lov;
237 struct lov_request_set *set;
239 struct lov_oinfo *loi;
242 OBD_ALLOC(set, sizeof(*set));
249 set->set_lockh = lov_llh_new(lsm);
250 if (set->set_lockh == NULL)
251 GOTO(out_set, rc = -ENOMEM);
252 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
254 loi = lsm->lsm_oinfo;
255 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
256 struct lov_request *req;
259 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
260 policy->l_extent.end, &start, &end))
263 if (lov->tgts[loi->loi_ost_idx].active == 0) {
264 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
268 OBD_ALLOC(req, sizeof(*req));
270 GOTO(out_set, rc = -ENOMEM);
272 req->rq_buflen = sizeof(*req->rq_md) +
273 sizeof(struct lov_oinfo);
274 OBD_ALLOC(req->rq_md, req->rq_buflen);
275 if (req->rq_md == NULL)
276 GOTO(out_set, rc = -ENOMEM);
278 req->rq_extent.start = start;
279 req->rq_extent.end = end;
281 req->rq_idx = loi->loi_ost_idx;
284 /* XXX LOV STACKING: submd should be from the subobj */
285 req->rq_md->lsm_object_id = loi->loi_id;
286 req->rq_md->lsm_stripe_count = 0;
287 req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
288 req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss;
289 req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
290 req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks;
291 req->rq_md->lsm_oinfo->loi_mtime = loi->loi_mtime;
293 lov_set_add_req(req, set);
296 GOTO(out_set, rc = -EIO);
300 lov_fini_enqueue_set(set, mode);
304 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
312 lov_update_set(set, req, ret);
316 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
323 LASSERT(set->set_exp);
324 if (set->set_completes) {
325 if (set->set_count == set->set_success &&
326 flags & LDLM_FL_TEST_LOCK)
327 lov_llh_put(set->set_lockh);
328 rc = enqueue_done(set, mode);
330 lov_llh_put(set->set_lockh);
333 if (atomic_dec_and_test(&set->set_refcount))
339 int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
340 ldlm_policy_data_t *policy, __u32 mode,
341 struct lustre_handle *lockh,
342 struct lov_request_set **reqset)
344 struct lov_obd *lov = &exp->exp_obd->u.lov;
345 struct lov_request_set *set;
347 struct lov_oinfo *loi;
350 OBD_ALLOC(set, sizeof(*set));
357 set->set_lockh = lov_llh_new(lsm);
358 if (set->set_lockh == NULL)
359 GOTO(out_set, rc = -ENOMEM);
360 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
362 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
363 struct lov_request *req;
366 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
367 policy->l_extent.end, &start, &end))
370 /* FIXME raid1 should grace this error */
371 if (lov->tgts[loi->loi_ost_idx].active == 0) {
372 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
373 GOTO(out_set, rc = -EIO);
376 OBD_ALLOC(req, sizeof(*req));
378 GOTO(out_set, rc = -ENOMEM);
380 req->rq_buflen = sizeof(*req->rq_md);
381 OBD_ALLOC(req->rq_md, req->rq_buflen);
382 if (req->rq_md == NULL)
383 GOTO(out_set, rc = -ENOMEM);
385 req->rq_extent.start = start;
386 req->rq_extent.end = end;
388 req->rq_idx = loi->loi_ost_idx;
391 /* XXX LOV STACKING: submd should be from the subobj */
392 req->rq_md->lsm_object_id = loi->loi_id;
393 req->rq_md->lsm_stripe_count = 0;
395 lov_set_add_req(req, set);
398 GOTO(out_set, rc = -EIO);
402 lov_fini_match_set(set, mode, 0);
406 int lov_fini_cancel_set(struct lov_request_set *set)
411 LASSERT(set->set_exp);
416 lov_llh_put(set->set_lockh);
418 if (atomic_dec_and_test(&set->set_refcount))
424 int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
425 __u32 mode, struct lustre_handle *lockh,
426 struct lov_request_set **reqset)
428 struct lov_request_set *set;
430 struct lov_oinfo *loi;
433 OBD_ALLOC(set, sizeof(*set));
440 set->set_lockh = lov_handle2llh(lockh);
441 if (set->set_lockh == NULL) {
442 CERROR("LOV: invalid lov lock handle %p\n", lockh);
443 GOTO(out_set, rc = -EINVAL);
445 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
447 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
448 struct lov_request *req;
449 struct lustre_handle *lov_lockhp;
451 lov_lockhp = set->set_lockh->llh_handles + i;
452 if (lov_lockhp->cookie == 0) {
453 CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
454 loi->loi_ost_idx, loi->loi_id);
458 OBD_ALLOC(req, sizeof(*req));
460 GOTO(out_set, rc = -ENOMEM);
462 req->rq_buflen = sizeof(*req->rq_md);
463 OBD_ALLOC(req->rq_md, req->rq_buflen);
464 if (req->rq_md == NULL)
465 GOTO(out_set, rc = -ENOMEM);
467 req->rq_idx = loi->loi_ost_idx;
470 /* XXX LOV STACKING: submd should be from the subobj */
471 req->rq_md->lsm_object_id = loi->loi_id;
472 req->rq_md->lsm_stripe_count = 0;
474 lov_set_add_req(req, set);
477 GOTO(out_set, rc = -EIO);
481 lov_fini_cancel_set(set);
485 static int create_done(struct obd_export *exp, struct lov_request_set *set,
486 struct lov_stripe_md **lsmp)
488 struct lov_obd *lov = &exp->exp_obd->u.lov;
489 struct obd_trans_info *oti = set->set_oti;
490 struct obdo *src_oa = set->set_oa;
491 struct lov_request *req;
492 struct obdo *ret_oa = NULL;
493 int attrset = 0, rc = 0;
496 LASSERT(set->set_completes);
498 if (!set->set_success)
499 GOTO(cleanup, rc = -EIO);
500 if (*lsmp == NULL && set->set_count != set->set_success) {
501 set->set_count = set->set_success;
505 ret_oa = obdo_alloc();
507 GOTO(cleanup, rc = -ENOMEM);
509 list_for_each_entry(req, &set->set_list, rq_link) {
510 if (!req->rq_complete || req->rq_rc)
512 lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
513 set->set_md, req->rq_stripe, &attrset);
515 if (src_oa->o_valid & OBD_MD_FLSIZE &&
516 ret_oa->o_size != src_oa->o_size) {
517 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
518 src_oa->o_size, ret_oa->o_size);
521 ret_oa->o_id = src_oa->o_id;
522 memcpy(src_oa, ret_oa, sizeof(*src_oa));
529 list_for_each_entry(req, &set->set_list, rq_link) {
530 struct obd_export *sub_exp;
533 if (!req->rq_complete || req->rq_rc)
536 sub_exp = lov->tgts[req->rq_idx].ltd_exp,
537 err = obd_destroy(sub_exp, req->rq_oa, NULL, oti);
539 CERROR("Failed to uncreate objid "LPX64" subobj "
540 LPX64" on OST idx %d: rc = %d\n",
541 set->set_oa->o_id, req->rq_oa->o_id,
545 obd_free_memmd(exp, &set->set_md);
547 if (oti && set->set_cookies) {
548 oti->oti_logcookies = set->set_cookies;
549 if (!set->set_cookie_sent) {
550 oti_free_cookies(oti);
551 src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
553 src_oa->o_valid |= OBD_MD_FLCOOKIE;
559 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
564 LASSERT(set->set_exp);
567 if (set->set_completes) {
568 rc = create_done(set->set_exp, set, lsmp);
569 /* FIXME update qos data here */
572 if (atomic_dec_and_test(&set->set_refcount))
578 int lov_update_create_set(struct lov_request_set *set,
579 struct lov_request *req, int rc)
581 struct obd_trans_info *oti = set->set_oti;
582 struct lov_stripe_md *lsm = set->set_md;
583 struct lov_oinfo *loi;
584 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
587 req->rq_stripe = set->set_success;
588 loi = &lsm->lsm_oinfo[req->rq_stripe];
590 if (rc && lov->tgts[req->rq_idx].active) {
591 CERROR("error creating objid "LPX64" sub-object"
592 " on OST idx %d/%d: rc = %d\n",
593 set->set_oa->o_id, req->rq_idx,
594 lsm->lsm_stripe_count, rc);
596 CERROR("obd_create returned invalid err %d\n", rc);
600 lov_update_set(set, req, rc);
604 if (oti && oti->oti_objid)
605 oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
607 loi->loi_id = req->rq_oa->o_id;
608 loi->loi_ost_idx = req->rq_idx;
609 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
610 lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
613 if (set->set_cookies)
614 ++oti->oti_logcookies;
615 if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
616 set->set_cookie_sent++;
621 int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **lsmp,
622 struct obdo *src_oa, struct obd_trans_info *oti,
623 struct lov_request_set **reqset)
625 struct lov_obd *lov = &exp->exp_obd->u.lov;
626 struct lov_request_set *set;
627 int rc = 0, newea = 0;
630 OBD_ALLOC(set, sizeof(*set));
637 set->set_oa = src_oa;
640 if (set->set_md == NULL) {
641 int stripes, stripe_cnt;
642 stripe_cnt = lov_get_stripecnt(lov, 0);
644 /* If the MDS file was truncated up to some size, stripe over
645 * enough OSTs to allow the file to be created at that size. */
646 if (src_oa->o_valid & OBD_MD_FLSIZE) {
647 stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
648 do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
650 if (stripes > lov->desc.ld_active_tgt_count)
651 GOTO(out_set, rc = -EFBIG);
652 if (stripes < stripe_cnt)
653 stripes = stripe_cnt;
655 stripes = stripe_cnt;
658 rc = lov_alloc_memmd(&set->set_md, stripes,
659 lov->desc.ld_pattern ?
660 lov->desc.ld_pattern : LOV_PATTERN_RAID0);
666 rc = qos_prep_create(lov, set, newea);
670 if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
671 oti_alloc_cookies(oti, set->set_count);
672 if (!oti->oti_logcookies)
674 set->set_cookies = oti->oti_logcookies;
681 obd_free_memmd(exp, &set->set_md);
683 lov_fini_create_set(set, lsmp);
687 static int common_attr_done(struct lov_request_set *set)
689 struct list_head *pos;
690 struct lov_request *req;
692 int rc = 0, attrset = 0;
695 if (set->set_oa == NULL)
698 if (!set->set_success)
701 tmp_oa = obdo_alloc();
703 GOTO(out, rc = -ENOMEM);
705 list_for_each (pos, &set->set_list) {
706 req = list_entry(pos, struct lov_request, rq_link);
708 if (!req->rq_complete || req->rq_rc)
710 if (req->rq_oa->o_valid == 0) /* inactive stripe */
712 lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
713 set->set_md, req->rq_stripe, &attrset);
716 CERROR("No stripes had valid attrs\n");
719 tmp_oa->o_id = set->set_oa->o_id;
720 memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
728 static int brw_done(struct lov_request_set *set)
730 struct lov_stripe_md *lsm = set->set_md;
731 struct lov_oinfo *loi = NULL;
732 struct list_head *pos;
733 struct lov_request *req;
736 list_for_each (pos, &set->set_list) {
737 req = list_entry(pos, struct lov_request, rq_link);
739 if (!req->rq_complete || req->rq_rc)
742 loi = &lsm->lsm_oinfo[req->rq_stripe];
744 if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
745 loi->loi_blocks = req->rq_oa->o_blocks;
751 int lov_fini_brw_set(struct lov_request_set *set)
756 LASSERT(set->set_exp);
759 if (set->set_completes) {
761 /* FIXME update qos data here */
763 if (atomic_dec_and_test(&set->set_refcount))
769 int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
770 struct lov_stripe_md *lsm, obd_count oa_bufs,
771 struct brw_page *pga, struct obd_trans_info *oti,
772 struct lov_request_set **reqset)
779 struct lov_request_set *set;
780 struct lov_oinfo *loi = NULL;
781 struct lov_obd *lov = &exp->exp_obd->u.lov;
782 int rc = 0, i, shift;
785 OBD_ALLOC(set, sizeof(*set));
792 set->set_oa = src_oa;
794 set->set_oabufs = oa_bufs;
795 OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
797 GOTO(out, rc = -ENOMEM);
799 OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
801 GOTO(out, rc = -ENOMEM);
803 /* calculate the page count for each stripe */
804 for (i = 0; i < oa_bufs; i++) {
805 int stripe = lov_stripe_number(lsm, pga[i].off);
806 info[stripe].count++;
809 /* alloc and initialize lov request */
811 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
812 struct lov_request *req;
814 if (info[i].count == 0)
817 if (lov->tgts[loi->loi_ost_idx].active == 0) {
818 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
819 GOTO(out, rc = -EIO);
822 OBD_ALLOC(req, sizeof(*req));
824 GOTO(out, rc = -ENOMEM);
826 req->rq_oa = obdo_alloc();
827 if (req->rq_oa == NULL)
828 GOTO(out, rc = -ENOMEM);
831 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
832 req->rq_oa->o_id = loi->loi_id;
834 req->rq_buflen = sizeof(*req->rq_md);
835 OBD_ALLOC(req->rq_md, req->rq_buflen);
836 if (req->rq_md == NULL)
837 GOTO(out, rc = -ENOMEM);
839 req->rq_idx = loi->loi_ost_idx;
842 /* XXX LOV STACKING */
843 req->rq_md->lsm_object_id = loi->loi_id;
844 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
845 req->rq_oabufs = info[i].count;
846 req->rq_pgaidx = shift;
847 shift += req->rq_oabufs;
849 /* remember the index for sort brw_page array */
850 info[i].index = req->rq_pgaidx;
852 lov_set_add_req(req, set);
855 GOTO(out, rc = -EIO);
857 /* rotate & sort the brw_page array */
858 for (i = 0; i < oa_bufs; i++) {
859 int stripe = lov_stripe_number(lsm, pga[i].off);
861 shift = info[stripe].index + info[stripe].off;
862 LASSERT(shift < oa_bufs);
863 set->set_pga[shift] = pga[i];
864 lov_stripe_offset(lsm, pga[i].off, stripe,
865 &set->set_pga[shift].off);
870 OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
875 lov_fini_brw_set(set);
880 int lov_fini_getattr_set(struct lov_request_set *set)
885 LASSERT(set->set_exp);
888 if (set->set_completes)
889 rc = common_attr_done(set);
891 if (atomic_dec_and_test(&set->set_refcount))
897 int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
898 struct lov_stripe_md *lsm,
899 struct lov_request_set **reqset)
901 struct lov_request_set *set;
902 struct lov_oinfo *loi = NULL;
903 struct lov_obd *lov = &exp->exp_obd->u.lov;
907 OBD_ALLOC(set, sizeof(*set));
914 set->set_oa = src_oa;
916 loi = lsm->lsm_oinfo;
917 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
918 struct lov_request *req;
920 if (lov->tgts[loi->loi_ost_idx].active == 0) {
921 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
925 OBD_ALLOC(req, sizeof(*req));
927 GOTO(out_set, rc = -ENOMEM);
930 req->rq_idx = loi->loi_ost_idx;
932 req->rq_oa = obdo_alloc();
933 if (req->rq_oa == NULL)
934 GOTO(out_set, rc = -ENOMEM);
935 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
936 req->rq_oa->o_id = loi->loi_id;
938 lov_set_add_req(req, set);
941 GOTO(out_set, rc = -EIO);
945 lov_fini_getattr_set(set);
949 int lov_fini_destroy_set(struct lov_request_set *set)
953 LASSERT(set->set_exp);
956 if (set->set_completes) {
957 /* FIXME update qos data here */
960 if (atomic_dec_and_test(&set->set_refcount))
966 int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
967 struct lov_stripe_md *lsm,
968 struct obd_trans_info *oti,
969 struct lov_request_set **reqset)
971 struct lov_request_set *set;
972 struct lov_oinfo *loi = NULL;
973 struct lov_obd *lov = &exp->exp_obd->u.lov;
974 int rc = 0, cookie_set = 0, i;
977 OBD_ALLOC(set, sizeof(*set));
984 set->set_oa = src_oa;
986 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
987 set->set_cookies = oti->oti_logcookies;
989 loi = lsm->lsm_oinfo;
990 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
991 struct lov_request *req;
993 if (lov->tgts[loi->loi_ost_idx].active == 0) {
994 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
998 OBD_ALLOC(req, sizeof(*req));
1000 GOTO(out_set, rc = -ENOMEM);
1003 req->rq_idx = loi->loi_ost_idx;
1005 req->rq_oa = obdo_alloc();
1006 if (req->rq_oa == NULL)
1007 GOTO(out_set, rc = -ENOMEM);
1008 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1009 req->rq_oa->o_id = loi->loi_id;
1011 /* Setup the first request's cookie position */
1012 if (!cookie_set && set->set_cookies) {
1013 oti->oti_logcookies = set->set_cookies + i;
1016 lov_set_add_req(req, set);
1018 if (!set->set_count)
1019 GOTO(out_set, rc = -EIO);
1023 lov_fini_destroy_set(set);
1027 int lov_fini_setattr_set(struct lov_request_set *set)
1032 LASSERT(set->set_exp);
1035 if (set->set_completes) {
1036 rc = common_attr_done(set);
1037 /* FIXME update qos data here */
1040 if (atomic_dec_and_test(&set->set_refcount))
1041 lov_finish_set(set);
1045 int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
1046 struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1047 struct lov_request_set **reqset)
1049 struct lov_request_set *set;
1050 struct lov_oinfo *loi = NULL;
1051 struct lov_obd *lov = &exp->exp_obd->u.lov;
1055 OBD_ALLOC(set, sizeof(*set));
1062 set->set_oa = src_oa;
1064 loi = lsm->lsm_oinfo;
1065 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1066 struct lov_request *req;
1068 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1069 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1073 OBD_ALLOC(req, sizeof(*req));
1075 GOTO(out_set, rc = -ENOMEM);
1077 req->rq_idx = loi->loi_ost_idx;
1079 req->rq_oa = obdo_alloc();
1080 if (req->rq_oa == NULL)
1081 GOTO(out_set, rc = -ENOMEM);
1082 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1083 req->rq_oa->o_id = loi->loi_id;
1085 if (src_oa->o_valid & OBD_MD_FLSIZE) {
1086 if (lov_stripe_offset(lsm, src_oa->o_size, i,
1087 &req->rq_oa->o_size) < 0 &&
1089 req->rq_oa->o_size--;
1090 CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1091 i, req->rq_oa->o_size, src_oa->o_size);
1093 lov_set_add_req(req, set);
1095 if (!set->set_count)
1096 GOTO(out_set, rc = -EIO);
1100 lov_fini_setattr_set(set);
1104 int lov_update_setattr_set(struct lov_request_set *set,
1105 struct lov_request *req, int rc)
1107 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1110 lov_update_set(set, req, rc);
1112 /* grace error on inactive ost */
1113 if (rc && !lov->tgts[req->rq_idx].active)
1116 /* FIXME: LOV STACKING update loi data should be done by OSC *
1117 * when this is gone we can go back to using lov_update_common_set() */
1118 if (rc == 0 && req->rq_oa->o_valid & OBD_MD_FLMTIME)
1119 set->set_md->lsm_oinfo[req->rq_stripe].loi_mtime =
1120 req->rq_oa->o_mtime;
1121 /* ditto loi_atime, loi_ctime when available */
1126 int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
1129 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1132 lov_update_set(set, req, rc);
1133 if (rc && !lov->tgts[req->rq_idx].active)
1135 /* FIXME in raid1 regime, should return 0 */
1139 int lov_fini_punch_set(struct lov_request_set *set)
1144 LASSERT(set->set_exp);
1147 if (set->set_completes) {
1148 if (!set->set_success)
1150 /* FIXME update qos data here */
1153 if (atomic_dec_and_test(&set->set_refcount))
1154 lov_finish_set(set);
1159 int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
1160 struct lov_stripe_md *lsm, obd_off start,
1161 obd_off end, struct obd_trans_info *oti,
1162 struct lov_request_set **reqset)
1164 struct lov_request_set *set;
1165 struct lov_oinfo *loi = NULL;
1166 struct lov_obd *lov = &exp->exp_obd->u.lov;
1170 OBD_ALLOC(set, sizeof(*set));
1177 set->set_oa = src_oa;
1179 loi = lsm->lsm_oinfo;
1180 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1181 struct lov_request *req;
1184 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1185 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1189 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1192 OBD_ALLOC(req, sizeof(*req));
1194 GOTO(out_set, rc = -ENOMEM);
1196 req->rq_idx = loi->loi_ost_idx;
1198 req->rq_oa = obdo_alloc();
1199 if (req->rq_oa == NULL)
1200 GOTO(out_set, rc = -ENOMEM);
1201 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1202 req->rq_oa->o_id = loi->loi_id;
1204 req->rq_extent.start = rs;
1205 req->rq_extent.end = re;
1207 lov_set_add_req(req, set);
1209 if (!set->set_count)
1210 GOTO(out_set, rc = -EIO);
1214 lov_fini_punch_set(set);
1218 int lov_fini_sync_set(struct lov_request_set *set)
1223 LASSERT(set->set_exp);
1226 if (set->set_completes) {
1227 if (!set->set_success)
1229 /* FIXME update qos data here */
1232 if (atomic_dec_and_test(&set->set_refcount))
1233 lov_finish_set(set);
1238 int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
1239 struct lov_stripe_md *lsm, obd_off start,
1240 obd_off end, struct lov_request_set **reqset)
1242 struct lov_request_set *set;
1243 struct lov_oinfo *loi = NULL;
1244 struct lov_obd *lov = &exp->exp_obd->u.lov;
1248 OBD_ALLOC(set, sizeof(*set));
1255 set->set_oa = src_oa;
1257 loi = lsm->lsm_oinfo;
1258 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1259 struct lov_request *req;
1262 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1263 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1267 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1270 OBD_ALLOC(req, sizeof(*req));
1272 GOTO(out_set, rc = -ENOMEM);
1274 req->rq_idx = loi->loi_ost_idx;
1276 req->rq_oa = obdo_alloc();
1277 if (req->rq_oa == NULL)
1278 GOTO(out_set, rc = -ENOMEM);
1279 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1280 req->rq_oa->o_id = loi->loi_id;
1282 req->rq_extent.start = rs;
1283 req->rq_extent.end = re;
1285 lov_set_add_req(req, set);
1287 if (!set->set_count)
1288 GOTO(out_set, rc = -EIO);
1292 lov_fini_sync_set(set);