Whamcloud - gitweb
LU-1876 hsm: layout lock implementation on server side
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index 4fc48c7..f370e5e 100644 (file)
@@ -1115,6 +1115,115 @@ int mdt_open_by_fid(struct mdt_thread_info* info,
         RETURN(rc);
 }
 
+/* lock object for open */
+static int mdt_object_open_lock(struct mdt_thread_info *info,
+                               struct mdt_object *obj,
+                               struct mdt_lock_handle *lhc,
+                               __u64 *ibits)
+{
+       struct md_attr *ma = &info->mti_attr;
+       __u64 open_flags = info->mti_spec.sp_cr_flags;
+       ldlm_mode_t lm = LCK_CR;
+       bool try_layout = false;
+       bool create_layout = false;
+       int rc = 0;
+       ENTRY;
+
+       *ibits = 0;
+       if (open_flags & MDS_OPEN_LOCK) {
+               if (open_flags & FMODE_WRITE)
+                       lm = LCK_CW;
+               else if (open_flags & MDS_FMODE_EXEC)
+                       lm = LCK_PR;
+               else
+                       lm = LCK_CR;
+
+               *ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN;
+       }
+
+       if (S_ISREG(lu_object_attr(&obj->mot_obj.mo_lu))) {
+               if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV) &&
+                   md_should_create(open_flags))
+                       create_layout = true;
+               if (exp_connect_layout(info->mti_exp) && !create_layout &&
+                   ma->ma_need & MA_LOV)
+                       try_layout = true;
+       }
+
+       mdt_lock_handle_init(lhc);
+       mdt_lock_reg_init(lhc, lm);
+
+       /* one problem to return layout lock on open is that it may result
+        * in too many layout locks cached on the client side. */
+       if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_OPEN) && try_layout) {
+               /* return lookup lock to validate inode at the client side,
+                * this is pretty important otherwise mdt will return layout
+                * lock for each open.
+                * However this is a double-edged sword because changing
+                * permission will revoke huge # of LOOKUP locks. */
+               *ibits |= MDS_INODELOCK_LAYOUT | MDS_INODELOCK_LOOKUP;
+               if (!mdt_object_lock_try(info, obj, lhc, *ibits,
+                                        MDT_CROSS_LOCK)) {
+                       *ibits &= ~(MDS_INODELOCK_LAYOUT|MDS_INODELOCK_LOOKUP);
+                       if (*ibits != 0)
+                               rc = mdt_object_lock(info, obj, lhc, *ibits,
+                                               MDT_CROSS_LOCK);
+               }
+       } else if (*ibits != 0) {
+               rc = mdt_object_lock(info, obj, lhc, *ibits, MDT_CROSS_LOCK);
+       }
+
+       CDEBUG(D_INODE, "Requested bits lock:"DFID ", ibits = "LPX64
+               ", open_flags = "LPO64", try_layout = %d, rc = %d\n",
+               PFID(mdt_object_fid(obj)), *ibits, open_flags, try_layout, rc);
+
+       /* will change layout, revoke layout locks by enqueuing EX lock. */
+       if (rc == 0 && create_layout) {
+               struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LAYOUT];
+
+               CDEBUG(D_INODE, "Will create layout, get EX layout lock:"DFID
+                       ", open_flags = "LPO64"\n",
+                       PFID(mdt_object_fid(obj)), open_flags);
+
+               LASSERT(!try_layout);
+               mdt_lock_handle_init(ll);
+               mdt_lock_reg_init(ll, LCK_EX);
+               rc = mdt_object_lock(info, obj, ll, MDS_INODELOCK_LAYOUT,
+                                       MDT_LOCAL_LOCK);
+
+               OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LL_BLOCK, 2);
+       }
+
+       RETURN(rc);
+}
+
+static void mdt_object_open_unlock(struct mdt_thread_info *info,
+                                  struct mdt_object *obj,
+                                  struct mdt_lock_handle *lhc,
+                                  __u64 ibits, int rc)
+{
+       __u64 open_flags = info->mti_spec.sp_cr_flags;
+       struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LAYOUT];
+
+       /* Release local layout lock - the layout lock put in MDT_LH_LAYOUT
+        * will never return to client side. */
+       if (lustre_handle_is_used(&ll->mlh_reg_lh)) {
+               LASSERT(!(ibits & MDS_INODELOCK_LAYOUT));
+               mdt_object_unlock(info, obj, ll, 1);
+       }
+
+       if (ibits == 0)
+               return;
+
+       if (!(open_flags & MDS_OPEN_LOCK) && !(ibits & MDS_INODELOCK_LAYOUT)) {
+               /* for the open request, the lock will only return to client
+                * if open or layout lock is granted. */
+               rc = 1;
+       }
+       if (rc != 0)
+               mdt_object_unlock(info, obj, lhc, 1);
+}
+
 int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
                         struct mdt_lock_handle *lhc)
 {
@@ -1126,7 +1235,7 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
         struct mdt_object       *parent= NULL;
         struct mdt_object       *o;
         int                      rc;
-        ldlm_mode_t              lm;
+       __u64                    ibits;
         ENTRY;
 
        if (md_should_create(flags) && !(flags & MDS_OPEN_HAS_EA)) {
@@ -1160,22 +1269,11 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
         mdt_set_disposition(info, rep, (DISP_IT_EXECD |
                                        DISP_LOOKUP_EXECD));
 
-        if (flags & FMODE_WRITE)
-                lm = LCK_CW;
-        else if (flags & MDS_FMODE_EXEC)
-                lm = LCK_PR;
-        else
-                lm = LCK_CR;
-
-        mdt_lock_handle_init(lhc);
-        mdt_lock_reg_init(lhc, lm);
-        rc = mdt_object_lock(info, o, lhc,
-                             MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN,
-                             MDT_CROSS_LOCK);
+       rc = mdt_attr_get_complex(info, o, ma);
         if (rc)
                 GOTO(out, rc);
 
-       rc = mdt_attr_get_complex(info, o, ma);
+       rc = mdt_object_open_lock(info, o, lhc, &ibits);
         if (rc)
                 GOTO(out, rc);
 
@@ -1191,18 +1289,15 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
         }
 
         rc = mdt_finish_open(info, parent, o, flags, 0, rep);
-
-        if (!(flags & MDS_OPEN_LOCK) || rc)
-                mdt_object_unlock(info, o, lhc, 1);
-
        if (!rc) {
                mdt_set_disposition(info, rep, DISP_LOOKUP_POS);
                if (flags & MDS_OPEN_LOCK)
                        mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
        }
-
         GOTO(out, rc);
+
 out:
+       mdt_object_open_unlock(info, o, lhc, ibits, rc);
         mdt_object_put(env, o);
         if (parent != NULL)
                 mdt_object_put(env, parent);
@@ -1275,6 +1370,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
         struct md_attr          *ma = &info->mti_attr;
         __u64                    create_flags = info->mti_spec.sp_cr_flags;
+       __u64                    ibits;
         struct mdt_reint_record *rr = &info->mti_rr;
         struct lu_name          *lname;
         int                      result, rc;
@@ -1513,41 +1609,24 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
 
         LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh));
 
-        /* get openlock if this is not replay and if a client requested it */
-        if (!req_is_replay(req) && create_flags & MDS_OPEN_LOCK) {
-                ldlm_mode_t lm;
-
-                if (create_flags & FMODE_WRITE)
-                        lm = LCK_CW;
-                else if (create_flags & MDS_FMODE_EXEC)
-                        lm = LCK_PR;
-                else
-                        lm = LCK_CR;
-                mdt_lock_handle_init(lhc);
-                mdt_lock_reg_init(lhc, lm);
-                rc = mdt_object_lock(info, child, lhc,
-                                     MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN,
-                                     MDT_CROSS_LOCK);
-                if (rc) {
-                        result = rc;
-                        GOTO(out_child, result);
-                } else {
-                        result = -EREMOTE;
-                        mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
-                }
-        }
+       /* get openlock if this is not replay and if a client requested it */
+       if (!req_is_replay(req)) {
+               rc = mdt_object_open_lock(info, child, lhc, &ibits);
+               if (rc != 0) {
+                       GOTO(out_child, result = rc);
+               } else if (create_flags & MDS_OPEN_LOCK) {
+                       result = -EREMOTE;
+                       mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
+               }
+       }
 
         /* Try to open it now. */
         rc = mdt_finish_open(info, parent, child, create_flags,
                              created, ldlm_rep);
         if (rc) {
                 result = rc;
-               if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
-                        /* openlock was acquired and mdt_finish_open failed -
-                           drop the openlock */
-                        mdt_object_unlock(info, child, lhc, 1);
-                       mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
-               }
+               /* openlock will be released if mdt_finish_open failed */
+               mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
                 if (created) {
                         ma->ma_need = 0;
                         ma->ma_valid = 0;
@@ -1564,6 +1643,8 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         }
         EXIT;
 out_child:
+       mdt_object_open_unlock(info, child, lhc, ibits,
+                               result == -EREMOTE ? 0 : result);
         mdt_object_put(info->mti_env, child);
 out_parent:
         mdt_object_unlock_put(info, parent, lh, result || !created);