mdt_object_unlock(info, o, lh, decref);
}
+static int mdt_restripe(struct mdt_thread_info *info,
+ struct mdt_object *pobj,
+ const struct lu_name *lname,
+ const struct lu_fid *tfid,
+ struct md_op_spec *spec,
+ struct md_attr *ma)
+{
+ const struct lu_env *env = info->mti_env;
+ struct mdt_device *mdt = info->mti_mdt;
+ struct lu_fid *cfid = &info->mti_tmp_fid2;
+ struct lmv_user_md *lum = spec->u.sp_ea.eadata;
+ struct md_layout_change *mlc = &info->mti_mlc;
+ struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
+ struct lmv_mds_md_v1 *lmv;
+ struct mdt_object *child;
+ struct mdt_object *tobj = NULL;
+ struct mdt_lock_handle *lhp = NULL;
+ struct mdt_lock_handle *lhc;
+ struct mdt_body *repbody;
+ u32 lmv_stripe_count = 0;
+ int rc;
+
+ ENTRY;
+
+ if (!mdt->mdt_enable_dir_restripe)
+ RETURN(-EPERM);
+
+ /* mti_big_lmm is used to save LMV, but it may be uninitialized. */
+ if (unlikely(!info->mti_big_lmm)) {
+ info->mti_big_lmmsize = lmv_mds_md_size(64, LMV_MAGIC);
+ OBD_ALLOC(info->mti_big_lmm, info->mti_big_lmmsize);
+ if (!info->mti_big_lmm)
+ RETURN(-ENOMEM);
+ }
+
+ rc = mdt_version_get_check_save(info, pobj, 0);
+ if (rc)
+ RETURN(rc);
+
+ ma->ma_lmv = info->mti_big_lmm;
+ ma->ma_lmv_size = info->mti_big_lmmsize;
+ ma->ma_valid = 0;
+ rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
+ if (rc)
+ RETURN(rc);
+
+ if (ma->ma_valid & MA_LMV) {
+ /* don't allow restripe if parent dir layout is changing */
+ lmv = &ma->ma_lmv->lmv_md_v1;
+ if (!lmv_is_sane(lmv))
+ RETURN(-EBADF);
+
+ if (lmv_is_layout_changing(lmv))
+ RETURN(-EBUSY);
+ }
+
+ lhp = &info->mti_lh[MDT_LH_PARENT];
+ mdt_lock_pdo_init(lhp, LCK_PW, lname);
+ rc = mdt_reint_object_lock(info, pobj, lhp, MDS_INODELOCK_UPDATE, true);
+ if (rc)
+ RETURN(rc);
+
+ fid_zero(cfid);
+ rc = mdt_lookup_version_check(info, pobj, lname, cfid, 1);
+ if (rc)
+ GOTO(unlock_parent, rc);
+
+ child = mdt_object_find(info->mti_env, mdt, cfid);
+ if (IS_ERR(child))
+ GOTO(unlock_parent, rc = PTR_ERR(child));
+
+ if (!mdt_object_exists(child))
+ GOTO(out_child, rc = -ENOENT);
+
+ if (mdt_object_remote(child)) {
+ struct mdt_body *repbody;
+
+ repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+ if (!repbody)
+ GOTO(out_child, rc = -EPROTO);
+
+ repbody->mbo_fid1 = *cfid;
+ repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
+ GOTO(out_child, rc = -EREMOTE);
+ }
+
+ /* lock object */
+ lhc = &info->mti_lh[MDT_LH_CHILD];
+ mdt_lock_reg_init(lhc, LCK_EX);
+
+ /* enqueue object remote LOOKUP lock */
+ if (mdt_object_remote(pobj)) {
+ rc = mdt_remote_object_lock(info, pobj, cfid, &lhc->mlh_rreg_lh,
+ lhc->mlh_rreg_mode,
+ MDS_INODELOCK_LOOKUP, false);
+ if (rc != ELDLM_OK)
+ GOTO(out_child, rc);
+ }
+
+ rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
+ true);
+ if (rc)
+ GOTO(unlock_child, rc);
+
+ tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
+ rc = mdt_version_get_check_save(info, child, 1);
+ if (rc)
+ GOTO(unlock_child, rc);
+
+ ma->ma_valid = 0;
+ rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
+ if (rc)
+ GOTO(unlock_child, rc);
+
+ if (ma->ma_valid & MA_LMV) {
+ lmv = &ma->ma_lmv->lmv_md_v1;
+ if (!lmv_is_sane(lmv))
+ GOTO(unlock_child, rc = -EBADF);
+
+ /* don't allow restripe if dir layout is changing */
+ if (lmv_is_layout_changing(lmv))
+ GOTO(unlock_child, rc = -EBUSY);
+
+ /* check whether stripe count and hash unchanged */
+ if (lum->lum_stripe_count == lmv->lmv_stripe_count &&
+ lum->lum_hash_type == lmv->lmv_hash_type)
+ GOTO(unlock_child, rc = -EALREADY);
+
+ lmv_stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+ } else if (le32_to_cpu(lum->lum_stripe_count) < 2) {
+ /* stripe count unchanged for plain directory */
+ GOTO(unlock_child, rc = -EALREADY);
+ }
+
+ repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+ if (!repbody)
+ GOTO(unlock_child, rc = -EPROTO);
+
+ if (le32_to_cpu(lum->lum_stripe_count) > lmv_stripe_count) {
+ /* split */
+ ma->ma_need = MA_INODE;
+ ma->ma_valid = 0;
+ rc = mdt_attr_get_complex(info, child, ma);
+ if (rc)
+ GOTO(unlock_child, rc);
+
+ if (!(ma->ma_valid & MA_INODE))
+ GOTO(unlock_child, rc = -EBADF);
+
+ if (!lmv_stripe_count) {
+ /* if child is plain directory, allocate @tobj as the
+ * master object, and make child the first stripe of
+ * @tobj.
+ */
+ tobj = mdt_object_new(info->mti_env, mdt, tfid);
+ if (unlikely(IS_ERR(tobj)))
+ GOTO(unlock_child, rc = PTR_ERR(tobj));
+ }
+
+ mlc->mlc_opc = MD_LAYOUT_SPLIT;
+ mlc->mlc_parent = mdt_object_child(pobj);
+ mlc->mlc_target = tobj ? mdt_object_child(tobj) : NULL;
+ mlc->mlc_attr = &ma->ma_attr;
+ mlc->mlc_name = lname;
+ mlc->mlc_spec = spec;
+ rc = mo_layout_change(env, mdt_object_child(child), mlc);
+ if (rc)
+ GOTO(out_tobj, rc);
+ } else {
+ /* merge only needs to override LMV */
+ struct lu_buf *buf = &info->mti_buf;
+ __u32 version;
+
+ LASSERT(ma->ma_valid & MA_LMV);
+ lmv = &ma->ma_lmv->lmv_md_v1;
+ version = cpu_to_le32(lmv->lmv_layout_version);
+
+ /* adjust 0 to 1 */
+ if (lum->lum_stripe_count == 0)
+ lum->lum_stripe_count = cpu_to_le32(1);
+
+ lmv->lmv_hash_type |= cpu_to_le32(LMV_HASH_FLAG_MERGE |
+ LMV_HASH_FLAG_MIGRATION);
+ lmv->lmv_merge_offset = lum->lum_stripe_count;
+ lmv->lmv_merge_hash = lum->lum_hash_type;
+ lmv->lmv_layout_version = cpu_to_le32(++version);
+
+ buf->lb_buf = lmv;
+ buf->lb_len = sizeof(*lmv);
+ rc = mo_xattr_set(env, mdt_object_child(child), buf,
+ XATTR_NAME_LMV, LU_XATTR_REPLACE);
+ if (rc)
+ GOTO(unlock_child, rc);
+ }
+
+ ma->ma_need = MA_INODE;
+ ma->ma_valid = 0;
+ rc = mdt_attr_get_complex(info, tobj ? tobj : child, ma);
+ if (rc)
+ GOTO(out_tobj, rc);
+
+ if (!(ma->ma_valid & MA_INODE))
+ GOTO(out_tobj, rc = -EBADF);
+
+ mdt_pack_attr2body(info, repbody, &ma->ma_attr,
+ mdt_object_fid(tobj ? tobj : child));
+ EXIT;
+
+out_tobj:
+ if (tobj)
+ mdt_object_put(env, tobj);
+unlock_child:
+ mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
+out_child:
+ mdt_object_put(env, child);
+unlock_parent:
+ mdt_object_unlock(info, pobj, lhp, rc);
+
+ return rc;
+}
+
/*
* VBR: we save three versions in reply:
* 0 - parent. Check that parent version is the same during replay.
*/
static int mdt_create(struct mdt_thread_info *info)
{
- struct mdt_device *mdt = info->mti_mdt;
- struct mdt_object *parent;
- struct mdt_object *child;
- struct mdt_lock_handle *lh;
- struct mdt_body *repbody;
- struct md_attr *ma = &info->mti_attr;
+ struct mdt_device *mdt = info->mti_mdt;
+ struct mdt_object *parent;
+ struct mdt_object *child;
+ struct mdt_lock_handle *lh;
+ struct mdt_body *repbody;
+ struct md_attr *ma = &info->mti_attr;
struct mdt_reint_record *rr = &info->mti_rr;
- struct md_op_spec *spec = &info->mti_spec;
+ struct md_op_spec *spec = &info->mti_spec;
+ bool restripe = false;
int rc;
ENTRY;
uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
mdt->mdt_enable_remote_dir_gid != -1)
RETURN(-EPERM);
+
+ /* restripe if later found dir exists */
+ if (le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT)
+ restripe = true;
}
repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
*/
rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
&info->mti_tmp_fid1, 1);
- if (rc == 0)
- GOTO(put_parent, rc = -EEXIST);
+ if (rc == 0) {
+ if (!restripe)
+ GOTO(put_parent, rc = -EEXIST);
+
+ rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
+ ma);
+ }
/* -ENOENT is expected here */
if (rc != -ENOENT)
/* if parent is striped, lookup on corresponding stripe */
struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+ if (!lmv_is_sane(lmv))
+ return -EBADF;
+
rc = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
lname->ln_namelen);
if (rc < 0)
fid_zero(fid);
rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
&info->mti_spec);
- if (rc == -ENOENT &&
- (cpu_to_le32(lmv->lmv_hash_type) &
- LMV_HASH_FLAG_LAYOUT_CHANGE)) {
+ if (rc == -ENOENT && lmv_is_layout_changing(lmv)) {
/*
- * if parent is migrating, and lookup child failed on
- * source stripe, lookup again on target stripe, if it
- * exists, it means previous migration was interrupted,
- * and current file was migrated already.
+ * if parent layout is changeing, and lookup child
+ * failed on source stripe, lookup again on target
+ * stripe, if it exists, it means previous migration
+ * was interrupted, and current file was migrated
+ * already.
*/
mdt_object_put(env, stripe);