X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flov%2Flov_request.c;h=ac5299140e0aa5640e5159602bf9f5f665e67e0e;hb=0d0b8d65d84c644290f938a43fdcb0d68015c519;hp=8262a86833e63ef05233fe72fd6aa29f1520ffcf;hpb=113303973ec9f8484eb2355a1a6ef3c4c7fd6a56;p=fs%2Flustre-release.git diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index 8262a86..ac52991 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -1,25 +1,40 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * Copyright (C) 2002, 2003 Cluster File Systems, Inc. + * GPL HEADER START * - * This file is part of the Lustre file system, http://www.lustre.org - * Lustre is a trademark of Cluster File Systems, Inc. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * You may have signed or agreed to another license before downloading - * this software. If so, you are bound by the terms and conditions - * of that agreement, and the following does not apply to you. See the - * LICENSE file included with this distribution for more information. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * If you did not agree to a different license, then this copy of Lustre - * is open source software; you can redistribute it and/or modify it - * under the terms of version 2 of the GNU General Public License as - * published by the Free Software Foundation. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * In either case, Lustre is distributed in the hope that it will be - * useful, but WITHOUT ANY WARRANTY; without even the implied warranty - * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * license text for more details. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. + * + * Copyright (c) 2011 Whamcloud, Inc. + * + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. */ #ifndef EXPORT_SYMTAB @@ -46,24 +61,27 @@ static void lov_init_set(struct lov_request_set *set) set->set_success = 0; set->set_cookies = 0; CFS_INIT_LIST_HEAD(&set->set_list); - atomic_set(&set->set_refcount, 1); + cfs_atomic_set(&set->set_refcount, 1); + cfs_waitq_init(&set->set_waitq); + cfs_spin_lock_init(&set->set_lock); } -static void lov_finish_set(struct lov_request_set *set) +void lov_finish_set(struct lov_request_set *set) { - struct list_head *pos, *n; + cfs_list_t *pos, *n; ENTRY; LASSERT(set); - list_for_each_safe(pos, n, &set->set_list) { - struct lov_request *req = list_entry(pos, struct lov_request, - rq_link); - list_del_init(&req->rq_link); + cfs_list_for_each_safe(pos, n, &set->set_list) { + struct lov_request *req = cfs_list_entry(pos, + struct lov_request, + rq_link); + cfs_list_del_init(&req->rq_link); if (req->rq_oi.oi_oa) - obdo_free(req->rq_oi.oi_oa); + OBDO_FREE(req->rq_oi.oi_oa); if (req->rq_oi.oi_md) - OBD_FREE(req->rq_oi.oi_md, req->rq_buflen); + OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_osfs) OBD_FREE(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs)); @@ -72,7 +90,7 @@ static void lov_finish_set(struct lov_request_set *set) if (set->set_pga) { int len = set->set_oabufs * sizeof(*set->set_pga); - OBD_FREE(set->set_pga, len); + OBD_FREE_LARGE(set->set_pga, len); } if (set->set_lockh) lov_llh_put(set->set_lockh); @@ -81,6 +99,13 @@ static void lov_finish_set(struct lov_request_set *set) EXIT; } +int lov_finished_set(struct lov_request_set *set) +{ + CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes, + set->set_count); + return set->set_completes == set->set_count; +} + void lov_update_set(struct lov_request_set *set, struct lov_request *req, int rc) { @@ -90,6 +115,8 @@ void lov_update_set(struct lov_request_set *set, set->set_completes++; if (rc == 0) set->set_success++; + + cfs_waitq_signal(&set->set_waitq); } int lov_update_common_set(struct lov_request_set *set, @@ -101,7 +128,7 @@ int lov_update_common_set(struct lov_request_set *set, lov_update_set(set, req, rc); /* grace error on inactive ost */ - if (rc && !(lov->lov_tgts[req->rq_idx] && + if (rc && !(lov->lov_tgts[req->rq_idx] && lov->lov_tgts[req->rq_idx]->ltd_active)) rc = 0; @@ -111,22 +138,49 @@ int lov_update_common_set(struct lov_request_set *set, void lov_set_add_req(struct lov_request *req, struct lov_request_set *set) { - list_add_tail(&req->rq_link, &set->set_list); + cfs_list_add_tail(&req->rq_link, &set->set_list); set->set_count++; + req->rq_rqset = set; +} + +extern void osc_update_enqueue(struct lustre_handle *lov_lockhp, + struct lov_oinfo *loi, int flags, + struct ost_lvb *lvb, __u32 mode, int rc); + +static int lov_update_enqueue_lov(struct obd_export *exp, + struct lustre_handle *lov_lockhp, + struct lov_oinfo *loi, int flags, int idx, + __u64 oid, int rc) +{ + struct lov_obd *lov = &exp->exp_obd->u.lov; + + if (rc != ELDLM_OK && + !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) { + memset(lov_lockhp, 0, sizeof(*lov_lockhp)); + if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) { + /* -EUSERS used by OST to report file contention */ + if (rc != -EINTR && rc != -EUSERS) + CERROR("enqueue objid "LPX64" subobj " + LPX64" on OST idx %d: rc %d\n", + oid, loi->loi_id, loi->loi_ost_idx, rc); + } else + rc = ELDLM_OK; + } + return rc; } int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc) { struct lov_request_set *set = req->rq_rqset; struct lustre_handle *lov_lockhp; + struct obd_info *oi = set->set_oi; struct lov_oinfo *loi; ENTRY; - LASSERT(set != NULL); - LASSERT(set->set_oi != NULL); + LASSERT(oi != NULL); lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe; - loi = &set->set_oi->oi_md->lsm_oinfo[req->rq_stripe]; + loi = oi->oi_md->lsm_oinfo[req->rq_stripe]; /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set * and that copy can be arbitrarily out of date. @@ -134,65 +188,23 @@ int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc) * The LOV API is due for a serious rewriting anyways, and this * can be addressed then. */ - if (rc == ELDLM_OK) { - struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); - __u64 tmp; - - LASSERT(lock != NULL); - lov_stripe_lock(set->set_oi->oi_md); - loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo->loi_lvb; - tmp = loi->loi_lvb.lvb_size; - /* Extend KMS up to the end of this lock and no further - * A lock on [x,y] means a KMS of up to y + 1 bytes! */ - if (tmp > lock->l_policy_data.l_extent.end) - tmp = lock->l_policy_data.l_extent.end + 1; - if (tmp >= loi->loi_kms) { - LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64 - ", kms="LPU64, loi->loi_lvb.lvb_size, tmp); - loi->loi_kms = tmp; - loi->loi_kms_valid = 1; - } else { - LDLM_DEBUG(lock, "lock acquired, setting rss=" - LPU64"; leaving kms="LPU64", end="LPU64, - loi->loi_lvb.lvb_size, loi->loi_kms, - lock->l_policy_data.l_extent.end); - } - lov_stripe_unlock(set->set_oi->oi_md); - ldlm_lock_allow_match(lock); - LDLM_LOCK_PUT(lock); - } else if ((rc == ELDLM_LOCK_ABORTED) && - (set->set_ei->ei_flags & LDLM_FL_HAS_INTENT)) { - memset(lov_lockhp, 0, sizeof(*lov_lockhp)); - lov_stripe_lock(set->set_oi->oi_md); - loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo->loi_lvb; - lov_stripe_unlock(set->set_oi->oi_md); - CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving" - " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms); - rc = ELDLM_OK; - } else { - struct obd_export *exp = set->set_exp; - struct lov_obd *lov = &exp->exp_obd->u.lov; - - memset(lov_lockhp, 0, sizeof(*lov_lockhp)); - if (lov->lov_tgts[req->rq_idx] && - lov->lov_tgts[req->rq_idx]->ltd_active) { - if (rc != -EINTR) - CERROR("enqueue objid "LPX64" subobj " - LPX64" on OST idx %d: rc %d\n", - set->set_oi->oi_md->lsm_object_id, - loi->loi_id, loi->loi_ost_idx, rc); - } else { - rc = ELDLM_OK; - } - } + lov_stripe_lock(oi->oi_md); + osc_update_enqueue(lov_lockhp, loi, oi->oi_flags, + &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc); + if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT)) + memset(lov_lockhp, 0, sizeof *lov_lockhp); + rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags, + req->rq_idx, oi->oi_md->lsm_object_id, rc); + lov_stripe_unlock(oi->oi_md); lov_update_set(set, req, rc); RETURN(rc); } /* The callback for osc_enqueue that updates lov info for every OSC request. */ -static int cb_update_enqueue(struct obd_info *oinfo, int rc) +static int cb_update_enqueue(void *cookie, int rc) { - struct obd_enqueue_info *einfo; + struct obd_info *oinfo = cookie; + struct ldlm_enqueue_info *einfo; struct lov_request *lovreq; lovreq = container_of(oinfo, struct lov_request, rq_oi); @@ -212,7 +224,7 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode) RETURN(0); /* cancel enqueued/matched locks */ - list_for_each_entry(req, &set->set_list, rq_link) { + cfs_list_for_each_entry(req, &set->set_list, rq_link) { struct lustre_handle *lov_lockhp; if (!req->rq_complete || req->rq_rc) @@ -237,7 +249,8 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode) RETURN(rc); } -int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc) +int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc, + struct ptlrpc_request_set *rqset) { int ret = 0; ENTRY; @@ -247,27 +260,25 @@ int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc) LASSERT(set->set_exp); /* Do enqueue_done only for sync requests and if any request * succeeded. */ - if (!set->set_ei->ei_rqset) { + if (!rqset) { if (rc) set->set_completes = 0; ret = enqueue_done(set, mode); } else if (set->set_lockh) lov_llh_put(set->set_lockh); - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc ? rc : ret); } int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, - struct obd_enqueue_info *einfo, + struct ldlm_enqueue_info *einfo, struct lov_request_set **reqset) { struct lov_obd *lov = &exp->exp_obd->u.lov; struct lov_request_set *set; int i, rc = 0; - struct lov_oinfo *loi; ENTRY; OBD_ALLOC(set, sizeof(*set)); @@ -283,11 +294,12 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, GOTO(out_set, rc = -ENOMEM); oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie; - loi = oinfo->oi_md->lsm_oinfo; - for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) { + for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) { + struct lov_oinfo *loi; struct lov_request *req; obd_off start, end; + loi = oinfo->oi_md->lsm_oinfo[i]; if (!lov_stripe_intersects(oinfo->oi_md, i, oinfo->oi_policy.l_extent.start, oinfo->oi_policy.l_extent.end, @@ -305,17 +317,21 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, GOTO(out_set, rc = -ENOMEM); req->rq_buflen = sizeof(*req->rq_oi.oi_md) + + sizeof(struct lov_oinfo *) + sizeof(struct lov_oinfo); - OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); + OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_md == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); } + req->rq_oi.oi_md->lsm_oinfo[0] = + ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) + + sizeof(struct lov_oinfo *); - req->rq_rqset = set; /* Set lov request specific parameters. */ req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i; req->rq_oi.oi_cb_up = cb_update_enqueue; + req->rq_oi.oi_flags = oinfo->oi_flags; LASSERT(req->rq_oi.oi_lockh); @@ -329,11 +345,12 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, /* XXX LOV STACKING: submd should be from the subobj */ req->rq_oi.oi_md->lsm_object_id = loi->loi_id; + req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq; req->rq_oi.oi_md->lsm_stripe_count = 0; - req->rq_oi.oi_md->lsm_oinfo->loi_kms_valid = + req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid = loi->loi_kms_valid; - req->rq_oi.oi_md->lsm_oinfo->loi_kms = loi->loi_kms; - req->rq_oi.oi_md->lsm_oinfo->loi_lvb = loi->loi_lvb; + req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms; + req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb; lov_set_add_req(req, set); } @@ -342,7 +359,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, *reqset = set; RETURN(0); out_set: - lov_fini_enqueue_set(set, einfo->ei_mode, rc); + lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL); RETURN(rc); } @@ -352,7 +369,7 @@ int lov_update_match_set(struct lov_request_set *set, struct lov_request *req, int ret = rc; ENTRY; - if (rc == 1) + if (rc > 0) ret = 0; else if (rc == 0) ret = 1; @@ -373,8 +390,7 @@ int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags) (flags & LDLM_FL_TEST_LOCK)) lov_llh_put(set->set_lockh); - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } @@ -387,7 +403,6 @@ int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_obd *lov = &exp->exp_obd->u.lov; struct lov_request_set *set; int i, rc = 0; - struct lov_oinfo *loi; ENTRY; OBD_ALLOC(set, sizeof(*set)); @@ -403,10 +418,12 @@ int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo, GOTO(out_set, rc = -ENOMEM); lockh->cookie = set->set_lockh->llh_handle.h_cookie; - for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){ + for (i = 0; i < lsm->lsm_stripe_count; i++){ + struct lov_oinfo *loi; struct lov_request *req; obd_off start, end; + loi = lsm->lsm_oinfo[i]; if (!lov_stripe_intersects(lsm, i, policy->l_extent.start, policy->l_extent.end, &start, &end)) continue; @@ -423,7 +440,7 @@ int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo, GOTO(out_set, rc = -ENOMEM); req->rq_buflen = sizeof(*req->rq_oi.oi_md); - OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); + OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_md == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); @@ -438,6 +455,7 @@ int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo, /* XXX LOV STACKING: submd should be from the subobj */ req->rq_oi.oi_md->lsm_object_id = loi->loi_id; + req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq; req->rq_oi.oi_md->lsm_stripe_count = 0; lov_set_add_req(req, set); @@ -463,8 +481,7 @@ int lov_fini_cancel_set(struct lov_request_set *set) if (set->set_lockh) lov_llh_put(set->set_lockh); - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } @@ -476,7 +493,6 @@ int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo, { struct lov_request_set *set; int i, rc = 0; - struct lov_oinfo *loi; ENTRY; OBD_ALLOC(set, sizeof(*set)); @@ -494,13 +510,14 @@ int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo, } lockh->cookie = set->set_lockh->llh_handle.h_cookie; - for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){ + for (i = 0; i < lsm->lsm_stripe_count; i++){ struct lov_request *req; struct lustre_handle *lov_lockhp; + struct lov_oinfo *loi = lsm->lsm_oinfo[i]; lov_lockhp = set->set_lockh->llh_handles + i; if (!lustre_handle_is_used(lov_lockhp)) { - CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n", + CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n", loi->loi_ost_idx, loi->loi_id); continue; } @@ -510,7 +527,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo, GOTO(out_set, rc = -ENOMEM); req->rq_buflen = sizeof(*req->rq_oi.oi_md); - OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); + OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_md == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); @@ -521,6 +538,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo, /* XXX LOV STACKING: submd should be from the subobj */ req->rq_oi.oi_md->lsm_object_id = loi->loi_id; + req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq; req->rq_oi.oi_md->lsm_stripe_count = 0; lov_set_add_req(req, set); @@ -534,6 +552,56 @@ out_set: RETURN(rc); } +static int lov_update_create_set(struct lov_request_set *set, + struct lov_request *req, int rc) +{ + struct obd_trans_info *oti = set->set_oti; + struct lov_stripe_md *lsm = set->set_oi->oi_md; + struct lov_oinfo *loi; + struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; + ENTRY; + + if (rc && lov->lov_tgts[req->rq_idx] && + lov->lov_tgts[req->rq_idx]->ltd_active) { + CERROR("error creating fid "LPX64" sub-object" + " on OST idx %d/%d: rc = %d\n", + set->set_oi->oi_oa->o_id, req->rq_idx, + lsm->lsm_stripe_count, rc); + if (rc > 0) { + CERROR("obd_create returned invalid err %d\n", rc); + rc = -EIO; + } + } + + cfs_spin_lock(&set->set_lock); + req->rq_stripe = set->set_success; + loi = lsm->lsm_oinfo[req->rq_stripe]; + + + if (rc) { + lov_update_set(set, req, rc); + cfs_spin_unlock(&set->set_lock); + RETURN(rc); + } + + loi->loi_id = req->rq_oi.oi_oa->o_id; + loi->loi_seq = req->rq_oi.oi_oa->o_seq; + loi->loi_ost_idx = req->rq_idx; + loi_init(loi); + + if (oti && set->set_cookies) + ++oti->oti_logcookies; + if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE) + set->set_cookie_sent++; + + lov_update_set(set, req, rc); + cfs_spin_unlock(&set->set_lock); + + CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n", + lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx); + RETURN(rc); +} + static int create_done(struct obd_export *exp, struct lov_request_set *set, struct lov_stripe_md **lsmp) { @@ -550,7 +618,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, /* try alloc objects on other osts if osc_create fails for * exceptions: RPC failure, ENOSPC, etc */ if (set->set_count != set->set_success) { - list_for_each_entry (req, &set->set_list, rq_link) { + cfs_list_for_each_entry (req, &set->set_list, rq_link) { if (req->rq_rc == 0) continue; @@ -559,9 +627,6 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, rc = qos_remedy_create(set, req); lov_update_create_set(set, req, rc); - - if (rc) - break; } } @@ -569,20 +634,16 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, if (set->set_success == 0) GOTO(cleanup, rc); - /* If there was an explicit stripe set, fail. Otherwise, we - * got some objects and that's not bad. */ if (set->set_count != set->set_success) { - if (*lsmp) - GOTO(cleanup, rc); set->set_count = set->set_success; qos_shrink_lsm(set); } - ret_oa = obdo_alloc(); + OBDO_ALLOC(ret_oa); if (ret_oa == NULL) GOTO(cleanup, rc = -ENOMEM); - list_for_each_entry(req, &set->set_list, rq_link) { + cfs_list_for_each_entry(req, &set->set_list, rq_link) { if (!req->rq_complete || req->rq_rc) continue; lov_merge_attrs(ret_oa, req->rq_oi.oi_oa, @@ -596,14 +657,16 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, LBUG(); } ret_oa->o_id = src_oa->o_id; + ret_oa->o_seq = src_oa->o_seq; + ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP; memcpy(src_oa, ret_oa, sizeof(*src_oa)); - obdo_free(ret_oa); + OBDO_FREE(ret_oa); *lsmp = set->set_oi->oi_md; GOTO(done, rc = 0); cleanup: - list_for_each_entry(req, &set->set_list, rq_link) { + cfs_list_for_each_entry(req, &set->set_list, rq_link) { struct obd_export *sub_exp; int err = 0; @@ -611,7 +674,8 @@ cleanup: continue; sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp; - err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL); + err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL, + NULL); if (err) CERROR("Failed to uncreate objid "LPX64" subobj " LPX64" on OST idx %d: rc = %d\n", @@ -644,54 +708,25 @@ int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp) if (set->set_completes) rc = create_done(set->set_exp, set, lsmp); - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); - + lov_put_reqset(set); RETURN(rc); } -int lov_update_create_set(struct lov_request_set *set, - struct lov_request *req, int rc) +int cb_create_update(void *cookie, int rc) { - struct obd_trans_info *oti = set->set_oti; - struct lov_stripe_md *lsm = set->set_oi->oi_md; - struct lov_oinfo *loi; - struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; - ENTRY; - - req->rq_stripe = set->set_success; - loi = &lsm->lsm_oinfo[req->rq_stripe]; - - if (rc && lov->lov_tgts[req->rq_idx] && - lov->lov_tgts[req->rq_idx]->ltd_active) { - CERROR("error creating fid "LPX64" sub-object" - " on OST idx %d/%d: rc = %d\n", - set->set_oi->oi_oa->o_id, req->rq_idx, - lsm->lsm_stripe_count, rc); - if (rc > 0) { - CERROR("obd_create returned invalid err %d\n", rc); - rc = -EIO; - } - } - lov_update_set(set, req, rc); - if (rc) - RETURN(rc); + struct obd_info *oinfo = cookie; + struct lov_request *lovreq; - if (oti && oti->oti_objid) - oti->oti_objid[req->rq_idx] = req->rq_oi.oi_oa->o_id; + lovreq = container_of(oinfo, struct lov_request, rq_oi); - loi->loi_id = req->rq_oi.oi_oa->o_id; - loi->loi_ost_idx = req->rq_idx; - CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n", - lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx); - loi_init(loi); + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL)) + if (lovreq->rq_idx == cfs_fail_val) + rc = -ENOTCONN; - if (oti && set->set_cookies) - ++oti->oti_logcookies; - if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE) - set->set_cookie_sent++; - - RETURN(0); + rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc); + if (lov_finished_set(lovreq->rq_rqset)) + lov_put_reqset(lovreq->rq_rqset); + return rc; } int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo, @@ -713,18 +748,23 @@ int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo, set->set_oi->oi_md = *lsmp; set->set_oi->oi_oa = src_oa; set->set_oti = oti; + lov_get_reqset(set); rc = qos_prep_create(exp, set); - if (rc) + /* qos_shrink_lsm() may have allocated a new lsm */ + *lsmp = oinfo->oi_md; + if (rc) { lov_fini_create_set(set, lsmp); - else + lov_put_reqset(set); + } else { *reqset = set; + } RETURN(rc); } static int common_attr_done(struct lov_request_set *set) { - struct list_head *pos; + cfs_list_t *pos; struct lov_request *req; struct obdo *tmp_oa; int rc = 0, attrset = 0; @@ -738,12 +778,12 @@ static int common_attr_done(struct lov_request_set *set) if (!set->set_success) RETURN(-EIO); - tmp_oa = obdo_alloc(); + OBDO_ALLOC(tmp_oa); if (tmp_oa == NULL) GOTO(out, rc = -ENOMEM); - list_for_each (pos, &set->set_list) { - req = list_entry(pos, struct lov_request, rq_link); + cfs_list_for_each (pos, &set->set_list) { + req = cfs_list_entry(pos, struct lov_request, rq_link); if (!req->rq_complete || req->rq_rc) continue; @@ -757,11 +797,19 @@ static int common_attr_done(struct lov_request_set *set) CERROR("No stripes had valid attrs\n"); rc = -EIO; } + if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) && + (set->set_oi->oi_md->lsm_stripe_count != attrset)) { + /* When we take attributes of some epoch, we require all the + * ost to be active. */ + CERROR("Not all the stripes had valid attrs\n"); + GOTO(out, rc = -EIO); + } + tmp_oa->o_id = set->set_oi->oi_oa->o_id; memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa)); out: if (tmp_oa) - obdo_free(tmp_oa); + OBDO_FREE(tmp_oa); RETURN(rc); } @@ -770,17 +818,17 @@ static int brw_done(struct lov_request_set *set) { struct lov_stripe_md *lsm = set->set_oi->oi_md; struct lov_oinfo *loi = NULL; - struct list_head *pos; + cfs_list_t *pos; struct lov_request *req; ENTRY; - list_for_each (pos, &set->set_list) { - req = list_entry(pos, struct lov_request, rq_link); + cfs_list_for_each (pos, &set->set_list) { + req = cfs_list_entry(pos, struct lov_request, rq_link); if (!req->rq_complete || req->rq_rc) continue; - loi = &lsm->lsm_oinfo[req->rq_stripe]; + loi = lsm->lsm_oinfo[req->rq_stripe]; if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks; @@ -801,8 +849,7 @@ int lov_fini_brw_set(struct lov_request_set *set) rc = brw_done(set); /* FIXME update qos data here */ } - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } @@ -818,7 +865,6 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, obd_count off; } *info = NULL; struct lov_request_set *set; - struct lov_oinfo *loi = NULL; struct lov_obd *lov = &exp->exp_obd->u.lov; int rc = 0, i, shift; ENTRY; @@ -832,11 +878,11 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, set->set_oti = oti; set->set_oi = oinfo; set->set_oabufs = oa_bufs; - OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga)); + OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga)); if (!set->set_pga) GOTO(out, rc = -ENOMEM); - OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count); + OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count); if (!info) GOTO(out, rc = -ENOMEM); @@ -848,14 +894,15 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, /* alloc and initialize lov request */ shift = 0; - for (i = 0, loi = oinfo->oi_md->lsm_oinfo; - i < oinfo->oi_md->lsm_stripe_count; i++, loi++){ + for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){ + struct lov_oinfo *loi = NULL; struct lov_request *req; if (info[i].count == 0) continue; - if (!lov->lov_tgts[loi->loi_ost_idx] || + loi = oinfo->oi_md->lsm_oinfo[i]; + if (!lov->lov_tgts[loi->loi_ost_idx] || !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); GOTO(out, rc = -EIO); @@ -865,7 +912,7 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, if (req == NULL) GOTO(out, rc = -ENOMEM); - req->rq_oi.oi_oa = obdo_alloc(); + OBDO_ALLOC(req->rq_oi.oi_oa); if (req->rq_oi.oi_oa == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out, rc = -ENOMEM); @@ -876,12 +923,13 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, sizeof(*req->rq_oi.oi_oa)); } req->rq_oi.oi_oa->o_id = loi->loi_id; + req->rq_oi.oi_oa->o_seq = loi->loi_seq; req->rq_oi.oi_oa->o_stripe_idx = i; req->rq_buflen = sizeof(*req->rq_oi.oi_md); - OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); + OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_md == NULL) { - obdo_free(req->rq_oi.oi_oa); + OBDO_FREE(req->rq_oi.oi_oa); OBD_FREE(req, sizeof(*req)); GOTO(out, rc = -ENOMEM); } @@ -891,7 +939,7 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, /* XXX LOV STACKING */ req->rq_oi.oi_md->lsm_object_id = loi->loi_id; - req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr; + req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq; req->rq_oabufs = info[i].count; req->rq_pgaidx = shift; shift += req->rq_oabufs; @@ -899,6 +947,8 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, /* remember the index for sort brw_page array */ info[i].index = req->rq_pgaidx; + req->rq_oi.oi_capa = oinfo->oi_capa; + lov_set_add_req(req, set); } if (!set->set_count) @@ -917,7 +967,8 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, } out: if (info) - OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count); + OBD_FREE_LARGE(info, + sizeof(*info) * oinfo->oi_md->lsm_stripe_count); if (rc == 0) *reqset = set; @@ -938,16 +989,16 @@ int lov_fini_getattr_set(struct lov_request_set *set) if (set->set_completes) rc = common_attr_done(set); - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } /* The callback for osc_getattr_async that finilizes a request info when a - * response is recieved. */ -static int cb_getattr_update(struct obd_info *oinfo, int rc) + * response is received. */ +static int cb_getattr_update(void *cookie, int rc) { + struct obd_info *oinfo = cookie; struct lov_request *lovreq; lovreq = container_of(oinfo, struct lov_request, rq_oi); return lov_update_common_set(lovreq->rq_rqset, lovreq, rc); @@ -957,7 +1008,6 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_request_set **reqset) { struct lov_request_set *set; - struct lov_oinfo *loi = NULL; struct lov_obd *lov = &exp->exp_obd->u.lov; int rc = 0, i; ENTRY; @@ -970,13 +1020,17 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo, set->set_exp = exp; set->set_oi = oinfo; - loi = oinfo->oi_md->lsm_oinfo; - for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) { + for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) { + struct lov_oinfo *loi; struct lov_request *req; + loi = oinfo->oi_md->lsm_oinfo[i]; if (!lov->lov_tgts[loi->loi_ost_idx] || !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); + if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH) + /* SOM requires all the OSTs to be active. */ + GOTO(out_set, rc = -EIO); continue; } @@ -987,7 +1041,7 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo, req->rq_stripe = i; req->rq_idx = loi->loi_ost_idx; - req->rq_oi.oi_oa = obdo_alloc(); + OBDO_ALLOC(req->rq_oi.oi_oa); if (req->rq_oi.oi_oa == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); @@ -995,7 +1049,9 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo, memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, sizeof(*req->rq_oi.oi_oa)); req->rq_oi.oi_oa->o_id = loi->loi_id; + req->rq_oi.oi_oa->o_seq = loi->loi_seq; req->rq_oi.oi_cb_up = cb_getattr_update; + req->rq_oi.oi_capa = oinfo->oi_capa; lov_set_add_req(req, set); } @@ -1019,8 +1075,7 @@ int lov_fini_destroy_set(struct lov_request_set *set) /* FIXME update qos data here */ } - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(0); } @@ -1031,7 +1086,6 @@ int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_request_set **reqset) { struct lov_request_set *set; - struct lov_oinfo *loi = NULL; struct lov_obd *lov = &exp->exp_obd->u.lov; int rc = 0, i; ENTRY; @@ -1049,11 +1103,12 @@ int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo, if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE) set->set_cookies = oti->oti_logcookies; - loi = lsm->lsm_oinfo; - for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + for (i = 0; i < lsm->lsm_stripe_count; i++) { + struct lov_oinfo *loi; struct lov_request *req; - if (!lov->lov_tgts[loi->loi_ost_idx] || + loi = lsm->lsm_oinfo[i]; + if (!lov->lov_tgts[loi->loi_ost_idx] || !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); continue; @@ -1066,13 +1121,14 @@ int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo, req->rq_stripe = i; req->rq_idx = loi->loi_ost_idx; - req->rq_oi.oi_oa = obdo_alloc(); + OBDO_ALLOC(req->rq_oi.oi_oa); if (req->rq_oi.oi_oa == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); } memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa)); req->rq_oi.oi_oa->o_id = loi->loi_id; + req->rq_oi.oi_oa->o_seq = loi->loi_seq; lov_set_add_req(req, set); } if (!set->set_count) @@ -1097,8 +1153,7 @@ int lov_fini_setattr_set(struct lov_request_set *set) /* FIXME update qos data here */ } - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } @@ -1112,19 +1167,19 @@ int lov_update_setattr_set(struct lov_request_set *set, lov_update_set(set, req, rc); /* grace error on inactive ost */ - if (rc && !(lov->lov_tgts[req->rq_idx] && + if (rc && !(lov->lov_tgts[req->rq_idx] && lov->lov_tgts[req->rq_idx]->ltd_active)) rc = 0; if (rc == 0) { if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME) - lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_ctime = + lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime = req->rq_oi.oi_oa->o_ctime; if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME) - lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_mtime = + lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime = req->rq_oi.oi_oa->o_mtime; if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME) - lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_atime = + lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime = req->rq_oi.oi_oa->o_atime; } @@ -1132,9 +1187,10 @@ int lov_update_setattr_set(struct lov_request_set *set, } /* The callback for osc_setattr_async that finilizes a request info when a - * response is recieved. */ -static int cb_setattr_update(struct obd_info *oinfo, int rc) + * response is received. */ +static int cb_setattr_update(void *cookie, int rc) { + struct obd_info *oinfo = cookie; struct lov_request *lovreq; lovreq = container_of(oinfo, struct lov_request, rq_oi); return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc); @@ -1145,7 +1201,6 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_request_set **reqset) { struct lov_request_set *set; - struct lov_oinfo *loi = NULL; struct lov_obd *lov = &exp->exp_obd->u.lov; int rc = 0, i; ENTRY; @@ -1161,8 +1216,8 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo, if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) set->set_cookies = oti->oti_logcookies; - loi = oinfo->oi_md->lsm_oinfo; - for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) { + for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) { + struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i]; struct lov_request *req; if (!lov->lov_tgts[loi->loi_ost_idx] || @@ -1177,7 +1232,7 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo, req->rq_stripe = i; req->rq_idx = loi->loi_ost_idx; - req->rq_oi.oi_oa = obdo_alloc(); + OBDO_ALLOC(req->rq_oi.oi_oa); if (req->rq_oi.oi_oa == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); @@ -1185,9 +1240,10 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo, memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, sizeof(*req->rq_oi.oi_oa)); req->rq_oi.oi_oa->o_id = loi->loi_id; + req->rq_oi.oi_oa->o_seq= loi->loi_seq; req->rq_oi.oi_oa->o_stripe_idx = i; req->rq_oi.oi_cb_up = cb_setattr_update; - req->rq_rqset = set; + req->rq_oi.oi_capa = oinfo->oi_capa; if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) { int off = lov_stripe_offset(oinfo->oi_md, @@ -1227,14 +1283,13 @@ int lov_fini_punch_set(struct lov_request_set *set) rc = common_attr_done(set); } - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } int lov_update_punch_set(struct lov_request_set *set, - struct lov_request *req, int rc) + struct lov_request *req, int rc) { struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov; struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md; @@ -1249,7 +1304,7 @@ int lov_update_punch_set(struct lov_request_set *set, if (rc == 0) { lov_stripe_lock(lsm); if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) { - lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_blocks = + lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks; } @@ -1262,9 +1317,10 @@ int lov_update_punch_set(struct lov_request_set *set, } /* The callback for osc_punch that finilizes a request info when a response - * is recieved. */ -static int cb_update_punch(struct obd_info *oinfo, int rc) + * is received. */ +static int cb_update_punch(void *cookie, int rc) { + struct obd_info *oinfo = cookie; struct lov_request *lovreq; lovreq = container_of(oinfo, struct lov_request, rq_oi); return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc); @@ -1275,7 +1331,6 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_request_set **reqset) { struct lov_request_set *set; - struct lov_oinfo *loi = NULL; struct lov_obd *lov = &exp->exp_obd->u.lov; int rc = 0, i; ENTRY; @@ -1288,30 +1343,30 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo, set->set_oi = oinfo; set->set_exp = exp; - loi = oinfo->oi_md->lsm_oinfo; - for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) { + for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) { + struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i]; struct lov_request *req; obd_off rs, re; - if (!lov->lov_tgts[loi->loi_ost_idx] || - !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { - CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); - continue; - } - if (!lov_stripe_intersects(oinfo->oi_md, i, oinfo->oi_policy.l_extent.start, oinfo->oi_policy.l_extent.end, &rs, &re)) continue; + if (!lov->lov_tgts[loi->loi_ost_idx] || + !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { + CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); + GOTO(out_set, rc = -EIO); + } + OBD_ALLOC(req, sizeof(*req)); if (req == NULL) GOTO(out_set, rc = -ENOMEM); req->rq_stripe = i; req->rq_idx = loi->loi_ost_idx; - req->rq_oi.oi_oa = obdo_alloc(); + OBDO_ALLOC(req->rq_oi.oi_oa); if (req->rq_oi.oi_oa == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); @@ -1319,14 +1374,18 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo, memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, sizeof(*req->rq_oi.oi_oa)); req->rq_oi.oi_oa->o_id = loi->loi_id; + req->rq_oi.oi_oa->o_seq = loi->loi_seq; + req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP; + req->rq_oi.oi_oa->o_stripe_idx = i; req->rq_oi.oi_cb_up = cb_update_punch; - req->rq_rqset = set; req->rq_oi.oi_policy.l_extent.start = rs; req->rq_oi.oi_policy.l_extent.end = re; req->rq_oi.oi_policy.l_extent.gid = -1; + req->rq_oi.oi_capa = oinfo->oi_capa; + lov_set_add_req(req, set); } if (!set->set_count) @@ -1352,35 +1411,41 @@ int lov_fini_sync_set(struct lov_request_set *set) /* FIXME update qos data here */ } - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } +/* The callback for osc_sync that finilizes a request info when a + * response is recieved. */ +static int cb_sync_update(void *cookie, int rc) +{ + struct obd_info *oinfo = cookie; + struct lov_request *lovreq; + + lovreq = container_of(oinfo, struct lov_request, rq_oi); + return lov_update_common_set(lovreq->rq_rqset, lovreq, rc); +} + int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo, - struct obdo *src_oa, struct lov_stripe_md *lsm, obd_off start, obd_off end, struct lov_request_set **reqset) { struct lov_request_set *set; - struct lov_oinfo *loi = NULL; struct lov_obd *lov = &exp->exp_obd->u.lov; int rc = 0, i; ENTRY; - OBD_ALLOC(set, sizeof(*set)); + OBD_ALLOC_PTR(set); if (set == NULL) RETURN(-ENOMEM); lov_init_set(set); set->set_exp = exp; set->set_oi = oinfo; - set->set_oi->oi_md = lsm; - set->set_oi->oi_oa = src_oa; - loi = lsm->lsm_oinfo; - for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) { + struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i]; struct lov_request *req; obd_off rs, re; @@ -1390,27 +1455,30 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo, continue; } - if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re)) + if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs, + &re)) continue; - OBD_ALLOC(req, sizeof(*req)); + OBD_ALLOC_PTR(req); if (req == NULL) GOTO(out_set, rc = -ENOMEM); req->rq_stripe = i; req->rq_idx = loi->loi_ost_idx; - req->rq_oi.oi_oa = obdo_alloc(); + OBDO_ALLOC(req->rq_oi.oi_oa); if (req->rq_oi.oi_oa == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); } - memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa)); + *req->rq_oi.oi_oa = *oinfo->oi_oa; req->rq_oi.oi_oa->o_id = loi->loi_id; + req->rq_oi.oi_oa->o_seq = loi->loi_seq; req->rq_oi.oi_oa->o_stripe_idx = i; req->rq_oi.oi_policy.l_extent.start = rs; req->rq_oi.oi_policy.l_extent.end = re; req->rq_oi.oi_policy.l_extent.gid = -1; + req->rq_oi.oi_cb_up = cb_sync_update; lov_set_add_req(req, set); } @@ -1444,10 +1512,10 @@ int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success) if (osfs->os_ffree != LOV_U64_MAX) do_div(osfs->os_ffree, expected_stripes); - spin_lock(&obd->obd_osfs_lock); + cfs_spin_lock(&obd->obd_osfs_lock); memcpy(&obd->obd_osfs, osfs, sizeof(*osfs)); - obd->obd_osfs_age = get_jiffies_64(); - spin_unlock(&obd->obd_osfs_lock); + obd->obd_osfs_age = cfs_time_current_64(); + cfs_spin_unlock(&obd->obd_osfs_lock); RETURN(0); } @@ -1466,24 +1534,46 @@ int lov_fini_statfs_set(struct lov_request_set *set) rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs, set->set_success); } - - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); - + lov_put_reqset(set); RETURN(rc); } -void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs, - struct obd_statfs *lov_sfs, int success) +void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs, + int success) { - spin_lock(&obd->obd_osfs_lock); - memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs)); - obd->obd_osfs_age = get_jiffies_64(); - spin_unlock(&obd->obd_osfs_lock); + int shift = 0, quit = 0; + __u64 tmp; if (success == 0) { memcpy(osfs, lov_sfs, sizeof(*lov_sfs)); } else { + if (osfs->os_bsize != lov_sfs->os_bsize) { + /* assume all block sizes are always powers of 2 */ + /* get the bits difference */ + tmp = osfs->os_bsize | lov_sfs->os_bsize; + for (shift = 0; shift <= 64; ++shift) { + if (tmp & 1) { + if (quit) + break; + else + quit = 1; + shift = 0; + } + tmp >>= 1; + } + } + + if (osfs->os_bsize < lov_sfs->os_bsize) { + osfs->os_bsize = lov_sfs->os_bsize; + + osfs->os_bfree >>= shift; + osfs->os_bavail >>= shift; + osfs->os_blocks >>= shift; + } else if (shift != 0) { + lov_sfs->os_bfree >>= shift; + lov_sfs->os_bavail >>= shift; + lov_sfs->os_blocks >>= shift; + } #ifdef MIN_DF /* Sandia requested that df (and so, statfs) only returned minimal available space on @@ -1517,37 +1607,55 @@ void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs, } /* The callback for osc_statfs_async that finilizes a request info when a - * response is recieved. */ -static int cb_statfs_update(struct obd_info *oinfo, int rc) + * response is received. */ +static int cb_statfs_update(void *cookie, int rc) { + struct obd_info *oinfo = cookie; struct lov_request *lovreq; struct obd_statfs *osfs, *lov_sfs; - struct obd_device *obd; struct lov_obd *lov; + struct lov_tgt_desc *tgt; + struct obd_device *lovobd, *tgtobd; int success; ENTRY; lovreq = container_of(oinfo, struct lov_request, rq_oi); - lov = &lovreq->rq_rqset->set_obd->u.lov; - obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp); - + lovobd = lovreq->rq_rqset->set_obd; + lov = &lovobd->u.lov; osfs = lovreq->rq_rqset->set_oi->oi_osfs; lov_sfs = oinfo->oi_osfs; - success = lovreq->rq_rqset->set_success; - /* XXX: the same is done in lov_update_common_set, however lovset->set_exp is not initialized. */ lov_update_set(lovreq->rq_rqset, lovreq, rc); - if (rc) { - if (rc && !(lov->lov_tgts[lovreq->rq_idx] && - lov->lov_tgts[lovreq->rq_idx]->ltd_active)) - rc = 0; - RETURN(rc); - } - - lov_update_statfs(obd, osfs, lov_sfs, success); + if (rc) + GOTO(out, rc); + + obd_getref(lovobd); + tgt = lov->lov_tgts[lovreq->rq_idx]; + if (!tgt || !tgt->ltd_active) + GOTO(out_update, rc); + + tgtobd = class_exp2obd(tgt->ltd_exp); + cfs_spin_lock(&tgtobd->obd_osfs_lock); + memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs)); + if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0) + tgtobd->obd_osfs_age = cfs_time_current_64(); + cfs_spin_unlock(&tgtobd->obd_osfs_lock); + +out_update: + lov_update_statfs(osfs, lov_sfs, success); qos_update(lov); + obd_putref(lovobd); + +out: + if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD && + lov_finished_set(lovreq->rq_rqset)) { + lov_statfs_interpret(NULL, lovreq->rq_rqset, + lovreq->rq_rqset->set_success != + lovreq->rq_rqset->set_count); + qos_statfs_done(lov); + } RETURN(0); } @@ -1572,11 +1680,19 @@ int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo, for (i = 0; i < lov->desc.ld_tgt_count; i++) { struct lov_request *req; - if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) { + if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active + && (oinfo->oi_flags & OBD_STATFS_NODELAY))) { CDEBUG(D_HA, "lov idx %d inactive\n", i); continue; } + /* skip targets that have been explicitely disabled by the + * administrator */ + if (!lov->lov_tgts[i]->ltd_exp) { + CDEBUG(D_HA, "lov idx %d administratively disabled\n", i); + continue; + } + OBD_ALLOC(req, sizeof(*req)); if (req == NULL) GOTO(out_set, rc = -ENOMEM); @@ -1589,7 +1705,7 @@ int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo, req->rq_idx = i; req->rq_oi.oi_cb_up = cb_statfs_update; - req->rq_rqset = set; + req->rq_oi.oi_flags = oinfo->oi_flags; lov_set_add_req(req, set); }