X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flov%2Flov_request.c;h=672eca2832030b6ffc5d66f455a453eab0790d3e;hb=3ad0df02123a49a8c98ecf4661f677af509ac6d4;hp=a8c350db58ccb175af9d7d36d87c3434b361a5d2;hpb=8ea6a840e00e40220ab769ec329e10f0ad190c09;p=fs%2Flustre-release.git diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index a8c350d..672eca2 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -28,15 +26,14 @@ /* * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ * Lustre is a trademark of Sun Microsystems, Inc. */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_LOV #ifdef __KERNEL__ @@ -54,8 +51,8 @@ static void lov_init_set(struct lov_request_set *set) { set->set_count = 0; - set->set_completes = 0; - set->set_success = 0; + cfs_atomic_set(&set->set_completes, 0); + cfs_atomic_set(&set->set_success, 0); set->set_cookies = 0; CFS_INIT_LIST_HEAD(&set->set_list); cfs_atomic_set(&set->set_refcount, 1); @@ -98,9 +95,11 @@ void lov_finish_set(struct lov_request_set *set) int lov_finished_set(struct lov_request_set *set) { - CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes, + int completes = cfs_atomic_read(&set->set_completes); + + CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count); - return set->set_completes == set->set_count; + return completes == set->set_count; } void lov_update_set(struct lov_request_set *set, @@ -109,9 +108,9 @@ void lov_update_set(struct lov_request_set *set, req->rq_complete = 1; req->rq_rc = rc; - set->set_completes++; + cfs_atomic_inc(&set->set_completes); if (rc == 0) - set->set_success++; + cfs_atomic_inc(&set->set_success); cfs_waitq_signal(&set->set_waitq); } @@ -213,11 +212,12 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode) { struct lov_request *req; struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; + int completes = cfs_atomic_read(&set->set_completes); int rc = 0; ENTRY; /* enqueue/match success, just return */ - if (set->set_completes && set->set_completes == set->set_success) + if (completes && completes == cfs_atomic_read(&set->set_success)) RETURN(0); /* cancel enqueued/matched locks */ @@ -259,7 +259,7 @@ int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc, * succeeded. */ if (!rqset) { if (rc) - set->set_completes = 0; + cfs_atomic_set(&set->set_completes, 0); ret = enqueue_done(set, mode); } else if (set->set_lockh) lov_llh_put(set->set_lockh); @@ -269,6 +269,37 @@ int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc, RETURN(rc ? rc : ret); } +static void lov_llh_addref(void *llhp) +{ + struct lov_lock_handles *llh = llhp; + + cfs_atomic_inc(&llh->llh_refcount); + CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh, + cfs_atomic_read(&llh->llh_refcount)); +} + +static struct portals_handle_ops lov_handle_ops = { + .hop_addref = lov_llh_addref, + .hop_free = NULL, +}; + +static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm) +{ + struct lov_lock_handles *llh; + + OBD_ALLOC(llh, sizeof *llh + + sizeof(*llh->llh_handles) * lsm->lsm_stripe_count); + if (llh == NULL) + return NULL; + + cfs_atomic_set(&llh->llh_refcount, 2); + llh->llh_stripe_count = lsm->lsm_stripe_count; + CFS_INIT_LIST_HEAD(&llh->llh_handle.h_link); + class_handle_hash(&llh->llh_handle, &lov_handle_ops); + + return llh; +} + int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, struct ldlm_enqueue_info *einfo, struct lov_request_set **reqset) @@ -360,20 +391,6 @@ out_set: RETURN(rc); } -int lov_update_match_set(struct lov_request_set *set, struct lov_request *req, - int rc) -{ - int ret = rc; - ENTRY; - - if (rc > 0) - ret = 0; - else if (rc == 0) - ret = 1; - lov_update_set(set, req, ret); - RETURN(rc); -} - int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags) { int rc = 0; @@ -383,7 +400,7 @@ int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags) RETURN(0); LASSERT(set->set_exp); rc = enqueue_done(set, mode); - if ((set->set_count == set->set_success) && + if ((set->set_count == cfs_atomic_read(&set->set_success)) && (flags & LDLM_FL_TEST_LOCK)) lov_llh_put(set->set_lockh); @@ -560,8 +577,11 @@ static int lov_update_create_set(struct lov_request_set *set, if (rc && lov->lov_tgts[req->rq_idx] && lov->lov_tgts[req->rq_idx]->ltd_active) { - CERROR("error creating fid "LPX64" sub-object" - " on OST idx %d/%d: rc = %d\n", + /* Pre-creating objects may timeout via -ETIMEDOUT or + * -ENOTCONN both are always non-critical events. */ + CDEBUG(rc == -ETIMEDOUT || rc == -ENOTCONN ? D_HA : D_ERROR, + "error creating fid "LPX64" sub-object " + "on OST idx %d/%d: rc = %d\n", set->set_oi->oi_oa->o_id, req->rq_idx, lsm->lsm_stripe_count, rc); if (rc > 0) { @@ -571,7 +591,7 @@ static int lov_update_create_set(struct lov_request_set *set, } cfs_spin_lock(&set->set_lock); - req->rq_stripe = set->set_success; + req->rq_stripe = cfs_atomic_read(&set->set_success); loi = lsm->lsm_oinfo[req->rq_stripe]; @@ -607,19 +627,19 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, struct obdo *src_oa = set->set_oi->oi_oa; struct lov_request *req; struct obdo *ret_oa = NULL; - int attrset = 0, rc = 0; + int success, attrset = 0, rc = 0; ENTRY; - LASSERT(set->set_completes); + LASSERT(cfs_atomic_read(&set->set_completes)); /* try alloc objects on other osts if osc_create fails for * exceptions: RPC failure, ENOSPC, etc */ - if (set->set_count != set->set_success) { + if (set->set_count != cfs_atomic_read(&set->set_success)) { cfs_list_for_each_entry (req, &set->set_list, rq_link) { if (req->rq_rc == 0) continue; - set->set_completes--; + cfs_atomic_dec(&set->set_completes); req->rq_complete = 0; rc = qos_remedy_create(set, req); @@ -627,12 +647,13 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, } } + success = cfs_atomic_read(&set->set_success); /* no successful creates */ - if (set->set_success == 0) + if (success == 0) GOTO(cleanup, rc); - if (set->set_count != set->set_success) { - set->set_count = set->set_success; + if (set->set_count != success) { + set->set_count = success; qos_shrink_lsm(set); } @@ -671,8 +692,8 @@ cleanup: continue; sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp; - err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL, - NULL); + err = obd_destroy(NULL, sub_exp, req->rq_oi.oi_oa, NULL, oti, + NULL, NULL); if (err) CERROR("Failed to uncreate objid "LPX64" subobj " LPX64" on OST idx %d: rc = %d\n", @@ -702,7 +723,7 @@ int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) + if (cfs_atomic_read(&set->set_completes)) rc = create_done(set->set_exp, set, lsmp); lov_put_reqset(set); @@ -716,8 +737,8 @@ int cb_create_update(void *cookie, int rc) lovreq = container_of(oinfo, struct lov_request, rq_oi); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL)) - if (lovreq->rq_idx == obd_fail_val) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL)) + if (lovreq->rq_idx == cfs_fail_val) rc = -ENOTCONN; rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc); @@ -772,7 +793,7 @@ static int common_attr_done(struct lov_request_set *set) if (set->set_oi->oi_oa == NULL) RETURN(0); - if (!set->set_success) + if (!cfs_atomic_read(&set->set_success)) RETURN(-EIO); OBDO_ALLOC(tmp_oa); @@ -842,7 +863,7 @@ int lov_fini_brw_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) { + if (cfs_atomic_read(&set->set_completes)) { rc = brw_done(set); /* FIXME update qos data here */ } @@ -983,7 +1004,7 @@ int lov_fini_getattr_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) + if (cfs_atomic_read(&set->set_completes)) rc = common_attr_done(set); lov_put_reqset(set); @@ -1068,7 +1089,7 @@ int lov_fini_destroy_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) { + if (cfs_atomic_read(&set->set_completes)) { /* FIXME update qos data here */ } @@ -1145,7 +1166,7 @@ int lov_fini_setattr_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) { + if (cfs_atomic_read(&set->set_completes)) { rc = common_attr_done(set); /* FIXME update qos data here */ } @@ -1273,10 +1294,10 @@ int lov_fini_punch_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) { + if (cfs_atomic_read(&set->set_completes)) { rc = -EIO; /* FIXME update qos data here */ - if (set->set_success) + if (cfs_atomic_read(&set->set_success)) rc = common_attr_done(set); } @@ -1402,8 +1423,8 @@ int lov_fini_sync_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) { - if (!set->set_success) + if (cfs_atomic_read(&set->set_completes)) { + if (!cfs_atomic_read(&set->set_success)) rc = -EIO; /* FIXME update qos data here */ } @@ -1502,12 +1523,12 @@ int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success) ENTRY; if (success) { - __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0); - - if (osfs->os_files != LOV_U64_MAX) - do_div(osfs->os_files, expected_stripes); - if (osfs->os_ffree != LOV_U64_MAX) - do_div(osfs->os_ffree, expected_stripes); + __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, + LOV_MAGIC, 0); + if (osfs->os_files != LOV_U64_MAX) + lov_do_div64(osfs->os_files, expected_stripes); + if (osfs->os_ffree != LOV_U64_MAX) + lov_do_div64(osfs->os_ffree, expected_stripes); cfs_spin_lock(&obd->obd_osfs_lock); memcpy(&obd->obd_osfs, osfs, sizeof(*osfs)); @@ -1527,9 +1548,9 @@ int lov_fini_statfs_set(struct lov_request_set *set) if (set == NULL) RETURN(0); - if (set->set_completes) { + if (cfs_atomic_read(&set->set_completes)) { rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs, - set->set_success); + cfs_atomic_read(&set->set_success)); } lov_put_reqset(set); RETURN(rc); @@ -1609,6 +1630,7 @@ static int cb_statfs_update(void *cookie, int rc) { struct obd_info *oinfo = cookie; struct lov_request *lovreq; + struct lov_request_set *set; struct obd_statfs *osfs, *lov_sfs; struct lov_obd *lov; struct lov_tgt_desc *tgt; @@ -1617,14 +1639,15 @@ static int cb_statfs_update(void *cookie, int rc) ENTRY; lovreq = container_of(oinfo, struct lov_request, rq_oi); - lovobd = lovreq->rq_rqset->set_obd; + set = lovreq->rq_rqset; + lovobd = set->set_obd; lov = &lovobd->u.lov; - osfs = lovreq->rq_rqset->set_oi->oi_osfs; + osfs = set->set_oi->oi_osfs; lov_sfs = oinfo->oi_osfs; - success = lovreq->rq_rqset->set_success; + success = cfs_atomic_read(&set->set_success); /* XXX: the same is done in lov_update_common_set, however lovset->set_exp is not initialized. */ - lov_update_set(lovreq->rq_rqset, lovreq, rc); + lov_update_set(set, lovreq, rc); if (rc) GOTO(out, rc); @@ -1646,12 +1669,12 @@ out_update: obd_putref(lovobd); out: - if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD && - lov_finished_set(lovreq->rq_rqset)) { - lov_statfs_interpret(NULL, lovreq->rq_rqset, - lovreq->rq_rqset->set_success != - lovreq->rq_rqset->set_count); - qos_statfs_done(lov); + if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD && + lov_finished_set(set)) { + lov_statfs_interpret(NULL, set, set->set_count != + cfs_atomic_read(&set->set_success)); + if (lov->lov_qos.lq_statfs_in_progress) + qos_statfs_done(lov); } RETURN(0);