* 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
* check.
*/
-static int mdt_create(struct mdt_thread_info *info)
+static int mdt_create(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
{
struct mdt_device *mdt = info->mti_mdt;
struct mdt_object *parent;
struct mdt_reint_record *rr = &info->mti_rr;
struct md_op_spec *spec = &info->mti_spec;
struct lu_ucred *uc = mdt_ucred(info);
+ struct ldlm_reply *dlmrep = NULL;
bool restripe = false;
bool recreate_obj = false;
int rc;
}
repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+ /*
+ * TODO: rewrite ll_mknod(), ll_create_nd(), ll_symlink(),
+ * ll_dir_setdirstripe() to all use intent lock.
+ */
+ if (info->mti_intent_lock) {
+ dlmrep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
+ mdt_set_disposition(info, dlmrep,
+ DISP_IT_EXECD | DISP_LOOKUP_EXECD);
+ }
parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
if (IS_ERR(parent))
if (unlikely(rc == 0 && !recreate_obj))
GOTO(unlock_parent, rc = -EEXIST);
+ if (info->mti_intent_lock)
+ mdt_set_disposition(info, dlmrep, DISP_OPEN_CREATE);
+
child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
if (unlikely(IS_ERR(child)))
GOTO(unlock_parent, rc = PTR_ERR(child));
if (rc < 0)
GOTO(put_child, rc);
- if (S_ISDIR(ma->ma_attr.la_mode) &&
- (info->mti_spec.sp_cr_flags & MDS_MKDIR_LMV))
+ if ((S_ISDIR(ma->ma_attr.la_mode) &&
+ (info->mti_spec.sp_cr_flags & MDS_MKDIR_LMV)) ||
+ info->mti_intent_lock)
mdt_prep_ma_buf_from_rep(info, child, ma, 0);
rc = mdt_attr_get_complex(info, child, ma);
if (rc < 0)
GOTO(put_child, rc);
+ if (ma->ma_valid & MA_LOV) {
+ LASSERT(ma->ma_lmm_size != 0);
+ repbody->mbo_eadatasize = ma->ma_lmm_size;
+ if (S_ISREG(ma->ma_attr.la_mode))
+ repbody->mbo_valid |= OBD_MD_FLEASIZE;
+ else if (S_ISDIR(ma->ma_attr.la_mode))
+ repbody->mbo_valid |= OBD_MD_FLDIREA;
+ }
+
if (ma->ma_valid & MA_LMV) {
mdt_dump_lmv(D_INFO, ma->ma_lmv);
repbody->mbo_eadatasize = ma->ma_lmv_size;
repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
}
+ if (ma->ma_valid & MA_LMV_DEF) {
+ /* Return -EOPNOTSUPP for old client. */
+ if (!mdt_is_striped_client(mdt_info_req(info)->rq_export))
+ GOTO(put_child, rc = -EOPNOTSUPP);
+
+ LASSERT(S_ISDIR(ma->ma_attr.la_mode));
+ repbody->mbo_valid |= OBD_MD_FLDIREA | OBD_MD_DEFAULT_MEA;
+ }
+
/* save child locks to eliminate dependey between 'mkdir a' and
* 'mkdir a/b' if b is a remote directory
*/
- if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
+ if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode) &&
+ !info->mti_intent_lock) {
struct mdt_lock_handle *lhc;
struct ldlm_enqueue_info *einfo = &info->mti_einfo;
if (ma->ma_valid & MA_INODE)
mdt_pack_attr2body(info, repbody, &ma->ma_attr,
mdt_object_fid(child));
+
+ if (info->mti_intent_lock) {
+ mdt_set_disposition(info, dlmrep, DISP_LOOKUP_NEG);
+ rc = mdt_check_resent_lock(info, child, lhc);
+ /*
+ * rc < 0 is error and we fall right back through,
+ * rc == 0 is the open lock might already be gotten in
+ * ldlm_handle_enqueue due to this being a resend.
+ */
+ if (rc <= 0)
+ GOTO(put_child, rc);
+
+ /*
+ * For the normal intent create (mkdir):
+ * - Grant LOOKUP lock with CR mode to the client at
+ * least.
+ * - Grant the lock similar to getattr():
+ * lock mode: PR;
+ * inodebits: LOOK | UPDATE | PERM [| LAYOUT].
+ * However, it can not grant LCK_CR to the client as during
+ * the setting of LMV layout for a directory from a client,
+ * it will acquire LCK_PW mode lock which is compat with LCK_CR
+ * lock mode, this may result that the cached LMV layout on a
+ * client will not be released when set (default) LMV layout on
+ * a directory.
+ * Due to the above reason, it grants a lock with LCK_PR mode to
+ * the client.
+ */
+ rc = mdt_object_lock(info, child, lhc, MDS_INODELOCK_LOOKUP |
+ MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM,
+ LCK_PR);
+ }
+
EXIT;
put_child:
mdt_object_put(info->mti_env, child);
unlock_parent:
mdt_object_unlock(info, parent, lh, rc);
+ if (rc && dlmrep)
+ mdt_clear_disposition(info, dlmrep, DISP_OPEN_CREATE);
put_parent:
mdt_object_put(info->mti_env, parent);
return rc;
RETURN(err_serious(-EOPNOTSUPP));
}
- rc = mdt_create(info);
+ rc = mdt_create(info, lhc);
if (rc == 0) {
if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
mdt_counter_incr(req, LPROC_MDT_MKDIR,