1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 # define EXPORT_SYMTAB
25 #define DEBUG_SUBSYSTEM S_LOV
28 #include <asm/div64.h>
30 #include <liblustre.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_idl.h>
37 #include "lov_internal.h"
/* Put a newly allocated request set into its initial state: no completed
 * sub-requests, an empty request list and a reference count of one. */
39 static void lov_init_set(struct lov_request_set *set)
42 set->set_completes = 0;
44 INIT_LIST_HEAD(&set->set_list);
/* initial reference; the lov_fini_*_set() helpers in this file test it with
 * atomic_dec_and_test() */
45 atomic_set(&set->set_refcount, 1);
/* Tear down a request set: release every queued sub-request, then the set's
 * page array, its lov lock handle and finally the set itself. */
48 static void lov_finish_set(struct lov_request_set *set)
50 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
51 struct list_head *pos, *n;
/* the _safe iterator is needed because each entry is unlinked inside the loop */
55 list_for_each_safe(pos, n, &set->set_list) {
56 struct lov_request *req;
57 struct lov_tgt_desc *tgt;
59 req = list_entry(pos, struct lov_request, rq_link);
60 LASSERT(req->rq_idx >= 0);
/* drop the target reference associated with this request */
62 tgt = lov->tgts + req->rq_idx;
63 lov_tgt_decref(lov, tgt);
65 list_del_init(&req->rq_link);
/* free the per-request obdo, sub-md buffer and the request itself */
68 obdo_free(req->rq_oa);
70 OBD_FREE(req->rq_md, req->rq_buflen);
71 OBD_FREE(req, sizeof(*req));
/* page array is sized as set_oabufs elements of *set_pga */
75 int len = set->set_oabufs * sizeof(*set->set_pga);
76 OBD_FREE(set->set_pga, len);
79 lov_llh_put(set->set_lockh);
81 OBD_FREE(set, sizeof(*set));
/* Record the outcome of one sub-request in its owning set.
 * @set: set the request belongs to
 * @req: the sub-request that finished
 * @rc:  result code of that sub-request
 * NOTE(review): presumably this marks req complete (rq_complete/rq_rc) and
 * updates set_completes/set_success, since the *_done() helpers read those
 * fields - confirm against the function body. */
85 static void lov_update_set(struct lov_request_set *set,
86 struct lov_request *req, int rc)
/* Generic completion callback for one sub-request: records the result in the
 * set via lov_update_set() and then looks at the state of the target the
 * request was sent to.
 * @set: owning request set
 * @req: finished sub-request
 * @rc:  result of the sub-request */
96 int lov_update_common_set(struct lov_request_set *set,
97 struct lov_request *req, int rc)
99 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
102 lov_update_set(set, req, rc);
104 /* grace error on inactive ost */
106 struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
/* NOTE(review): the decref presumably balances a reference taken by a
 * successful lov_tgt_active() check - confirm */
108 if (lov_tgt_active(lov, tgt, req->rq_gen))
109 lov_tgt_decref(lov, tgt);
114 /* FIXME in raid1 regime, should return 0 */
/* Queue a sub-request on a request set by appending it to the tail of the
 * set's request list. */
118 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
120 list_add_tail(&req->rq_link, &set->set_list);
/* Completion callback for one per-stripe lock enqueue.
 * Depending on rc it copies size/time attributes from the sub-request's
 * private md copy back into the real per-stripe lov_oinfo, or clears the
 * per-stripe lock handle, and finally records the result in the set.
 * @set:   owning request set (supplies set_lockh and set_md)
 * @req:   finished sub-request; rq_stripe selects the stripe
 * @rc:    enqueue result
 * @flags: LDLM flags; LDLM_FL_HAS_INTENT is tested below */
124 int lov_update_enqueue_set(struct lov_request_set *set,
125 struct lov_request *req, int rc, int flags)
127 struct lustre_handle *lov_lockhp;
128 struct lov_oinfo *loi;
/* per-stripe lock handle and per-stripe object info for this request */
131 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
132 loi = &set->set_md->lsm_oinfo[req->rq_stripe];
134 /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
135 * It belongs in the OSC, except that the OSC doesn't have
136 * access to the real LOI -- it gets a copy, that we created
137 * above, and that copy can be arbitrarily out of date.
139 * The LOV API is due for a serious rewriting anyways, and this
140 * can be addressed then. */
141 if (rc == ELDLM_OK) {
142 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
/* candidate size value, taken from the sub-request's copy of the oinfo */
143 __u64 tmp = req->rq_md->lsm_oinfo->loi_rss;
145 LASSERT(lock != NULL);
147 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
148 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
149 /* Extend KMS up to the end of this lock and no further
150 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
151 if (tmp > lock->l_policy_data.l_extent.end)
152 tmp = lock->l_policy_data.l_extent.end + 1;
/* the clamped value is only considered when it is not below the current kms */
153 if (tmp >= loi->loi_kms) {
154 CDEBUG(D_INODE, "lock acquired, setting rss="
155 LPU64", kms="LPU64"\n", loi->loi_rss, tmp);
157 loi->loi_kms_valid = 1;
159 CDEBUG(D_INODE, "lock acquired, setting rss="
160 LPU64"; leaving kms="LPU64", end="LPU64
161 "\n", loi->loi_rss, loi->loi_kms,
162 lock->l_policy_data.l_extent.end);
164 ldlm_lock_allow_match(lock);
166 } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
/* intent enqueue that was aborted: forget the handle but still take the
 * attributes the request brought back; kms is left alone */
167 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
168 loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss;
169 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
170 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
171 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
172 " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
/* any other result: clear the per-stripe handle and log the failure; the
 * message differs depending on whether lov_tgt_ready() accepts the target */
175 struct obd_export *exp = set->set_exp;
176 struct lov_obd *lov = &exp->exp_obd->u.lov;
177 struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
179 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
180 if (lov_tgt_ready(lov, tgt, req->rq_gen)) {
181 lov_tgt_decref(lov, tgt);
182 CERROR("error: enqueue objid "LPX64" subobj "
183 LPX64" on OST idx %d: rc = %d\n",
184 set->set_md->lsm_object_id, loi->loi_id,
185 loi->loi_ost_idx, rc);
187 CERROR("error: enqueue objid "LPX64" subobj "
188 LPX64" on OST idx %d: rc = %d, NOT ACTIVE !\n",
189 set->set_md->lsm_object_id, loi->loi_id,
190 loi->loi_ost_idx, rc);
/* account for this request in the set regardless of the branch taken */
194 lov_update_set(set, req, rc);
/* Finish an enqueue or match operation once sub-requests have completed.
 * If every completed request succeeded there is nothing to undo; otherwise
 * the per-stripe locks that were obtained are cancelled again.
 * @set:   request set (must have at least one completion)
 * @mode:  lock mode passed on to obd_cancel()
 * @flags: LDLM flags; LDLM_FL_TEST_LOCK is tested below */
198 static int enqueue_done(struct lov_request_set *set, __u32 mode, int flags)
200 struct list_head *pos;
201 struct lov_request *req;
202 struct lustre_handle *lov_lockhp = NULL;
203 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
204 struct lov_tgt_desc *tgt;
208 LASSERT(set->set_completes);
209 /* enqueue/match success, just return */
210 if (set->set_completes == set->set_success) {
/* for a test-only lock the lov lock handle reference is released here */
211 if (flags & LDLM_FL_TEST_LOCK)
212 lov_llh_put(set->set_lockh);
216 /* cancel enqueued/matched locks */
217 list_for_each (pos, &set->set_list) {
218 req = list_entry(pos, struct lov_request, rq_link);
/* requests that never completed or that failed are tested for here */
220 if (!req->rq_complete || req->rq_rc)
222 if (flags & LDLM_FL_TEST_LOCK)
225 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
/* a zero cookie means no lock is recorded for this stripe */
227 if (lov_lockhp->cookie == 0)
230 tgt = lov->tgts + req->rq_idx;
231 rc = obd_cancel(tgt->ltd_exp, req->rq_md, mode, lov_lockhp);
/* a cancel failure is only reported when lov_tgt_ready() accepts the target */
232 if (rc && lov_tgt_ready(lov, tgt, req->rq_gen)) {
233 lov_tgt_decref(lov, tgt);
234 CERROR("cancelling obdjid "LPX64" on OST "
235 "idx %d error: rc = %d\n",
236 req->rq_md->lsm_object_id, req->rq_idx, rc);
239 lov_llh_put(set->set_lockh);
/* Finish an enqueue request set created by lov_prep_enqueue_set().
 * Runs enqueue_done() when any sub-request completed, releases the lov lock
 * handle reference and drops the caller's reference on the set.
 * @set:  request set to finish
 * @mode: lock mode, forwarded to enqueue_done() */
243 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
248 LASSERT(set->set_exp);
251 if (set->set_completes)
252 rc = enqueue_done(set, mode, 0);
254 lov_llh_put(set->set_lockh);
/* NOTE(review): presumably lov_finish_set() runs when the last reference
 * goes away, as in the other lov_fini_*_set() helpers - confirm */
256 if (atomic_dec_and_test(&set->set_refcount))
/* Build a request set for a lock enqueue over a striped object: one
 * sub-request per stripe that intersects the requested extent.
 * @exp:    LOV export
 * @lsm:    stripe metadata of the object
 * @policy: lock policy; only l_extent.start/end are read here
 * @mode:   lock mode, used when the set has to be torn down on error
 * @lockh:  out: receives the cookie of the newly created lov lock handle
 * @reqset: out parameter for the prepared set */
262 int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
263 ldlm_policy_data_t *policy, __u32 mode,
264 struct lustre_handle *lockh,
265 struct lov_request_set **reqset)
267 struct lov_obd *lov = &exp->exp_obd->u.lov;
268 struct lov_request_set *set;
270 struct lov_oinfo *loi;
273 OBD_ALLOC(set, sizeof(*set));
/* the lov lock handle carries the per-stripe handles; hand its cookie back */
280 set->set_lockh = lov_llh_new(lsm);
281 if (set->set_lockh == NULL)
282 GOTO(out_set, rc = -ENOMEM);
283 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
/* walk the stripes; loi advances in step with the stripe index i */
285 loi = lsm->lsm_oinfo;
286 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
287 struct lov_tgt_desc *tgt;
288 struct lov_request *req;
/* start/end receive the stripe-local extent for stripe i */
291 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
292 policy->l_extent.end, &start, &end))
295 tgt = lov->tgts + loi->loi_ost_idx;
296 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
297 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
301 OBD_ALLOC(req, sizeof(*req));
/* -ENOMEM path: give the target reference back before bailing out */
303 lov_tgt_decref(lov, tgt);
304 GOTO(out_set, rc = -ENOMEM);
/* the sub-md buffer is sized for the md itself plus one struct lov_oinfo */
307 req->rq_buflen = sizeof(*req->rq_md) +
308 sizeof(struct lov_oinfo);
309 OBD_ALLOC(req->rq_md, req->rq_buflen);
310 if (req->rq_md == NULL) {
311 OBD_FREE(req, sizeof(*req));
312 lov_tgt_decref(lov, tgt);
313 GOTO(out_set, rc = -ENOMEM);
316 req->rq_extent.start = start;
317 req->rq_extent.end = end;
319 req->rq_idx = loi->loi_ost_idx;
320 req->rq_gen = loi->loi_ost_gen;
323 /* XXX LOV STACKING: submd should be from the subobj */
324 req->rq_md->lsm_object_id = loi->loi_id;
325 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
326 req->rq_md->lsm_stripe_count = 0;
/* seed the private oinfo copy from the real per-stripe oinfo */
327 req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
328 req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss;
329 req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
330 req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks;
/* NOTE(review): unlike the four copies above, this assignment goes from the
 * freshly allocated sub-md into the real loi - confirm the direction is
 * intended */
331 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
333 lov_set_add_req(req, set);
336 GOTO(out_set, rc = -EIO);
/* error path: dispose of the partially built set */
340 lov_fini_enqueue_set(set, mode);
/* Completion callback for one per-stripe lock match.
 * The result is logically negated before it is recorded, so a non-zero rc is
 * stored as 0 and a zero rc as 1.
 * NOTE(review): presumably a non-zero match result means "lock found" and
 * lov_update_set() treats 0 as success - confirm. */
344 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
348 lov_update_set(set, req, !rc);
/* Finish a match request set created by lov_prep_match_set().
 * Runs enqueue_done() with the caller's flags when any sub-request completed,
 * releases the lov lock handle reference and drops a reference on the set.
 * @set:   request set to finish
 * @mode:  lock mode, forwarded to enqueue_done()
 * @flags: LDLM flags, forwarded to enqueue_done() */
352 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
357 LASSERT(set->set_exp);
360 if (set->set_completes)
361 rc = enqueue_done(set, mode, flags);
363 lov_llh_put(set->set_lockh);
365 if (atomic_dec_and_test(&set->set_refcount))
/* Build a request set for a lock match over a striped object: one sub-request
 * per stripe that intersects the requested extent. Unlike the enqueue variant
 * a target rejected by lov_tgt_ready() fails the whole operation with -EIO.
 * @exp:    LOV export
 * @lsm:    stripe metadata of the object
 * @policy: lock policy; only l_extent.start/end are read here
 * @mode:   lock mode, used when the set has to be torn down on error
 * @lockh:  out: receives the cookie of the newly created lov lock handle
 * @reqset: out parameter for the prepared set */
371 int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
372 ldlm_policy_data_t *policy, __u32 mode,
373 struct lustre_handle *lockh,
374 struct lov_request_set **reqset)
376 struct lov_obd *lov = &exp->exp_obd->u.lov;
377 struct lov_request_set *set;
379 struct lov_oinfo *loi;
382 OBD_ALLOC(set, sizeof(*set));
389 set->set_lockh = lov_llh_new(lsm);
390 if (set->set_lockh == NULL)
391 GOTO(out_set, rc = -ENOMEM);
392 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
/* walk the stripes; loi advances in step with the stripe index i */
394 loi = lsm->lsm_oinfo;
395 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
396 struct lov_tgt_desc *tgt;
397 struct lov_request *req;
400 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
401 policy->l_extent.end, &start, &end))
404 /* FIXME raid1 should grace this error */
405 tgt = lov->tgts + loi->loi_ost_idx;
406 if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
407 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
408 GOTO(out_set, rc = -EIO);
411 OBD_ALLOC(req, sizeof(*req));
/* -ENOMEM path: give the target reference back before bailing out */
413 lov_tgt_decref(lov, tgt);
414 GOTO(out_set, rc = -ENOMEM);
/* only the bare md is needed here, no trailing lov_oinfo */
417 req->rq_buflen = sizeof(*req->rq_md);
418 OBD_ALLOC(req->rq_md, req->rq_buflen);
419 if (req->rq_md == NULL) {
420 OBD_FREE(req, sizeof(*req));
421 lov_tgt_decref(lov, tgt);
422 GOTO(out_set, rc = -ENOMEM);
425 req->rq_extent.start = start;
426 req->rq_extent.end = end;
428 req->rq_idx = loi->loi_ost_idx;
429 req->rq_gen = loi->loi_ost_gen;
432 /* XXX LOV STACKING: submd should be from the subobj */
433 req->rq_md->lsm_object_id = loi->loi_id;
434 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
435 req->rq_md->lsm_stripe_count = 0;
436 lov_set_add_req(req, set);
439 GOTO(out_set, rc = -EIO);
/* error path: dispose of the partially built set */
443 lov_fini_match_set(set, mode, 0);
/* Finish a cancel request set created by lov_prep_cancel_set(): releases the
 * lov lock handle reference and drops a reference on the set. */
447 int lov_fini_cancel_set(struct lov_request_set *set)
452 LASSERT(set->set_exp);
457 lov_llh_put(set->set_lockh);
459 if (atomic_dec_and_test(&set->set_refcount))
/* Build a request set for cancelling the per-stripe locks behind a lov lock
 * handle.
 * @exp:    LOV export
 * @lsm:    stripe metadata of the object
 * @mode:   lock mode (not referenced in the lines documented here)
 * @lockh:  lov lock handle to resolve; its cookie is rewritten from the
 *          resolved handle
 * @reqset: out parameter for the prepared set */
465 int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
466 __u32 mode, struct lustre_handle *lockh,
467 struct lov_request_set **reqset)
469 struct lov_obd *lov = &exp->exp_obd->u.lov;
470 struct lov_request_set *set;
472 struct lov_oinfo *loi;
475 OBD_ALLOC(set, sizeof(*set));
/* resolve the caller's handle; an unknown handle is reported as -EINVAL */
482 set->set_lockh = lov_handle2llh(lockh);
483 if (set->set_lockh == NULL) {
484 CERROR("LOV: invalid lov lock handle %p\n", lockh);
485 GOTO(out_set, rc = -EINVAL);
487 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
489 loi = lsm->lsm_oinfo;
490 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
491 struct lov_tgt_desc *tgt;
492 struct lov_request *req;
493 struct lustre_handle *lov_lockhp;
/* per-stripe handle; a zero cookie means no lock is recorded for stripe i */
495 lov_lockhp = set->set_lockh->llh_handles + i;
496 if (lov_lockhp->cookie == 0) {
497 CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
498 loi->loi_ost_idx, loi->loi_id);
502 tgt = lov->tgts + loi->loi_ost_idx;
503 if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
504 CERROR("lov idx %d subobj "LPX64" osc inactive?\n",
505 loi->loi_ost_idx, loi->loi_id);
509 OBD_ALLOC(req, sizeof(*req));
/* -ENOMEM path: give the target reference back before bailing out */
511 lov_tgt_decref(lov, tgt);
512 GOTO(out_set, rc = -ENOMEM);
515 req->rq_buflen = sizeof(*req->rq_md);
516 OBD_ALLOC(req->rq_md, req->rq_buflen);
517 if (req->rq_md == NULL) {
518 OBD_FREE(req, sizeof(*req));
519 lov_tgt_decref(lov, tgt);
520 GOTO(out_set, rc = -ENOMEM);
523 req->rq_idx = loi->loi_ost_idx;
524 req->rq_gen = loi->loi_ost_gen;
527 /* XXX LOV STACKING: submd should be from the subobj */
528 req->rq_md->lsm_object_id = loi->loi_id;
529 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
530 req->rq_md->lsm_stripe_count = 0;
531 lov_set_add_req(req, set);
534 GOTO(out_set, rc = -EIO);
/* error path: dispose of the partially built set */
538 lov_fini_cancel_set(set);
/* Finish an object create once sub-requests have completed.
 * On success the attributes returned by the successful sub-creates are merged
 * and copied back into the caller's obdo; on failure the sub-objects that
 * were created are destroyed again and the in-memory md is freed. Log-cookie
 * bookkeeping in the transaction info is fixed up at the end.
 * @exp: LOV export
 * @set: create request set (must have at least one completion)
 * @ea:  caller's stripe md pointer; *ea == NULL is tested below */
542 static int create_done(struct obd_export *exp, struct lov_request_set *set,
543 struct lov_stripe_md **ea)
545 struct lov_obd *lov = &exp->exp_obd->u.lov;
546 struct obd_trans_info *oti = set->set_oti;
547 struct obdo *src_oa = set->set_oa;
548 struct list_head *pos;
549 struct lov_request *req;
550 struct obdo *ret_oa = NULL;
551 int attrset = 0, rc = 0;
554 LASSERT(set->set_completes);
/* not a single sub-create succeeded */
556 if (!set->set_success)
557 GOTO(cleanup, rc = -EIO);
/* fewer objects than requested were created and the caller supplied no md:
 * shrink the recorded request count to the number that succeeded */
559 if (*ea == NULL && set->set_count != set->set_success) {
560 set->set_count = set->set_success;
/* scratch obdo that accumulates the merged attributes */
564 ret_oa = obdo_alloc();
566 GOTO(cleanup, rc = -ENOMEM);
568 list_for_each (pos, &set->set_list) {
569 req = list_entry(pos, struct lov_request, rq_link);
570 if (!req->rq_complete || req->rq_rc)
572 lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
573 set->set_md, req->rq_stripe, &attrset);
/* sanity check: a size given by the caller should match the merged size */
575 if (src_oa->o_valid & OBD_MD_FLSIZE &&
576 ret_oa->o_size != src_oa->o_size) {
577 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
578 src_oa->o_size, ret_oa->o_size);
/* keep the caller's object id/group, then overwrite the caller's obdo with
 * the merged result */
581 ret_oa->o_id = src_oa->o_id;
582 ret_oa->o_gr = src_oa->o_gr;
583 ret_oa->o_valid |= OBD_MD_FLGROUP;
584 memcpy(src_oa, ret_oa, sizeof(*src_oa));
/* undo path: destroy the sub-objects belonging to this set */
592 list_for_each (pos, &set->set_list) {
593 struct lov_tgt_desc *tgt;
595 req = list_entry(pos, struct lov_request, rq_link);
597 if (!req->rq_complete || req->rq_rc)
600 tgt = lov->tgts + req->rq_idx;
601 if (!lov_tgt_ready(lov, tgt, req->rq_gen)) {
602 CERROR("Failed to uncreate objid "LPX64" subobj "
603 LPX64" on OST idx %d: osc inactive.\n",
604 set->set_oa->o_id, req->rq_oa->o_id,
609 err = obd_destroy(tgt->ltd_exp, req->rq_oa, NULL, oti);
610 lov_tgt_decref(lov, tgt);
612 CERROR("Failed to uncreate objid "LPX64" subobj "
613 LPX64" on OST idx %d: rc = %d\n",
614 set->set_oa->o_id, req->rq_oa->o_id,
618 obd_free_memmd(exp, &set->set_md);
/* point oti back at the start of the cookie array; free it when no
 * sub-request reported a cookie, and mirror that in OBD_MD_FLCOOKIE */
620 if (oti && set->set_cookies) {
621 oti->oti_logcookies = set->set_cookies;
622 if (!set->set_cookie_sent) {
623 oti_free_cookies(oti);
624 src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
626 src_oa->o_valid |= OBD_MD_FLCOOKIE;
/* Finish a create request set: runs create_done() when any sub-request
 * completed, then drops a reference on the set.
 * @set: request set to finish
 * @ea:  caller's stripe md pointer, forwarded to create_done() */
632 int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea)
637 LASSERT(set->set_exp);
640 if (set->set_completes) {
641 rc = create_done(set->set_exp, set, ea);
642 /* FIXME update qos data here */
645 if (atomic_dec_and_test(&set->set_refcount))
/* Completion callback for one per-OST object create.
 * Records the result in the set and, for a created object, fills in the next
 * free stripe slot of the in-memory md and advances the log-cookie cursor.
 * @set: owning create request set
 * @req: finished sub-request
 * @rc:  result of the sub-create */
651 int lov_update_create_set(struct lov_request_set *set,
652 struct lov_request *req, int rc)
654 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
655 struct obd_trans_info *oti = set->set_oti;
656 struct lov_stripe_md *lsm = set->set_md;
657 struct lov_oinfo *loi;
658 struct lov_tgt_desc *tgt;
/* the stripe slot is the number of successes recorded so far, so slots are
 * handed out in completion order */
661 req->rq_stripe = set->set_success;
662 loi = &lsm->lsm_oinfo[req->rq_stripe];
663 tgt = lov->tgts + req->rq_idx;
/* a failure is only logged when lov_tgt_ready() accepts the target */
665 if (rc && lov_tgt_ready(lov, tgt, req->rq_gen)) {
666 lov_tgt_decref(lov, tgt);
667 CERROR("error creating objid "LPX64" sub-object"
668 " on OST idx %d/%d: rc = %d\n",
669 set->set_oa->o_id, req->rq_idx,
670 lsm->lsm_stripe_count, rc);
672 CERROR("obd_create returned invalid err %d\n", rc);
676 lov_update_set(set, req, rc);
/* remember the newly created object id per OST index, if the caller asked */
680 if (oti && oti->oti_objid)
681 oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
/* publish the new sub-object in the stripe md */
683 loi->loi_id = req->rq_oa->o_id;
684 loi->loi_gr = req->rq_oa->o_gr;
685 loi->loi_ost_idx = req->rq_idx;
686 loi->loi_ost_gen = req->rq_gen;
/* NOTE(review): loi->loi_id is passed for both "subobj" fields of the
 * message; the second was possibly meant to be loi->loi_gr - confirm */
687 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at "
689 lsm->lsm_object_id, loi->loi_id, loi->loi_id,
690 req->rq_idx, req->rq_gen);
/* move the cookie cursor on to the next slot and count reported cookies */
693 if (set->set_cookies)
694 ++oti->oti_logcookies;
695 if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
696 set->set_cookie_sent++;
/* Build a request set for creating the sub-objects of a striped object.
 * When no in-memory md is attached to the set yet, one is allocated with a
 * stripe count derived from the default count and, if given, the file size.
 * @exp:    LOV export
 * @ea:     caller's stripe md pointer
 * @src_oa: attributes of the object to create
 * @oti:    transaction info (may be NULL); used for log cookies
 * @reqset: out parameter for the prepared set */
701 int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea,
702 struct obdo *src_oa, struct obd_trans_info *oti,
703 struct lov_request_set **reqset)
705 struct lov_obd *lov = &exp->exp_obd->u.lov;
706 struct lov_request_set *set;
707 int rc = 0, newea = 0;
710 OBD_ALLOC(set, sizeof(*set));
717 set->set_oa = src_oa;
720 if (set->set_md == NULL) {
721 int stripes, stripe_cnt;
722 stripe_cnt = lov_get_stripecnt(lov, 0);
724 /* If the MDS file was truncated up to some size, stripe over
725 * enough OSTs to allow the file to be created at that size. */
726 if (src_oa->o_valid & OBD_MD_FLSIZE) {
/* both the size and LUSTRE_STRIPE_MAXBYTES are scaled down by 4096 (>> 12)
 * before the division */
727 stripes = ((src_oa->o_size + LUSTRE_STRIPE_MAXBYTES) >> 12) - 1;
728 do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
/* more stripes needed than there are active targets: file is too big */
730 if (stripes > lov->desc.ld_active_tgt_count)
731 GOTO(out_set, rc = -EFBIG);
/* never go below the default stripe count */
732 if (stripes < stripe_cnt)
733 stripes = stripe_cnt;
735 stripes = stripe_cnt;
/* fall back to LOV_PATTERN_RAID0 when the descriptor has no pattern set */
738 rc = lov_alloc_memmd(&set->set_md, stripes,
739 lov->desc.ld_pattern ?
740 lov->desc.ld_pattern : LOV_PATTERN_RAID0);
746 rc = qos_prep_create(lov, set, newea);
/* the caller wants log cookies: allocate one slot per request and remember
 * the start of the array in the set */
750 if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
751 oti_alloc_cookies(oti, set->set_count);
752 if (!oti->oti_logcookies)
754 set->set_cookies = oti->oti_logcookies;
/* error path: release the md and dispose of the set */
761 obd_free_memmd(exp, &set->set_md);
763 lov_fini_create_set(set, ea);
/* Shared completion step for attribute operations: merges the attributes
 * returned by the successful sub-requests into one obdo and copies the
 * result over the caller's obdo (set->set_oa), keeping the caller's object
 * id. */
767 static int common_attr_done(struct lov_request_set *set)
769 struct list_head *pos;
770 struct lov_request *req;
772 int rc = 0, attrset = 0;
775 if (set->set_oa == NULL)
778 if (!set->set_success)
/* scratch obdo that accumulates the merged attributes */
781 tmp_oa = obdo_alloc();
783 GOTO(out, rc = -ENOMEM);
785 list_for_each (pos, &set->set_list) {
786 req = list_entry(pos, struct lov_request, rq_link);
788 if (!req->rq_complete || req->rq_rc)
790 if (req->rq_oa->o_valid == 0) /* inactive stripe */
792 lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
793 set->set_md, req->rq_stripe, &attrset);
796 CERROR("No stripes had valid attrs\n");
799 tmp_oa->o_id = set->set_oa->o_id;
800 memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
/* Completion step for bulk read/write: for each sub-request that returned a
 * block count (OBD_MD_FLBLOCKS), store it in the matching per-stripe
 * lov_oinfo. */
808 static int brw_done(struct lov_request_set *set)
810 struct lov_stripe_md *lsm = set->set_md;
811 struct lov_oinfo *loi = NULL;
812 struct list_head *pos;
813 struct lov_request *req;
816 list_for_each (pos, &set->set_list) {
817 req = list_entry(pos, struct lov_request, rq_link);
819 if (!req->rq_complete || req->rq_rc)
822 loi = &lsm->lsm_oinfo[req->rq_stripe];
824 if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
825 loi->loi_blocks = req->rq_oa->o_blocks;
/* Finish a bulk read/write request set and drop a reference on it.
 * NOTE(review): presumably brw_done() is run when any sub-request completed -
 * confirm. */
831 int lov_fini_brw_set(struct lov_request_set *set)
836 LASSERT(set->set_exp);
839 if (set->set_completes) {
841 /* FIXME update qos data here */
843 if (atomic_dec_and_test(&set->set_refcount))
/* Build a request set for bulk read/write on a striped object.
 * The caller's page array is regrouped into set->set_pga so that the pages
 * of each stripe are contiguous, with offsets translated to stripe-local
 * offsets, and one sub-request is created per stripe that has pages.
 * @exp:     LOV export
 * @src_oa:  attributes of the object; copied into every sub-request
 * @lsm:     stripe metadata of the object
 * @oa_bufs: number of entries in pga
 * @pga:     caller's page array, indexed 0..oa_bufs-1
 * @oti:     transaction info (not referenced in the lines documented here)
 * @reqset:  out parameter for the prepared set */
849 int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
850 struct lov_stripe_md *lsm, obd_count oa_bufs,
851 struct brw_page *pga, struct obd_trans_info *oti,
852 struct lov_request_set **reqset)
859 struct lov_request_set *set;
860 struct lov_oinfo *loi = NULL;
861 struct lov_obd *lov = &exp->exp_obd->u.lov;
/* NOTE(review): shift is declared without an initialiser here but is read
 * when the first request is set up - confirm it is set to 0 before the
 * stripe loop */
862 int rc = 0, i, shift;
865 OBD_ALLOC(set, sizeof(*set));
872 set->set_oa = src_oa;
/* private, regrouped copy of the page array; freed by lov_finish_set() */
874 set->set_oabufs = oa_bufs;
875 OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
877 GOTO(out, rc = -ENOMEM);
/* one bookkeeping slot per stripe (count / index / off) */
879 OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
881 GOTO(out, rc = -ENOMEM);
883 /* calculate the page count for each stripe */
884 for (i = 0; i < oa_bufs; i++) {
885 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
886 info[stripe].count++;
889 /* alloc and initialize lov request */
890 loi = lsm->lsm_oinfo;
892 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
893 struct lov_request *req;
894 struct lov_tgt_desc *tgt;
/* a stripe without pages is tested for here */
896 if (info[i].count == 0)
899 tgt = lov->tgts + loi->loi_ost_idx;
900 if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
901 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
902 GOTO(out, rc = -EIO);
905 OBD_ALLOC(req, sizeof(*req));
/* -ENOMEM path: give the target reference back before bailing out */
907 lov_tgt_decref(lov, tgt);
908 GOTO(out, rc = -ENOMEM);
911 req->rq_oa = obdo_alloc();
912 if (req->rq_oa == NULL) {
913 OBD_FREE(req, sizeof(*req));
914 lov_tgt_decref(lov, tgt);
915 GOTO(out, rc = -ENOMEM);
/* start from the caller's attributes, then retarget at the sub-object */
919 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
920 req->rq_oa->o_id = loi->loi_id;
921 req->rq_buflen = sizeof(*req->rq_md);
922 OBD_ALLOC(req->rq_md, req->rq_buflen);
923 if (req->rq_md == NULL) {
924 obdo_free(req->rq_oa);
925 OBD_FREE(req, sizeof(*req));
926 lov_tgt_decref(lov, tgt);
927 GOTO(out, rc = -ENOMEM);
930 req->rq_idx = loi->loi_ost_idx;
931 req->rq_gen = loi->loi_ost_gen;
934 /* XXX LOV STACKING */
935 req->rq_md->lsm_object_id = loi->loi_id;
936 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
/* this request owns rq_oabufs pages starting at rq_pgaidx in set_pga;
 * shift is the running start index for the next stripe */
937 req->rq_oabufs = info[i].count;
938 req->rq_pgaidx = shift;
939 shift += req->rq_oabufs;
941 /* remember the index for sort brw_page array */
942 info[i].index = req->rq_pgaidx;
943 lov_set_add_req(req, set);
946 GOTO(out, rc = -EIO);
948 /* rotate & sort the brw_page array */
949 for (i = 0; i < oa_bufs; i++) {
950 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
/* destination slot: start of the stripe's run plus the stripe's 'off' value */
952 shift = info[stripe].index + info[stripe].off;
953 LASSERT(shift < oa_bufs);
954 set->set_pga[shift] = pga[i];
/* rewrite the copied page's offset as a stripe-local offset */
955 lov_stripe_offset(lsm, pga[i].disk_offset, stripe,
956 &set->set_pga[shift].disk_offset);
961 OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
/* error path: dispose of the partially built set */
966 lov_fini_brw_set(set);
/* Completion step for getattr: simply delegates to common_attr_done() and
 * returns its result. */
971 static int getattr_done(struct lov_request_set *set)
973 return common_attr_done(set);
/* Finish a getattr request set: runs getattr_done() when any sub-request
 * completed, then drops a reference on the set. */
976 int lov_fini_getattr_set(struct lov_request_set *set)
981 LASSERT(set->set_exp);
984 if (set->set_completes)
985 rc = getattr_done(set);
987 if (atomic_dec_and_test(&set->set_refcount))
/* Build a request set for getattr on a striped object: one sub-request per
 * stripe, each carrying a copy of src_oa retargeted at the stripe's
 * sub-object id. Fails with -EIO when no request could be queued.
 * @exp:    LOV export
 * @src_oa: caller's attributes; also the destination of the merged result
 * @lsm:    stripe metadata of the object
 * @reqset: out parameter for the prepared set */
993 int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
994 struct lov_stripe_md *lsm,
995 struct lov_request_set **reqset)
997 struct lov_request_set *set;
998 struct lov_oinfo *loi = NULL;
999 struct lov_obd *lov = &exp->exp_obd->u.lov;
1003 OBD_ALLOC(set, sizeof(*set));
1010 set->set_oa = src_oa;
/* walk the stripes; loi advances in step with the stripe index i */
1012 loi = lsm->lsm_oinfo;
1013 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1014 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1015 struct lov_request *req;
1017 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1018 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1022 OBD_ALLOC(req, sizeof(*req));
/* -ENOMEM path: give the target reference back before bailing out */
1024 lov_tgt_decref(lov, tgt);
1025 GOTO(out_set, rc = -ENOMEM);
1029 req->rq_idx = loi->loi_ost_idx;
1030 req->rq_gen = loi->loi_ost_gen;
1032 req->rq_oa = obdo_alloc();
1033 if (req->rq_oa == NULL) {
1034 OBD_FREE(req, sizeof(*req));
1035 lov_tgt_decref(lov, tgt);
1036 GOTO(out_set, rc = -ENOMEM);
/* start from the caller's attributes, then retarget at the sub-object */
1038 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1039 req->rq_oa->o_id = loi->loi_id;
1040 lov_set_add_req(req, set);
/* nothing was queued */
1042 if (!set->set_count)
1043 GOTO(out_set, rc = -EIO);
/* error path: dispose of the partially built set */
1047 lov_fini_getattr_set(set);
/* Finish a destroy request set: drops a reference on the set and frees it
 * through lov_finish_set() when that was the last one. */
1051 int lov_fini_destroy_set(struct lov_request_set *set)
1055 LASSERT(set->set_exp);
1058 if (set->set_completes) {
1059 /* FIXME update qos data here */
1062 if (atomic_dec_and_test(&set->set_refcount))
1063 lov_finish_set(set);
/* Build a request set for destroying the sub-objects of a striped object:
 * one sub-request per stripe, each carrying a copy of src_oa retargeted at
 * the stripe's sub-object id. Fails with -EIO when no request could be
 * queued.
 * @exp:    LOV export
 * @src_oa: attributes of the object to destroy
 * @lsm:    stripe metadata of the object
 * @oti:    transaction info (may be NULL); supplies the log cookies
 * @reqset: out parameter for the prepared set */
1068 int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
1069 struct lov_stripe_md *lsm,
1070 struct obd_trans_info *oti,
1071 struct lov_request_set **reqset)
1073 struct lov_request_set *set;
1074 struct lov_oinfo *loi = NULL;
1075 struct lov_obd *lov = &exp->exp_obd->u.lov;
1076 int rc = 0, cookie_set = 0, i;
1079 OBD_ALLOC(set, sizeof(*set));
1086 set->set_oa = src_oa;
/* remember the start of the caller's cookie array, if one was passed */
1088 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1089 set->set_cookies = oti->oti_logcookies;
1091 loi = lsm->lsm_oinfo;
1092 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1093 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1094 struct lov_request *req;
1096 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1097 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1101 OBD_ALLOC(req, sizeof(*req));
/* -ENOMEM path: give the target reference back before bailing out */
1103 lov_tgt_decref(lov, tgt);
1104 GOTO(out_set, rc = -ENOMEM);
1108 req->rq_idx = loi->loi_ost_idx;
1109 req->rq_gen = loi->loi_ost_gen;
1111 req->rq_oa = obdo_alloc();
1112 if (req->rq_oa == NULL) {
1113 OBD_FREE(req, sizeof(*req));
1114 lov_tgt_decref(lov, tgt);
1115 GOTO(out_set, rc = -ENOMEM);
/* start from the caller's attributes, then retarget at the sub-object */
1118 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1119 req->rq_oa->o_id = loi->loi_id;
1121 /* Setup the first request's cookie position */
/* the cookie cursor is indexed by stripe number i, not by request count */
1122 if (!cookie_set && set->set_cookies) {
1123 oti->oti_logcookies = set->set_cookies + i;
1126 lov_set_add_req(req, set);
/* nothing was queued */
1128 if (!set->set_count)
1129 GOTO(out_set, rc = -EIO);
/* error path: dispose of the partially built set */
1133 lov_fini_destroy_set(set);
/* Completion step for setattr: simply delegates to common_attr_done() and
 * returns its result. */
1137 static int setattr_done(struct lov_request_set *set)
1139 return common_attr_done(set);
/* Finish a setattr request set: runs setattr_done() when any sub-request
 * completed, then drops a reference on the set and frees it through
 * lov_finish_set() when that was the last one. */
1142 int lov_fini_setattr_set(struct lov_request_set *set)
1147 LASSERT(set->set_exp);
1150 if (set->set_completes) {
1151 rc = setattr_done(set);
1152 /* FIXME update qos data here */
1155 if (atomic_dec_and_test(&set->set_refcount))
1156 lov_finish_set(set);
/* Build a request set for setattr on a striped object: one sub-request per
 * stripe, each carrying a copy of src_oa retargeted at the stripe's
 * sub-object id, with the size translated to a per-stripe size when
 * OBD_MD_FLSIZE is set. Fails with -EIO when no request could be queued.
 * @exp:    LOV export
 * @src_oa: attributes to set
 * @lsm:    stripe metadata of the object
 * @oti:    transaction info (not referenced in the lines documented here)
 * @reqset: out parameter for the prepared set */
1160 int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
1161 struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1162 struct lov_request_set **reqset)
1164 struct lov_request_set *set;
1165 struct lov_oinfo *loi = NULL;
1166 struct lov_obd *lov = &exp->exp_obd->u.lov;
1170 OBD_ALLOC(set, sizeof(*set));
1177 set->set_oa = src_oa;
1179 loi = lsm->lsm_oinfo;
1180 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1181 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1182 struct lov_request *req;
1184 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1185 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1189 OBD_ALLOC(req, sizeof(*req));
/* -ENOMEM path: give the target reference back before bailing out */
1191 lov_tgt_decref(lov, tgt);
1192 GOTO(out_set, rc = -ENOMEM);
1196 req->rq_idx = loi->loi_ost_idx;
1197 req->rq_gen = loi->loi_ost_gen;
1199 req->rq_oa = obdo_alloc();
1200 if (req->rq_oa == NULL) {
1201 OBD_FREE(req, sizeof(*req));
1202 lov_tgt_decref(lov, tgt);
1203 GOTO(out_set, rc = -ENOMEM);
/* start from the caller's attributes, then retarget at the sub-object */
1206 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1207 req->rq_oa->o_id = loi->loi_id;
/* if a group is flagged as valid it must be a positive value */
1208 LASSERT(!(req->rq_oa->o_valid & OBD_MD_FLGROUP) || req->rq_oa->o_gr>0);
/* translate the file size into this stripe's size; a negative return from
 * lov_stripe_offset() is part of the condition for decrementing it */
1210 if (src_oa->o_valid & OBD_MD_FLSIZE) {
1211 if (lov_stripe_offset(lsm, src_oa->o_size, i,
1212 &req->rq_oa->o_size) < 0 &&
1214 req->rq_oa->o_size--;
1215 CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1216 i, req->rq_oa->o_size, src_oa->o_size);
1218 lov_set_add_req(req, set);
/* nothing was queued */
1220 if (!set->set_count)
1221 GOTO(out_set, rc = -EIO);
/* error path: dispose of the partially built set */
1225 lov_fini_setattr_set(set);
/* Completion callback for one per-stripe punch (truncate) request: records
 * the result in the set via lov_update_set() and then looks at the state of
 * the target the request was sent to. */
1229 int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
1232 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1235 lov_update_set(set, req, rc);
1237 struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
/* NOTE(review): the decref presumably balances a reference taken by a
 * successful lov_tgt_active() check - confirm */
1239 if (lov_tgt_active(lov, tgt, req->rq_gen))
1240 lov_tgt_decref(lov, tgt);
1245 /* FIXME in raid1 regime, should return 0 */
/* Finish a punch request set: checks whether any sub-request succeeded,
 * then drops a reference on the set and frees it through lov_finish_set()
 * when that was the last one. */
1249 int lov_fini_punch_set(struct lov_request_set *set)
1254 LASSERT(set->set_exp);
1257 if (set->set_completes) {
/* completed requests but not a single success */
1258 if (!set->set_success)
1260 /* FIXME update qos data here */
1263 if (atomic_dec_and_test(&set->set_refcount))
1264 lov_finish_set(set);
/* Build a request set for punching (truncating) a byte range of a striped
 * object: one sub-request per stripe that intersects [start, end], carrying
 * the stripe-local extent. Fails with -EIO when no request could be queued.
 * @exp:    LOV export
 * @src_oa: attributes of the object; copied into every sub-request
 * @lsm:    stripe metadata of the object
 * @start:  first byte of the range in file offsets
 * @end:    last byte of the range in file offsets
 * @oti:    transaction info (not referenced in the lines documented here)
 * @reqset: out parameter for the prepared set */
1269 int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
1270 struct lov_stripe_md *lsm, obd_off start,
1271 obd_off end, struct obd_trans_info *oti,
1272 struct lov_request_set **reqset)
1274 struct lov_request_set *set;
1275 struct lov_oinfo *loi = NULL;
1276 struct lov_obd *lov = &exp->exp_obd->u.lov;
1280 OBD_ALLOC(set, sizeof(*set));
1287 set->set_oa = src_oa;
1289 loi = lsm->lsm_oinfo;
1290 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1291 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1292 struct lov_request *req;
/* rs/re receive the stripe-local extent for stripe i */
1295 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1298 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1299 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1303 OBD_ALLOC(req, sizeof(*req));
/* -ENOMEM path: give the target reference back before bailing out */
1305 lov_tgt_decref(lov, tgt);
1306 GOTO(out_set, rc = -ENOMEM);
1310 req->rq_idx = loi->loi_ost_idx;
1311 req->rq_gen = loi->loi_ost_gen;
1313 req->rq_oa = obdo_alloc();
1314 if (req->rq_oa == NULL) {
1315 OBD_FREE(req, sizeof(*req));
1316 lov_tgt_decref(lov, tgt);
1317 GOTO(out_set, rc = -ENOMEM);
/* start from the caller's attributes, then retarget at the sub-object and
 * mark its group as valid */
1320 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1321 req->rq_oa->o_id = loi->loi_id;
1322 req->rq_oa->o_gr = loi->loi_gr;
1323 req->rq_oa->o_valid |= OBD_MD_FLGROUP;
1325 req->rq_extent.start = rs;
1326 req->rq_extent.end = re;
1328 lov_set_add_req(req, set);
/* nothing was queued */
1330 if (!set->set_count)
1331 GOTO(out_set, rc = -EIO);
/* error path: dispose of the partially built set */
1335 lov_fini_punch_set(set);
/* Finish a sync request set: checks whether any sub-request succeeded, then
 * drops a reference on the set and frees it through lov_finish_set() when
 * that was the last one. */
1339 int lov_fini_sync_set(struct lov_request_set *set)
1344 LASSERT(set->set_exp);
1347 if (set->set_completes) {
/* completed requests but not a single success */
1348 if (!set->set_success)
1350 /* FIXME update qos data here */
1353 if (atomic_dec_and_test(&set->set_refcount))
1354 lov_finish_set(set);
/* Build a request set for syncing a byte range of a striped object: one
 * sub-request per stripe that intersects [start, end], carrying the
 * stripe-local extent. Fails with -EIO when no request could be queued.
 * @exp:    LOV export
 * @src_oa: attributes of the object; copied into every sub-request
 * @lsm:    stripe metadata of the object
 * @start:  first byte of the range in file offsets
 * @end:    last byte of the range in file offsets
 * @reqset: out parameter for the prepared set */
1359 int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
1360 struct lov_stripe_md *lsm, obd_off start,
1361 obd_off end, struct lov_request_set **reqset)
1363 struct lov_request_set *set;
1364 struct lov_oinfo *loi = NULL;
1365 struct lov_obd *lov = &exp->exp_obd->u.lov;
1369 OBD_ALLOC(set, sizeof(*set));
1376 set->set_oa = src_oa;
1378 loi = lsm->lsm_oinfo;
1379 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1380 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1381 struct lov_request *req;
/* rs/re receive the stripe-local extent for stripe i */
1384 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1387 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1388 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1392 OBD_ALLOC(req, sizeof(*req));
/* -ENOMEM path: give the target reference back before bailing out */
1394 lov_tgt_decref(lov, tgt);
1395 GOTO(out_set, rc = -ENOMEM);
1399 req->rq_idx = loi->loi_ost_idx;
1400 req->rq_gen = loi->loi_ost_gen;
1402 req->rq_oa = obdo_alloc();
1403 if (req->rq_oa == NULL) {
1404 OBD_FREE(req, sizeof(*req));
1405 lov_tgt_decref(lov, tgt);
1406 GOTO(out_set, rc = -ENOMEM);
/* start from the caller's attributes, then retarget at the sub-object */
1408 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1409 req->rq_oa->o_id = loi->loi_id;
1410 req->rq_extent.start = rs;
1411 req->rq_extent.end = re;
1412 lov_set_add_req(req, set);
/* nothing was queued */
1414 if (!set->set_count)
1415 GOTO(out_set, rc = -EIO);
/* error path: dispose of the partially built set */
1419 lov_fini_sync_set(set);