X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flov%2Flov_request.c;h=5d2334e69ad27d6869193f4aa635b68332ed9aec;hb=abd9d33f92f5d49310e2a315f8e74dc64f56e764;hp=a6b3ade1a4249ef903012094afd0998caf3fde27;hpb=0a859380c36ac24871f221b35042f76c56b04438;p=fs%2Flustre-release.git diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index a6b3ade..5d2334e 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -26,8 +26,10 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -54,8 +56,8 @@ static void lov_init_set(struct lov_request_set *set) { set->set_count = 0; - set->set_completes = 0; - set->set_success = 0; + cfs_atomic_set(&set->set_completes, 0); + cfs_atomic_set(&set->set_success, 0); set->set_cookies = 0; CFS_INIT_LIST_HEAD(&set->set_list); cfs_atomic_set(&set->set_refcount, 1); @@ -78,7 +80,7 @@ void lov_finish_set(struct lov_request_set *set) if (req->rq_oi.oi_oa) OBDO_FREE(req->rq_oi.oi_oa); if (req->rq_oi.oi_md) - OBD_FREE(req->rq_oi.oi_md, req->rq_buflen); + OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_osfs) OBD_FREE(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs)); @@ -87,7 +89,7 @@ void lov_finish_set(struct lov_request_set *set) if (set->set_pga) { int len = set->set_oabufs * sizeof(*set->set_pga); - OBD_FREE(set->set_pga, len); + OBD_FREE_LARGE(set->set_pga, len); } if (set->set_lockh) lov_llh_put(set->set_lockh); @@ -98,21 +100,22 @@ void lov_finish_set(struct lov_request_set *set) int lov_finished_set(struct lov_request_set *set) { - CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes, + int completes = cfs_atomic_read(&set->set_completes); + + CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count); - return set->set_completes == set->set_count; + return completes == set->set_count; } - void lov_update_set(struct lov_request_set *set, struct lov_request *req, int rc) { req->rq_complete = 1; req->rq_rc = rc; - set->set_completes++; + cfs_atomic_inc(&set->set_completes); if (rc == 0) - set->set_success++; + cfs_atomic_inc(&set->set_success); cfs_waitq_signal(&set->set_waitq); } @@ -214,11 +217,12 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode) { struct lov_request *req; struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; + int completes = cfs_atomic_read(&set->set_completes); int rc = 0; ENTRY; /* enqueue/match success, just return */ - if (set->set_completes && set->set_completes == set->set_success) + if (completes && completes == cfs_atomic_read(&set->set_success)) RETURN(0); /* cancel enqueued/matched locks */ @@ -260,7 +264,7 @@ int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc, * succeeded. */ if (!rqset) { if (rc) - set->set_completes = 0; + cfs_atomic_set(&set->set_completes, 0); ret = enqueue_done(set, mode); } else if (set->set_lockh) lov_llh_put(set->set_lockh); @@ -317,7 +321,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, req->rq_buflen = sizeof(*req->rq_oi.oi_md) + sizeof(struct lov_oinfo *) + sizeof(struct lov_oinfo); - OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); + OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_md == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); @@ -361,20 +365,6 @@ out_set: RETURN(rc); } -int lov_update_match_set(struct lov_request_set *set, struct lov_request *req, - int rc) -{ - int ret = rc; - ENTRY; - - if (rc > 0) - ret = 0; - else if (rc == 0) - ret = 1; - lov_update_set(set, req, ret); - RETURN(rc); -} - int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags) { int rc = 0; @@ -384,7 +374,7 @@ int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags) RETURN(0); LASSERT(set->set_exp); rc = enqueue_done(set, mode); - if ((set->set_count == set->set_success) && + if ((set->set_count == cfs_atomic_read(&set->set_success)) && (flags & LDLM_FL_TEST_LOCK)) lov_llh_put(set->set_lockh); @@ -438,7 +428,7 @@ int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo, GOTO(out_set, rc = -ENOMEM); req->rq_buflen = sizeof(*req->rq_oi.oi_md); - OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); + OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_md == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); @@ -525,7 +515,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo, GOTO(out_set, rc = -ENOMEM); req->rq_buflen = sizeof(*req->rq_oi.oi_md); - OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); + OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_md == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); @@ -550,6 +540,59 @@ out_set: RETURN(rc); } +static int lov_update_create_set(struct lov_request_set *set, + struct lov_request *req, int rc) +{ + struct obd_trans_info *oti = set->set_oti; + struct lov_stripe_md *lsm = set->set_oi->oi_md; + struct lov_oinfo *loi; + struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; + ENTRY; + + if (rc && lov->lov_tgts[req->rq_idx] && + lov->lov_tgts[req->rq_idx]->ltd_active) { + /* Pre-creating objects may timeout via -ETIMEDOUT or + * -ENOTCONN both are always non-critical events. */ + CDEBUG(rc == -ETIMEDOUT || rc == -ENOTCONN ? D_HA : D_ERROR, + "error creating fid "LPX64" sub-object " + "on OST idx %d/%d: rc = %d\n", + set->set_oi->oi_oa->o_id, req->rq_idx, + lsm->lsm_stripe_count, rc); + if (rc > 0) { + CERROR("obd_create returned invalid err %d\n", rc); + rc = -EIO; + } + } + + cfs_spin_lock(&set->set_lock); + req->rq_stripe = cfs_atomic_read(&set->set_success); + loi = lsm->lsm_oinfo[req->rq_stripe]; + + + if (rc) { + lov_update_set(set, req, rc); + cfs_spin_unlock(&set->set_lock); + RETURN(rc); + } + + loi->loi_id = req->rq_oi.oi_oa->o_id; + loi->loi_seq = req->rq_oi.oi_oa->o_seq; + loi->loi_ost_idx = req->rq_idx; + loi_init(loi); + + if (oti && set->set_cookies) + ++oti->oti_logcookies; + if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE) + set->set_cookie_sent++; + + lov_update_set(set, req, rc); + cfs_spin_unlock(&set->set_lock); + + CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n", + lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx); + RETURN(rc); +} + static int create_done(struct obd_export *exp, struct lov_request_set *set, struct lov_stripe_md **lsmp) { @@ -558,19 +601,19 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, struct obdo *src_oa = set->set_oi->oi_oa; struct lov_request *req; struct obdo *ret_oa = NULL; - int attrset = 0, rc = 0; + int success, attrset = 0, rc = 0; ENTRY; - LASSERT(set->set_completes); + LASSERT(cfs_atomic_read(&set->set_completes)); /* try alloc objects on other osts if osc_create fails for * exceptions: RPC failure, ENOSPC, etc */ - if (set->set_count != set->set_success) { + if (set->set_count != cfs_atomic_read(&set->set_success)) { cfs_list_for_each_entry (req, &set->set_list, rq_link) { if (req->rq_rc == 0) continue; - set->set_completes--; + cfs_atomic_dec(&set->set_completes); req->rq_complete = 0; rc = qos_remedy_create(set, req); @@ -578,12 +621,13 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, } } + success = cfs_atomic_read(&set->set_success); /* no successful creates */ - if (set->set_success == 0) + if (success == 0) GOTO(cleanup, rc); - if (set->set_count != set->set_success) { - set->set_count = set->set_success; + if (set->set_count != success) { + set->set_count = success; qos_shrink_lsm(set); } @@ -653,74 +697,30 @@ int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) + if (cfs_atomic_read(&set->set_completes)) rc = create_done(set->set_exp, set, lsmp); lov_put_reqset(set); RETURN(rc); } -int lov_update_create_set(struct lov_request_set *set, - struct lov_request *req, int rc) -{ - struct obd_trans_info *oti = set->set_oti; - struct lov_stripe_md *lsm = set->set_oi->oi_md; - struct lov_oinfo *loi; - struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; - ENTRY; - - if (rc && lov->lov_tgts[req->rq_idx] && - lov->lov_tgts[req->rq_idx]->ltd_active) { - CERROR("error creating fid "LPX64" sub-object" - " on OST idx %d/%d: rc = %d\n", - set->set_oi->oi_oa->o_id, req->rq_idx, - lsm->lsm_stripe_count, rc); - if (rc > 0) { - CERROR("obd_create returned invalid err %d\n", rc); - rc = -EIO; - } - } - - cfs_spin_lock(&set->set_lock); - req->rq_stripe = set->set_success; - loi = lsm->lsm_oinfo[req->rq_stripe]; - if (rc) { - lov_update_set(set, req, rc); - cfs_spin_unlock(&set->set_lock); - RETURN(rc); - } - - loi->loi_id = req->rq_oi.oi_oa->o_id; - loi->loi_seq = req->rq_oi.oi_oa->o_seq; - loi->loi_ost_idx = req->rq_idx; - loi_init(loi); - - if (oti && set->set_cookies) - ++oti->oti_logcookies; - if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE) - set->set_cookie_sent++; - - lov_update_set(set, req, rc); - cfs_spin_unlock(&set->set_lock); - - CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n", - lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx); - RETURN(rc); -} - int cb_create_update(void *cookie, int rc) { struct obd_info *oinfo = cookie; struct lov_request *lovreq; lovreq = container_of(oinfo, struct lov_request, rq_oi); + + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL)) + if (lovreq->rq_idx == cfs_fail_val) + rc = -ENOTCONN; + rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc); if (lov_finished_set(lovreq->rq_rqset)) lov_put_reqset(lovreq->rq_rqset); return rc; } - int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_stripe_md **lsmp, struct obdo *src_oa, struct obd_trans_info *oti, @@ -767,7 +767,7 @@ static int common_attr_done(struct lov_request_set *set) if (set->set_oi->oi_oa == NULL) RETURN(0); - if (!set->set_success) + if (!cfs_atomic_read(&set->set_success)) RETURN(-EIO); OBDO_ALLOC(tmp_oa); @@ -837,7 +837,7 @@ int lov_fini_brw_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) { + if (cfs_atomic_read(&set->set_completes)) { rc = brw_done(set); /* FIXME update qos data here */ } @@ -870,11 +870,11 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, set->set_oti = oti; set->set_oi = oinfo; set->set_oabufs = oa_bufs; - OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga)); + OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga)); if (!set->set_pga) GOTO(out, rc = -ENOMEM); - OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count); + OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count); if (!info) GOTO(out, rc = -ENOMEM); @@ -919,7 +919,7 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, req->rq_oi.oi_oa->o_stripe_idx = i; req->rq_buflen = sizeof(*req->rq_oi.oi_md); - OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); + OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_md == NULL) { OBDO_FREE(req->rq_oi.oi_oa); OBD_FREE(req, sizeof(*req)); @@ -959,7 +959,8 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, } out: if (info) - OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count); + OBD_FREE_LARGE(info, + sizeof(*info) * oinfo->oi_md->lsm_stripe_count); if (rc == 0) *reqset = set; @@ -977,7 +978,7 @@ int lov_fini_getattr_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) + if (cfs_atomic_read(&set->set_completes)) rc = common_attr_done(set); lov_put_reqset(set); @@ -1062,7 +1063,7 @@ int lov_fini_destroy_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) { + if (cfs_atomic_read(&set->set_completes)) { /* FIXME update qos data here */ } @@ -1139,7 +1140,7 @@ int lov_fini_setattr_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) { + if (cfs_atomic_read(&set->set_completes)) { rc = common_attr_done(set); /* FIXME update qos data here */ } @@ -1267,10 +1268,10 @@ int lov_fini_punch_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) { + if (cfs_atomic_read(&set->set_completes)) { rc = -EIO; /* FIXME update qos data here */ - if (set->set_success) + if (cfs_atomic_read(&set->set_success)) rc = common_attr_done(set); } @@ -1396,8 +1397,8 @@ int lov_fini_sync_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) { - if (!set->set_success) + if (cfs_atomic_read(&set->set_completes)) { + if (!cfs_atomic_read(&set->set_success)) rc = -EIO; /* FIXME update qos data here */ } @@ -1407,8 +1408,18 @@ int lov_fini_sync_set(struct lov_request_set *set) RETURN(rc); } +/* The callback for osc_sync that finilizes a request info when a + * response is recieved. */ +static int cb_sync_update(void *cookie, int rc) +{ + struct obd_info *oinfo = cookie; + struct lov_request *lovreq; + + lovreq = container_of(oinfo, struct lov_request, rq_oi); + return lov_update_common_set(lovreq->rq_rqset, lovreq, rc); +} + int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo, - struct obdo *src_oa, struct lov_stripe_md *lsm, obd_off start, obd_off end, struct lov_request_set **reqset) { @@ -1417,18 +1428,16 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo, int rc = 0, i; ENTRY; - OBD_ALLOC(set, sizeof(*set)); + OBD_ALLOC_PTR(set); if (set == NULL) RETURN(-ENOMEM); lov_init_set(set); set->set_exp = exp; set->set_oi = oinfo; - set->set_oi->oi_md = lsm; - set->set_oi->oi_oa = src_oa; - for (i = 0; i < lsm->lsm_stripe_count; i++) { - struct lov_oinfo *loi = lsm->lsm_oinfo[i]; + for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) { + struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i]; struct lov_request *req; obd_off rs, re; @@ -1438,10 +1447,11 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo, continue; } - if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re)) + if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs, + &re)) continue; - OBD_ALLOC(req, sizeof(*req)); + OBD_ALLOC_PTR(req); if (req == NULL) GOTO(out_set, rc = -ENOMEM); req->rq_stripe = i; @@ -1452,7 +1462,7 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo, OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); } - memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa)); + *req->rq_oi.oi_oa = *oinfo->oi_oa; req->rq_oi.oi_oa->o_id = loi->loi_id; req->rq_oi.oi_oa->o_seq = loi->loi_seq; req->rq_oi.oi_oa->o_stripe_idx = i; @@ -1460,6 +1470,7 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo, req->rq_oi.oi_policy.l_extent.start = rs; req->rq_oi.oi_policy.l_extent.end = re; req->rq_oi.oi_policy.l_extent.gid = -1; + req->rq_oi.oi_cb_up = cb_sync_update; lov_set_add_req(req, set); } @@ -1486,8 +1497,8 @@ int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success) ENTRY; if (success) { - __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0); - + __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, + LOV_MAGIC, 0); if (osfs->os_files != LOV_U64_MAX) do_div(osfs->os_files, expected_stripes); if (osfs->os_ffree != LOV_U64_MAX) @@ -1511,9 +1522,9 @@ int lov_fini_statfs_set(struct lov_request_set *set) if (set == NULL) RETURN(0); - if (set->set_completes) { + if (cfs_atomic_read(&set->set_completes)) { rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs, - set->set_success); + cfs_atomic_read(&set->set_success)); } lov_put_reqset(set); RETURN(rc); @@ -1593,6 +1604,7 @@ static int cb_statfs_update(void *cookie, int rc) { struct obd_info *oinfo = cookie; struct lov_request *lovreq; + struct lov_request_set *set; struct obd_statfs *osfs, *lov_sfs; struct lov_obd *lov; struct lov_tgt_desc *tgt; @@ -1601,14 +1613,15 @@ static int cb_statfs_update(void *cookie, int rc) ENTRY; lovreq = container_of(oinfo, struct lov_request, rq_oi); - lovobd = lovreq->rq_rqset->set_obd; + set = lovreq->rq_rqset; + lovobd = set->set_obd; lov = &lovobd->u.lov; - osfs = lovreq->rq_rqset->set_oi->oi_osfs; + osfs = set->set_oi->oi_osfs; lov_sfs = oinfo->oi_osfs; - success = lovreq->rq_rqset->set_success; + success = cfs_atomic_read(&set->set_success); /* XXX: the same is done in lov_update_common_set, however lovset->set_exp is not initialized. */ - lov_update_set(lovreq->rq_rqset, lovreq, rc); + lov_update_set(set, lovreq, rc); if (rc) GOTO(out, rc); @@ -1630,12 +1643,12 @@ out_update: obd_putref(lovobd); out: - if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD && - lov_finished_set(lovreq->rq_rqset)) { - lov_statfs_interpret(NULL, lovreq->rq_rqset, - lovreq->rq_rqset->set_success != - lovreq->rq_rqset->set_count); - qos_statfs_done(lov); + if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD && + lov_finished_set(set)) { + lov_statfs_interpret(NULL, set, set->set_count != + cfs_atomic_read(&set->set_success)); + if (lov->lov_qos.lq_statfs_in_progress) + qos_statfs_done(lov); } RETURN(0);