1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
6 * This file is part of the Lustre file system, http://www.lustre.org
7 * Lustre is a trademark of Cluster File Systems, Inc.
9 * You may have signed or agreed to another license before downloading
10 * this software. If so, you are bound by the terms and conditions
11 * of that agreement, and the following does not apply to you. See the
12 * LICENSE file included with this distribution for more information.
14 * If you did not agree to a different license, then this copy of Lustre
15 * is open source software; you can redistribute it and/or modify it
16 * under the terms of version 2 of the GNU General Public License as
17 * published by the Free Software Foundation.
19 * In either case, Lustre is distributed in the hope that it will be
20 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 * license text for more details.
26 # define EXPORT_SYMTAB
28 #define DEBUG_SUBSYSTEM S_LOV
31 #include <libcfs/libcfs.h>
33 #include <liblustre.h>
36 #include <obd_class.h>
38 #include <lustre/lustre_idl.h>
40 #include "lov_internal.h"
42 static void lov_init_set(struct lov_request_set *set)
45 set->set_completes = 0;
48 CFS_INIT_LIST_HEAD(&set->set_list);
49 atomic_set(&set->set_refcount, 1);
52 static void lov_finish_set(struct lov_request_set *set)
54 struct list_head *pos, *n;
58 list_for_each_safe(pos, n, &set->set_list) {
59 struct lov_request *req = list_entry(pos, struct lov_request,
61 list_del_init(&req->rq_link);
64 obdo_free(req->rq_oi.oi_oa);
66 OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
67 if (req->rq_oi.oi_osfs)
68 OBD_FREE(req->rq_oi.oi_osfs,
69 sizeof(*req->rq_oi.oi_osfs));
70 OBD_FREE(req, sizeof(*req));
74 int len = set->set_oabufs * sizeof(*set->set_pga);
75 OBD_FREE(set->set_pga, len);
78 lov_llh_put(set->set_lockh);
80 OBD_FREE(set, sizeof(*set));
84 void lov_update_set(struct lov_request_set *set,
85 struct lov_request *req, int rc)
95 int lov_update_common_set(struct lov_request_set *set,
96 struct lov_request *req, int rc)
98 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
101 lov_update_set(set, req, rc);
103 /* grace error on inactive ost */
104 if (rc && !(lov->lov_tgts[req->rq_idx] &&
105 lov->lov_tgts[req->rq_idx]->ltd_active))
108 /* FIXME in raid1 regime, should return 0 */
112 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
114 list_add_tail(&req->rq_link, &set->set_list);
118 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
120 struct lov_request_set *set = req->rq_rqset;
121 struct lustre_handle *lov_lockhp;
122 struct lov_oinfo *loi;
125 LASSERT(set != NULL);
126 LASSERT(set->set_oi != NULL);
128 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
129 loi = &set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
131 /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
132 * and that copy can be arbitrarily out of date.
134 * The LOV API is due for a serious rewriting anyways, and this
135 * can be addressed then. */
137 if (rc == ELDLM_OK) {
138 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
141 LASSERT(lock != NULL);
142 lov_stripe_lock(set->set_oi->oi_md);
143 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo->loi_lvb;
144 tmp = loi->loi_lvb.lvb_size;
145 /* Extend KMS up to the end of this lock and no further
146 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
147 if (tmp > lock->l_policy_data.l_extent.end)
148 tmp = lock->l_policy_data.l_extent.end + 1;
149 if (tmp >= loi->loi_kms) {
150 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
151 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
153 loi->loi_kms_valid = 1;
155 LDLM_DEBUG(lock, "lock acquired, setting rss="
156 LPU64"; leaving kms="LPU64", end="LPU64,
157 loi->loi_lvb.lvb_size, loi->loi_kms,
158 lock->l_policy_data.l_extent.end);
160 lov_stripe_unlock(set->set_oi->oi_md);
161 ldlm_lock_allow_match(lock);
163 } else if ((rc == ELDLM_LOCK_ABORTED) &&
164 (set->set_ei->ei_flags & LDLM_FL_HAS_INTENT)) {
165 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
166 lov_stripe_lock(set->set_oi->oi_md);
167 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo->loi_lvb;
168 lov_stripe_unlock(set->set_oi->oi_md);
169 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
170 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
173 struct obd_export *exp = set->set_exp;
174 struct lov_obd *lov = &exp->exp_obd->u.lov;
176 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
177 if (lov->lov_tgts[req->rq_idx] &&
178 lov->lov_tgts[req->rq_idx]->ltd_active) {
179 CERROR("error: enqueue objid "LPX64" subobj "
180 LPX64" on OST idx %d: rc = %d\n",
181 set->set_oi->oi_md->lsm_object_id,
182 loi->loi_id, loi->loi_ost_idx, rc);
187 lov_update_set(set, req, rc);
191 /* The callback for osc_enqueue that updates lov info for every OSC request. */
192 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
194 struct obd_enqueue_info *einfo;
195 struct lov_request *lovreq;
197 lovreq = container_of(oinfo, struct lov_request, rq_oi);
198 einfo = lovreq->rq_rqset->set_ei;
199 return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
202 static int enqueue_done(struct lov_request_set *set, __u32 mode)
204 struct lov_request *req;
205 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
209 LASSERT(set->set_completes);
210 /* enqueue/match success, just return */
211 if (set->set_completes == set->set_success)
214 /* cancel enqueued/matched locks */
215 list_for_each_entry(req, &set->set_list, rq_link) {
216 struct lustre_handle *lov_lockhp;
218 if (!req->rq_complete || req->rq_rc)
221 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
223 if (!lustre_handle_is_used(lov_lockhp))
226 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
227 req->rq_oi.oi_md, mode, lov_lockhp);
228 if (rc && lov->lov_tgts[req->rq_idx] &&
229 lov->lov_tgts[req->rq_idx]->ltd_active)
230 CERROR("cancelling obdjid "LPX64" on OST "
231 "idx %d error: rc = %d\n",
232 req->rq_oi.oi_md->lsm_object_id,
235 lov_llh_put(set->set_lockh);
239 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
246 LASSERT(set->set_exp);
247 /* Do enqueue_done only for sync requests and if any request
249 if (!set->set_ei->ei_rqset && set->set_completes)
250 rc = enqueue_done(set, mode);
252 lov_llh_put(set->set_lockh);
254 if (atomic_dec_and_test(&set->set_refcount))
260 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
261 struct obd_enqueue_info *einfo,
262 struct lov_request_set **reqset)
264 struct lov_obd *lov = &exp->exp_obd->u.lov;
265 struct lov_request_set *set;
267 struct lov_oinfo *loi;
270 OBD_ALLOC(set, sizeof(*set));
278 set->set_lockh = lov_llh_new(oinfo->oi_md);
279 if (set->set_lockh == NULL)
280 GOTO(out_set, rc = -ENOMEM);
281 oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
283 loi = oinfo->oi_md->lsm_oinfo;
284 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
285 struct lov_request *req;
288 if (!lov_stripe_intersects(oinfo->oi_md, i,
289 oinfo->oi_policy.l_extent.start,
290 oinfo->oi_policy.l_extent.end,
294 if (!lov->lov_tgts[loi->loi_ost_idx] ||
295 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
296 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
300 OBD_ALLOC(req, sizeof(*req));
302 GOTO(out_set, rc = -ENOMEM);
304 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
305 sizeof(struct lov_oinfo);
306 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
307 if (req->rq_oi.oi_md == NULL) {
308 OBD_FREE(req, sizeof(*req));
309 GOTO(out_set, rc = -ENOMEM);
313 /* Set lov request specific parameters. */
314 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
315 req->rq_oi.oi_cb_up = cb_update_enqueue;
317 LASSERT(req->rq_oi.oi_lockh);
319 req->rq_oi.oi_policy.l_extent.gid =
320 oinfo->oi_policy.l_extent.gid;
321 req->rq_oi.oi_policy.l_extent.start = start;
322 req->rq_oi.oi_policy.l_extent.end = end;
324 req->rq_idx = loi->loi_ost_idx;
327 /* XXX LOV STACKING: submd should be from the subobj */
328 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
329 req->rq_oi.oi_md->lsm_stripe_count = 0;
330 req->rq_oi.oi_md->lsm_oinfo->loi_kms_valid =
332 req->rq_oi.oi_md->lsm_oinfo->loi_kms = loi->loi_kms;
333 req->rq_oi.oi_md->lsm_oinfo->loi_lvb = loi->loi_lvb;
335 lov_set_add_req(req, set);
338 GOTO(out_set, rc = -EIO);
342 lov_fini_enqueue_set(set, einfo->ei_mode);
346 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
354 lov_update_set(set, req, ret);
358 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
365 LASSERT(set->set_exp);
366 if (set->set_completes) {
367 if (set->set_count == set->set_success &&
368 flags & LDLM_FL_TEST_LOCK)
369 lov_llh_put(set->set_lockh);
370 rc = enqueue_done(set, mode);
372 lov_llh_put(set->set_lockh);
375 if (atomic_dec_and_test(&set->set_refcount))
381 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
382 struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
383 __u32 mode, struct lustre_handle *lockh,
384 struct lov_request_set **reqset)
386 struct lov_obd *lov = &exp->exp_obd->u.lov;
387 struct lov_request_set *set;
389 struct lov_oinfo *loi;
392 OBD_ALLOC(set, sizeof(*set));
399 set->set_oi->oi_md = lsm;
400 set->set_lockh = lov_llh_new(lsm);
401 if (set->set_lockh == NULL)
402 GOTO(out_set, rc = -ENOMEM);
403 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
405 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
406 struct lov_request *req;
409 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
410 policy->l_extent.end, &start, &end))
413 /* FIXME raid1 should grace this error */
414 if (!lov->lov_tgts[loi->loi_ost_idx] ||
415 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
416 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
417 GOTO(out_set, rc = -EIO);
420 OBD_ALLOC(req, sizeof(*req));
422 GOTO(out_set, rc = -ENOMEM);
424 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
425 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
426 if (req->rq_oi.oi_md == NULL) {
427 OBD_FREE(req, sizeof(*req));
428 GOTO(out_set, rc = -ENOMEM);
431 req->rq_oi.oi_policy.l_extent.start = start;
432 req->rq_oi.oi_policy.l_extent.end = end;
433 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
435 req->rq_idx = loi->loi_ost_idx;
438 /* XXX LOV STACKING: submd should be from the subobj */
439 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
440 req->rq_oi.oi_md->lsm_stripe_count = 0;
442 lov_set_add_req(req, set);
445 GOTO(out_set, rc = -EIO);
449 lov_fini_match_set(set, mode, 0);
453 int lov_fini_cancel_set(struct lov_request_set *set)
461 LASSERT(set->set_exp);
463 lov_llh_put(set->set_lockh);
465 if (atomic_dec_and_test(&set->set_refcount))
471 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
472 struct lov_stripe_md *lsm, __u32 mode,
473 struct lustre_handle *lockh,
474 struct lov_request_set **reqset)
476 struct lov_request_set *set;
478 struct lov_oinfo *loi;
481 OBD_ALLOC(set, sizeof(*set));
488 set->set_oi->oi_md = lsm;
489 set->set_lockh = lov_handle2llh(lockh);
490 if (set->set_lockh == NULL) {
491 CERROR("LOV: invalid lov lock handle %p\n", lockh);
492 GOTO(out_set, rc = -EINVAL);
494 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
496 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
497 struct lov_request *req;
498 struct lustre_handle *lov_lockhp;
500 lov_lockhp = set->set_lockh->llh_handles + i;
501 if (!lustre_handle_is_used(lov_lockhp)) {
502 CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
503 loi->loi_ost_idx, loi->loi_id);
507 OBD_ALLOC(req, sizeof(*req));
509 GOTO(out_set, rc = -ENOMEM);
511 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
512 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
513 if (req->rq_oi.oi_md == NULL) {
514 OBD_FREE(req, sizeof(*req));
515 GOTO(out_set, rc = -ENOMEM);
518 req->rq_idx = loi->loi_ost_idx;
521 /* XXX LOV STACKING: submd should be from the subobj */
522 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
523 req->rq_oi.oi_md->lsm_stripe_count = 0;
525 lov_set_add_req(req, set);
528 GOTO(out_set, rc = -EIO);
532 lov_fini_cancel_set(set);
536 static int create_done(struct obd_export *exp, struct lov_request_set *set,
537 struct lov_stripe_md **lsmp)
539 struct lov_obd *lov = &exp->exp_obd->u.lov;
540 struct obd_trans_info *oti = set->set_oti;
541 struct obdo *src_oa = set->set_oi->oi_oa;
542 struct lov_request *req;
543 struct obdo *ret_oa = NULL;
544 int attrset = 0, rc = 0;
547 LASSERT(set->set_completes);
549 /* try alloc objects on other osts if osc_create fails for
550 * exceptions: RPC failure, ENOSPC, etc */
551 if (set->set_count != set->set_success) {
552 list_for_each_entry (req, &set->set_list, rq_link) {
556 set->set_completes--;
557 req->rq_complete = 0;
559 rc = qos_remedy_create(set, req);
560 lov_update_create_set(set, req, rc);
567 /* no successful creates */
568 if (set->set_success == 0)
571 /* If there was an explicit stripe set, fail. Otherwise, we
572 * got some objects and that's not bad. */
573 if (set->set_count != set->set_success) {
576 set->set_count = set->set_success;
580 ret_oa = obdo_alloc();
582 GOTO(cleanup, rc = -ENOMEM);
584 list_for_each_entry(req, &set->set_list, rq_link) {
585 if (!req->rq_complete || req->rq_rc)
587 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
588 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
589 req->rq_stripe, &attrset);
591 if (src_oa->o_valid & OBD_MD_FLSIZE &&
592 ret_oa->o_size != src_oa->o_size) {
593 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
594 src_oa->o_size, ret_oa->o_size);
597 ret_oa->o_id = src_oa->o_id;
598 memcpy(src_oa, ret_oa, sizeof(*src_oa));
601 *lsmp = set->set_oi->oi_md;
605 list_for_each_entry(req, &set->set_list, rq_link) {
606 struct obd_export *sub_exp;
609 if (!req->rq_complete || req->rq_rc)
612 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
613 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
615 CERROR("Failed to uncreate objid "LPX64" subobj "
616 LPX64" on OST idx %d: rc = %d\n",
617 src_oa->o_id, req->rq_oi.oi_oa->o_id,
621 obd_free_memmd(exp, &set->set_oi->oi_md);
623 if (oti && set->set_cookies) {
624 oti->oti_logcookies = set->set_cookies;
625 if (!set->set_cookie_sent) {
626 oti_free_cookies(oti);
627 src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
629 src_oa->o_valid |= OBD_MD_FLCOOKIE;
635 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
642 LASSERT(set->set_exp);
643 if (set->set_completes)
644 rc = create_done(set->set_exp, set, lsmp);
646 if (atomic_dec_and_test(&set->set_refcount))
652 int lov_update_create_set(struct lov_request_set *set,
653 struct lov_request *req, int rc)
655 struct obd_trans_info *oti = set->set_oti;
656 struct lov_stripe_md *lsm = set->set_oi->oi_md;
657 struct lov_oinfo *loi;
658 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
661 req->rq_stripe = set->set_success;
662 loi = &lsm->lsm_oinfo[req->rq_stripe];
664 if (rc && lov->lov_tgts[req->rq_idx] &&
665 lov->lov_tgts[req->rq_idx]->ltd_active) {
666 CERROR("error creating fid "LPX64" sub-object"
667 " on OST idx %d/%d: rc = %d\n",
668 set->set_oi->oi_oa->o_id, req->rq_idx,
669 lsm->lsm_stripe_count, rc);
671 CERROR("obd_create returned invalid err %d\n", rc);
675 lov_update_set(set, req, rc);
679 if (oti && oti->oti_objid)
680 oti->oti_objid[req->rq_idx] = req->rq_oi.oi_oa->o_id;
682 loi->loi_id = req->rq_oi.oi_oa->o_id;
683 loi->loi_ost_idx = req->rq_idx;
684 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
685 lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
688 if (oti && set->set_cookies)
689 ++oti->oti_logcookies;
690 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
691 set->set_cookie_sent++;
696 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
697 struct lov_stripe_md **lsmp, struct obdo *src_oa,
698 struct obd_trans_info *oti,
699 struct lov_request_set **reqset)
701 struct lov_request_set *set;
705 OBD_ALLOC(set, sizeof(*set));
712 set->set_oi->oi_md = *lsmp;
713 set->set_oi->oi_oa = src_oa;
716 rc = qos_prep_create(exp, set);
718 lov_fini_create_set(set, lsmp);
724 static int common_attr_done(struct lov_request_set *set)
726 struct list_head *pos;
727 struct lov_request *req;
729 int rc = 0, attrset = 0;
732 LASSERT(set->set_oi != NULL);
734 if (set->set_oi->oi_oa == NULL)
737 if (!set->set_success)
740 tmp_oa = obdo_alloc();
742 GOTO(out, rc = -ENOMEM);
744 list_for_each (pos, &set->set_list) {
745 req = list_entry(pos, struct lov_request, rq_link);
747 if (!req->rq_complete || req->rq_rc)
749 if (req->rq_oi.oi_oa->o_valid == 0) /* inactive stripe */
751 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
752 req->rq_oi.oi_oa->o_valid,
753 set->set_oi->oi_md, req->rq_stripe, &attrset);
756 CERROR("No stripes had valid attrs\n");
759 tmp_oa->o_id = set->set_oi->oi_oa->o_id;
760 memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
768 static int brw_done(struct lov_request_set *set)
770 struct lov_stripe_md *lsm = set->set_oi->oi_md;
771 struct lov_oinfo *loi = NULL;
772 struct list_head *pos;
773 struct lov_request *req;
776 list_for_each (pos, &set->set_list) {
777 req = list_entry(pos, struct lov_request, rq_link);
779 if (!req->rq_complete || req->rq_rc)
782 loi = &lsm->lsm_oinfo[req->rq_stripe];
784 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
785 loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
791 int lov_fini_brw_set(struct lov_request_set *set)
798 LASSERT(set->set_exp);
799 if (set->set_completes) {
801 /* FIXME update qos data here */
803 if (atomic_dec_and_test(&set->set_refcount))
809 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
810 obd_count oa_bufs, struct brw_page *pga,
811 struct obd_trans_info *oti,
812 struct lov_request_set **reqset)
819 struct lov_request_set *set;
820 struct lov_oinfo *loi = NULL;
821 struct lov_obd *lov = &exp->exp_obd->u.lov;
822 int rc = 0, i, shift;
825 OBD_ALLOC(set, sizeof(*set));
833 set->set_oabufs = oa_bufs;
834 OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
836 GOTO(out, rc = -ENOMEM);
838 OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
840 GOTO(out, rc = -ENOMEM);
842 /* calculate the page count for each stripe */
843 for (i = 0; i < oa_bufs; i++) {
844 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
845 info[stripe].count++;
848 /* alloc and initialize lov request */
850 for (i = 0, loi = oinfo->oi_md->lsm_oinfo;
851 i < oinfo->oi_md->lsm_stripe_count; i++, loi++){
852 struct lov_request *req;
854 if (info[i].count == 0)
857 if (!lov->lov_tgts[loi->loi_ost_idx] ||
858 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
859 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
860 GOTO(out, rc = -EIO);
863 OBD_ALLOC(req, sizeof(*req));
865 GOTO(out, rc = -ENOMEM);
867 req->rq_oi.oi_oa = obdo_alloc();
868 if (req->rq_oi.oi_oa == NULL) {
869 OBD_FREE(req, sizeof(*req));
870 GOTO(out, rc = -ENOMEM);
874 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
875 sizeof(*req->rq_oi.oi_oa));
877 req->rq_oi.oi_oa->o_id = loi->loi_id;
878 req->rq_oi.oi_oa->o_stripe_idx = i;
880 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
881 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
882 if (req->rq_oi.oi_md == NULL) {
883 obdo_free(req->rq_oi.oi_oa);
884 OBD_FREE(req, sizeof(*req));
885 GOTO(out, rc = -ENOMEM);
888 req->rq_idx = loi->loi_ost_idx;
891 /* XXX LOV STACKING */
892 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
893 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
894 req->rq_oabufs = info[i].count;
895 req->rq_pgaidx = shift;
896 shift += req->rq_oabufs;
898 /* remember the index for sort brw_page array */
899 info[i].index = req->rq_pgaidx;
901 lov_set_add_req(req, set);
904 GOTO(out, rc = -EIO);
906 /* rotate & sort the brw_page array */
907 for (i = 0; i < oa_bufs; i++) {
908 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
910 shift = info[stripe].index + info[stripe].off;
911 LASSERT(shift < oa_bufs);
912 set->set_pga[shift] = pga[i];
913 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
914 &set->set_pga[shift].off);
919 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
924 lov_fini_brw_set(set);
929 int lov_fini_getattr_set(struct lov_request_set *set)
936 LASSERT(set->set_exp);
937 if (set->set_completes)
938 rc = common_attr_done(set);
940 if (atomic_dec_and_test(&set->set_refcount))
946 /* The callback for osc_getattr_async that finilizes a request info when a
947 * response is recieved. */
948 static int cb_getattr_update(struct obd_info *oinfo, int rc)
950 struct lov_request *lovreq;
951 lovreq = container_of(oinfo, struct lov_request, rq_oi);
952 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
955 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
956 struct lov_request_set **reqset)
958 struct lov_request_set *set;
959 struct lov_oinfo *loi = NULL;
960 struct lov_obd *lov = &exp->exp_obd->u.lov;
964 OBD_ALLOC(set, sizeof(*set));
972 loi = oinfo->oi_md->lsm_oinfo;
973 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
974 struct lov_request *req;
976 if (!lov->lov_tgts[loi->loi_ost_idx] ||
977 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
978 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
982 OBD_ALLOC(req, sizeof(*req));
984 GOTO(out_set, rc = -ENOMEM);
987 req->rq_idx = loi->loi_ost_idx;
989 req->rq_oi.oi_oa = obdo_alloc();
990 if (req->rq_oi.oi_oa == NULL) {
991 OBD_FREE(req, sizeof(*req));
992 GOTO(out_set, rc = -ENOMEM);
994 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
995 sizeof(*req->rq_oi.oi_oa));
996 req->rq_oi.oi_oa->o_id = loi->loi_id;
997 req->rq_oi.oi_cb_up = cb_getattr_update;
999 lov_set_add_req(req, set);
1001 if (!set->set_count)
1002 GOTO(out_set, rc = -EIO);
1006 lov_fini_getattr_set(set);
1010 int lov_fini_destroy_set(struct lov_request_set *set)
1016 LASSERT(set->set_exp);
1017 if (set->set_completes) {
1018 /* FIXME update qos data here */
1021 if (atomic_dec_and_test(&set->set_refcount))
1022 lov_finish_set(set);
1027 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1028 struct obdo *src_oa, struct lov_stripe_md *lsm,
1029 struct obd_trans_info *oti,
1030 struct lov_request_set **reqset)
1032 struct lov_request_set *set;
1033 struct lov_oinfo *loi = NULL;
1034 struct lov_obd *lov = &exp->exp_obd->u.lov;
1038 OBD_ALLOC(set, sizeof(*set));
1044 set->set_oi = oinfo;
1045 set->set_oi->oi_md = lsm;
1046 set->set_oi->oi_oa = src_oa;
1048 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1049 set->set_cookies = oti->oti_logcookies;
1051 loi = lsm->lsm_oinfo;
1052 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1053 struct lov_request *req;
1055 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1056 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1057 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1061 OBD_ALLOC(req, sizeof(*req));
1063 GOTO(out_set, rc = -ENOMEM);
1066 req->rq_idx = loi->loi_ost_idx;
1068 req->rq_oi.oi_oa = obdo_alloc();
1069 if (req->rq_oi.oi_oa == NULL) {
1070 OBD_FREE(req, sizeof(*req));
1071 GOTO(out_set, rc = -ENOMEM);
1073 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1074 req->rq_oi.oi_oa->o_id = loi->loi_id;
1075 lov_set_add_req(req, set);
1077 if (!set->set_count)
1078 GOTO(out_set, rc = -EIO);
1082 lov_fini_destroy_set(set);
1086 int lov_fini_setattr_set(struct lov_request_set *set)
1093 LASSERT(set->set_exp);
1094 if (set->set_completes) {
1095 rc = common_attr_done(set);
1096 /* FIXME update qos data here */
1099 if (atomic_dec_and_test(&set->set_refcount))
1100 lov_finish_set(set);
1104 int lov_update_setattr_set(struct lov_request_set *set,
1105 struct lov_request *req, int rc)
1107 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1108 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1111 lov_update_set(set, req, rc);
1113 /* grace error on inactive ost */
1114 if (rc && !(lov->lov_tgts[req->rq_idx] &&
1115 lov->lov_tgts[req->rq_idx]->ltd_active))
1119 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1120 lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_ctime =
1121 req->rq_oi.oi_oa->o_ctime;
1122 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1123 lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_mtime =
1124 req->rq_oi.oi_oa->o_mtime;
1125 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1126 lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_atime =
1127 req->rq_oi.oi_oa->o_atime;
1133 /* The callback for osc_setattr_async that finilizes a request info when a
1134 * response is recieved. */
1135 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1137 struct lov_request *lovreq;
1138 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1139 return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1142 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1143 struct obd_trans_info *oti,
1144 struct lov_request_set **reqset)
1146 struct lov_request_set *set;
1147 struct lov_oinfo *loi = NULL;
1148 struct lov_obd *lov = &exp->exp_obd->u.lov;
1152 OBD_ALLOC(set, sizeof(*set));
1159 set->set_oi = oinfo;
1160 if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1161 set->set_cookies = oti->oti_logcookies;
1163 loi = oinfo->oi_md->lsm_oinfo;
1164 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
1165 struct lov_request *req;
1167 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1168 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1169 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1173 OBD_ALLOC(req, sizeof(*req));
1175 GOTO(out_set, rc = -ENOMEM);
1177 req->rq_idx = loi->loi_ost_idx;
1179 req->rq_oi.oi_oa = obdo_alloc();
1180 if (req->rq_oi.oi_oa == NULL) {
1181 OBD_FREE(req, sizeof(*req));
1182 GOTO(out_set, rc = -ENOMEM);
1184 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1185 sizeof(*req->rq_oi.oi_oa));
1186 req->rq_oi.oi_oa->o_id = loi->loi_id;
1187 req->rq_oi.oi_oa->o_stripe_idx = i;
1188 req->rq_oi.oi_cb_up = cb_setattr_update;
1189 req->rq_rqset = set;
1191 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1192 int off = lov_stripe_offset(oinfo->oi_md,
1193 oinfo->oi_oa->o_size, i,
1194 &req->rq_oi.oi_oa->o_size);
1196 if (off < 0 && req->rq_oi.oi_oa->o_size)
1197 req->rq_oi.oi_oa->o_size--;
1199 CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1200 i, req->rq_oi.oi_oa->o_size,
1201 oinfo->oi_oa->o_size);
1203 lov_set_add_req(req, set);
1205 if (!set->set_count)
1206 GOTO(out_set, rc = -EIO);
1210 lov_fini_setattr_set(set);
1214 int lov_fini_punch_set(struct lov_request_set *set)
1221 LASSERT(set->set_exp);
1222 if (set->set_completes) {
1223 if (!set->set_success)
1225 /* FIXME update qos data here */
1228 if (atomic_dec_and_test(&set->set_refcount))
1229 lov_finish_set(set);
1234 /* The callback for osc_punch that finilizes a request info when a response
1236 static int cb_update_punch(struct obd_info *oinfo, int rc)
1238 struct lov_request *lovreq;
1239 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1240 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1243 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1244 struct obd_trans_info *oti,
1245 struct lov_request_set **reqset)
1247 struct lov_request_set *set;
1248 struct lov_oinfo *loi = NULL;
1249 struct lov_obd *lov = &exp->exp_obd->u.lov;
1253 OBD_ALLOC(set, sizeof(*set));
1258 set->set_oi = oinfo;
1261 loi = oinfo->oi_md->lsm_oinfo;
1262 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
1263 struct lov_request *req;
1266 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1267 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1268 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1272 if (!lov_stripe_intersects(oinfo->oi_md, i,
1273 oinfo->oi_policy.l_extent.start,
1274 oinfo->oi_policy.l_extent.end,
1278 OBD_ALLOC(req, sizeof(*req));
1280 GOTO(out_set, rc = -ENOMEM);
1282 req->rq_idx = loi->loi_ost_idx;
1284 req->rq_oi.oi_oa = obdo_alloc();
1285 if (req->rq_oi.oi_oa == NULL) {
1286 OBD_FREE(req, sizeof(*req));
1287 GOTO(out_set, rc = -ENOMEM);
1289 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1290 sizeof(*req->rq_oi.oi_oa));
1291 req->rq_oi.oi_oa->o_id = loi->loi_id;
1292 req->rq_oi.oi_oa->o_stripe_idx = i;
1293 req->rq_oi.oi_cb_up = cb_update_punch;
1294 req->rq_rqset = set;
1296 req->rq_oi.oi_policy.l_extent.start = rs;
1297 req->rq_oi.oi_policy.l_extent.end = re;
1298 req->rq_oi.oi_policy.l_extent.gid = -1;
1300 lov_set_add_req(req, set);
1302 if (!set->set_count)
1303 GOTO(out_set, rc = -EIO);
1307 lov_fini_punch_set(set);
1311 int lov_fini_sync_set(struct lov_request_set *set)
1318 LASSERT(set->set_exp);
1319 if (set->set_completes) {
1320 if (!set->set_success)
1322 /* FIXME update qos data here */
1325 if (atomic_dec_and_test(&set->set_refcount))
1326 lov_finish_set(set);
1331 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1332 struct obdo *src_oa, struct lov_stripe_md *lsm,
1333 obd_off start, obd_off end,
1334 struct lov_request_set **reqset)
1336 struct lov_request_set *set;
1337 struct lov_oinfo *loi = NULL;
1338 struct lov_obd *lov = &exp->exp_obd->u.lov;
1342 OBD_ALLOC(set, sizeof(*set));
1348 set->set_oi = oinfo;
1349 set->set_oi->oi_md = lsm;
1350 set->set_oi->oi_oa = src_oa;
1352 loi = lsm->lsm_oinfo;
1353 for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1354 struct lov_request *req;
1357 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1358 !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1359 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1363 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1366 OBD_ALLOC(req, sizeof(*req));
1368 GOTO(out_set, rc = -ENOMEM);
1370 req->rq_idx = loi->loi_ost_idx;
1372 req->rq_oi.oi_oa = obdo_alloc();
1373 if (req->rq_oi.oi_oa == NULL) {
1374 OBD_FREE(req, sizeof(*req));
1375 GOTO(out_set, rc = -ENOMEM);
1377 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1378 req->rq_oi.oi_oa->o_id = loi->loi_id;
1379 req->rq_oi.oi_oa->o_stripe_idx = i;
1381 req->rq_oi.oi_policy.l_extent.start = rs;
1382 req->rq_oi.oi_policy.l_extent.end = re;
1383 req->rq_oi.oi_policy.l_extent.gid = -1;
1385 lov_set_add_req(req, set);
1387 if (!set->set_count)
1388 GOTO(out_set, rc = -EIO);
1392 lov_fini_sync_set(set);
1396 #define LOV_U64_MAX ((__u64)~0ULL)
1397 #define LOV_SUM_MAX(tot, add) \
1399 if ((tot) + (add) < (tot)) \
1400 (tot) = LOV_U64_MAX; \
1405 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1410 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1412 if (osfs->os_files != LOV_U64_MAX)
1413 do_div(osfs->os_files, expected_stripes);
1414 if (osfs->os_ffree != LOV_U64_MAX)
1415 do_div(osfs->os_ffree, expected_stripes);
1417 spin_lock(&obd->obd_osfs_lock);
1418 memcpy(&obd->obd_osfs, osfs, sizeof(osfs));
1419 obd->obd_osfs_age = cfs_time_current_64();
1420 spin_unlock(&obd->obd_osfs_lock);
1427 int lov_fini_statfs_set(struct lov_request_set *set)
1435 if (set->set_completes) {
1436 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1440 if (atomic_dec_and_test(&set->set_refcount))
1441 lov_finish_set(set);
1446 void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1447 struct obd_statfs *lov_sfs, int success)
1449 spin_lock(&obd->obd_osfs_lock);
1450 memcpy(&obd->obd_osfs, lov_sfs, sizeof(osfs));
1451 obd->obd_osfs_age = cfs_time_current_64();
1452 spin_unlock(&obd->obd_osfs_lock);
1455 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1458 /* Sandia requested that df (and so, statfs) only
1459 returned minimal available space on
1460 a single OST, so people would be able to
1461 write this much data guaranteed. */
1462 if (osfs->os_bavail > lov_sfs->os_bavail) {
1463 /* Presumably if new bavail is smaller,
1464 new bfree is bigger as well */
1465 osfs->os_bfree = lov_sfs->os_bfree;
1466 osfs->os_bavail = lov_sfs->os_bavail;
1469 osfs->os_bfree += lov_sfs->os_bfree;
1470 osfs->os_bavail += lov_sfs->os_bavail;
1472 osfs->os_blocks += lov_sfs->os_blocks;
1473 /* XXX not sure about this one - depends on policy.
1474 * - could be minimum if we always stripe on all OBDs
1475 * (but that would be wrong for any other policy,
1476 * if one of the OBDs has no more objects left)
1477 * - could be sum if we stripe whole objects
1478 * - could be average, just to give a nice number
1480 * To give a "reasonable" (if not wholly accurate)
1481 * number, we divide the total number of free objects
1482 * by expected stripe count (watch out for overflow).
1484 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1485 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1489 /* The callback for osc_statfs_async that finilizes a request info when a
1490 * response is recieved. */
1491 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1493 struct lov_request *lovreq;
1494 struct obd_statfs *osfs, *lov_sfs;
1495 struct obd_device *obd;
1496 struct lov_obd *lov;
1500 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1501 lov = &lovreq->rq_rqset->set_obd->u.lov;
1502 obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1504 osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1505 lov_sfs = oinfo->oi_osfs;
1507 success = lovreq->rq_rqset->set_success;
1509 /* XXX: the same is done in lov_update_common_set, however
1510 lovset->set_exp is not initialized. */
1511 lov_update_set(lovreq->rq_rqset, lovreq, rc);
1513 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1514 lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1519 lov_update_statfs(obd, osfs, lov_sfs, success);
1523 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1524 struct lov_request_set **reqset)
1526 struct lov_request_set *set;
1527 struct lov_obd *lov = &obd->u.lov;
1531 OBD_ALLOC(set, sizeof(*set));
1537 set->set_oi = oinfo;
1539 /* We only get block data from the OBD */
1540 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1541 struct lov_request *req;
1543 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
1544 CDEBUG(D_HA, "lov idx %d inactive\n", i);
1548 OBD_ALLOC(req, sizeof(*req));
1550 GOTO(out_set, rc = -ENOMEM);
1552 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1553 if (req->rq_oi.oi_osfs == NULL) {
1554 OBD_FREE(req, sizeof(*req));
1555 GOTO(out_set, rc = -ENOMEM);
1559 req->rq_oi.oi_cb_up = cb_statfs_update;
1560 req->rq_rqset = set;
1562 lov_set_add_req(req, set);
1564 if (!set->set_count)
1565 GOTO(out_set, rc = -EIO);
1569 lov_fini_statfs_set(set);