Whamcloud - gitweb
LU-13577 wbc: reimplement mkdir() by using intent lock
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index 6b4e66a..620fecf 100644 (file)
@@ -486,7 +486,7 @@ unlock_parent:
  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
  * check.
  */
-static int mdt_create(struct mdt_thread_info *info)
+static int mdt_create(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
 {
        struct mdt_device *mdt = info->mti_mdt;
        struct mdt_object *parent;
@@ -497,6 +497,7 @@ static int mdt_create(struct mdt_thread_info *info)
        struct mdt_reint_record *rr = &info->mti_rr;
        struct md_op_spec *spec = &info->mti_spec;
        struct lu_ucred *uc = mdt_ucred(info);
+       struct ldlm_reply *dlmrep = NULL;
        bool restripe = false;
        bool recreate_obj = false;
        int rc;
@@ -558,6 +559,15 @@ static int mdt_create(struct mdt_thread_info *info)
        }
 
        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+       /*
+        * TODO: rewrite ll_mknod(), ll_create_nd(), ll_symlink(),
+        * ll_dir_setdirstripe() to all use intent lock.
+        */
+       if (info->mti_intent_lock) {
+               dlmrep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
+               mdt_set_disposition(info, dlmrep,
+                                   DISP_IT_EXECD | DISP_LOOKUP_EXECD);
+       }
 
        parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
        if (IS_ERR(parent))
@@ -649,6 +659,9 @@ static int mdt_create(struct mdt_thread_info *info)
        if (unlikely(rc == 0 && !recreate_obj))
                GOTO(unlock_parent, rc = -EEXIST);
 
+       if (info->mti_intent_lock)
+               mdt_set_disposition(info, dlmrep, DISP_OPEN_CREATE);
+
        child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
        if (unlikely(IS_ERR(child)))
                GOTO(unlock_parent, rc = PTR_ERR(child));
@@ -689,24 +702,44 @@ static int mdt_create(struct mdt_thread_info *info)
        if (rc < 0)
                GOTO(put_child, rc);
 
-       if (S_ISDIR(ma->ma_attr.la_mode) &&
-           (info->mti_spec.sp_cr_flags & MDS_MKDIR_LMV))
+       if ((S_ISDIR(ma->ma_attr.la_mode) &&
+            (info->mti_spec.sp_cr_flags & MDS_MKDIR_LMV)) ||
+            info->mti_intent_lock)
                mdt_prep_ma_buf_from_rep(info, child, ma, 0);
 
        rc = mdt_attr_get_complex(info, child, ma);
        if (rc < 0)
                GOTO(put_child, rc);
 
+       if (ma->ma_valid & MA_LOV) {
+               LASSERT(ma->ma_lmm_size != 0);
+               repbody->mbo_eadatasize = ma->ma_lmm_size;
+               if (S_ISREG(ma->ma_attr.la_mode))
+                       repbody->mbo_valid |= OBD_MD_FLEASIZE;
+               else if (S_ISDIR(ma->ma_attr.la_mode))
+                       repbody->mbo_valid |= OBD_MD_FLDIREA;
+       }
+
        if (ma->ma_valid & MA_LMV) {
                mdt_dump_lmv(D_INFO, ma->ma_lmv);
                repbody->mbo_eadatasize = ma->ma_lmv_size;
                repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
        }
 
+       if (ma->ma_valid & MA_LMV_DEF) {
+               /* Return -EOPNOTSUPP for old client. */
+               if (!mdt_is_striped_client(mdt_info_req(info)->rq_export))
+                       GOTO(put_child, rc = -EOPNOTSUPP);
+
+               LASSERT(S_ISDIR(ma->ma_attr.la_mode));
+               repbody->mbo_valid |= OBD_MD_FLDIREA | OBD_MD_DEFAULT_MEA;
+       }
+
        /* save child locks to eliminate dependey between 'mkdir a' and
         * 'mkdir a/b' if b is a remote directory
         */
-       if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
+       if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode) &&
+           !info->mti_intent_lock) {
                struct mdt_lock_handle *lhc;
                struct ldlm_enqueue_info *einfo = &info->mti_einfo;
 
@@ -723,11 +756,46 @@ static int mdt_create(struct mdt_thread_info *info)
        if (ma->ma_valid & MA_INODE)
                mdt_pack_attr2body(info, repbody, &ma->ma_attr,
                                   mdt_object_fid(child));
+
+       if (info->mti_intent_lock) {
+               mdt_set_disposition(info, dlmrep, DISP_LOOKUP_NEG);
+               rc = mdt_check_resent_lock(info, child, lhc);
+               /*
+                * rc < 0 is error and we fall right back through,
+                * rc == 0 is the open lock might already be gotten in
+                * ldlm_handle_enqueue due to this being a resend.
+                */
+               if (rc <= 0)
+                       GOTO(put_child, rc);
+
+               /*
+                * For the normal intent create (mkdir):
+                * - Grant LOOKUP lock with CR mode to the client at
+                *   least.
+                * - Grant the lock similar to getattr():
+                *   lock mode: PR;
+                *   inodebits: LOOK | UPDATE | PERM [| LAYOUT].
+                * However, it can not grant LCK_CR to the client as during
+                * the setting of LMV layout for a directory from a client,
+                * it will acquire LCK_PW mode lock which is compat with LCK_CR
+                * lock mode, this may result that the cached LMV layout on a
+                * client will not be released when set (default) LMV layout on
+                * a directory.
+                * Due to the above reason, it grants a lock with LCK_PR mode to
+                * the client.
+                */
+               rc = mdt_object_lock(info, child, lhc, MDS_INODELOCK_LOOKUP |
+                                    MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM,
+                                    LCK_PR);
+       }
+
        EXIT;
 put_child:
        mdt_object_put(info->mti_env, child);
 unlock_parent:
        mdt_object_unlock(info, parent, lh, rc);
+       if (rc && dlmrep)
+               mdt_clear_disposition(info, dlmrep, DISP_OPEN_CREATE);
 put_parent:
        mdt_object_put(info->mti_env, parent);
        return rc;
@@ -1089,7 +1157,7 @@ static int mdt_reint_create(struct mdt_thread_info *info,
                RETURN(err_serious(-EOPNOTSUPP));
        }
 
-       rc = mdt_create(info);
+       rc = mdt_create(info, lhc);
        if (rc == 0) {
                if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
                        mdt_counter_incr(req, LPROC_MDT_MKDIR,