4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_LOV
40 #include <libcfs/libcfs.h>
42 #include <liblustre.h>
45 #include <obd_class.h>
48 #include <lustre/lustre_idl.h>
50 #include "lov_internal.h"
/*
 * Put a request set into its initial state: the completion, success and
 * finish-checked counters are zeroed, the request list is emptied, the
 * reference count starts at 1, and the wait queue and spinlock are
 * initialized.
 */
52 static void lov_init_set(struct lov_request_set *set)
55 cfs_atomic_set(&set->set_completes, 0);
56 cfs_atomic_set(&set->set_success, 0);
57 cfs_atomic_set(&set->set_finish_checked, 0);
59 CFS_INIT_LIST_HEAD(&set->set_list);
60 cfs_atomic_set(&set->set_refcount, 1);
61 init_waitqueue_head(&set->set_waitq);
62 spin_lock_init(&set->set_lock);
/*
 * Tear down a request set and everything it owns.
 *
 * Every request still linked on set->set_list is unlinked and freed along
 * with its per-request obdo (oi_oa), its stripe metadata buffer (oi_md,
 * sized by rq_buflen) and its statfs buffer (oi_osfs, only when one is
 * attached).  The set's page array, its lock-handle reference and finally
 * the set itself are then released.
 */
65 void lov_finish_set(struct lov_request_set *set)
/* _safe iteration: each entry is unlinked and freed inside the loop */
71 cfs_list_for_each_safe(pos, n, &set->set_list) {
72 struct lov_request *req = cfs_list_entry(pos,
75 cfs_list_del_init(&req->rq_link);
78 OBDO_FREE(req->rq_oi.oi_oa);
80 OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
81 if (req->rq_oi.oi_osfs)
82 OBD_FREE(req->rq_oi.oi_osfs,
83 sizeof(*req->rq_oi.oi_osfs));
84 OBD_FREE(req, sizeof(*req));
/* set_pga holds set_oabufs entries, matching lov_prep_brw_set() */
88 int len = set->set_oabufs * sizeof(*set->set_pga);
89 OBD_FREE_LARGE(set->set_pga, len);
92 lov_llh_put(set->set_lockh);
94 OBD_FREE(set, sizeof(*set));
/*
 * Report whether all requests of a set have completed.
 *
 * Completion means set_completes has reached set_count.  Once that is
 * observed, set_finish_checked is incremented; an increment that returns 1
 * identifies the first caller to notice completion.
 *
 * NOTE(review): confirm how \a idempotent influences the result.
 */
98 int lov_set_finished(struct lov_request_set *set, int idempotent)
100 int completes = cfs_atomic_read(&set->set_completes);
102 CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
104 if (completes == set->set_count) {
107 if (cfs_atomic_inc_return(&set->set_finish_checked) == 1)
/*
 * Record the completion of one request of a set.
 *
 * Marks \a req complete, bumps the set's completed counter and its success
 * counter (presumably the latter only when \a rc is 0 -- confirm), then
 * wakes anyone sleeping on the set's wait queue.
 */
113 void lov_update_set(struct lov_request_set *set,
114 struct lov_request *req, int rc)
116 req->rq_complete = 1;
119 cfs_atomic_inc(&set->set_completes);
121 cfs_atomic_inc(&set->set_success);
123 wake_up(&set->set_waitq);
/*
 * Common per-request completion handler.
 *
 * Accounts \a req in \a set via lov_update_set() and then applies the
 * "grace" policy: a non-zero \a rc is treated specially when the target at
 * req->rq_idx is either missing or not active.
 */
126 int lov_update_common_set(struct lov_request_set *set,
127 struct lov_request *req, int rc)
129 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
132 lov_update_set(set, req, rc);
134 /* grace error on inactive ost */
135 if (rc && !(lov->lov_tgts[req->rq_idx] &&
136 lov->lov_tgts[req->rq_idx]->ltd_active))
139 /* FIXME in raid1 regime, should return 0 */
/* Queue \a req on \a set by linking it at the tail of the set's request list. */
143 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
145 cfs_list_add_tail(&req->rq_link, &set->set_list);
/*
 * Wait condition evaluated by lov_check_and_wait_active().
 *
 * With lov_lock held, tests whether there is nothing left to wait for on
 * target \a idx: the slot is empty, the target is already active, or the
 * import behind its export has already tried to connect.
 */
150 static int lov_check_set(struct lov_obd *lov, int idx)
153 mutex_lock(&lov->lov_lock);
155 if (lov->lov_tgts[idx] == NULL ||
156 lov->lov_tgts[idx]->ltd_active ||
157 (lov->lov_tgts[idx]->ltd_exp != NULL &&
158 class_exp2cliimp(lov->lov_tgts[idx]->ltd_exp)->imp_connect_tried))
161 mutex_unlock(&lov->lov_lock);
165 /* Check if the OSC connection exists and is active.
166 * If the OSC has not yet had a chance to connect to the OST the first time,
167 * wait once for it to connect instead of returning an error. */
/*
 * \param lov      LOV device whose target table is consulted
 * \param ost_idx  index into lov->lov_tgts
 *
 * The target is first examined under lov_lock.  A missing target, an
 * already active target, and a target whose import has already attempted to
 * connect are each decided without sleeping.  Otherwise the lock is dropped
 * and the caller sleeps in l_wait_event() on lov_check_set(), using a
 * timeout of obd_timeout seconds and a one-second interval.
 *
 * Callers in this file treat a zero return as "target inactive".
 */
169 int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
171 wait_queue_head_t waitq;
172 struct l_wait_info lwi;
173 struct lov_tgt_desc *tgt;
176 mutex_lock(&lov->lov_lock);
178 tgt = lov->lov_tgts[ost_idx];
180 if (unlikely(tgt == NULL))
183 if (likely(tgt->ltd_active))
186 if (tgt->ltd_exp && class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried)
/* the lock is released before sleeping; lov_check_set() retakes it */
189 mutex_unlock(&lov->lov_lock);
/* waitq is a private, on-stack queue; the condition is polled via lwi */
191 init_waitqueue_head(&waitq);
192 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout),
193 cfs_time_seconds(1), NULL, NULL);
195 rc = l_wait_event(waitq, lov_check_set(lov, ost_idx), &lwi);
202 mutex_unlock(&lov->lov_lock);
/*
 * Apply the LOV-level error policy to the result of one stripe's enqueue.
 *
 * \param exp         LOV export, used to reach the target table and the
 *                    device name for logging
 * \param lov_lockhp  the stripe's lock handle; zeroed on failure
 * \param loi         per-stripe object info, used for logging
 * \param flags       enqueue flags; LDLM_FL_HAS_INTENT changes how
 *                    ELDLM_LOCK_ABORTED is classified
 * \param idx         index of the target in lov->lov_tgts
 * \param oi          object id of the parent object, used for logging
 * \param rc          result of the stripe's enqueue
 *
 * Anything other than ELDLM_OK, or ELDLM_LOCK_ABORTED on an intent enqueue,
 * counts as a failure: the stripe's lock handle is cleared, and if the
 * target is present and active the failure is logged, except for -EINTR
 * and -EUSERS.
 */
206 static int lov_update_enqueue_lov(struct obd_export *exp,
207 struct lustre_handle *lov_lockhp,
208 struct lov_oinfo *loi, __u64 flags, int idx,
209 struct ost_id *oi, int rc)
211 struct lov_obd *lov = &exp->exp_obd->u.lov;
213 if (rc != ELDLM_OK &&
214 !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
215 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
216 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
217 /* -EUSERS used by OST to report file contention */
218 if (rc != -EINTR && rc != -EUSERS)
219 CERROR("%s: enqueue objid "DOSTID" subobj"
220 DOSTID" on OST idx %d: rc %d\n",
221 exp->exp_obd->obd_name,
222 POSTID(oi), POSTID(&loi->loi_oi),
223 loi->loi_ost_idx, rc);
/*
 * Fold the result of one stripe's enqueue into its request set.
 *
 * \param req   the per-stripe request that completed
 * \param mode  lock mode, passed through to osc_update_enqueue()
 * \param rc    result of the stripe's enqueue
 *
 * Under the stripe lock of the parent lsm, osc_update_enqueue() is run
 * against the request's private lvb copy, the stripe's lock handle is
 * cleared when an intent enqueue was aborted, and lov_update_enqueue_lov()
 * applies the LOV-level error policy.  The request is then accounted in the
 * set with the resulting rc.
 */
230 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
232 struct lov_request_set *set = req->rq_rqset;
233 struct lustre_handle *lov_lockhp;
234 struct obd_info *oi = set->set_oi;
235 struct lov_oinfo *loi;
/* both the handle and the oinfo are selected by the request's stripe */
240 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
241 loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
243 /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
244 * and that copy can be arbitrarily out of date.
246 * The LOV API is due for a serious rewriting anyways, and this
247 * can be addressed then. */
249 lov_stripe_lock(oi->oi_md);
250 osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
251 &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
252 if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
253 memset(lov_lockhp, 0, sizeof *lov_lockhp);
254 rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
255 req->rq_idx, &oi->oi_md->lsm_oi, rc);
256 lov_stripe_unlock(oi->oi_md);
257 lov_update_set(set, req, rc);
261 /* The callback for osc_enqueue that updates lov info for every OSC request.
 *
 * \a cookie is the obd_info embedded in a lov_request (its rq_oi member);
 * the request is recovered with container_of() and handed to
 * lov_update_enqueue_set() together with the lock mode recorded in the
 * set's enqueue info.  The return value is that of
 * lov_update_enqueue_set(). */
262 static int cb_update_enqueue(void *cookie, int rc)
264 struct obd_info *oinfo = cookie;
265 struct ldlm_enqueue_info *einfo;
266 struct lov_request *lovreq;
268 lovreq = container_of(oinfo, struct lov_request, rq_oi);
269 einfo = lovreq->rq_rqset->set_ei;
270 return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
/*
 * Finish an enqueue/match: if not every request succeeded, cancel the
 * locks that were obtained.
 *
 * \param set   the request set being finished
 * \param mode  lock mode passed to obd_cancel()
 *
 * When at least one request completed and all completed requests
 * succeeded, nothing needs undoing.  Otherwise each completed, successful
 * request whose stripe lock handle is in use has its lock cancelled on the
 * owning target.  The set's lock-handle reference is dropped at the end.
 */
273 static int enqueue_done(struct lov_request_set *set, __u32 mode)
275 struct lov_request *req;
276 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
277 int completes = cfs_atomic_read(&set->set_completes);
281 /* enqueue/match success, just return */
282 if (completes && completes == cfs_atomic_read(&set->set_success))
285 /* cancel enqueued/matched locks */
286 cfs_list_for_each_entry(req, &set->set_list, rq_link) {
287 struct lustre_handle *lov_lockhp;
/* only requests that completed without error can hold a lock */
289 if (!req->rq_complete || req->rq_rc)
292 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
294 if (!lustre_handle_is_used(lov_lockhp))
/* NOTE(review): lov_tgts[req->rq_idx] is dereferenced here before the
 * NULL check that guards the error message below -- confirm the slot
 * cannot be NULL at this point. */
297 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
298 req->rq_oi.oi_md, mode, lov_lockhp);
/* NOTE(review): the adjacent literals concatenate to "on OSTidx" (no
 * space) in the logged message. */
299 if (rc && lov->lov_tgts[req->rq_idx] &&
300 lov->lov_tgts[req->rq_idx]->ltd_active)
301 CERROR("%s: cancelling obdjid "DOSTID" on OST"
302 "idx %d error: rc = %d\n",
303 set->set_exp->exp_obd->obd_name,
304 POSTID(&req->rq_oi.oi_md->lsm_oi),
308 lov_llh_put(set->set_lockh);
/*
 * Finish an enqueue request set.
 *
 * \param set    the set to finish; its export must be set
 * \param mode   lock mode, forwarded to enqueue_done()
 * \param rc     result reported by the caller
 * \param rqset  ptlrpc request set of the caller, if any
 *
 * On one path the completion counter is reset and enqueue_done() performs
 * the cleanup; on the other path only the lock-handle reference is dropped
 * when one exists.  The function returns \a rc if it is non-zero and the
 * cleanup result otherwise.
 */
312 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
313 struct ptlrpc_request_set *rqset)
320 LASSERT(set->set_exp);
321 /* Do enqueue_done only for sync requests and if any request
325 cfs_atomic_set(&set->set_completes, 0);
326 ret = enqueue_done(set, mode);
327 } else if (set->set_lockh)
328 lov_llh_put(set->set_lockh);
332 RETURN(rc ? rc : ret);
/*
 * Handle-table "addref" hook for lov_lock_handles: takes one more reference
 * on the object passed as \a llhp and logs the new count.
 */
335 static void lov_llh_addref(void *llhp)
337 struct lov_lock_handles *llh = llhp;
339 cfs_atomic_inc(&llh->llh_refcount);
340 CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
341 cfs_atomic_read(&llh->llh_refcount));
/* Handle operations registered for lov_lock_handles in lov_llh_new(). */
344 static struct portals_handle_ops lov_handle_ops = {
345 .hop_addref = lov_llh_addref,
/*
 * Allocate a lov_lock_handles object with room for one lock handle per
 * stripe of \a lsm, and register it in the handle hash with
 * lov_handle_ops.
 *
 * The object starts with a reference count of 2 (presumably one for the
 * hash and one for the caller -- confirm).
 */
349 static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
351 struct lov_lock_handles *llh;
/* header plus a trailing array of lsm_stripe_count handles */
353 OBD_ALLOC(llh, sizeof *llh +
354 sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
358 cfs_atomic_set(&llh->llh_refcount, 2);
359 llh->llh_stripe_count = lsm->lsm_stripe_count;
360 CFS_INIT_LIST_HEAD(&llh->llh_handle.h_link);
361 class_handle_hash(&llh->llh_handle, &lov_handle_ops);
/*
 * Build the request set for a striped lock enqueue.
 *
 * \param exp     LOV export
 * \param oinfo   caller's request info: stripe metadata (oi_md), extent
 *                policy, flags and the lock handle that receives the cookie
 * \param einfo   enqueue info; its ei_mode is used for cleanup on failure
 * \param reqset  where the new set is returned
 *
 * A lov_lock_handles object sized for the lsm is created and its cookie is
 * stored in the caller's lock handle.  For each stripe that intersects the
 * requested extent and whose target passes lov_check_and_wait_active(), a
 * request is allocated with a private single-stripe lsm copy, the stripe's
 * slot in the lock-handle array, the cb_update_enqueue callback and the
 * stripe-local extent.
 *
 * Failures: -ENOMEM when an allocation fails; -EIO (presumably when no
 * request was queued, as in lov_prep_setattr_set() -- confirm).  On failure
 * the set is passed to lov_fini_enqueue_set().
 */
366 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
367 struct ldlm_enqueue_info *einfo,
368 struct lov_request_set **reqset)
370 struct lov_obd *lov = &exp->exp_obd->u.lov;
371 struct lov_request_set *set;
375 OBD_ALLOC(set, sizeof(*set));
383 set->set_lockh = lov_llh_new(oinfo->oi_md);
384 if (set->set_lockh == NULL)
385 GOTO(out_set, rc = -ENOMEM);
/* hand the caller a cookie that lov_handle2llh() can resolve later */
386 oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
388 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
389 struct lov_oinfo *loi;
390 struct lov_request *req;
393 loi = oinfo->oi_md->lsm_oinfo[i];
394 if (!lov_stripe_intersects(oinfo->oi_md, i,
395 oinfo->oi_policy.l_extent.start,
396 oinfo->oi_policy.l_extent.end,
400 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
401 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
405 OBD_ALLOC(req, sizeof(*req));
407 GOTO(out_set, rc = -ENOMEM);
/* one buffer holds the lsm, a single lsm_oinfo[] pointer slot and
 * the lov_oinfo that slot points at */
409 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
410 sizeof(struct lov_oinfo *) +
411 sizeof(struct lov_oinfo);
412 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
413 if (req->rq_oi.oi_md == NULL) {
414 OBD_FREE(req, sizeof(*req));
415 GOTO(out_set, rc = -ENOMEM);
417 req->rq_oi.oi_md->lsm_oinfo[0] =
418 ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
419 sizeof(struct lov_oinfo *);
421 /* Set lov request specific parameters. */
422 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
423 req->rq_oi.oi_cb_up = cb_update_enqueue;
424 req->rq_oi.oi_flags = oinfo->oi_flags;
426 LASSERT(req->rq_oi.oi_lockh);
428 req->rq_oi.oi_policy.l_extent.gid =
429 oinfo->oi_policy.l_extent.gid;
430 req->rq_oi.oi_policy.l_extent.start = start;
431 req->rq_oi.oi_policy.l_extent.end = end;
433 req->rq_idx = loi->loi_ost_idx;
436 /* XXX LOV STACKING: submd should be from the subobj */
437 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
438 req->rq_oi.oi_md->lsm_stripe_count = 0;
439 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
441 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
442 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
444 lov_set_add_req(req, set);
447 GOTO(out_set, rc = -EIO);
451 lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
/*
 * Finish a lock-match request set.
 *
 * enqueue_done() performs the common cleanup.  When every request of the
 * set succeeded and the match was only a test (LDLM_FL_TEST_LOCK in
 * \a flags), the lock-handle reference is dropped as well.
 */
455 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, __u64 flags)
462 LASSERT(set->set_exp);
463 rc = enqueue_done(set, mode);
464 if ((set->set_count == cfs_atomic_read(&set->set_success)) &&
465 (flags & LDLM_FL_TEST_LOCK))
466 lov_llh_put(set->set_lockh);
/*
 * Build the request set for a striped lock match.
 *
 * \param exp     LOV export
 * \param oinfo   caller's request info
 * \param lsm     stripe metadata of the object
 * \param policy  extent to match, in file offsets
 * \param mode    lock mode, used for cleanup on failure
 * \param lockh   caller's lock handle; receives the cookie of the new
 *                lov_lock_handles
 * \param reqset  where the new set is returned
 *
 * One request is created for every stripe intersecting the extent, each
 * with a private lsm copy and the stripe-local extent.  Unlike the enqueue
 * variant, an inactive target aborts the whole operation with -EIO.
 *
 * Failures: -ENOMEM on allocation failure, -EIO as above (a further -EIO
 * exit is presumably taken when no request was queued -- confirm).  On
 * failure the set is passed to lov_fini_match_set().
 */
473 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
474 struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
475 __u32 mode, struct lustre_handle *lockh,
476 struct lov_request_set **reqset)
478 struct lov_obd *lov = &exp->exp_obd->u.lov;
479 struct lov_request_set *set;
483 OBD_ALLOC(set, sizeof(*set));
490 set->set_oi->oi_md = lsm;
491 set->set_lockh = lov_llh_new(lsm);
492 if (set->set_lockh == NULL)
493 GOTO(out_set, rc = -ENOMEM);
494 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
496 for (i = 0; i < lsm->lsm_stripe_count; i++){
497 struct lov_oinfo *loi;
498 struct lov_request *req;
501 loi = lsm->lsm_oinfo[i];
502 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
503 policy->l_extent.end, &start, &end))
506 /* FIXME raid1 should grace this error */
507 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
508 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
509 GOTO(out_set, rc = -EIO);
512 OBD_ALLOC(req, sizeof(*req));
514 GOTO(out_set, rc = -ENOMEM);
516 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
517 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
518 if (req->rq_oi.oi_md == NULL) {
519 OBD_FREE(req, sizeof(*req));
520 GOTO(out_set, rc = -ENOMEM);
/* start/end are the stripe-local bounds from lov_stripe_intersects() */
523 req->rq_oi.oi_policy.l_extent.start = start;
524 req->rq_oi.oi_policy.l_extent.end = end;
525 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
527 req->rq_idx = loi->loi_ost_idx;
530 /* XXX LOV STACKING: submd should be from the subobj */
531 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
532 req->rq_oi.oi_md->lsm_stripe_count = 0;
534 lov_set_add_req(req, set);
537 GOTO(out_set, rc = -EIO);
541 lov_fini_match_set(set, mode, 0);
/*
 * Finish a lock-cancel request set: the set must have an export, and its
 * reference on the lock handles is dropped.
 */
545 int lov_fini_cancel_set(struct lov_request_set *set)
553 LASSERT(set->set_exp);
555 lov_llh_put(set->set_lockh);
/*
 * Build the request set for cancelling a striped lock.
 *
 * \param exp     LOV export
 * \param oinfo   caller's request info
 * \param lsm     stripe metadata of the object
 * \param mode    lock mode
 * \param lockh   LOV-level lock handle identifying the per-stripe handles
 * \param reqset  where the new set is returned
 *
 * The per-stripe handles are looked up from \a lockh with
 * lov_handle2llh(); an unknown handle fails with -EINVAL.  A request with
 * a private lsm copy is created for each stripe whose handle is in use;
 * stripes without a lock are only logged.
 *
 * Failures: -EINVAL as above, -ENOMEM on allocation failure, -EIO
 * (presumably when no request was queued -- confirm).  On failure the set
 * is passed to lov_fini_cancel_set().
 */
562 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
563 struct lov_stripe_md *lsm, __u32 mode,
564 struct lustre_handle *lockh,
565 struct lov_request_set **reqset)
567 struct lov_request_set *set;
571 OBD_ALLOC(set, sizeof(*set));
578 set->set_oi->oi_md = lsm;
579 set->set_lockh = lov_handle2llh(lockh);
580 if (set->set_lockh == NULL) {
581 CERROR("LOV: invalid lov lock handle %p\n", lockh);
582 GOTO(out_set, rc = -EINVAL);
584 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
586 for (i = 0; i < lsm->lsm_stripe_count; i++){
587 struct lov_request *req;
588 struct lustre_handle *lov_lockhp;
589 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
591 lov_lockhp = set->set_lockh->llh_handles + i;
592 if (!lustre_handle_is_used(lov_lockhp)) {
593 CDEBUG(D_INFO, "lov idx %d subobj "DOSTID" no lock\n",
594 loi->loi_ost_idx, POSTID(&loi->loi_oi));
598 OBD_ALLOC(req, sizeof(*req));
600 GOTO(out_set, rc = -ENOMEM);
602 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
603 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
604 if (req->rq_oi.oi_md == NULL) {
605 OBD_FREE(req, sizeof(*req));
606 GOTO(out_set, rc = -ENOMEM);
609 req->rq_idx = loi->loi_ost_idx;
612 /* XXX LOV STACKING: submd should be from the subobj */
613 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
614 req->rq_oi.oi_md->lsm_stripe_count = 0;
616 lov_set_add_req(req, set);
619 GOTO(out_set, rc = -EIO);
623 lov_fini_cancel_set(set);
/*
 * Merge the per-stripe attributes of a finished set into the caller's obdo.
 *
 * The attributes of every request that completed without error and carries
 * valid attributes are merged into a temporary obdo with
 * lov_merge_attrs(); attrset counts what was merged.  If the caller asked
 * for epoch attributes (OBD_MD_FLEPOCH), every stripe must have
 * contributed, otherwise -EIO is returned.  On success the merged result
 * replaces the caller's obdo while keeping the caller's object id (o_oi).
 *
 * Returns 0 on success, -ENOMEM or -EIO on failure.
 */
626 static int common_attr_done(struct lov_request_set *set)
629 struct lov_request *req;
631 int rc = 0, attrset = 0;
634 LASSERT(set->set_oi != NULL);
636 if (set->set_oi->oi_oa == NULL)
639 if (!cfs_atomic_read(&set->set_success))
644 GOTO(out, rc = -ENOMEM);
646 cfs_list_for_each (pos, &set->set_list) {
647 req = cfs_list_entry(pos, struct lov_request, rq_link);
649 if (!req->rq_complete || req->rq_rc)
651 if (req->rq_oi.oi_oa->o_valid == 0) /* inactive stripe */
653 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
654 req->rq_oi.oi_oa->o_valid,
655 set->set_oi->oi_md, req->rq_stripe, &attrset);
658 CERROR("No stripes had valid attrs\n");
661 if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
662 (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
663 /* When we take attributes of some epoch, we require all the
664 * ost to be active. */
665 CERROR("Not all the stripes had valid attrs\n");
666 GOTO(out, rc = -EIO);
/* keep the caller's object id; everything else comes from the merge */
669 tmp_oa->o_oi = set->set_oi->oi_oa->o_oi;
670 memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
/*
 * After a bulk read/write, copy the block count reported by each target
 * into the cached lvb of the corresponding stripe.
 *
 * Only requests that completed without error are considered, and only when
 * their obdo carries a valid block count (OBD_MD_FLBLOCKS).
 */
678 static int brw_done(struct lov_request_set *set)
680 struct lov_stripe_md *lsm = set->set_oi->oi_md;
681 struct lov_oinfo *loi = NULL;
683 struct lov_request *req;
686 cfs_list_for_each (pos, &set->set_list) {
687 req = cfs_list_entry(pos, struct lov_request, rq_link);
689 if (!req->rq_complete || req->rq_rc)
692 loi = lsm->lsm_oinfo[req->rq_stripe];
694 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
695 loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
/*
 * Finish a bulk read/write request set.  The set must have an export;
 * post-processing only happens when at least one request completed.
 */
701 int lov_fini_brw_set(struct lov_request_set *set)
708 LASSERT(set->set_exp);
709 if (cfs_atomic_read(&set->set_completes)) {
711 /* FIXME update qos data here */
/*
 * Build the request set for a striped bulk read/write.
 *
 * \param exp      LOV export
 * \param oinfo    caller's request info (obdo, stripe metadata, capa)
 * \param oa_bufs  number of entries in \a pga
 * \param pga      pages to transfer, with file offsets
 * \param oti      transaction info
 * \param reqset   where the new set is returned
 *
 * The work is done in three passes:
 *  1. count how many pages fall on each stripe;
 *  2. for every stripe with pages, create a request carrying a copy of the
 *     caller's obdo retargeted at the stripe object, and reserve a
 *     contiguous range [rq_pgaidx, rq_pgaidx + rq_oabufs) of set_pga;
 *  3. copy each page into its stripe's range, rewriting its offset to be
 *     stripe-local.
 *
 * Failures: -ENOMEM on allocation failure, -EIO when a needed target is
 * inactive (a further -EIO exit is presumably taken when no request was
 * queued -- confirm).  On failure the set is passed to lov_fini_brw_set().
 */
718 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
719 obd_count oa_bufs, struct brw_page *pga,
720 struct obd_trans_info *oti,
721 struct lov_request_set **reqset)
728 struct lov_request_set *set;
729 struct lov_obd *lov = &exp->exp_obd->u.lov;
730 int rc = 0, i, shift;
733 OBD_ALLOC(set, sizeof(*set));
741 set->set_oabufs = oa_bufs;
742 OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
744 GOTO(out, rc = -ENOMEM);
/* per-stripe scratch array: page count, base index and fill offset */
746 OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
748 GOTO(out, rc = -ENOMEM);
750 /* calculate the page count for each stripe */
751 for (i = 0; i < oa_bufs; i++) {
752 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
753 info[stripe].count++;
756 /* alloc and initialize lov request */
758 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
759 struct lov_oinfo *loi = NULL;
760 struct lov_request *req;
762 if (info[i].count == 0)
765 loi = oinfo->oi_md->lsm_oinfo[i];
766 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
767 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
768 GOTO(out, rc = -EIO);
771 OBD_ALLOC(req, sizeof(*req));
773 GOTO(out, rc = -ENOMEM);
775 OBDO_ALLOC(req->rq_oi.oi_oa);
776 if (req->rq_oi.oi_oa == NULL) {
777 OBD_FREE(req, sizeof(*req));
778 GOTO(out, rc = -ENOMEM);
782 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
783 sizeof(*req->rq_oi.oi_oa));
785 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
786 req->rq_oi.oi_oa->o_stripe_idx = i;
788 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
789 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
790 if (req->rq_oi.oi_md == NULL) {
791 OBDO_FREE(req->rq_oi.oi_oa);
792 OBD_FREE(req, sizeof(*req));
793 GOTO(out, rc = -ENOMEM);
796 req->rq_idx = loi->loi_ost_idx;
799 /* XXX LOV STACKING */
800 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
/* shift is the running start index of this stripe's pages in set_pga */
801 req->rq_oabufs = info[i].count;
802 req->rq_pgaidx = shift;
803 shift += req->rq_oabufs;
805 /* remember the index for sort brw_page array */
806 info[i].index = req->rq_pgaidx;
808 req->rq_oi.oi_capa = oinfo->oi_capa;
810 lov_set_add_req(req, set);
813 GOTO(out, rc = -EIO);
815 /* rotate & sort the brw_page array */
816 for (i = 0; i < oa_bufs; i++) {
817 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
/* shift is reused here as the destination slot for page i */
819 shift = info[stripe].index + info[stripe].off;
820 LASSERT(shift < oa_bufs);
821 set->set_pga[shift] = pga[i];
822 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
823 &set->set_pga[shift].off);
829 sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
834 lov_fini_brw_set(set);
/*
 * Finish a getattr request set.  The set must have an export; if any
 * request completed, the per-stripe attributes are merged with
 * common_attr_done() and its result becomes rc.
 */
839 int lov_fini_getattr_set(struct lov_request_set *set)
846 LASSERT(set->set_exp);
847 if (cfs_atomic_read(&set->set_completes))
848 rc = common_attr_done(set);
855 /* The callback for osc_getattr_async that finalizes a request info when a
 * response is received.
 *
 * \a cookie is the obd_info embedded in a lov_request (its rq_oi member);
 * the request is recovered with container_of() and passed, with its set
 * and \a rc, to lov_update_common_set(), whose result is returned. */
857 static int cb_getattr_update(void *cookie, int rc)
859 struct obd_info *oinfo = cookie;
860 struct lov_request *lovreq;
861 lovreq = container_of(oinfo, struct lov_request, rq_oi);
862 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
/*
 * Build the request set for a striped getattr.
 *
 * \param exp     LOV export
 * \param oinfo   caller's request info (obdo, stripe metadata, capa)
 * \param reqset  where the new set is returned
 *
 * For every stripe whose target passes lov_check_and_wait_active(), a
 * request is created with a copy of the caller's obdo retargeted at the
 * stripe object and cb_getattr_update as completion callback.  An inactive
 * target is fatal (-EIO) only when the caller asked for epoch attributes
 * (OBD_MD_FLEPOCH).
 *
 * Failures: -ENOMEM on allocation failure, -EIO as above (a further -EIO
 * exit is presumably taken when no request was queued -- confirm).  On
 * failure the set is passed to lov_fini_getattr_set().
 */
865 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
866 struct lov_request_set **reqset)
868 struct lov_request_set *set;
869 struct lov_obd *lov = &exp->exp_obd->u.lov;
873 OBD_ALLOC(set, sizeof(*set));
881 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
882 struct lov_oinfo *loi;
883 struct lov_request *req;
885 loi = oinfo->oi_md->lsm_oinfo[i];
886 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
887 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
888 if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
889 /* SOM requires all the OSTs to be active. */
890 GOTO(out_set, rc = -EIO);
894 OBD_ALLOC(req, sizeof(*req));
896 GOTO(out_set, rc = -ENOMEM);
899 req->rq_idx = loi->loi_ost_idx;
901 OBDO_ALLOC(req->rq_oi.oi_oa);
902 if (req->rq_oi.oi_oa == NULL) {
903 OBD_FREE(req, sizeof(*req));
904 GOTO(out_set, rc = -ENOMEM);
/* start from the caller's obdo, then retarget it at the stripe object */
906 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
907 sizeof(*req->rq_oi.oi_oa));
908 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
909 req->rq_oi.oi_cb_up = cb_getattr_update;
910 req->rq_oi.oi_capa = oinfo->oi_capa;
912 lov_set_add_req(req, set);
915 GOTO(out_set, rc = -EIO);
919 lov_fini_getattr_set(set);
/*
 * Finish a destroy request set.  The set must have an export; the branch
 * taken when at least one request completed currently only carries a
 * FIXME.
 */
923 int lov_fini_destroy_set(struct lov_request_set *set)
929 LASSERT(set->set_exp);
930 if (cfs_atomic_read(&set->set_completes)) {
931 /* FIXME update qos data here */
/*
 * Build the request set for destroying a striped object.
 *
 * \param exp     LOV export
 * \param oinfo   caller's request info
 * \param src_oa  obdo describing the object; copied into every request
 * \param lsm     stripe metadata of the object
 * \param oti     transaction info; supplies the log cookies when
 *                \a src_oa has OBD_MD_FLCOOKIE set
 * \param reqset  where the new set is returned
 *
 * For every stripe whose target passes lov_check_and_wait_active(), a
 * request is created with a copy of \a src_oa retargeted at the stripe
 * object.  Inactive targets are logged.
 *
 * Failures: -ENOMEM on allocation failure, -EIO (presumably when no
 * request was queued -- confirm).  On failure the set is passed to
 * lov_fini_destroy_set().
 */
939 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
940 struct obdo *src_oa, struct lov_stripe_md *lsm,
941 struct obd_trans_info *oti,
942 struct lov_request_set **reqset)
944 struct lov_request_set *set;
945 struct lov_obd *lov = &exp->exp_obd->u.lov;
949 OBD_ALLOC(set, sizeof(*set));
956 set->set_oi->oi_md = lsm;
957 set->set_oi->oi_oa = src_oa;
959 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
960 set->set_cookies = oti->oti_logcookies;
962 for (i = 0; i < lsm->lsm_stripe_count; i++) {
963 struct lov_oinfo *loi;
964 struct lov_request *req;
966 loi = lsm->lsm_oinfo[i];
967 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
968 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
972 OBD_ALLOC(req, sizeof(*req));
974 GOTO(out_set, rc = -ENOMEM);
977 req->rq_idx = loi->loi_ost_idx;
979 OBDO_ALLOC(req->rq_oi.oi_oa);
980 if (req->rq_oi.oi_oa == NULL) {
981 OBD_FREE(req, sizeof(*req));
982 GOTO(out_set, rc = -ENOMEM);
984 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
985 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
986 lov_set_add_req(req, set);
989 GOTO(out_set, rc = -EIO);
993 lov_fini_destroy_set(set);
/*
 * Finish a setattr request set.  The set must have an export; if any
 * request completed, the per-stripe attributes are merged with
 * common_attr_done().  The caller's reference on the set is dropped with
 * lov_put_reqset().
 */
997 int lov_fini_setattr_set(struct lov_request_set *set)
1004 LASSERT(set->set_exp);
1005 if (cfs_atomic_read(&set->set_completes)) {
1006 rc = common_attr_done(set);
1007 /* FIXME update qos data here */
1010 lov_put_reqset(set);
/*
 * Per-request completion handler for setattr.
 *
 * Accounts \a req in \a set, applies the same "grace" policy as
 * lov_update_common_set() for a missing or inactive target, and copies
 * whichever of ctime, mtime and atime the reply marks valid into the
 * cached lvb of the request's stripe.
 */
1014 int lov_update_setattr_set(struct lov_request_set *set,
1015 struct lov_request *req, int rc)
1017 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1018 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1021 lov_update_set(set, req, rc);
1023 /* grace error on inactive ost */
1024 if (rc && !(lov->lov_tgts[req->rq_idx] &&
1025 lov->lov_tgts[req->rq_idx]->ltd_active))
/* each timestamp is copied only when its o_valid bit is set */
1029 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1030 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1031 req->rq_oi.oi_oa->o_ctime;
1032 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1033 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1034 req->rq_oi.oi_oa->o_mtime;
1035 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1036 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1037 req->rq_oi.oi_oa->o_atime;
1043 /* The callback for osc_setattr_async that finalizes a request info when a
 * response is received.
 *
 * \a cookie is the obd_info embedded in a lov_request (its rq_oi member);
 * the request is recovered with container_of() and passed, with its set
 * and \a rc, to lov_update_setattr_set(), whose result is returned. */
1045 static int cb_setattr_update(void *cookie, int rc)
1047 struct obd_info *oinfo = cookie;
1048 struct lov_request *lovreq;
1049 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1050 return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
/*
 * Build the request set for a striped setattr.
 *
 * \param exp     LOV export
 * \param oinfo   caller's request info (obdo, stripe metadata, capa)
 * \param oti     transaction info; supplies the log cookies when the
 *                caller's obdo has OBD_MD_FLCOOKIE set
 * \param reqset  where the new set is returned
 *
 * For every stripe whose target passes lov_check_and_wait_active(), a
 * request is created with a copy of the caller's obdo retargeted at the
 * stripe object and cb_setattr_update as completion callback.  When a size
 * is being set (OBD_MD_FLSIZE) the file size is converted to the stripe's
 * own size with lov_stripe_offset().
 *
 * Failures: -ENOMEM on allocation failure, -EIO when no request was
 * queued.  On failure the set is passed to lov_fini_setattr_set().
 */
1053 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1054 struct obd_trans_info *oti,
1055 struct lov_request_set **reqset)
1057 struct lov_request_set *set;
1058 struct lov_obd *lov = &exp->exp_obd->u.lov;
1062 OBD_ALLOC(set, sizeof(*set));
1069 set->set_oi = oinfo;
1070 if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1071 set->set_cookies = oti->oti_logcookies;
1073 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1074 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1075 struct lov_request *req;
1077 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1078 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1082 OBD_ALLOC(req, sizeof(*req));
1084 GOTO(out_set, rc = -ENOMEM);
1086 req->rq_idx = loi->loi_ost_idx;
1088 OBDO_ALLOC(req->rq_oi.oi_oa);
1089 if (req->rq_oi.oi_oa == NULL) {
1090 OBD_FREE(req, sizeof(*req));
1091 GOTO(out_set, rc = -ENOMEM);
1093 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1094 sizeof(*req->rq_oi.oi_oa));
1095 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1096 req->rq_oi.oi_oa->o_stripe_idx = i;
1097 req->rq_oi.oi_cb_up = cb_setattr_update;
1098 req->rq_oi.oi_capa = oinfo->oi_capa;
1100 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
/* the stripe-local size is written straight into the request's obdo */
1101 int off = lov_stripe_offset(oinfo->oi_md,
1102 oinfo->oi_oa->o_size, i,
1103 &req->rq_oi.oi_oa->o_size);
/* a negative result with a non-zero size is pulled back by one byte */
1105 if (off < 0 && req->rq_oi.oi_oa->o_size)
1106 req->rq_oi.oi_oa->o_size--;
1108 CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1109 i, req->rq_oi.oi_oa->o_size,
1110 oinfo->oi_oa->o_size);
1112 lov_set_add_req(req, set);
1114 if (!set->set_count)
1115 GOTO(out_set, rc = -EIO);
1119 lov_fini_setattr_set(set);
/*
 * Finish a punch (truncate) request set.  The set must have an export; if
 * any request completed and at least one succeeded, the per-stripe
 * attributes are merged with common_attr_done().  The caller's reference
 * on the set is dropped with lov_put_reqset().
 */
1123 int lov_fini_punch_set(struct lov_request_set *set)
1130 LASSERT(set->set_exp);
1131 if (cfs_atomic_read(&set->set_completes)) {
1133 /* FIXME update qos data here */
1134 if (cfs_atomic_read(&set->set_success))
1135 rc = common_attr_done(set);
1138 lov_put_reqset(set);
/*
 * Per-request completion handler for punch.
 *
 * Accounts \a req in \a set, applies the "grace" policy for an inactive
 * target, and under the stripe lock copies the block count from the reply
 * into the cached lvb of the request's stripe when the reply marks it valid
 * (OBD_MD_FLBLOCKS).
 */
1143 int lov_update_punch_set(struct lov_request_set *set,
1144 struct lov_request *req, int rc)
1146 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1147 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1150 lov_update_set(set, req, rc);
/* NOTE(review): unlike lov_update_common_set() and
 * lov_update_setattr_set(), the target slot is dereferenced here without
 * a NULL check -- confirm it cannot be NULL on this path. */
1152 /* grace error on inactive ost */
1153 if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1157 lov_stripe_lock(lsm);
1158 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1159 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1160 req->rq_oi.oi_oa->o_blocks;
1163 lov_stripe_unlock(lsm);
1169 /* The callback for osc_punch that finalizes a request info when a response
 * is received. */
/*
 * \a cookie is the obd_info embedded in a lov_request (its rq_oi member);
 * the request is recovered with container_of() and passed, with its set
 * and \a rc, to lov_update_punch_set(), whose result is returned.
 */
1171 static int cb_update_punch(void *cookie, int rc)
1173 struct obd_info *oinfo = cookie;
1174 struct lov_request *lovreq;
1175 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1176 return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
/*
 * Build the request set for a striped punch (truncate).
 *
 * \param exp     LOV export
 * \param oinfo   caller's request info (obdo, stripe metadata, extent
 *                policy, capa)
 * \param oti     transaction info
 * \param reqset  where the new set is returned
 *
 * For every stripe that intersects the requested extent, a request is
 * created with a copy of the caller's obdo retargeted at the stripe object
 * (with OBD_MD_FLGROUP forced on), the stripe-local extent, a gid of -1 and
 * cb_update_punch as completion callback.  An inactive target aborts the
 * whole operation.
 *
 * Failures: -ENOMEM on allocation failure, -EIO for an inactive target or
 * when no request was queued.  On failure the set is passed to
 * lov_fini_punch_set().
 */
1179 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1180 struct obd_trans_info *oti,
1181 struct lov_request_set **reqset)
1183 struct lov_request_set *set;
1184 struct lov_obd *lov = &exp->exp_obd->u.lov;
1188 OBD_ALLOC(set, sizeof(*set));
1193 set->set_oi = oinfo;
1196 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1197 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1198 struct lov_request *req;
1201 if (!lov_stripe_intersects(oinfo->oi_md, i,
1202 oinfo->oi_policy.l_extent.start,
1203 oinfo->oi_policy.l_extent.end,
1207 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1208 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1209 GOTO(out_set, rc = -EIO);
1212 OBD_ALLOC(req, sizeof(*req));
1214 GOTO(out_set, rc = -ENOMEM);
1216 req->rq_idx = loi->loi_ost_idx;
1218 OBDO_ALLOC(req->rq_oi.oi_oa);
1219 if (req->rq_oi.oi_oa == NULL) {
1220 OBD_FREE(req, sizeof(*req));
1221 GOTO(out_set, rc = -ENOMEM);
1223 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1224 sizeof(*req->rq_oi.oi_oa));
1225 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1226 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1228 req->rq_oi.oi_oa->o_stripe_idx = i;
1229 req->rq_oi.oi_cb_up = cb_update_punch;
/* rs/re are the stripe-local bounds of the punched range */
1231 req->rq_oi.oi_policy.l_extent.start = rs;
1232 req->rq_oi.oi_policy.l_extent.end = re;
1233 req->rq_oi.oi_policy.l_extent.gid = -1;
1235 req->rq_oi.oi_capa = oinfo->oi_capa;
1237 lov_set_add_req(req, set);
1239 if (!set->set_count)
1240 GOTO(out_set, rc = -EIO);
1244 lov_fini_punch_set(set);
/*
 * Finish a sync request set.  The set must have an export; when requests
 * completed but none succeeded, that case is handled specially.  The
 * caller's reference on the set is dropped with lov_put_reqset().
 */
1248 int lov_fini_sync_set(struct lov_request_set *set)
1255 LASSERT(set->set_exp);
1256 if (cfs_atomic_read(&set->set_completes)) {
1257 if (!cfs_atomic_read(&set->set_success))
1259 /* FIXME update qos data here */
1262 lov_put_reqset(set);
1267 /* The callback for osc_sync that finalizes a request info when a
 * response is received.
 *
 * \a cookie is the obd_info embedded in a lov_request (its rq_oi member);
 * the request is recovered with container_of() and passed, with its set
 * and \a rc, to lov_update_common_set(), whose result is returned. */
1269 static int cb_sync_update(void *cookie, int rc)
1271 struct obd_info *oinfo = cookie;
1272 struct lov_request *lovreq;
1274 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1275 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
/*
 * Build the request set for syncing a byte range of a striped object.
 *
 * \param exp     LOV export
 * \param oinfo   caller's request info (obdo, stripe metadata)
 * \param start   first byte of the range, in file offsets
 * \param end     last byte of the range, in file offsets
 * \param reqset  where the new set is returned
 *
 * For every stripe whose target passes lov_check_and_wait_active() and
 * that intersects [start, end], a request is created with a copy of the
 * caller's obdo retargeted at the stripe object, the stripe-local extent,
 * a gid of -1 and cb_sync_update as completion callback.
 *
 * Failures: -ENOMEM on allocation failure, -EIO when no request was
 * queued.  On failure the set is passed to lov_fini_sync_set().
 */
1278 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1279 obd_off start, obd_off end,
1280 struct lov_request_set **reqset)
1282 struct lov_request_set *set;
1283 struct lov_obd *lov = &exp->exp_obd->u.lov;
1293 set->set_oi = oinfo;
1295 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1296 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1297 struct lov_request *req;
1300 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1301 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1305 if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
1311 GOTO(out_set, rc = -ENOMEM);
1313 req->rq_idx = loi->loi_ost_idx;
1315 OBDO_ALLOC(req->rq_oi.oi_oa);
1316 if (req->rq_oi.oi_oa == NULL) {
1317 OBD_FREE(req, sizeof(*req));
1318 GOTO(out_set, rc = -ENOMEM);
/* struct assignment; other prep functions use memcpy() for this copy */
1320 *req->rq_oi.oi_oa = *oinfo->oi_oa;
1321 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1322 req->rq_oi.oi_oa->o_stripe_idx = i;
1324 req->rq_oi.oi_policy.l_extent.start = rs;
1325 req->rq_oi.oi_policy.l_extent.end = re;
1326 req->rq_oi.oi_policy.l_extent.gid = -1;
1327 req->rq_oi.oi_cb_up = cb_sync_update;
1329 lov_set_add_req(req, set);
1331 if (!set->set_count)
1332 GOTO(out_set, rc = -EIO);
1336 lov_fini_sync_set(set);
/*
 * LOV_U64_MAX is the all-ones 64-bit value; the statfs code treats a
 * counter equal to it as saturated.  LOV_SUM_MAX() detects wrap-around of
 * (tot) + (add) and pins (tot) at LOV_U64_MAX in that case.  Both
 * arguments are evaluated more than once.
 */
1340 #define LOV_U64_MAX ((__u64)~0ULL)
1341 #define LOV_SUM_MAX(tot, add) \
1343 if ((tot) + (add) < (tot)) \
1344 (tot) = LOV_U64_MAX; \
/*
 * Post-process an aggregated statfs result and cache it on the device.
 *
 * \param obd      LOV device
 * \param osfs     aggregated statfs data; adjusted in place
 * \param success  number of targets that answered successfully
 *
 * The file counters are summed over targets, so os_files and os_ffree are
 * divided by the expected stripe count, unless they saturated at
 * LOV_U64_MAX.  The result is then copied into obd->obd_osfs and
 * time-stamped, both under obd_osfs_lock.
 */
1349 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1354 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
1356 if (osfs->os_files != LOV_U64_MAX)
1357 lov_do_div64(osfs->os_files, expected_stripes);
1358 if (osfs->os_ffree != LOV_U64_MAX)
1359 lov_do_div64(osfs->os_ffree, expected_stripes);
1361 spin_lock(&obd->obd_osfs_lock);
1362 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1363 obd->obd_osfs_age = cfs_time_current_64();
1364 spin_unlock(&obd->obd_osfs_lock);
/*
 * Finish a statfs request set.  If any request completed, the aggregated
 * result is post-processed by lov_fini_statfs() using the set's success
 * count.  The caller's reference on the set is dropped with
 * lov_put_reqset().
 */
1371 int lov_fini_statfs_set(struct lov_request_set *set)
1379 if (cfs_atomic_read(&set->set_completes)) {
1380 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1381 cfs_atomic_read(&set->set_success));
1383 lov_put_reqset(set);
/*
 * Fold one target's statfs reply into the running aggregate.
 *
 * \param osfs     aggregate being built
 * \param lov_sfs  reply from one target; its block counters may be rescaled
 *                 in place
 *
 * cb_statfs_update() passes the set's success count as the third argument.
 *
 * One path simply copies the reply into the aggregate.  Otherwise the two
 * block sizes are first brought to a common (larger) unit by shifting the
 * block counters of whichever side uses the smaller one, and then the
 * block and file counters are combined; file counters saturate via
 * LOV_SUM_MAX().
 */
1387 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1390 int shift = 0, quit = 0;
1394 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1396 if (osfs->os_bsize != lov_sfs->os_bsize) {
1397 /* assume all block sizes are always powers of 2 */
1398 /* get the bits difference */
1399 tmp = osfs->os_bsize | lov_sfs->os_bsize;
1400 for (shift = 0; shift <= 64; ++shift) {
/* the side with the smaller block size has its counters scaled down */
1412 if (osfs->os_bsize < lov_sfs->os_bsize) {
1413 osfs->os_bsize = lov_sfs->os_bsize;
1415 osfs->os_bfree >>= shift;
1416 osfs->os_bavail >>= shift;
1417 osfs->os_blocks >>= shift;
1418 } else if (shift != 0) {
1419 lov_sfs->os_bfree >>= shift;
1420 lov_sfs->os_bavail >>= shift;
1421 lov_sfs->os_blocks >>= shift;
1424 /* Sandia requested that df (and so, statfs) only
1425 returned minimal available space on
1426 a single OST, so people would be able to
1427 write this much data guaranteed. */
1428 if (osfs->os_bavail > lov_sfs->os_bavail) {
1429 /* Presumably if new bavail is smaller,
1430 new bfree is bigger as well */
1431 osfs->os_bfree = lov_sfs->os_bfree;
1432 osfs->os_bavail = lov_sfs->os_bavail;
1435 osfs->os_bfree += lov_sfs->os_bfree;
1436 osfs->os_bavail += lov_sfs->os_bavail;
1438 osfs->os_blocks += lov_sfs->os_blocks;
1439 /* XXX not sure about this one - depends on policy.
1440 * - could be minimum if we always stripe on all OBDs
1441 * (but that would be wrong for any other policy,
1442 * if one of the OBDs has no more objects left)
1443 * - could be sum if we stripe whole objects
1444 * - could be average, just to give a nice number
1446 * To give a "reasonable" (if not wholly accurate)
1447 * number, we divide the total number of free objects
1448 * by expected stripe count (watch out for overflow).
1450 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1451 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1455 /* The callback for osc_statfs_async that finalizes a request info when a
 * response is received.
 *
 * \a cookie is the obd_info embedded in a lov_request (its rq_oi member).
 * The request is accounted in its set; if the target is present and
 * active, the reply is cached on the target device under its
 * obd_osfs_lock (time-stamped unless the data came from the cache) and
 * folded into the set's aggregate with lov_update_statfs().  When the
 * request runs under ptlrpcd (OBD_STATFS_PTLRPCD) and this was the last
 * outstanding request, lov_statfs_interpret() is called to finish the
 * set. */
1457 static int cb_statfs_update(void *cookie, int rc)
1459 struct obd_info *oinfo = cookie;
1460 struct lov_request *lovreq;
1461 struct lov_request_set *set;
1462 struct obd_statfs *osfs, *lov_sfs;
1463 struct lov_obd *lov;
1464 struct lov_tgt_desc *tgt;
1465 struct obd_device *lovobd, *tgtobd;
1469 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1470 set = lovreq->rq_rqset;
1471 lovobd = set->set_obd;
1472 lov = &lovobd->u.lov;
1473 osfs = set->set_oi->oi_osfs;
1474 lov_sfs = oinfo->oi_osfs;
/* sampled before this request is counted by lov_update_set() below */
1475 success = cfs_atomic_read(&set->set_success);
1476 /* XXX: the same is done in lov_update_common_set, however
1477 lovset->set_exp is not initialized. */
1478 lov_update_set(set, lovreq, rc);
1483 tgt = lov->lov_tgts[lovreq->rq_idx];
1484 if (!tgt || !tgt->ltd_active)
1485 GOTO(out_update, rc);
1487 tgtobd = class_exp2obd(tgt->ltd_exp);
1488 spin_lock(&tgtobd->obd_osfs_lock);
1489 memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1490 if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1491 tgtobd->obd_osfs_age = cfs_time_current_64();
1492 spin_unlock(&tgtobd->obd_osfs_lock);
1495 lov_update_statfs(osfs, lov_sfs, success);
1499 if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1500 lov_set_finished(set, 0)) {
/* the last argument is non-zero when not every request succeeded */
1501 lov_statfs_interpret(NULL, set, set->set_count !=
1502 cfs_atomic_read(&set->set_success));
/*
 * Build the request set for a statfs across all targets of a LOV device.
 *
 * \param obd     LOV device
 * \param oinfo   caller's request info; oi_flags selects the statfs
 *                behaviour (e.g. OBD_STATFS_NODELAY)
 * \param reqset  where the new set is returned
 *
 * Every slot of the target table up to ld_tgt_count is visited.  Empty
 * slots are reported as inactive, as are inactive targets when the caller
 * asked for OBD_STATFS_NODELAY; other inactive targets are given a chance
 * to connect via lov_check_and_wait_active().  Targets without an export
 * are reported as administratively disabled.  For the remaining targets a
 * request with its own obd_statfs buffer and cb_statfs_update as
 * completion callback is queued.
 *
 * Failures: -ENOMEM on allocation failure, -EIO when no request was
 * queued.  On failure the set is passed to lov_fini_statfs_set().
 */
1508 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1509 struct lov_request_set **reqset)
1511 struct lov_request_set *set;
1512 struct lov_obd *lov = &obd->u.lov;
1516 OBD_ALLOC(set, sizeof(*set));
1522 set->set_oi = oinfo;
1524 /* We only get block data from the OBD */
1525 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1526 struct lov_request *req;
1528 if (lov->lov_tgts[i] == NULL ||
1529 (oinfo->oi_flags & OBD_STATFS_NODELAY &&
1530 !lov->lov_tgts[i]->ltd_active)) {
1531 CDEBUG(D_HA, "lov idx %d inactive\n", i);
1535 if (!lov->lov_tgts[i]->ltd_active)
1536 lov_check_and_wait_active(lov, i);
1538 /* skip targets that have been explicitely disabled by the
1540 if (!lov->lov_tgts[i]->ltd_exp) {
1541 CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1545 OBD_ALLOC(req, sizeof(*req));
1547 GOTO(out_set, rc = -ENOMEM);
1549 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1550 if (req->rq_oi.oi_osfs == NULL) {
1551 OBD_FREE(req, sizeof(*req));
1552 GOTO(out_set, rc = -ENOMEM);
1556 req->rq_oi.oi_cb_up = cb_statfs_update;
1557 req->rq_oi.oi_flags = oinfo->oi_flags;
1559 lov_set_add_req(req, set);
1561 if (!set->set_count)
1562 GOTO(out_set, rc = -EIO);
1566 lov_fini_statfs_set(set);