};
int mdt_intent_lock_replace(struct mdt_thread_info *info,
- struct ldlm_lock **lockp,
- struct mdt_lock_handle *lh, int flags)
+ struct ldlm_lock **lockp,
+ struct ldlm_lock *new_lock,
+ struct mdt_lock_handle *lh,
+ int flags)
{
struct ptlrpc_request *req = mdt_info_req(info);
- struct ldlm_lock *old_lock = *lockp;
- struct ldlm_lock *new_lock = NULL;
+ struct ldlm_lock *lock = *lockp;
- new_lock = ldlm_handle2lock(&lh->mlh_lh);
+ /*
+ * Get new lock only for cases when possible resent did not find any
+ * lock.
+ */
+ if (new_lock == NULL)
+ new_lock = ldlm_handle2lock(&lh->mlh_lh);
if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
RETURN(0);
- LASSERTF(new_lock != NULL, "lockh "LPX64"\n", lh->mlh_lh.cookie);
+ LASSERTF(new_lock != NULL,
+ "lockh "LPX64"\n", lh->mlh_lh.cookie);
+
+ /*
+ * If we've already given this lock to a client once, then we should
+ * have no readers or writers. Otherwise, we should have one reader
+ * _or_ writer ref (which will be zeroed below) before returning the
+ * lock to a client.
+ */
+ if (new_lock->l_export == req->rq_export) {
+ LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
+ } else {
+ LASSERT(new_lock->l_export == NULL);
+ LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
+ }
*lockp = new_lock;
- /* FIXME:This only happens when MDT can handle RESENT */
if (new_lock->l_export == req->rq_export) {
- /* Already gave this to the client, which means that we
- * reconstructed a reply. */
+ /*
+ * Already gave this to the client, which means that we
+ * reconstructed a reply.
+ */
LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
MSG_RESENT);
RETURN(ELDLM_LOCK_REPLACED);
list_add(&new_lock->l_export_chain,
&new_lock->l_export->exp_ldlm_data.led_held_locks);
- new_lock->l_blocking_ast = old_lock->l_blocking_ast;
- new_lock->l_completion_ast = old_lock->l_completion_ast;
-
- new_lock->l_remote_handle = old_lock->l_remote_handle;
-
+ new_lock->l_blocking_ast = lock->l_blocking_ast;
+ new_lock->l_completion_ast = lock->l_completion_ast;
+ new_lock->l_remote_handle = lock->l_remote_handle;
new_lock->l_flags &= ~LDLM_FL_LOCAL;
unlock_res_and_lock(new_lock);
static void mdt_fixup_resent(struct req_capsule *pill,
struct ldlm_lock *new_lock,
+ struct ldlm_lock **old_lock,
struct mdt_lock_handle *lh)
{
struct ptlrpc_request *req = pill->rc_req;
LDLM_DEBUG(lock, "restoring lock cookie");
DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
lh->mlh_lh.cookie);
- break;
+ if (old_lock)
+ *old_lock = LDLM_LOCK_GET(lock);
+ spin_unlock(&exp->exp_ldlm_data.led_lock);
+ return;
}
}
spin_unlock(&exp->exp_ldlm_data.led_lock);
struct ldlm_lock **lockp,
int flags)
{
- struct ldlm_reply *ldlm_rep;
- struct mdt_lock_handle tmp_lock = { 0 };
+ struct mdt_lock_handle tmp_lock = { { 0 } };
struct mdt_lock_handle *lhc = &tmp_lock;
+ struct ldlm_lock *new_lock = NULL;
__u64 child_bits;
+ struct ldlm_reply *ldlm_rep;
struct ptlrpc_request *req;
ENTRY;
ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
- if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
- mdt_fixup_resent(&info->mti_pill, *lockp, lhc);
+ /* Get lock from request for possible resent case. */
+ mdt_fixup_resent(&info->mti_pill, *lockp, &new_lock, lhc);
ldlm_rep->lock_policy_res2 =
mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
RETURN(ELDLM_LOCK_ABORTED);
}
- return mdt_intent_lock_replace(info, lockp, lhc, flags);
+ return mdt_intent_lock_replace(info, lockp, new_lock, lhc, flags);
}
static int mdt_intent_reint(enum mdt_it_code opcode,
struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_NEW];
LASSERT(lhc->mlh_lh.cookie != 0);
rep->lock_policy_res2 = 0;
- return mdt_intent_lock_replace(info, lockp, lhc, flags);
+ return mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
}
rep->lock_policy_res2 = rc;