X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flov%2Flov_request.c;h=ba95f06b03dc4b8d9cf67b5a71c143dc8ecde24d;hp=dcf581088c29e5799c8f2cac4b12149993228af2;hb=fbf5870b9848929d352460f1f005b79c0b5ccc5a;hpb=31863530d7617a45ec594dce0f1e8285bbea9970 diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index dcf5810..ba95f06 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -1,22 +1,37 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * Copyright (C) 2002, 2003 Cluster File Systems, Inc. + * GPL HEADER START * - * This file is part of Lustre, http://www.lustre.org. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. */ #ifndef EXPORT_SYMTAB @@ -25,14 +40,14 @@ #define DEBUG_SUBSYSTEM S_LOV #ifdef __KERNEL__ -#include +#include #else #include #endif -#include -#include -#include +#include +#include +#include #include "lov_internal.h" @@ -41,7 +56,8 @@ static void lov_init_set(struct lov_request_set *set) set->set_count = 0; set->set_completes = 0; set->set_success = 0; - INIT_LIST_HEAD(&set->set_list); + set->set_cookies = 0; + CFS_INIT_LIST_HEAD(&set->set_list); atomic_set(&set->set_refcount, 1); } @@ -56,10 +72,13 @@ static void lov_finish_set(struct lov_request_set *set) rq_link); list_del_init(&req->rq_link); - if (req->rq_oa) - obdo_free(req->rq_oa); - if (req->rq_md) - OBD_FREE(req->rq_md, req->rq_buflen); + if (req->rq_oi.oi_oa) + OBDO_FREE(req->rq_oi.oi_oa); + if (req->rq_oi.oi_md) + OBD_FREE(req->rq_oi.oi_md, req->rq_buflen); + if (req->rq_oi.oi_osfs) + OBD_FREE(req->rq_oi.oi_osfs, + sizeof(*req->rq_oi.oi_osfs)); OBD_FREE(req, sizeof(*req)); } @@ -74,8 +93,8 @@ static void lov_finish_set(struct lov_request_set *set) EXIT; } -static void lov_update_set(struct lov_request_set *set, - struct lov_request *req, int rc) +void lov_update_set(struct lov_request_set *set, + struct lov_request *req, int rc) { req->rq_complete = 1; req->rq_rc = rc; @@ -94,7 +113,8 @@ int lov_update_common_set(struct lov_request_set *set, lov_update_set(set, req, rc); /* grace error on inactive ost */ - if (rc && !lov->tgts[req->rq_idx].active) + if (rc && !(lov->lov_tgts[req->rq_idx] && + lov->lov_tgts[req->rq_idx]->ltd_active)) rc = 0; /* FIXME in raid1 regime, should return 0 */ @@ -107,144 +127,143 @@ void lov_set_add_req(struct lov_request *req, struct lov_request_set *set) set->set_count++; } -int lov_update_enqueue_set(struct lov_request_set *set, - struct lov_request *req, int rc, int flags) +extern void osc_update_enqueue(struct lustre_handle *lov_lockhp, + struct lov_oinfo *loi, int flags, + struct ost_lvb *lvb, __u32 mode, int rc); + +static int lov_update_enqueue_lov(struct obd_export *exp, + struct lustre_handle *lov_lockhp, + struct lov_oinfo *loi, int flags, int idx, + __u64 oid, int rc) { + struct lov_obd *lov = &exp->exp_obd->u.lov; + + if (rc != ELDLM_OK && + !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) { + memset(lov_lockhp, 0, sizeof(*lov_lockhp)); + if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) { + /* -EUSERS used by OST to report file contention */ + if (rc != -EINTR && rc != -EUSERS) + CERROR("enqueue objid "LPX64" subobj " + LPX64" on OST idx %d: rc %d\n", + oid, loi->loi_id, loi->loi_ost_idx, rc); + } else + rc = ELDLM_OK; + } + return rc; +} + +int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc) +{ + struct lov_request_set *set = req->rq_rqset; struct lustre_handle *lov_lockhp; + struct obd_info *oi = set->set_oi; struct lov_oinfo *loi; ENTRY; + LASSERT(oi != NULL); + lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe; - loi = &set->set_md->lsm_oinfo[req->rq_stripe]; + loi = oi->oi_md->lsm_oinfo[req->rq_stripe]; - /* XXX FIXME: This unpleasantness doesn't belong here at *all*. - * It belongs in the OSC, except that the OSC doesn't have - * access to the real LOI -- it gets a copy, that we created - * above, and that copy can be arbitrarily out of date. + /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set + * and that copy can be arbitrarily out of date. * * The LOV API is due for a serious rewriting anyways, and this * can be addressed then. */ - if (rc == ELDLM_OK) { - struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); - __u64 tmp = req->rq_md->lsm_oinfo->loi_rss; - - LASSERT(lock != NULL); - loi->loi_rss = tmp; - loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime; - loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks; - /* Extend KMS up to the end of this lock and no further - * A lock on [x,y] means a KMS of up to y + 1 bytes! */ - if (tmp > lock->l_policy_data.l_extent.end) - tmp = lock->l_policy_data.l_extent.end + 1; - if (tmp >= loi->loi_kms) { - CDEBUG(D_INODE, "lock acquired, setting rss=" - LPU64", kms="LPU64"\n", loi->loi_rss, tmp); - loi->loi_kms = tmp; - loi->loi_kms_valid = 1; - } else { - CDEBUG(D_INODE, "lock acquired, setting rss=" - LPU64"; leaving kms="LPU64", end="LPU64 - "\n", loi->loi_rss, loi->loi_kms, - lock->l_policy_data.l_extent.end); - } - ldlm_lock_allow_match(lock); - LDLM_LOCK_PUT(lock); - } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) { - memset(lov_lockhp, 0, sizeof(*lov_lockhp)); - loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss; - loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime; - loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks; - CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving" - " kms="LPU64"\n", loi->loi_rss, loi->loi_kms); - rc = ELDLM_OK; - } else { - struct obd_export *exp = set->set_exp; - struct lov_obd *lov = &exp->exp_obd->u.lov; - memset(lov_lockhp, 0, sizeof(*lov_lockhp)); - if (lov->tgts[req->rq_idx].active) { - CERROR("error: enqueue objid "LPX64" subobj " - LPX64" on OST idx %d: rc = %d\n", - set->set_md->lsm_object_id, loi->loi_id, - loi->loi_ost_idx, rc); - } else { - rc = ELDLM_OK; - } - } + lov_stripe_lock(oi->oi_md); + osc_update_enqueue(lov_lockhp, loi, oi->oi_flags, + &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc); + if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT)) + memset(lov_lockhp, 0, sizeof *lov_lockhp); + rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags, + req->rq_idx, oi->oi_md->lsm_object_id, rc); + lov_stripe_unlock(oi->oi_md); lov_update_set(set, req, rc); RETURN(rc); } -static int enqueue_done(struct lov_request_set *set, __u32 mode, int flags) +/* The callback for osc_enqueue that updates lov info for every OSC request. */ +static int cb_update_enqueue(void *cookie, int rc) +{ + struct obd_info *oinfo = cookie; + struct ldlm_enqueue_info *einfo; + struct lov_request *lovreq; + + lovreq = container_of(oinfo, struct lov_request, rq_oi); + einfo = lovreq->rq_rqset->set_ei; + return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc); +} + +static int enqueue_done(struct lov_request_set *set, __u32 mode) { - struct list_head *pos; struct lov_request *req; - struct lustre_handle *lov_lockhp = NULL; struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; int rc = 0; ENTRY; - LASSERT(set->set_completes); /* enqueue/match success, just return */ - if (set->set_completes == set->set_success) { - if (flags & LDLM_FL_TEST_LOCK) - lov_llh_put(set->set_lockh); + if (set->set_completes && set->set_completes == set->set_success) RETURN(0); - } /* cancel enqueued/matched locks */ - list_for_each (pos, &set->set_list) { - req = list_entry(pos, struct lov_request, rq_link); + list_for_each_entry(req, &set->set_list, rq_link) { + struct lustre_handle *lov_lockhp; if (!req->rq_complete || req->rq_rc) continue; - if (flags & LDLM_FL_TEST_LOCK) - continue; lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe; LASSERT(lov_lockhp); - if (lov_lockhp->cookie == 0) + if (!lustre_handle_is_used(lov_lockhp)) continue; - rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md, - mode, lov_lockhp); - if (rc && lov->tgts[req->rq_idx].active) + rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp, + req->rq_oi.oi_md, mode, lov_lockhp); + if (rc && lov->lov_tgts[req->rq_idx] && + lov->lov_tgts[req->rq_idx]->ltd_active) CERROR("cancelling obdjid "LPX64" on OST " "idx %d error: rc = %d\n", - req->rq_md->lsm_object_id, req->rq_idx, rc); + req->rq_oi.oi_md->lsm_object_id, + req->rq_idx, rc); } - lov_llh_put(set->set_lockh); + if (set->set_lockh) + lov_llh_put(set->set_lockh); RETURN(rc); } -int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode) +int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc, + struct ptlrpc_request_set *rqset) { - int rc = 0; + int ret = 0; ENTRY; - LASSERT(set->set_exp); if (set == NULL) RETURN(0); - if (set->set_completes) - rc = enqueue_done(set, mode, 0); - else + LASSERT(set->set_exp); + /* Do enqueue_done only for sync requests and if any request + * succeeded. */ + if (!rqset) { + if (rc) + set->set_completes = 0; + ret = enqueue_done(set, mode); + } else if (set->set_lockh) lov_llh_put(set->set_lockh); if (atomic_dec_and_test(&set->set_refcount)) lov_finish_set(set); - RETURN(rc); + RETURN(rc ? rc : ret); } -int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm, - ldlm_policy_data_t *policy, __u32 mode, - struct lustre_handle *lockh, +int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, + struct ldlm_enqueue_info *einfo, struct lov_request_set **reqset) { struct lov_obd *lov = &exp->exp_obd->u.lov; struct lov_request_set *set; int i, rc = 0; - struct lov_oinfo *loi; ENTRY; OBD_ALLOC(set, sizeof(*set)); @@ -253,22 +272,27 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm, lov_init_set(set); set->set_exp = exp; - set->set_md = lsm; - set->set_lockh = lov_llh_new(lsm); + set->set_oi = oinfo; + set->set_ei = einfo; + set->set_lockh = lov_llh_new(oinfo->oi_md); if (set->set_lockh == NULL) GOTO(out_set, rc = -ENOMEM); - lockh->cookie = set->set_lockh->llh_handle.h_cookie; + oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie; - loi = lsm->lsm_oinfo; - for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) { + struct lov_oinfo *loi; struct lov_request *req; obd_off start, end; - if (!lov_stripe_intersects(lsm, i, policy->l_extent.start, - policy->l_extent.end, &start, &end)) + loi = oinfo->oi_md->lsm_oinfo[i]; + if (!lov_stripe_intersects(oinfo->oi_md, i, + oinfo->oi_policy.l_extent.start, + oinfo->oi_policy.l_extent.end, + &start, &end)) continue; - if (lov->tgts[loi->loi_ost_idx].active == 0) { + if (!lov->lov_tgts[loi->loi_ost_idx] || + !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); continue; } @@ -277,27 +301,43 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm, if (req == NULL) GOTO(out_set, rc = -ENOMEM); - req->rq_buflen = sizeof(*req->rq_md) + + req->rq_buflen = sizeof(*req->rq_oi.oi_md) + + sizeof(struct lov_oinfo *) + sizeof(struct lov_oinfo); - OBD_ALLOC(req->rq_md, req->rq_buflen); - if (req->rq_md == NULL) + OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); + if (req->rq_oi.oi_md == NULL) { + OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); + } + req->rq_oi.oi_md->lsm_oinfo[0] = + ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) + + sizeof(struct lov_oinfo *); + - req->rq_extent.start = start; - req->rq_extent.end = end; + req->rq_rqset = set; + /* Set lov request specific parameters. */ + req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i; + req->rq_oi.oi_cb_up = cb_update_enqueue; + req->rq_oi.oi_flags = oinfo->oi_flags; + + LASSERT(req->rq_oi.oi_lockh); + + req->rq_oi.oi_policy.l_extent.gid = + oinfo->oi_policy.l_extent.gid; + req->rq_oi.oi_policy.l_extent.start = start; + req->rq_oi.oi_policy.l_extent.end = end; req->rq_idx = loi->loi_ost_idx; req->rq_stripe = i; /* XXX LOV STACKING: submd should be from the subobj */ - req->rq_md->lsm_object_id = loi->loi_id; - req->rq_md->lsm_object_gr = lsm->lsm_object_gr; - req->rq_md->lsm_stripe_count = 0; - req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid; - req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss; - req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms; - req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks; - loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime; + req->rq_oi.oi_md->lsm_object_id = loi->loi_id; + req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr; + req->rq_oi.oi_md->lsm_stripe_count = 0; + req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid = + loi->loi_kms_valid; + req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms; + req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb; lov_set_add_req(req, set); } @@ -306,15 +346,21 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm, *reqset = set; RETURN(0); out_set: - lov_fini_enqueue_set(set, mode); + lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL); RETURN(rc); } int lov_update_match_set(struct lov_request_set *set, struct lov_request *req, int rc) { + int ret = rc; ENTRY; - lov_update_set(set, req, !rc); + + if (rc > 0) + ret = 0; + else if (rc == 0) + ret = 1; + lov_update_set(set, req, ret); RETURN(rc); } @@ -323,12 +369,12 @@ int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags) int rc = 0; ENTRY; - LASSERT(set->set_exp); if (set == NULL) RETURN(0); - if (set->set_completes) - rc = enqueue_done(set, mode, flags); - else + LASSERT(set->set_exp); + rc = enqueue_done(set, mode); + if ((set->set_count == set->set_success) && + (flags & LDLM_FL_TEST_LOCK)) lov_llh_put(set->set_lockh); if (atomic_dec_and_test(&set->set_refcount)) @@ -337,15 +383,14 @@ int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags) RETURN(rc); } -int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm, - ldlm_policy_data_t *policy, __u32 mode, - struct lustre_handle *lockh, +int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo, + struct lov_stripe_md *lsm, ldlm_policy_data_t *policy, + __u32 mode, struct lustre_handle *lockh, struct lov_request_set **reqset) { struct lov_obd *lov = &exp->exp_obd->u.lov; struct lov_request_set *set; int i, rc = 0; - struct lov_oinfo *loi; ENTRY; OBD_ALLOC(set, sizeof(*set)); @@ -354,23 +399,26 @@ int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm, lov_init_set(set); set->set_exp = exp; - set->set_md = lsm; + set->set_oi = oinfo; + set->set_oi->oi_md = lsm; set->set_lockh = lov_llh_new(lsm); if (set->set_lockh == NULL) GOTO(out_set, rc = -ENOMEM); lockh->cookie = set->set_lockh->llh_handle.h_cookie; - loi = lsm->lsm_oinfo; - for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + for (i = 0; i < lsm->lsm_stripe_count; i++){ + struct lov_oinfo *loi; struct lov_request *req; obd_off start, end; + loi = lsm->lsm_oinfo[i]; if (!lov_stripe_intersects(lsm, i, policy->l_extent.start, policy->l_extent.end, &start, &end)) continue; /* FIXME raid1 should grace this error */ - if (lov->tgts[loi->loi_ost_idx].active == 0) { + if (!lov->lov_tgts[loi->loi_ost_idx] || + !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); GOTO(out_set, rc = -EIO); } @@ -379,21 +427,25 @@ int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm, if (req == NULL) GOTO(out_set, rc = -ENOMEM); - req->rq_buflen = sizeof(*req->rq_md); - OBD_ALLOC(req->rq_md, req->rq_buflen); - if (req->rq_md == NULL) + req->rq_buflen = sizeof(*req->rq_oi.oi_md); + OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); + if (req->rq_oi.oi_md == NULL) { + OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); + } - req->rq_extent.start = start; - req->rq_extent.end = end; + req->rq_oi.oi_policy.l_extent.start = start; + req->rq_oi.oi_policy.l_extent.end = end; + req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid; req->rq_idx = loi->loi_ost_idx; req->rq_stripe = i; /* XXX LOV STACKING: submd should be from the subobj */ - req->rq_md->lsm_object_id = loi->loi_id; - req->rq_md->lsm_object_gr = lsm->lsm_object_gr; - req->rq_md->lsm_stripe_count = 0; + req->rq_oi.oi_md->lsm_object_id = loi->loi_id; + req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr; + req->rq_oi.oi_md->lsm_stripe_count = 0; + lov_set_add_req(req, set); } if (!set->set_count) @@ -410,10 +462,10 @@ int lov_fini_cancel_set(struct lov_request_set *set) int rc = 0; ENTRY; - LASSERT(set->set_exp); if (set == NULL) RETURN(0); + LASSERT(set->set_exp); if (set->set_lockh) lov_llh_put(set->set_lockh); @@ -423,13 +475,13 @@ int lov_fini_cancel_set(struct lov_request_set *set) RETURN(rc); } -int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm, - __u32 mode, struct lustre_handle *lockh, +int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo, + struct lov_stripe_md *lsm, __u32 mode, + struct lustre_handle *lockh, struct lov_request_set **reqset) { struct lov_request_set *set; int i, rc = 0; - struct lov_oinfo *loi; ENTRY; OBD_ALLOC(set, sizeof(*set)); @@ -438,7 +490,8 @@ int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm, lov_init_set(set); set->set_exp = exp; - set->set_md = lsm; + set->set_oi = oinfo; + set->set_oi->oi_md = lsm; set->set_lockh = lov_handle2llh(lockh); if (set->set_lockh == NULL) { CERROR("LOV: invalid lov lock handle %p\n", lockh); @@ -446,14 +499,14 @@ int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm, } lockh->cookie = set->set_lockh->llh_handle.h_cookie; - loi = lsm->lsm_oinfo; - for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + for (i = 0; i < lsm->lsm_stripe_count; i++){ struct lov_request *req; struct lustre_handle *lov_lockhp; + struct lov_oinfo *loi = lsm->lsm_oinfo[i]; lov_lockhp = set->set_lockh->llh_handles + i; - if (lov_lockhp->cookie == 0) { - CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n", + if (!lustre_handle_is_used(lov_lockhp)) { + CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n", loi->loi_ost_idx, loi->loi_id); continue; } @@ -462,18 +515,21 @@ int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm, if (req == NULL) GOTO(out_set, rc = -ENOMEM); - req->rq_buflen = sizeof(*req->rq_md); - OBD_ALLOC(req->rq_md, req->rq_buflen); - if (req->rq_md == NULL) + req->rq_buflen = sizeof(*req->rq_oi.oi_md); + OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); + if (req->rq_oi.oi_md == NULL) { + OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); + } req->rq_idx = loi->loi_ost_idx; req->rq_stripe = i; /* XXX LOV STACKING: submd should be from the subobj */ - req->rq_md->lsm_object_id = loi->loi_id; - req->rq_md->lsm_object_gr = lsm->lsm_object_gr; - req->rq_md->lsm_stripe_count = 0; + req->rq_oi.oi_md->lsm_object_id = loi->loi_id; + req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr; + req->rq_oi.oi_md->lsm_stripe_count = 0; + lov_set_add_req(req, set); } if (!set->set_count) @@ -486,12 +542,11 @@ out_set: } static int create_done(struct obd_export *exp, struct lov_request_set *set, - struct lov_stripe_md **ea) + struct lov_stripe_md **lsmp) { struct lov_obd *lov = &exp->exp_obd->u.lov; struct obd_trans_info *oti = set->set_oti; - struct obdo *src_oa = set->set_oa; - struct list_head *pos; + struct obdo *src_oa = set->set_oi->oi_oa; struct lov_request *req; struct obdo *ret_oa = NULL; int attrset = 0, rc = 0; @@ -499,24 +554,47 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, LASSERT(set->set_completes); - if (!set->set_success) - GOTO(cleanup, rc = -EIO); + /* try alloc objects on other osts if osc_create fails for + * exceptions: RPC failure, ENOSPC, etc */ + if (set->set_count != set->set_success) { + list_for_each_entry (req, &set->set_list, rq_link) { + if (req->rq_rc == 0) + continue; + + set->set_completes--; + req->rq_complete = 0; + + rc = qos_remedy_create(set, req); + lov_update_create_set(set, req, rc); + + if (rc) + break; + } + } - if (*ea == NULL && set->set_count != set->set_success) { + /* no successful creates */ + if (set->set_success == 0) + GOTO(cleanup, rc); + + /* If there was an explicit stripe set, fail. Otherwise, we + * got some objects and that's not bad. */ + if (set->set_count != set->set_success) { + if (*lsmp) + GOTO(cleanup, rc); set->set_count = set->set_success; qos_shrink_lsm(set); } - ret_oa = obdo_alloc(); + OBDO_ALLOC(ret_oa); if (ret_oa == NULL) GOTO(cleanup, rc = -ENOMEM); - list_for_each (pos, &set->set_list) { - req = list_entry(pos, struct lov_request, rq_link); + list_for_each_entry(req, &set->set_list, rq_link) { if (!req->rq_complete || req->rq_rc) continue; - lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid, - set->set_md, req->rq_stripe, &attrset); + lov_merge_attrs(ret_oa, req->rq_oi.oi_oa, + req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md, + req->rq_stripe, &attrset); } if (src_oa->o_valid & OBD_MD_FLSIZE && ret_oa->o_size != src_oa->o_size) { @@ -526,33 +604,31 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, } ret_oa->o_id = src_oa->o_id; ret_oa->o_gr = src_oa->o_gr; - ret_oa->o_valid |= OBD_MD_FLGROUP; + ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP; memcpy(src_oa, ret_oa, sizeof(*src_oa)); - obdo_free(ret_oa); + OBDO_FREE(ret_oa); - *ea = set->set_md; + *lsmp = set->set_oi->oi_md; GOTO(done, rc = 0); - EXIT; cleanup: - list_for_each (pos, &set->set_list) { + list_for_each_entry(req, &set->set_list, rq_link) { struct obd_export *sub_exp; int err = 0; - req = list_entry(pos, struct lov_request, rq_link); if (!req->rq_complete || req->rq_rc) continue; - sub_exp = lov->tgts[req->rq_idx].ltd_exp, - err = obd_destroy(sub_exp, req->rq_oa, NULL, oti); + sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp; + err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL); if (err) CERROR("Failed to uncreate objid "LPX64" subobj " LPX64" on OST idx %d: rc = %d\n", - set->set_oa->o_id, req->rq_oa->o_id, + src_oa->o_id, req->rq_oi.oi_oa->o_id, req->rq_idx, rc); } - if (*ea == NULL) - obd_free_memmd(exp, &set->set_md); + if (*lsmp == NULL) + obd_free_memmd(exp, &set->set_oi->oi_md); done: if (oti && set->set_cookies) { oti->oti_logcookies = set->set_cookies; @@ -563,21 +639,19 @@ done: src_oa->o_valid |= OBD_MD_FLCOOKIE; } } - return rc; + RETURN(rc); } -int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea) +int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp) { int rc = 0; ENTRY; - LASSERT(set->set_exp); if (set == NULL) RETURN(0); - if (set->set_completes) { - rc = create_done(set->set_exp, set, ea); - /* FIXME update qos data here */ - } + LASSERT(set->set_exp); + if (set->set_completes) + rc = create_done(set->set_exp, set, lsmp); if (atomic_dec_and_test(&set->set_refcount)) lov_finish_set(set); @@ -588,19 +662,20 @@ int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea) int lov_update_create_set(struct lov_request_set *set, struct lov_request *req, int rc) { - struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; struct obd_trans_info *oti = set->set_oti; - struct lov_stripe_md *lsm = set->set_md; + struct lov_stripe_md *lsm = set->set_oi->oi_md; struct lov_oinfo *loi; + struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; ENTRY; req->rq_stripe = set->set_success; - loi = &lsm->lsm_oinfo[req->rq_stripe]; + loi = lsm->lsm_oinfo[req->rq_stripe]; - if (rc && lov->tgts[req->rq_idx].active) { - CERROR("error creating objid "LPX64" sub-object" + if (rc && lov->lov_tgts[req->rq_idx] && + lov->lov_tgts[req->rq_idx]->ltd_active) { + CERROR("error creating fid "LPX64" sub-object" " on OST idx %d/%d: rc = %d\n", - set->set_oa->o_id, req->rq_idx, + set->set_oi->oi_oa->o_id, req->rq_idx, lsm->lsm_stripe_count, rc); if (rc > 0) { CERROR("obd_create returned invalid err %d\n", rc); @@ -611,31 +686,28 @@ int lov_update_create_set(struct lov_request_set *set, if (rc) RETURN(rc); - if (oti && oti->oti_objid) - oti->oti_objid[req->rq_idx] = req->rq_oa->o_id; - - loi->loi_id = req->rq_oa->o_id; - loi->loi_gr = req->rq_oa->o_gr; + loi->loi_id = req->rq_oi.oi_oa->o_id; + loi->loi_gr = req->rq_oi.oi_oa->o_gr; loi->loi_ost_idx = req->rq_idx; - CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n", + CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n", lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx); loi_init(loi); - if (set->set_cookies) + if (oti && set->set_cookies) ++oti->oti_logcookies; - if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE) + if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE) set->set_cookie_sent++; RETURN(0); } -int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea, - struct obdo *src_oa, struct obd_trans_info *oti, +int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo, + struct lov_stripe_md **lsmp, struct obdo *src_oa, + struct obd_trans_info *oti, struct lov_request_set **reqset) { - struct lov_obd *lov = &exp->exp_obd->u.lov; struct lov_request_set *set; - int rc = 0, newea = 0; + int rc = 0; ENTRY; OBD_ALLOC(set, sizeof(*set)); @@ -644,54 +716,16 @@ int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea, lov_init_set(set); set->set_exp = exp; - set->set_md = *ea; - set->set_oa = src_oa; + set->set_oi = oinfo; + set->set_oi->oi_md = *lsmp; + set->set_oi->oi_oa = src_oa; set->set_oti = oti; - if (set->set_md == NULL) { - int stripes, stripe_cnt; - stripe_cnt = lov_get_stripecnt(lov, 0); - - /* If the MDS file was truncated up to some size, stripe over - * enough OSTs to allow the file to be created at that size. */ - if (src_oa->o_valid & OBD_MD_FLSIZE) { - stripes = ((src_oa->o_size + LUSTRE_STRIPE_MAXBYTES) >> 12) - 1; - do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12)); - - if (stripes > lov->desc.ld_active_tgt_count) - GOTO(out_set, rc = -EFBIG); - if (stripes < stripe_cnt) - stripes = stripe_cnt; - } else { - stripes = stripe_cnt; - } - - rc = lov_alloc_memmd(&set->set_md, stripes, - lov->desc.ld_pattern ? - lov->desc.ld_pattern : LOV_PATTERN_RAID0); - if (rc < 0) - goto out_set; - newea = 1; - } - - rc = qos_prep_create(lov, set, newea); + rc = qos_prep_create(exp, set); if (rc) - goto out_lsm; - - if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) { - oti_alloc_cookies(oti, set->set_count); - if (!oti->oti_logcookies) - goto out_lsm; - set->set_cookies = oti->oti_logcookies; - } - *reqset = set; - RETURN(rc); - -out_lsm: - if (*ea == NULL) - obd_free_memmd(exp, &set->set_md); -out_set: - lov_fini_create_set(set, ea); + lov_fini_create_set(set, lsmp); + else + *reqset = set; RETURN(rc); } @@ -703,13 +737,15 @@ static int common_attr_done(struct lov_request_set *set) int rc = 0, attrset = 0; ENTRY; - if (set->set_oa == NULL) + LASSERT(set->set_oi != NULL); + + if (set->set_oi->oi_oa == NULL) RETURN(0); if (!set->set_success) RETURN(-EIO); - tmp_oa = obdo_alloc(); + OBDO_ALLOC(tmp_oa); if (tmp_oa == NULL) GOTO(out, rc = -ENOMEM); @@ -718,44 +754,45 @@ static int common_attr_done(struct lov_request_set *set) if (!req->rq_complete || req->rq_rc) continue; - if (req->rq_oa->o_valid == 0) /* inactive stripe */ + if (req->rq_oi.oi_oa->o_valid == 0) /* inactive stripe */ continue; - lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid, - set->set_md, req->rq_stripe, &attrset); + lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa, + req->rq_oi.oi_oa->o_valid, + set->set_oi->oi_md, req->rq_stripe, &attrset); } if (!attrset) { CERROR("No stripes had valid attrs\n"); rc = -EIO; } - tmp_oa->o_id = set->set_oa->o_id; - memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa)); + tmp_oa->o_id = set->set_oi->oi_oa->o_id; + memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa)); out: if (tmp_oa) - obdo_free(tmp_oa); + OBDO_FREE(tmp_oa); RETURN(rc); } static int brw_done(struct lov_request_set *set) { - struct lov_stripe_md *lsm = set->set_md; + struct lov_stripe_md *lsm = set->set_oi->oi_md; struct lov_oinfo *loi = NULL; struct list_head *pos; struct lov_request *req; ENTRY; - + list_for_each (pos, &set->set_list) { req = list_entry(pos, struct lov_request, rq_link); - + if (!req->rq_complete || req->rq_rc) continue; - - loi = &lsm->lsm_oinfo[req->rq_stripe]; - - if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS) - loi->loi_blocks = req->rq_oa->o_blocks; + + loi = lsm->lsm_oinfo[req->rq_stripe]; + + if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) + loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks; } - + RETURN(0); } @@ -764,9 +801,9 @@ int lov_fini_brw_set(struct lov_request_set *set) int rc = 0; ENTRY; - LASSERT(set->set_exp); if (set == NULL) RETURN(0); + LASSERT(set->set_exp); if (set->set_completes) { rc = brw_done(set); /* FIXME update qos data here */ @@ -777,9 +814,9 @@ int lov_fini_brw_set(struct lov_request_set *set) RETURN(rc); } -int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa, - struct lov_stripe_md *lsm, obd_count oa_bufs, - struct brw_page *pga, struct obd_trans_info *oti, +int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, + obd_count oa_bufs, struct brw_page *pga, + struct obd_trans_info *oti, struct lov_request_set **reqset) { struct { @@ -788,7 +825,6 @@ int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa, obd_count off; } *info = NULL; struct lov_request_set *set; - struct lov_oinfo *loi = NULL; struct lov_obd *lov = &exp->exp_obd->u.lov; int rc = 0, i, shift; ENTRY; @@ -799,34 +835,35 @@ int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa, lov_init_set(set); set->set_exp = exp; - set->set_md = lsm; - set->set_oa = src_oa; set->set_oti = oti; + set->set_oi = oinfo; set->set_oabufs = oa_bufs; OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga)); if (!set->set_pga) GOTO(out, rc = -ENOMEM); - OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count); + OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count); if (!info) GOTO(out, rc = -ENOMEM); /* calculate the page count for each stripe */ for (i = 0; i < oa_bufs; i++) { - int stripe = lov_stripe_number(lsm, pga[i].disk_offset); + int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off); info[stripe].count++; } /* alloc and initialize lov request */ - loi = lsm->lsm_oinfo; shift = 0; - for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){ + struct lov_oinfo *loi = NULL; struct lov_request *req; if (info[i].count == 0) continue; - if (lov->tgts[loi->loi_ost_idx].active == 0) { + loi = oinfo->oi_md->lsm_oinfo[i]; + if (!lov->lov_tgts[loi->loi_ost_idx] || + !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); GOTO(out, rc = -EIO); } @@ -835,30 +872,42 @@ int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa, if (req == NULL) GOTO(out, rc = -ENOMEM); - req->rq_oa = obdo_alloc(); - if (req->rq_oa == NULL) + OBDO_ALLOC(req->rq_oi.oi_oa); + if (req->rq_oi.oi_oa == NULL) { + OBD_FREE(req, sizeof(*req)); GOTO(out, rc = -ENOMEM); + } - if (src_oa) - memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa)); - req->rq_oa->o_id = loi->loi_id; - req->rq_buflen = sizeof(*req->rq_md); - OBD_ALLOC(req->rq_md, req->rq_buflen); - if (req->rq_md == NULL) + if (oinfo->oi_oa) { + memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, + sizeof(*req->rq_oi.oi_oa)); + } + req->rq_oi.oi_oa->o_id = loi->loi_id; + req->rq_oi.oi_oa->o_stripe_idx = i; + + req->rq_buflen = sizeof(*req->rq_oi.oi_md); + OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); + if (req->rq_oi.oi_md == NULL) { + OBDO_FREE(req->rq_oi.oi_oa); + OBD_FREE(req, sizeof(*req)); GOTO(out, rc = -ENOMEM); + } req->rq_idx = loi->loi_ost_idx; req->rq_stripe = i; /* XXX LOV STACKING */ - req->rq_md->lsm_object_id = loi->loi_id; - req->rq_md->lsm_object_gr = lsm->lsm_object_gr; + req->rq_oi.oi_md->lsm_object_id = loi->loi_id; + req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr; req->rq_oabufs = info[i].count; req->rq_pgaidx = shift; shift += req->rq_oabufs; /* remember the index for sort brw_page array */ info[i].index = req->rq_pgaidx; + + req->rq_oi.oi_capa = oinfo->oi_capa; + lov_set_add_req(req, set); } if (!set->set_count) @@ -866,18 +915,18 @@ int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa, /* rotate & sort the brw_page array */ for (i = 0; i < oa_bufs; i++) { - int stripe = lov_stripe_number(lsm, pga[i].disk_offset); + int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off); shift = info[stripe].index + info[stripe].off; LASSERT(shift < oa_bufs); set->set_pga[shift] = pga[i]; - lov_stripe_offset(lsm, pga[i].disk_offset, stripe, - &set->set_pga[shift].disk_offset); + lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe, + &set->set_pga[shift].off); info[stripe].off++; } out: if (info) - OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count); + OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count); if (rc == 0) *reqset = set; @@ -887,21 +936,16 @@ out: RETURN(rc); } -static int getattr_done(struct lov_request_set *set) -{ - return common_attr_done(set); -} - int lov_fini_getattr_set(struct lov_request_set *set) { int rc = 0; ENTRY; - LASSERT(set->set_exp); if (set == NULL) RETURN(0); + LASSERT(set->set_exp); if (set->set_completes) - rc = getattr_done(set); + rc = common_attr_done(set); if (atomic_dec_and_test(&set->set_refcount)) lov_finish_set(set); @@ -909,12 +953,20 @@ int lov_fini_getattr_set(struct lov_request_set *set) RETURN(rc); } -int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa, - struct lov_stripe_md *lsm, +/* The callback for osc_getattr_async that finilizes a request info when a + * response is recieved. */ +static int cb_getattr_update(void *cookie, int rc) +{ + struct obd_info *oinfo = cookie; + struct lov_request *lovreq; + lovreq = container_of(oinfo, struct lov_request, rq_oi); + return lov_update_common_set(lovreq->rq_rqset, lovreq, rc); +} + +int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_request_set **reqset) { struct lov_request_set *set; - struct lov_oinfo *loi = NULL; struct lov_obd *lov = &exp->exp_obd->u.lov; int rc = 0, i; ENTRY; @@ -925,14 +977,15 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa, lov_init_set(set); set->set_exp = exp; - set->set_md = lsm; - set->set_oa = src_oa; + set->set_oi = oinfo; - loi = lsm->lsm_oinfo; - for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) { + struct lov_oinfo *loi; struct lov_request *req; - if (lov->tgts[loi->loi_ost_idx].active == 0) { + loi = oinfo->oi_md->lsm_oinfo[i]; + if (!lov->lov_tgts[loi->loi_ost_idx] || + !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); continue; } @@ -944,11 +997,18 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa, req->rq_stripe = i; req->rq_idx = loi->loi_ost_idx; - req->rq_oa = obdo_alloc(); - if (req->rq_oa == NULL) + OBDO_ALLOC(req->rq_oi.oi_oa); + if (req->rq_oi.oi_oa == NULL) { + OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); - memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa)); - req->rq_oa->o_id = loi->loi_id; + } + memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, + sizeof(*req->rq_oi.oi_oa)); + req->rq_oi.oi_oa->o_id = loi->loi_id; + req->rq_oi.oi_cb_up = cb_getattr_update; + req->rq_oi.oi_capa = oinfo->oi_capa; + req->rq_rqset = set; + lov_set_add_req(req, set); } if (!set->set_count) @@ -964,9 +1024,9 @@ int lov_fini_destroy_set(struct lov_request_set *set) { ENTRY; - LASSERT(set->set_exp); if (set == NULL) RETURN(0); + LASSERT(set->set_exp); if (set->set_completes) { /* FIXME update qos data here */ } @@ -977,15 +1037,14 @@ int lov_fini_destroy_set(struct lov_request_set *set) RETURN(0); } -int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa, - struct lov_stripe_md *lsm, +int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo, + struct obdo *src_oa, struct lov_stripe_md *lsm, struct obd_trans_info *oti, struct lov_request_set **reqset) { struct lov_request_set *set; - struct lov_oinfo *loi = NULL; struct lov_obd *lov = &exp->exp_obd->u.lov; - int rc = 0, cookie_set = 0, i; + int rc = 0, i; ENTRY; OBD_ALLOC(set, sizeof(*set)); @@ -994,17 +1053,20 @@ int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa, lov_init_set(set); set->set_exp = exp; - set->set_md = lsm; - set->set_oa = src_oa; + set->set_oi = oinfo; + set->set_oi->oi_md = lsm; + set->set_oi->oi_oa = src_oa; set->set_oti = oti; if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE) set->set_cookies = oti->oti_logcookies; - loi = lsm->lsm_oinfo; - for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + for (i = 0; i < lsm->lsm_stripe_count; i++) { + struct lov_oinfo *loi; struct lov_request *req; - if (lov->tgts[loi->loi_ost_idx].active == 0) { + loi = lsm->lsm_oinfo[i]; + if (!lov->lov_tgts[loi->loi_ost_idx] || + !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); continue; } @@ -1016,17 +1078,13 @@ int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa, req->rq_stripe = i; req->rq_idx = loi->loi_ost_idx; - req->rq_oa = obdo_alloc(); - if (req->rq_oa == NULL) + OBDO_ALLOC(req->rq_oi.oi_oa); + if (req->rq_oi.oi_oa == NULL) { + OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); - memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa)); - req->rq_oa->o_id = loi->loi_id; - - /* Setup the first request's cookie position */ - if (!cookie_set && set->set_cookies) { - oti->oti_logcookies = set->set_cookies + i; - cookie_set = 1; } + memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa)); + req->rq_oi.oi_oa->o_id = loi->loi_id; lov_set_add_req(req, set); } if (!set->set_count) @@ -1038,21 +1096,16 @@ out_set: RETURN(rc); } -static int setattr_done(struct lov_request_set *set) -{ - return common_attr_done(set); -} - int lov_fini_setattr_set(struct lov_request_set *set) { int rc = 0; ENTRY; - LASSERT(set->set_exp); if (set == NULL) RETURN(0); + LASSERT(set->set_exp); if (set->set_completes) { - rc = setattr_done(set); + rc = common_attr_done(set); /* FIXME update qos data here */ } @@ -1061,12 +1114,50 @@ int lov_fini_setattr_set(struct lov_request_set *set) RETURN(rc); } -int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa, - struct lov_stripe_md *lsm, struct obd_trans_info *oti, +int lov_update_setattr_set(struct lov_request_set *set, + struct lov_request *req, int rc) +{ + struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov; + struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md; + ENTRY; + + lov_update_set(set, req, rc); + + /* grace error on inactive ost */ + if (rc && !(lov->lov_tgts[req->rq_idx] && + lov->lov_tgts[req->rq_idx]->ltd_active)) + rc = 0; + + if (rc == 0) { + if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME) + lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime = + req->rq_oi.oi_oa->o_ctime; + if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME) + lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime = + req->rq_oi.oi_oa->o_mtime; + if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME) + lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime = + req->rq_oi.oi_oa->o_atime; + } + + RETURN(rc); +} + +/* The callback for osc_setattr_async that finilizes a request info when a + * response is recieved. */ +static int cb_setattr_update(void *cookie, int rc) +{ + struct obd_info *oinfo = cookie; + struct lov_request *lovreq; + lovreq = container_of(oinfo, struct lov_request, rq_oi); + return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc); +} + +int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo, + struct obd_trans_info *oti, struct lov_request_set **reqset) { struct lov_request_set *set; - struct lov_oinfo *loi = NULL; struct lov_obd *lov = &exp->exp_obd->u.lov; int rc = 0, i; ENTRY; @@ -1077,14 +1168,17 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa, lov_init_set(set); set->set_exp = exp; - set->set_md = lsm; - set->set_oa = src_oa; + set->set_oti = oti; + set->set_oi = oinfo; + if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) + set->set_cookies = oti->oti_logcookies; - loi = lsm->lsm_oinfo; - for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) { + struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i]; struct lov_request *req; - if (lov->tgts[loi->loi_ost_idx].active == 0) { + if (!lov->lov_tgts[loi->loi_ost_idx] || + !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); continue; } @@ -1095,20 +1189,32 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa, req->rq_stripe = i; req->rq_idx = loi->loi_ost_idx; - req->rq_oa = obdo_alloc(); - if (req->rq_oa == NULL) + OBDO_ALLOC(req->rq_oi.oi_oa); + if (req->rq_oi.oi_oa == NULL) { + OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); - memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa)); - req->rq_oa->o_id = loi->loi_id; - LASSERT(!(req->rq_oa->o_valid & OBD_MD_FLGROUP) || req->rq_oa->o_gr>0); - - if (src_oa->o_valid & OBD_MD_FLSIZE) { - if (lov_stripe_offset(lsm, src_oa->o_size, i, - &req->rq_oa->o_size) < 0 && - req->rq_oa->o_size) - req->rq_oa->o_size--; + } + memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, + sizeof(*req->rq_oi.oi_oa)); + req->rq_oi.oi_oa->o_id = loi->loi_id; + LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) + || req->rq_oi.oi_oa->o_gr>0); + req->rq_oi.oi_oa->o_stripe_idx = i; + req->rq_oi.oi_cb_up = cb_setattr_update; + req->rq_oi.oi_capa = oinfo->oi_capa; + req->rq_rqset = set; + + if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) { + int off = lov_stripe_offset(oinfo->oi_md, + oinfo->oi_oa->o_size, i, + &req->rq_oi.oi_oa->o_size); + + if (off < 0 && req->rq_oi.oi_oa->o_size) + req->rq_oi.oi_oa->o_size--; + CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n", - i, req->rq_oa->o_size, src_oa->o_size); + i, req->rq_oi.oi_oa->o_size, + oinfo->oi_oa->o_size); } lov_set_add_req(req, set); } @@ -1121,31 +1227,19 @@ out_set: RETURN(rc); } -int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req, - int rc) -{ - struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; - ENTRY; - - lov_update_set(set, req, rc); - if (rc && !lov->tgts[req->rq_idx].active) - rc = 0; - /* FIXME in raid1 regime, should return 0 */ - RETURN(rc); -} - int lov_fini_punch_set(struct lov_request_set *set) { int rc = 0; ENTRY; - LASSERT(set->set_exp); if (set == NULL) RETURN(0); + LASSERT(set->set_exp); if (set->set_completes) { - if (!set->set_success) - rc = -EIO; + rc = -EIO; /* FIXME update qos data here */ + if (set->set_success) + rc = common_attr_done(set); } if (atomic_dec_and_test(&set->set_refcount)) @@ -1154,13 +1248,49 @@ int lov_fini_punch_set(struct lov_request_set *set) RETURN(rc); } -int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa, - struct lov_stripe_md *lsm, obd_off start, - obd_off end, struct obd_trans_info *oti, +int lov_update_punch_set(struct lov_request_set *set, + struct lov_request *req, int rc) +{ + struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov; + struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md; + ENTRY; + + lov_update_set(set, req, rc); + + /* grace error on inactive ost */ + if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active) + rc = 0; + + if (rc == 0) { + lov_stripe_lock(lsm); + if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) { + lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks = + req->rq_oi.oi_oa->o_blocks; + } + + /* Do we need to update lvb_size here? It needn't because + * it have been done in ll_truncate(). -jay */ + lov_stripe_unlock(lsm); + } + + RETURN(rc); +} + +/* The callback for osc_punch that finilizes a request info when a response + * is recieved. */ +static int cb_update_punch(void *cookie, int rc) +{ + struct obd_info *oinfo = cookie; + struct lov_request *lovreq; + lovreq = container_of(oinfo, struct lov_request, rq_oi); + return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc); +} + +int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo, + struct obd_trans_info *oti, struct lov_request_set **reqset) { struct lov_request_set *set; - struct lov_oinfo *loi = NULL; struct lov_obd *lov = &exp->exp_obd->u.lov; int rc = 0, i; ENTRY; @@ -1170,21 +1300,24 @@ int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa, RETURN(-ENOMEM); lov_init_set(set); + set->set_oi = oinfo; set->set_exp = exp; - set->set_md = lsm; - set->set_oa = src_oa; - loi = lsm->lsm_oinfo; - for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) { + struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i]; struct lov_request *req; obd_off rs, re; - if (lov->tgts[loi->loi_ost_idx].active == 0) { + if (!lov->lov_tgts[loi->loi_ost_idx] || + !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); continue; } - if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re)) + if (!lov_stripe_intersects(oinfo->oi_md, i, + oinfo->oi_policy.l_extent.start, + oinfo->oi_policy.l_extent.end, + &rs, &re)) continue; OBD_ALLOC(req, sizeof(*req)); @@ -1193,16 +1326,26 @@ int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa, req->rq_stripe = i; req->rq_idx = loi->loi_ost_idx; - req->rq_oa = obdo_alloc(); - if (req->rq_oa == NULL) + OBDO_ALLOC(req->rq_oi.oi_oa); + if (req->rq_oi.oi_oa == NULL) { + OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); - memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa)); - req->rq_oa->o_id = loi->loi_id; - req->rq_oa->o_gr = loi->loi_gr; - req->rq_oa->o_valid |= OBD_MD_FLGROUP; + } + memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, + sizeof(*req->rq_oi.oi_oa)); + req->rq_oi.oi_oa->o_id = loi->loi_id; + req->rq_oi.oi_oa->o_gr = loi->loi_gr; + req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP; + + req->rq_oi.oi_oa->o_stripe_idx = i; + req->rq_oi.oi_cb_up = cb_update_punch; + req->rq_rqset = set; - req->rq_extent.start = rs; - req->rq_extent.end = re; + req->rq_oi.oi_policy.l_extent.start = rs; + req->rq_oi.oi_policy.l_extent.end = re; + req->rq_oi.oi_policy.l_extent.gid = -1; + + req->rq_oi.oi_capa = oinfo->oi_capa; lov_set_add_req(req, set); } @@ -1220,9 +1363,9 @@ int lov_fini_sync_set(struct lov_request_set *set) int rc = 0; ENTRY; - LASSERT(set->set_exp); if (set == NULL) RETURN(0); + LASSERT(set->set_exp); if (set->set_completes) { if (!set->set_success) rc = -EIO; @@ -1235,12 +1378,12 @@ int lov_fini_sync_set(struct lov_request_set *set) RETURN(rc); } -int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa, - struct lov_stripe_md *lsm, obd_off start, - obd_off end, struct lov_request_set **reqset) +int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo, + struct obdo *src_oa, struct lov_stripe_md *lsm, + obd_off start, obd_off end, + struct lov_request_set **reqset) { struct lov_request_set *set; - struct lov_oinfo *loi = NULL; struct lov_obd *lov = &exp->exp_obd->u.lov; int rc = 0, i; ENTRY; @@ -1251,15 +1394,17 @@ int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa, lov_init_set(set); set->set_exp = exp; - set->set_md = lsm; - set->set_oa = src_oa; + set->set_oi = oinfo; + set->set_oi->oi_md = lsm; + set->set_oi->oi_oa = src_oa; - loi = lsm->lsm_oinfo; - for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + for (i = 0; i < lsm->lsm_stripe_count; i++) { + struct lov_oinfo *loi = lsm->lsm_oinfo[i]; struct lov_request *req; obd_off rs, re; - if (lov->tgts[loi->loi_ost_idx].active == 0) { + if (!lov->lov_tgts[loi->loi_ost_idx] || + !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); continue; } @@ -1273,13 +1418,19 @@ int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa, req->rq_stripe = i; req->rq_idx = loi->loi_ost_idx; - req->rq_oa = obdo_alloc(); - if (req->rq_oa == NULL) + OBDO_ALLOC(req->rq_oi.oi_oa); + if (req->rq_oi.oi_oa == NULL) { + OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); - memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa)); - req->rq_oa->o_id = loi->loi_id; - req->rq_extent.start = rs; - req->rq_extent.end = re; + } + memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa)); + req->rq_oi.oi_oa->o_id = loi->loi_id; + req->rq_oi.oi_oa->o_stripe_idx = i; + + req->rq_oi.oi_policy.l_extent.start = rs; + req->rq_oi.oi_policy.l_extent.end = re; + req->rq_oi.oi_policy.l_extent.gid = -1; + lov_set_add_req(req, set); } if (!set->set_count) @@ -1290,3 +1441,215 @@ out_set: lov_fini_sync_set(set); RETURN(rc); } + +#define LOV_U64_MAX ((__u64)~0ULL) +#define LOV_SUM_MAX(tot, add) \ + do { \ + if ((tot) + (add) < (tot)) \ + (tot) = LOV_U64_MAX; \ + else \ + (tot) += (add); \ + } while(0) + +int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success) +{ + ENTRY; + + if (success) { + __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0); + + if (osfs->os_files != LOV_U64_MAX) + do_div(osfs->os_files, expected_stripes); + if (osfs->os_ffree != LOV_U64_MAX) + do_div(osfs->os_ffree, expected_stripes); + + spin_lock(&obd->obd_osfs_lock); + memcpy(&obd->obd_osfs, osfs, sizeof(*osfs)); + obd->obd_osfs_age = cfs_time_current_64(); + spin_unlock(&obd->obd_osfs_lock); + RETURN(0); + } + + RETURN(-EIO); +} + +int lov_fini_statfs_set(struct lov_request_set *set) +{ + int rc = 0; + ENTRY; + + if (set == NULL) + RETURN(0); + + if (set->set_completes) { + rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs, + set->set_success); + } + + if (atomic_dec_and_test(&set->set_refcount)) + lov_finish_set(set); + + RETURN(rc); +} + +void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs, + int success) +{ + int shift = 0, quit = 0; + __u64 tmp; + + if (success == 0) { + memcpy(osfs, lov_sfs, sizeof(*lov_sfs)); + } else { + if (osfs->os_bsize != lov_sfs->os_bsize) { + /* assume all block sizes are always powers of 2 */ + /* get the bits difference */ + tmp = osfs->os_bsize | lov_sfs->os_bsize; + for (shift = 0; shift <= 64; ++shift) { + if (tmp & 1) { + if (quit) + break; + else + quit = 1; + shift = 0; + } + tmp >>= 1; + } + } + + if (osfs->os_bsize < lov_sfs->os_bsize) { + osfs->os_bsize = lov_sfs->os_bsize; + + osfs->os_bfree >>= shift; + osfs->os_bavail >>= shift; + osfs->os_blocks >>= shift; + } else if (shift != 0) { + lov_sfs->os_bfree >>= shift; + lov_sfs->os_bavail >>= shift; + lov_sfs->os_blocks >>= shift; + } +#ifdef MIN_DF + /* Sandia requested that df (and so, statfs) only + returned minimal available space on + a single OST, so people would be able to + write this much data guaranteed. */ + if (osfs->os_bavail > lov_sfs->os_bavail) { + /* Presumably if new bavail is smaller, + new bfree is bigger as well */ + osfs->os_bfree = lov_sfs->os_bfree; + osfs->os_bavail = lov_sfs->os_bavail; + } +#else + osfs->os_bfree += lov_sfs->os_bfree; + osfs->os_bavail += lov_sfs->os_bavail; +#endif + osfs->os_blocks += lov_sfs->os_blocks; + /* XXX not sure about this one - depends on policy. + * - could be minimum if we always stripe on all OBDs + * (but that would be wrong for any other policy, + * if one of the OBDs has no more objects left) + * - could be sum if we stripe whole objects + * - could be average, just to give a nice number + * + * To give a "reasonable" (if not wholly accurate) + * number, we divide the total number of free objects + * by expected stripe count (watch out for overflow). + */ + LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files); + LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree); + } +} + +/* The callback for osc_statfs_async that finilizes a request info when a + * response is recieved. */ +static int cb_statfs_update(void *cookie, int rc) +{ + struct obd_info *oinfo = cookie; + struct lov_request *lovreq; + struct obd_statfs *osfs, *lov_sfs; + struct obd_device *obd; + struct lov_obd *lov; + int success; + ENTRY; + + lovreq = container_of(oinfo, struct lov_request, rq_oi); + lov = &lovreq->rq_rqset->set_obd->u.lov; + obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp); + + osfs = lovreq->rq_rqset->set_oi->oi_osfs; + lov_sfs = oinfo->oi_osfs; + + success = lovreq->rq_rqset->set_success; + + /* XXX: the same is done in lov_update_common_set, however + lovset->set_exp is not initialized. */ + lov_update_set(lovreq->rq_rqset, lovreq, rc); + if (rc) { + if (rc && !(lov->lov_tgts[lovreq->rq_idx] && + lov->lov_tgts[lovreq->rq_idx]->ltd_active)) + rc = 0; + RETURN(rc); + } + + spin_lock(&obd->obd_osfs_lock); + memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs)); + if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0) + obd->obd_osfs_age = cfs_time_current_64(); + spin_unlock(&obd->obd_osfs_lock); + + lov_update_statfs(osfs, lov_sfs, success); + qos_update(lov); + + RETURN(0); +} + +int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo, + struct lov_request_set **reqset) +{ + struct lov_request_set *set; + struct lov_obd *lov = &obd->u.lov; + int rc = 0, i; + ENTRY; + + OBD_ALLOC(set, sizeof(*set)); + if (set == NULL) + RETURN(-ENOMEM); + lov_init_set(set); + + set->set_obd = obd; + set->set_oi = oinfo; + + /* We only get block data from the OBD */ + for (i = 0; i < lov->desc.ld_tgt_count; i++) { + struct lov_request *req; + + if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) { + CDEBUG(D_HA, "lov idx %d inactive\n", i); + continue; + } + + OBD_ALLOC(req, sizeof(*req)); + if (req == NULL) + GOTO(out_set, rc = -ENOMEM); + + OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs)); + if (req->rq_oi.oi_osfs == NULL) { + OBD_FREE(req, sizeof(*req)); + GOTO(out_set, rc = -ENOMEM); + } + + req->rq_idx = i; + req->rq_oi.oi_cb_up = cb_statfs_update; + req->rq_oi.oi_flags = oinfo->oi_flags; + req->rq_rqset = set; + + lov_set_add_req(req, set); + } + if (!set->set_count) + GOTO(out_set, rc = -EIO); + *reqset = set; + RETURN(rc); +out_set: + lov_fini_statfs_set(set); + RETURN(rc); +}