- fixes for lock fixup, lock replace and some resent-handling issues.
author    yury <yury>
Wed, 20 Sep 2006 08:13:14 +0000 (08:13 +0000)
committer yury <yury>
Wed, 20 Sep 2006 08:13:14 +0000 (08:13 +0000)
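
The patch reorders the resent path: mdt_fixup_resent() is now called unconditionally and hands back the lock it matched on the export's held-lock list, and mdt_intent_lock_replace() reuses that lock, falling back to the handle in the mdt_lock_handle only when no resent lock was found. A minimal standalone sketch of that decision flow follows; the types and helpers (struct lock, held[], find_granted_lock, pick_lock) are made-up stand-ins for illustration, not the real Lustre structures or APIs.

/*
 * Simplified, self-contained model of the resent-handling flow in this
 * patch; everything here is an illustrative stand-in.
 */
#include <stdio.h>
#include <stddef.h>

struct lock {
        unsigned long remote_handle;   /* client-side cookie */
        int           given_to_client; /* already granted once? */
};

/* Stand-in for the export's held-lock list (exp_ldlm_data.led_held_locks). */
static struct lock held[] = {
        { .remote_handle = 0x1001, .given_to_client = 1 },
        { .remote_handle = 0x2002, .given_to_client = 1 },
};

/*
 * Models what mdt_fixup_resent() does for a resent request: find the lock
 * that was already granted for this client handle instead of taking a new one.
 */
static struct lock *find_granted_lock(unsigned long client_handle)
{
        size_t i;

        for (i = 0; i < sizeof(held) / sizeof(held[0]); i++)
                if (held[i].remote_handle == client_handle)
                        return &held[i];
        return NULL;
}

/*
 * Models the new mdt_intent_lock_replace() contract: a lock found by the
 * resent fixup is reused; only when none was found do we fall back to the
 * lock taken while executing the intent.
 */
static struct lock *pick_lock(struct lock *resent_lock, struct lock *fresh_lock)
{
        return resent_lock != NULL ? resent_lock : fresh_lock;
}

int main(void)
{
        struct lock  fresh = { .remote_handle = 0x3003 };
        struct lock *found = find_granted_lock(0x2002); /* resent case */
        struct lock *used  = pick_lock(found, &fresh);

        printf("reusing already-granted lock: %s\n",
               used->given_to_client ? "yes (reply reconstruction)" : "no");
        return 0;
}

Reusing the already-granted lock is what lets the MDT reconstruct the reply for a resent request instead of granting (and leaking) a second lock for the same operation.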
lustre/mdt/mdt_handler.c

index b4a2f0d..951b70c 100644
@@ -1809,26 +1809,47 @@ static struct mdt_it_flavor {
 };
 
 int mdt_intent_lock_replace(struct mdt_thread_info *info, 
-                           struct ldlm_lock **lockp,
-                           struct mdt_lock_handle *lh, int flags)
+                            struct ldlm_lock **lockp,
+                            struct ldlm_lock *new_lock,
+                            struct mdt_lock_handle *lh,
+                            int flags)
 {
         struct ptlrpc_request  *req = mdt_info_req(info);
-        struct ldlm_lock       *old_lock = *lockp;
-        struct ldlm_lock       *new_lock = NULL;
+        struct ldlm_lock       *lock = *lockp;
 
-        new_lock = ldlm_handle2lock(&lh->mlh_lh);
+        /*
+         * Get new lock only for cases when possible resent did not find any
+         * lock.
+         */
+        if (new_lock == NULL)
+                new_lock = ldlm_handle2lock(&lh->mlh_lh);
 
         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
                 RETURN(0);
 
-        LASSERTF(new_lock != NULL, "lockh "LPX64"\n", lh->mlh_lh.cookie);
+        LASSERTF(new_lock != NULL,
+                 "lockh "LPX64"\n", lh->mlh_lh.cookie);
+
+        /*
+         * If we've already given this lock to a client once, then we should
+         * have no readers or writers.  Otherwise, we should have one reader
+         * _or_ writer ref (which will be zeroed below) before returning the
+         * lock to a client.
+         */
+        if (new_lock->l_export == req->rq_export) {
+                LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
+        } else {
+                LASSERT(new_lock->l_export == NULL);
+                LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
+        }
 
         *lockp = new_lock;
 
-        /* FIXME:This only happens when MDT can handle RESENT */
         if (new_lock->l_export == req->rq_export) {
-                /* Already gave this to the client, which means that we
-                 * reconstructed a reply. */
+                /*
+                 * Already gave this to the client, which means that we
+                 * reconstructed a reply.
+                 */
                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
                         MSG_RESENT);
                 RETURN(ELDLM_LOCK_REPLACED);
@@ -1843,11 +1864,9 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info,
         list_add(&new_lock->l_export_chain,
                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
 
-        new_lock->l_blocking_ast = old_lock->l_blocking_ast;
-        new_lock->l_completion_ast = old_lock->l_completion_ast;
-
-        new_lock->l_remote_handle = old_lock->l_remote_handle;
-
+        new_lock->l_blocking_ast = lock->l_blocking_ast;
+        new_lock->l_completion_ast = lock->l_completion_ast;
+        new_lock->l_remote_handle = lock->l_remote_handle;
         new_lock->l_flags &= ~LDLM_FL_LOCAL;
 
         unlock_res_and_lock(new_lock);
@@ -1859,6 +1878,7 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info,
 
 static void mdt_fixup_resent(struct req_capsule *pill,
                              struct ldlm_lock *new_lock,
+                             struct ldlm_lock **old_lock,
                              struct mdt_lock_handle *lh)
 {
         struct ptlrpc_request  *req = pill->rc_req;
@@ -1886,7 +1906,10 @@ static void mdt_fixup_resent(struct req_capsule *pill,
                         LDLM_DEBUG(lock, "restoring lock cookie");
                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
                                   lh->mlh_lh.cookie);
-                        break;
+                        if (old_lock)
+                                *old_lock = LDLM_LOCK_GET(lock);
+                        spin_unlock(&exp->exp_ldlm_data.led_lock);
+                        return;
                 }
         }
         spin_unlock(&exp->exp_ldlm_data.led_lock);
@@ -1919,10 +1942,11 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
                               struct ldlm_lock **lockp,
                               int flags)
 {
-        struct ldlm_reply      *ldlm_rep;
-        struct mdt_lock_handle  tmp_lock = { 0 };
+        struct mdt_lock_handle  tmp_lock = { { 0 } };
         struct mdt_lock_handle *lhc = &tmp_lock;
+        struct ldlm_lock       *new_lock = NULL;
         __u64                   child_bits;
+        struct ldlm_reply      *ldlm_rep;
         struct ptlrpc_request  *req;
 
         ENTRY;
@@ -1943,8 +1967,8 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
         mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
 
-        if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
-                mdt_fixup_resent(&info->mti_pill, *lockp, lhc);
+        /* Get lock from request for possible resent case. */
+        mdt_fixup_resent(&info->mti_pill, *lockp, &new_lock, lhc);
         
         ldlm_rep->lock_policy_res2 =
                 mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
@@ -1957,7 +1981,7 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
                 RETURN(ELDLM_LOCK_ABORTED);
         }
 
-        return mdt_intent_lock_replace(info, lockp, lhc, flags);
+        return mdt_intent_lock_replace(info, lockp, new_lock, lhc, flags);
 }
 
 static int mdt_intent_reint(enum mdt_it_code opcode,
@@ -1999,7 +2023,7 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
                struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_NEW];
                LASSERT(lhc->mlh_lh.cookie != 0);
                rep->lock_policy_res2 = 0;
-               return mdt_intent_lock_replace(info, lockp, lhc, flags);
+               return mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
         }
         rep->lock_policy_res2 = rc;