__u64 child_bits,
struct ldlm_reply *ldlm_rep)
{
- struct mdt_object *parent = info->mti_object;
- struct mdt_object *child;
- struct md_object *next = mdt_object_child(info->mti_object);
- struct lu_fid *child_fid = &info->mti_tmp_fid1;
- const char *name;
- int rc;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct mdt_object *parent = info->mti_object;
+ struct mdt_object *child;
+ struct md_object *next = mdt_object_child(info->mti_object);
+ struct lu_fid *child_fid = &info->mti_tmp_fid1;
+ int is_resent, rc;
+ const char *name;
struct mdt_lock_handle *lhp;
+ struct ldlm_lock *lock;
ENTRY;
+ is_resent = lustre_handle_is_used(&lhc->mlh_lh);
+ if (is_resent)
+ LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
+
LASSERT(info->mti_object != NULL);
name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
if (name == NULL)
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
child = parent;
CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID
- ", ldlm_rep=%p\n",
- PFID(mdt_object_fid(child)), ldlm_rep);
-
- mdt_lock_handle_init(lhc);
- lhc->mlh_mode = LCK_CR;
- rc = mdt_object_lock(info, child, lhc, child_bits);
+ ", ldlm_rep=%p\n", PFID(mdt_object_fid(child)), ldlm_rep);
+
+ if (is_resent) {
+ /* Do not take lock for resent case. */
+ lock = ldlm_handle2lock(&lhc->mlh_lh);
+ if (!lock) {
+ CERROR("Invalid lock handle "LPX64"\n",
+ lhc->mlh_lh.cookie);
+ LBUG();
+ }
+ LASSERT(fid_res_name_eq(mdt_object_fid(child),
+ &lock->l_resource->lr_name));
+ LDLM_LOCK_PUT(lock);
+ rc = 0;
+ } else {
+ mdt_lock_handle_init(lhc);
+ lhc->mlh_mode = LCK_CR;
+ rc = mdt_object_lock(info, child, lhc, child_bits);
+ }
if (rc == 0) {
/* finally, we can get attr for child. */
rc = mdt_getattr_internal(info, child);
*step 3: find the child object by fid & lock it.
* regardless if it is local or remote.
*/
- mdt_lock_handle_init(lhc);
- lhc->mlh_mode = LCK_CR;
- child = mdt_object_find_lock(info, child_fid, lhc, child_bits);
+ if (is_resent) {
+ /* Do not take lock for resent case. */
+ lock = ldlm_handle2lock(&lhc->mlh_lh);
+ if (!lock) {
+ CERROR("Invalid lock handle "LPX64"\n",
+ lhc->mlh_lh.cookie);
+ LBUG();
+ }
+ LASSERT(fid_res_name_eq(mdt_object_fid(child),
+ &lock->l_resource->lr_name));
+ LDLM_LOCK_PUT(lock);
+ child = mdt_object_find(info->mti_ctxt, info->mti_mdt,
+ child_fid);
+ } else {
+ mdt_lock_handle_init(lhc);
+ lhc->mlh_mode = LCK_CR;
+ child = mdt_object_find_lock(info, child_fid, lhc, child_bits);
+ }
if (IS_ERR(child))
GOTO(out_parent, rc = PTR_ERR(child));
/* finally, we can get attr for child. */
rc = mdt_getattr_internal(info, child);
- if (rc != 0)
+ if (rc != 0) {
mdt_object_unlock(info, child, lhc, 1);
- else {
+ } else {
struct ldlm_lock *lock = ldlm_handle2lock(&lhc->mlh_lh);
if (lock) {
struct ldlm_res_id *res_id;
ldlm_policy_data_t *policy,
struct ldlm_res_id *res_id)
{
- int flags = 0; /*XXX: LDLM_FL_LOCAL_ONLY?*/
+ int flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
int rc;
LASSERT(ns != NULL);
return rc == ELDLM_OK ? 0 : -EIO;
}
-/* just call ldlm_lock_decref() if decref,
- * else we only call ptlrpc_save_lock() to save this lock in req.
- * when transaction committed, req will be released, and lock will, too */
+/*
+ * Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock()
+ * to save this lock in req. when transaction committed, req will be released,
+ * and lock will, too.
+ */
void fid_unlock(struct ptlrpc_request *req, const struct lu_fid *f,
struct lustre_handle *lh, ldlm_mode_t mode, int decref)
{
{
- /* FIXME: this is debug stuff, remove it later. */
+ /* FIXME: this is debug stuff, remove it later. */
struct ldlm_lock *lock = ldlm_handle2lock(lh);
if (!lock) {
- CERROR("invalid lock handle "LPX64, lh->cookie);
+ CERROR("Invalid lock handle "LPX64"\n",
+ lh->cookie);
LBUG();
}
LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
LDLM_LOCK_PUT(lock);
}
+
if (decref)
ldlm_lock_decref(lh, mode);
else
RETURN(ELDLM_LOCK_REPLACED);
}
+static void mdt_fixup_resent(struct req_capsule *pill,
+ struct ldlm_lock *new_lock,
+ struct mdt_lock_handle *lh)
+{
+ struct ptlrpc_request *req = pill->rc_req;
+ struct obd_export *exp = req->rq_export;
+ struct lustre_handle remote_hdl;
+ struct ldlm_request *dlmreq;
+ struct list_head *iter;
+
+ if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
+ return;
+
+ dlmreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
+ remote_hdl = dlmreq->lock_handle1;
+
+ spin_lock(&exp->exp_ldlm_data.led_lock);
+ list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
+ struct ldlm_lock *lock;
+ lock = list_entry(iter, struct ldlm_lock, l_export_chain);
+ if (lock == new_lock)
+ continue;
+ if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
+ lh->mlh_lh.cookie = lock->l_handle.h_cookie;
+ lh->mlh_mode = lock->l_granted_mode;
+
+ LDLM_DEBUG(lock, "restoring lock cookie");
+ DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
+ lh->mlh_lh.cookie);
+ break;
+ }
+ }
+ spin_unlock(&exp->exp_ldlm_data.led_lock);
+
+ /*
+ * If the xid matches, then we know this is a resent request, and allow
+ * it. (It's probably an OPEN, for which we don't send a lock.
+ */
+ if (req->rq_xid ==
+ le64_to_cpu(exp->exp_mdt_data.med_mcd->mcd_last_xid))
+ return;
+
+ if (req->rq_xid ==
+ le64_to_cpu(exp->exp_mdt_data.med_mcd->mcd_last_close_xid))
+ return;
+
+ /*
+ * This remote handle isn't enqueued, so we never received or processed
+ * this request. Clear MSG_RESENT, because it can be handled like any
+ * normal request now.
+ */
+ lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
+
+ DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
+ remote_hdl.cookie);
+}
+
static int mdt_intent_getattr(enum mdt_it_code opcode,
struct mdt_thread_info *info,
struct ldlm_lock **lockp,
int flags)
{
struct ldlm_reply *ldlm_rep;
- struct mdt_lock_handle tmp_lock;
+ struct mdt_lock_handle tmp_lock = { 0 };
struct mdt_lock_handle *lhc = &tmp_lock;
__u64 child_bits;
+ struct ptlrpc_request *req;
ENTRY;
RETURN(-EINVAL);
}
+ req = info->mti_pill.rc_req;
ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
+ if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
+ mdt_fixup_resent(&info->mti_pill, *lockp, lhc);
+
ldlm_rep->lock_policy_res2 =
mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);