1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
6 * This file is part of the Lustre file system, http://www.lustre.org
7 * Lustre is a trademark of Cluster File Systems, Inc.
9 * You may have signed or agreed to another license before downloading
10 * this software. If so, you are bound by the terms and conditions
11 * of that agreement, and the following does not apply to you. See the
12 * LICENSE file included with this distribution for more information.
14 * If you did not agree to a different license, then this copy of Lustre
15 * is open source software; you can redistribute it and/or modify it
16 * under the terms of version 2 of the GNU General Public License as
17 * published by the Free Software Foundation.
19 * In either case, Lustre is distributed in the hope that it will be
20 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 * license text for more details.
26 # define EXPORT_SYMTAB
28 #define DEBUG_SUBSYSTEM S_LOV
31 #include <asm/div64.h>
33 #include <liblustre.h>
36 #include <linux/obd_class.h>
37 #include <linux/obd_lov.h>
38 #include <linux/lustre_idl.h>
40 #include "lov_internal.h"
/* Reset the visible bookkeeping of a request set: zero the completion
 * counter, make the request list empty and set the reference count to 1.
 * NOTE(review): further counters may be reset on lines that are not visible
 * in this excerpt -- confirm against the full file. */
42 static void lov_init_set(struct lov_request_set *set)
45 set->set_completes = 0;
47 INIT_LIST_HEAD(&set->set_list);
48 atomic_set(&set->set_refcount, 1);
/* Destroy a request set.  Every request on set->set_list is unlinked and its
 * obdo (rq_oa), its sub-stripe-md buffer (rq_md, rq_buflen bytes) and the
 * request itself are released; afterwards the set's page array and its
 * lock-handle reference are dropped and the set itself is freed.
 * NOTE(review): the conditions guarding the individual frees are not visible
 * in this excerpt -- confirm which of them are NULL-checked. */
51 static void lov_finish_set(struct lov_request_set *set)
53 struct list_head *pos, *n;
/* _safe iteration: each entry is removed from the list while walking it. */
57 list_for_each_safe(pos, n, &set->set_list) {
58 struct lov_request *req = list_entry(pos, struct lov_request,
60 list_del_init(&req->rq_link);
63 obdo_free(req->rq_oa);
65 OBD_FREE(req->rq_md, req->rq_buflen);
66 OBD_FREE(req, sizeof(*req));
/* The page array is freed with a size of set_oabufs elements. */
70 int len = set->set_oabufs * sizeof(*set->set_pga);
71 OBD_FREE(set->set_pga, len);
74 lov_llh_put(set->set_lockh);
76 OBD_FREE(set, sizeof(*set));
/* Per-request completion bookkeeping shared by the lov_update_*_set()
 * helpers in this file; each of them calls it with the request and its rc.
 * NOTE(review): the body is not visible in this excerpt.  Presumably it marks
 * the request complete, stores rc in it and bumps the set's completion and
 * success counters (those fields are read elsewhere in this file) --
 * confirm against the full file. */
80 static void lov_update_set(struct lov_request_set *set,
81 struct lov_request *req, int rc)
/* Generic completion handler for one request of a set.  Feeds rc into the
 * set through lov_update_set() and then special-cases a non-zero rc that
 * came from a target which is not marked active.
 * NOTE(review): the statement executed under the inactive-target test and
 * the return statement are not visible here; the existing comment suggests
 * the error is forgiven in that case -- confirm. */
91 int lov_update_common_set(struct lov_request_set *set,
92 struct lov_request *req, int rc)
94 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
97 lov_update_set(set, req, rc);
99 /* grace error on inactive ost */
100 if (rc && !lov->tgts[req->rq_idx].active)
103 /* FIXME in raid1 regime, should return 0 */
/* Queue req at the tail of set's request list.
 * NOTE(review): any additional bookkeeping (the lov_prep_*_set() callers test
 * set->set_count after adding requests, so a counter is presumably updated
 * here) is not visible in this excerpt -- confirm. */
107 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
109 list_add_tail(&req->rq_link, &set->set_list);
/* Completion handler for the enqueue of one stripe.
 *
 * Looks up the stripe's sub-lock handle and the LOV's own per-stripe object
 * info (loi), then branches on rc:
 *  - ELDLM_OK: mtime and blocks are copied from the request's private md
 *    copy into the loi, a candidate kms is derived from the returned rss
 *    (capped at lock extent end + 1), and the lock is made matchable.
 *  - ELDLM_LOCK_ABORTED with LDLM_FL_HAS_INTENT set in flags: the sub-handle
 *    is cleared and rss/mtime/blocks are copied; kms is left untouched.
 *  - remaining case: the sub-handle is cleared and an error is logged when
 *    the target is active.
 * Finally rc is recorded in the set through lov_update_set().
 * NOTE(review): several lines of the ELDLM_OK branch, the final `else` line
 * and the return statement are not visible in this excerpt. */
113 int lov_update_enqueue_set(struct lov_request_set *set,
114 struct lov_request *req, int rc, int flags)
116 struct lustre_handle *lov_lockhp;
117 struct lov_oinfo *loi;
/* Sub-lock handle and object info of the stripe this request belongs to. */
120 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
121 loi = &set->set_md->lsm_oinfo[req->rq_stripe];
123 /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
124 * It belongs in the OSC, except that the OSC doesn't have
125 * access to the real LOI -- it gets a copy, that we created
126 * above, and that copy can be arbitrarily out of date.
128 * The LOV API is due for a serious rewriting anyways, and this
129 * can be addressed then. */
130 if (rc == ELDLM_OK) {
/* tmp starts as the rss reported in the request's md copy. */
131 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
132 __u64 tmp = req->rq_md->lsm_oinfo->loi_rss;
134 LASSERT(lock != NULL);
136 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
137 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
138 /* Extend KMS up to the end of this lock and no further
139 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
140 if (tmp > lock->l_policy_data.l_extent.end)
141 tmp = lock->l_policy_data.l_extent.end + 1;
/* NOTE(review): the assignment that stores tmp as the new kms is not
 * visible in this excerpt; only the debug message and the kms_valid flag
 * are. */
142 if (tmp >= loi->loi_kms) {
143 LDLM_DEBUG(lock, "lock acquired, setting rss="
144 LPU64", kms="LPU64, loi->loi_rss, tmp);
146 loi->loi_kms_valid = 1;
148 LDLM_DEBUG(lock, "lock acquired, setting rss="
149 LPU64"; leaving kms="LPU64", end="LPU64,
150 loi->loi_rss, loi->loi_kms,
151 lock->l_policy_data.l_extent.end);
153 ldlm_lock_allow_match(lock);
155 } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
/* '&' binds tighter than '&&', so the test above is
 * rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT).
 * No sub-lock is kept: zero the handle and only refresh rss/mtime/blocks. */
156 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
157 loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss;
158 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
159 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
160 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
161 " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
/* Remaining case (the `else` line is not visible in this excerpt): clear
 * the sub-handle and report the failure unless the target is inactive. */
164 struct obd_export *exp = set->set_exp;
165 struct lov_obd *lov = &exp->exp_obd->u.lov;
167 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
168 if (lov->tgts[req->rq_idx].active) {
169 CERROR("error: enqueue objid "LPX64" subobj "
170 LPX64" on OST idx %d: rc = %d\n",
171 set->set_md->lsm_object_id, loi->loi_id,
172 loi->loi_ost_idx, rc);
177 lov_update_set(set, req, rc);
/* Post-process a finished enqueue/match set.
 *
 * Asserts that at least one request completed.  When the number of
 * completions equals the number of successes there is nothing to undo.
 * Otherwise the request list is walked and obd_cancel() is called on the
 * target export for sub-locks whose handle cookie is non-zero; a cancel
 * failure on an active target is logged.  Afterwards one reference on the
 * set's lock handle is dropped.
 * NOTE(review): the statements under the three `if` tests at the top of the
 * loop body and before it (presumably `continue` / an early return), the
 * remaining obd_cancel() arguments and the return statement are not visible
 * in this excerpt. */
181 static int enqueue_done(struct lov_request_set *set, __u32 mode)
183 struct lov_request *req;
184 struct lustre_handle *lov_lockhp = NULL;
185 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
189 LASSERT(set->set_completes);
190 /* enqueue/match success, just return */
191 if (set->set_completes == set->set_success)
194 /* cancel enqueued/matched locks */
195 list_for_each_entry(req, &set->set_list, rq_link) {
/* Tests for requests that have not completed or that failed. */
196 if (!req->rq_complete || req->rq_rc)
199 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
/* A zero cookie means no sub-lock handle is recorded for this stripe. */
201 if (lov_lockhp->cookie == 0)
204 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
206 if (rc && lov->tgts[req->rq_idx].active)
207 CERROR("cancelling obdjid "LPX64" on OST "
208 "idx %d error: rc = %d\n",
209 req->rq_md->lsm_object_id, req->rq_idx, rc);
211 lov_llh_put(set->set_lockh);
/* Finish an enqueue request set and drop the caller's reference on it.
 *
 * Asserts that the set has an export.  If any request completed,
 * enqueue_done() computes rc; the lov_llh_put() that follows is presumably
 * the `else` branch for a set with no completions (the `else` line is not
 * visible in this excerpt -- confirm).  Finally the set's reference count is
 * decremented.
 * NOTE(review): the action taken when the count reaches zero and the return
 * statement are not visible here. */
215 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
222 LASSERT(set->set_exp);
223 if (set->set_completes)
224 rc = enqueue_done(set, mode);
226 lov_llh_put(set->set_lockh);
228 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for an enqueue on a striped object.
 *
 * Allocates the set, creates a LOV lock handle with lov_llh_new() and stores
 * its cookie in *lockh.  Then, for each stripe of lsm, the requested extent
 * is tested against the stripe with lov_stripe_intersects() and the stripe's
 * target is tested for being active; for stripes that get a request, a
 * lov_request with a private stripe-md buffer is allocated, filled with the
 * extent, the target index and a copy of the stripe's loi fields, and queued
 * on the set.
 *
 * Errors visible here: -ENOMEM on allocation failure, -EIO from the final
 * check before the out_set path, which calls lov_fini_enqueue_set().
 * NOTE(review): the statements under the intersect/inactive tests
 * (presumably `continue`), the condition guarding the -EIO GOTO, the
 * assignments of the remaining set/request fields and the success return are
 * not visible in this excerpt. */
234 int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
235 ldlm_policy_data_t *policy, __u32 mode,
236 struct lustre_handle *lockh,
237 struct lov_request_set **reqset)
239 struct lov_obd *lov = &exp->exp_obd->u.lov;
240 struct lov_request_set *set;
242 struct lov_oinfo *loi;
245 OBD_ALLOC(set, sizeof(*set));
/* Create the LOV-level lock handle and hand its cookie back via lockh. */
252 set->set_lockh = lov_llh_new(lsm);
253 if (set->set_lockh == NULL)
254 GOTO(out_set, rc = -ENOMEM);
255 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
257 loi = lsm->lsm_oinfo;
258 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
259 struct lov_request *req;
/* start/end are filled in by lov_stripe_intersects() and become the
 * request's extent below. */
262 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
263 policy->l_extent.end, &start, &end))
266 if (lov->tgts[loi->loi_ost_idx].active == 0) {
267 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
271 OBD_ALLOC(req, sizeof(*req));
273 GOTO(out_set, rc = -ENOMEM);
/* The private md copy is sized for the stripe-md header plus one
 * lov_oinfo. */
275 req->rq_buflen = sizeof(*req->rq_md) +
276 sizeof(struct lov_oinfo);
277 OBD_ALLOC(req->rq_md, req->rq_buflen);
278 if (req->rq_md == NULL)
279 GOTO(out_set, rc = -ENOMEM);
281 req->rq_extent.start = start;
282 req->rq_extent.end = end;
283 req->rq_extent.gid = policy->l_extent.gid;
285 req->rq_idx = loi->loi_ost_idx;
288 /* XXX LOV STACKING: submd should be from the subobj */
289 req->rq_md->lsm_object_id = loi->loi_id;
290 req->rq_md->lsm_stripe_count = 0;
/* Copy the stripe's current loi fields into the request's md copy. */
291 req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
292 req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss;
293 req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
294 req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks;
295 req->rq_md->lsm_oinfo->loi_mtime = loi->loi_mtime;
297 lov_set_add_req(req, set);
300 GOTO(out_set, rc = -EIO);
304 lov_fini_enqueue_set(set, mode);
/* Completion handler for the match of one stripe.  Records a value `ret` in
 * the set through lov_update_set().
 * NOTE(review): the rest of the parameter list, the derivation of `ret` and
 * the return statement are not visible in this excerpt. */
308 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
316 lov_update_set(set, req, ret);
/* Finish a match request set and drop the caller's reference on it.
 *
 * Asserts that the set has an export.  If any request completed: when every
 * request of the set succeeded and LDLM_FL_TEST_LOCK is set in flags, one
 * extra lov_llh_put() is issued; then enqueue_done() computes rc.  The
 * lov_llh_put() after that block is presumably the `else` branch for a set
 * with no completions (the `else` line is not visible -- confirm).  Finally
 * the set's reference count is decremented.
 * NOTE(review): the action taken when the count reaches zero and the return
 * statement are not visible in this excerpt. */
320 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
327 LASSERT(set->set_exp);
328 if (set->set_completes) {
/* '&' binds tighter than '&&': (count == success) && (flags & TEST_LOCK). */
329 if (set->set_count == set->set_success &&
330 flags & LDLM_FL_TEST_LOCK)
331 lov_llh_put(set->set_lockh);
332 rc = enqueue_done(set, mode);
334 lov_llh_put(set->set_lockh);
337 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for a lock match on a striped object.
 *
 * Allocates the set, creates a LOV lock handle with lov_llh_new() and stores
 * its cookie in *lockh.  For each stripe, the requested extent is tested with
 * lov_stripe_intersects(); a stripe whose target is inactive fails the whole
 * operation with -EIO.  Each request gets a private stripe-md buffer of
 * sizeof(*req->rq_md) bytes, the extent, the target index and the stripe's
 * object id, and is queued on the set.
 *
 * The out_set path calls lov_fini_match_set() with flags 0.
 * NOTE(review): the statement under the intersect test (presumably
 * `continue`), the condition guarding the final -EIO GOTO, the assignments of
 * the remaining set/request fields and the success return are not visible in
 * this excerpt. */
343 int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
344 ldlm_policy_data_t *policy, __u32 mode,
345 struct lustre_handle *lockh,
346 struct lov_request_set **reqset)
348 struct lov_obd *lov = &exp->exp_obd->u.lov;
349 struct lov_request_set *set;
351 struct lov_oinfo *loi;
354 OBD_ALLOC(set, sizeof(*set));
/* Create the LOV-level lock handle and hand its cookie back via lockh. */
361 set->set_lockh = lov_llh_new(lsm);
362 if (set->set_lockh == NULL)
363 GOTO(out_set, rc = -ENOMEM);
364 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
366 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
367 struct lov_request *req;
/* start/end are filled in by lov_stripe_intersects() and become the
 * request's extent below. */
370 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
371 policy->l_extent.end, &start, &end))
374 /* FIXME raid1 should grace this error */
375 if (lov->tgts[loi->loi_ost_idx].active == 0) {
376 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
377 GOTO(out_set, rc = -EIO);
380 OBD_ALLOC(req, sizeof(*req));
382 GOTO(out_set, rc = -ENOMEM);
384 req->rq_buflen = sizeof(*req->rq_md);
385 OBD_ALLOC(req->rq_md, req->rq_buflen);
386 if (req->rq_md == NULL)
387 GOTO(out_set, rc = -ENOMEM);
389 req->rq_extent.start = start;
390 req->rq_extent.end = end;
391 req->rq_extent.gid = policy->l_extent.gid;
393 req->rq_idx = loi->loi_ost_idx;
396 /* XXX LOV STACKING: submd should be from the subobj */
397 req->rq_md->lsm_object_id = loi->loi_id;
398 req->rq_md->lsm_stripe_count = 0;
400 lov_set_add_req(req, set);
403 GOTO(out_set, rc = -EIO);
407 lov_fini_match_set(set, mode, 0);
/* Finish a cancel request set: asserts that the set has an export, drops a
 * reference on the set's lock handle and decrements the set's reference
 * count.
 * NOTE(review): any condition guarding the lov_llh_put(), the action taken
 * when the count reaches zero and the return statement are not visible in
 * this excerpt. */
411 int lov_fini_cancel_set(struct lov_request_set *set)
416 LASSERT(set->set_exp);
421 lov_llh_put(set->set_lockh);
423 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for cancelling a striped lock.
 *
 * Allocates the set and resolves lockh to the LOV lock handle with
 * lov_handle2llh(); an unknown handle is logged and fails with -EINVAL.
 * For each stripe the per-stripe sub-handle is inspected: a zero cookie is
 * logged as "no lock?"; otherwise a request with a private stripe-md buffer,
 * the target index and the stripe's object id is allocated and queued.
 *
 * The out_set path calls lov_fini_cancel_set().
 * NOTE(review): the statement that follows the "no lock?" message
 * (presumably `continue`), the condition guarding the final -EIO GOTO, the
 * assignments of the remaining set/request fields and the success return are
 * not visible in this excerpt. */
429 int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
430 __u32 mode, struct lustre_handle *lockh,
431 struct lov_request_set **reqset)
433 struct lov_request_set *set;
435 struct lov_oinfo *loi;
438 OBD_ALLOC(set, sizeof(*set));
445 set->set_lockh = lov_handle2llh(lockh);
446 if (set->set_lockh == NULL) {
447 CERROR("LOV: invalid lov lock handle %p\n", lockh);
448 GOTO(out_set, rc = -EINVAL);
450 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
452 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
453 struct lov_request *req;
454 struct lustre_handle *lov_lockhp;
/* Sub-handle of stripe i; a zero cookie means no sub-lock is recorded. */
456 lov_lockhp = set->set_lockh->llh_handles + i;
457 if (lov_lockhp->cookie == 0) {
458 CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
459 loi->loi_ost_idx, loi->loi_id);
463 OBD_ALLOC(req, sizeof(*req));
465 GOTO(out_set, rc = -ENOMEM);
467 req->rq_buflen = sizeof(*req->rq_md);
468 OBD_ALLOC(req->rq_md, req->rq_buflen);
469 if (req->rq_md == NULL)
470 GOTO(out_set, rc = -ENOMEM);
472 req->rq_idx = loi->loi_ost_idx;
475 /* XXX LOV STACKING: submd should be from the subobj */
476 req->rq_md->lsm_object_id = loi->loi_id;
477 req->rq_md->lsm_stripe_count = 0;
479 lov_set_add_req(req, set);
482 GOTO(out_set, rc = -EIO);
486 lov_fini_cancel_set(set);
/* Post-process a finished create set.
 *
 * Visible steps, in order:
 *  1. If not every request succeeded, walk the requests, un-complete one,
 *     retry it through qos_remedy_create() and record the outcome with
 *     lov_update_create_set().
 *  2. Handle the "no successful creates" and "fewer objects than requested"
 *     cases; in the latter the set's count is lowered to the success count.
 *  3. Merge the attributes of the successful requests into a temporary obdo
 *     with lov_merge_attrs(), log a size mismatch against the source obdo,
 *     then copy the merged result over the source obdo (keeping its o_id).
 *  4. Cleanup path: obd_destroy() each successfully created sub-object and
 *     free the in-memory stripe md.
 *  5. Restore oti->oti_logcookies from the set and set or clear
 *     OBD_MD_FLCOOKIE in the source obdo depending on set_cookie_sent.
 * NOTE(review): the filter inside the retry loop, the statements under the
 * tests in step 2, the labels separating the success path from the cleanup
 * path, the declaration of `err` and the return statement are not visible in
 * this excerpt -- confirm the exact control flow against the full file. */
490 static int create_done(struct obd_export *exp, struct lov_request_set *set,
491 struct lov_stripe_md **lsmp)
493 struct lov_obd *lov = &exp->exp_obd->u.lov;
494 struct obd_trans_info *oti = set->set_oti;
495 struct obdo *src_oa = set->set_oa;
496 struct lov_request *req;
497 struct obdo *ret_oa = NULL;
498 int attrset = 0, rc = 0;
501 LASSERT(set->set_completes);
503 /* try alloc objects on other osts if osc_create fails for
504 * exceptions: RPC failure, ENOSPC, etc */
505 if (set->set_count != set->set_success) {
506 list_for_each_entry (req, &set->set_list, rq_link) {
/* Undo the completion accounting of this request before retrying it. */
510 set->set_completes--;
511 req->rq_complete = 0;
513 rc = qos_remedy_create(set, req);
514 lov_update_create_set(set, req, rc);
521 /* no successful creates */
522 if (set->set_success == 0)
525 /* If there was an explicit stripe set, fail. Otherwise, we
526 * got some objects and that's not bad. */
527 if (set->set_count != set->set_success) {
530 set->set_count = set->set_success;
/* Temporary obdo that accumulates the merged per-stripe attributes. */
534 ret_oa = obdo_alloc();
536 GOTO(cleanup, rc = -ENOMEM);
538 list_for_each_entry(req, &set->set_list, rq_link) {
539 if (!req->rq_complete || req->rq_rc)
541 lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
542 set->set_md, req->rq_stripe, &attrset);
544 if (src_oa->o_valid & OBD_MD_FLSIZE &&
545 ret_oa->o_size != src_oa->o_size) {
546 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
547 src_oa->o_size, ret_oa->o_size);
/* Publish the merged attributes to the caller, preserving the object id. */
550 ret_oa->o_id = src_oa->o_id;
551 memcpy(src_oa, ret_oa, sizeof(*src_oa));
/* Destroy the sub-objects that were created. */
558 list_for_each_entry(req, &set->set_list, rq_link) {
559 struct obd_export *sub_exp;
562 if (!req->rq_complete || req->rq_rc)
565 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
566 err = obd_destroy(sub_exp, req->rq_oa, NULL, oti);
568 CERROR("Failed to uncreate objid "LPX64" subobj "
569 LPX64" on OST idx %d: rc = %d\n",
570 set->set_oa->o_id, req->rq_oa->o_id,
574 obd_free_memmd(exp, &set->set_md);
/* Point oti back at the start of the cookie array saved in the set. */
576 if (oti && set->set_cookies) {
577 oti->oti_logcookies = set->set_cookies;
578 if (!set->set_cookie_sent) {
579 oti_free_cookies(oti);
580 src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
582 src_oa->o_valid |= OBD_MD_FLCOOKIE;
/* Finish a create request set: asserts that the set has an export, runs
 * create_done() when at least one request completed, and decrements the
 * set's reference count.
 * NOTE(review): the action taken when the count reaches zero and the return
 * statement are not visible in this excerpt. */
588 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
593 LASSERT(set->set_exp);
596 if (set->set_completes) {
597 rc = create_done(set->set_exp, set, lsmp);
598 /* FIXME update qos data here */
601 if (atomic_dec_and_test(&set->set_refcount))
/* Completion handler for the creation of one sub-object.
 *
 * The request is assigned the stripe slot whose index equals the number of
 * successes recorded so far.  A failure on an active target is logged, rc is
 * recorded through lov_update_set(), and the new object id and target index
 * are stored in the stripe's loi (and in oti->oti_objid[] when present).
 * Log-cookie bookkeeping: the oti cookie pointer is advanced when the set
 * tracks cookies, and set_cookie_sent is incremented when the returned obdo
 * carries OBD_MD_FLCOOKIE.
 * NOTE(review): the statements between the error logging and
 * lov_update_set() (including the guard of the "invalid err" message), any
 * early return on failure and the final return are not visible in this
 * excerpt. */
607 int lov_update_create_set(struct lov_request_set *set,
608 struct lov_request *req, int rc)
610 struct obd_trans_info *oti = set->set_oti;
611 struct lov_stripe_md *lsm = set->set_md;
612 struct lov_oinfo *loi;
613 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
/* Next free stripe slot == number of successful creates so far. */
616 req->rq_stripe = set->set_success;
617 loi = &lsm->lsm_oinfo[req->rq_stripe];
619 if (rc && lov->tgts[req->rq_idx].active) {
620 CERROR("error creating fid "LPX64" sub-object"
621 " on OST idx %d/%d: rc = %d\n",
622 set->set_oa->o_id, req->rq_idx,
623 lsm->lsm_stripe_count, rc);
625 CERROR("obd_create returned invalid err %d\n", rc);
629 lov_update_set(set, req, rc);
633 if (oti && oti->oti_objid)
634 oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
636 loi->loi_id = req->rq_oa->o_id;
637 loi->loi_ost_idx = req->rq_idx;
638 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
639 lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
/* NOTE(review): oti is dereferenced here without a NULL test; in this file
 * set_cookies is only assigned in places where oti was checked first --
 * confirm that invariant holds for all callers. */
642 if (set->set_cookies)
643 ++oti->oti_logcookies;
644 if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
645 set->set_cookie_sent++;
/* Build the request set for creating a striped object.
 *
 * Allocates the set and remembers the source obdo.  When the set has no
 * stripe md yet, a stripe count is chosen: the default from
 * lov_get_stripecnt(), raised if the source obdo carries a size that needs
 * more stripes (failing with -EFBIG if that exceeds the number of active
 * targets), and an in-memory stripe md is allocated with the descriptor's
 * pattern or LOV_PATTERN_RAID0 when none is configured.  qos_prep_create()
 * then prepares the requests.  If the source obdo carries OBD_MD_FLCOOKIE
 * and an oti is given, log cookies are allocated for set_count entries and
 * remembered in the set.
 *
 * Error paths free the stripe md and call lov_fini_create_set().
 * NOTE(review): the initialisation of set->set_md from *lsmp, the `else`
 * lines, the error checks after lov_alloc_memmd()/qos_prep_create(), the
 * statement taken when cookie allocation fails, the labels and the success
 * return are not visible in this excerpt. */
650 int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **lsmp,
651 struct obdo *src_oa, struct obd_trans_info *oti,
652 struct lov_request_set **reqset)
654 struct lov_obd *lov = &exp->exp_obd->u.lov;
655 struct lov_request_set *set;
656 int rc = 0, newea = 0;
659 OBD_ALLOC(set, sizeof(*set));
666 set->set_oa = src_oa;
669 if (set->set_md == NULL) {
670 int stripes, stripe_cnt;
671 stripe_cnt = lov_get_stripecnt(lov, 0);
673 /* If the MDS file was truncated up to some size, stripe over
674 * enough OSTs to allow the file to be created at that size. */
675 if (src_oa->o_valid & OBD_MD_FLSIZE) {
/* Works in units of 4096 bytes (>> 12).
 * NOTE(review): `stripes` is a plain int that receives a value computed
 * from the 64-bit o_size and is then passed to do_div() -- verify the
 * operand width do_div() expects on the supported platforms. */
676 stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
677 do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
679 if (stripes > lov->desc.ld_active_tgt_count)
680 GOTO(out_set, rc = -EFBIG);
681 if (stripes < stripe_cnt)
682 stripes = stripe_cnt;
684 stripes = stripe_cnt;
687 rc = lov_alloc_memmd(&set->set_md, stripes,
688 lov->desc.ld_pattern ?
689 lov->desc.ld_pattern : LOV_PATTERN_RAID0);
695 rc = qos_prep_create(lov, set, newea);
699 if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
700 oti_alloc_cookies(oti, set->set_count);
701 if (!oti->oti_logcookies)
/* Remember the start of the cookie array. */
703 set->set_cookies = oti->oti_logcookies;
710 obd_free_memmd(exp, &set->set_md);
712 lov_fini_create_set(set, lsmp);
/* Merge the per-stripe attributes of a finished set into set->set_oa.
 *
 * Tests for a set without a target obdo and for a set without any success
 * come first.  Otherwise a temporary obdo is allocated, the attributes of
 * every usable request are merged into it with lov_merge_attrs(), and the
 * result (with the original o_id preserved) is copied over set->set_oa.
 * "No stripes had valid attrs" is logged on an error path.
 * NOTE(review): the statements under the individual tests (early returns,
 * `continue`), the condition guarding the CERROR, the declaration and
 * release of tmp_oa and the return statement are not visible in this
 * excerpt. */
716 static int common_attr_done(struct lov_request_set *set)
718 struct list_head *pos;
719 struct lov_request *req;
721 int rc = 0, attrset = 0;
724 if (set->set_oa == NULL)
727 if (!set->set_success)
730 tmp_oa = obdo_alloc();
732 GOTO(out, rc = -ENOMEM);
734 list_for_each (pos, &set->set_list) {
735 req = list_entry(pos, struct lov_request, rq_link);
/* Tests for requests that have not completed or that failed. */
737 if (!req->rq_complete || req->rq_rc)
739 if (req->rq_oa->o_valid == 0) /* inactive stripe */
741 lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
742 set->set_md, req->rq_stripe, &attrset);
745 CERROR("No stripes had valid attrs\n");
/* Keep the caller's object id, replace everything else. */
748 tmp_oa->o_id = set->set_oa->o_id;
749 memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
/* Post-process a finished bulk read/write set: for each request on the list
 * whose returned obdo carries OBD_MD_FLBLOCKS, store the returned block
 * count in that stripe's loi.
 * NOTE(review): the statement under the completed/failed test (presumably
 * `continue`) and the return statement are not visible in this excerpt. */
757 static int brw_done(struct lov_request_set *set)
759 struct lov_stripe_md *lsm = set->set_md;
760 struct lov_oinfo *loi = NULL;
761 struct list_head *pos;
762 struct lov_request *req;
765 list_for_each (pos, &set->set_list) {
766 req = list_entry(pos, struct lov_request, rq_link);
768 if (!req->rq_complete || req->rq_rc)
771 loi = &lsm->lsm_oinfo[req->rq_stripe];
773 if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
774 loi->loi_blocks = req->rq_oa->o_blocks;
/* Finish a bulk read/write request set: asserts that the set has an export,
 * does its post-processing when at least one request completed, and
 * decrements the set's reference count.
 * NOTE(review): the call made inside the set_completes block (presumably
 * brw_done()), the action taken when the count reaches zero and the return
 * statement are not visible in this excerpt. */
780 int lov_fini_brw_set(struct lov_request_set *set)
785 LASSERT(set->set_exp);
788 if (set->set_completes) {
790 /* FIXME update qos data here */
792 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for a bulk read/write on a striped object.
 *
 * Steps visible here:
 *  1. Allocate the set, a page array of oa_bufs entries (set_pga) and a
 *     per-stripe scratch array `info`.
 *  2. Count how many of the caller's pages fall on each stripe.
 *  3. For every stripe with pages: fail with -EIO if its target is inactive;
 *     otherwise allocate a request with its own obdo (a copy of src_oa with
 *     the stripe's object id) and stripe-md buffer, and give it a contiguous
 *     slice [rq_pgaidx, rq_pgaidx + rq_oabufs) of set_pga.
 *  4. Copy each caller page into its stripe's slice of set_pga and convert
 *     its offset with lov_stripe_offset().
 *  5. Free the scratch array; the failure path calls lov_fini_brw_set().
 * NOTE(review): the declaration of `info`, the initial value of `shift`,
 * the statement that advances info[stripe].off in step 4, the condition
 * guarding the GOTO with -EIO after the request loop, the labels and the
 * success return are not visible in this excerpt. */
798 int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
799 struct lov_stripe_md *lsm, obd_count oa_bufs,
800 struct brw_page *pga, struct obd_trans_info *oti,
801 struct lov_request_set **reqset)
808 struct lov_request_set *set;
809 struct lov_oinfo *loi = NULL;
810 struct lov_obd *lov = &exp->exp_obd->u.lov;
811 int rc = 0, i, shift;
814 OBD_ALLOC(set, sizeof(*set));
821 set->set_oa = src_oa;
823 set->set_oabufs = oa_bufs;
824 OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
826 GOTO(out, rc = -ENOMEM);
828 OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
830 GOTO(out, rc = -ENOMEM);
832 /* calculate the page count for each stripe */
833 for (i = 0; i < oa_bufs; i++) {
834 int stripe = lov_stripe_number(lsm, pga[i].off);
835 info[stripe].count++;
838 /* alloc and initialize lov request */
840 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
841 struct lov_request *req;
/* Tests for stripes that received no pages. */
843 if (info[i].count == 0)
846 if (lov->tgts[loi->loi_ost_idx].active == 0) {
847 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
848 GOTO(out, rc = -EIO);
851 OBD_ALLOC(req, sizeof(*req));
853 GOTO(out, rc = -ENOMEM);
855 req->rq_oa = obdo_alloc();
856 if (req->rq_oa == NULL)
857 GOTO(out, rc = -ENOMEM);
860 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
861 req->rq_oa->o_id = loi->loi_id;
863 req->rq_buflen = sizeof(*req->rq_md);
864 OBD_ALLOC(req->rq_md, req->rq_buflen);
865 if (req->rq_md == NULL)
866 GOTO(out, rc = -ENOMEM);
868 req->rq_idx = loi->loi_ost_idx;
871 /* XXX LOV STACKING */
872 req->rq_md->lsm_object_id = loi->loi_id;
873 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
/* This request owns rq_oabufs consecutive entries of set_pga starting at
 * the running offset `shift`. */
874 req->rq_oabufs = info[i].count;
875 req->rq_pgaidx = shift;
876 shift += req->rq_oabufs;
878 /* remember the index for sort brw_page array */
879 info[i].index = req->rq_pgaidx;
881 lov_set_add_req(req, set);
884 GOTO(out, rc = -EIO);
886 /* rotate & sort the brw_page array */
887 for (i = 0; i < oa_bufs; i++) {
888 int stripe = lov_stripe_number(lsm, pga[i].off);
/* Destination slot: start of the stripe's slice plus its fill offset. */
890 shift = info[stripe].index + info[stripe].off;
891 LASSERT(shift < oa_bufs);
892 set->set_pga[shift] = pga[i];
893 lov_stripe_offset(lsm, pga[i].off, stripe,
894 &set->set_pga[shift].off);
899 OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
904 lov_fini_brw_set(set);
/* Finish a getattr request set: asserts that the set has an export, merges
 * the per-stripe attributes with common_attr_done() when at least one
 * request completed, and decrements the set's reference count.
 * NOTE(review): the action taken when the count reaches zero and the return
 * statement are not visible in this excerpt. */
909 int lov_fini_getattr_set(struct lov_request_set *set)
914 LASSERT(set->set_exp);
917 if (set->set_completes)
918 rc = common_attr_done(set);
920 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for a getattr on a striped object.
 *
 * Allocates the set and remembers the source obdo.  For each stripe of lsm
 * the target is tested for being active; each request gets the target index
 * and its own obdo, which is a copy of src_oa carrying the stripe's object
 * id, and is queued on the set.
 *
 * Errors visible here: -ENOMEM on allocation failure and -EIO from the final
 * check; the out_set path calls lov_fini_getattr_set().
 * NOTE(review): the statement that follows the "inactive" message
 * (presumably `continue`), the condition guarding the -EIO GOTO, the
 * assignments of the remaining set/request fields and the success return are
 * not visible in this excerpt. */
926 int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
927 struct lov_stripe_md *lsm,
928 struct lov_request_set **reqset)
930 struct lov_request_set *set;
931 struct lov_oinfo *loi = NULL;
932 struct lov_obd *lov = &exp->exp_obd->u.lov;
936 OBD_ALLOC(set, sizeof(*set));
943 set->set_oa = src_oa;
945 loi = lsm->lsm_oinfo;
946 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
947 struct lov_request *req;
949 if (lov->tgts[loi->loi_ost_idx].active == 0) {
950 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
954 OBD_ALLOC(req, sizeof(*req));
956 GOTO(out_set, rc = -ENOMEM);
959 req->rq_idx = loi->loi_ost_idx;
/* Per-request obdo: copy of the caller's obdo with the sub-object id. */
961 req->rq_oa = obdo_alloc();
962 if (req->rq_oa == NULL)
963 GOTO(out_set, rc = -ENOMEM);
964 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
965 req->rq_oa->o_id = loi->loi_id;
967 lov_set_add_req(req, set);
970 GOTO(out_set, rc = -EIO);
974 lov_fini_getattr_set(set);
/* Finish a destroy request set: asserts that the set has an export and
 * decrements the set's reference count.  The set_completes block currently
 * only carries a FIXME.
 * NOTE(review): the action taken when the count reaches zero and the return
 * statement are not visible in this excerpt. */
978 int lov_fini_destroy_set(struct lov_request_set *set)
982 LASSERT(set->set_exp);
985 if (set->set_completes) {
986 /* FIXME update qos data here */
989 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for destroying a striped object.
 *
 * Allocates the set and remembers the source obdo; when an oti is given and
 * src_oa carries OBD_MD_FLCOOKIE, the oti's log-cookie array is remembered
 * in the set.  For each stripe the target is tested for being active; each
 * request gets the target index and its own obdo (a copy of src_oa with the
 * stripe's object id).  For the first request only, oti->oti_logcookies is
 * moved to the cookie slot of that stripe index.
 *
 * Fails with -ENOMEM on allocation failure and with -EIO when no request was
 * added; the out_set path calls lov_fini_destroy_set().
 * NOTE(review): the statement that follows the "inactive" message
 * (presumably `continue`), the line that sets cookie_set, the assignments of
 * the remaining set/request fields and the success return are not visible in
 * this excerpt. */
995 int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
996 struct lov_stripe_md *lsm,
997 struct obd_trans_info *oti,
998 struct lov_request_set **reqset)
1000 struct lov_request_set *set;
1001 struct lov_oinfo *loi = NULL;
1002 struct lov_obd *lov = &exp->exp_obd->u.lov;
1003 int rc = 0, cookie_set = 0, i;
1006 OBD_ALLOC(set, sizeof(*set));
1013 set->set_oa = src_oa;
/* '&' binds tighter than '&&': oti != NULL && (o_valid & OBD_MD_FLCOOKIE). */
1015 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1016 set->set_cookies = oti->oti_logcookies;
1018 loi = lsm->lsm_oinfo;
1019 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1020 struct lov_request *req;
1022 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1023 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1027 OBD_ALLOC(req, sizeof(*req));
1029 GOTO(out_set, rc = -ENOMEM);
1032 req->rq_idx = loi->loi_ost_idx;
/* Per-request obdo: copy of the caller's obdo with the sub-object id. */
1034 req->rq_oa = obdo_alloc();
1035 if (req->rq_oa == NULL)
1036 GOTO(out_set, rc = -ENOMEM);
1037 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1038 req->rq_oa->o_id = loi->loi_id;
1040 /* Setup the first request's cookie position */
1041 if (!cookie_set && set->set_cookies) {
1042 oti->oti_logcookies = set->set_cookies + i;
1045 lov_set_add_req(req, set);
1047 if (!set->set_count)
1048 GOTO(out_set, rc = -EIO);
1052 lov_fini_destroy_set(set);
/* Finish a setattr request set: asserts that the set has an export, merges
 * the per-stripe attributes with common_attr_done() when at least one
 * request completed, then drops one reference and destroys the set with
 * lov_finish_set() when that was the last one.
 * NOTE(review): the return statement is not visible in this excerpt. */
1056 int lov_fini_setattr_set(struct lov_request_set *set)
1061 LASSERT(set->set_exp);
1064 if (set->set_completes) {
1065 rc = common_attr_done(set);
1066 /* FIXME update qos data here */
1069 if (atomic_dec_and_test(&set->set_refcount))
1070 lov_finish_set(set);
/* Build the request set for a setattr on a striped object.
 *
 * Allocates the set and remembers the source obdo.  For each stripe the
 * target is tested for being active; each request gets the target index and
 * its own obdo (a copy of src_oa with the stripe's object id).  When src_oa
 * carries OBD_MD_FLSIZE, lov_stripe_offset() converts the file size into the
 * size for stripe i, writing it into the request's obdo.
 *
 * Fails with -ENOMEM on allocation failure and with -EIO when no request was
 * added; the out_set path calls lov_fini_setattr_set().
 * NOTE(review): the statement that follows the "inactive" message
 * (presumably `continue`), the second half of the condition guarding the
 * size decrement, the assignments of the remaining set/request fields and
 * the success return are not visible in this excerpt. */
1074 int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
1075 struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1076 struct lov_request_set **reqset)
1078 struct lov_request_set *set;
1079 struct lov_oinfo *loi = NULL;
1080 struct lov_obd *lov = &exp->exp_obd->u.lov;
1084 OBD_ALLOC(set, sizeof(*set));
1091 set->set_oa = src_oa;
1093 loi = lsm->lsm_oinfo;
1094 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1095 struct lov_request *req;
1097 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1098 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1102 OBD_ALLOC(req, sizeof(*req));
1104 GOTO(out_set, rc = -ENOMEM);
1106 req->rq_idx = loi->loi_ost_idx;
/* Per-request obdo: copy of the caller's obdo with the sub-object id. */
1108 req->rq_oa = obdo_alloc();
1109 if (req->rq_oa == NULL)
1110 GOTO(out_set, rc = -ENOMEM);
1111 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1112 req->rq_oa->o_id = loi->loi_id;
/* The per-stripe size is decremented by one when lov_stripe_offset()
 * returns a negative value and a further condition (on a line not visible
 * here) holds. */
1114 if (src_oa->o_valid & OBD_MD_FLSIZE) {
1115 if (lov_stripe_offset(lsm, src_oa->o_size, i,
1116 &req->rq_oa->o_size) < 0 &&
1118 req->rq_oa->o_size--;
1119 CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1120 i, req->rq_oa->o_size, src_oa->o_size);
1122 lov_set_add_req(req, set);
1124 if (!set->set_count)
1125 GOTO(out_set, rc = -EIO);
1129 lov_fini_setattr_set(set);
/* Completion handler for the setattr of one stripe.  Records rc through
 * lov_update_set(), special-cases a non-zero rc from a target that is not
 * active, and on success copies the returned mtime into the stripe's loi
 * when the returned obdo carries OBD_MD_FLMTIME.
 * NOTE(review): the statement under the inactive-target test (the existing
 * comment suggests the error is forgiven) and the return statement are not
 * visible in this excerpt. */
1133 int lov_update_setattr_set(struct lov_request_set *set,
1134 struct lov_request *req, int rc)
1136 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1139 lov_update_set(set, req, rc);
1141 /* grace error on inactive ost */
1142 if (rc && !lov->tgts[req->rq_idx].active)
1145 /* FIXME: LOV STACKING update loi data should be done by OSC *
1146 * when this is gone we can go back to using lov_update_common_set() */
1147 if (rc == 0 && req->rq_oa->o_valid & OBD_MD_FLMTIME)
1148 set->set_md->lsm_oinfo[req->rq_stripe].loi_mtime =
1149 req->rq_oa->o_mtime;
1150 /* ditto loi_atime, loi_ctime when available */
/* Completion handler for the punch of one stripe.  Records rc through
 * lov_update_set() and special-cases a non-zero rc from a target that is not
 * active.
 * NOTE(review): the rest of the parameter list, the statement under the
 * inactive-target test and the return statement are not visible in this
 * excerpt. */
1155 int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
1158 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1161 lov_update_set(set, req, rc);
1162 if (rc && !lov->tgts[req->rq_idx].active)
1164 /* FIXME in raid1 regime, should return 0 */
/* Finish a punch request set: asserts that the set has an export, tests for
 * a set in which requests completed but none succeeded, then drops one
 * reference and destroys the set with lov_finish_set() when that was the
 * last one.
 * NOTE(review): the statement under the `!set->set_success` test
 * (presumably an error code assignment) and the return statement are not
 * visible in this excerpt. */
1168 int lov_fini_punch_set(struct lov_request_set *set)
1173 LASSERT(set->set_exp);
1176 if (set->set_completes) {
1177 if (!set->set_success)
1179 /* FIXME update qos data here */
1182 if (atomic_dec_and_test(&set->set_refcount))
1183 lov_finish_set(set);
/* Build the request set for punching the range [start, end] of a striped
 * object.
 *
 * Allocates the set and remembers the source obdo.  For each stripe the
 * target is tested for being active and the range is tested against the
 * stripe with lov_stripe_intersects(), which fills rs/re.  Each request gets
 * the target index, its own obdo (a copy of src_oa with the stripe's object
 * id) and the extent rs..re with gid -1.
 *
 * Fails with -ENOMEM on allocation failure and with -EIO when no request was
 * added; the out_set path calls lov_fini_punch_set().
 * NOTE(review): the statements under the inactive/intersect tests
 * (presumably `continue`), the declarations of rs/re, the assignments of the
 * remaining set/request fields and the success return are not visible in
 * this excerpt. */
1188 int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
1189 struct lov_stripe_md *lsm, obd_off start,
1190 obd_off end, struct obd_trans_info *oti,
1191 struct lov_request_set **reqset)
1193 struct lov_request_set *set;
1194 struct lov_oinfo *loi = NULL;
1195 struct lov_obd *lov = &exp->exp_obd->u.lov;
1199 OBD_ALLOC(set, sizeof(*set));
1206 set->set_oa = src_oa;
1208 loi = lsm->lsm_oinfo;
1209 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1210 struct lov_request *req;
1213 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1214 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1218 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1221 OBD_ALLOC(req, sizeof(*req));
1223 GOTO(out_set, rc = -ENOMEM);
1225 req->rq_idx = loi->loi_ost_idx;
/* Per-request obdo: copy of the caller's obdo with the sub-object id. */
1227 req->rq_oa = obdo_alloc();
1228 if (req->rq_oa == NULL)
1229 GOTO(out_set, rc = -ENOMEM);
1230 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1231 req->rq_oa->o_id = loi->loi_id;
1233 req->rq_extent.start = rs;
1234 req->rq_extent.end = re;
1235 req->rq_extent.gid = -1;
1237 lov_set_add_req(req, set);
1239 if (!set->set_count)
1240 GOTO(out_set, rc = -EIO);
1244 lov_fini_punch_set(set);
/* Finish a sync request set: asserts that the set has an export, tests for
 * a set in which requests completed but none succeeded, then drops one
 * reference and destroys the set with lov_finish_set() when that was the
 * last one.
 * NOTE(review): the statement under the `!set->set_success` test
 * (presumably an error code assignment) and the return statement are not
 * visible in this excerpt. */
1248 int lov_fini_sync_set(struct lov_request_set *set)
1253 LASSERT(set->set_exp);
1256 if (set->set_completes) {
1257 if (!set->set_success)
1259 /* FIXME update qos data here */
1262 if (atomic_dec_and_test(&set->set_refcount))
1263 lov_finish_set(set);
/* Build the request set for syncing the range [start, end] of a striped
 * object.
 *
 * Allocates the set and remembers the source obdo.  For each stripe the
 * target is tested for being active and the range is tested against the
 * stripe with lov_stripe_intersects(), which fills rs/re.  Each request gets
 * the target index, its own obdo (a copy of src_oa with the stripe's object
 * id) and the extent rs..re with gid -1.
 *
 * Fails with -ENOMEM on allocation failure and with -EIO when no request was
 * added; the out_set path calls lov_fini_sync_set().
 * NOTE(review): the statements under the inactive/intersect tests
 * (presumably `continue`), the declarations of rs/re, the assignments of the
 * remaining set/request fields and the success return are not visible in
 * this excerpt. */
1268 int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
1269 struct lov_stripe_md *lsm, obd_off start,
1270 obd_off end, struct lov_request_set **reqset)
1272 struct lov_request_set *set;
1273 struct lov_oinfo *loi = NULL;
1274 struct lov_obd *lov = &exp->exp_obd->u.lov;
1278 OBD_ALLOC(set, sizeof(*set));
1285 set->set_oa = src_oa;
1287 loi = lsm->lsm_oinfo;
1288 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1289 struct lov_request *req;
1292 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1293 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1297 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1300 OBD_ALLOC(req, sizeof(*req));
1302 GOTO(out_set, rc = -ENOMEM);
1304 req->rq_idx = loi->loi_ost_idx;
/* Per-request obdo: copy of the caller's obdo with the sub-object id. */
1306 req->rq_oa = obdo_alloc();
1307 if (req->rq_oa == NULL)
1308 GOTO(out_set, rc = -ENOMEM);
1309 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1310 req->rq_oa->o_id = loi->loi_id;
1312 req->rq_extent.start = rs;
1313 req->rq_extent.end = re;
1314 req->rq_extent.gid = -1;
1316 lov_set_add_req(req, set);
1318 if (!set->set_count)
1319 GOTO(out_set, rc = -EIO);
1323 lov_fini_sync_set(set);