1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 # define EXPORT_SYMTAB
25 #define DEBUG_SUBSYSTEM S_LOV
28 #include <asm/div64.h>
30 #include <liblustre.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_idl.h>
37 #include "lov_internal.h"
39 static void lov_init_set(struct lov_request_set *set)
42 set->set_completes = 0;
44 INIT_LIST_HEAD(&set->set_list);
45 atomic_set(&set->set_refcount, 1);
48 static void lov_finish_set(struct lov_request_set *set)
50 struct list_head *pos, *n;
54 list_for_each_safe(pos, n, &set->set_list) {
55 struct lov_request *req = list_entry(pos, struct lov_request,
57 list_del_init(&req->rq_link);
60 obdo_free(req->rq_oa);
62 OBD_FREE(req->rq_md, req->rq_buflen);
63 OBD_FREE(req, sizeof(*req));
67 int len = set->set_oabufs * sizeof(*set->set_pga);
68 OBD_FREE(set->set_pga, len);
71 lov_llh_put(set->set_lockh);
73 OBD_FREE(set, sizeof(*set));
77 static void lov_update_set(struct lov_request_set *set,
78 struct lov_request *req, int rc)
88 int lov_update_common_set(struct lov_request_set *set,
89 struct lov_request *req, int rc)
91 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
94 lov_update_set(set, req, rc);
96 /* grace error on inactive ost */
97 if (rc && !lov->tgts[req->rq_idx].active)
100 /* FIXME in raid1 regime, should return 0 */
104 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
106 list_add_tail(&req->rq_link, &set->set_list);
110 int lov_update_enqueue_set(struct lov_request_set *set,
111 struct lov_request *req, int rc, int flags)
113 struct lustre_handle *lov_lockhp;
114 struct lov_oinfo *loi;
117 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
118 loi = &set->set_md->lsm_oinfo[req->rq_stripe];
120 /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
121 * It belongs in the OSC, except that the OSC doesn't have
122 * access to the real LOI -- it gets a copy, that we created
123 * above, and that copy can be arbitrarily out of date.
125 * The LOV API is due for a serious rewriting anyways, and this
126 * can be addressed then. */
127 if (rc == ELDLM_OK) {
128 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
129 __u64 tmp = req->rq_md->lsm_oinfo->loi_rss;
131 LASSERT(lock != NULL);
133 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
134 /* Extend KMS up to the end of this lock and no further
135 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
136 if (tmp > lock->l_policy_data.l_extent.end)
137 tmp = lock->l_policy_data.l_extent.end + 1;
138 if (tmp >= loi->loi_kms) {
139 LDLM_DEBUG(lock, "lock acquired, setting rss="
140 LPU64", kms="LPU64, loi->loi_rss, tmp);
142 loi->loi_kms_valid = 1;
144 LDLM_DEBUG(lock, "lock acquired, setting rss="
145 LPU64"; leaving kms="LPU64", end="LPU64,
146 loi->loi_rss, loi->loi_kms,
147 lock->l_policy_data.l_extent.end);
149 ldlm_lock_allow_match(lock);
151 } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
152 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
153 loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss;
154 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
155 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
156 " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
159 struct obd_export *exp = set->set_exp;
160 struct lov_obd *lov = &exp->exp_obd->u.lov;
162 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
163 if (lov->tgts[req->rq_idx].active) {
164 CERROR("error: enqueue objid "LPX64" subobj "
165 LPX64" on OST idx %d: rc = %d\n",
166 set->set_md->lsm_object_id, loi->loi_id,
167 loi->loi_ost_idx, rc);
172 lov_update_set(set, req, rc);
176 static int enqueue_done(struct lov_request_set *set, __u32 mode)
178 struct lov_request *req;
179 struct lustre_handle *lov_lockhp = NULL;
180 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
184 LASSERT(set->set_completes);
185 /* enqueue/match success, just return */
186 if (set->set_completes == set->set_success)
189 /* cancel enqueued/matched locks */
190 list_for_each_entry(req, &set->set_list, rq_link) {
191 if (!req->rq_complete || req->rq_rc)
194 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
196 if (lov_lockhp->cookie == 0)
199 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
201 if (rc && lov->tgts[req->rq_idx].active)
202 CERROR("cancelling obdjid "LPX64" on OST "
203 "idx %d error: rc = %d\n",
204 req->rq_md->lsm_object_id, req->rq_idx, rc);
206 lov_llh_put(set->set_lockh);
210 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
217 LASSERT(set->set_exp);
218 if (set->set_completes)
219 rc = enqueue_done(set, mode);
221 lov_llh_put(set->set_lockh);
223 if (atomic_dec_and_test(&set->set_refcount))
229 int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
230 ldlm_policy_data_t *policy, __u32 mode,
231 struct lustre_handle *lockh,
232 struct lov_request_set **reqset)
234 struct lov_obd *lov = &exp->exp_obd->u.lov;
235 struct lov_request_set *set;
237 struct lov_oinfo *loi;
240 OBD_ALLOC(set, sizeof(*set));
247 set->set_lockh = lov_llh_new(lsm);
248 if (set->set_lockh == NULL)
249 GOTO(out_set, rc = -ENOMEM);
250 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
252 loi = lsm->lsm_oinfo;
253 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
254 struct lov_request *req;
257 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
258 policy->l_extent.end, &start, &end))
261 if (lov->tgts[loi->loi_ost_idx].active == 0) {
262 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
266 OBD_ALLOC(req, sizeof(*req));
268 GOTO(out_set, rc = -ENOMEM);
270 req->rq_buflen = sizeof(*req->rq_md) +
271 sizeof(struct lov_oinfo);
272 OBD_ALLOC(req->rq_md, req->rq_buflen);
273 if (req->rq_md == NULL)
274 GOTO(out_set, rc = -ENOMEM);
276 req->rq_extent.start = start;
277 req->rq_extent.end = end;
279 req->rq_idx = loi->loi_ost_idx;
282 /* XXX LOV STACKING: submd should be from the subobj */
283 req->rq_md->lsm_object_id = loi->loi_id;
284 req->rq_md->lsm_stripe_count = 0;
285 req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
286 req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss;
287 req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
288 req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks;
290 lov_set_add_req(req, set);
293 GOTO(out_set, rc = -EIO);
297 lov_fini_enqueue_set(set, mode);
301 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
309 lov_update_set(set, req, ret);
313 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
320 LASSERT(set->set_exp);
321 if (set->set_completes) {
322 if (set->set_count == set->set_success &&
323 flags & LDLM_FL_TEST_LOCK)
324 lov_llh_put(set->set_lockh);
325 rc = enqueue_done(set, mode);
327 lov_llh_put(set->set_lockh);
330 if (atomic_dec_and_test(&set->set_refcount))
336 int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
337 ldlm_policy_data_t *policy, __u32 mode,
338 struct lustre_handle *lockh,
339 struct lov_request_set **reqset)
341 struct lov_obd *lov = &exp->exp_obd->u.lov;
342 struct lov_request_set *set;
344 struct lov_oinfo *loi;
347 OBD_ALLOC(set, sizeof(*set));
354 set->set_lockh = lov_llh_new(lsm);
355 if (set->set_lockh == NULL)
356 GOTO(out_set, rc = -ENOMEM);
357 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
359 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
360 struct lov_request *req;
363 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
364 policy->l_extent.end, &start, &end))
367 /* FIXME raid1 should grace this error */
368 if (lov->tgts[loi->loi_ost_idx].active == 0) {
369 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
370 GOTO(out_set, rc = -EIO);
373 OBD_ALLOC(req, sizeof(*req));
375 GOTO(out_set, rc = -ENOMEM);
377 req->rq_buflen = sizeof(*req->rq_md);
378 OBD_ALLOC(req->rq_md, req->rq_buflen);
379 if (req->rq_md == NULL)
380 GOTO(out_set, rc = -ENOMEM);
382 req->rq_extent.start = start;
383 req->rq_extent.end = end;
385 req->rq_idx = loi->loi_ost_idx;
388 /* XXX LOV STACKING: submd should be from the subobj */
389 req->rq_md->lsm_object_id = loi->loi_id;
390 req->rq_md->lsm_stripe_count = 0;
392 lov_set_add_req(req, set);
395 GOTO(out_set, rc = -EIO);
399 lov_fini_match_set(set, mode, 0);
403 int lov_fini_cancel_set(struct lov_request_set *set)
408 LASSERT(set->set_exp);
413 lov_llh_put(set->set_lockh);
415 if (atomic_dec_and_test(&set->set_refcount))
421 int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
422 __u32 mode, struct lustre_handle *lockh,
423 struct lov_request_set **reqset)
425 struct lov_request_set *set;
427 struct lov_oinfo *loi;
430 OBD_ALLOC(set, sizeof(*set));
437 set->set_lockh = lov_handle2llh(lockh);
438 if (set->set_lockh == NULL) {
439 CERROR("LOV: invalid lov lock handle %p\n", lockh);
440 GOTO(out_set, rc = -EINVAL);
442 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
444 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
445 struct lov_request *req;
446 struct lustre_handle *lov_lockhp;
448 lov_lockhp = set->set_lockh->llh_handles + i;
449 if (lov_lockhp->cookie == 0) {
450 CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
451 loi->loi_ost_idx, loi->loi_id);
455 OBD_ALLOC(req, sizeof(*req));
457 GOTO(out_set, rc = -ENOMEM);
459 req->rq_buflen = sizeof(*req->rq_md);
460 OBD_ALLOC(req->rq_md, req->rq_buflen);
461 if (req->rq_md == NULL)
462 GOTO(out_set, rc = -ENOMEM);
464 req->rq_idx = loi->loi_ost_idx;
467 /* XXX LOV STACKING: submd should be from the subobj */
468 req->rq_md->lsm_object_id = loi->loi_id;
469 req->rq_md->lsm_stripe_count = 0;
471 lov_set_add_req(req, set);
474 GOTO(out_set, rc = -EIO);
478 lov_fini_cancel_set(set);
482 static int create_done(struct obd_export *exp, struct lov_request_set *set,
483 struct lov_stripe_md **lsmp)
485 struct lov_obd *lov = &exp->exp_obd->u.lov;
486 struct obd_trans_info *oti = set->set_oti;
487 struct obdo *src_oa = set->set_oa;
488 struct lov_request *req;
489 struct obdo *ret_oa = NULL;
490 int attrset = 0, rc = 0;
493 LASSERT(set->set_completes);
495 if (!set->set_success)
496 GOTO(cleanup, rc = -EIO);
497 if (*lsmp == NULL && set->set_count != set->set_success) {
498 set->set_count = set->set_success;
502 ret_oa = obdo_alloc();
504 GOTO(cleanup, rc = -ENOMEM);
506 list_for_each_entry(req, &set->set_list, rq_link) {
507 if (!req->rq_complete || req->rq_rc)
509 lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
510 set->set_md, req->rq_stripe, &attrset);
512 if (src_oa->o_valid & OBD_MD_FLSIZE &&
513 ret_oa->o_size != src_oa->o_size) {
514 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
515 src_oa->o_size, ret_oa->o_size);
518 ret_oa->o_id = src_oa->o_id;
519 memcpy(src_oa, ret_oa, sizeof(*src_oa));
526 list_for_each_entry(req, &set->set_list, rq_link) {
527 struct obd_export *sub_exp;
530 if (!req->rq_complete || req->rq_rc)
533 sub_exp = lov->tgts[req->rq_idx].ltd_exp,
534 err = obd_destroy(sub_exp, req->rq_oa, NULL, oti);
536 CERROR("Failed to uncreate objid "LPX64" subobj "
537 LPX64" on OST idx %d: rc = %d\n",
538 set->set_oa->o_id, req->rq_oa->o_id,
542 obd_free_memmd(exp, &set->set_md);
544 if (oti && set->set_cookies) {
545 oti->oti_logcookies = set->set_cookies;
546 if (!set->set_cookie_sent) {
547 oti_free_cookies(oti);
548 src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
550 src_oa->o_valid |= OBD_MD_FLCOOKIE;
556 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
561 LASSERT(set->set_exp);
564 if (set->set_completes) {
565 rc = create_done(set->set_exp, set, lsmp);
566 /* FIXME update qos data here */
569 if (atomic_dec_and_test(&set->set_refcount))
575 int lov_update_create_set(struct lov_request_set *set,
576 struct lov_request *req, int rc)
578 struct obd_trans_info *oti = set->set_oti;
579 struct lov_stripe_md *lsm = set->set_md;
580 struct lov_oinfo *loi;
581 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
584 req->rq_stripe = set->set_success;
585 loi = &lsm->lsm_oinfo[req->rq_stripe];
587 if (rc && lov->tgts[req->rq_idx].active) {
588 CERROR("error creating objid "LPX64" sub-object"
589 " on OST idx %d/%d: rc = %d\n",
590 set->set_oa->o_id, req->rq_idx,
591 lsm->lsm_stripe_count, rc);
593 CERROR("obd_create returned invalid err %d\n", rc);
597 lov_update_set(set, req, rc);
601 if (oti && oti->oti_objid)
602 oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
604 loi->loi_id = req->rq_oa->o_id;
605 loi->loi_ost_idx = req->rq_idx;
606 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
607 lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
610 if (set->set_cookies)
611 ++oti->oti_logcookies;
612 if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
613 set->set_cookie_sent++;
618 int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **lsmp,
619 struct obdo *src_oa, struct obd_trans_info *oti,
620 struct lov_request_set **reqset)
622 struct lov_obd *lov = &exp->exp_obd->u.lov;
623 struct lov_request_set *set;
624 int rc = 0, newea = 0;
627 OBD_ALLOC(set, sizeof(*set));
634 set->set_oa = src_oa;
637 if (set->set_md == NULL) {
638 int stripes, stripe_cnt;
639 stripe_cnt = lov_get_stripecnt(lov, 0);
641 /* If the MDS file was truncated up to some size, stripe over
642 * enough OSTs to allow the file to be created at that size. */
643 if (src_oa->o_valid & OBD_MD_FLSIZE) {
644 stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
645 do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
647 if (stripes > lov->desc.ld_active_tgt_count)
648 GOTO(out_set, rc = -EFBIG);
649 if (stripes < stripe_cnt)
650 stripes = stripe_cnt;
652 stripes = stripe_cnt;
655 rc = lov_alloc_memmd(&set->set_md, stripes,
656 lov->desc.ld_pattern ?
657 lov->desc.ld_pattern : LOV_PATTERN_RAID0);
663 rc = qos_prep_create(lov, set, newea);
667 if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
668 oti_alloc_cookies(oti, set->set_count);
669 if (!oti->oti_logcookies)
671 set->set_cookies = oti->oti_logcookies;
678 obd_free_memmd(exp, &set->set_md);
680 lov_fini_create_set(set, lsmp);
684 static int common_attr_done(struct lov_request_set *set)
686 struct list_head *pos;
687 struct lov_request *req;
689 int rc = 0, attrset = 0;
692 if (set->set_oa == NULL)
695 if (!set->set_success)
698 tmp_oa = obdo_alloc();
700 GOTO(out, rc = -ENOMEM);
702 list_for_each (pos, &set->set_list) {
703 req = list_entry(pos, struct lov_request, rq_link);
705 if (!req->rq_complete || req->rq_rc)
707 if (req->rq_oa->o_valid == 0) /* inactive stripe */
709 lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
710 set->set_md, req->rq_stripe, &attrset);
713 CERROR("No stripes had valid attrs\n");
716 tmp_oa->o_id = set->set_oa->o_id;
717 memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
725 static int brw_done(struct lov_request_set *set)
727 struct lov_stripe_md *lsm = set->set_md;
728 struct lov_oinfo *loi = NULL;
729 struct list_head *pos;
730 struct lov_request *req;
733 list_for_each (pos, &set->set_list) {
734 req = list_entry(pos, struct lov_request, rq_link);
736 if (!req->rq_complete || req->rq_rc)
739 loi = &lsm->lsm_oinfo[req->rq_stripe];
741 if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
742 loi->loi_blocks = req->rq_oa->o_blocks;
748 int lov_fini_brw_set(struct lov_request_set *set)
753 LASSERT(set->set_exp);
756 if (set->set_completes) {
758 /* FIXME update qos data here */
760 if (atomic_dec_and_test(&set->set_refcount))
766 int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
767 struct lov_stripe_md *lsm, obd_count oa_bufs,
768 struct brw_page *pga, struct obd_trans_info *oti,
769 struct lov_request_set **reqset)
776 struct lov_request_set *set;
777 struct lov_oinfo *loi = NULL;
778 struct lov_obd *lov = &exp->exp_obd->u.lov;
779 int rc = 0, i, shift;
782 OBD_ALLOC(set, sizeof(*set));
789 set->set_oa = src_oa;
791 set->set_oabufs = oa_bufs;
792 OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
794 GOTO(out, rc = -ENOMEM);
796 OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
798 GOTO(out, rc = -ENOMEM);
800 /* calculate the page count for each stripe */
801 for (i = 0; i < oa_bufs; i++) {
802 int stripe = lov_stripe_number(lsm, pga[i].off);
803 info[stripe].count++;
806 /* alloc and initialize lov request */
808 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
809 struct lov_request *req;
811 if (info[i].count == 0)
814 if (lov->tgts[loi->loi_ost_idx].active == 0) {
815 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
816 GOTO(out, rc = -EIO);
819 OBD_ALLOC(req, sizeof(*req));
821 GOTO(out, rc = -ENOMEM);
823 req->rq_oa = obdo_alloc();
824 if (req->rq_oa == NULL)
825 GOTO(out, rc = -ENOMEM);
828 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
829 req->rq_oa->o_id = loi->loi_id;
831 req->rq_buflen = sizeof(*req->rq_md);
832 OBD_ALLOC(req->rq_md, req->rq_buflen);
833 if (req->rq_md == NULL)
834 GOTO(out, rc = -ENOMEM);
836 req->rq_idx = loi->loi_ost_idx;
839 /* XXX LOV STACKING */
840 req->rq_md->lsm_object_id = loi->loi_id;
841 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
842 req->rq_oabufs = info[i].count;
843 req->rq_pgaidx = shift;
844 shift += req->rq_oabufs;
846 /* remember the index for sort brw_page array */
847 info[i].index = req->rq_pgaidx;
849 lov_set_add_req(req, set);
852 GOTO(out, rc = -EIO);
854 /* rotate & sort the brw_page array */
855 for (i = 0; i < oa_bufs; i++) {
856 int stripe = lov_stripe_number(lsm, pga[i].off);
858 shift = info[stripe].index + info[stripe].off;
859 LASSERT(shift < oa_bufs);
860 set->set_pga[shift] = pga[i];
861 lov_stripe_offset(lsm, pga[i].off, stripe,
862 &set->set_pga[shift].off);
867 OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
872 lov_fini_brw_set(set);
877 static int getattr_done(struct lov_request_set *set)
879 return common_attr_done(set);
882 int lov_fini_getattr_set(struct lov_request_set *set)
887 LASSERT(set->set_exp);
890 if (set->set_completes)
891 rc = getattr_done(set);
893 if (atomic_dec_and_test(&set->set_refcount))
899 int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
900 struct lov_stripe_md *lsm,
901 struct lov_request_set **reqset)
903 struct lov_request_set *set;
904 struct lov_oinfo *loi = NULL;
905 struct lov_obd *lov = &exp->exp_obd->u.lov;
909 OBD_ALLOC(set, sizeof(*set));
916 set->set_oa = src_oa;
918 loi = lsm->lsm_oinfo;
919 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
920 struct lov_request *req;
922 if (lov->tgts[loi->loi_ost_idx].active == 0) {
923 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
927 OBD_ALLOC(req, sizeof(*req));
929 GOTO(out_set, rc = -ENOMEM);
932 req->rq_idx = loi->loi_ost_idx;
934 req->rq_oa = obdo_alloc();
935 if (req->rq_oa == NULL)
936 GOTO(out_set, rc = -ENOMEM);
937 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
938 req->rq_oa->o_id = loi->loi_id;
940 lov_set_add_req(req, set);
943 GOTO(out_set, rc = -EIO);
947 lov_fini_getattr_set(set);
951 int lov_fini_destroy_set(struct lov_request_set *set)
955 LASSERT(set->set_exp);
958 if (set->set_completes) {
959 /* FIXME update qos data here */
962 if (atomic_dec_and_test(&set->set_refcount))
968 int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
969 struct lov_stripe_md *lsm,
970 struct obd_trans_info *oti,
971 struct lov_request_set **reqset)
973 struct lov_request_set *set;
974 struct lov_oinfo *loi = NULL;
975 struct lov_obd *lov = &exp->exp_obd->u.lov;
976 int rc = 0, cookie_set = 0, i;
979 OBD_ALLOC(set, sizeof(*set));
986 set->set_oa = src_oa;
988 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
989 set->set_cookies = oti->oti_logcookies;
991 loi = lsm->lsm_oinfo;
992 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
993 struct lov_request *req;
995 if (lov->tgts[loi->loi_ost_idx].active == 0) {
996 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1000 OBD_ALLOC(req, sizeof(*req));
1002 GOTO(out_set, rc = -ENOMEM);
1005 req->rq_idx = loi->loi_ost_idx;
1007 req->rq_oa = obdo_alloc();
1008 if (req->rq_oa == NULL)
1009 GOTO(out_set, rc = -ENOMEM);
1010 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1011 req->rq_oa->o_id = loi->loi_id;
1013 /* Setup the first request's cookie position */
1014 if (!cookie_set && set->set_cookies) {
1015 oti->oti_logcookies = set->set_cookies + i;
1018 lov_set_add_req(req, set);
1020 if (!set->set_count)
1021 GOTO(out_set, rc = -EIO);
1025 lov_fini_destroy_set(set);
1029 static int setattr_done(struct lov_request_set *set)
1031 return common_attr_done(set);
1034 int lov_fini_setattr_set(struct lov_request_set *set)
1039 LASSERT(set->set_exp);
1042 if (set->set_completes) {
1043 rc = setattr_done(set);
1044 /* FIXME update qos data here */
1047 if (atomic_dec_and_test(&set->set_refcount))
1048 lov_finish_set(set);
1052 int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
1053 struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1054 struct lov_request_set **reqset)
1056 struct lov_request_set *set;
1057 struct lov_oinfo *loi = NULL;
1058 struct lov_obd *lov = &exp->exp_obd->u.lov;
1062 OBD_ALLOC(set, sizeof(*set));
1069 set->set_oa = src_oa;
1071 loi = lsm->lsm_oinfo;
1072 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1073 struct lov_request *req;
1075 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1076 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1080 OBD_ALLOC(req, sizeof(*req));
1082 GOTO(out_set, rc = -ENOMEM);
1084 req->rq_idx = loi->loi_ost_idx;
1086 req->rq_oa = obdo_alloc();
1087 if (req->rq_oa == NULL)
1088 GOTO(out_set, rc = -ENOMEM);
1089 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1090 req->rq_oa->o_id = loi->loi_id;
1092 if (src_oa->o_valid & OBD_MD_FLSIZE) {
1093 if (lov_stripe_offset(lsm, src_oa->o_size, i,
1094 &req->rq_oa->o_size) < 0 &&
1096 req->rq_oa->o_size--;
1097 CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1098 i, req->rq_oa->o_size, src_oa->o_size);
1100 lov_set_add_req(req, set);
1102 if (!set->set_count)
1103 GOTO(out_set, rc = -EIO);
1107 lov_fini_setattr_set(set);
1111 int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
1114 struct lov_stripe_md *lsm = set->set_md;
1115 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1118 lov_update_set(set, req, rc);
1120 struct lov_oinfo *loi = &lsm->lsm_oinfo[req->rq_stripe];
1121 loi->loi_kms = loi->loi_rss = req->rq_extent.start;
1123 if (rc && !lov->tgts[req->rq_idx].active)
1125 /* FIXME in raid1 regime, should return 0 */
1129 int lov_fini_punch_set(struct lov_request_set *set)
1134 LASSERT(set->set_exp);
1137 if (set->set_completes) {
1138 if (!set->set_success)
1140 /* FIXME update qos data here */
1143 if (atomic_dec_and_test(&set->set_refcount))
1144 lov_finish_set(set);
1149 int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
1150 struct lov_stripe_md *lsm, obd_off start,
1151 obd_off end, struct obd_trans_info *oti,
1152 struct lov_request_set **reqset)
1154 struct lov_request_set *set;
1155 struct lov_oinfo *loi = NULL;
1156 struct lov_obd *lov = &exp->exp_obd->u.lov;
1160 OBD_ALLOC(set, sizeof(*set));
1167 set->set_oa = src_oa;
1169 loi = lsm->lsm_oinfo;
1170 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1171 struct lov_request *req;
1174 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1175 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1179 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1182 OBD_ALLOC(req, sizeof(*req));
1184 GOTO(out_set, rc = -ENOMEM);
1186 req->rq_idx = loi->loi_ost_idx;
1188 req->rq_oa = obdo_alloc();
1189 if (req->rq_oa == NULL)
1190 GOTO(out_set, rc = -ENOMEM);
1191 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1192 req->rq_oa->o_id = loi->loi_id;
1194 req->rq_extent.start = rs;
1195 req->rq_extent.end = re;
1197 lov_set_add_req(req, set);
1199 if (!set->set_count)
1200 GOTO(out_set, rc = -EIO);
1204 lov_fini_punch_set(set);
1208 int lov_fini_sync_set(struct lov_request_set *set)
1213 LASSERT(set->set_exp);
1216 if (set->set_completes) {
1217 if (!set->set_success)
1219 /* FIXME update qos data here */
1222 if (atomic_dec_and_test(&set->set_refcount))
1223 lov_finish_set(set);
1228 int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
1229 struct lov_stripe_md *lsm, obd_off start,
1230 obd_off end, struct lov_request_set **reqset)
1232 struct lov_request_set *set;
1233 struct lov_oinfo *loi = NULL;
1234 struct lov_obd *lov = &exp->exp_obd->u.lov;
1238 OBD_ALLOC(set, sizeof(*set));
1245 set->set_oa = src_oa;
1247 loi = lsm->lsm_oinfo;
1248 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1249 struct lov_request *req;
1252 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1253 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1257 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1260 OBD_ALLOC(req, sizeof(*req));
1262 GOTO(out_set, rc = -ENOMEM);
1264 req->rq_idx = loi->loi_ost_idx;
1266 req->rq_oa = obdo_alloc();
1267 if (req->rq_oa == NULL)
1268 GOTO(out_set, rc = -ENOMEM);
1269 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1270 req->rq_oa->o_id = loi->loi_id;
1272 req->rq_extent.start = rs;
1273 req->rq_extent.end = re;
1275 lov_set_add_req(req, set);
1277 if (!set->set_count)
1278 GOTO(out_set, rc = -EIO);
1282 lov_fini_sync_set(set);