Whamcloud - gitweb
LU-2827 ldlm: wrong handling of ldlm resends 78/5978/10
authorVitaly Fertman <vitaly_fertman@xyratex.com>
Tue, 6 May 2014 19:18:03 +0000 (23:18 +0400)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 20 Jun 2014 02:43:25 +0000 (02:43 +0000)
If a successful reply is lost for an intent
lock request, MDS will not correctly recover
from this situation on resend. Instead, a new
lock is created.

Signed-off-by: Vitaly Fertman <vitaly_fertman@xyratex.com>
Reviewed-by: Andrew Perepechko <andrew_perepechko@xyratex.com>
Tested-by: Elena Gryaznova <elena_gryaznova@xyratex.com>
Reviewed-by: Alexey Lyashkov <alexey_lyashkov@xyratex.com>
Change-Id: I9a23c13313d5168d0c232fdc0acbdab330089356
Tested-by: Andrey Bondarenko <andrey_bondarenko@xyratex.com>
Xyratex-bug-id: MRP-975 MRP-1798
Reviewed-on: http://review.whamcloud.com/5978
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: James Simmons <uja.ornl@gmail.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
13 files changed:
lustre/include/lustre_dlm_flags.h
lustre/include/obd_support.h
lustre/ldlm/ldlm_extent.c
lustre/ldlm/ldlm_inodebits.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_plain.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c
lustre/ofd/ofd_dlm.c
lustre/ptlrpc/service.c
lustre/tests/recovery-small.sh
lustre/tests/test-framework.sh

index 27ff3f7..ea70ea0 100644 (file)
 #define ldlm_set_excl(_l)               LDLM_SET_FLAG((  _l), 1ULL << 55)
 #define ldlm_clear_excl(_l)             LDLM_CLEAR_FLAG((_l), 1ULL << 55)
 
 #define ldlm_set_excl(_l)               LDLM_SET_FLAG((  _l), 1ULL << 55)
 #define ldlm_clear_excl(_l)             LDLM_CLEAR_FLAG((_l), 1ULL << 55)
 
+/** Flag whether a lock is found on server for re-sent RPC. */
+#define LDLM_FL_RESENT                   0x0100000000000000ULL // bit  56
+
 /** l_flags bits marked as "ast" bits */
 #define LDLM_FL_AST_MASK                (LDLM_FL_FLOCK_DEADLOCK                |\
                                         LDLM_FL_AST_DISCARD_DATA)
 /** l_flags bits marked as "ast" bits */
 #define LDLM_FL_AST_MASK                (LDLM_FL_FLOCK_DEADLOCK                |\
                                         LDLM_FL_AST_DISCARD_DATA)
index 3cedca7..343343a 100644 (file)
@@ -347,6 +347,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LDLM_AGL_NOLOCK         0x31b
 #define OBD_FAIL_LDLM_OST_LVB           0x31c
 #define OBD_FAIL_LDLM_ENQUEUE_HANG      0x31d
 #define OBD_FAIL_LDLM_AGL_NOLOCK         0x31b
 #define OBD_FAIL_LDLM_OST_LVB           0x31c
 #define OBD_FAIL_LDLM_ENQUEUE_HANG      0x31d
+#define OBD_FAIL_LDLM_PAUSE_CANCEL2      0x31f
 #define OBD_FAIL_LDLM_CP_CB_WAIT2        0x320
 #define OBD_FAIL_LDLM_CP_CB_WAIT3        0x321
 #define OBD_FAIL_LDLM_CP_CB_WAIT4        0x322
 #define OBD_FAIL_LDLM_CP_CB_WAIT2        0x320
 #define OBD_FAIL_LDLM_CP_CB_WAIT3        0x321
 #define OBD_FAIL_LDLM_CP_CB_WAIT4        0x322
index 12dc35a..c9f1618 100644 (file)
@@ -693,6 +693,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
         int contended_locks = 0;
         ENTRY;
 
         int contended_locks = 0;
         ENTRY;
 
+       LASSERT(lock->l_granted_mode != lock->l_req_mode);
         LASSERT(cfs_list_empty(&res->lr_converting));
         LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !ldlm_is_ast_discard_data(lock));
         LASSERT(cfs_list_empty(&res->lr_converting));
         LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !ldlm_is_ast_discard_data(lock));
index b1dc03d..e43cbea 100644 (file)
@@ -187,6 +187,7 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
+       LASSERT(lock->l_granted_mode != lock->l_req_mode);
         LASSERT(cfs_list_empty(&res->lr_converting));
         check_res_locked(res);
 
         LASSERT(cfs_list_empty(&res->lr_converting));
         check_res_locked(res);
 
index 2d6a60e..307a823 100644 (file)
@@ -1232,7 +1232,10 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
         }
 #endif
 
         }
 #endif
 
-        if (unlikely(flags & LDLM_FL_REPLAY)) {
+       if (unlikely(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
+               flags |= LDLM_FL_RESENT;
+
+       if (unlikely(flags & (LDLM_FL_REPLAY | LDLM_FL_RESENT))) {
                 /* Find an existing lock in the per-export lock hash */
                /* In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp() */
                 /* Find an existing lock in the per-export lock hash */
                /* In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp() */
@@ -1243,7 +1246,9 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                         DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
                                   LPX64, lock->l_handle.h_cookie);
                         GOTO(existing_lock, rc = 0);
                         DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
                                   LPX64, lock->l_handle.h_cookie);
                         GOTO(existing_lock, rc = 0);
-                }
+               } else {
+                       flags &= ~LDLM_FL_RESENT;
+               }
         }
 
        /* The lock's callback data might be set in the policy function */
         }
 
        /* The lock's callback data might be set in the policy function */
@@ -2596,6 +2601,8 @@ static int ldlm_bl_thread_main(void *arg)
                 if (blwi->blwi_mem_pressure)
                        memory_pressure_set();
 
                 if (blwi->blwi_mem_pressure)
                        memory_pressure_set();
 
+               OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
+
                 if (blwi->blwi_count) {
                         int count;
                        /* The special case when we cancel locks in LRU
                 if (blwi->blwi_count) {
                         int count;
                        /* The special case when we cancel locks in LRU
index 7c44f72..f6a37f0 100644 (file)
@@ -149,6 +149,7 @@ int ldlm_process_plain_lock(struct ldlm_lock *lock, __u64 *flags,
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
+       LASSERT(lock->l_granted_mode != lock->l_req_mode);
         check_res_locked(res);
         LASSERT(cfs_list_empty(&res->lr_converting));
 
         check_res_locked(res);
         LASSERT(cfs_list_empty(&res->lr_converting));
 
index d01af8b..6cd5f51 100644 (file)
@@ -1345,15 +1345,10 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                       "ldlm_rep = %p\n",
                       PFID(mdt_object_fid(child)), ldlm_rep);
 
                       "ldlm_rep = %p\n",
                       PFID(mdt_object_fid(child)), ldlm_rep);
 
-               if (is_resent) {
-                       /* Do not take lock for resent case. */
-                       lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
-                       LASSERTF(lock != NULL, "Invalid lock handle "LPX64"\n",
-                                lhc->mlh_reg_lh.cookie);
-                       LASSERT(fid_res_name_eq(mdt_object_fid(child),
-                                               &lock->l_resource->lr_name));
-                       LDLM_LOCK_PUT(lock);
-               } else {
+               rc = mdt_check_resent_lock(info, child, lhc);
+               if (rc < 0) {
+                       RETURN(-EPROTO);
+               } else if (rc > 0) {
                        mdt_lock_handle_init(lhc);
                        mdt_lock_reg_init(lhc, LCK_PR);
 
                        mdt_lock_handle_init(lhc);
                        mdt_lock_reg_init(lhc, LCK_PR);
 
@@ -1478,30 +1473,11 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
 
        if (unlikely(IS_ERR(child)))
                GOTO(out_parent, rc = PTR_ERR(child));
 
        if (unlikely(IS_ERR(child)))
                GOTO(out_parent, rc = PTR_ERR(child));
-       if (is_resent) {
-               /* Do not take lock for resent case. */
-               lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
-               LASSERTF(lock != NULL, "Invalid lock handle "LPX64"\n",
-                        lhc->mlh_reg_lh.cookie);
 
 
-               if (!fid_res_name_eq(mdt_object_fid(child),
-                                    &lock->l_resource->lr_name)) {
-                       LASSERTF(fid_res_name_eq(mdt_object_fid(parent),
-                                                &lock->l_resource->lr_name),
-                                "Lock res_id: "DLDLMRES", fid: "DFID"\n",
-                                PLDLMRES(lock->l_resource),
-                                PFID(mdt_object_fid(parent)));
-                       CWARN("Although resent, but still not get child lock"
-                             "parent:"DFID" child:"DFID"\n",
-                             PFID(mdt_object_fid(parent)),
-                             PFID(mdt_object_fid(child)));
-                       lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
-                       LDLM_LOCK_PUT(lock);
-                       GOTO(relock, 0);
-               }
-               LDLM_LOCK_PUT(lock);
-       } else {
-relock:
+       rc = mdt_check_resent_lock(info, child, lhc);
+       if (rc < 0) {
+               GOTO(out_child, rc);
+       } else if (rc > 0) {
                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
                 mdt_lock_handle_init(lhc);
                mdt_lock_reg_init(lhc, LCK_PR);
                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
                 mdt_lock_handle_init(lhc);
                mdt_lock_reg_init(lhc, LCK_PR);
@@ -2335,6 +2311,35 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
        RETURN(0);
 }
 
        RETURN(0);
 }
 
+int mdt_check_resent_lock(struct mdt_thread_info *info,
+                         struct mdt_object *mo,
+                         struct mdt_lock_handle *lhc)
+{
+       /* the lock might already be gotten in ldlm_handle_enqueue() */
+       if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
+               struct ptlrpc_request *req = mdt_info_req(info);
+               struct ldlm_lock      *lock;
+
+               lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
+               LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
+               LASSERTF(lock != NULL, "Invalid lock handle "LPX64"\n",
+                        lhc->mlh_reg_lh.cookie);
+
+               if (!fid_res_name_eq(mdt_object_fid(mo),
+                                    &lock->l_resource->lr_name)) {
+                       CWARN("%s: Although resent, but still not "
+                             "get child lock:"DFID"\n",
+                             info->mti_exp->exp_obd->obd_name,
+                             PFID(mdt_object_fid(mo)));
+                       LDLM_LOCK_PUT(lock);
+                       RETURN(-EPROTO);
+               }
+               LDLM_LOCK_PUT(lock);
+               return 0;
+       }
+       return 1;
+}
+
 int mdt_remote_object_lock(struct mdt_thread_info *mti,
                           struct mdt_object *o, const struct lu_fid *fid,
                           struct lustre_handle *lh, ldlm_mode_t mode,
 int mdt_remote_object_lock(struct mdt_thread_info *mti,
                           struct mdt_object *o, const struct lu_fid *fid,
                           struct lustre_handle *lh, ldlm_mode_t mode,
@@ -2991,19 +2996,17 @@ static struct mdt_it_flavor {
 };
 
 static int
 };
 
 static int
-mdt_intent_lock_replace(struct mdt_thread_info *info, struct ldlm_lock **lockp,
-                       struct ldlm_lock *new_lock, struct mdt_lock_handle *lh,
+mdt_intent_lock_replace(struct mdt_thread_info *info,
+                       struct ldlm_lock **lockp,
+                       struct mdt_lock_handle *lh,
                        __u64 flags)
 {
         struct ptlrpc_request  *req = mdt_info_req(info);
         struct ldlm_lock       *lock = *lockp;
                        __u64 flags)
 {
         struct ptlrpc_request  *req = mdt_info_req(info);
         struct ldlm_lock       *lock = *lockp;
+       struct ldlm_lock       *new_lock;
 
 
-        /*
-         * Get new lock only for cases when possible resent did not find any
-         * lock.
-         */
-        if (new_lock == NULL)
-                new_lock = ldlm_handle2lock_long(&lh->mlh_reg_lh, 0);
+       /* If possible resent found a lock, @lh is set to its handle */
+       new_lock = ldlm_handle2lock_long(&lh->mlh_reg_lh, 0);
 
         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) {
                 lh->mlh_reg_lh.cookie = 0;
 
         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) {
                 lh->mlh_reg_lh.cookie = 0;
@@ -3035,6 +3038,8 @@ mdt_intent_lock_replace(struct mdt_thread_info *info, struct ldlm_lock **lockp,
                  */
                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
                         MSG_RESENT);
                  */
                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
                         MSG_RESENT);
+
+               LDLM_LOCK_RELEASE(new_lock);
                 lh->mlh_reg_lh.cookie = 0;
                 RETURN(ELDLM_LOCK_REPLACED);
         }
                 lh->mlh_reg_lh.cookie = 0;
                 RETURN(ELDLM_LOCK_REPLACED);
         }
@@ -3076,52 +3081,30 @@ mdt_intent_lock_replace(struct mdt_thread_info *info, struct ldlm_lock **lockp,
 
 static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
                                    struct ldlm_lock *new_lock,
 
 static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
                                    struct ldlm_lock *new_lock,
-                                   struct ldlm_lock **old_lock,
                                    struct mdt_lock_handle *lh,
                                    struct mdt_lock_handle *lh,
-                                   enum mdt_it_code opcode)
+                                   __u64 flags)
 {
         struct ptlrpc_request  *req = mdt_info_req(info);
 {
         struct ptlrpc_request  *req = mdt_info_req(info);
-        struct obd_export      *exp = req->rq_export;
-        struct lustre_handle    remote_hdl;
         struct ldlm_request    *dlmreq;
         struct ldlm_request    *dlmreq;
-        struct ldlm_lock       *lock;
 
         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
                 return;
 
         dlmreq = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ);
 
         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
                 return;
 
         dlmreq = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ);
-        remote_hdl = dlmreq->lock_handle[0];
-       /* If the client does not require open lock, it does not need to
-        * search lock in exp_lock_hash, since the server thread will
-        * make sure the lock will be released, and the resend request
-        * can always re-enqueue the lock */
-       if ((opcode != MDT_IT_OPEN) || (opcode == MDT_IT_OPEN &&
-           info->mti_spec.sp_cr_flags & MDS_OPEN_LOCK)) {
-               /* In the function below, .hs_keycmp resolves to
-                * ldlm_export_lock_keycmp() */
-               /* coverity[overrun-buffer-val] */
-               lock = cfs_hash_lookup(exp->exp_lock_hash, &remote_hdl);
-               if (lock) {
-                       lock_res_and_lock(lock);
-                       if (lock != new_lock) {
-                               lh->mlh_reg_lh.cookie = lock->l_handle.h_cookie;
-                               lh->mlh_reg_mode = lock->l_granted_mode;
-
-                               LDLM_DEBUG(lock, "Restoring lock cookie");
-                               DEBUG_REQ(D_DLMTRACE, req,
-                                         "restoring lock cookie "LPX64,
-                                         lh->mlh_reg_lh.cookie);
-                               if (old_lock)
-                                       *old_lock = LDLM_LOCK_GET(lock);
-                               cfs_hash_put(exp->exp_lock_hash,
-                                            &lock->l_exp_hash);
-                               unlock_res_and_lock(lock);
-                               return;
-                       }
-                       cfs_hash_put(exp->exp_lock_hash, &lock->l_exp_hash);
-                       unlock_res_and_lock(lock);
-               }
+
+       /* Check if this is a resend case (MSG_RESENT is set on RPC) and a
+        * lock was found by ldlm_handle_enqueue(); if so @lh must be
+        * initialized. */
+       if (flags & LDLM_FL_RESENT) {
+               lh->mlh_reg_lh.cookie = new_lock->l_handle.h_cookie;
+               lh->mlh_reg_mode = new_lock->l_granted_mode;
+
+               LDLM_DEBUG(new_lock, "Restoring lock cookie");
+               DEBUG_REQ(D_DLMTRACE, req, "restoring lock cookie "LPX64,
+                         lh->mlh_reg_lh.cookie);
+               return;
        }
        }
+
         /*
          * If the xid matches, then we know this is a resent request, and allow
          * it. (It's probably an OPEN, for which we don't send a lock.
         /*
          * If the xid matches, then we know this is a resent request, and allow
          * it. (It's probably an OPEN, for which we don't send a lock.
@@ -3136,8 +3119,8 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
          */
         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
 
          */
         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
 
-        DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64,
-                  remote_hdl.cookie);
+       DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64,
+                 dlmreq->lock_handle[0].cookie);
 }
 
 static int mdt_intent_getxattr(enum mdt_it_code opcode,
 }
 
 static int mdt_intent_getxattr(enum mdt_it_code opcode,
@@ -3154,7 +3137,7 @@ static int mdt_intent_getxattr(enum mdt_it_code opcode,
         * (for the resend case) or a new lock. Below we will use it to
         * replace the original lock.
         */
         * (for the resend case) or a new lock. Below we will use it to
         * replace the original lock.
         */
-       mdt_intent_fixup_resent(info, *lockp, NULL, lhc, opcode);
+       mdt_intent_fixup_resent(info, *lockp, lhc, flags);
        if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) {
                mdt_lock_reg_init(lhc, (*lockp)->l_req_mode);
                rc = mdt_object_lock(info, info->mti_object, lhc,
        if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) {
                mdt_lock_reg_init(lhc, (*lockp)->l_req_mode);
                rc = mdt_object_lock(info, info->mti_object, lhc,
@@ -3166,7 +3149,7 @@ static int mdt_intent_getxattr(enum mdt_it_code opcode,
 
        grc = mdt_getxattr(info);
 
 
        grc = mdt_getxattr(info);
 
-       rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
+       rc = mdt_intent_lock_replace(info, lockp, lhc, flags);
 
        if (mdt_info_req(info)->rq_repmsg != NULL)
                ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
 
        if (mdt_info_req(info)->rq_repmsg != NULL)
                ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
@@ -3184,7 +3167,6 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
                              __u64 flags)
 {
         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
                              __u64 flags)
 {
         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
-        struct ldlm_lock       *new_lock = NULL;
         __u64                   child_bits;
         struct ldlm_reply      *ldlm_rep;
         struct mdt_body        *reqbody;
         __u64                   child_bits;
         struct ldlm_reply      *ldlm_rep;
         struct mdt_body        *reqbody;
@@ -3223,7 +3205,7 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
         mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
 
        /* Get lock from request for possible resent case. */
         mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
 
        /* Get lock from request for possible resent case. */
-       mdt_intent_fixup_resent(info, *lockp, &new_lock, lhc, opcode);
+       mdt_intent_fixup_resent(info, *lockp, lhc, flags);
 
        rc = mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
        ldlm_rep->lock_policy_res2 = clear_serious(rc);
 
        rc = mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
        ldlm_rep->lock_policy_res2 = clear_serious(rc);
@@ -3236,7 +3218,7 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
                 GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED);
         }
 
                 GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED);
         }
 
-        rc = mdt_intent_lock_replace(info, lockp, new_lock, lhc, flags);
+       rc = mdt_intent_lock_replace(info, lockp, lhc, flags);
         EXIT;
 out_ucred:
         mdt_exit_ucred(info);
         EXIT;
 out_ucred:
         mdt_exit_ucred(info);
@@ -3329,7 +3311,7 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
         }
 
        /* Get lock from request for possible resent case. */
         }
 
        /* Get lock from request for possible resent case. */
-       mdt_intent_fixup_resent(info, *lockp, NULL, lhc, opcode);
+       mdt_intent_fixup_resent(info, *lockp, lhc, flags);
 
         rc = mdt_reint_internal(info, lhc, opc);
 
 
         rc = mdt_reint_internal(info, lhc, opc);
 
@@ -3348,7 +3330,7 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
        if (rc == -EREMOTE || mdt_get_disposition(rep, DISP_OPEN_LOCK)) {
                LASSERT(lustre_handle_is_used(&lhc->mlh_reg_lh));
                rep->lock_policy_res2 = 0;
        if (rc == -EREMOTE || mdt_get_disposition(rep, DISP_OPEN_LOCK)) {
                LASSERT(lustre_handle_is_used(&lhc->mlh_reg_lh));
                rep->lock_policy_res2 = 0;
-               rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
+               rc = mdt_intent_lock_replace(info, lockp, lhc, flags);
                RETURN(rc);
        }
 
                RETURN(rc);
        }
 
@@ -3381,7 +3363,7 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
                         LASSERTF(rc == 0, "Error occurred but lock handle "
                                  "is still in use, rc = %d\n", rc);
                         rep->lock_policy_res2 = 0;
                         LASSERTF(rc == 0, "Error occurred but lock handle "
                                  "is still in use, rc = %d\n", rc);
                         rep->lock_policy_res2 = 0;
-                        rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
+                       rc = mdt_intent_lock_replace(info, lockp, lhc, flags);
                         RETURN(rc);
                 } else {
                         lhc->mlh_reg_lh.cookie = 0ull;
                         RETURN(rc);
                 } else {
                         lhc->mlh_reg_lh.cookie = 0ull;
index de5bd8f..dbfb7c5 100644 (file)
@@ -618,6 +618,10 @@ int mdt_lock_setup(struct mdt_thread_info *info,
                    struct mdt_object *o,
                    struct mdt_lock_handle *lh);
 
                    struct mdt_object *o,
                    struct mdt_lock_handle *lh);
 
+int mdt_check_resent_lock(struct mdt_thread_info *info,
+                         struct mdt_object *mo,
+                         struct mdt_lock_handle *lhc);
+
 int mdt_object_lock(struct mdt_thread_info *,
                     struct mdt_object *,
                     struct mdt_lock_handle *,
 int mdt_object_lock(struct mdt_thread_info *,
                     struct mdt_object *,
                     struct mdt_lock_handle *,
index fd54c2a..6a2d5ea 100644 (file)
@@ -1491,9 +1491,14 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
        if (flags & MDS_OPEN_RELEASE && !mdt_hsm_release_allow(ma))
                GOTO(out, rc = -EPERM);
 
        if (flags & MDS_OPEN_RELEASE && !mdt_hsm_release_allow(ma))
                GOTO(out, rc = -EPERM);
 
-       rc = mdt_object_open_lock(info, o, lhc, &ibits);
-        if (rc)
-                GOTO(out_unlock, rc);
+       rc = mdt_check_resent_lock(info, o, lhc);
+       if (rc < 0) {
+               GOTO(out, rc);
+       } else if (rc > 0) {
+               rc = mdt_object_open_lock(info, o, lhc, &ibits);
+               if (rc)
+                       GOTO(out_unlock, rc);
+       }
 
         if (ma->ma_valid & MA_PFID) {
                 parent = mdt_object_find(env, mdt, &ma->ma_pfid);
 
         if (ma->ma_valid & MA_PFID) {
                 parent = mdt_object_find(env, mdt, &ma->ma_pfid);
@@ -1811,22 +1816,10 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                          */
                         LASSERT(lhc != NULL);
 
                          */
                         LASSERT(lhc != NULL);
 
-                        if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
-                                struct ldlm_lock *lock;
-
-                                LASSERT(msg_flags & MSG_RESENT);
-
-                                lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
-                                if (!lock) {
-                                        CERROR("Invalid lock handle "LPX64"\n",
-                                               lhc->mlh_reg_lh.cookie);
-                                        LBUG();
-                                }
-                                LASSERT(fid_res_name_eq(mdt_object_fid(child),
-                                                        &lock->l_resource->lr_name));
-                                LDLM_LOCK_PUT(lock);
-                                rc = 0;
-                        } else {
+                       rc = mdt_check_resent_lock(info, child, lhc);
+                       if (rc < 0) {
+                               GOTO(out_child, result = rc);
+                       } else if (rc > 0) {
                                 mdt_lock_handle_init(lhc);
                                 mdt_lock_reg_init(lhc, LCK_PR);
 
                                 mdt_lock_handle_init(lhc);
                                 mdt_lock_reg_init(lhc, LCK_PR);
 
@@ -1856,9 +1849,12 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                }
         }
 
                }
         }
 
-       if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
+       rc = mdt_check_resent_lock(info, child, lhc);
+       if (rc < 0) {
+               GOTO(out_child, result = rc);
+       } else if (rc == 0) {
                /* the open lock might already be gotten in
                /* the open lock might already be gotten in
-                * mdt_intent_fixup_resent */
+                * ldlm_handle_enqueue() */
                LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
                if (create_flags & MDS_OPEN_LOCK)
                        mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
                LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
                if (create_flags & MDS_OPEN_LOCK)
                        mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
index 8c43a06..6337548 100644 (file)
@@ -99,7 +99,6 @@ int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
        struct ldlm_reply               *rep;
        ldlm_error_t                     err;
        int                              idx, rc, only_liblustre = 1;
        struct ldlm_reply               *rep;
        ldlm_error_t                     err;
        int                              idx, rc, only_liblustre = 1;
-       __u64                            tmpflags = 0;
        struct ldlm_interval_tree       *tree;
        struct ofd_intent_args           arg;
        __u32                            repsize[3] = {
        struct ldlm_interval_tree       *tree;
        struct ofd_intent_args           arg;
        __u32                            repsize[3] = {
@@ -142,8 +141,17 @@ int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
 
        LASSERT(ns == ldlm_res_to_ns(res));
        lock_res(res);
 
        LASSERT(ns == ldlm_res_to_ns(res));
        lock_res(res);
-       rc = policy(lock, &tmpflags, 0, &err, NULL);
-       check_res_locked(res);
+
+       /* Check if this is a resend case (MSG_RESENT is set on RPC) and a
+        * lock was found by ldlm_handle_enqueue(); if so no need to grant
+        * it again. */
+       if (flags & LDLM_FL_RESENT) {
+               rc = LDLM_ITER_CONTINUE;
+       } else {
+               __u64 tmpflags = 0;
+               rc = policy(lock, &tmpflags, 0, &err, NULL);
+               check_res_locked(res);
+       }
 
        /* The lock met with no resistance; we're finished. */
        if (rc == LDLM_ITER_CONTINUE) {
 
        /* The lock met with no resistance; we're finished. */
        if (rc == LDLM_ITER_CONTINUE) {
index 7cb957c..9fb5971 100644 (file)
@@ -1561,8 +1561,8 @@ static int ptlrpc_server_check_resend_in_progress(struct ptlrpc_request *req)
        return 0;
 
 found:
        return 0;
 
 found:
-       DEBUG_REQ(D_HA, req, "Found duplicate req in processing\n");
-       DEBUG_REQ(D_HA, tmp, "Request being processed\n");
+       DEBUG_REQ(D_HA, req, "Found duplicate req in processing");
+       DEBUG_REQ(D_HA, tmp, "Request being processed");
        return -EBUSY;
 }
 
        return -EBUSY;
 }
 
@@ -1979,6 +1979,10 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
                 goto err_req;
         }
 
                 goto err_req;
         }
 
+       /* Skip early reply */
+       if (OBD_FAIL_PRECHECK(OBD_FAIL_MDS_RESEND))
+               req->rq_deadline += obd_timeout;
+
        req->rq_svc_thread = thread;
        if (thread != NULL) {
                /* initialize request session, it is needed for request
        req->rq_svc_thread = thread;
        if (thread != NULL) {
                /* initialize request session, it is needed for request
index cf7a5ae..afaef41 100755 (executable)
@@ -1123,11 +1123,11 @@ test_56() { # b=11277
 #define OBD_FAIL_MDS_RESEND      0x136
         touch $DIR/$tfile
         do_facet $SINGLEMDS "lctl set_param fail_loc=0x80000136"
 #define OBD_FAIL_MDS_RESEND      0x136
         touch $DIR/$tfile
         do_facet $SINGLEMDS "lctl set_param fail_loc=0x80000136"
-        stat $DIR/$tfile
+        stat $DIR/$tfile || error "stat failed"
         do_facet $SINGLEMDS "lctl set_param fail_loc=0"
         rm -f $DIR/$tfile
 }
         do_facet $SINGLEMDS "lctl set_param fail_loc=0"
         rm -f $DIR/$tfile
 }
-run_test 56 "do not allow reconnect to busy exports"
+run_test 56 "do not fail on getattr resend"
 
 test_57_helper() {
         # no oscs means no client or mdt 
 
 test_57_helper() {
         # no oscs means no client or mdt 
@@ -1945,6 +1945,36 @@ test_120() {
 }
 run_test 120 "flock race: completion vs. evict"
 
 }
 run_test 120 "flock race: completion vs. evict"
 
+test_113() {
+       local BEFORE=$(date +%s)
+       local EVICT
+
+       # modify dir so that next revalidate would not obtain UPDATE lock
+       touch $DIR
+
+       # drop 1 reply with UPDATE lock,
+       # resend should not create 2nd lock on server
+       mcreate $DIR/$tfile || error "mcreate failed: $?"
+       drop_ldlm_reply_once "stat $DIR/$tfile" || error "stat failed: $?"
+
+       # 2 BL AST will be sent to client, both must find the same lock,
+       # race them to not get EINVAL for 2nd BL AST
+       #define OBD_FAIL_LDLM_PAUSE_CANCEL2      0x31f
+       $LCTL set_param fail_loc=0x8000031f
+
+       $LCTL set_param ldlm.namespaces.*.early_lock_cancel=0 > /dev/null
+       chmod 0777 $DIR/$tfile || error "chmod failed: $?"
+       $LCTL set_param ldlm.namespaces.*.early_lock_cancel=1 > /dev/null
+
+       # let the client reconnect
+       client_reconnect
+       EVICT=$($LCTL get_param mdc.$FSNAME-MDT*.state |
+          awk -F"[ [,]" '/EVICTED]$/ { if (mx<$4) {mx=$4;} } END { print mx }')
+
+       [ -z "$EVICT" ] || [[ $EVICT -le $BEFORE ]] || error "eviction happened"
+}
+run_test 113 "ldlm enqueue dropped reply should not cause deadlocks"
+
 complete $SECONDS
 check_and_cleanup_lustre
 exit_status
 complete $SECONDS
 check_and_cleanup_lustre
 exit_status
index 87981a4..0bd6a22 100755 (executable)
@@ -4317,20 +4317,20 @@ drop_request() {
 
 drop_reply() {
 # OBD_FAIL_MDS_ALL_REPLY_NET
 
 drop_reply() {
 # OBD_FAIL_MDS_ALL_REPLY_NET
-    RC=0
-    do_facet $SINGLEMDS lctl set_param fail_loc=0x122
-    do_facet client "$@" || RC=$?
-    do_facet $SINGLEMDS lctl set_param fail_loc=0
-    return $RC
+       RC=0
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0x122
+       eval "$@" || RC=$?
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0
+       return $RC
 }
 
 drop_reint_reply() {
 # OBD_FAIL_MDS_REINT_NET_REP
 }
 
 drop_reint_reply() {
 # OBD_FAIL_MDS_REINT_NET_REP
-    RC=0
-    do_facet $SINGLEMDS lctl set_param fail_loc=0x119
-    do_facet client "$@" || RC=$?
-    do_facet $SINGLEMDS lctl set_param fail_loc=0
-    return $RC
+       RC=0
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0x119
+       eval "$@" || RC=$?
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0
+       return $RC
 }
 
 drop_update_reply() {
 }
 
 drop_update_reply() {
@@ -4382,9 +4382,24 @@ drop_bl_callback() {
 drop_ldlm_reply() {
 #define OBD_FAIL_LDLM_REPLY              0x30c
     RC=0
 drop_ldlm_reply() {
 #define OBD_FAIL_LDLM_REPLY              0x30c
     RC=0
-    do_facet $SINGLEMDS lctl set_param fail_loc=0x30c
+    local list=$(comma_list $(mdts_nodes) $(osts_nodes))
+    do_nodes $list lctl set_param fail_loc=0x30c
+
     do_facet client "$@" || RC=$?
     do_facet client "$@" || RC=$?
-    do_facet $SINGLEMDS lctl set_param fail_loc=0
+
+    do_nodes $list lctl set_param fail_loc=0
+    return $RC
+}
+
+drop_ldlm_reply_once() {
+#define OBD_FAIL_LDLM_REPLY              0x30c
+    RC=0
+    local list=$(comma_list $(mdts_nodes) $(osts_nodes))
+    do_nodes $list lctl set_param fail_loc=0x8000030c
+
+    do_facet client "$@" || RC=$?
+
+    do_nodes $list lctl set_param fail_loc=0
     return $RC
 }
 
     return $RC
 }