Whamcloud - gitweb
- added resent support for getatr_name.
authoryury <yury>
Tue, 19 Sep 2006 18:11:44 +0000 (18:11 +0000)
committeryury <yury>
Tue, 19 Sep 2006 18:11:44 +0000 (18:11 +0000)
lustre/mdt/mdt_handler.c

index 155f385..b4a2f0d 100644 (file)
@@ -423,15 +423,21 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                                  __u64 child_bits,
                                  struct ldlm_reply *ldlm_rep)
 {
-        struct mdt_object *parent = info->mti_object;
-        struct mdt_object *child;
-        struct md_object  *next = mdt_object_child(info->mti_object);
-        struct lu_fid     *child_fid = &info->mti_tmp_fid1;
-        const char        *name;
-        int               rc;
+        struct ptlrpc_request *req = mdt_info_req(info);
+        struct mdt_object     *parent = info->mti_object;
+        struct mdt_object     *child;
+        struct md_object      *next = mdt_object_child(info->mti_object);
+        struct lu_fid         *child_fid = &info->mti_tmp_fid1;
+        int                    is_resent, rc;
+        const char            *name;
         struct mdt_lock_handle *lhp;
+        struct ldlm_lock      *lock;
         ENTRY;
 
+        is_resent = lustre_handle_is_used(&lhc->mlh_lh);
+        if (is_resent)
+                LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
+        
         LASSERT(info->mti_object != NULL);
         name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
         if (name == NULL)
@@ -446,12 +452,25 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
                 child = parent;
                 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID
-                               ", ldlm_rep=%p\n",
-                               PFID(mdt_object_fid(child)), ldlm_rep);
-
-                mdt_lock_handle_init(lhc);
-                lhc->mlh_mode = LCK_CR;
-                rc = mdt_object_lock(info, child, lhc, child_bits);
+                       ", ldlm_rep=%p\n", PFID(mdt_object_fid(child)), ldlm_rep);
+
+                if (is_resent) {
+                        /* Do not take lock for resent case. */
+                        lock = ldlm_handle2lock(&lhc->mlh_lh);
+                        if (!lock) {
+                                CERROR("Invalid lock handle "LPX64"\n",
+                                       lhc->mlh_lh.cookie);
+                                LBUG();
+                        }
+                        LASSERT(fid_res_name_eq(mdt_object_fid(child),
+                                                &lock->l_resource->lr_name));
+                        LDLM_LOCK_PUT(lock);
+                        rc = 0;
+                } else {
+                        mdt_lock_handle_init(lhc);
+                        lhc->mlh_mode = LCK_CR;
+                        rc = mdt_object_lock(info, child, lhc, child_bits);
+                }
                 if (rc == 0) {
                         /* finally, we can get attr for child. */
                         rc = mdt_getattr_internal(info, child);
@@ -480,17 +499,32 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
          *step 3: find the child object by fid & lock it.
          *        regardless if it is local or remote.
          */
-        mdt_lock_handle_init(lhc);
-        lhc->mlh_mode = LCK_CR;
-        child = mdt_object_find_lock(info, child_fid, lhc, child_bits);
+        if (is_resent) {
+                /* Do not take lock for resent case. */
+                lock = ldlm_handle2lock(&lhc->mlh_lh);
+                if (!lock) {
+                        CERROR("Invalid lock handle "LPX64"\n",
+                               lhc->mlh_lh.cookie);
+                        LBUG();
+                }
+                LASSERT(fid_res_name_eq(mdt_object_fid(child),
+                                        &lock->l_resource->lr_name));
+                LDLM_LOCK_PUT(lock);
+                child = mdt_object_find(info->mti_ctxt, info->mti_mdt,
+                                        child_fid);
+        } else {
+                mdt_lock_handle_init(lhc);
+                lhc->mlh_mode = LCK_CR;
+                child = mdt_object_find_lock(info, child_fid, lhc, child_bits);
+        }
         if (IS_ERR(child))
                 GOTO(out_parent, rc = PTR_ERR(child));
 
         /* finally, we can get attr for child. */
         rc = mdt_getattr_internal(info, child);
-        if (rc != 0)
+        if (rc != 0) {
                 mdt_object_unlock(info, child, lhc, 1);
-        else {
+        else {
                 struct ldlm_lock *lock = ldlm_handle2lock(&lhc->mlh_lh);
                 if (lock) {
                         struct ldlm_res_id *res_id;
@@ -1084,7 +1118,7 @@ int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
              ldlm_policy_data_t *policy,
              struct ldlm_res_id *res_id)
 {
-        int flags = 0; /*XXX: LDLM_FL_LOCAL_ONLY?*/
+        int flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
         int rc;
 
         LASSERT(ns != NULL);
@@ -1098,22 +1132,26 @@ int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
         return rc == ELDLM_OK ? 0 : -EIO;
 }
 
-/* just call ldlm_lock_decref() if decref,
- * else we only call ptlrpc_save_lock() to save this lock in req.
- * when transaction committed, req will be released, and lock will, too */
+/*
+ * Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock()
+ * to save this lock in req.  when transaction committed, req will be released,
+ * and lock will, too.
+ */
 void fid_unlock(struct ptlrpc_request *req, const struct lu_fid *f,
                 struct lustre_handle *lh, ldlm_mode_t mode, int decref)
 {
         {
-        /* FIXME: this is debug stuff, remove it later. */
+                /* FIXME: this is debug stuff, remove it later. */
                 struct ldlm_lock *lock = ldlm_handle2lock(lh);
                 if (!lock) {
-                        CERROR("invalid lock handle "LPX64, lh->cookie);
+                        CERROR("Invalid lock handle "LPX64"\n",
+                               lh->cookie);
                         LBUG();
                 }
                 LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
                 LDLM_LOCK_PUT(lock);
         }
+        
         if (decref)
                 ldlm_lock_decref(lh, mode);
         else
@@ -1819,15 +1857,73 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info,
         RETURN(ELDLM_LOCK_REPLACED);
 }
 
+static void mdt_fixup_resent(struct req_capsule *pill,
+                             struct ldlm_lock *new_lock,
+                             struct mdt_lock_handle *lh)
+{
+        struct ptlrpc_request  *req = pill->rc_req;
+        struct obd_export      *exp = req->rq_export;
+        struct lustre_handle    remote_hdl;
+        struct ldlm_request    *dlmreq;
+        struct list_head       *iter;
+
+        if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
+                return;
+
+        dlmreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
+        remote_hdl = dlmreq->lock_handle1;
+        
+        spin_lock(&exp->exp_ldlm_data.led_lock);
+        list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
+                struct ldlm_lock *lock;
+                lock = list_entry(iter, struct ldlm_lock, l_export_chain);
+                if (lock == new_lock)
+                        continue;
+                if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
+                        lh->mlh_lh.cookie = lock->l_handle.h_cookie;
+                        lh->mlh_mode = lock->l_granted_mode;
+                        
+                        LDLM_DEBUG(lock, "restoring lock cookie");
+                        DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
+                                  lh->mlh_lh.cookie);
+                        break;
+                }
+        }
+        spin_unlock(&exp->exp_ldlm_data.led_lock);
+
+        /*
+         * If the xid matches, then we know this is a resent request, and allow
+         * it. (It's probably an OPEN, for which we don't send a lock.
+         */
+        if (req->rq_xid ==
+            le64_to_cpu(exp->exp_mdt_data.med_mcd->mcd_last_xid))
+                return;
+
+        if (req->rq_xid ==
+            le64_to_cpu(exp->exp_mdt_data.med_mcd->mcd_last_close_xid))
+                return;
+
+        /*
+         * This remote handle isn't enqueued, so we never received or processed
+         * this request.  Clear MSG_RESENT, because it can be handled like any
+         * normal request now.
+         */
+        lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
+
+        DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
+                  remote_hdl.cookie);
+}
+
 static int mdt_intent_getattr(enum mdt_it_code opcode,
                               struct mdt_thread_info *info,
                               struct ldlm_lock **lockp,
                               int flags)
 {
         struct ldlm_reply      *ldlm_rep;
-        struct mdt_lock_handle  tmp_lock;
+        struct mdt_lock_handle  tmp_lock = { 0 };
         struct mdt_lock_handle *lhc = &tmp_lock;
         __u64                   child_bits;
+        struct ptlrpc_request  *req;
 
         ENTRY;
 
@@ -1843,9 +1939,13 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
                 RETURN(-EINVAL);
         }
 
+        req = info->mti_pill.rc_req;
         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
         mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
 
+        if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
+                mdt_fixup_resent(&info->mti_pill, *lockp, lhc);
+        
         ldlm_rep->lock_policy_res2 =
                 mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
         mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);