"ldlm_rep = %p\n",
PFID(mdt_object_fid(child)), ldlm_rep);
- if (is_resent) {
- /* Do not take lock for resent case. */
- lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
- LASSERTF(lock != NULL, "Invalid lock handle "LPX64"\n",
- lhc->mlh_reg_lh.cookie);
- LASSERT(fid_res_name_eq(mdt_object_fid(child),
- &lock->l_resource->lr_name));
- LDLM_LOCK_PUT(lock);
- } else {
+ rc = mdt_check_resent_lock(info, child, lhc);
+ if (rc < 0) {
+ RETURN(-EPROTO);
+ } else if (rc > 0) {
mdt_lock_handle_init(lhc);
mdt_lock_reg_init(lhc, LCK_PR);
if (unlikely(IS_ERR(child)))
GOTO(out_parent, rc = PTR_ERR(child));
- if (is_resent) {
- /* Do not take lock for resent case. */
- lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
- LASSERTF(lock != NULL, "Invalid lock handle "LPX64"\n",
- lhc->mlh_reg_lh.cookie);
- if (!fid_res_name_eq(mdt_object_fid(child),
- &lock->l_resource->lr_name)) {
- LASSERTF(fid_res_name_eq(mdt_object_fid(parent),
- &lock->l_resource->lr_name),
- "Lock res_id: "DLDLMRES", fid: "DFID"\n",
- PLDLMRES(lock->l_resource),
- PFID(mdt_object_fid(parent)));
- CWARN("Although resent, but still not get child lock"
- "parent:"DFID" child:"DFID"\n",
- PFID(mdt_object_fid(parent)),
- PFID(mdt_object_fid(child)));
- lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
- LDLM_LOCK_PUT(lock);
- GOTO(relock, 0);
- }
- LDLM_LOCK_PUT(lock);
- } else {
-relock:
+ rc = mdt_check_resent_lock(info, child, lhc);
+ if (rc < 0) {
+ GOTO(out_child, rc);
+ } else if (rc > 0) {
OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
mdt_lock_handle_init(lhc);
mdt_lock_reg_init(lhc, LCK_PR);
RETURN(0);
}
+int mdt_check_resent_lock(struct mdt_thread_info *info,
+ struct mdt_object *mo,
+ struct mdt_lock_handle *lhc)
+{
+ /* the lock might already be gotten in ldlm_handle_enqueue() */
+ if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct ldlm_lock *lock;
+
+ lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
+ LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
+ LASSERTF(lock != NULL, "Invalid lock handle "LPX64"\n",
+ lhc->mlh_reg_lh.cookie);
+
+ if (!fid_res_name_eq(mdt_object_fid(mo),
+ &lock->l_resource->lr_name)) {
+ CWARN("%s: Although resent, but still not "
+ "get child lock:"DFID"\n",
+ info->mti_exp->exp_obd->obd_name,
+ PFID(mdt_object_fid(mo)));
+ LDLM_LOCK_PUT(lock);
+ RETURN(-EPROTO);
+ }
+ LDLM_LOCK_PUT(lock);
+ return 0;
+ }
+ return 1;
+}
+
int mdt_remote_object_lock(struct mdt_thread_info *mti,
struct mdt_object *o, const struct lu_fid *fid,
struct lustre_handle *lh, ldlm_mode_t mode,
};
static int
-mdt_intent_lock_replace(struct mdt_thread_info *info, struct ldlm_lock **lockp,
- struct ldlm_lock *new_lock, struct mdt_lock_handle *lh,
+mdt_intent_lock_replace(struct mdt_thread_info *info,
+ struct ldlm_lock **lockp,
+ struct mdt_lock_handle *lh,
__u64 flags)
{
struct ptlrpc_request *req = mdt_info_req(info);
struct ldlm_lock *lock = *lockp;
+ struct ldlm_lock *new_lock;
- /*
- * Get new lock only for cases when possible resent did not find any
- * lock.
- */
- if (new_lock == NULL)
- new_lock = ldlm_handle2lock_long(&lh->mlh_reg_lh, 0);
+ /* If possible resent found a lock, @lh is set to its handle */
+ new_lock = ldlm_handle2lock_long(&lh->mlh_reg_lh, 0);
if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) {
lh->mlh_reg_lh.cookie = 0;
*/
LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
MSG_RESENT);
+
+ LDLM_LOCK_RELEASE(new_lock);
lh->mlh_reg_lh.cookie = 0;
RETURN(ELDLM_LOCK_REPLACED);
}
static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
struct ldlm_lock *new_lock,
- struct ldlm_lock **old_lock,
struct mdt_lock_handle *lh,
- enum mdt_it_code opcode)
+ __u64 flags)
{
struct ptlrpc_request *req = mdt_info_req(info);
- struct obd_export *exp = req->rq_export;
- struct lustre_handle remote_hdl;
struct ldlm_request *dlmreq;
- struct ldlm_lock *lock;
if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
return;
dlmreq = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ);
- remote_hdl = dlmreq->lock_handle[0];
- /* If the client does not require open lock, it does not need to
- * search lock in exp_lock_hash, since the server thread will
- * make sure the lock will be released, and the resend request
- * can always re-enqueue the lock */
- if ((opcode != MDT_IT_OPEN) || (opcode == MDT_IT_OPEN &&
- info->mti_spec.sp_cr_flags & MDS_OPEN_LOCK)) {
- /* In the function below, .hs_keycmp resolves to
- * ldlm_export_lock_keycmp() */
- /* coverity[overrun-buffer-val] */
- lock = cfs_hash_lookup(exp->exp_lock_hash, &remote_hdl);
- if (lock) {
- lock_res_and_lock(lock);
- if (lock != new_lock) {
- lh->mlh_reg_lh.cookie = lock->l_handle.h_cookie;
- lh->mlh_reg_mode = lock->l_granted_mode;
-
- LDLM_DEBUG(lock, "Restoring lock cookie");
- DEBUG_REQ(D_DLMTRACE, req,
- "restoring lock cookie "LPX64,
- lh->mlh_reg_lh.cookie);
- if (old_lock)
- *old_lock = LDLM_LOCK_GET(lock);
- cfs_hash_put(exp->exp_lock_hash,
- &lock->l_exp_hash);
- unlock_res_and_lock(lock);
- return;
- }
- cfs_hash_put(exp->exp_lock_hash, &lock->l_exp_hash);
- unlock_res_and_lock(lock);
- }
+
+ /* Check if this is a resend case (MSG_RESENT is set on RPC) and a
+ * lock was found by ldlm_handle_enqueue(); if so @lh must be
+ * initialized. */
+ if (flags & LDLM_FL_RESENT) {
+ lh->mlh_reg_lh.cookie = new_lock->l_handle.h_cookie;
+ lh->mlh_reg_mode = new_lock->l_granted_mode;
+
+ LDLM_DEBUG(new_lock, "Restoring lock cookie");
+ DEBUG_REQ(D_DLMTRACE, req, "restoring lock cookie "LPX64,
+ lh->mlh_reg_lh.cookie);
+ return;
}
+
/*
* If the xid matches, then we know this is a resent request, and allow
* it. (It's probably an OPEN, for which we don't send a lock.
*/
lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
- DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64,
- remote_hdl.cookie);
+ DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64,
+ dlmreq->lock_handle[0].cookie);
}
static int mdt_intent_getxattr(enum mdt_it_code opcode,
* (for the resend case) or a new lock. Below we will use it to
* replace the original lock.
*/
- mdt_intent_fixup_resent(info, *lockp, NULL, lhc, opcode);
+ mdt_intent_fixup_resent(info, *lockp, lhc, flags);
if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) {
mdt_lock_reg_init(lhc, (*lockp)->l_req_mode);
rc = mdt_object_lock(info, info->mti_object, lhc,
grc = mdt_getxattr(info);
- rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
+ rc = mdt_intent_lock_replace(info, lockp, lhc, flags);
if (mdt_info_req(info)->rq_repmsg != NULL)
ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
__u64 flags)
{
struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
- struct ldlm_lock *new_lock = NULL;
__u64 child_bits;
struct ldlm_reply *ldlm_rep;
struct mdt_body *reqbody;
mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
/* Get lock from request for possible resent case. */
- mdt_intent_fixup_resent(info, *lockp, &new_lock, lhc, opcode);
+ mdt_intent_fixup_resent(info, *lockp, lhc, flags);
rc = mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
ldlm_rep->lock_policy_res2 = clear_serious(rc);
GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED);
}
- rc = mdt_intent_lock_replace(info, lockp, new_lock, lhc, flags);
+ rc = mdt_intent_lock_replace(info, lockp, lhc, flags);
EXIT;
out_ucred:
mdt_exit_ucred(info);
}
/* Get lock from request for possible resent case. */
- mdt_intent_fixup_resent(info, *lockp, NULL, lhc, opcode);
+ mdt_intent_fixup_resent(info, *lockp, lhc, flags);
rc = mdt_reint_internal(info, lhc, opc);
if (rc == -EREMOTE || mdt_get_disposition(rep, DISP_OPEN_LOCK)) {
LASSERT(lustre_handle_is_used(&lhc->mlh_reg_lh));
rep->lock_policy_res2 = 0;
- rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
+ rc = mdt_intent_lock_replace(info, lockp, lhc, flags);
RETURN(rc);
}
LASSERTF(rc == 0, "Error occurred but lock handle "
"is still in use, rc = %d\n", rc);
rep->lock_policy_res2 = 0;
- rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
+ rc = mdt_intent_lock_replace(info, lockp, lhc, flags);
RETURN(rc);
} else {
lhc->mlh_reg_lh.cookie = 0ull;