Whamcloud - gitweb
LU-11025 dne: directory restripe and auto split
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index 1b65208..7a53b7a 100644 (file)
@@ -190,10 +190,10 @@ int mdt_version_get_check_save(struct mdt_thread_info *info,
  * This checks version of 'name'. Many reint functions uses 'name' for child not
  * FID, therefore we need to get object by name and check its version.
  */
-static int mdt_lookup_version_check(struct mdt_thread_info *info,
-                                   struct mdt_object *p,
-                                   const struct lu_name *lname,
-                                   struct lu_fid *fid, int idx)
+int mdt_lookup_version_check(struct mdt_thread_info *info,
+                            struct mdt_object *p,
+                            const struct lu_name *lname,
+                            struct lu_fid *fid, int idx)
 {
         int rc, vbrc;
 
@@ -343,25 +343,20 @@ void mdt_reint_striped_unlock(struct mdt_thread_info *info,
 }
 
 static int mdt_restripe(struct mdt_thread_info *info,
-                       struct mdt_object *pobj,
+                       struct mdt_object *parent,
                        const struct lu_name *lname,
                        const struct lu_fid *tfid,
                        struct md_op_spec *spec,
                        struct md_attr *ma)
 {
-       const struct lu_env *env = info->mti_env;
        struct mdt_device *mdt = info->mti_mdt;
-       struct lu_fid *cfid = &info->mti_tmp_fid2;
-       struct lmv_user_md *lum = spec->u.sp_ea.eadata;
-       struct md_layout_change *mlc = &info->mti_mlc;
+       struct lu_fid *fid = &info->mti_tmp_fid2;
        struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
        struct lmv_mds_md_v1 *lmv;
        struct mdt_object *child;
-       struct mdt_object *tobj = NULL;
-       struct mdt_lock_handle *lhp = NULL;
+       struct mdt_lock_handle *lhp;
        struct mdt_lock_handle *lhc;
        struct mdt_body *repbody;
-       u32 lmv_stripe_count = 0;
        int rc;
 
        ENTRY;
@@ -369,47 +364,37 @@ static int mdt_restripe(struct mdt_thread_info *info,
        if (!mdt->mdt_enable_dir_restripe)
                RETURN(-EPERM);
 
-       /* mti_big_lmm is used to save LMV, but it may be uninitialized. */
-       if (unlikely(!info->mti_big_lmm)) {
-               info->mti_big_lmmsize = lmv_mds_md_size(64, LMV_MAGIC);
-               OBD_ALLOC(info->mti_big_lmm, info->mti_big_lmmsize);
-               if (!info->mti_big_lmm)
-                       RETURN(-ENOMEM);
-       }
-
-       rc = mdt_version_get_check_save(info, pobj, 0);
+       rc = mdt_version_get_check_save(info, parent, 0);
        if (rc)
                RETURN(rc);
 
-       ma->ma_lmv = info->mti_big_lmm;
-       ma->ma_lmv_size = info->mti_big_lmmsize;
-       ma->ma_valid = 0;
-       rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
+       lhp = &info->mti_lh[MDT_LH_PARENT];
+       mdt_lock_pdo_init(lhp, LCK_PW, lname);
+       rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
+                                  true);
        if (rc)
                RETURN(rc);
 
+       rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
+       if (rc)
+               GOTO(unlock_parent, rc);
+
        if (ma->ma_valid & MA_LMV) {
                /* don't allow restripe if parent dir layout is changing */
                lmv = &ma->ma_lmv->lmv_md_v1;
                if (!lmv_is_sane(lmv))
-                       RETURN(-EBADF);
+                       GOTO(unlock_parent, rc = -EBADF);
 
                if (lmv_is_layout_changing(lmv))
-                       RETURN(-EBUSY);
+                       GOTO(unlock_parent, rc = -EBUSY);
        }
 
-       lhp = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(lhp, LCK_PW, lname);
-       rc = mdt_reint_object_lock(info, pobj, lhp, MDS_INODELOCK_UPDATE, true);
-       if (rc)
-               RETURN(rc);
-
-       fid_zero(cfid);
-       rc = mdt_lookup_version_check(info, pobj, lname, cfid, 1);
+       fid_zero(fid);
+       rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
        if (rc)
                GOTO(unlock_parent, rc);
 
-       child = mdt_object_find(info->mti_env, mdt, cfid);
+       child = mdt_object_find(info->mti_env, mdt, fid);
        if (IS_ERR(child))
                GOTO(unlock_parent, rc = PTR_ERR(child));
 
@@ -423,7 +408,7 @@ static int mdt_restripe(struct mdt_thread_info *info,
                if (!repbody)
                        GOTO(out_child, rc = -EPROTO);
 
-               repbody->mbo_fid1 = *cfid;
+               repbody->mbo_fid1 = *fid;
                repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
                GOTO(out_child, rc = -EREMOTE);
        }
@@ -433,8 +418,9 @@ static int mdt_restripe(struct mdt_thread_info *info,
        mdt_lock_reg_init(lhc, LCK_EX);
 
        /* enqueue object remote LOOKUP lock */
-       if (mdt_object_remote(pobj)) {
-               rc = mdt_remote_object_lock(info, pobj, cfid, &lhc->mlh_rreg_lh,
+       if (mdt_object_remote(parent)) {
+               rc = mdt_remote_object_lock(info, parent, fid,
+                                           &lhc->mlh_rreg_lh,
                                            lhc->mlh_rreg_mode,
                                            MDS_INODELOCK_LOOKUP, false);
                if (rc != ELDLM_OK)
@@ -451,114 +437,35 @@ static int mdt_restripe(struct mdt_thread_info *info,
        if (rc)
                GOTO(unlock_child, rc);
 
-       ma->ma_valid = 0;
-       rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
-       if (rc)
-               GOTO(unlock_child, rc);
-
-       if (ma->ma_valid & MA_LMV) {
-               lmv = &ma->ma_lmv->lmv_md_v1;
-               if (!lmv_is_sane(lmv))
-                       GOTO(unlock_child, rc = -EBADF);
-
-               /* don't allow restripe if dir layout is changing */
-               if (lmv_is_layout_changing(lmv))
-                       GOTO(unlock_child, rc = -EBUSY);
-
-               /* check whether stripe count and hash unchanged */
-               if (lum->lum_stripe_count == lmv->lmv_stripe_count &&
-                   lum->lum_hash_type == lmv->lmv_hash_type)
-                       GOTO(unlock_child, rc = -EALREADY);
-
-               lmv_stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
-       } else if (le32_to_cpu(lum->lum_stripe_count) < 2) {
-               /* stripe count unchanged for plain directory */
-               GOTO(unlock_child, rc = -EALREADY);
-       }
-
-       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-       if (!repbody)
-               GOTO(unlock_child, rc = -EPROTO);
-
-       if (le32_to_cpu(lum->lum_stripe_count) > lmv_stripe_count) {
-               /* split */
-               ma->ma_need = MA_INODE;
-               ma->ma_valid = 0;
-               rc = mdt_attr_get_complex(info, child, ma);
-               if (rc)
-                       GOTO(unlock_child, rc);
-
-               if (!(ma->ma_valid & MA_INODE))
-                       GOTO(unlock_child, rc = -EBADF);
-
-               if (!lmv_stripe_count) {
-                       /* if child is plain directory, allocate @tobj as the
-                        * master object, and make child the first stripe of
-                        * @tobj.
-                        */
-                       tobj = mdt_object_new(info->mti_env, mdt, tfid);
-                       if (unlikely(IS_ERR(tobj)))
-                               GOTO(unlock_child, rc = PTR_ERR(tobj));
-               }
-
-               mlc->mlc_opc = MD_LAYOUT_SPLIT;
-               mlc->mlc_parent = mdt_object_child(pobj);
-               mlc->mlc_target = tobj ? mdt_object_child(tobj) : NULL;
-               mlc->mlc_attr = &ma->ma_attr;
-               mlc->mlc_name = lname;
-               mlc->mlc_spec = spec;
-               rc = mo_layout_change(env, mdt_object_child(child), mlc);
-               if (rc)
-                       GOTO(out_tobj, rc);
-       } else {
-               /* merge only needs to override LMV */
-               struct lu_buf *buf = &info->mti_buf;
-               __u32 version;
-
-               LASSERT(ma->ma_valid & MA_LMV);
-               lmv = &ma->ma_lmv->lmv_md_v1;
-               version = cpu_to_le32(lmv->lmv_layout_version);
-
-               /* adjust 0 to 1 */
-               if (lum->lum_stripe_count == 0)
-                       lum->lum_stripe_count = cpu_to_le32(1);
-
-               lmv->lmv_hash_type |= cpu_to_le32(LMV_HASH_FLAG_MERGE |
-                                                 LMV_HASH_FLAG_MIGRATION);
-               lmv->lmv_merge_offset = lum->lum_stripe_count;
-               lmv->lmv_merge_hash = lum->lum_hash_type;
-               lmv->lmv_layout_version = cpu_to_le32(++version);
-
-               buf->lb_buf = lmv;
-               buf->lb_len = sizeof(*lmv);
-               rc = mo_xattr_set(env, mdt_object_child(child), buf,
-                                 XATTR_NAME_LMV, LU_XATTR_REPLACE);
-               if (rc)
-                       GOTO(unlock_child, rc);
+       spin_lock(&mdt->mdt_restriper.mdr_lock);
+       if (child->mot_restriping) {
+               /* race? */
+               spin_unlock(&mdt->mdt_restriper.mdr_lock);
+               GOTO(unlock_child, rc = -EBUSY);
        }
+       child->mot_restriping = 1;
+       spin_unlock(&mdt->mdt_restriper.mdr_lock);
 
-       ma->ma_need = MA_INODE;
-       ma->ma_valid = 0;
-       rc = mdt_attr_get_complex(info, tobj ? tobj : child, ma);
+       *fid = *tfid;
+       rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
        if (rc)
-               GOTO(out_tobj, rc);
+               GOTO(restriping_clear, rc);
 
-       if (!(ma->ma_valid & MA_INODE))
-               GOTO(out_tobj, rc = -EBADF);
+       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+       if (!repbody)
+               GOTO(restriping_clear, rc = -EPROTO);
 
-       mdt_pack_attr2body(info, repbody, &ma->ma_attr,
-                          mdt_object_fid(tobj ? tobj : child));
+       mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
        EXIT;
 
-out_tobj:
-       if (tobj)
-               mdt_object_put(env, tobj);
+restriping_clear:
+       child->mot_restriping = 0;
 unlock_child:
        mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
 out_child:
-       mdt_object_put(env, child);
+       mdt_object_put(info->mti_env, child);
 unlock_parent:
-       mdt_object_unlock(info, pobj, lhp, rc);
+       mdt_object_unlock(info, parent, lhp, rc);
 
        return rc;
 }
@@ -753,13 +660,14 @@ static int mdt_create(struct mdt_thread_info *info)
        if (ma->ma_valid & MA_INODE)
                mdt_pack_attr2body(info, repbody, &ma->ma_attr,
                                   mdt_object_fid(child));
+       EXIT;
 put_child:
        mdt_object_put(info->mti_env, child);
 unlock_parent:
        mdt_object_unlock(info, parent, lh, rc);
 put_parent:
        mdt_object_put(info->mti_env, parent);
-       RETURN(rc);
+       return rc;
 }
 
 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
@@ -2004,9 +1912,6 @@ static int mdt_migrate_object_lock(struct mdt_thread_info *info,
                if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
                        struct md_attr *ma = &info->mti_attr;
 
-                       ma->ma_lmv = info->mti_big_lmm;
-                       ma->ma_lmv_size = info->mti_big_lmmsize;
-                       ma->ma_valid = 0;
                        rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
                        if (rc) {
                                mdt_object_unlock(info, obj, lh, rc);
@@ -2078,9 +1983,9 @@ static int mdt_migrate_lookup(struct mdt_thread_info *info,
                        /*
                         * if parent layout is changeing, and lookup child
                         * failed on source stripe, lookup again on target
-                        *  stripe, if it exists, it means previous migration
-                        *  was interrupted, and current file was migrated
-                        *  already.
+                        * stripe, if it exists, it means previous migration
+                        * was interrupted, and current file was migrated
+                        * already.
                         */
                        mdt_object_put(env, stripe);
 
@@ -2194,8 +2099,8 @@ close:
  *  9. unlock above locks
  * 10. sync device if source has links
  */
-static int mdt_reint_migrate(struct mdt_thread_info *info,
-                            struct mdt_lock_handle *unused)
+int mdt_reint_migrate(struct mdt_thread_info *info,
+                     struct mdt_lock_handle *unused)
 {
        const struct lu_env *env = info->mti_env;
        struct mdt_device *mdt = info->mti_mdt;
@@ -2239,7 +2144,7 @@ static int mdt_reint_migrate(struct mdt_thread_info *info,
        if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
                RETURN(-EPERM);
 
-       if (!md_capable(uc, CFS_CAP_SYS_ADMIN) &&
+       if (uc && !md_capable(uc, CFS_CAP_SYS_ADMIN) &&
            uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
            mdt->mdt_enable_remote_dir_gid != -1)
                RETURN(-EPERM);
@@ -2249,8 +2154,10 @@ static int mdt_reint_migrate(struct mdt_thread_info *info,
         * if other MDT holds rename lock, but being blocked to wait for
         * this MDT to finish its recovery, and the failover MDT can not
         * get rename lock, which will cause deadlock.
+        *
+        * req is NULL if this is called by directory auto-split.
         */
-       if (!req_is_replay(req)) {
+       if (req && !req_is_replay(req)) {
                rc = mdt_rename_lock(info, &rename_lh);
                if (rc != 0) {
                        CERROR("%s: can't lock FS for rename: rc = %d\n",
@@ -2260,20 +2167,22 @@ static int mdt_reint_migrate(struct mdt_thread_info *info,
        }
 
        /* pobj is master object of parent */
-       pobj = mdt_parent_find_check(info, rr->rr_fid1, 0);
+       pobj = mdt_object_find(env, mdt, rr->rr_fid1);
        if (IS_ERR(pobj))
                GOTO(unlock_rename, rc = PTR_ERR(pobj));
 
-       if (unlikely(!info->mti_big_lmm)) {
-               info->mti_big_lmmsize = lmv_mds_md_size(64, LMV_MAGIC);
-               OBD_ALLOC(info->mti_big_lmm, info->mti_big_lmmsize);
-               if (!info->mti_big_lmm)
-                       GOTO(put_parent, rc = -ENOMEM);
+       if (req) {
+               rc = mdt_version_get_check(info, pobj, 0);
+               if (rc)
+                       GOTO(put_parent, rc);
        }
 
-       ma->ma_lmv = info->mti_big_lmm;
-       ma->ma_lmv_size = info->mti_big_lmmsize;
-       ma->ma_valid = 0;
+       if (!mdt_object_exists(pobj))
+               GOTO(put_parent, rc = -ENOENT);
+
+       if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
+               GOTO(put_parent, rc = -ENOTDIR);
+
        rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
        if (rc)
                GOTO(put_parent, rc);
@@ -2315,17 +2224,14 @@ lock_parent:
         */
        do_sync = rc;
 
-       /* TODO: DoM migration is not supported yet */
+       /* TODO: DoM migration is not supported, migrate dirent only */
        if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
-               ma->ma_lmm = info->mti_big_lmm;
-               ma->ma_lmm_size = info->mti_big_lmmsize;
-               ma->ma_valid = 0;
                rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
                if (rc)
-                       GOTO(put_source, rc);
+                       GOTO(unlock_links, rc);
 
                if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
-                       GOTO(put_source, rc = -EOPNOTSUPP);
+                       info->mti_spec.sp_migrate_nsonly = 1;
        }
 
        /* if migration HSM is allowed */
@@ -2383,7 +2289,8 @@ lock_parent:
                         mdt_object_child(tobj),
                         &info->mti_spec, ma);
        if (!rc)
-               mdt_counter_incr(req, LPROC_MDT_MIGRATE);
+               lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
+                                    LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
        EXIT;
 
        mdt_object_unlock(info, tobj, lht, rc);
@@ -2490,10 +2397,6 @@ static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
                return 0;
 
        /* check whether sobj and tobj are sibling stripes */
-       ma->ma_need = MA_LMV;
-       ma->ma_valid = 0;
-       ma->ma_lmv = (union lmv_mds_md *)info->mti_xattr_buf;
-       ma->ma_lmv_size = sizeof(info->mti_xattr_buf);
        rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
        if (rc)
                return rc;