X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lustre%2Fmdc%2Fmdc_locks.c;h=f2d666f5f6f11d7b95fbd787ec899720a862abd9;hb=a104459086c0e550e11d73eba635a72a1a282eb0;hp=746012e7b29bb352f2ca238e61a5f0c301df9906;hpb=2989b9dab9e87529ccadfc5711960b71e5e57b18;p=fs%2Flustre-release.git diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 746012e..f2d666f 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -27,7 +27,7 @@ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2011, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -117,7 +117,8 @@ EXPORT_SYMBOL(it_open_error); int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data, __u64 *bits) { - struct ldlm_lock *lock; + struct ldlm_lock *lock; + struct inode *new_inode = data; ENTRY; if(bits) @@ -131,18 +132,18 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data, LASSERT(lock != NULL); lock_res_and_lock(lock); #ifdef __KERNEL__ - if (lock->l_ast_data && lock->l_ast_data != data) { - struct inode *new_inode = data; - struct inode *old_inode = lock->l_ast_data; - LASSERTF(old_inode->i_state & I_FREEING, - "Found existing inode %p/%lu/%u state %lu in lock: " - "setting data to %p/%lu/%u\n", old_inode, - old_inode->i_ino, old_inode->i_generation, - old_inode->i_state, - new_inode, new_inode->i_ino, new_inode->i_generation); - } + if (lock->l_resource->lr_lvb_inode && + lock->l_resource->lr_lvb_inode != data) { + struct inode *old_inode = lock->l_resource->lr_lvb_inode; + LASSERTF(old_inode->i_state & I_FREEING, + "Found existing inode %p/%lu/%u state %lu in lock: " + "setting data to %p/%lu/%u\n", old_inode, + old_inode->i_ino, old_inode->i_generation, + old_inode->i_state, + new_inode, new_inode->i_ino, new_inode->i_generation); + } #endif - lock->l_ast_data = data; + lock->l_resource->lr_lvb_inode = new_inode; if (bits) *bits = lock->l_policy_data.l_inodebits.bits; @@ -152,7 +153,7 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data, RETURN(0); } -ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags, +ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags, const struct lu_fid *fid, ldlm_type_t type, ldlm_policy_data_t *policy, ldlm_mode_t mode, struct lustre_handle *lockh) @@ -186,19 +187,28 @@ int mdc_cancel_unused(struct obd_export *exp, RETURN(rc); } -int mdc_change_cbdata(struct obd_export *exp, - const struct lu_fid *fid, - ldlm_iterator_t it, void *data) +int mdc_null_inode(struct obd_export *exp, + const struct lu_fid *fid) { - struct ldlm_res_id res_id; - ENTRY; + struct ldlm_res_id res_id; + struct ldlm_resource *res; + struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace; + ENTRY; - fid_build_reg_res_name(fid, &res_id); - ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, - &res_id, it, data); + LASSERTF(ns != NULL, "no namespace passed\n"); - EXIT; - return 0; + fid_build_reg_res_name(fid, &res_id); + + res = ldlm_resource_get(ns, NULL, &res_id, 0, 0); + if(res == NULL) + RETURN(0); + + lock_res(res); + res->lr_lvb_inode = NULL; + unlock_res(res); + + ldlm_resource_putref(res); + RETURN(0); } /* find any ldlm lock of the inode in mdc @@ -225,11 +235,11 @@ int mdc_find_cbdata(struct obd_export *exp, static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc) { - /* Don't hold error requests for replay. */ - if (req->rq_replay) { - cfs_spin_lock(&req->rq_lock); - req->rq_replay = 0; - cfs_spin_unlock(&req->rq_lock); + /* Don't hold error requests for replay. */ + if (req->rq_replay) { + spin_lock(&req->rq_lock); + req->rq_replay = 0; + spin_unlock(&req->rq_lock); } if (rc && req->rq_transno != 0) { DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc); @@ -330,9 +340,9 @@ static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp, return NULL; } - cfs_spin_lock(&req->rq_lock); - req->rq_replay = req->rq_import->imp_replayable; - cfs_spin_unlock(&req->rq_lock); + spin_lock(&req->rq_lock); + req->rq_replay = req->rq_import->imp_replayable; + spin_unlock(&req->rq_lock); /* pack the intent */ lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); @@ -437,7 +447,47 @@ static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp, RETURN(req); } -static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp) +static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp, + struct lookup_intent *it, + struct md_op_data *unused) +{ + struct obd_device *obd = class_exp2obd(exp); + struct ptlrpc_request *req; + struct ldlm_intent *lit; + struct layout_intent *layout; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_LDLM_INTENT_LAYOUT); + if (req == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0); + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } + + /* pack the intent */ + lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); + lit->opc = (__u64)it->it_op; + + /* pack the layout intent request */ + layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT); + /* LAYOUT_INTENT_ACCESS is generic, specific operation will be + * set for replication */ + layout->li_opc = LAYOUT_INTENT_ACCESS; + + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + obd->u.cli.cl_max_mds_easize); + ptlrpc_request_set_replen(req); + RETURN(req); +} + +static struct ptlrpc_request * +mdc_enqueue_pack(struct obd_export *exp, int lvb_len) { struct ptlrpc_request *req; int rc; @@ -453,6 +503,7 @@ static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp) RETURN(ERR_PTR(rc)); } + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len); ptlrpc_request_set_replen(req); RETURN(req); } @@ -467,8 +518,10 @@ static int mdc_finish_enqueue(struct obd_export *exp, struct req_capsule *pill = &req->rq_pill; struct ldlm_request *lockreq; struct ldlm_reply *lockrep; - __u64 bits = 0; struct lustre_intent_data *intent = &it->d.lustre; + struct ldlm_lock *lock; + void *lvb_data = NULL; + int lvb_len = 0; ENTRY; LASSERT(rc >= 0); @@ -476,7 +529,7 @@ static int mdc_finish_enqueue(struct obd_export *exp, * actually get a lock, just perform the intent. */ if (req->rq_transno || req->rq_replay) { lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ); - lockreq->lock_flags |= LDLM_FL_INTENT_ONLY; + lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY); } if (rc == ELDLM_LOCK_ABORTED) { @@ -484,8 +537,8 @@ static int mdc_finish_enqueue(struct obd_export *exp, memset(lockh, 0, sizeof(*lockh)); rc = 0; } else { /* rc = 0 */ - struct ldlm_lock *lock = ldlm_handle2lock(lockh); - LASSERT(lock); + lock = ldlm_handle2lock(lockh); + LASSERT(lock != NULL); /* If the server gave us back a different lock mode, we should * fix up our variables. */ @@ -494,7 +547,6 @@ static int mdc_finish_enqueue(struct obd_export *exp, ldlm_lock_decref(lockh, einfo->ei_mode); einfo->ei_mode = lock->l_req_mode; } - bits = lock->l_policy_data.l_inodebits.bits; LDLM_LOCK_PUT(lock); } @@ -507,7 +559,9 @@ static int mdc_finish_enqueue(struct obd_export *exp, intent->it_lock_handle = lockh->cookie; intent->it_data = req; - if (intent->it_status < 0 && req->rq_replay) + /* Technically speaking rq_transno must already be zero if + * it_status is in error, so the check is a bit redundant */ + if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay) mdc_clear_replay_flag(req, intent->it_status); /* If we're doing an IT_OPEN which did not result in an actual @@ -545,12 +599,10 @@ static int mdc_finish_enqueue(struct obd_export *exp, mdc_set_open_replay_data(NULL, NULL, req); } - /* TODO: make sure LAYOUT lock must be granted along with EA */ - if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) { void *eadata; - mdc_update_max_ea_from_body(exp, body); + mdc_update_max_ea_from_body(exp, body); /* * The eadata is opaque; just check that it is there. @@ -561,6 +613,11 @@ static int mdc_finish_enqueue(struct obd_export *exp, if (eadata == NULL) RETURN(-EPROTO); + /* save lvb data and length in case this is for layout + * lock */ + lvb_data = eadata; + lvb_len = body->eadatasize; + /* * We save the reply LOV EA in case we have to replay a * create for recovery. If we didn't allocate a large @@ -622,44 +679,45 @@ static int mdc_finish_enqueue(struct obd_export *exp, RETURN(-EPROTO); } } else if (it->it_op & IT_LAYOUT) { - struct ldlm_lock *lock = ldlm_handle2lock(lockh); - - if (lock != NULL && lock->l_lvb_data == NULL) { - int lvb_len; - - /* maybe the lock was granted right away and layout - * is packed into RMF_DLM_LVB of req */ - lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, - RCL_SERVER); - if (lvb_len > 0) { - void *lvb; - void *lmm; - - lvb = req_capsule_server_get(pill, - &RMF_DLM_LVB); - if (lvb == NULL) { - LDLM_LOCK_PUT(lock); - RETURN(-EPROTO); - } - - OBD_ALLOC_LARGE(lmm, lvb_len); - if (lmm == NULL) { - LDLM_LOCK_PUT(lock); - RETURN(-ENOMEM); - } - memcpy(lmm, lvb, lvb_len); - - /* install lvb_data */ - lock_res_and_lock(lock); - LASSERT(lock->l_lvb_data == NULL); - lock->l_lvb_data = lmm; - lock->l_lvb_len = lvb_len; - unlock_res_and_lock(lock); - } + /* maybe the lock was granted right away and layout + * is packed into RMF_DLM_LVB of req */ + lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER); + if (lvb_len > 0) { + lvb_data = req_capsule_server_sized_get(pill, + &RMF_DLM_LVB, lvb_len); + if (lvb_data == NULL) + RETURN(-EPROTO); } - if (lock != NULL) + } + + /* fill in stripe data for layout lock */ + lock = ldlm_handle2lock(lockh); + if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) { + void *lmm; + + LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n", + ldlm_it2str(it->it_op), lvb_len); + + OBD_ALLOC_LARGE(lmm, lvb_len); + if (lmm == NULL) { LDLM_LOCK_PUT(lock); + RETURN(-ENOMEM); + } + memcpy(lmm, lvb_data, lvb_len); + + /* install lvb_data */ + lock_res_and_lock(lock); + if (lock->l_lvb_data == NULL) { + lock->l_lvb_data = lmm; + lock->l_lvb_len = lvb_len; + lmm = NULL; + } + unlock_res_and_lock(lock); + if (lmm != NULL) + OBD_FREE_LARGE(lmm, lvb_len); } + if (lock != NULL) + LDLM_LOCK_PUT(lock); RETURN(rc); } @@ -669,11 +727,11 @@ static int mdc_finish_enqueue(struct obd_export *exp, int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, struct lookup_intent *it, struct md_op_data *op_data, struct lustre_handle *lockh, void *lmm, int lmmsize, - struct ptlrpc_request **reqp, int extra_lock_flags) + struct ptlrpc_request **reqp, __u64 extra_lock_flags) { struct obd_device *obddev = class_exp2obd(exp); struct ptlrpc_request *req = NULL; - int flags, saved_flags = extra_lock_flags; + __u64 flags, saved_flags = extra_lock_flags; int rc; struct ldlm_res_id res_id; static const ldlm_policy_data_t lookup_policy = @@ -685,6 +743,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, ldlm_policy_data_t const *policy = &lookup_policy; int generation, resends = 0; struct ldlm_reply *lockrep; + enum lvb_type lvb_type = 0; ENTRY; LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n", @@ -719,13 +778,19 @@ resend: policy = &update_policy; einfo->ei_cbdata = NULL; lmm = NULL; - } else if (it->it_op & IT_UNLINK) - req = mdc_intent_unlink_pack(exp, it, op_data); - else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) + } else if (it->it_op & IT_UNLINK) { + req = mdc_intent_unlink_pack(exp, it, op_data); + } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) { req = mdc_intent_getattr_pack(exp, it, op_data); - else if (it->it_op & (IT_READDIR | IT_LAYOUT)) - req = ldlm_enqueue_pack(exp); - else { + } else if (it->it_op & IT_READDIR) { + req = mdc_enqueue_pack(exp, 0); + } else if (it->it_op & IT_LAYOUT) { + if (!imp_connect_lvb_type(class_exp2cliimp(exp))) + RETURN(-EOPNOTSUPP); + + req = mdc_intent_layout_pack(exp, it, op_data); + lvb_type = LVB_T_LAYOUT; + } else { LBUG(); RETURN(-EINVAL); } @@ -733,6 +798,11 @@ resend: if (IS_ERR(req)) RETURN(PTR_ERR(req)); + if (req != NULL && it && it->it_op & IT_CREAT) + /* ask ptlrpc not to resend on EINPROGRESS since we have our own + * retry logic */ + req->rq_no_retry_einprogress = 1; + if (resends) { req->rq_generation_set = 1; req->rq_import_generation = generation; @@ -754,7 +824,7 @@ resend: } rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL, - 0, lockh, 0); + 0, lvb_type, lockh, 0); if (!it) { /* For flock requests we immediatelly return without further delay and let caller deal with the rest, since rest of @@ -776,6 +846,9 @@ resend: lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); LASSERT(lockrep != NULL); + lockrep->lock_policy_res2 = + ptlrpc_status_ntoh(lockrep->lock_policy_res2); + /* Retry the create infinitely when we get -EINPROGRESS from * server. This is required by the new quota design. */ if (it && it->it_op & IT_CREAT && @@ -791,14 +864,20 @@ resend: if (generation == obddev->u.cli.cl_import->imp_generation) { goto resend; } else { - CDEBUG(D_HA, "resned cross eviction\n"); + CDEBUG(D_HA, "resend cross eviction\n"); RETURN(-EIO); } } - rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); - - RETURN(rc); + rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); + if (rc < 0) { + if (lustre_handle_is_used(lockh)) { + ldlm_lock_decref(lockh, einfo->ei_mode); + memset(lockh, 0, sizeof(*lockh)); + } + ptlrpc_req_finished(req); + } + RETURN(rc); } static int mdc_finish_intent_lock(struct obd_export *exp, @@ -811,7 +890,7 @@ static int mdc_finish_intent_lock(struct obd_export *exp, struct mdt_body *mdt_body; struct ldlm_lock *lock; int rc; - + ENTRY; LASSERT(request != NULL); LASSERT(request != LP_POISON); @@ -993,7 +1072,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, void *lmm, int lmmsize, struct lookup_intent *it, int lookup_flags, struct ptlrpc_request **reqp, ldlm_blocking_callback cb_blocking, - int extra_lock_flags) + __u64 extra_lock_flags) { struct lustre_handle lockh; int rc = 0; @@ -1008,7 +1087,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, lockh.cookie = 0; if (fid_is_sane(&op_data->op_fid2) && - (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) { + (it->it_op & (IT_LOOKUP | IT_GETATTR))) { /* We could just return 1 immediately, but since we should only * be called in revalidate_it if we already have a lock, let's * verify that. */ @@ -1069,7 +1148,8 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env, struct lookup_intent *it; struct lustre_handle *lockh; struct obd_device *obddev; - int flags = LDLM_FL_HAS_INTENT; + struct ldlm_reply *lockrep; + __u64 flags = LDLM_FL_HAS_INTENT; ENTRY; it = &minfo->mi_it; @@ -1089,6 +1169,12 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env, GOTO(out, rc); } + lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); + LASSERT(lockrep != NULL); + + lockrep->lock_policy_res2 = + ptlrpc_status_ntoh(lockrep->lock_policy_res2); + rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); if (rc) GOTO(out, rc); @@ -1120,7 +1206,7 @@ int mdc_intent_getattr_async(struct obd_export *exp, MDS_INODELOCK_UPDATE } }; int rc = 0; - int flags = LDLM_FL_HAS_INTENT; + __u64 flags = LDLM_FL_HAS_INTENT; ENTRY; CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n", @@ -1139,7 +1225,7 @@ int mdc_intent_getattr_async(struct obd_export *exp, } rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL, - 0, &minfo->mi_lockh, 1); + 0, LVB_T_NONE, &minfo->mi_lockh, 1); if (rc < 0) { mdc_exit_request(&obddev->u.cli); ptlrpc_req_finished(req);