1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 # define EXPORT_SYMTAB
25 #define DEBUG_SUBSYSTEM S_LOV
28 #include <asm/div64.h>
30 #include <liblustre.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_idl.h>
37 #include "lov_internal.h"
/* Put a request set into its initial state: the completion counter is
 * zeroed, the request list is initialised empty and the reference count
 * starts at 1. */
39 static void lov_init_set(struct lov_request_set *set)
42 set->set_completes = 0;
44 INIT_LIST_HEAD(&set->set_list);
45 atomic_set(&set->set_refcount, 1);
/* Tear down a request set.
 *
 * Every request still linked on set->set_list has its target reference
 * dropped with lov_tgt_decref(), is unlinked, and has its obdo, its
 * sub-object md buffer (rq_buflen bytes) and the request structure itself
 * released.  Afterwards the set's page array (set_oabufs entries), its
 * lock-handle holder and finally the set itself are released.
 *
 * NOTE(review): the conditions guarding the individual frees are not
 * visible in this view. */
48 static void lov_finish_set(struct lov_request_set *set)
50 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
51 struct list_head *pos, *n;
/* _safe variant: entries are unlinked and freed while iterating */
55 list_for_each_safe(pos, n, &set->set_list) {
56 struct lov_request *req;
57 struct lov_tgt_desc *tgt;
59 req = list_entry(pos, struct lov_request, rq_link);
60 LASSERT(req->rq_idx >= 0);
62 tgt = lov->tgts + req->rq_idx;
63 lov_tgt_decref(lov, tgt);
65 list_del_init(&req->rq_link);
68 obdo_free(req->rq_oa);
70 OBD_FREE(req->rq_md, req->rq_buflen);
71 OBD_FREE(req, sizeof(*req));
75 int len = set->set_oabufs * sizeof(*set->set_pga);
76 OBD_FREE(set->set_pga, len);
79 lov_llh_put(set->set_lockh);
81 OBD_FREE(set, sizeof(*set));
/* Per-request completion bookkeeping.  Every lov_update_*_set() helper in
 * this file calls it with the sub-request and that request's result code.
 * NOTE(review): the function body is not visible in this view. */
85 static void lov_update_set(struct lov_request_set *set,
86 struct lov_request *req, int rc)
/* Generic completion handler for one sub-request.
 *
 * The result is first recorded through lov_update_set().  The request's
 * target is then looked up by rq_idx; when lov_tgt_active() accepts it for
 * the request's generation, lov_tgt_decref() is called on it (presumably
 * to balance a reference taken by lov_tgt_active() -- TODO confirm).
 *
 * NOTE(review): the condition selecting the error path and the return
 * statements are not visible in this view. */
96 int lov_update_common_set(struct lov_request_set *set,
97 struct lov_request *req, int rc)
99 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
102 lov_update_set(set, req, rc);
104 /* grace error on inactive ost */
106 struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
108 if (lov_tgt_active(lov, tgt, req->rq_gen))
109 lov_tgt_decref(lov, tgt);
114 /* FIXME in raid1 regime, should return 0 */
/* Append req to the tail of set's request list. */
118 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
120 list_add_tail(&req->rq_link, &set->set_list);
/* Completion handler for the enqueue of a single stripe.
 *
 * set   - request set the sub-request belongs to
 * req   - the completed per-stripe request
 * rc    - result of the sub-enqueue
 * flags - LDLM flags of the enqueue (LDLM_FL_HAS_INTENT is tested)
 *
 * rc == ELDLM_OK: mtime and blocks are copied from the sub-request md into
 * the stripe's lov_oinfo, a candidate KMS is derived from the returned rss
 * (clamped to the lock's extent end + 1) and the lock is made matchable
 * with ldlm_lock_allow_match().
 * rc == ELDLM_LOCK_ABORTED with LDLM_FL_HAS_INTENT: the stripe lock handle
 * is cleared and rss, mtime and blocks are copied into the stripe.
 * The remaining code clears the stripe lock handle and logs the failure,
 * with a distinct message for a target that is not ready.
 *
 * In all cases the result is recorded with lov_update_set(). */
124 int lov_update_enqueue_set(struct lov_request_set *set,
125 struct lov_request *req, int rc, int flags)
127 struct lustre_handle *lov_lockhp;
128 struct lov_oinfo *loi;
131 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
132 loi = &set->set_md->lsm_oinfo[req->rq_stripe];
134 /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
135 * It belongs in the OSC, except that the OSC doesn't have
136 * access to the real LOI -- it gets a copy, that we created
137 * above, and that copy can be arbitrarily out of date.
139 * The LOV API is due for a serious rewriting anyways, and this
140 * can be addressed then. */
141 if (rc == ELDLM_OK) {
142 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
143 __u64 tmp = req->rq_md->lsm_oinfo->loi_rss;
145 LASSERT(lock != NULL);
147 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
148 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
149 /* Extend KMS up to the end of this lock and no further
150 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
151 if (tmp > lock->l_policy_data.l_extent.end)
152 tmp = lock->l_policy_data.l_extent.end + 1;
153 if (tmp >= loi->loi_kms) {
154 CDEBUG(D_INODE, "lock acquired, setting rss="
155 LPU64", kms="LPU64"\n", loi->loi_rss, tmp);
157 loi->loi_kms_valid = 1;
159 CDEBUG(D_INODE, "lock acquired, setting rss="
160 LPU64"; leaving kms="LPU64", end="LPU64
161 "\n", loi->loi_rss, loi->loi_kms,
162 lock->l_policy_data.l_extent.end);
164 ldlm_lock_allow_match(lock);
166 } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
167 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
168 loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss;
169 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
170 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
171 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
172 " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
175 struct obd_export *exp = set->set_exp;
176 struct lov_obd *lov = &exp->exp_obd->u.lov;
177 struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
179 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
180 if (lov_tgt_ready(lov, tgt, req->rq_gen)) {
181 lov_tgt_decref(lov, tgt);
182 CERROR("error: enqueue objid "LPX64" subobj "
183 LPX64" on OST idx %d: rc = %d\n",
184 set->set_md->lsm_object_id, loi->loi_id,
185 loi->loi_ost_idx, rc);
187 CERROR("error: enqueue objid "LPX64" subobj "
188 LPX64" on OST idx %d: rc = %d, NOT ACTIVE !\n",
189 set->set_md->lsm_object_id, loi->loi_id,
190 loi->loi_ost_idx, rc);
194 lov_update_set(set, req, rc);
/* Final processing of an enqueue or match set.
 *
 * set   - the request set; at least one request must have completed
 * mode  - lock mode handed to obd_cancel() when sub-locks are undone
 * flags - LDLM flags; LDLM_FL_TEST_LOCK is tested
 *
 * If every completed request succeeded there is nothing to undo; for
 * LDLM_FL_TEST_LOCK the lock-handle holder is put.  Otherwise the request
 * list is walked and the per-stripe lock of each request is cancelled with
 * obd_cancel(); a cancel failure is logged when the target is still ready.
 * The lock-handle holder is put at the end.
 *
 * NOTE(review): the bodies of the filtering `if` statements inside the
 * loop (presumably `continue`) and the return statements are not visible
 * in this view. */
198 static int enqueue_done(struct lov_request_set *set, __u32 mode, int flags)
200 struct list_head *pos;
201 struct lov_request *req;
202 struct lustre_handle *lov_lockhp = NULL;
203 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
204 struct lov_tgt_desc *tgt;
208 LASSERT(set->set_completes);
209 /* enqueue/match success, just return */
210 if (set->set_completes == set->set_success) {
211 if (flags & LDLM_FL_TEST_LOCK)
212 lov_llh_put(set->set_lockh);
216 /* cancel enqueued/matched locks */
217 list_for_each (pos, &set->set_list) {
218 req = list_entry(pos, struct lov_request, rq_link);
220 if (!req->rq_complete || req->rq_rc)
222 if (flags & LDLM_FL_TEST_LOCK)
225 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
227 if (lov_lockhp->cookie == 0)
230 tgt = lov->tgts + req->rq_idx;
231 rc = obd_cancel(tgt->ltd_exp, req->rq_md, mode, lov_lockhp);
232 if (rc && lov_tgt_ready(lov, tgt, req->rq_gen)) {
233 lov_tgt_decref(lov, tgt);
234 CERROR("cancelling obdjid "LPX64" on OST "
235 "idx %d error: rc = %d\n",
236 req->rq_md->lsm_object_id, req->rq_idx, rc);
239 lov_llh_put(set->set_lockh);
/* Finish an enqueue request set.
 *
 * When at least one request completed, enqueue_done() is run with flags 0
 * and its result kept in rc.  The lock-handle holder is put and the set's
 * reference count is decremented; the action taken when it reaches zero
 * is not visible in this view (other fini helpers call lov_finish_set()). */
243 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
248 LASSERT(set->set_exp);
251 if (set->set_completes)
252 rc = enqueue_done(set, mode, 0);
254 lov_llh_put(set->set_lockh);
256 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for a striped lock enqueue.
 *
 * exp    - LOV export the enqueue is issued on
 * lsm    - striping md of the object being locked
 * policy - requested extent; only stripes intersecting it get a request
 * mode   - lock mode, handed to lov_fini_enqueue_set() on failure
 * lockh  - receives the cookie of the newly created LOV lock-handle holder
 * reqset - output: the prepared request set
 *
 * For every stripe that intersects the extent and whose target is active,
 * a lov_request with a private single-stripe md is allocated, seeded with
 * the stripe's current size/KMS/blocks/mtime values, and queued on the
 * set.  Allocation failures yield -ENOMEM, and -EIO is produced when no
 * usable request could be prepared; the error path finishes the set with
 * lov_fini_enqueue_set().
 *
 * NOTE(review): several lines of this function (braces, some statements
 * and the return paths) are not visible in this view. */
262 int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
263 ldlm_policy_data_t *policy, __u32 mode,
264 struct lustre_handle *lockh,
265 struct lov_request_set **reqset)
267 struct lov_obd *lov = &exp->exp_obd->u.lov;
268 struct lov_request_set *set;
270 struct lov_oinfo *loi;
273 OBD_ALLOC(set, sizeof(*set));
280 set->set_lockh = lov_llh_new(lsm);
281 if (set->set_lockh == NULL)
282 GOTO(out_set, rc = -ENOMEM);
283 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
285 loi = lsm->lsm_oinfo;
286 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
287 struct lov_tgt_desc *tgt;
288 struct lov_request *req;
291 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
292 policy->l_extent.end, &start, &end))
295 tgt = lov->tgts + loi->loi_ost_idx;
296 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
297 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
301 OBD_ALLOC(req, sizeof(*req));
303 lov_tgt_decref(lov, tgt);
304 GOTO(out_set, rc = -ENOMEM);
/* the sub-md carries exactly one trailing lov_oinfo */
307 req->rq_buflen = sizeof(*req->rq_md) +
308 sizeof(struct lov_oinfo);
309 OBD_ALLOC(req->rq_md, req->rq_buflen);
310 if (req->rq_md == NULL) {
311 OBD_FREE(req, sizeof(*req));
312 lov_tgt_decref(lov, tgt);
313 GOTO(out_set, rc = -ENOMEM);
316 req->rq_extent.start = start;
317 req->rq_extent.end = end;
318 req->rq_extent.gid = policy->l_extent.gid;
320 req->rq_idx = loi->loi_ost_idx;
321 req->rq_gen = loi->loi_ost_gen;
324 /* XXX LOV STACKING: submd should be from the subobj */
325 req->rq_md->lsm_object_id = loi->loi_id;
326 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
327 req->rq_md->lsm_stripe_count = 0;
328 req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
329 req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss;
330 req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
331 req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks;
/* Seed the copy from the live stripe, like the fields above.  The
 * assignment used to run the other way round and overwrote the stripe's
 * mtime with the content of the just-allocated sub-md;
 * lov_update_enqueue_set() copies the value back on completion. */
332 req->rq_md->lsm_oinfo->loi_mtime = loi->loi_mtime;
334 lov_set_add_req(req, set);
337 GOTO(out_set, rc = -EIO);
341 lov_fini_enqueue_set(set, mode);
/* Completion handler for a per-stripe lock match.  The result is recorded
 * through lov_update_set() with the logical negation of rc, so a non-zero
 * rc is recorded as 0 and a zero rc as 1. */
345 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
349 lov_update_set(set, req, !rc);
/* Finish a match request set.
 *
 * When at least one request completed, enqueue_done() is run with the
 * caller's flags and its result kept in rc.  The lock-handle holder is put
 * and the set's reference count is decremented; the action taken when it
 * reaches zero is not visible in this view. */
353 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
358 LASSERT(set->set_exp);
361 if (set->set_completes)
362 rc = enqueue_done(set, mode, flags);
364 lov_llh_put(set->set_lockh);
366 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for a striped lock match.
 *
 * exp    - LOV export the match is issued on
 * lsm    - striping md of the object
 * policy - extent to match; only intersecting stripes get a request
 * mode   - lock mode, handed to lov_fini_match_set() on failure
 * lockh  - receives the cookie of the newly created lock-handle holder
 * reqset - output: the prepared request set
 *
 * Unlike the enqueue variant, a target that is not ready fails the whole
 * preparation with -EIO.  Each request gets a private sub-md describing
 * the stripe's sub-object.  Allocation failures yield -ENOMEM; the error
 * path finishes the set with lov_fini_match_set(). */
372 int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
373 ldlm_policy_data_t *policy, __u32 mode,
374 struct lustre_handle *lockh,
375 struct lov_request_set **reqset)
377 struct lov_obd *lov = &exp->exp_obd->u.lov;
378 struct lov_request_set *set;
380 struct lov_oinfo *loi;
383 OBD_ALLOC(set, sizeof(*set));
390 set->set_lockh = lov_llh_new(lsm);
391 if (set->set_lockh == NULL)
392 GOTO(out_set, rc = -ENOMEM);
393 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
395 loi = lsm->lsm_oinfo;
396 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
397 struct lov_tgt_desc *tgt;
398 struct lov_request *req;
401 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
402 policy->l_extent.end, &start, &end))
405 /* FIXME raid1 should grace this error */
406 tgt = lov->tgts + loi->loi_ost_idx;
407 if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
408 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
409 GOTO(out_set, rc = -EIO);
412 OBD_ALLOC(req, sizeof(*req));
414 lov_tgt_decref(lov, tgt);
415 GOTO(out_set, rc = -ENOMEM);
418 req->rq_buflen = sizeof(*req->rq_md);
419 OBD_ALLOC(req->rq_md, req->rq_buflen);
420 if (req->rq_md == NULL) {
421 OBD_FREE(req, sizeof(*req));
422 lov_tgt_decref(lov, tgt);
423 GOTO(out_set, rc = -ENOMEM);
426 req->rq_extent.start = start;
427 req->rq_extent.end = end;
428 req->rq_extent.gid = policy->l_extent.gid;
430 req->rq_idx = loi->loi_ost_idx;
431 req->rq_gen = loi->loi_ost_gen;
434 /* XXX LOV STACKING: submd should be from the subobj */
435 req->rq_md->lsm_object_id = loi->loi_id;
436 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
437 req->rq_md->lsm_stripe_count = 0;
438 lov_set_add_req(req, set);
441 GOTO(out_set, rc = -EIO);
445 lov_fini_match_set(set, mode, 0);
/* Finish a cancel request set: the lock-handle holder is put and the set's
 * reference count is decremented.  The action taken when the count reaches
 * zero is not visible in this view. */
449 int lov_fini_cancel_set(struct lov_request_set *set)
454 LASSERT(set->set_exp);
459 lov_llh_put(set->set_lockh);
461 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for cancelling a striped lock.
 *
 * exp    - LOV export
 * lsm    - striping md of the locked object
 * mode   - lock mode (not referenced in the lines visible here)
 * lockh  - LOV lock handle; resolved with lov_handle2llh(), -EINVAL when
 *          it does not resolve
 * reqset - output: the prepared request set
 *
 * Stripes whose per-stripe handle has a zero cookie hold no lock and are
 * only logged; a target that is not ready is reported with CERROR.  Every
 * other stripe gets a request with a private sub-md describing its
 * sub-object.  Allocation failures yield -ENOMEM; the error path finishes
 * the set with lov_fini_cancel_set(). */
467 int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
468 __u32 mode, struct lustre_handle *lockh,
469 struct lov_request_set **reqset)
471 struct lov_obd *lov = &exp->exp_obd->u.lov;
472 struct lov_request_set *set;
474 struct lov_oinfo *loi;
477 OBD_ALLOC(set, sizeof(*set));
484 set->set_lockh = lov_handle2llh(lockh);
485 if (set->set_lockh == NULL) {
486 CERROR("LOV: invalid lov lock handle %p\n", lockh);
487 GOTO(out_set, rc = -EINVAL);
489 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
491 loi = lsm->lsm_oinfo;
492 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
493 struct lov_tgt_desc *tgt;
494 struct lov_request *req;
495 struct lustre_handle *lov_lockhp;
497 lov_lockhp = set->set_lockh->llh_handles + i;
498 if (lov_lockhp->cookie == 0) {
499 CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
500 loi->loi_ost_idx, loi->loi_id);
504 tgt = lov->tgts + loi->loi_ost_idx;
505 if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
506 CERROR("lov idx %d subobj "LPX64" osc inactive?\n",
507 loi->loi_ost_idx, loi->loi_id);
511 OBD_ALLOC(req, sizeof(*req));
513 lov_tgt_decref(lov, tgt);
514 GOTO(out_set, rc = -ENOMEM);
517 req->rq_buflen = sizeof(*req->rq_md);
518 OBD_ALLOC(req->rq_md, req->rq_buflen);
519 if (req->rq_md == NULL) {
520 OBD_FREE(req, sizeof(*req));
521 lov_tgt_decref(lov, tgt);
522 GOTO(out_set, rc = -ENOMEM);
525 req->rq_idx = loi->loi_ost_idx;
526 req->rq_gen = loi->loi_ost_gen;
529 /* XXX LOV STACKING: submd should be from the subobj */
530 req->rq_md->lsm_object_id = loi->loi_id;
531 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
532 req->rq_md->lsm_stripe_count = 0;
533 lov_set_add_req(req, set);
536 GOTO(out_set, rc = -EIO);
540 lov_fini_cancel_set(set);
/* Post-process a completed create set.
 *
 * exp - LOV export the create was issued on
 * set - the request set; at least one request must have completed
 * ea  - caller's striping md pointer; *ea == NULL is tested when fewer
 *       sub-creates succeeded than were requested
 *
 * -EIO results when no sub-create succeeded and -ENOMEM when the temporary
 * obdo cannot be allocated.  Otherwise the attributes of all successful
 * sub-objects are merged into a temporary obdo, which then replaces the
 * caller's obdo while keeping its object id and group.
 *
 * The second loop destroys already created sub-objects again with
 * obd_destroy() and releases the set's md (presumably the failure path --
 * the label and the branches around it are not visible in this view).
 * The last section hands the log cookies back to the transaction info. */
544 static int create_done(struct obd_export *exp, struct lov_request_set *set,
545 struct lov_stripe_md **ea)
547 struct lov_obd *lov = &exp->exp_obd->u.lov;
548 struct obd_trans_info *oti = set->set_oti;
549 struct obdo *src_oa = set->set_oa;
550 struct list_head *pos;
551 struct lov_request *req;
552 struct obdo *ret_oa = NULL;
553 int attrset = 0, rc = 0;
556 LASSERT(set->set_completes);
558 if (!set->set_success)
559 GOTO(cleanup, rc = -EIO);
561 if (*ea == NULL && set->set_count != set->set_success) {
562 set->set_count = set->set_success;
566 ret_oa = obdo_alloc();
568 GOTO(cleanup, rc = -ENOMEM);
570 list_for_each (pos, &set->set_list) {
571 req = list_entry(pos, struct lov_request, rq_link);
572 if (!req->rq_complete || req->rq_rc)
574 lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
575 set->set_md, req->rq_stripe, &attrset);
577 if (src_oa->o_valid & OBD_MD_FLSIZE &&
578 ret_oa->o_size != src_oa->o_size) {
579 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
580 src_oa->o_size, ret_oa->o_size);
/* the merged attributes replace the caller's obdo, id/group preserved */
583 ret_oa->o_id = src_oa->o_id;
584 ret_oa->o_gr = src_oa->o_gr;
585 ret_oa->o_valid |= OBD_MD_FLGROUP;
586 memcpy(src_oa, ret_oa, sizeof(*src_oa));
594 list_for_each (pos, &set->set_list) {
595 struct lov_tgt_desc *tgt;
597 req = list_entry(pos, struct lov_request, rq_link);
599 if (!req->rq_complete || req->rq_rc)
602 tgt = lov->tgts + req->rq_idx;
603 if (!lov_tgt_ready(lov, tgt, req->rq_gen)) {
604 CERROR("Failed to uncreate objid "LPX64" subobj "
605 LPX64" on OST idx %d: osc inactive.\n",
606 set->set_oa->o_id, req->rq_oa->o_id,
611 err = obd_destroy(tgt->ltd_exp, req->rq_oa, NULL, oti);
612 lov_tgt_decref(lov, tgt);
614 CERROR("Failed to uncreate objid "LPX64" subobj "
615 LPX64" on OST idx %d: rc = %d\n",
616 set->set_oa->o_id, req->rq_oa->o_id,
620 obd_free_memmd(exp, &set->set_md);
622 if (oti && set->set_cookies) {
623 oti->oti_logcookies = set->set_cookies;
624 if (!set->set_cookie_sent) {
625 oti_free_cookies(oti);
626 src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
628 src_oa->o_valid |= OBD_MD_FLCOOKIE;
/* Finish a create request set.  When at least one request completed,
 * create_done() is run and its result kept in rc; afterwards the set's
 * reference count is decremented.  The action taken when the count reaches
 * zero is not visible in this view. */
634 int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea)
639 LASSERT(set->set_exp);
642 if (set->set_completes) {
643 rc = create_done(set->set_exp, set, ea);
644 /* FIXME update qos data here */
647 if (atomic_dec_and_test(&set->set_refcount))
/* Completion handler for one sub-object create.
 *
 * The request is assigned the next free stripe slot (the current success
 * count).  A failure on a target that is still ready is logged.  The
 * result is recorded with lov_update_set(); the created object's id,
 * group, target index and generation are stored in the stripe's
 * lov_oinfo, and the id is also stored in oti->oti_objid[] when that
 * array is present.  Log-cookie bookkeeping follows.
 *
 * NOTE(review): several branches and the return statements are not
 * visible in this view. */
653 int lov_update_create_set(struct lov_request_set *set,
654 struct lov_request *req, int rc)
656 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
657 struct obd_trans_info *oti = set->set_oti;
658 struct lov_stripe_md *lsm = set->set_md;
659 struct lov_oinfo *loi;
660 struct lov_tgt_desc *tgt;
663 req->rq_stripe = set->set_success;
664 loi = &lsm->lsm_oinfo[req->rq_stripe];
665 tgt = lov->tgts + req->rq_idx;
667 if (rc && lov_tgt_ready(lov, tgt, req->rq_gen)) {
668 lov_tgt_decref(lov, tgt);
669 CERROR("error creating objid "LPX64" sub-object"
670 " on OST idx %d/%d: rc = %d\n",
671 set->set_oa->o_id, req->rq_idx,
672 lsm->lsm_stripe_count, rc);
674 CERROR("obd_create returned invalid err %d\n", rc);
678 lov_update_set(set, req, rc);
682 if (oti && oti->oti_objid)
683 oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
685 loi->loi_id = req->rq_oa->o_id;
686 loi->loi_gr = req->rq_oa->o_gr;
687 loi->loi_ost_idx = req->rq_idx;
688 loi->loi_ost_gen = req->rq_gen;
/* NOTE(review): loi->loi_id is passed for both "subobj" fields below; the
 * second one may have been meant to be loi->loi_gr -- confirm. */
689 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at "
691 lsm->lsm_object_id, loi->loi_id, loi->loi_id,
692 req->rq_idx, req->rq_gen);
695 if (set->set_cookies)
696 ++oti->oti_logcookies;
697 if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
698 set->set_cookie_sent++;
/* Build the request set for creating a striped object.
 *
 * exp    - LOV export
 * ea     - caller's striping md pointer, handed to lov_fini_create_set()
 *          on failure
 * src_oa - attributes of the object to create; OBD_MD_FLSIZE and
 *          OBD_MD_FLCOOKIE are tested
 * oti    - transaction info, may be NULL; receives the cookie array
 * reqset - output: the prepared request set
 *
 * When the set has no md yet, a stripe count is chosen (at least the
 * default from lov_get_stripecnt(); more when the requested size needs
 * them, -EFBIG if that exceeds the number of active targets) and an md is
 * allocated with the configured pattern or LOV_PATTERN_RAID0.
 * qos_prep_create() then populates the set, and log cookies are allocated
 * when requested. */
703 int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea,
704 struct obdo *src_oa, struct obd_trans_info *oti,
705 struct lov_request_set **reqset)
707 struct lov_obd *lov = &exp->exp_obd->u.lov;
708 struct lov_request_set *set;
709 int rc = 0, newea = 0;
712 OBD_ALLOC(set, sizeof(*set));
719 set->set_oa = src_oa;
722 if (set->set_md == NULL) {
723 int stripes, stripe_cnt;
724 stripe_cnt = lov_get_stripecnt(lov, 0);
726 /* If the MDS file was truncated up to some size, stripe over
727 * enough OSTs to allow the file to be created at that size. */
728 if (src_oa->o_valid & OBD_MD_FLSIZE) {
/* NOTE(review): `stripes` is an int, while the kernel's do_div() expects
 * a 64-bit dividend -- confirm this is safe on all supported targets. */
729 stripes = ((src_oa->o_size + LUSTRE_STRIPE_MAXBYTES) >> 12) - 1;
730 do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
732 if (stripes > lov->desc.ld_active_tgt_count)
733 GOTO(out_set, rc = -EFBIG);
734 if (stripes < stripe_cnt)
735 stripes = stripe_cnt;
737 stripes = stripe_cnt;
740 rc = lov_alloc_memmd(&set->set_md, stripes,
741 lov->desc.ld_pattern ?
742 lov->desc.ld_pattern : LOV_PATTERN_RAID0);
748 rc = qos_prep_create(lov, set, newea);
752 if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
753 oti_alloc_cookies(oti, set->set_count);
754 if (!oti->oti_logcookies)
756 set->set_cookies = oti->oti_logcookies;
763 obd_free_memmd(exp, &set->set_md);
765 lov_fini_create_set(set, ea);
/* Merge the attributes returned by the successful sub-requests of a set
 * back into the caller's obdo (set->set_oa).
 *
 * The set's obdo and its success count are checked first.  A temporary
 * obdo is allocated (-ENOMEM on failure) and the attributes of every
 * completed, successful request with a non-zero o_valid are merged into
 * it via lov_merge_attrs().  An error is logged when no stripe contributed
 * valid attributes; otherwise the temporary obdo, carrying the caller's
 * object id, is copied over set->set_oa.
 *
 * NOTE(review): the return values of the early exits and the release of
 * the temporary obdo are not visible in this view. */
769 static int common_attr_done(struct lov_request_set *set)
771 struct list_head *pos;
772 struct lov_request *req;
774 int rc = 0, attrset = 0;
777 if (set->set_oa == NULL)
780 if (!set->set_success)
783 tmp_oa = obdo_alloc();
785 GOTO(out, rc = -ENOMEM);
787 list_for_each (pos, &set->set_list) {
788 req = list_entry(pos, struct lov_request, rq_link);
790 if (!req->rq_complete || req->rq_rc)
792 if (req->rq_oa->o_valid == 0) /* inactive stripe */
794 lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
795 set->set_md, req->rq_stripe, &attrset);
798 CERROR("No stripes had valid attrs\n");
801 tmp_oa->o_id = set->set_oa->o_id;
802 memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
/* Post-process a bulk read/write set: for each request that reports
 * OBD_MD_FLBLOCKS in its obdo, the block count is stored in the matching
 * stripe's lov_oinfo.  The `if` testing rq_complete / rq_rc presumably
 * skips unfinished or failed requests -- its body is not visible here. */
810 static int brw_done(struct lov_request_set *set)
812 struct lov_stripe_md *lsm = set->set_md;
813 struct lov_oinfo *loi = NULL;
814 struct list_head *pos;
815 struct lov_request *req;
818 list_for_each (pos, &set->set_list) {
819 req = list_entry(pos, struct lov_request, rq_link);
821 if (!req->rq_complete || req->rq_rc)
824 loi = &lsm->lsm_oinfo[req->rq_stripe];
826 if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
827 loi->loi_blocks = req->rq_oa->o_blocks;
/* Finish a bulk read/write request set and decrement its reference count.
 * NOTE(review): the work done when requests completed and the action taken
 * when the count reaches zero are not visible in this view. */
833 int lov_fini_brw_set(struct lov_request_set *set)
838 LASSERT(set->set_exp);
841 if (set->set_completes) {
843 /* FIXME update qos data here */
845 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for a bulk read/write on a striped object.
 *
 * exp     - LOV export
 * src_oa  - template obdo, copied into every request
 * lsm     - striping md of the object
 * oa_bufs - number of entries in pga
 * pga     - the pages to transfer, addressed by file offset
 * oti     - transaction info (not referenced in the lines visible here)
 * reqset  - output: the prepared request set
 *
 * Steps:
 *  1. count the pages that fall on each stripe;
 *  2. allocate one request (obdo + sub-md) for every stripe that has
 *     pages, recording where its pages start in the set's page array --
 *     a target that is not ready fails the call with -EIO;
 *  3. copy the pages into set->set_pga grouped per stripe, translating
 *     each file offset into a stripe-local offset.
 * Allocation failures yield -ENOMEM.  The per-stripe info array is freed
 * before returning, and lov_fini_brw_set() is called on the set (presumably
 * on the error path only -- the surrounding lines are not visible here). */
851 int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
852 struct lov_stripe_md *lsm, obd_count oa_bufs,
853 struct brw_page *pga, struct obd_trans_info *oti,
854 struct lov_request_set **reqset)
861 struct lov_request_set *set;
862 struct lov_oinfo *loi = NULL;
863 struct lov_obd *lov = &exp->exp_obd->u.lov;
864 int rc = 0, i, shift;
867 OBD_ALLOC(set, sizeof(*set));
874 set->set_oa = src_oa;
876 set->set_oabufs = oa_bufs;
877 OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
879 GOTO(out, rc = -ENOMEM);
881 OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
883 GOTO(out, rc = -ENOMEM);
885 /* calculate the page count for each stripe */
886 for (i = 0; i < oa_bufs; i++) {
887 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
888 info[stripe].count++;
891 /* alloc and initialize lov request */
892 loi = lsm->lsm_oinfo;
894 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
895 struct lov_request *req;
896 struct lov_tgt_desc *tgt;
898 if (info[i].count == 0)
901 tgt = lov->tgts + loi->loi_ost_idx;
902 if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
903 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
904 GOTO(out, rc = -EIO);
907 OBD_ALLOC(req, sizeof(*req));
909 lov_tgt_decref(lov, tgt);
910 GOTO(out, rc = -ENOMEM);
913 req->rq_oa = obdo_alloc();
914 if (req->rq_oa == NULL) {
915 OBD_FREE(req, sizeof(*req));
916 lov_tgt_decref(lov, tgt);
917 GOTO(out, rc = -ENOMEM);
921 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
922 req->rq_oa->o_id = loi->loi_id;
923 req->rq_buflen = sizeof(*req->rq_md);
924 OBD_ALLOC(req->rq_md, req->rq_buflen);
925 if (req->rq_md == NULL) {
926 obdo_free(req->rq_oa);
927 OBD_FREE(req, sizeof(*req));
928 lov_tgt_decref(lov, tgt);
929 GOTO(out, rc = -ENOMEM);
932 req->rq_idx = loi->loi_ost_idx;
933 req->rq_gen = loi->loi_ost_gen;
936 /* XXX LOV STACKING */
937 req->rq_md->lsm_object_id = loi->loi_id;
938 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
/* this request owns rq_oabufs consecutive pages starting at rq_pgaidx */
939 req->rq_oabufs = info[i].count;
940 req->rq_pgaidx = shift;
941 shift += req->rq_oabufs;
943 /* remember the index for sort brw_page array */
944 info[i].index = req->rq_pgaidx;
945 lov_set_add_req(req, set);
948 GOTO(out, rc = -EIO);
950 /* rotate & sort the brw_page array */
951 for (i = 0; i < oa_bufs; i++) {
952 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
954 shift = info[stripe].index + info[stripe].off;
955 LASSERT(shift < oa_bufs);
956 set->set_pga[shift] = pga[i];
957 lov_stripe_offset(lsm, pga[i].disk_offset, stripe,
958 &set->set_pga[shift].disk_offset);
963 OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
968 lov_fini_brw_set(set);
/* Post-process a getattr set; delegates entirely to common_attr_done(). */
973 static int getattr_done(struct lov_request_set *set)
975 return common_attr_done(set);
/* Finish a getattr request set.  When at least one request completed,
 * getattr_done() merges the results and its return value is kept in rc;
 * afterwards the set's reference count is decremented.  The action taken
 * when the count reaches zero is not visible in this view. */
978 int lov_fini_getattr_set(struct lov_request_set *set)
983 LASSERT(set->set_exp);
986 if (set->set_completes)
987 rc = getattr_done(set);
989 if (atomic_dec_and_test(&set->set_refcount))
/* Build the request set for a getattr on a striped object.
 *
 * exp    - LOV export
 * src_oa - caller's obdo; used as the template for every request
 * lsm    - striping md of the object
 * reqset - output: the prepared request set
 *
 * Stripes on inactive targets are logged.  For every other stripe a
 * request is allocated whose obdo is a copy of src_oa with the stripe's
 * sub-object id.  -ENOMEM on allocation failure, -EIO when no request was
 * added; the error path finishes the set with lov_fini_getattr_set(). */
995 int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
996 struct lov_stripe_md *lsm,
997 struct lov_request_set **reqset)
999 struct lov_request_set *set;
1000 struct lov_oinfo *loi = NULL;
1001 struct lov_obd *lov = &exp->exp_obd->u.lov;
1005 OBD_ALLOC(set, sizeof(*set));
1012 set->set_oa = src_oa;
1014 loi = lsm->lsm_oinfo;
1015 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1016 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1017 struct lov_request *req;
1019 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1020 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1024 OBD_ALLOC(req, sizeof(*req));
1026 lov_tgt_decref(lov, tgt);
1027 GOTO(out_set, rc = -ENOMEM);
1031 req->rq_idx = loi->loi_ost_idx;
1032 req->rq_gen = loi->loi_ost_gen;
1034 req->rq_oa = obdo_alloc();
1035 if (req->rq_oa == NULL) {
1036 OBD_FREE(req, sizeof(*req));
1037 lov_tgt_decref(lov, tgt);
1038 GOTO(out_set, rc = -ENOMEM);
1040 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1041 req->rq_oa->o_id = loi->loi_id;
1042 lov_set_add_req(req, set);
1044 if (!set->set_count)
1045 GOTO(out_set, rc = -EIO);
1049 lov_fini_getattr_set(set);
/* Finish a destroy request set: drop one reference and tear the set down
 * with lov_finish_set() when it was the last one.  No result processing
 * for completed requests is visible here (only the FIXME placeholder). */
1053 int lov_fini_destroy_set(struct lov_request_set *set)
1057 LASSERT(set->set_exp);
1060 if (set->set_completes) {
1061 /* FIXME update qos data here */
1064 if (atomic_dec_and_test(&set->set_refcount))
1065 lov_finish_set(set);
/* Build the request set for destroying a striped object.
 *
 * exp    - LOV export
 * src_oa - obdo of the object; template for every request, and
 *          OBD_MD_FLCOOKIE selects cookie handling
 * lsm    - striping md of the object
 * oti    - transaction info carrying the log cookies, may be NULL
 * reqset - output: the prepared request set
 *
 * Stripes on inactive targets are logged.  For every other stripe a
 * request is allocated with a copy of src_oa carrying the sub-object id.
 * When cookies are in use, oti->oti_logcookies is positioned at the
 * cookie of the first stripe that received a request.  -ENOMEM on
 * allocation failure, -EIO when no request was added; the error path
 * finishes the set with lov_fini_destroy_set(). */
1070 int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
1071 struct lov_stripe_md *lsm,
1072 struct obd_trans_info *oti,
1073 struct lov_request_set **reqset)
1075 struct lov_request_set *set;
1076 struct lov_oinfo *loi = NULL;
1077 struct lov_obd *lov = &exp->exp_obd->u.lov;
1078 int rc = 0, cookie_set = 0, i;
1081 OBD_ALLOC(set, sizeof(*set));
1088 set->set_oa = src_oa;
1090 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1091 set->set_cookies = oti->oti_logcookies;
1093 loi = lsm->lsm_oinfo;
1094 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1095 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1096 struct lov_request *req;
1098 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1099 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1103 OBD_ALLOC(req, sizeof(*req));
1105 lov_tgt_decref(lov, tgt);
1106 GOTO(out_set, rc = -ENOMEM);
1110 req->rq_idx = loi->loi_ost_idx;
1111 req->rq_gen = loi->loi_ost_gen;
1113 req->rq_oa = obdo_alloc();
1114 if (req->rq_oa == NULL) {
1115 OBD_FREE(req, sizeof(*req));
1116 lov_tgt_decref(lov, tgt);
1117 GOTO(out_set, rc = -ENOMEM);
1120 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1121 req->rq_oa->o_id = loi->loi_id;
1123 /* Setup the first request's cookie position */
1124 if (!cookie_set && set->set_cookies) {
1125 oti->oti_logcookies = set->set_cookies + i;
1128 lov_set_add_req(req, set);
1130 if (!set->set_count)
1131 GOTO(out_set, rc = -EIO);
1135 lov_fini_destroy_set(set);
/* Post-process a setattr set; delegates entirely to common_attr_done(). */
1139 static int setattr_done(struct lov_request_set *set)
1141 return common_attr_done(set);
/* Finish a setattr request set.  When at least one request completed,
 * setattr_done() merges the results and its return value is kept in rc.
 * The set's reference is then dropped, and lov_finish_set() tears the set
 * down if it was the last one. */
1144 int lov_fini_setattr_set(struct lov_request_set *set)
1149 LASSERT(set->set_exp);
1152 if (set->set_completes) {
1153 rc = setattr_done(set);
1154 /* FIXME update qos data here */
1157 if (atomic_dec_and_test(&set->set_refcount))
1158 lov_finish_set(set);
/* Build the request set for a setattr on a striped object.
 *
 * exp    - LOV export
 * src_oa - attributes to set; template for every request
 * lsm    - striping md of the object
 * oti    - transaction info (not referenced in the lines visible here)
 * reqset - output: the prepared request set
 *
 * Stripes on inactive targets are logged.  For every other stripe a
 * request is allocated with a copy of src_oa carrying the sub-object id.
 * When a size is being set (OBD_MD_FLSIZE), the file size is translated
 * into the stripe-local size with lov_stripe_offset() and, under a
 * condition that is only partly visible here, decremented by one.
 * -ENOMEM on allocation failure, -EIO when no request was added; the error
 * path finishes the set with lov_fini_setattr_set(). */
1162 int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
1163 struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1164 struct lov_request_set **reqset)
1166 struct lov_request_set *set;
1167 struct lov_oinfo *loi = NULL;
1168 struct lov_obd *lov = &exp->exp_obd->u.lov;
1172 OBD_ALLOC(set, sizeof(*set));
1179 set->set_oa = src_oa;
1181 loi = lsm->lsm_oinfo;
1182 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1183 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1184 struct lov_request *req;
1186 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1187 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1191 OBD_ALLOC(req, sizeof(*req));
1193 lov_tgt_decref(lov, tgt);
1194 GOTO(out_set, rc = -ENOMEM);
1198 req->rq_idx = loi->loi_ost_idx;
1199 req->rq_gen = loi->loi_ost_gen;
1201 req->rq_oa = obdo_alloc();
1202 if (req->rq_oa == NULL) {
1203 OBD_FREE(req, sizeof(*req));
1204 lov_tgt_decref(lov, tgt);
1205 GOTO(out_set, rc = -ENOMEM);
1208 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1209 req->rq_oa->o_id = loi->loi_id;
/* if a group is flagged as valid it must be a positive one */
1210 LASSERT(!(req->rq_oa->o_valid & OBD_MD_FLGROUP) || req->rq_oa->o_gr>0);
1212 if (src_oa->o_valid & OBD_MD_FLSIZE) {
1213 if (lov_stripe_offset(lsm, src_oa->o_size, i,
1214 &req->rq_oa->o_size) < 0 &&
1216 req->rq_oa->o_size--;
1217 CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1218 i, req->rq_oa->o_size, src_oa->o_size);
1220 lov_set_add_req(req, set);
1222 if (!set->set_count)
1223 GOTO(out_set, rc = -EIO);
1227 lov_fini_setattr_set(set);
/* Completion handler for one per-stripe punch (truncate) request.
 *
 * The result is recorded through lov_update_set().  The request's target
 * is then looked up; when lov_tgt_active() accepts it for the request's
 * generation, lov_tgt_decref() is called on it (presumably to balance a
 * reference taken by lov_tgt_active() -- TODO confirm).
 *
 * NOTE(review): the condition selecting the error path and the return
 * statements are not visible in this view. */
1231 int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
1234 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1237 lov_update_set(set, req, rc);
1239 struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
1241 if (lov_tgt_active(lov, tgt, req->rq_gen))
1242 lov_tgt_decref(lov, tgt);
1247 /* FIXME in raid1 regime, should return 0 */
/* Finish a punch request set.  When requests completed, the success count
 * is tested (the statement executed when it is zero is not visible in this
 * view).  The set's reference is then dropped, and lov_finish_set() tears
 * the set down if it was the last one. */
1251 int lov_fini_punch_set(struct lov_request_set *set)
1256 LASSERT(set->set_exp);
1259 if (set->set_completes) {
1260 if (!set->set_success)
1262 /* FIXME update qos data here */
1265 if (atomic_dec_and_test(&set->set_refcount))
1266 lov_finish_set(set);
/* Build the request set for punching (truncating) a range of a striped
 * object.
 *
 * exp        - LOV export
 * src_oa     - obdo of the object; template for every request
 * lsm        - striping md of the object
 * start, end - file range to punch
 * oti        - transaction info (not referenced in the lines visible here)
 * reqset     - output: the prepared request set
 *
 * Only stripes intersecting [start, end] are considered; stripes on
 * inactive targets are logged.  Each request gets a copy of src_oa with
 * the sub-object id and group (OBD_MD_FLGROUP set) and the stripe-local
 * extent, with gid -1.  -ENOMEM on allocation failure, -EIO when no
 * request was added; the error path finishes the set with
 * lov_fini_punch_set(). */
1271 int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
1272 struct lov_stripe_md *lsm, obd_off start,
1273 obd_off end, struct obd_trans_info *oti,
1274 struct lov_request_set **reqset)
1276 struct lov_request_set *set;
1277 struct lov_oinfo *loi = NULL;
1278 struct lov_obd *lov = &exp->exp_obd->u.lov;
1282 OBD_ALLOC(set, sizeof(*set));
1289 set->set_oa = src_oa;
1291 loi = lsm->lsm_oinfo;
1292 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1293 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1294 struct lov_request *req;
1297 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1300 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1301 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1305 OBD_ALLOC(req, sizeof(*req));
1307 lov_tgt_decref(lov, tgt);
1308 GOTO(out_set, rc = -ENOMEM);
1312 req->rq_idx = loi->loi_ost_idx;
1313 req->rq_gen = loi->loi_ost_gen;
1315 req->rq_oa = obdo_alloc();
1316 if (req->rq_oa == NULL) {
1317 OBD_FREE(req, sizeof(*req));
1318 lov_tgt_decref(lov, tgt);
1319 GOTO(out_set, rc = -ENOMEM);
1322 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1323 req->rq_oa->o_id = loi->loi_id;
1324 req->rq_oa->o_gr = loi->loi_gr;
1325 req->rq_oa->o_valid |= OBD_MD_FLGROUP;
1327 req->rq_extent.start = rs;
1328 req->rq_extent.end = re;
1329 req->rq_extent.gid = -1;
1331 lov_set_add_req(req, set);
1333 if (!set->set_count)
1334 GOTO(out_set, rc = -EIO);
1338 lov_fini_punch_set(set);
/* Finish a sync request set.  When requests completed, the success count
 * is tested (the statement executed when it is zero is not visible in this
 * view).  The set's reference is then dropped, and lov_finish_set() tears
 * the set down if it was the last one. */
1342 int lov_fini_sync_set(struct lov_request_set *set)
1347 LASSERT(set->set_exp);
1350 if (set->set_completes) {
1351 if (!set->set_success)
1353 /* FIXME update qos data here */
1356 if (atomic_dec_and_test(&set->set_refcount))
1357 lov_finish_set(set);
/* Build the request set for syncing a range of a striped object.
 *
 * exp        - LOV export
 * src_oa     - obdo of the object; template for every request
 * lsm        - striping md of the object
 * start, end - file range to sync
 * reqset     - output: the prepared request set
 *
 * Only stripes intersecting [start, end] are considered; stripes on
 * inactive targets are logged.  Each request gets a copy of src_oa with
 * the sub-object id and the stripe-local extent, with gid -1.  -ENOMEM on
 * allocation failure, -EIO when no request was added; the error path
 * finishes the set with lov_fini_sync_set(). */
1362 int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
1363 struct lov_stripe_md *lsm, obd_off start,
1364 obd_off end, struct lov_request_set **reqset)
1366 struct lov_request_set *set;
1367 struct lov_oinfo *loi = NULL;
1368 struct lov_obd *lov = &exp->exp_obd->u.lov;
1372 OBD_ALLOC(set, sizeof(*set));
1379 set->set_oa = src_oa;
1381 loi = lsm->lsm_oinfo;
1382 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1383 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1384 struct lov_request *req;
1387 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1390 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1391 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1395 OBD_ALLOC(req, sizeof(*req));
1397 lov_tgt_decref(lov, tgt);
1398 GOTO(out_set, rc = -ENOMEM);
1402 req->rq_idx = loi->loi_ost_idx;
1403 req->rq_gen = loi->loi_ost_gen;
1405 req->rq_oa = obdo_alloc();
1406 if (req->rq_oa == NULL) {
1407 OBD_FREE(req, sizeof(*req));
1408 lov_tgt_decref(lov, tgt);
1409 GOTO(out_set, rc = -ENOMEM);
1411 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1412 req->rq_oa->o_id = loi->loi_id;
1413 req->rq_extent.start = rs;
1414 req->rq_extent.end = re;
1415 req->rq_extent.gid = -1;
1417 lov_set_add_req(req, set);
1419 if (!set->set_count)
1420 GOTO(out_set, rc = -EIO);
1424 lov_fini_sync_set(set);