Whamcloud - gitweb
LU-13577 wbc: reimplement mkdir() by using intent lock 47/38647/32
author Qian Yingjin <qian@ddn.com>
Mon, 18 May 2020 07:18:08 +0000 (15:18 +0800)
committer Oleg Drokin <green@whamcloud.com>
Tue, 30 Apr 2024 06:51:35 +0000 (06:51 +0000)
This patch reworks mkdir() to use an intent lock.
Instead of the reint mkdir implementation, which returns no lock,
an ibits lock (currently PR LOOKUP|PERM) is granted to the client and
cached in the client-side lock namespace by the mkdir() intent
lock request.

This is also a basic requirement for the coming WBC feature, i.e.,
when a new directory is created and an EX WBC lock is returned from
the MDT in the intent lock request, this root WBC directory can be
safely cached on the client under the protection of the root WBC EX lock.

This patch also adds a tuning parameter "llite.*.intent_mkdir" to
enable or disable mkdir() using an intent lock. It is set to 0
by default, which disables intent mkdir().

Signed-off-by: Qian Yingjin <qian@ddn.com>
Change-Id: I94e4c2f8262d7ffb27d85b5569070049a47354d7
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/38647
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Lai Siyao <lai.siyao@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
17 files changed:
lustre/llite/llite_internal.h
lustre/llite/lproc_llite.c
lustre/llite/namei.c
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_internal.h
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_locks.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_recovery.c
lustre/mdt/mdt_reint.c
lustre/tests/replay-single-lmv.sh
lustre/tests/replay-single.sh
lustre/tests/sanity-pcc.sh
lustre/tests/sanity.sh
lustre/tests/sanityn.sh

index 4527bac..2841870 100644 (file)
@@ -884,7 +884,8 @@ struct ll_sb_info {
                                 ll_client_common_fill_super_succeeded:1,
                                 ll_checksum_set:1,
                                 ll_inode_cache_enabled:1,
-                                ll_enable_statahead_fname:1;
+                                ll_enable_statahead_fname:1,
+                                ll_intent_mkdir_enabled:1;
 
        struct lustre_client_ocd ll_lco;
 
index ca54967..5ae18df 100644 (file)
@@ -1239,6 +1239,33 @@ static ssize_t xattr_cache_store(struct kobject *kobj,
 }
 LUSTRE_RW_ATTR(xattr_cache);
 
+static ssize_t intent_mkdir_show(struct kobject *kobj,
+                                struct attribute *attr, char *buf)
+{
+       struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+                                             ll_kset.kobj);
+
+       return scnprintf(buf, PAGE_SIZE, "%u\n", sbi->ll_intent_mkdir_enabled);
+}
+
+static ssize_t intent_mkdir_store(struct kobject *kobj, struct attribute *attr,
+                                 const char *buffer, size_t count)
+{
+       struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+                                             ll_kset.kobj);
+       bool val;
+       int rc;
+
+       rc = kstrtobool(buffer, &val);
+       if (rc)
+               return rc;
+
+       sbi->ll_intent_mkdir_enabled = val;
+
+       return count;
+}
+LUSTRE_RW_ATTR(intent_mkdir);
+
 static ssize_t tiny_write_show(struct kobject *kobj,
                               struct attribute *attr,
                               char *buf)
@@ -2097,6 +2124,7 @@ static struct attribute *llite_attrs[] = {
        &lustre_attr_max_easize.attr,
        &lustre_attr_default_easize.attr,
        &lustre_attr_xattr_cache.attr,
+       &lustre_attr_intent_mkdir.attr,
        &lustre_attr_fast_read.attr,
        &lustre_attr_tiny_write.attr,
        &lustre_attr_parallel_dio.attr,
index f5c9177..42d587a 100644 (file)
@@ -1552,37 +1552,25 @@ unlock:
        up_read(&rlli->lli_lsm_sem);
 }
 
-static int ll_new_node(struct inode *dir, struct dentry *dchild,
-                      const char *tgt, umode_t mode, __u64 rdev, __u32 opc)
+static int ll_new_node_prepare(struct inode *dir, struct dentry *dchild,
+                              umode_t mode, __u32 opc, bool *encrypt,
+                              const char *tgt, struct md_op_data **op_datap,
+                              struct lmv_user_md **lump, void **datap,
+                              size_t *datalen, struct llcrypt_str *disk_link)
 {
-       struct qstr *name = &dchild->d_name;
-       struct ptlrpc_request *request = NULL;
-       struct md_op_data *op_data = NULL;
-       struct inode *inode = NULL;
        struct ll_sb_info *sbi = ll_i2sbi(dir);
-       struct llcrypt_str *disk_link = NULL;
-       bool encrypt = false;
-       struct lmv_user_md *lum = NULL;
-       const void *data = NULL;
-       size_t datalen = 0;
+       struct lmv_user_md *lum = *lump;
+       struct md_op_data *op_data = NULL;
        int err;
 
        ENTRY;
-       if (unlikely(tgt != NULL)) {
-               disk_link = (struct llcrypt_str *)rdev;
-               rdev = 0;
-               if (!disk_link)
-                       RETURN(-EINVAL);
-               data = disk_link->name;
-               datalen = disk_link->len;
-       }
 
-again:
-       op_data = ll_prep_md_op_data(NULL, dir, NULL, name->name,
-                                    name->len, 0, opc, NULL);
+       op_data = ll_prep_md_op_data(NULL, dir, NULL, dchild->d_name.name,
+                                    dchild->d_name.len, mode, opc, NULL);
        if (IS_ERR(op_data))
-               GOTO(err_exit, err = PTR_ERR(op_data));
+               RETURN(PTR_ERR(op_data));
 
+       *op_datap = op_data;
        if (S_ISDIR(mode)) {
                ll_qos_mkdir_prep(op_data, dir);
                if ((exp_connect_flags2(ll_i2mdexp(dir)) &
@@ -1609,8 +1597,9 @@ again:
                        lum->lum_max_inherit_rr = lsm->lsm_md_max_inherit_rr;
                        lum->lum_pool_name[0] = 0;
                        op_data->op_bias |= MDS_CREATE_DEFAULT_LMV;
-                       data = lum;
-                       datalen = sizeof(*lum);
+                       *lump = lum;
+                       *datap = lum;
+                       *datalen = sizeof(*lum);
                }
        }
 
@@ -1635,10 +1624,10 @@ again:
                        GOTO(err_exit, err);
                if (!llcrypt_has_encryption_key(dir))
                        GOTO(err_exit, err = -ENOKEY);
-               encrypt = true;
+               *encrypt = true;
        }
 
-       if (encrypt) {
+       if (*encrypt) {
                err = llcrypt_inherit_context(dir, NULL, op_data, false);
                if (err)
                        GOTO(err_exit, err);
@@ -1674,11 +1663,111 @@ again:
                        if (err)
                                GOTO(err_exit, err);
 
-                       data = disk_link->name;
-                       datalen = disk_link->len;
+                       *datap = disk_link->name;
+                       *datalen = disk_link->len;
+               }
+       }
+
+       RETURN(0);
+err_exit:
+       if (!IS_ERR_OR_NULL(op_data)) {
+               ll_finish_md_op_data(op_data);
+               *op_datap = NULL;
+       }
+       if (lum) {
+               OBD_FREE_PTR(lum);
+               *lump = NULL;
+       }
+       RETURN(err);
+}
+
+static int ll_new_node_finish(struct inode *dir, struct dentry *dchild,
+                             bool encrypt, umode_t mode, const char *tgt,
+                             struct inode **inode, struct md_op_data *op_data,
+                             struct ptlrpc_request *request)
+{
+       int err;
+
+       ENTRY;
+
+       ll_update_times(request, dir);
+
+       CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_NEWNODE_PAUSE, cfs_fail_val);
+
+       err = ll_prep_inode(inode, &request->rq_pill, dchild->d_sb, NULL);
+       if (err)
+               RETURN(err);
+
+       /* must be done before d_instantiate, because it calls
+        * security_d_instantiate, which means a getxattr if security
+        * context is not set yet
+        */
+       err = ll_inode_notifysecctx(*inode,
+                                   op_data->op_file_secctx,
+                                   op_data->op_file_secctx_size);
+       if (err)
+               RETURN(err);
+
+       d_instantiate(dchild, *inode);
+
+       if (encrypt) {
+               err = ll_set_encflags(*inode, op_data->op_file_encctx,
+                                     op_data->op_file_encctx_size, true);
+               if (err)
+                       RETURN(err);
+
+               if (S_ISLNK(mode)) {
+                       struct ll_inode_info *lli = ll_i2info(*inode);
+
+                       /* Cache the plaintext symlink target
+                        * for later use by get_link()
+                        */
+                       OBD_ALLOC(lli->lli_symlink_name, strlen(tgt) + 1);
+                       /* do not return an error if we cannot
+                        * cache the symlink locally
+                        */
+                       if (lli->lli_symlink_name)
+                               memcpy(lli->lli_symlink_name,
+                                      tgt, strlen(tgt) + 1);
                }
        }
 
+       if (!test_bit(LL_SBI_FILE_SECCTX, ll_i2sbi(dir)->ll_flags))
+               err = ll_inode_init_security(dchild, *inode, dir);
+
+       RETURN(err);
+}
+
+static int ll_new_node(struct inode *dir, struct dentry *dchild,
+                      const char *tgt, umode_t mode, __u64 rdev, __u32 opc)
+{
+       struct ptlrpc_request *request = NULL;
+       struct md_op_data *op_data = NULL;
+       struct inode *inode = NULL;
+       struct ll_sb_info *sbi = ll_i2sbi(dir);
+       struct llcrypt_str *disk_link = NULL;
+       bool encrypt = false;
+       struct lmv_user_md *lum = NULL;
+       void *data = NULL;
+       size_t datalen = 0;
+       int err;
+
+       ENTRY;
+       if (unlikely(tgt != NULL)) {
+               disk_link = (struct llcrypt_str *)rdev;
+               rdev = 0;
+               if (!disk_link)
+                       RETURN(-EINVAL);
+               data = disk_link->name;
+               datalen = disk_link->len;
+       }
+
+again:
+       err = ll_new_node_prepare(dir, dchild, mode, opc, &encrypt, tgt,
+                                 &op_data, &lum, &data, &datalen, disk_link);
+       if (err)
+               GOTO(err_exit, err);
+
        err = md_create(sbi->ll_md_exp, op_data, data, datalen, mode,
                        from_kuid(&init_user_ns, current_fsuid()),
                        from_kgid(&init_user_ns, current_fsgid()),
@@ -1753,54 +1842,11 @@ again:
        if (err < 0)
                GOTO(err_exit, err);
 
-       ll_update_times(request, dir);
-
-       CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_NEWNODE_PAUSE, cfs_fail_val);
-
-       err = ll_prep_inode(&inode, &request->rq_pill, dchild->d_sb, NULL);
+       err = ll_new_node_finish(dir, dchild, encrypt, mode, tgt,
+                                &inode, op_data, request);
        if (err)
                GOTO(err_exit, err);
 
-       /* must be done before d_instantiate, because it calls
-        * security_d_instantiate, which means a getxattr if security
-        * context is not set yet
-        */
-       err = ll_inode_notifysecctx(inode,
-                                   op_data->op_file_secctx,
-                                   op_data->op_file_secctx_size);
-       if (err)
-               GOTO(err_exit, err);
-
-       d_instantiate(dchild, inode);
-
-       if (encrypt) {
-               err = ll_set_encflags(inode, op_data->op_file_encctx,
-                                     op_data->op_file_encctx_size, true);
-               if (err)
-                       GOTO(err_exit, err);
-
-               if (S_ISLNK(mode)) {
-                       struct ll_inode_info *lli = ll_i2info(inode);
-
-                       /* Cache the plaintext symlink target
-                        * for later use by get_link()
-                        */
-                       OBD_ALLOC(lli->lli_symlink_name, strlen(tgt) + 1);
-                       /* do not return an error if we cannot
-                        * cache the symlink locally
-                        */
-                       if (lli->lli_symlink_name)
-                               memcpy(lli->lli_symlink_name,
-                                      tgt, strlen(tgt) + 1);
-               }
-       }
-
-       if (!test_bit(LL_SBI_FILE_SECCTX, sbi->ll_flags)) {
-               err = ll_inode_init_security(dchild, inode, dir);
-               if (err)
-                       GOTO(err_exit, err);
-       }
-
        EXIT;
 err_exit:
        if (request != NULL)
@@ -1977,8 +2023,18 @@ clear:
 static int ll_mkdir(struct mnt_idmap *map, struct inode *dir,
                    struct dentry *dchild, umode_t mode)
 {
+       struct lookup_intent mkdir_it = { .it_op = IT_CREAT };
+       struct ll_sb_info *sbi = ll_i2sbi(dir);
+       struct ptlrpc_request *request = NULL;
+       struct md_op_data *op_data;
+       struct inode *inode = NULL;
+       struct lmv_user_md *lum = NULL;
+       bool encrypt = false;
+       void *data = NULL;
+       size_t datalen = 0;
        ktime_t kstart = ktime_get();
-       int err;
+       int rc;
+
        ENTRY;
 
        /* VFS has locked the inode before calling this */
@@ -1990,16 +2046,60 @@ static int ll_mkdir(struct mnt_idmap *map, struct inode *dir,
        if (!IS_POSIXACL(dir) || !exp_connect_umask(ll_i2mdexp(dir)))
                mode &= ~current_umask();
 
-       mode = (mode & (S_IRWXUGO|S_ISVTX)) | S_IFDIR;
+       mode = (mode & (S_IRWXUGO | S_ISVTX)) | S_IFDIR;
+       if (!sbi->ll_intent_mkdir_enabled) {
+               rc = ll_new_node(dir, dchild, NULL, mode, 0, LUSTRE_OPC_MKDIR);
+               GOTO(out_tally, rc);
+       }
+
+       mkdir_it.it_create_mode = mode;
+       rc = ll_new_node_prepare(dir, dchild, mode, LUSTRE_OPC_MKDIR, &encrypt,
+                                NULL, &op_data, &lum, &data, &datalen, NULL);
+       if (rc)
+               GOTO(out_tally, rc);
+
+       op_data->op_data = data;
+       op_data->op_data_size = datalen;
+       rc = md_intent_lock(sbi->ll_md_exp, op_data, &mkdir_it,
+                           &request, &ll_md_blocking_ast, 0);
+       if (rc)
+               GOTO(out_fini, rc);
+
+       /* dir layout may change */
+       ll_unlock_md_op_lsm(op_data);
+
+       rc = ll_new_node_finish(dir, dchild, encrypt, mode, NULL,
+                               &inode, op_data, request);
+       if (rc)
+               GOTO(out_fini, rc);
+
+       if (mkdir_it.it_lock_mode) {
+               __u64 bits = 0;
 
-       err = ll_new_node(dir, dchild, NULL, mode, 0, LUSTRE_OPC_MKDIR);
-       if (err == 0)
-               ll_stats_ops_tally(ll_i2sbi(dir), LPROC_LL_MKDIR,
+               LASSERT(it_disposition(&mkdir_it, DISP_LOOKUP_NEG));
+               ll_set_lock_data(sbi->ll_md_exp, inode, &mkdir_it, &bits);
+               if (bits & MDS_INODELOCK_LOOKUP) {
+                       if (!ll_d_setup(dchild, false))
+                               GOTO(out_fini, rc = -ENOMEM);
+                       d_lustre_revalidate(dchild);
+               }
+       }
+
+out_fini:
+       ll_finish_md_op_data(op_data);
+       ll_intent_release(&mkdir_it);
+       ptlrpc_req_finished(request);
+       if (lum)
+               OBD_FREE_PTR(lum);
+
+out_tally:
+       if (rc == 0)
+               ll_stats_ops_tally(sbi, LPROC_LL_MKDIR,
                                   ktime_us_delta(ktime_get(), kstart));
 
        ll_clear_inode_lock_owner(dir);
 
-       RETURN(err);
+       RETURN(rc);
 }
 
 static int ll_rmdir(struct inode *dir, struct dentry *dchild)
index 41993b8..e588b72 100644 (file)
@@ -353,7 +353,7 @@ retry:
                LASSERT(fid_is_zero(&op_data->op_fid2));
                LASSERT(op_data->op_name != NULL);
 
-               tgt = lmv_locate_tgt(lmv, op_data);
+               tgt = lmv_locate_tgt_create(obd, lmv, op_data);
                if (IS_ERR(tgt))
                        RETURN(PTR_ERR(tgt));
        }
@@ -564,7 +564,7 @@ int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
        if (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT | IT_GETXATTR))
                rc = lmv_intent_lookup(exp, op_data, it, reqp, cb_blocking,
                                       extra_lock_flags);
-       else if (it->it_op & IT_OPEN)
+       else if (it->it_op & (IT_OPEN | IT_CREAT))
                rc = lmv_intent_open(exp, op_data, it, reqp, cb_blocking,
                                     extra_lock_flags);
        else
index 41b9b12..d201cdf 100644 (file)
@@ -209,6 +209,9 @@ static inline bool lmv_dir_retry_check_update(struct md_op_data *op_data)
        return false;
 }
 
+struct lmv_tgt_desc *lmv_locate_tgt_create(struct obd_device *obd,
+                                          struct lmv_obd *lmv,
+                                          struct md_op_data *op_data);
 struct lmv_tgt_desc *lmv_locate_tgt(struct lmv_obd *lmv,
                                    struct md_op_data *op_data);
 int lmv_old_layout_lookup(struct lmv_obd *lmv, struct md_op_data *op_data);
index a8aa5ca..3de626e 100644 (file)
@@ -2162,40 +2162,17 @@ static bool lmv_qos_exclude(struct lmv_obd *lmv, struct md_op_data *op_data)
        return prefix != NULL;
 }
 
-static int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
-                     const void *data, size_t datalen, umode_t mode, uid_t uid,
-                     gid_t gid, kernel_cap_t cap_effective, __u64 rdev,
-                     struct ptlrpc_request **request)
+struct lmv_tgt_desc *lmv_locate_tgt_create(struct obd_device *obd,
+                                          struct lmv_obd *lmv,
+                                          struct md_op_data *op_data)
 {
-       struct obd_device *obd = exp->exp_obd;
-       struct lmv_obd *lmv = &obd->u.lmv;
        struct lmv_tgt_desc *tgt;
-       struct mdt_body *repbody;
-       int rc;
 
        ENTRY;
 
-       if (!lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count)
-               RETURN(-EIO);
-
-       if (lmv_dir_bad_hash(op_data->op_lso1))
-               RETURN(-EBADF);
-
-       if (lmv_dir_layout_changing(op_data->op_lso1)) {
-               /*
-                * if parent is migrating, create() needs to lookup existing
-                * name in both old and new layout, check old layout on client.
-                */
-               rc = lmv_old_layout_lookup(lmv, op_data);
-               if (rc != -ENOENT)
-                       RETURN(rc);
-
-               op_data->op_new_layout = true;
-       }
-
        tgt = lmv_locate_tgt(lmv, op_data);
        if (IS_ERR(tgt))
-               RETURN(PTR_ERR(tgt));
+               RETURN(tgt);
 
        /* the order to apply policy in mkdir:
         * 1. is "lfs mkdir -i N"? mkdir on MDT N.
@@ -2212,20 +2189,20 @@ static int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
                op_data->op_mds = le32_to_cpu(lum->lum_stripe_offset);
                tgt = lmv_tgt(lmv, op_data->op_mds);
                if (!tgt)
-                       RETURN(-ENODEV);
+                       RETURN(ERR_PTR(-ENODEV));
                if (unlikely(tgt->ltd_statfs.os_state & OS_STATFS_NOCREATE))
                        GOTO(new_tgt, -EAGAIN);
        } else if (lmv_op_user_qos_mkdir(op_data)) {
                tgt = lmv_locate_tgt_by_space(lmv, op_data, tgt);
                if (IS_ERR(tgt))
-                       RETURN(PTR_ERR(tgt));
+                       RETURN(tgt);
        } else if (lmv_op_default_specific_mkdir(op_data)) {
                struct lmv_stripe_md *lsm = &op_data->op_default_lso1->lso_lsm;
 
                op_data->op_mds = lsm->lsm_md_master_mdt_index;
                tgt = lmv_tgt(lmv, op_data->op_mds);
                if (!tgt)
-                       RETURN(-ENODEV);
+                       RETURN(ERR_PTR(-ENODEV));
                if (unlikely(tgt->ltd_statfs.os_state & OS_STATFS_NOCREATE))
                        GOTO(new_tgt, -EAGAIN);
        } else if ((lmv_op_default_qos_mkdir(op_data) &&
@@ -2234,9 +2211,47 @@ static int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
 new_tgt:
                tgt = lmv_locate_tgt_by_space(lmv, op_data, tgt);
                if (IS_ERR(tgt))
-                       RETURN(PTR_ERR(tgt));
+                       RETURN(tgt);
        }
 
+       RETURN(tgt);
+}
+
+static int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
+                     const void *data, size_t datalen, umode_t mode, uid_t uid,
+                     gid_t gid, kernel_cap_t cap_effective, __u64 rdev,
+                     struct ptlrpc_request **request)
+{
+       struct obd_device *obd = exp->exp_obd;
+       struct lmv_obd *lmv = &obd->u.lmv;
+       struct lmv_tgt_desc *tgt;
+       struct mdt_body *repbody;
+       int rc;
+
+       ENTRY;
+
+       if (!lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count)
+               RETURN(-EIO);
+
+       if (lmv_dir_bad_hash(op_data->op_lso1))
+               RETURN(-EBADF);
+
+       if (lmv_dir_layout_changing(op_data->op_lso1)) {
+               /*
+                * if parent is migrating, create() needs to lookup existing
+                * name in both old and new layout, check old layout on client.
+                */
+               rc = lmv_old_layout_lookup(lmv, op_data);
+               if (rc != -ENOENT)
+                       RETURN(rc);
+
+               op_data->op_new_layout = true;
+       }
+
+       tgt = lmv_locate_tgt_create(obd, lmv, op_data);
+       if (IS_ERR(tgt))
+               RETURN(PTR_ERR(tgt));
+
 retry:
        rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
        if (rc)
index 09b8f6f..e5b3556 100644 (file)
@@ -439,6 +439,90 @@ err_free_rq:
        return ERR_PTR(rc);
 }
 
+static struct ptlrpc_request *
+mdc_intent_create_pack(struct obd_export *exp, struct lookup_intent *it,
+                      struct md_op_data *op_data, __u32 acl_bufsize,
+                      __u64 extra_lock_flags)
+{
+       LIST_HEAD(cancels);
+       struct ptlrpc_request *req;
+       struct obd_device *obd = class_exp2obd(exp);
+       struct sptlrpc_sepol *sepol;
+       struct ldlm_intent *lit;
+       int count = 0;
+       int rc;
+
+       ENTRY;
+
+       if (fid_is_sane(&op_data->op_fid1))
+               /* cancel parent's UPDATE lock. */
+               count = mdc_resource_get_unused(exp, &op_data->op_fid1,
+                                               &cancels, LCK_EX,
+                                               MDS_INODELOCK_UPDATE);
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                  &RQF_LDLM_INTENT_CREATE);
+       if (req == NULL) {
+               ldlm_lock_list_put(&cancels, l_bl_ast, count);
+               RETURN(ERR_PTR(-ENOMEM));
+       }
+
+       req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
+                            op_data->op_namelen + 1);
+       req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
+                            RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
+                            strlen(op_data->op_file_secctx_name) + 1 : 0);
+       req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
+                            op_data->op_file_secctx_size);
+       req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
+                            op_data->op_data_size);
+       req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_CLIENT,
+                            op_data->op_file_encctx_size);
+
+       /* get SELinux policy info if any */
+       sepol = sptlrpc_sepol_get(req);
+       if (IS_ERR(sepol)) {
+               ldlm_lock_list_put(&cancels, l_bl_ast, count);
+               GOTO(err_free_rq, rc = PTR_ERR(sepol));
+       }
+       req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
+                            sptlrpc_sepol_size(sepol));
+
+       rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
+       if (rc < 0)
+               GOTO(err_put_sepol, rc);
+
+       /* Pack the intent */
+       lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+       lit->opc = (__u64)it->it_op;
+
+       /* Pack the intent request. */
+       mdc_create_pack(&req->rq_pill, op_data, op_data->op_data,
+                       op_data->op_data_size, it->it_create_mode,
+                       op_data->op_fsuid, op_data->op_fsgid,
+                       op_data->op_cap, 0, sepol);
+
+       sptlrpc_sepol_put(sepol);
+
+       req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+                            obd->u.cli.cl_default_mds_easize);
+       req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
+       req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
+                            sizeof(struct lmv_user_md));
+       req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
+                            RCL_SERVER, 0);
+       req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_SERVER, 0);
+
+       ptlrpc_request_set_replen(req);
+       RETURN(req);
+
+err_put_sepol:
+       sptlrpc_sepol_put(sepol);
+err_free_rq:
+       ptlrpc_request_free(req);
+       return ERR_PTR(rc);
+}
+
 #define GA_DEFAULT_EA_NAME_LEN  20
 #define GA_DEFAULT_EA_VAL_LEN  250
 #define GA_DEFAULT_EA_NUM       10
@@ -949,7 +1033,7 @@ static int mdc_enqueue_base(struct obd_export *exp,
                LASSERT(policy == NULL);
 
                saved_flags |= LDLM_FL_HAS_INTENT;
-               if (it->it_op & (IT_GETATTR | IT_READDIR))
+               if (it->it_op & (IT_GETATTR | IT_READDIR | IT_CREAT))
                        policy = &update_policy;
                else if (it->it_op & IT_LAYOUT)
                        policy = &layout_policy;
@@ -987,6 +1071,9 @@ resend:
                lvb_type = LVB_T_LAYOUT;
        } else if (it->it_op & IT_GETXATTR) {
                req = mdc_intent_getxattr_pack(exp, it, op_data);
+       } else if (it->it_op == IT_CREAT) {
+               req = mdc_intent_create_pack(exp, it, op_data, acl_bufsize,
+                                            extra_lock_flags);
        } else {
                LBUG();
                RETURN(-EINVAL);
index 7a8e853..059bd4f 100644 (file)
@@ -4493,6 +4493,7 @@ void mdt_thread_info_reset(struct mdt_thread_info *info)
        info->mti_big_lmm_used = 0;
        info->mti_big_acl_used = 0;
        info->mti_som_strict = 0;
+       info->mti_intent_lock = 0;
 
        info->mti_spec.no_create = 0;
        info->mti_spec.sp_rm_entry = 0;
@@ -5105,6 +5106,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc,
 
        switch (it_opc) {
        case IT_OPEN:
+       case IT_CREAT:
        case IT_OPEN|IT_CREAT:
                /*
                 * OCREAT is not a IS_MUTABLE request since the file may
@@ -5252,6 +5254,13 @@ static int mdt_intent_policy(const struct lu_env *env,
                it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
                if (it != NULL) {
                        mdt_ptlrpc_stats_update(req, it->opc);
+                       info->mti_intent_lock = 1;
+                       /*
+                        * For intent lock request with policy, the ELC locks
+                        * have been cancelled in ldlm_handle_enqueue0().
+                        * Thus set @mti_dlm_req with null here.
+                        */
+                       info->mti_dlm_req = NULL;
                        rc = mdt_intent_opc(it->opc, info, lockp, flags);
                        if (rc == 0)
                                rc = ELDLM_OK;
index 7aecce4..4657f52 100644 (file)
@@ -527,7 +527,8 @@ struct mdt_thread_info {
                                   mti_big_acl_used:1,
                                   mti_som_strict:1,
        /* Batch processing environment */
-                                  mti_batch_env:1;
+                                  mti_batch_env:1,
+                                  mti_intent_lock:1;
 
        /* opdata for mdt_reint_open(), has the same as
         * ldlm_reply:lock_policy_res1.  mdt_update_last_rcvd() stores this
index 9925214..3e33d8b 100644 (file)
@@ -1279,15 +1279,16 @@ static int mdt_close_handle_unpack(struct mdt_thread_info *info)
 }
 
 static inline int mdt_dlmreq_unpack(struct mdt_thread_info *info) {
-        struct req_capsule      *pill = info->mti_pill;
+       struct req_capsule      *pill = info->mti_pill;
 
-        if (req_capsule_get_size(pill, &RMF_DLM_REQ, RCL_CLIENT)) {
-                info->mti_dlm_req = req_capsule_client_get(pill, &RMF_DLM_REQ);
-                if (info->mti_dlm_req == NULL)
-                        RETURN(-EFAULT);
-        }
+       if (!info->mti_intent_lock &&
+           req_capsule_get_size(pill, &RMF_DLM_REQ, RCL_CLIENT)) {
+               info->mti_dlm_req = req_capsule_client_get(pill, &RMF_DLM_REQ);
+               if (info->mti_dlm_req == NULL)
+                       RETURN(-EFAULT);
+       }
 
-        RETURN(0);
+       RETURN(0);
 }
 
 static int mdt_setattr_unpack(struct mdt_thread_info *info)
@@ -1425,22 +1426,25 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
                if (tgt == NULL)
                        RETURN(-EFAULT);
        } else {
-               req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_ACL);
-               if (S_ISDIR(attr->la_mode)) {
-                       struct obd_export *exp = mdt_info_req(info)->rq_export;
-
-                       sp->sp_dmv_imp_inherit =
-                               info->mti_mdt->mdt_enable_dmv_implicit_inherit;
-                       if (req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT)
-                           > 0) {
+               if (!info->mti_intent_lock)
+                       req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_ACL);
+               rr->rr_eadatalen = req_capsule_get_size(pill, &RMF_EADATA,
+                                                       RCL_CLIENT);
+               if (rr->rr_eadatalen > 0) {
+                       sp->no_create = !!req_is_replay(mdt_info_req(info));
+                       if (S_ISDIR(attr->la_mode)) {
                                sp->u.sp_ea.eadata =
                                        req_capsule_client_get(pill,
                                                               &RMF_EADATA);
-                               sp->u.sp_ea.eadatalen =
-                                       req_capsule_get_size(pill, &RMF_EADATA,
-                                                            RCL_CLIENT);
+                               sp->u.sp_ea.eadatalen = rr->rr_eadatalen;
                                sp->sp_cr_flags |= MDS_OPEN_HAS_EA;
                        }
+               }
+               if (S_ISDIR(attr->la_mode)) {
+                       struct obd_export *exp = mdt_info_req(info)->rq_export;
+
+                       sp->sp_dmv_imp_inherit =
+                               info->mti_mdt->mdt_enable_dmv_implicit_inherit;
                        if (OCD_HAS_FLAG2(&exp->exp_connect_data,
                                          DMV_IMP_INHERIT)) {
                                if ((sp->sp_cr_flags & MDS_OPEN_DEFAULT_LMV) &&
index f8be62f..607fa3a 100644 (file)
@@ -191,6 +191,8 @@ static void mdt_reconstruct_create(struct mdt_thread_info *mti,
        struct mdt_body *body;
        int rc;
 
+       ENTRY;
+
        mdt_req_from_lrd(req, mti->mti_reply_data);
        if (req->rq_status)
                return;
@@ -232,6 +234,8 @@ static void mdt_reconstruct_create(struct mdt_thread_info *mti,
        }
        mdt_pack_attr2body(mti, body, &ma->ma_attr, mdt_object_fid(child));
        mdt_object_put(mti->mti_env, child);
+
+       RETURN_EXIT;
 }
 
 static void mdt_reconstruct_setattr(struct mdt_thread_info *mti,
index 6b4e66a..620fecf 100644 (file)
@@ -486,7 +486,7 @@ unlock_parent:
  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
  * check.
  */
-static int mdt_create(struct mdt_thread_info *info)
+static int mdt_create(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
 {
        struct mdt_device *mdt = info->mti_mdt;
        struct mdt_object *parent;
@@ -497,6 +497,7 @@ static int mdt_create(struct mdt_thread_info *info)
        struct mdt_reint_record *rr = &info->mti_rr;
        struct md_op_spec *spec = &info->mti_spec;
        struct lu_ucred *uc = mdt_ucred(info);
+       struct ldlm_reply *dlmrep = NULL;
        bool restripe = false;
        bool recreate_obj = false;
        int rc;
@@ -558,6 +559,15 @@ static int mdt_create(struct mdt_thread_info *info)
        }
 
        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+       /*
+        * TODO: rewrite ll_mknod(), ll_create_nd(), ll_symlink(),
+        * ll_dir_setdirstripe() to all use intent lock.
+        */
+       if (info->mti_intent_lock) {
+               dlmrep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
+               mdt_set_disposition(info, dlmrep,
+                                   DISP_IT_EXECD | DISP_LOOKUP_EXECD);
+       }
 
        parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
        if (IS_ERR(parent))
@@ -649,6 +659,9 @@ static int mdt_create(struct mdt_thread_info *info)
        if (unlikely(rc == 0 && !recreate_obj))
                GOTO(unlock_parent, rc = -EEXIST);
 
+       if (info->mti_intent_lock)
+               mdt_set_disposition(info, dlmrep, DISP_OPEN_CREATE);
+
        child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
        if (unlikely(IS_ERR(child)))
                GOTO(unlock_parent, rc = PTR_ERR(child));
@@ -689,24 +702,44 @@ static int mdt_create(struct mdt_thread_info *info)
        if (rc < 0)
                GOTO(put_child, rc);
 
-       if (S_ISDIR(ma->ma_attr.la_mode) &&
-           (info->mti_spec.sp_cr_flags & MDS_MKDIR_LMV))
+       if ((S_ISDIR(ma->ma_attr.la_mode) &&
+            (info->mti_spec.sp_cr_flags & MDS_MKDIR_LMV)) ||
+            info->mti_intent_lock)
                mdt_prep_ma_buf_from_rep(info, child, ma, 0);
 
        rc = mdt_attr_get_complex(info, child, ma);
        if (rc < 0)
                GOTO(put_child, rc);
 
+       if (ma->ma_valid & MA_LOV) {
+               LASSERT(ma->ma_lmm_size != 0);
+               repbody->mbo_eadatasize = ma->ma_lmm_size;
+               if (S_ISREG(ma->ma_attr.la_mode))
+                       repbody->mbo_valid |= OBD_MD_FLEASIZE;
+               else if (S_ISDIR(ma->ma_attr.la_mode))
+                       repbody->mbo_valid |= OBD_MD_FLDIREA;
+       }
+
        if (ma->ma_valid & MA_LMV) {
                mdt_dump_lmv(D_INFO, ma->ma_lmv);
                repbody->mbo_eadatasize = ma->ma_lmv_size;
                repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
        }
 
+       if (ma->ma_valid & MA_LMV_DEF) {
+               /* Return -EOPNOTSUPP for old client. */
+               if (!mdt_is_striped_client(mdt_info_req(info)->rq_export))
+                       GOTO(put_child, rc = -EOPNOTSUPP);
+
+               LASSERT(S_ISDIR(ma->ma_attr.la_mode));
+               repbody->mbo_valid |= OBD_MD_FLDIREA | OBD_MD_DEFAULT_MEA;
+       }
+
        /* save child locks to eliminate dependency between 'mkdir a' and
         * 'mkdir a/b' if b is a remote directory
         */
-       if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
+       if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode) &&
+           !info->mti_intent_lock) {
                struct mdt_lock_handle *lhc;
                struct ldlm_enqueue_info *einfo = &info->mti_einfo;
 
@@ -723,11 +756,46 @@ static int mdt_create(struct mdt_thread_info *info)
        if (ma->ma_valid & MA_INODE)
                mdt_pack_attr2body(info, repbody, &ma->ma_attr,
                                   mdt_object_fid(child));
+
+       if (info->mti_intent_lock) {
+               mdt_set_disposition(info, dlmrep, DISP_LOOKUP_NEG);
+               rc = mdt_check_resent_lock(info, child, lhc);
+               /*
+                * rc < 0 is an error and we fall right back through;
+                * rc == 0 means the lock might already have been granted in
+                * ldlm_handle_enqueue because this is a resend.
+                */
+               if (rc <= 0)
+                       GOTO(put_child, rc);
+
+               /*
+                * For the normal intent create (mkdir):
+                * - Grant LOOKUP lock with CR mode to the client at
+                *   least.
+                * - Grant the lock similar to getattr():
+                *   lock mode: PR;
+                *   inodebits: LOOKUP | UPDATE | PERM [| LAYOUT].
+                * However, it can not grant LCK_CR to the client, because when
+                * a client sets the LMV layout of a directory, it acquires a
+                * LCK_PW mode lock, which is compatible with the LCK_CR lock
+                * mode; as a result, the LMV layout cached on a client would
+                * not be released when the (default) LMV layout is set on the
+                * directory.
+                * For the above reason, it grants a lock with LCK_PR mode to
+                * the client.
+                */
+               rc = mdt_object_lock(info, child, lhc, MDS_INODELOCK_LOOKUP |
+                                    MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM,
+                                    LCK_PR);
+       }
+
        EXIT;
 put_child:
        mdt_object_put(info->mti_env, child);
 unlock_parent:
        mdt_object_unlock(info, parent, lh, rc);
+       if (rc && dlmrep)
+               mdt_clear_disposition(info, dlmrep, DISP_OPEN_CREATE);
 put_parent:
        mdt_object_put(info->mti_env, parent);
        return rc;
@@ -1089,7 +1157,7 @@ static int mdt_reint_create(struct mdt_thread_info *info,
                RETURN(err_serious(-EOPNOTSUPP));
        }
 
-       rc = mdt_create(info);
+       rc = mdt_create(info, lhc);
        if (rc == 0) {
                if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
                        mdt_counter_incr(req, LPROC_MDT_MKDIR,
index 8c9113a..b6a2d5b 100755 (executable)
@@ -5,6 +5,7 @@ set -e
 LUSTRE=${LUSTRE:-$(dirname $0)/..}
 . $LUSTRE/tests/test-framework.sh
 init_test_env "$@"
+init_logging
 
 # bug number for skipped test:
 ALWAYS_EXCEPT="REPLAY_SINGLE_LMV_EXCEPT "
index e0e2092..a9f4200 100755 (executable)
@@ -5064,6 +5064,109 @@ test_136() {
 }
 run_test 136 "MDS to disconnect all OSPs first, then cleanup ldlm"
 
+check_striped_create_137() {
+       local stripe_count
+
+       cancel_lru_locks mdc
+       $CHECKSTAT -t dir $DIR/$tdir/striped_dir/dir0 ||
+               error "Create under striped dir failed"
+       $LFS getdirstripe $DIR/$tdir/striped_dir/dir0
+       stripe_count=$($LFS getdirstripe -c $DIR/$tdir/striped_dir/dir0)
+       [ $stripe_count -eq 0 ] || error "$stripe_count != 0 after recovery"
+
+       $CHECKSTAT -t dir $DIR/$tdir/striped_dir/dir1 ||
+               error "Create under striped dir failed"
+       $LFS getdirstripe $DIR/$tdir/striped_dir/dir1
+       stripe_count=$($LFS getdirstripe -c $DIR/$tdir/striped_dir/dir1)
+       [ $stripe_count -eq 0 ] || error "$stripe_count != 0 after recovery"
+}
+
+test_137a() {
+       (( $MDSCOUNT >= 2 )) || skip "needs >= 2 MDTs"
+       (( $MDS1_VERSION >= $(version_code 2.15.61) )) ||
+               skip "Need MDS >= 2.15.61 for intent mkdir"
+
+       [[ $FAILURE_MODE != "HARD" ]] ||
+               [[ "$(facet_host mds1)" != "$(facet_host mds2)" ]] ||
+               skip "MDTs needs to be on diff hosts for HARD fail mode"
+
+       local save="$TMP/$TESTSUITE-$TESTNAME.parameters"
+
+       save_lustre_params client "llite.*.intent_mkdir" > $save
+       stack_trap "restore_lustre_params < $save; rm -f $save" EXIT
+       $LCTL set_param llite.*.intent_mkdir=1
+
+       mkdir -p $DIR/$tdir
+       $LFS mkdir -i1 -c$MDSCOUNT $DIR/$tdir/striped_dir
+       replay_barrier mds1
+       mkdir $DIR/$tdir/striped_dir/dir0
+       mkdir $DIR/$tdir/striped_dir/dir1
+       fail mds1
+
+       check_striped_create_137 || error "check striped dir0 failed"
+       rm -rf $DIR/$tdir || error "rm -rf $DIR/$tdir failed"
+}
+run_test 137a "DNE: create under striped dir, fail MDT1"
+
+test_137b() {
+       (( $MDSCOUNT >= 2 )) || skip "needs >= 2 MDTs"
+       (( $MDS1_VERSION >= $(version_code 2.15.61) )) ||
+               skip "Need MDS version at least 2.15.61 for intent mkdir"
+
+       [[ $FAILURE_MODE != "HARD" ]] ||
+               [[ "$(facet_host mds1)" != "$(facet_host mds2)" ]] ||
+               skip "MDTs needs to be on diff hosts for HARD fail mode"
+
+       local save="$TMP/$TESTSUITE-$TESTNAME.parameters"
+
+       save_lustre_params client "llite.*.intent_mkdir" > $save
+       stack_trap "restore_lustre_params < $save; rm -f $save" EXIT
+       $LCTL set_param llite.*.intent_mkdir=1
+
+       mkdir -p $DIR/$tdir
+       $LFS mkdir -i1 -c$MDSCOUNT $DIR/$tdir/striped_dir
+       replay_barrier mds2
+       mkdir $DIR/$tdir/striped_dir/dir0
+       mkdir $DIR/$tdir/striped_dir/dir1
+       fail mds2
+
+       check_striped_create_137 ||
+               error "check create under striped_dir failed"
+
+       rm -rf $DIR/$tdir
+}
+run_test 137b "DNE: create under striped dir, fail MDT2"
+
+test_137c() {
+       (( $MDSCOUNT >= 2 )) || skip "needs >= 2 MDTs"
+       (( $MDS1_VERSION >= $(version_code 2.15.61) )) ||
+               skip "Need MDS version at least 2.15.61 for intent mkdir"
+
+       [[ $FAILURE_MODE != "HARD" ]] ||
+               [[ "$(facet_host mds1)" != "$(facet_host mds2)" ]] ||
+               skip "MDTs needs to be on diff hosts for HARD fail mode"
+
+       local save="$TMP/$TESTSUITE-$TESTNAME.parameters"
+
+       save_lustre_params client "llite.*.intent_mkdir" > $save
+       stack_trap "restore_lustre_params < $save; rm -f $save" EXIT
+       $LCTL set_param llite.*.intent_mkdir=1
+
+       mkdir -p $DIR/$tdir
+       $LFS mkdir -i1 -c$MDSCOUNT $DIR/$tdir/striped_dir
+       replay_barrier mds1
+       replay_barrier mds2
+       mkdir $DIR/$tdir/striped_dir/dir0
+       mkdir $DIR/$tdir/striped_dir/dir1
+       fail mds2,mds1
+
+       check_striped_create_137 ||
+               error "check create under striped_dir failed"
+
+       rm -rf $DIR/$tdir
+}
+run_test 137c "DNE: create under striped dir, fail MDT1/MDT2"
+
 test_200() {
        [[ -z $RCLIENTS ]] && skip "Need remote client"
 
index a8448a0..40db631 100755 (executable)
@@ -239,7 +239,7 @@ lpcc_rw_test() {
 
        do_facet $SINGLEAGT $LFS mkdir -i0 -c1 $DIR/$tdir
        setup_pcc_mapping
-       $project && lfs project -sp $project_id $DIR/$tdir
+       $project && lfs project -sp $project_id $DIR2/$tdir
 
        do_facet $SINGLEAGT "echo -n attach_origin > $file"
        if ! $project; then
index 8868245..039a6e8 100755 (executable)
@@ -32140,6 +32140,32 @@ test_851() {
 }
 run_test 851 "fanotify can monitor open/read/write/close events for lustre fs"
 
+test_852() {
+       (( $MDSCOUNT >= 2 )) || skip "needs >= 2 MDTs"
+       (( $MDS1_VERSION >= $(version_code 2.15.61) )) ||
+               skip "Need MDS version at least 2.15.61 for intent mkdir"
+
+       local save="$TMP/$TESTSUITE-$TESTNAME.parameters"
+
+       save_lustre_params client "llite.*.intent_mkdir" > $save
+       stack_trap "restore_lustre_params < $save; rm -f $save" EXIT
+       $LCTL set_param llite.*.intent_mkdir=1
+
+       test_mkdir -p -c$MDSCOUNT $DIR/$tdir
+       if [ $MDSCOUNT -ge 2 ]; then
+               $LFS setdirstripe -D -c$MDSCOUNT $DIR/$tdir ||
+                       error "set default dirstripe failed"
+       fi
+
+       mkdir $DIR/$tdir/tdir || error "mkdir tdir failed"
+       mkdir $DIR/$tdir/tdir/tfile || error "mkdir tdir/tfile failed"
+       touch -d "2020-08-25 15:08" $DIR/$tdir/tdir/tfile ||
+               error "touch time failed"
+       chown 0:0 $DIR/$tdir/tdir/tfile || error "chown 0:0 tdir/tfile failed"
+       chmod 755 $DIR/$tdir/tdir/tfile || error "chmod 755 tdir/tfile failed"
+}
+run_test 852 "mkdir using intent lock for striped directory"
+
 #
 # tests that do cleanup/setup should be run at the end
 #
index 77b45ff..4c76231 100755 (executable)
@@ -6705,6 +6705,38 @@ test_115() {
 }
 run_test 115 "ldiskfs doesn't check direntry for uniqueness"
 
+test_116() {
+       (( $MDSCOUNT >= 2 )) || skip "needs >= 2 MDTs"
+       (( $MDS1_VERSION >= $(version_code 2.15.61) )) ||
+               skip "Need MDS version at least 2.15.61 for intent mkdir"
+
+       local mdt_idx
+       local save="$TMP/$TESTSUITE-$TESTNAME.parameters"
+
+       save_lustre_params client "llite.*.intent_mkdir" > $save
+       stack_trap "restore_lustre_params < $save; rm -f $save" EXIT
+       $LCTL set_param llite.*.intent_mkdir=1
+
+       $LFS mkdir -c$MDSCOUNT -i0 $DIR/$tdir ||
+               error "$LFS mkdir $DIR/$tdir failed"
+       echo "MD layout $DIR/$tdir:"
+       $LFS getdirstripe $DIR/$tdir
+       echo "mkdir $DIR/$tdir/tdir0"
+       mkdir $DIR/$tdir/tdir0 || error "mkdir tdir0 failed"
+       echo "setdirstripe -D -i1 $DIR2/$tdir/tdir0"
+       $LFS setdirstripe -D -i1 $DIR2/$tdir/tdir0 ||
+               error "$LFS setdirstripe $DIR2/$tdir/tdir0 failed"
+       echo "mkdir $DIR/$tdir/tdir0/tdir11"
+       mkdir $DIR/$tdir/tdir0/tdir11 || error "mkdir tdir0/tdir11 failed"
+       $LFS getdirstripe $DIR/$tdir/tdir0
+       $LFS getdirstripe $DIR/$tdir/tdir0/tdir11
+
+       mdt_idx=$($LFS getstripe -m $DIR/$tdir/tdir0/tdir11)
+       [ $mdt_idx == 1 ] ||
+               error "$DIR/$tdir/tdir0/tdir11 on wrong MDT $mdt_idx"
+}
+run_test 116 "DNE: Set default LMV layout from a remote client"
+
 log "cleanup: ======================================================"
 
 # kill and wait in each test only guarantee script finish, but command in script