Whamcloud - gitweb
LU-11025 dne: support directory restripe
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
index c978a46..7ebf57d 100644 (file)
@@ -168,9 +168,9 @@ static int __mdd_links_read(const struct lu_env *env,
        return linkea_init(ldata);
 }
 
-static int mdd_links_read(const struct lu_env *env,
-                         struct mdd_object *mdd_obj,
-                         struct linkea_data *ldata)
+int mdd_links_read(const struct lu_env *env,
+                  struct mdd_object *mdd_obj,
+                  struct linkea_data *ldata)
 {
        int rc;
 
@@ -4040,6 +4040,10 @@ static int mdd_migrate_create(const struct lu_env *env,
        RETURN(rc);
 }
 
+/* NB: if user issued different migrate command, we can't ajust it silently
+ * here, because this command will decide target MDT in subdir migration in
+ * LMV.
+ */
 static int mdd_migrate_cmd_check(struct mdd_device *mdd,
                                 const struct lmv_mds_md_v1 *lmv,
                                 const struct lmv_user_md_v1 *lum,
@@ -4136,12 +4140,14 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
 
        lmv = pbuf.lb_buf;
        if (lmv) {
-               __u32 hash_type = le32_to_cpu(lmv->lmv_hash_type);
                int index;
 
+               if (!lmv_is_sane(lmv))
+                       GOTO(out, rc = -EBADF);
+
                /* locate target parent stripe */
                /* fail check here to make sure top dir migration succeed. */
-               if ((hash_type & LMV_HASH_FLAG_MIGRATION) &&
+               if (lmv_is_migrating(lmv) &&
                    OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
                        GOTO(out, rc = -EIO);
 
@@ -4156,7 +4162,7 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
                        GOTO(out, rc = PTR_ERR(tpobj));
 
                /* locate source parent stripe */
-               if (hash_type & LMV_HASH_FLAG_LAYOUT_CHANGE) {
+               if (lmv_is_layout_changing(lmv)) {
                        index = lmv_name_to_stripe_index_old(lmv,
                                                             lname->ln_name,
                                                             lname->ln_namelen);
@@ -4167,6 +4173,15 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
                        spobj = mdd_object_find(env, mdd, fid);
                        if (IS_ERR(spobj))
                                GOTO(out, rc = PTR_ERR(spobj));
+
+                       /* parent stripe unchanged */
+                       if (spobj == tpobj) {
+                               if (!lmv_is_restriping(lmv))
+                                       GOTO(out, rc = -EINVAL);
+                               GOTO(out, rc = -EALREADY);
+                       }
+                       if (S_ISDIR(attr->la_mode))
+                               nsonly = spec->sp_migrate_nsonly;
                } else {
                        spobj = tpobj;
                        mdd_object_get(spobj);
@@ -4186,7 +4201,7 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
        if (rc)
                GOTO(out, rc);
 
-       if (S_ISDIR(attr->la_mode)) {
+       if (S_ISDIR(attr->la_mode) && !nsonly) {
                struct lmv_user_md_v1 *lum = spec->u.sp_ea.eadata;
 
                LASSERT(lum);
@@ -4202,13 +4217,16 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
                        GOTO(out, rc);
 
                lmv = sbuf.lb_buf;
-               if (lmv &&
-                   (le32_to_cpu(lmv->lmv_hash_type) &
-                    LMV_HASH_FLAG_MIGRATION)) {
-                       rc = mdd_migrate_cmd_check(mdd, lmv, lum, lname);
-                       GOTO(out, rc);
+               if (lmv) {
+                       if (!lmv_is_sane(lmv))
+                               GOTO(out, rc = -EBADF);
+                       if (lmv_is_migrating(lmv)) {
+                               rc = mdd_migrate_cmd_check(mdd, lmv, lum,
+                                                          lname);
+                               GOTO(out, rc);
+                       }
                }
-       } else {
+       } else if (!S_ISDIR(attr->la_mode)) {
                if (spobj == tpobj)
                        GOTO(out, rc = -EALREADY);
 
@@ -4476,20 +4494,23 @@ int mdd_dir_layout_shrink(const struct lu_env *env,
                RETURN(rc);
 
        lmv = lmv_buf.lb_buf;
+       if (!lmv_is_sane(lmv))
+               RETURN(-EBADF);
+
        lmu = mlc->mlc_buf.lb_buf;
 
        /* adjust the default value '0' to '1' */
        if (lmu->lum_stripe_count == 0)
                lmu->lum_stripe_count = cpu_to_le32(1);
 
-       /* this was checked in MDT */
+       /* these were checked in MDT */
        LASSERT(le32_to_cpu(lmu->lum_stripe_count) <
                le32_to_cpu(lmv->lmv_stripe_count));
+       LASSERT(!lmv_is_splitting(lmv));
+       LASSERT(lmv_is_migrating(lmv) || lmv_is_merging(lmv));
 
-       /*
-        * if obj stripe count will be shrunk to 1, we need to convert it to a
-        * normal dir, which will change its fid and update parent namespace,
-        * get obj name and parent fid from linkea.
+       /* if dir stripe count will be shrunk to 1, it needs to be transformed
+        * to a plain dir, which will cause FID change and namespace update.
         */
        if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
                struct linkea_data *ldata = &info->mti_link_data;
@@ -4585,6 +4606,257 @@ out:
        return rc;
 }
 
+static int mdd_dir_declare_split_plain(const struct lu_env *env,
+                                       struct mdd_device *mdd,
+                                       struct mdd_object *pobj,
+                                       struct mdd_object *obj,
+                                       struct mdd_object *tobj,
+                                       struct md_layout_change *mlc,
+                                       struct dt_allocation_hint *hint,
+                                       struct thandle *handle)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       const struct lu_name *lname = mlc->mlc_name;
+       struct lu_attr *la = &info->mti_la_for_fix;
+       struct lmv_user_md_v1 *lum = mlc->mlc_spec->u.sp_ea.eadata;
+       struct linkea_data *ldata = &info->mti_link_data;
+       struct lmv_mds_md_v1 *lmv;
+       __u32 count;
+       int rc;
+
+       mlc->mlc_opc = MD_LAYOUT_DETACH;
+       rc = mdo_declare_layout_change(env, obj, mlc, handle);
+       if (rc)
+               return rc;
+
+       memset(ldata, 0, sizeof(*ldata));
+       rc = mdd_linkea_prepare(env, obj, NULL, NULL, mdd_object_fid(pobj),
+                               lname, 1, 0, ldata);
+       if (rc)
+               return rc;
+
+       count = lum->lum_stripe_count;
+       lum->lum_stripe_count = 0;
+       mdd_object_make_hint(env, pobj, tobj, mlc->mlc_attr, mlc->mlc_spec,
+                            hint);
+       rc = mdd_declare_create(env, mdo2mdd(&pobj->mod_obj), pobj, tobj,
+                               lname, mlc->mlc_attr, handle, mlc->mlc_spec,
+                               ldata, NULL, NULL, NULL, hint);
+       if (rc)
+               return rc;
+
+       /* tobj mode will be used in lod_declare_xattr_set(), but it's not
+        * createb yet.
+        */
+       tobj->mod_obj.mo_lu.lo_header->loh_attr |= S_IFDIR;
+
+       lmv = (typeof(lmv))info->mti_key;
+       memset(lmv, 0, sizeof(*lmv));
+       lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
+       lmv->lmv_stripe_count = cpu_to_le32(1);
+       lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_DEFAULT);
+       fid_le_to_cpu(&lmv->lmv_stripe_fids[0], mdd_object_fid(obj));
+
+       mlc->mlc_opc = MD_LAYOUT_ATTACH;
+       mlc->mlc_buf.lb_buf = lmv;
+       mlc->mlc_buf.lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
+       rc = mdo_declare_layout_change(env, tobj, mlc, handle);
+       if (rc)
+               return rc;
+
+       rc = mdd_iterate_xattrs(env, obj, tobj, true, handle,
+                               mdo_declare_xattr_set);
+       if (rc)
+               return rc;
+
+       lum->lum_stripe_count = count;
+       mlc->mlc_opc = MD_LAYOUT_SPLIT;
+       rc = mdo_declare_layout_change(env, tobj, mlc, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_index_insert(env, pobj, mdd_object_fid(tobj),
+                                     S_IFDIR, lname->ln_name, handle);
+       if (rc)
+               return rc;
+
+       la->la_valid = LA_CTIME | LA_MTIME;
+       rc = mdo_declare_attr_set(env, obj, la, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_attr_set(env, pobj, la, handle);
+       if (rc)
+               return rc;
+
+       rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
+                                        handle);
+       return rc;
+}
+
+/**
+ * plain directory split:
+ * 1. create \a tobj as plain directory.
+ * 2. append \a obj as first stripe of \a tobj.
+ * 3. migrate xattrs from \a obj to \a tobj.
+ * 4. split \a tobj to specific stripe count.
+ */
+static int mdd_dir_split_plain(const struct lu_env *env,
+                               struct mdd_device *mdd,
+                               struct mdd_object *pobj,
+                               struct mdd_object *obj,
+                               struct mdd_object *tobj,
+                               struct md_layout_change *mlc,
+                               struct dt_allocation_hint *hint,
+                               struct thandle *handle)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct lu_attr *pattr = &info->mti_pattr;
+       struct lu_attr *la = &info->mti_la_for_fix;
+       const struct lu_name *lname = mlc->mlc_name;
+       struct linkea_data *ldata = &info->mti_link_data;
+       int rc;
+
+       ENTRY;
+
+       /* copy linkea out and set on target later */
+       rc = mdd_links_read(env, obj, ldata);
+       if (rc)
+               RETURN(rc);
+
+       mlc->mlc_opc = MD_LAYOUT_DETACH;
+       rc = mdo_layout_change(env, obj, mlc, handle);
+       if (rc)
+               RETURN(rc);
+
+       /* don't set nlink from obj */
+       mlc->mlc_attr->la_valid &= ~LA_NLINK;
+
+       rc = mdd_create_object(env, pobj, tobj, mlc->mlc_attr, mlc->mlc_spec,
+                              NULL, NULL, NULL, hint, handle, false);
+       if (rc)
+               RETURN(rc);
+
+       rc = mdd_iterate_xattrs(env, obj, tobj, true, handle, mdo_xattr_set);
+       if (rc)
+               RETURN(rc);
+
+       rc = mdd_links_write(env, tobj, ldata, handle);
+       if (rc)
+               RETURN(rc);
+
+       rc = __mdd_index_delete(env, pobj, lname->ln_name, true, handle);
+       if (rc)
+               RETURN(rc);
+
+       rc = __mdd_index_insert(env, pobj, mdd_object_fid(tobj), S_IFDIR,
+                               lname->ln_name, handle);
+       if (rc)
+               RETURN(rc);
+
+       la->la_ctime = la->la_mtime = mlc->mlc_attr->la_mtime;
+       la->la_valid = LA_CTIME | LA_MTIME;
+
+       mdd_write_lock(env, obj, DT_SRC_CHILD);
+       rc = mdd_update_time(env, tobj, mlc->mlc_attr, la, handle);
+       mdd_write_unlock(env, obj);
+       if (rc)
+               RETURN(rc);
+
+       rc = mdd_la_get(env, pobj, pattr);
+       if (rc)
+               RETURN(rc);
+
+       la->la_valid = LA_CTIME | LA_MTIME;
+
+       mdd_write_lock(env, pobj, DT_SRC_PARENT);
+       rc = mdd_update_time(env, pobj, pattr, la, handle);
+       mdd_write_unlock(env, pobj);
+       if (rc)
+               RETURN(rc);
+
+       /* FID changes, record it as CL_MIGRATE */
+       rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, tobj,
+                                   mdd_object_fid(pobj), mdd_object_fid(obj),
+                                   mdd_object_fid(pobj), lname, lname, handle);
+       RETURN(rc);
+}
+
+int mdd_dir_layout_split(const struct lu_env *env, struct md_object *o,
+                        struct md_layout_change *mlc)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct mdd_device *mdd = mdo2mdd(o);
+       struct mdd_object *obj = md2mdd_obj(o);
+       struct mdd_object *pobj = md2mdd_obj(mlc->mlc_parent);
+       struct mdd_object *tobj = md2mdd_obj(mlc->mlc_target);
+       struct dt_allocation_hint *hint = &info->mti_hint;
+       bool is_plain = false;
+       struct thandle *handle;
+       int rc;
+
+       ENTRY;
+
+       LASSERT(S_ISDIR(mdd_object_type(obj)));
+
+       rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LMV);
+       if (rc == -ENODATA)
+               is_plain = true;
+       else if (rc < 0)
+               RETURN(rc);
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
+
+       if (is_plain) {
+               rc = mdd_dir_declare_split_plain(env, mdd, pobj, obj, tobj, mlc,
+                                                hint, handle);
+       } else {
+               mlc->mlc_opc = MD_LAYOUT_SPLIT;
+               rc = mdo_declare_layout_change(env, obj, mlc, handle);
+               if (rc)
+                       GOTO(stop_trans, rc);
+
+               rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL,
+                                                NULL, handle);
+       }
+       if (rc)
+               GOTO(stop_trans, rc);
+
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop_trans, rc);
+
+       if (is_plain) {
+               rc = mdd_dir_split_plain(env, mdd, pobj, obj, tobj, mlc, hint,
+                                        handle);
+       } else {
+               mdd_write_lock(env, obj, DT_TGT_CHILD);
+               rc = mdo_xattr_set(env, obj, NULL, XATTR_NAME_LMV,
+                                  LU_XATTR_CREATE, handle);
+               mdd_write_unlock(env, obj);
+               if (rc)
+                       GOTO(stop_trans, rc);
+
+               rc = mdd_changelog_data_store_xattr(env, mdd, CL_LAYOUT, 0, obj,
+                                                   XATTR_NAME_LMV, handle);
+       }
+       if (rc)
+               GOTO(stop_trans, rc);
+
+       EXIT;
+
+stop_trans:
+       rc = mdd_trans_stop(env, mdd, rc, handle);
+
+       return rc;
+}
+
 const struct md_dir_operations mdd_dir_ops = {
        .mdo_is_subdir     = mdd_is_subdir,
        .mdo_lookup        = mdd_lookup,